Compare commits

..

1 Commits

Author SHA1 Message Date
xiaolan
f3a72264af chore: update dependencies and refactor webhook handling
- Added new dependencies for SQLite support and improved HTTP client functionality in go.mod and go.sum.
- Refactored webhook server implementation to utilize a simplified version, enhancing code maintainability.
- Updated API client to leverage a generic request method, streamlining API interactions.
- Modified configuration to include access token for webhook server, improving security.
- Enhanced event handling and request processing in the API client for better performance.
2026-01-05 18:42:45 +08:00
10 changed files with 346 additions and 181 deletions

View File

@@ -50,7 +50,7 @@ enabled = true
connection_type = "ws"
self_id = "123456789"
nickname = "TestBot"
ws_url = "ws://127.0.0.1:3001"
ws_url = "ws://192.168.10.148:3001"
access_token = "hDeu66@_DDhgMf<9"
timeout = 30
heartbeat = 30

21
go.mod
View File

@@ -9,29 +9,38 @@ require (
github.com/bytedance/sonic v1.14.2
github.com/fasthttp/websocket v1.5.12
github.com/fsnotify/fsnotify v1.9.0
github.com/glebarez/sqlite v1.11.0
github.com/valyala/fasthttp v1.58.0
go.uber.org/fx v1.20.0
go.uber.org/zap v1.26.0
golang.org/x/time v0.14.0
)
require github.com/robfig/cron/v3 v3.0.1
require (
github.com/Tnze/go-mc v1.20.2
github.com/chromedp/chromedp v0.14.2
github.com/robfig/cron/v3 v3.0.1
gorm.io/gorm v1.31.1
)
require (
github.com/Tnze/go-mc v1.20.2 // indirect
github.com/chromedp/cdproto v0.0.0-20250724212937-08a3db8b4327 // indirect
github.com/chromedp/chromedp v0.14.2 // indirect
github.com/chromedp/sysutil v1.1.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/glebarez/go-sqlite v1.21.2 // indirect
github.com/go-json-experiment/json v0.0.0-20250725192818-e39067aee2d2 // indirect
github.com/gobwas/httphead v0.1.0 // indirect
github.com/gobwas/pool v0.2.1 // indirect
github.com/gobwas/ws v1.4.0 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/mattn/go-sqlite3 v1.14.22 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
golang.org/x/text v0.21.0 // indirect
gorm.io/driver/sqlite v1.6.0 // indirect
gorm.io/gorm v1.31.1 // indirect
modernc.org/libc v1.22.5 // indirect
modernc.org/mathutil v1.5.0 // indirect
modernc.org/memory v1.5.0 // indirect
modernc.org/sqlite v1.23.1 // indirect
)
require (

32
go.sum
View File

@@ -23,10 +23,16 @@ github.com/cloudwego/base64x v0.1.6/go.mod h1:OFcloc187FXDaYHvrNIjxSe8ncn0OOM8gE
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/fasthttp/websocket v1.5.12 h1:e4RGPpWW2HTbL3zV0Y/t7g0ub294LkiuXXUuTOUInlE=
github.com/fasthttp/websocket v1.5.12/go.mod h1:I+liyL7/4moHojiOgUOIKEWm9EIxHqxZChS+aMFltyg=
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/glebarez/go-sqlite v1.21.2 h1:3a6LFC4sKahUunAmynQKLZceZCOzUthkRkEAl9gAXWo=
github.com/glebarez/go-sqlite v1.21.2/go.mod h1:sfxdZyhQjTM2Wry3gVYWaW072Ri1WMdWJi0k6+3382k=
github.com/glebarez/sqlite v1.11.0 h1:wSG0irqzP6VurnMEpFGer5Li19RpIRi2qvQz++w0GMw=
github.com/glebarez/sqlite v1.11.0/go.mod h1:h8/o8j5wiAsqSPoWELDUdJXhjAhsVliSn7bWZjOhrgQ=
github.com/go-json-experiment/json v0.0.0-20250725192818-e39067aee2d2 h1:iizUGZ9pEquQS5jTGkh4AqeeHCMbfbjeb0zMt0aEFzs=
github.com/go-json-experiment/json v0.0.0-20250725192818-e39067aee2d2/go.mod h1:TiCD2a1pcmjd7YnhGH0f/zKNcCD06B029pHhzV23c2M=
github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU=
@@ -35,6 +41,8 @@ github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.4.0 h1:CTaoG1tojrh4ucGPcoJFiAQUAsEWekEWvLy7GsVNqGs=
github.com/gobwas/ws v1.4.0/go.mod h1:G3gNqMNtPppf5XUz7O4shetPpcZ1VJ7zt18dlUeakrc=
github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ=
github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
@@ -45,10 +53,17 @@ github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IX
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/klauspost/cpuid/v2 v2.2.9 h1:66ze0taIn2H33fBvCkXuv9BmCwDfafmiIVpKV9kKGuY=
github.com/klauspost/cpuid/v2 v2.2.9/go.mod h1:rqkxqrZ1EhYM9G+hXH7YdowN5R5RGN6NK4QwQ3WMXF8=
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80 h1:6Yzfa6GP0rIo/kULo2bwGEkFvCePZ3qHDDTC3/J9Swo=
github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80/go.mod h1:imJHygn/1yfhB7XSJJKlFZKl/J+dCPAknuiaGOshXAs=
github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng=
github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/orisano/pixelmatch v0.0.0-20220722002657-fb0b55479cde h1:x0TT0RDC7UhAVbbWWBzr41ElhJx5tXPWkIHA2HWPRuw=
github.com/orisano/pixelmatch v0.0.0-20220722002657-fb0b55479cde/go.mod h1:nZgzbfBr3hhjoZnS66nKrHmduYNpc34ny7RK4z5/HM0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/savsgio/gotils v0.0.0-20240704082632-aef3928b8a38 h1:D0vL7YNisV2yqE55+q0lFuGse6U8lxlg7fYTctlT5Gc=
@@ -86,9 +101,8 @@ golang.org/x/arch v0.0.0-20210923205945-b76863e36670 h1:18EFjUmQOcUvxNYSkA6jO9VA
golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA=
golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
@@ -99,7 +113,13 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/driver/sqlite v1.6.0 h1:WHRRrIiulaPiPFmDcod6prc4l2VGVWHz80KspNsxSfQ=
gorm.io/driver/sqlite v1.6.0/go.mod h1:AO9V1qIQddBESngQUKWL9yoH93HIeA1X6V633rBwyT8=
gorm.io/gorm v1.31.1 h1:7CA8FTFz/gRfgqgpeKIBcervUn3xSyPUmr6B2WXJ7kg=
gorm.io/gorm v1.31.1/go.mod h1:XyQVbO2k6YkOis7C2437jSit3SsDK72s7n7rsSHd+Gs=
modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE=
modernc.org/libc v1.22.5/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY=
modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ=
modernc.org/mathutil v1.5.0/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E=
modernc.org/memory v1.5.0 h1:N+/8c5rE6EqugZwHii4IFsaJ7MUhoWX07J5tC/iI5Ds=
modernc.org/memory v1.5.0/go.mod h1:PkUhL0Mugw21sHPeskwZW4D6VscE/GQJOnIpCnW6pSU=
modernc.org/sqlite v1.23.1 h1:nrSBg4aRQQwq59JpvGEQ15tNxoO5pX/kUjcRNwSAGQM=
modernc.org/sqlite v1.23.1/go.mod h1:OrDj17Mggn6MhE+iPbBNf7RGKODDE9NFT0f3EwDzJqk=

View File

@@ -185,7 +185,7 @@ func (a *Adapter) startWebhook() error {
return fmt.Errorf("webhook_listen_addr is required for webhook mode")
}
a.webhookServer = NewWebhookServer(a.config.WebhookListenAddr, a.logger)
a.webhookServer = NewWebhookServer(a.config.WebhookListenAddr, a.config.AccessToken, a.logger)
// 启动服务器
if err := a.webhookServer.Start(); err != nil {

View File

@@ -5,20 +5,19 @@ import (
"fmt"
"time"
"cellbot/pkg/net"
"github.com/bytedance/sonic"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
)
// APIClient Milky API 客户端
// 用于调用协议端的 API (POST /api/:api)
// 使用 pkg/net.HTTPClient 提供的通用 HTTP 请求功能
type APIClient struct {
httpClient *net.HTTPClient
baseURL string
accessToken string
httpClient *fasthttp.Client
logger *zap.Logger
timeout time.Duration
retryCount int
}
// NewAPIClient 创建 API 客户端
@@ -30,17 +29,17 @@ func NewAPIClient(baseURL, accessToken string, timeout time.Duration, retryCount
retryCount = 3
}
config := net.HTTPClientConfig{
BaseURL: baseURL,
Timeout: timeout,
RetryCount: retryCount,
}
return &APIClient{
httpClient: net.NewHTTPClient(config, logger.Named("api-client"), nil),
baseURL: baseURL,
accessToken: accessToken,
httpClient: &fasthttp.Client{
ReadTimeout: timeout,
WriteTimeout: timeout,
MaxConnsPerHost: 100,
},
logger: logger.Named("api-client"),
timeout: timeout,
retryCount: retryCount,
logger: logger.Named("api-client"),
}
}
@@ -69,63 +68,26 @@ func (c *APIClient) Call(ctx context.Context, endpoint string, input interface{}
zap.String("endpoint", endpoint),
zap.String("url", url))
// 重试机制
var lastErr error
for i := 0; i <= c.retryCount; i++ {
if i > 0 {
c.logger.Info("Retrying API call",
zap.String("endpoint", endpoint),
zap.Int("attempt", i),
zap.Int("max", c.retryCount))
// 指数退避
backoff := time.Duration(i) * time.Second
select {
case <-time.After(backoff):
case <-ctx.Done():
return nil, ctx.Err()
}
}
resp, err := c.doRequest(ctx, url, inputData)
if err != nil {
lastErr = err
continue
}
return resp, nil
}
return nil, fmt.Errorf("API call failed after %d retries: %w", c.retryCount, lastErr)
return c.doRequest(ctx, url, inputData)
}
// doRequest 执行单次请求
func (c *APIClient) doRequest(ctx context.Context, url string, inputData []byte) (*APIResponse, error) {
req := fasthttp.AcquireRequest()
resp := fasthttp.AcquireResponse()
defer fasthttp.ReleaseRequest(req)
defer fasthttp.ReleaseResponse(resp)
// 设置请求
req.SetRequestURI(url)
req.Header.SetMethod("POST")
req.Header.SetContentType("application/json")
req.SetBody(inputData)
// 设置 Authorization 头
if c.accessToken != "" {
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.accessToken))
// 使用 pkg/net.HTTPClient 的通用请求方法
config := net.GenericRequestConfig{
URL: url,
Method: "POST",
Body: inputData,
AccessToken: c.accessToken,
}
// 发送请求
err := c.httpClient.DoTimeout(req, resp, c.timeout)
resp, err := c.httpClient.DoGenericRequest(ctx, config)
if err != nil {
return nil, fmt.Errorf("request failed: %w", err)
}
// 检查 HTTP 状态码
statusCode := resp.StatusCode()
switch statusCode {
switch resp.StatusCode {
case 401:
return nil, fmt.Errorf("unauthorized: access token invalid or missing")
case 404:
@@ -135,12 +97,12 @@ func (c *APIClient) doRequest(ctx context.Context, url string, inputData []byte)
case 200:
// 继续处理
default:
return nil, fmt.Errorf("unexpected status code: %d", statusCode)
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
// 解析响应
var apiResp APIResponse
if err := sonic.Unmarshal(resp.Body(), &apiResp); err != nil {
if err := sonic.Unmarshal(resp.Body, &apiResp); err != nil {
return nil, fmt.Errorf("failed to parse response: %w", err)
}
@@ -162,28 +124,14 @@ func (c *APIClient) doRequest(ctx context.Context, url string, inputData []byte)
}
// CallWithoutRetry 调用 API不重试
// 注意pkg/net.HTTPClient 的通用请求已经内置了重试机制
// 这个方法保留是为了向后兼容,但仍然会使用底层的重试机制
func (c *APIClient) CallWithoutRetry(ctx context.Context, endpoint string, input interface{}) (*APIResponse, error) {
// 序列化输入参数
var inputData []byte
var err error
if input == nil {
inputData = []byte("{}")
} else {
inputData, err = sonic.Marshal(input)
if err != nil {
return nil, fmt.Errorf("failed to marshal input: %w", err)
}
}
// 构建 URL
url := fmt.Sprintf("%s/api/%s", c.baseURL, endpoint)
return c.doRequest(ctx, url, inputData)
return c.Call(ctx, endpoint, input)
}
// Close 关闭客户端
func (c *APIClient) Close() error {
// fasthttp.Client 不需要显式关闭
// pkg/net.HTTPClient 的 fasthttp.Client 不需要显式关闭
return nil
}

View File

@@ -1,115 +1,60 @@
package milky
import (
"cellbot/pkg/net"
"fmt"
"github.com/bytedance/sonic"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
)
// WebhookServer Webhook 服务器
// 用于接收协议端 POST 推送的事件
// 使用 pkg/net.SimpleWebhookServer 实现,避免重复的 fasthttp 代码
type WebhookServer struct {
server *fasthttp.Server
eventChan chan []byte
logger *zap.Logger
addr string
server *net.SimpleWebhookServer
logger *zap.Logger
addr string
}
// NewWebhookServer 创建 Webhook 服务器
func NewWebhookServer(addr string, logger *zap.Logger) *WebhookServer {
func NewWebhookServer(addr string, accessToken string, logger *zap.Logger) *WebhookServer {
eventChan := make(chan []byte, 100)
config := net.SimpleWebhookConfig{
Addr: addr,
AccessToken: accessToken,
EventChannel: eventChan,
}
return &WebhookServer{
eventChan: make(chan []byte, 100),
logger: logger.Named("webhook-server"),
addr: addr,
server: net.NewSimpleWebhookServer(config, logger),
logger: logger.Named("webhook-server"),
addr: addr,
}
}
// Start 启动服务器
func (s *WebhookServer) Start() error {
s.server = &fasthttp.Server{
Handler: s.handleRequest,
MaxConnsPerIP: 1000,
MaxRequestsPerConn: 1000,
}
s.logger.Info("Starting webhook server", zap.String("addr", s.addr))
go func() {
if err := s.server.ListenAndServe(s.addr); err != nil {
s.logger.Error("Webhook server error", zap.Error(err))
}
}()
if err := s.server.Start(); err != nil {
return fmt.Errorf("failed to start webhook server: %w", err)
}
return nil
}
// handleRequest 处理请求
func (s *WebhookServer) handleRequest(ctx *fasthttp.RequestCtx) {
// 只接受 POST 请求
if !ctx.IsPost() {
s.logger.Warn("Received non-POST request",
zap.String("method", string(ctx.Method())))
ctx.Error("Method Not Allowed", fasthttp.StatusMethodNotAllowed)
return
}
// 检查 Content-Type
contentType := string(ctx.Request.Header.ContentType())
if contentType != "application/json" {
s.logger.Warn("Invalid content type",
zap.String("content_type", contentType))
ctx.Error("Unsupported Media Type", fasthttp.StatusUnsupportedMediaType)
return
}
// 获取请求体
body := ctx.PostBody()
if len(body) == 0 {
s.logger.Warn("Empty request body")
ctx.Error("Bad Request", fasthttp.StatusBadRequest)
return
}
// 验证 JSON 格式
var event Event
if err := sonic.Unmarshal(body, &event); err != nil {
s.logger.Error("Failed to parse event", zap.Error(err))
ctx.Error("Bad Request", fasthttp.StatusBadRequest)
return
}
s.logger.Debug("Received webhook event",
zap.String("event_type", event.EventType),
zap.Int64("self_id", event.SelfID))
// 发送到事件通道
select {
case s.eventChan <- body:
default:
s.logger.Warn("Event channel full, dropping event")
}
// 返回成功响应
ctx.SetContentType("application/json")
ctx.SetStatusCode(fasthttp.StatusOK)
ctx.SetBodyString(`{"status":"ok"}`)
}
// Events 获取事件通道
func (s *WebhookServer) Events() <-chan []byte {
return s.eventChan
return s.server.Events()
}
// Stop 停止服务器
func (s *WebhookServer) Stop() error {
if s.server != nil {
s.logger.Info("Stopping webhook server")
if err := s.server.Shutdown(); err != nil {
return fmt.Errorf("failed to shutdown webhook server: %w", err)
}
s.logger.Info("Stopping webhook server")
if err := s.server.Stop(); err != nil {
return fmt.Errorf("failed to stop webhook server: %w", err)
}
close(s.eventChan)
return nil
}

View File

@@ -7,7 +7,7 @@ import (
"sync"
"go.uber.org/zap"
"gorm.io/driver/sqlite"
"github.com/glebarez/sqlite"
"gorm.io/gorm"
)

View File

@@ -61,13 +61,11 @@ func RegisterLifecycleHooks(
logger.Error("Failed to stop bots", zap.Error(err))
}
// 停止分发器
// 停止分发器(先停止分发器,让它有机会处理完当前事件)
dispatcher.Stop()
// 停止事件总线
eventBus.Stop()
logger.Info("CellBot application stopped successfully")
return nil
},
})

View File

@@ -136,14 +136,19 @@ func handleMCSCommand(ctx context.Context, event protocol.Event, botManager *pro
Logger: logger,
}
// 使用独立的 context 进行截图,避免受 dispatcher context 影响
// 如果 dispatcher context 被取消,截图操作仍能完成
screenshotCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
// 使用独立的 context 进行截图,完全避免受外部 context 影响
// 使用更长的超时时间,避免频繁失败
screenshotCtx, cancel := context.WithTimeout(context.Background(), 90*time.Second)
defer cancel()
// 渲染并截图
chain, err := utils.ScreenshotHTMLToMessageChain(screenshotCtx, htmlTemplate, opts)
if err != nil {
// context.Canceled 是应用关闭时的正常行为,不记录为错误
if err.Error() == "screenshot operation was canceled" {
logger.Warn("Screenshot canceled due to application shutdown")
return nil
}
logger.Error("Failed to render status image", zap.Error(err))
return err
}

View File

@@ -25,6 +25,22 @@ type HTTPClient struct {
retryCount int
}
// GenericRequestConfig 通用请求配置
type GenericRequestConfig struct {
URL string
Method string // GET, POST, etc.
Headers map[string]string
Body []byte
AccessToken string
}
// GenericResponse 通用响应
type GenericResponse struct {
StatusCode int
Headers map[string]string
Body []byte
}
// HTTPClientConfig HTTP客户端配置
type HTTPClientConfig struct {
BotID string
@@ -117,6 +133,103 @@ func (hc *HTTPClient) SendAction(ctx context.Context, action protocol.Action) (m
return nil, fmt.Errorf("action failed after %d retries: %w", hc.retryCount, lastErr)
}
// DoGenericRequest 执行通用HTTP请求
// 用于支持适配器的特定API调用需求
func (hc *HTTPClient) DoGenericRequest(ctx context.Context, config GenericRequestConfig) (*GenericResponse, error) {
// 如果未提供 Method默认为 POST
if config.Method == "" {
config.Method = "POST"
}
// 重试机制
var lastErr error
for i := 0; i <= hc.retryCount; i++ {
if i > 0 {
hc.logger.Info("Retrying generic request",
zap.String("url", config.URL),
zap.Int("attempt", i),
zap.Int("max", hc.retryCount))
time.Sleep(time.Duration(i) * time.Second)
}
resp, err := hc.doGenericRequest(ctx, config)
if err != nil {
lastErr = err
continue
}
return resp, nil
}
return nil, fmt.Errorf("generic request failed after %d retries: %w", hc.retryCount, lastErr)
}
// doGenericRequest 执行单次通用请求
func (hc *HTTPClient) doGenericRequest(ctx context.Context, config GenericRequestConfig) (*GenericResponse, error) {
req := fasthttp.AcquireRequest()
resp := fasthttp.AcquireResponse()
defer fasthttp.ReleaseRequest(req)
defer fasthttp.ReleaseResponse(resp)
// 设置请求URL和方法
req.SetRequestURI(config.URL)
req.Header.SetMethod(config.Method)
// 设置默认 Content-Type
if config.Method == "POST" || config.Method == "PUT" || config.Method == "PATCH" {
if req.Header.ContentType() == nil || len(req.Header.ContentType()) == 0 {
req.Header.SetContentType("application/json")
}
}
// 设置请求体
if config.Body != nil {
req.SetBody(config.Body)
}
// 设置自定义 Headers
for key, value := range config.Headers {
req.Header.Set(key, value)
}
// 设置 Authorization 头
if config.AccessToken != "" {
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", config.AccessToken))
}
hc.logger.Debug("Sending generic HTTP request",
zap.String("method", config.Method),
zap.String("url", config.URL))
// 发送请求
err := hc.client.DoTimeout(req, resp, hc.timeout)
if err != nil {
return nil, fmt.Errorf("request failed: %w", err)
}
// 构建响应
genericResp := &GenericResponse{
StatusCode: resp.StatusCode(),
Body: make([]byte, len(resp.Body())),
Headers: make(map[string]string),
}
// 复制响应体
copy(genericResp.Body, resp.Body())
// 复制响应头
resp.Header.VisitAll(func(key, value []byte) {
genericResp.Headers[string(key)] = string(value)
})
hc.logger.Debug("Generic HTTP request completed",
zap.String("method", config.Method),
zap.String("url", config.URL),
zap.Int("status_code", resp.StatusCode()))
return genericResp, nil
}
// PollEvents 轮询事件正向HTTP
func (hc *HTTPClient) PollEvents(ctx context.Context, interval time.Duration) error {
ticker := time.NewTicker(interval)
@@ -186,6 +299,27 @@ type HTTPWebhookServer struct {
mu sync.RWMutex
}
// SimpleWebhookConfig 简化版 Webhook 配置
type SimpleWebhookConfig struct {
Addr string
AccessToken string // 可选的访问令牌验证
EventChannel chan []byte // 用于传递原始事件数据
}
// SimpleWebhookHandler 简化版 Webhook 处理函数
type SimpleWebhookHandler func(ctx *fasthttp.RequestCtx)
// SimpleWebhookServer 简化版 Webhook 服务器
// 用于接收协议端推送的事件,提供原始字节流通道
type SimpleWebhookServer struct {
server *fasthttp.Server
logger *zap.Logger
addr string
accessToken string
eventChan chan []byte
handler SimpleWebhookHandler
}
// WebhookHandler Webhook处理器
type WebhookHandler struct {
BotID string
@@ -248,6 +382,112 @@ func (hws *HTTPWebhookServer) Stop() error {
return nil
}
// NewSimpleWebhookServer 创建简化版 Webhook 服务器
func NewSimpleWebhookServer(config SimpleWebhookConfig, logger *zap.Logger) *SimpleWebhookServer {
return &SimpleWebhookServer{
addr: config.Addr,
accessToken: config.AccessToken,
eventChan: config.EventChannel,
logger: logger.Named("simple-webhook-server"),
}
}
// Start 启动简化版 Webhook 服务器
func (s *SimpleWebhookServer) Start() error {
s.server = &fasthttp.Server{
Handler: s.handleRequest,
MaxConnsPerIP: 1000,
MaxRequestsPerConn: 1000,
}
s.logger.Info("Starting simple webhook server", zap.String("addr", s.addr))
go func() {
if err := s.server.ListenAndServe(s.addr); err != nil {
s.logger.Error("Simple webhook server error", zap.Error(err))
}
}()
return nil
}
// handleRequest 处理 Webhook 请求
func (s *SimpleWebhookServer) handleRequest(ctx *fasthttp.RequestCtx) {
// 只接受 POST 请求
if !ctx.IsPost() {
s.logger.Warn("Received non-POST request",
zap.String("method", string(ctx.Method())))
ctx.Error("Method Not Allowed", fasthttp.StatusMethodNotAllowed)
return
}
// 检查 Content-Type
contentType := string(ctx.Request.Header.ContentType())
if contentType != "application/json" {
s.logger.Warn("Invalid content type",
zap.String("content_type", contentType))
ctx.Error("Unsupported Media Type", fasthttp.StatusUnsupportedMediaType)
return
}
// 验证访问令牌(如果配置了)
if s.accessToken != "" {
authHeader := string(ctx.Request.Header.Peek("Authorization"))
if authHeader == "" {
s.logger.Warn("Missing authorization header")
ctx.Error("Unauthorized", fasthttp.StatusUnauthorized)
return
}
// 验证 Bearer token
expectedAuth := "Bearer " + s.accessToken
if authHeader != expectedAuth {
s.logger.Warn("Invalid authorization token")
ctx.Error("Unauthorized", fasthttp.StatusUnauthorized)
return
}
}
// 获取请求体
body := ctx.PostBody()
if len(body) == 0 {
s.logger.Warn("Empty request body")
ctx.Error("Bad Request", fasthttp.StatusBadRequest)
return
}
s.logger.Debug("Received webhook event",
zap.Int("body_length", len(body)))
// 发送到事件通道
select {
case s.eventChan <- body:
default:
s.logger.Warn("Event channel full, dropping event")
}
// 返回成功响应
ctx.SetContentType("application/json")
ctx.SetStatusCode(fasthttp.StatusOK)
ctx.SetBodyString(`{"status":"ok"}`)
}
// Events 获取事件通道
func (s *SimpleWebhookServer) Events() <-chan []byte {
return s.eventChan
}
// Stop 停止简化版 Webhook 服务器
func (s *SimpleWebhookServer) Stop() error {
if s.server != nil {
s.logger.Info("Stopping simple webhook server")
if err := s.server.Shutdown(); err != nil {
return fmt.Errorf("failed to shutdown: %w", err)
}
}
return nil
}
// handleWebhook 处理Webhook请求
func (hws *HTTPWebhookServer) handleWebhook(ctx *fasthttp.RequestCtx) {
path := string(ctx.Path())