Files
cellbot/pkg/net/httpclient.go
xiaolan f3a72264af chore: update dependencies and refactor webhook handling
- Added new dependencies for SQLite support and improved HTTP client functionality in go.mod and go.sum.
- Refactored webhook server implementation to utilize a simplified version, enhancing code maintainability.
- Updated API client to leverage a generic request method, streamlining API interactions.
- Modified configuration to include access token for webhook server, improving security.
- Enhanced event handling and request processing in the API client for better performance.
2026-01-05 18:42:45 +08:00

554 lines
14 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package net
import (
	"context"
	"crypto/subtle"
	"fmt"
	"net"
	"strings"
	"sync"
	"time"

	"cellbot/internal/engine"
	"cellbot/internal/protocol"
	"github.com/bytedance/sonic"
	"github.com/valyala/fasthttp"
	"go.uber.org/zap"
)
// HTTPClient is the HTTP client used for forward (outbound) HTTP connections.
// It sends actions and generic requests and polls events over fasthttp.
type HTTPClient struct {
// client is the fasthttp client that executes every outgoing request.
client *fasthttp.Client
// logger is the named logger used for request and retry diagnostics.
logger *zap.Logger
// eventBus receives the events decoded by fetchEvents.
eventBus *engine.EventBus
// botID is copied from HTTPClientConfig.BotID; it is not read by the
// methods in this part of the file.
botID string
// baseURL is the prefix to which "/action" and "/events" are appended.
baseURL string
// timeout is the per-attempt timeout handed to fasthttp's DoTimeout.
timeout time.Duration
// retryCount is the number of additional attempts made after the first one.
retryCount int
}
// GenericRequestConfig describes a single request issued through
// DoGenericRequest.
type GenericRequestConfig struct {
// URL is the full request URI; it is used as-is, without baseURL.
URL string
// Method is the HTTP method; DoGenericRequest substitutes POST when empty.
Method string // GET, POST, etc.
// Headers are extra request headers, applied after the default Content-Type.
Headers map[string]string
// Body is the raw request body; a nil Body leaves the request body unset.
Body []byte
// AccessToken, when non-empty, is sent as "Authorization: Bearer <token>"
// and is applied after Headers.
AccessToken string
}
// GenericResponse is the result of a generic request. Its contents are copied
// out of the pooled fasthttp response, so it stays valid after that response
// has been released.
type GenericResponse struct {
// StatusCode is the HTTP status code of the response.
StatusCode int
// Headers holds the response headers; a later header with the same key
// replaces an earlier one.
Headers map[string]string
// Body is a private copy of the response body.
Body []byte
}
// HTTPClientConfig holds the settings consumed by NewHTTPClient.
type HTTPClientConfig struct {
// BotID identifies the bot this client belongs to.
BotID string
// BaseURL is the prefix of the remote API endpoints.
BaseURL string
// Timeout is the per-request timeout; zero selects 30 seconds.
Timeout time.Duration
// RetryCount is the number of retries after a failed attempt; zero selects 3.
RetryCount int
}
// NewHTTPClient builds an HTTPClient from config.
//
// Non-positive Timeout and RetryCount values fall back to the defaults
// (30 seconds and 3 retries). Negative values used to be stored verbatim: a
// negative RetryCount made every retry loop execute zero times, so requests
// failed without ever being attempted, and a negative Timeout made every
// request time out immediately.
//
// logger must be non-nil; it is specialised with the name "http-client".
// eventBus receives the events fetched while polling.
func NewHTTPClient(config HTTPClientConfig, logger *zap.Logger, eventBus *engine.EventBus) *HTTPClient {
	if config.Timeout <= 0 {
		config.Timeout = 30 * time.Second
	}
	if config.RetryCount <= 0 {
		config.RetryCount = 3
	}

	return &HTTPClient{
		client: &fasthttp.Client{
			ReadTimeout:     config.Timeout,
			WriteTimeout:    config.Timeout,
			MaxConnsPerHost: 100,
		},
		logger:     logger.Named("http-client"),
		eventBus:   eventBus,
		botID:      config.BotID,
		baseURL:    config.BaseURL,
		timeout:    config.Timeout,
		retryCount: config.RetryCount,
	}
}
// SendAction serialises action as JSON and POSTs it to baseURL + "/action"
// (forward HTTP), returning the decoded JSON object of the response.
//
// A failed attempt (transport error, non-200 status or undecodable body) is
// retried up to retryCount times, waiting attempt-number seconds before each
// retry. Unlike the previous implementation, which ignored ctx and slept
// unconditionally, the call now stops as soon as ctx is cancelled or its
// deadline passes, and each attempt's timeout is capped by the remaining time
// until the ctx deadline.
//
// The returned error wraps the last attempt's error, or ctx.Err() when the
// call was aborted by the context.
func (hc *HTTPClient) SendAction(ctx context.Context, action protocol.Action) (map[string]interface{}, error) {
	// ctx used to be ignored, so tolerate callers that still pass nil.
	if ctx == nil {
		ctx = context.Background()
	}

	data, err := sonic.Marshal(action)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal action: %w", err)
	}

	req := fasthttp.AcquireRequest()
	resp := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseRequest(req)
	defer fasthttp.ReleaseResponse(resp)

	endpoint := hc.baseURL + "/action"
	req.SetRequestURI(endpoint)
	req.Header.SetMethod("POST")
	req.Header.SetContentType("application/json")
	req.SetBody(data)

	hc.logger.Debug("Sending action",
		zap.String("url", endpoint),
		zap.String("action", string(action.GetType())))

	var lastErr error
	for attempt := 0; attempt <= hc.retryCount; attempt++ {
		if attempt > 0 {
			hc.logger.Info("Retrying action request",
				zap.Int("attempt", attempt),
				zap.Int("max", hc.retryCount))

			// Linear backoff that can be interrupted by the context.
			backoff := time.NewTimer(time.Duration(attempt) * time.Second)
			select {
			case <-ctx.Done():
				backoff.Stop()
				return nil, fmt.Errorf("action cancelled: %w", ctx.Err())
			case <-backoff.C:
			}
		}
		if err := ctx.Err(); err != nil {
			return nil, fmt.Errorf("action cancelled: %w", err)
		}

		// Never wait longer than the caller's deadline allows.
		timeout := hc.timeout
		if deadline, ok := ctx.Deadline(); ok {
			if remaining := time.Until(deadline); remaining < timeout {
				timeout = remaining
			}
		}

		if err := hc.client.DoTimeout(req, resp, timeout); err != nil {
			lastErr = fmt.Errorf("request failed: %w", err)
			continue
		}
		if resp.StatusCode() != fasthttp.StatusOK {
			lastErr = fmt.Errorf("unexpected status code: %d", resp.StatusCode())
			continue
		}

		var result map[string]interface{}
		if err := sonic.Unmarshal(resp.Body(), &result); err != nil {
			lastErr = fmt.Errorf("failed to parse response: %w", err)
			continue
		}

		hc.logger.Info("Action sent successfully",
			zap.String("action", string(action.GetType())))
		return result, nil
	}

	return nil, fmt.Errorf("action failed after %d retries: %w", hc.retryCount, lastErr)
}
// DoGenericRequest executes an arbitrary HTTP request on behalf of adapters
// that need API calls beyond SendAction.
//
// An empty config.Method defaults to POST. Only transport-level failures are
// retried (up to retryCount times, waiting attempt-number seconds before each
// retry); any HTTP response, whatever its status code, is returned as-is.
//
// The previous implementation ignored ctx and slept unconditionally between
// attempts. The backoff is now interrupted as soon as ctx is cancelled or its
// deadline passes, in which case the returned error wraps ctx.Err().
func (hc *HTTPClient) DoGenericRequest(ctx context.Context, config GenericRequestConfig) (*GenericResponse, error) {
	// ctx used to be ignored, so tolerate callers that still pass nil.
	if ctx == nil {
		ctx = context.Background()
	}
	if config.Method == "" {
		config.Method = "POST"
	}

	var lastErr error
	for attempt := 0; attempt <= hc.retryCount; attempt++ {
		if attempt > 0 {
			hc.logger.Info("Retrying generic request",
				zap.String("url", config.URL),
				zap.Int("attempt", attempt),
				zap.Int("max", hc.retryCount))

			// Linear backoff that can be interrupted by the context.
			backoff := time.NewTimer(time.Duration(attempt) * time.Second)
			select {
			case <-ctx.Done():
				backoff.Stop()
				return nil, fmt.Errorf("generic request cancelled: %w", ctx.Err())
			case <-backoff.C:
			}
		}
		if err := ctx.Err(); err != nil {
			return nil, fmt.Errorf("generic request cancelled: %w", err)
		}

		resp, err := hc.doGenericRequest(ctx, config)
		if err != nil {
			lastErr = err
			continue
		}
		return resp, nil
	}

	return nil, fmt.Errorf("generic request failed after %d retries: %w", hc.retryCount, lastErr)
}
// doGenericRequest performs exactly one attempt of a generic request and
// copies the response out of fasthttp's pooled buffers.
//
// Header precedence, lowest to highest: the default "application/json"
// Content-Type (POST, PUT and PATCH only), then config.Headers, then the
// "Authorization: Bearer ..." header derived from config.AccessToken.
//
// ctx was previously ignored. The request is now skipped when ctx is already
// done, and the timeout is the smaller of hc.timeout and the time remaining
// until the ctx deadline. An in-flight request is still only bounded by that
// timeout, not interrupted by cancellation.
//
// It returns an error for transport failures only; any HTTP status is
// reported through GenericResponse.StatusCode.
func (hc *HTTPClient) doGenericRequest(ctx context.Context, config GenericRequestConfig) (*GenericResponse, error) {
	// ctx used to be ignored, so tolerate callers that still pass nil.
	if ctx == nil {
		ctx = context.Background()
	}
	if err := ctx.Err(); err != nil {
		return nil, fmt.Errorf("request aborted: %w", err)
	}

	req := fasthttp.AcquireRequest()
	resp := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseRequest(req)
	defer fasthttp.ReleaseResponse(resp)

	req.SetRequestURI(config.URL)
	req.Header.SetMethod(config.Method)

	// Default Content-Type for methods that normally carry a body; callers
	// can override it through config.Headers below.
	switch config.Method {
	case "POST", "PUT", "PATCH":
		if len(req.Header.ContentType()) == 0 {
			req.Header.SetContentType("application/json")
		}
	}

	if config.Body != nil {
		req.SetBody(config.Body)
	}
	for key, value := range config.Headers {
		req.Header.Set(key, value)
	}
	if config.AccessToken != "" {
		req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", config.AccessToken))
	}

	hc.logger.Debug("Sending generic HTTP request",
		zap.String("method", config.Method),
		zap.String("url", config.URL))

	// Never wait longer than the caller's deadline allows.
	timeout := hc.timeout
	if deadline, ok := ctx.Deadline(); ok {
		if remaining := time.Until(deadline); remaining < timeout {
			timeout = remaining
		}
	}

	if err := hc.client.DoTimeout(req, resp, timeout); err != nil {
		return nil, fmt.Errorf("request failed: %w", err)
	}

	// resp goes back to the pool when this function returns, so the body and
	// headers must be copied rather than referenced.
	body := resp.Body()
	genericResp := &GenericResponse{
		StatusCode: resp.StatusCode(),
		Body:       make([]byte, len(body)),
		Headers:    make(map[string]string),
	}
	copy(genericResp.Body, body)
	resp.Header.VisitAll(func(key, value []byte) {
		genericResp.Headers[string(key)] = string(value)
	})

	hc.logger.Debug("Generic HTTP request completed",
		zap.String("method", config.Method),
		zap.String("url", config.URL),
		zap.Int("status_code", resp.StatusCode()))

	return genericResp, nil
}
// PollEvents fetches events every interval (forward HTTP polling) until ctx
// is done. It blocks for the whole polling lifetime.
//
// A fetch failure is logged and polling continues with the next tick. The
// function returns ctx.Err() once ctx is cancelled or expires.
//
// interval must be positive: time.NewTicker panics otherwise, so a
// non-positive interval is now rejected with an error instead of crashing the
// calling goroutine.
func (hc *HTTPClient) PollEvents(ctx context.Context, interval time.Duration) error {
	if interval <= 0 {
		return fmt.Errorf("invalid polling interval: %s", interval)
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	hc.logger.Info("Starting event polling",
		zap.Duration("interval", interval))

	for {
		select {
		case <-ticker.C:
			if err := hc.fetchEvents(ctx); err != nil {
				hc.logger.Error("Failed to fetch events", zap.Error(err))
			}
		case <-ctx.Done():
			hc.logger.Info("Event polling stopped")
			return ctx.Err()
		}
	}
}
// fetchEvents issues one GET to baseURL + "/events", decodes the response
// body as a JSON array of protocol.BaseEvent and publishes every element to
// the event bus in order.
//
// It returns an error when the request fails, the status is not 200 OK or the
// body cannot be decoded. ctx is accepted but not consulted here.
func (hc *HTTPClient) fetchEvents(ctx context.Context) error {
	request := fasthttp.AcquireRequest()
	response := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseRequest(request)
	defer fasthttp.ReleaseResponse(response)

	request.SetRequestURI(hc.baseURL + "/events")
	request.Header.SetMethod("GET")

	if err := hc.client.DoTimeout(request, response, hc.timeout); err != nil {
		return fmt.Errorf("request failed: %w", err)
	}
	if status := response.StatusCode(); status != fasthttp.StatusOK {
		return fmt.Errorf("unexpected status code: %d", status)
	}

	var batch []protocol.BaseEvent
	if err := sonic.Unmarshal(response.Body(), &batch); err != nil {
		return fmt.Errorf("failed to parse events: %w", err)
	}

	// Publish pointers into the decoded slice so each event is handed over
	// without an extra copy.
	for idx := range batch {
		received := &batch[idx]
		hc.logger.Debug("Event received",
			zap.String("type", string(received.Type)),
			zap.String("detail_type", received.DetailType))
		hc.eventBus.Publish(received)
	}
	return nil
}
// HTTPWebhookServer is the HTTP webhook server used for reverse (inbound)
// HTTP connections. Incoming requests are routed by path to a registered
// WebhookHandler and the decoded event is published to the event bus.
type HTTPWebhookServer struct {
// server is the fasthttp server; it is nil until Start is called.
server *fasthttp.Server
// logger is the named logger for this server.
logger *zap.Logger
// eventBus receives every successfully parsed webhook event.
eventBus *engine.EventBus
// handlers maps a request path to its registered handler.
handlers map[string]*WebhookHandler
// mu guards handlers.
mu sync.RWMutex
}
// SimpleWebhookConfig is the configuration of the simplified webhook server.
type SimpleWebhookConfig struct {
// Addr is the address the server listens on.
Addr string
AccessToken string // optional access token used to validate requests
EventChannel chan []byte // carries the raw event payloads
}
// SimpleWebhookHandler is the request-handler function type of the simplified
// webhook server; it has the same signature as a fasthttp request handler.
type SimpleWebhookHandler func(ctx *fasthttp.RequestCtx)
// SimpleWebhookServer is the simplified webhook server.
// It receives the events pushed by the protocol side and exposes them as a
// channel of raw byte payloads.
type SimpleWebhookServer struct {
// server is the fasthttp server; it is nil until Start is called.
server *fasthttp.Server
// logger is the named logger for this server.
logger *zap.Logger
// addr is the listen address taken from SimpleWebhookConfig.Addr.
addr string
// accessToken, when non-empty, is the bearer token every request must carry.
accessToken string
// eventChan receives the body of each accepted request.
eventChan chan []byte
// handler is not set or read by the code in this part of the file.
// NOTE(review): possibly a leftover or a hook used elsewhere - confirm.
handler SimpleWebhookHandler
}
// WebhookHandler describes how requests on one webhook path are processed.
type WebhookHandler struct {
// BotID is used as the event's SelfID when the incoming event has none.
BotID string
// Secret enables signature validation; the check only runs when Secret is
// non-empty and Validator is non-nil.
Secret string
// Validator receives the request body and the X-Signature header value
// and reports whether the signature is acceptable.
Validator func([]byte, string) bool
}
// NewHTTPWebhookServer returns a webhook server with an empty handler table.
// The logger is specialised with the name "webhook-server"; the underlying
// fasthttp server is only created by Start.
func NewHTTPWebhookServer(logger *zap.Logger, eventBus *engine.EventBus) *HTTPWebhookServer {
	srv := new(HTTPWebhookServer)
	srv.logger = logger.Named("webhook-server")
	srv.eventBus = eventBus
	srv.handlers = map[string]*WebhookHandler{}
	return srv
}
// RegisterWebhook binds handler to path, replacing any handler previously
// registered for the same path. handler must be non-nil.
func (hws *HTTPWebhookServer) RegisterWebhook(path string, handler *WebhookHandler) {
	hws.mu.Lock()
	defer hws.mu.Unlock()

	hws.handlers[path] = handler

	fields := []zap.Field{
		zap.String("path", path),
		zap.String("bot_id", handler.BotID),
	}
	hws.logger.Info("Webhook registered", fields...)
}
// UnregisterWebhook removes the handler registered for path. Removing a path
// that was never registered is a no-op apart from the log entry.
func (hws *HTTPWebhookServer) UnregisterWebhook(path string) {
	hws.mu.Lock()
	defer hws.mu.Unlock()

	delete(hws.handlers, path)

	pathField := zap.String("path", path)
	hws.logger.Info("Webhook unregistered", pathField)
}
// Start binds addr and begins serving webhook requests in a background
// goroutine.
//
// The listener is opened synchronously so that bind failures (address in use,
// malformed address, missing permission) are returned to the caller. The
// previous version called ListenAndServe inside the goroutine and therefore
// always returned nil, even when the server never came up.
//
// Errors that occur later, while serving, are still only logged.
func (hws *HTTPWebhookServer) Start(addr string) error {
	// "tcp4" matches the network fasthttp's ListenAndServe is documented to
	// use, so the listening behaviour is unchanged.
	ln, err := net.Listen("tcp4", addr)
	if err != nil {
		return fmt.Errorf("failed to listen on %s: %w", addr, err)
	}

	server := &fasthttp.Server{
		Handler: hws.handleWebhook,
	}
	hws.server = server

	hws.logger.Info("Starting webhook server", zap.String("address", addr))

	// The goroutine uses the local server value, so it does not read
	// hws.server concurrently with later writes.
	go func() {
		if err := server.Serve(ln); err != nil {
			hws.logger.Error("Webhook server error", zap.Error(err))
		}
	}()

	return nil
}
// Stop shuts the webhook server down and returns the result of Shutdown.
// It is a no-op returning nil when Start has not been called.
func (hws *HTTPWebhookServer) Stop() error {
	if hws.server == nil {
		return nil
	}

	hws.logger.Info("Stopping webhook server")
	return hws.server.Shutdown()
}
// NewSimpleWebhookServer builds a simplified webhook server from config.
// The logger is specialised with the name "simple-webhook-server"; the
// underlying fasthttp server is only created by Start.
func NewSimpleWebhookServer(config SimpleWebhookConfig, logger *zap.Logger) *SimpleWebhookServer {
	srv := new(SimpleWebhookServer)
	srv.addr = config.Addr
	srv.accessToken = config.AccessToken
	srv.eventChan = config.EventChannel
	srv.logger = logger.Named("simple-webhook-server")
	return srv
}
// Start binds the configured address and begins serving webhook requests in a
// background goroutine.
//
// The listener is opened synchronously so that bind failures (address in use,
// malformed address, missing permission) are returned to the caller. The
// previous version called ListenAndServe inside the goroutine and therefore
// always returned nil, even when the server never came up.
//
// Errors that occur later, while serving, are still only logged.
func (s *SimpleWebhookServer) Start() error {
	// "tcp4" matches the network fasthttp's ListenAndServe is documented to
	// use, so the listening behaviour is unchanged.
	ln, err := net.Listen("tcp4", s.addr)
	if err != nil {
		return fmt.Errorf("failed to listen on %s: %w", s.addr, err)
	}

	server := &fasthttp.Server{
		Handler:            s.handleRequest,
		MaxConnsPerIP:      1000,
		MaxRequestsPerConn: 1000,
	}
	s.server = server

	s.logger.Info("Starting simple webhook server", zap.String("addr", s.addr))

	// The goroutine uses the local server value, so it does not read
	// s.server concurrently with later writes.
	go func() {
		if err := server.Serve(ln); err != nil {
			s.logger.Error("Simple webhook server error", zap.Error(err))
		}
	}()

	return nil
}
// handleRequest validates an incoming webhook request and forwards its body
// to the event channel.
//
// Checks, in order, and the response when one fails:
//   - method must be POST (405)
//   - media type must be application/json (415)
//   - when an access token is configured, the Authorization header must be
//     exactly "Bearer <token>" (401)
//   - the body must be non-empty (400)
//
// Accepted requests are answered with 200 and {"status":"ok"}. When the event
// channel cannot take the payload immediately, the event is dropped with a
// warning and the sender still receives 200.
//
// Fixes over the previous version:
//   - The body is copied before it is queued. fasthttp only guarantees the
//     bytes returned by PostBody until the handler returns, so consumers of
//     the channel could otherwise read a buffer already reused by another
//     request.
//   - Content-Type parameters and letter case are ignored, so
//     "application/json; charset=utf-8" is no longer rejected.
//   - The bearer token is compared in constant time.
func (s *SimpleWebhookServer) handleRequest(ctx *fasthttp.RequestCtx) {
	if !ctx.IsPost() {
		s.logger.Warn("Received non-POST request",
			zap.String("method", string(ctx.Method())))
		ctx.Error("Method Not Allowed", fasthttp.StatusMethodNotAllowed)
		return
	}

	// Compare only the media type; parameters such as charset are allowed.
	contentType := string(ctx.Request.Header.ContentType())
	mediaType := contentType
	if i := strings.IndexByte(mediaType, ';'); i >= 0 {
		mediaType = mediaType[:i]
	}
	if !strings.EqualFold(strings.TrimSpace(mediaType), "application/json") {
		s.logger.Warn("Invalid content type",
			zap.String("content_type", contentType))
		ctx.Error("Unsupported Media Type", fasthttp.StatusUnsupportedMediaType)
		return
	}

	// Token validation is optional and only runs when a token is configured.
	if s.accessToken != "" {
		authHeader := ctx.Request.Header.Peek("Authorization")
		if len(authHeader) == 0 {
			s.logger.Warn("Missing authorization header")
			ctx.Error("Unauthorized", fasthttp.StatusUnauthorized)
			return
		}

		// Constant-time comparison so response timing does not reveal how
		// much of the token matched.
		expectedAuth := []byte("Bearer " + s.accessToken)
		if subtle.ConstantTimeCompare(authHeader, expectedAuth) != 1 {
			s.logger.Warn("Invalid authorization token")
			ctx.Error("Unauthorized", fasthttp.StatusUnauthorized)
			return
		}
	}

	body := ctx.PostBody()
	if len(body) == 0 {
		s.logger.Warn("Empty request body")
		ctx.Error("Bad Request", fasthttp.StatusBadRequest)
		return
	}

	s.logger.Debug("Received webhook event",
		zap.Int("body_length", len(body)))

	// Detach the payload from fasthttp's request buffer before it leaves the
	// handler.
	payload := append([]byte(nil), body...)

	// Non-blocking send: a slow consumer must not stall the HTTP server.
	select {
	case s.eventChan <- payload:
	default:
		s.logger.Warn("Event channel full, dropping event")
	}

	ctx.SetContentType("application/json")
	ctx.SetStatusCode(fasthttp.StatusOK)
	ctx.SetBodyString(`{"status":"ok"}`)
}
// Events exposes the event channel as receive-only, so consumers can read the
// raw webhook payloads but cannot send on or close the channel.
func (s *SimpleWebhookServer) Events() <-chan []byte {
	var receiveOnly <-chan []byte = s.eventChan
	return receiveOnly
}
// Stop shuts the simplified webhook server down. It is a no-op returning nil
// when Start has not been called; a Shutdown failure is returned wrapped.
func (s *SimpleWebhookServer) Stop() error {
	if s.server == nil {
		return nil
	}

	s.logger.Info("Stopping simple webhook server")
	err := s.server.Shutdown()
	if err == nil {
		return nil
	}
	return fmt.Errorf("failed to shutdown: %w", err)
}
// handleWebhook serves one webhook request: it looks up the handler
// registered for the request path, optionally validates the signature,
// decodes the body into a protocol.BaseEvent, fills in missing fields and
// publishes the event to the event bus.
//
// Responses: 404 when no handler is registered for the path, 401 when the
// validator rejects the signature, 400 when the body is not a valid event,
// otherwise {"success":true}.
func (hws *HTTPWebhookServer) handleWebhook(ctx *fasthttp.RequestCtx) {
	route := string(ctx.Path())

	// Hold the read lock only for the map lookup.
	hws.mu.RLock()
	registered, found := hws.handlers[route]
	hws.mu.RUnlock()

	if !found {
		ctx.Error("Webhook not found", fasthttp.StatusNotFound)
		return
	}

	payload := ctx.PostBody()

	// The signature check runs only when both a secret and a validator are set.
	// NOTE(review): a handler with a Secret but no Validator is accepted
	// without any check - confirm this is intended.
	mustVerify := registered.Secret != "" && registered.Validator != nil
	if mustVerify {
		signature := string(ctx.Request.Header.Peek("X-Signature"))
		if valid := registered.Validator(payload, signature); !valid {
			hws.logger.Warn("Invalid webhook signature",
				zap.String("path", route),
				zap.String("bot_id", registered.BotID))
			ctx.Error("Invalid signature", fasthttp.StatusUnauthorized)
			return
		}
	}

	var incoming protocol.BaseEvent
	if parseErr := sonic.Unmarshal(payload, &incoming); parseErr != nil {
		hws.logger.Error("Failed to parse webhook event",
			zap.Error(parseErr),
			zap.String("path", route))
		ctx.Error("Invalid event format", fasthttp.StatusBadRequest)
		return
	}

	// Fill in whatever the sender left out.
	if incoming.SelfID == "" {
		incoming.SelfID = registered.BotID
	}
	if incoming.Timestamp == 0 {
		incoming.Timestamp = time.Now().Unix()
	}
	if incoming.Data == nil {
		incoming.Data = make(map[string]interface{})
	}

	hws.logger.Info("Webhook event received",
		zap.String("path", route),
		zap.String("bot_id", registered.BotID),
		zap.String("type", string(incoming.Type)),
		zap.String("detail_type", incoming.DetailType))

	hws.eventBus.Publish(&incoming)

	ctx.SetContentType("application/json")
	ctx.SetBodyString(`{"success":true}`)
}