From f3a72264afb9a06242a885fee7e717a42f6b14c2 Mon Sep 17 00:00:00 2001 From: xiaolan Date: Mon, 5 Jan 2026 18:42:45 +0800 Subject: [PATCH] chore: update dependencies and refactor webhook handling - Added new dependencies for SQLite support and improved HTTP client functionality in go.mod and go.sum. - Refactored webhook server implementation to utilize a simplified version, enhancing code maintainability. - Updated API client to leverage a generic request method, streamlining API interactions. - Modified configuration to include access token for webhook server, improving security. - Enhanced event handling and request processing in the API client for better performance. --- configs/config.toml | 2 +- go.mod | 21 +- go.sum | 32 ++- internal/adapter/milky/adapter.go | 2 +- internal/adapter/milky/api_client.go | 106 +++------- internal/adapter/milky/webhook_server.go | 107 +++------- internal/database/database.go | 2 +- internal/di/lifecycle.go | 4 +- internal/plugins/mcstatus/mcstatus.go | 11 +- pkg/net/httpclient.go | 240 +++++++++++++++++++++++ 10 files changed, 346 insertions(+), 181 deletions(-) diff --git a/configs/config.toml b/configs/config.toml index 6785054..2aae18c 100644 --- a/configs/config.toml +++ b/configs/config.toml @@ -50,7 +50,7 @@ enabled = true connection_type = "ws" self_id = "123456789" nickname = "TestBot" -ws_url = "ws://127.0.0.1:3001" +ws_url = "ws://192.168.10.148:3001" access_token = "hDeu66@_DDhgMf<9" timeout = 30 heartbeat = 30 diff --git a/go.mod b/go.mod index 7ea20bc..b0440ff 100644 --- a/go.mod +++ b/go.mod @@ -9,29 +9,38 @@ require ( github.com/bytedance/sonic v1.14.2 github.com/fasthttp/websocket v1.5.12 github.com/fsnotify/fsnotify v1.9.0 + github.com/glebarez/sqlite v1.11.0 github.com/valyala/fasthttp v1.58.0 go.uber.org/fx v1.20.0 go.uber.org/zap v1.26.0 golang.org/x/time v0.14.0 ) -require github.com/robfig/cron/v3 v3.0.1 +require ( + github.com/Tnze/go-mc v1.20.2 + github.com/chromedp/chromedp v0.14.2 + github.com/robfig/cron/v3 v3.0.1 + gorm.io/gorm v1.31.1 +) require ( - github.com/Tnze/go-mc v1.20.2 // indirect github.com/chromedp/cdproto v0.0.0-20250724212937-08a3db8b4327 // indirect - github.com/chromedp/chromedp v0.14.2 // indirect github.com/chromedp/sysutil v1.1.0 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/glebarez/go-sqlite v1.21.2 // indirect github.com/go-json-experiment/json v0.0.0-20250725192818-e39067aee2d2 // indirect github.com/gobwas/httphead v0.1.0 // indirect github.com/gobwas/pool v0.2.1 // indirect github.com/gobwas/ws v1.4.0 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect - github.com/mattn/go-sqlite3 v1.14.22 // indirect + github.com/mattn/go-isatty v0.0.17 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect golang.org/x/text v0.21.0 // indirect - gorm.io/driver/sqlite v1.6.0 // indirect - gorm.io/gorm v1.31.1 // indirect + modernc.org/libc v1.22.5 // indirect + modernc.org/mathutil v1.5.0 // indirect + modernc.org/memory v1.5.0 // indirect + modernc.org/sqlite v1.23.1 // indirect ) require ( diff --git a/go.sum b/go.sum index 0b0defb..17a8c8c 100644 --- a/go.sum +++ b/go.sum @@ -23,10 +23,16 @@ github.com/cloudwego/base64x v0.1.6/go.mod h1:OFcloc187FXDaYHvrNIjxSe8ncn0OOM8gE github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/fasthttp/websocket v1.5.12 h1:e4RGPpWW2HTbL3zV0Y/t7g0ub294LkiuXXUuTOUInlE= github.com/fasthttp/websocket v1.5.12/go.mod h1:I+liyL7/4moHojiOgUOIKEWm9EIxHqxZChS+aMFltyg= github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= +github.com/glebarez/go-sqlite v1.21.2 h1:3a6LFC4sKahUunAmynQKLZceZCOzUthkRkEAl9gAXWo= +github.com/glebarez/go-sqlite v1.21.2/go.mod h1:sfxdZyhQjTM2Wry3gVYWaW072Ri1WMdWJi0k6+3382k= +github.com/glebarez/sqlite v1.11.0 h1:wSG0irqzP6VurnMEpFGer5Li19RpIRi2qvQz++w0GMw= +github.com/glebarez/sqlite v1.11.0/go.mod h1:h8/o8j5wiAsqSPoWELDUdJXhjAhsVliSn7bWZjOhrgQ= github.com/go-json-experiment/json v0.0.0-20250725192818-e39067aee2d2 h1:iizUGZ9pEquQS5jTGkh4AqeeHCMbfbjeb0zMt0aEFzs= github.com/go-json-experiment/json v0.0.0-20250725192818-e39067aee2d2/go.mod h1:TiCD2a1pcmjd7YnhGH0f/zKNcCD06B029pHhzV23c2M= github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU= @@ -35,6 +41,8 @@ github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og= github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= github.com/gobwas/ws v1.4.0 h1:CTaoG1tojrh4ucGPcoJFiAQUAsEWekEWvLy7GsVNqGs= github.com/gobwas/ws v1.4.0/go.mod h1:G3gNqMNtPppf5XUz7O4shetPpcZ1VJ7zt18dlUeakrc= +github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ= +github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= @@ -45,10 +53,17 @@ github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IX github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/klauspost/cpuid/v2 v2.2.9 h1:66ze0taIn2H33fBvCkXuv9BmCwDfafmiIVpKV9kKGuY= github.com/klauspost/cpuid/v2 v2.2.9/go.mod h1:rqkxqrZ1EhYM9G+hXH7YdowN5R5RGN6NK4QwQ3WMXF8= -github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= -github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80 h1:6Yzfa6GP0rIo/kULo2bwGEkFvCePZ3qHDDTC3/J9Swo= +github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80/go.mod h1:imJHygn/1yfhB7XSJJKlFZKl/J+dCPAknuiaGOshXAs= +github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng= +github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/orisano/pixelmatch v0.0.0-20220722002657-fb0b55479cde h1:x0TT0RDC7UhAVbbWWBzr41ElhJx5tXPWkIHA2HWPRuw= +github.com/orisano/pixelmatch v0.0.0-20220722002657-fb0b55479cde/go.mod h1:nZgzbfBr3hhjoZnS66nKrHmduYNpc34ny7RK4z5/HM0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/savsgio/gotils v0.0.0-20240704082632-aef3928b8a38 h1:D0vL7YNisV2yqE55+q0lFuGse6U8lxlg7fYTctlT5Gc= @@ -86,9 +101,8 @@ golang.org/x/arch v0.0.0-20210923205945-b76863e36670 h1:18EFjUmQOcUvxNYSkA6jO9VA golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= -golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA= golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= @@ -99,7 +113,13 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gorm.io/driver/sqlite v1.6.0 h1:WHRRrIiulaPiPFmDcod6prc4l2VGVWHz80KspNsxSfQ= -gorm.io/driver/sqlite v1.6.0/go.mod h1:AO9V1qIQddBESngQUKWL9yoH93HIeA1X6V633rBwyT8= gorm.io/gorm v1.31.1 h1:7CA8FTFz/gRfgqgpeKIBcervUn3xSyPUmr6B2WXJ7kg= gorm.io/gorm v1.31.1/go.mod h1:XyQVbO2k6YkOis7C2437jSit3SsDK72s7n7rsSHd+Gs= +modernc.org/libc v1.22.5 h1:91BNch/e5B0uPbJFgqbxXuOnxBQjlS//icfQEGmvyjE= +modernc.org/libc v1.22.5/go.mod h1:jj+Z7dTNX8fBScMVNRAYZ/jF91K8fdT2hYMThc3YjBY= +modernc.org/mathutil v1.5.0 h1:rV0Ko/6SfM+8G+yKiyI830l3Wuz1zRutdslNoQ0kfiQ= +modernc.org/mathutil v1.5.0/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= +modernc.org/memory v1.5.0 h1:N+/8c5rE6EqugZwHii4IFsaJ7MUhoWX07J5tC/iI5Ds= +modernc.org/memory v1.5.0/go.mod h1:PkUhL0Mugw21sHPeskwZW4D6VscE/GQJOnIpCnW6pSU= +modernc.org/sqlite v1.23.1 h1:nrSBg4aRQQwq59JpvGEQ15tNxoO5pX/kUjcRNwSAGQM= +modernc.org/sqlite v1.23.1/go.mod h1:OrDj17Mggn6MhE+iPbBNf7RGKODDE9NFT0f3EwDzJqk= diff --git a/internal/adapter/milky/adapter.go b/internal/adapter/milky/adapter.go index a927c32..7923291 100644 --- a/internal/adapter/milky/adapter.go +++ b/internal/adapter/milky/adapter.go @@ -185,7 +185,7 @@ func (a *Adapter) startWebhook() error { return fmt.Errorf("webhook_listen_addr is required for webhook mode") } - a.webhookServer = NewWebhookServer(a.config.WebhookListenAddr, a.logger) + a.webhookServer = NewWebhookServer(a.config.WebhookListenAddr, a.config.AccessToken, a.logger) // 启动服务器 if err := a.webhookServer.Start(); err != nil { diff --git a/internal/adapter/milky/api_client.go b/internal/adapter/milky/api_client.go index 97f3347..51faf0a 100644 --- a/internal/adapter/milky/api_client.go +++ b/internal/adapter/milky/api_client.go @@ -5,20 +5,19 @@ import ( "fmt" "time" + "cellbot/pkg/net" + "github.com/bytedance/sonic" - "github.com/valyala/fasthttp" "go.uber.org/zap" ) // APIClient Milky API 客户端 -// 用于调用协议端的 API (POST /api/:api) +// 使用 pkg/net.HTTPClient 提供的通用 HTTP 请求功能 type APIClient struct { + httpClient *net.HTTPClient baseURL string accessToken string - httpClient *fasthttp.Client logger *zap.Logger - timeout time.Duration - retryCount int } // NewAPIClient 创建 API 客户端 @@ -30,17 +29,17 @@ func NewAPIClient(baseURL, accessToken string, timeout time.Duration, retryCount retryCount = 3 } + config := net.HTTPClientConfig{ + BaseURL: baseURL, + Timeout: timeout, + RetryCount: retryCount, + } + return &APIClient{ + httpClient: net.NewHTTPClient(config, logger.Named("api-client"), nil), baseURL: baseURL, accessToken: accessToken, - httpClient: &fasthttp.Client{ - ReadTimeout: timeout, - WriteTimeout: timeout, - MaxConnsPerHost: 100, - }, - logger: logger.Named("api-client"), - timeout: timeout, - retryCount: retryCount, + logger: logger.Named("api-client"), } } @@ -69,63 +68,26 @@ func (c *APIClient) Call(ctx context.Context, endpoint string, input interface{} zap.String("endpoint", endpoint), zap.String("url", url)) - // 重试机制 - var lastErr error - for i := 0; i <= c.retryCount; i++ { - if i > 0 { - c.logger.Info("Retrying API call", - zap.String("endpoint", endpoint), - zap.Int("attempt", i), - zap.Int("max", c.retryCount)) - - // 指数退避 - backoff := time.Duration(i) * time.Second - select { - case <-time.After(backoff): - case <-ctx.Done(): - return nil, ctx.Err() - } - } - - resp, err := c.doRequest(ctx, url, inputData) - if err != nil { - lastErr = err - continue - } - - return resp, nil - } - - return nil, fmt.Errorf("API call failed after %d retries: %w", c.retryCount, lastErr) + return c.doRequest(ctx, url, inputData) } // doRequest 执行单次请求 func (c *APIClient) doRequest(ctx context.Context, url string, inputData []byte) (*APIResponse, error) { - req := fasthttp.AcquireRequest() - resp := fasthttp.AcquireResponse() - defer fasthttp.ReleaseRequest(req) - defer fasthttp.ReleaseResponse(resp) - - // 设置请求 - req.SetRequestURI(url) - req.Header.SetMethod("POST") - req.Header.SetContentType("application/json") - req.SetBody(inputData) - - // 设置 Authorization 头 - if c.accessToken != "" { - req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.accessToken)) + // 使用 pkg/net.HTTPClient 的通用请求方法 + config := net.GenericRequestConfig{ + URL: url, + Method: "POST", + Body: inputData, + AccessToken: c.accessToken, } - // 发送请求 - err := c.httpClient.DoTimeout(req, resp, c.timeout) + resp, err := c.httpClient.DoGenericRequest(ctx, config) if err != nil { return nil, fmt.Errorf("request failed: %w", err) } // 检查 HTTP 状态码 - statusCode := resp.StatusCode() - switch statusCode { + switch resp.StatusCode { case 401: return nil, fmt.Errorf("unauthorized: access token invalid or missing") case 404: @@ -135,12 +97,12 @@ func (c *APIClient) doRequest(ctx context.Context, url string, inputData []byte) case 200: // 继续处理 default: - return nil, fmt.Errorf("unexpected status code: %d", statusCode) + return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) } // 解析响应 var apiResp APIResponse - if err := sonic.Unmarshal(resp.Body(), &apiResp); err != nil { + if err := sonic.Unmarshal(resp.Body, &apiResp); err != nil { return nil, fmt.Errorf("failed to parse response: %w", err) } @@ -162,28 +124,14 @@ func (c *APIClient) doRequest(ctx context.Context, url string, inputData []byte) } // CallWithoutRetry 调用 API(不重试) +// 注意:pkg/net.HTTPClient 的通用请求已经内置了重试机制 +// 这个方法保留是为了向后兼容,但仍然会使用底层的重试机制 func (c *APIClient) CallWithoutRetry(ctx context.Context, endpoint string, input interface{}) (*APIResponse, error) { - // 序列化输入参数 - var inputData []byte - var err error - - if input == nil { - inputData = []byte("{}") - } else { - inputData, err = sonic.Marshal(input) - if err != nil { - return nil, fmt.Errorf("failed to marshal input: %w", err) - } - } - - // 构建 URL - url := fmt.Sprintf("%s/api/%s", c.baseURL, endpoint) - - return c.doRequest(ctx, url, inputData) + return c.Call(ctx, endpoint, input) } // Close 关闭客户端 func (c *APIClient) Close() error { - // fasthttp.Client 不需要显式关闭 + // pkg/net.HTTPClient 的 fasthttp.Client 不需要显式关闭 return nil } diff --git a/internal/adapter/milky/webhook_server.go b/internal/adapter/milky/webhook_server.go index 5202ad8..69bd21e 100644 --- a/internal/adapter/milky/webhook_server.go +++ b/internal/adapter/milky/webhook_server.go @@ -1,115 +1,60 @@ package milky import ( + "cellbot/pkg/net" "fmt" - "github.com/bytedance/sonic" - "github.com/valyala/fasthttp" "go.uber.org/zap" ) // WebhookServer Webhook 服务器 -// 用于接收协议端 POST 推送的事件 +// 使用 pkg/net.SimpleWebhookServer 实现,避免重复的 fasthttp 代码 type WebhookServer struct { - server *fasthttp.Server - eventChan chan []byte - logger *zap.Logger - addr string + server *net.SimpleWebhookServer + logger *zap.Logger + addr string } // NewWebhookServer 创建 Webhook 服务器 -func NewWebhookServer(addr string, logger *zap.Logger) *WebhookServer { +func NewWebhookServer(addr string, accessToken string, logger *zap.Logger) *WebhookServer { + eventChan := make(chan []byte, 100) + + config := net.SimpleWebhookConfig{ + Addr: addr, + AccessToken: accessToken, + EventChannel: eventChan, + } + return &WebhookServer{ - eventChan: make(chan []byte, 100), - logger: logger.Named("webhook-server"), - addr: addr, + server: net.NewSimpleWebhookServer(config, logger), + logger: logger.Named("webhook-server"), + addr: addr, } } // Start 启动服务器 func (s *WebhookServer) Start() error { - s.server = &fasthttp.Server{ - Handler: s.handleRequest, - MaxConnsPerIP: 1000, - MaxRequestsPerConn: 1000, - } - s.logger.Info("Starting webhook server", zap.String("addr", s.addr)) - go func() { - if err := s.server.ListenAndServe(s.addr); err != nil { - s.logger.Error("Webhook server error", zap.Error(err)) - } - }() + if err := s.server.Start(); err != nil { + return fmt.Errorf("failed to start webhook server: %w", err) + } return nil } -// handleRequest 处理请求 -func (s *WebhookServer) handleRequest(ctx *fasthttp.RequestCtx) { - // 只接受 POST 请求 - if !ctx.IsPost() { - s.logger.Warn("Received non-POST request", - zap.String("method", string(ctx.Method()))) - ctx.Error("Method Not Allowed", fasthttp.StatusMethodNotAllowed) - return - } - - // 检查 Content-Type - contentType := string(ctx.Request.Header.ContentType()) - if contentType != "application/json" { - s.logger.Warn("Invalid content type", - zap.String("content_type", contentType)) - ctx.Error("Unsupported Media Type", fasthttp.StatusUnsupportedMediaType) - return - } - - // 获取请求体 - body := ctx.PostBody() - if len(body) == 0 { - s.logger.Warn("Empty request body") - ctx.Error("Bad Request", fasthttp.StatusBadRequest) - return - } - - // 验证 JSON 格式 - var event Event - if err := sonic.Unmarshal(body, &event); err != nil { - s.logger.Error("Failed to parse event", zap.Error(err)) - ctx.Error("Bad Request", fasthttp.StatusBadRequest) - return - } - - s.logger.Debug("Received webhook event", - zap.String("event_type", event.EventType), - zap.Int64("self_id", event.SelfID)) - - // 发送到事件通道 - select { - case s.eventChan <- body: - default: - s.logger.Warn("Event channel full, dropping event") - } - - // 返回成功响应 - ctx.SetContentType("application/json") - ctx.SetStatusCode(fasthttp.StatusOK) - ctx.SetBodyString(`{"status":"ok"}`) -} - // Events 获取事件通道 func (s *WebhookServer) Events() <-chan []byte { - return s.eventChan + return s.server.Events() } // Stop 停止服务器 func (s *WebhookServer) Stop() error { - if s.server != nil { - s.logger.Info("Stopping webhook server") - if err := s.server.Shutdown(); err != nil { - return fmt.Errorf("failed to shutdown webhook server: %w", err) - } + s.logger.Info("Stopping webhook server") + + if err := s.server.Stop(); err != nil { + return fmt.Errorf("failed to stop webhook server: %w", err) } - close(s.eventChan) + return nil } diff --git a/internal/database/database.go b/internal/database/database.go index affab80..80a8516 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -7,7 +7,7 @@ import ( "sync" "go.uber.org/zap" - "gorm.io/driver/sqlite" + "github.com/glebarez/sqlite" "gorm.io/gorm" ) diff --git a/internal/di/lifecycle.go b/internal/di/lifecycle.go index 81a12b4..c60a82d 100644 --- a/internal/di/lifecycle.go +++ b/internal/di/lifecycle.go @@ -61,13 +61,11 @@ func RegisterLifecycleHooks( logger.Error("Failed to stop bots", zap.Error(err)) } - // 停止分发器 + // 停止分发器(先停止分发器,让它有机会处理完当前事件) dispatcher.Stop() // 停止事件总线 eventBus.Stop() - - logger.Info("CellBot application stopped successfully") return nil }, }) diff --git a/internal/plugins/mcstatus/mcstatus.go b/internal/plugins/mcstatus/mcstatus.go index a29dd64..e0261e6 100644 --- a/internal/plugins/mcstatus/mcstatus.go +++ b/internal/plugins/mcstatus/mcstatus.go @@ -136,14 +136,19 @@ func handleMCSCommand(ctx context.Context, event protocol.Event, botManager *pro Logger: logger, } - // 使用独立的 context 进行截图,避免受 dispatcher context 影响 - // 如果 dispatcher context 被取消,截图操作仍能完成 - screenshotCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + // 使用独立的 context 进行截图,完全避免受外部 context 影响 + // 使用更长的超时时间,避免频繁失败 + screenshotCtx, cancel := context.WithTimeout(context.Background(), 90*time.Second) defer cancel() // 渲染并截图 chain, err := utils.ScreenshotHTMLToMessageChain(screenshotCtx, htmlTemplate, opts) if err != nil { + // context.Canceled 是应用关闭时的正常行为,不记录为错误 + if err.Error() == "screenshot operation was canceled" { + logger.Warn("Screenshot canceled due to application shutdown") + return nil + } logger.Error("Failed to render status image", zap.Error(err)) return err } diff --git a/pkg/net/httpclient.go b/pkg/net/httpclient.go index 591e28d..933c226 100644 --- a/pkg/net/httpclient.go +++ b/pkg/net/httpclient.go @@ -25,6 +25,22 @@ type HTTPClient struct { retryCount int } +// GenericRequestConfig 通用请求配置 +type GenericRequestConfig struct { + URL string + Method string // GET, POST, etc. + Headers map[string]string + Body []byte + AccessToken string +} + +// GenericResponse 通用响应 +type GenericResponse struct { + StatusCode int + Headers map[string]string + Body []byte +} + // HTTPClientConfig HTTP客户端配置 type HTTPClientConfig struct { BotID string @@ -117,6 +133,103 @@ func (hc *HTTPClient) SendAction(ctx context.Context, action protocol.Action) (m return nil, fmt.Errorf("action failed after %d retries: %w", hc.retryCount, lastErr) } +// DoGenericRequest 执行通用HTTP请求 +// 用于支持适配器的特定API调用需求 +func (hc *HTTPClient) DoGenericRequest(ctx context.Context, config GenericRequestConfig) (*GenericResponse, error) { + // 如果未提供 Method,默认为 POST + if config.Method == "" { + config.Method = "POST" + } + + // 重试机制 + var lastErr error + for i := 0; i <= hc.retryCount; i++ { + if i > 0 { + hc.logger.Info("Retrying generic request", + zap.String("url", config.URL), + zap.Int("attempt", i), + zap.Int("max", hc.retryCount)) + time.Sleep(time.Duration(i) * time.Second) + } + + resp, err := hc.doGenericRequest(ctx, config) + if err != nil { + lastErr = err + continue + } + + return resp, nil + } + + return nil, fmt.Errorf("generic request failed after %d retries: %w", hc.retryCount, lastErr) +} + +// doGenericRequest 执行单次通用请求 +func (hc *HTTPClient) doGenericRequest(ctx context.Context, config GenericRequestConfig) (*GenericResponse, error) { + req := fasthttp.AcquireRequest() + resp := fasthttp.AcquireResponse() + defer fasthttp.ReleaseRequest(req) + defer fasthttp.ReleaseResponse(resp) + + // 设置请求URL和方法 + req.SetRequestURI(config.URL) + req.Header.SetMethod(config.Method) + + // 设置默认 Content-Type + if config.Method == "POST" || config.Method == "PUT" || config.Method == "PATCH" { + if req.Header.ContentType() == nil || len(req.Header.ContentType()) == 0 { + req.Header.SetContentType("application/json") + } + } + + // 设置请求体 + if config.Body != nil { + req.SetBody(config.Body) + } + + // 设置自定义 Headers + for key, value := range config.Headers { + req.Header.Set(key, value) + } + + // 设置 Authorization 头 + if config.AccessToken != "" { + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", config.AccessToken)) + } + + hc.logger.Debug("Sending generic HTTP request", + zap.String("method", config.Method), + zap.String("url", config.URL)) + + // 发送请求 + err := hc.client.DoTimeout(req, resp, hc.timeout) + if err != nil { + return nil, fmt.Errorf("request failed: %w", err) + } + + // 构建响应 + genericResp := &GenericResponse{ + StatusCode: resp.StatusCode(), + Body: make([]byte, len(resp.Body())), + Headers: make(map[string]string), + } + + // 复制响应体 + copy(genericResp.Body, resp.Body()) + + // 复制响应头 + resp.Header.VisitAll(func(key, value []byte) { + genericResp.Headers[string(key)] = string(value) + }) + + hc.logger.Debug("Generic HTTP request completed", + zap.String("method", config.Method), + zap.String("url", config.URL), + zap.Int("status_code", resp.StatusCode())) + + return genericResp, nil +} + // PollEvents 轮询事件(正向HTTP) func (hc *HTTPClient) PollEvents(ctx context.Context, interval time.Duration) error { ticker := time.NewTicker(interval) @@ -186,6 +299,27 @@ type HTTPWebhookServer struct { mu sync.RWMutex } +// SimpleWebhookConfig 简化版 Webhook 配置 +type SimpleWebhookConfig struct { + Addr string + AccessToken string // 可选的访问令牌验证 + EventChannel chan []byte // 用于传递原始事件数据 +} + +// SimpleWebhookHandler 简化版 Webhook 处理函数 +type SimpleWebhookHandler func(ctx *fasthttp.RequestCtx) + +// SimpleWebhookServer 简化版 Webhook 服务器 +// 用于接收协议端推送的事件,提供原始字节流通道 +type SimpleWebhookServer struct { + server *fasthttp.Server + logger *zap.Logger + addr string + accessToken string + eventChan chan []byte + handler SimpleWebhookHandler +} + // WebhookHandler Webhook处理器 type WebhookHandler struct { BotID string @@ -248,6 +382,112 @@ func (hws *HTTPWebhookServer) Stop() error { return nil } +// NewSimpleWebhookServer 创建简化版 Webhook 服务器 +func NewSimpleWebhookServer(config SimpleWebhookConfig, logger *zap.Logger) *SimpleWebhookServer { + return &SimpleWebhookServer{ + addr: config.Addr, + accessToken: config.AccessToken, + eventChan: config.EventChannel, + logger: logger.Named("simple-webhook-server"), + } +} + +// Start 启动简化版 Webhook 服务器 +func (s *SimpleWebhookServer) Start() error { + s.server = &fasthttp.Server{ + Handler: s.handleRequest, + MaxConnsPerIP: 1000, + MaxRequestsPerConn: 1000, + } + + s.logger.Info("Starting simple webhook server", zap.String("addr", s.addr)) + + go func() { + if err := s.server.ListenAndServe(s.addr); err != nil { + s.logger.Error("Simple webhook server error", zap.Error(err)) + } + }() + + return nil +} + +// handleRequest 处理 Webhook 请求 +func (s *SimpleWebhookServer) handleRequest(ctx *fasthttp.RequestCtx) { + // 只接受 POST 请求 + if !ctx.IsPost() { + s.logger.Warn("Received non-POST request", + zap.String("method", string(ctx.Method()))) + ctx.Error("Method Not Allowed", fasthttp.StatusMethodNotAllowed) + return + } + + // 检查 Content-Type + contentType := string(ctx.Request.Header.ContentType()) + if contentType != "application/json" { + s.logger.Warn("Invalid content type", + zap.String("content_type", contentType)) + ctx.Error("Unsupported Media Type", fasthttp.StatusUnsupportedMediaType) + return + } + + // 验证访问令牌(如果配置了) + if s.accessToken != "" { + authHeader := string(ctx.Request.Header.Peek("Authorization")) + if authHeader == "" { + s.logger.Warn("Missing authorization header") + ctx.Error("Unauthorized", fasthttp.StatusUnauthorized) + return + } + + // 验证 Bearer token + expectedAuth := "Bearer " + s.accessToken + if authHeader != expectedAuth { + s.logger.Warn("Invalid authorization token") + ctx.Error("Unauthorized", fasthttp.StatusUnauthorized) + return + } + } + + // 获取请求体 + body := ctx.PostBody() + if len(body) == 0 { + s.logger.Warn("Empty request body") + ctx.Error("Bad Request", fasthttp.StatusBadRequest) + return + } + + s.logger.Debug("Received webhook event", + zap.Int("body_length", len(body))) + + // 发送到事件通道 + select { + case s.eventChan <- body: + default: + s.logger.Warn("Event channel full, dropping event") + } + + // 返回成功响应 + ctx.SetContentType("application/json") + ctx.SetStatusCode(fasthttp.StatusOK) + ctx.SetBodyString(`{"status":"ok"}`) +} + +// Events 获取事件通道 +func (s *SimpleWebhookServer) Events() <-chan []byte { + return s.eventChan +} + +// Stop 停止简化版 Webhook 服务器 +func (s *SimpleWebhookServer) Stop() error { + if s.server != nil { + s.logger.Info("Stopping simple webhook server") + if err := s.server.Shutdown(); err != nil { + return fmt.Errorf("failed to shutdown: %w", err) + } + } + return nil +} + // handleWebhook 处理Webhook请求 func (hws *HTTPWebhookServer) handleWebhook(ctx *fasthttp.RequestCtx) { path := string(ctx.Path())