package net

import (
	"context"
	"crypto/subtle"
	"fmt"
	"mime"
	"sync"
	"time"

	"cellbot/internal/engine"
	"cellbot/internal/protocol"
	"github.com/bytedance/sonic"
	"github.com/valyala/fasthttp"
	"go.uber.org/zap"
)

// HTTPClient is an HTTP client used for forward (outbound) HTTP connections.
type HTTPClient struct {
	client     *fasthttp.Client
	logger     *zap.Logger
	eventBus   *engine.EventBus
	botID      string
	baseURL    string
	timeout    time.Duration
	retryCount int
}

// GenericRequestConfig describes a single generic HTTP request.
type GenericRequestConfig struct {
	URL         string
	Method      string // GET, POST, etc.
	Headers     map[string]string
	Body        []byte
	AccessToken string
}

// GenericResponse is the result of a generic HTTP request.
type GenericResponse struct {
	StatusCode int
	Headers    map[string]string
	Body       []byte
}

// HTTPClientConfig holds the settings used to build an HTTPClient.
type HTTPClientConfig struct {
	BotID      string
	BaseURL    string
	Timeout    time.Duration
	RetryCount int
}

// NewHTTPClient creates an HTTPClient.
//
// A zero Timeout defaults to 30 seconds and a zero RetryCount defaults to 3.
// A negative RetryCount is treated as "no retries" (a single attempt) so the
// retry loops always run at least once.
func NewHTTPClient(config HTTPClientConfig, logger *zap.Logger, eventBus *engine.EventBus) *HTTPClient {
	if config.Timeout == 0 {
		config.Timeout = 30 * time.Second
	}
	switch {
	case config.RetryCount == 0:
		config.RetryCount = 3
	case config.RetryCount < 0:
		config.RetryCount = 0
	}

	return &HTTPClient{
		client: &fasthttp.Client{
			ReadTimeout:     config.Timeout,
			WriteTimeout:    config.Timeout,
			MaxConnsPerHost: 100,
		},
		logger:     logger.Named("http-client"),
		eventBus:   eventBus,
		botID:      config.BotID,
		baseURL:    config.BaseURL,
		timeout:    config.Timeout,
		retryCount: config.RetryCount,
	}
}

// waitForAttempt blocks for the linear backoff that precedes the given
// attempt (attempt seconds; nothing for attempt 0) and returns ctx.Err() if
// the context is cancelled before or during the wait.
func (hc *HTTPClient) waitForAttempt(ctx context.Context, attempt int) error {
	if attempt > 0 {
		timer := time.NewTimer(time.Duration(attempt) * time.Second)
		defer timer.Stop()

		select {
		case <-timer.C:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return ctx.Err()
}

// attemptTimeout returns the per-request timeout: the configured timeout,
// shortened to the time remaining until the context deadline when that is
// sooner.
func (hc *HTTPClient) attemptTimeout(ctx context.Context) time.Duration {
	timeout := hc.timeout
	if deadline, ok := ctx.Deadline(); ok {
		if remaining := time.Until(deadline); remaining < timeout {
			timeout = remaining
		}
	}
	return timeout
}

// SendAction sends an action request (forward HTTP) by POSTing the
// JSON-encoded action to baseURL + "/action".
//
// The request is retried up to retryCount times with a linear backoff when
// the transport fails, the status is not 200, or the response body is not
// valid JSON. Cancelling ctx aborts the backoff and any further attempts; the
// returned error then wraps ctx.Err().
func (hc *HTTPClient) SendAction(ctx context.Context, action protocol.Action) (map[string]interface{}, error) {
	// Tolerate a nil context for callers written while ctx was still unused.
	if ctx == nil {
		ctx = context.Background()
	}

	// Serialize the action to JSON.
	data, err := sonic.Marshal(action)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal action: %w", err)
	}

	req := fasthttp.AcquireRequest()
	resp := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseRequest(req)
	defer fasthttp.ReleaseResponse(resp)

	url := hc.baseURL + "/action"
	req.SetRequestURI(url)
	req.Header.SetMethod("POST")
	req.Header.SetContentType("application/json")
	req.SetBody(data)

	hc.logger.Debug("Sending action",
		zap.String("url", url),
		zap.String("action", string(action.GetType())))

	// Retry loop.
	var lastErr error
	for attempt := 0; attempt <= hc.retryCount; attempt++ {
		if attempt > 0 {
			hc.logger.Info("Retrying action request",
				zap.Int("attempt", attempt),
				zap.Int("max", hc.retryCount))
		}
		if err := hc.waitForAttempt(ctx, attempt); err != nil {
			return nil, fmt.Errorf("action cancelled: %w", err)
		}

		if err := hc.client.DoTimeout(req, resp, hc.attemptTimeout(ctx)); err != nil {
			lastErr = fmt.Errorf("request failed: %w", err)
			continue
		}

		if resp.StatusCode() != fasthttp.StatusOK {
			lastErr = fmt.Errorf("unexpected status code: %d", resp.StatusCode())
			continue
		}

		// Parse the response.
		var result map[string]interface{}
		if err := sonic.Unmarshal(resp.Body(), &result); err != nil {
			lastErr = fmt.Errorf("failed to parse response: %w", err)
			continue
		}

		hc.logger.Info("Action sent successfully",
			zap.String("action", string(action.GetType())))
		return result, nil
	}

	return nil, fmt.Errorf("action failed after %d retries: %w", hc.retryCount, lastErr)
}

// DoGenericRequest executes a generic HTTP request, supporting
// adapter-specific API calls.
//
// An empty Method defaults to POST. Only transport-level failures are
// retried (up to retryCount times with a linear backoff); any HTTP response,
// whatever its status code, is returned to the caller. Cancelling ctx aborts
// the backoff and any further attempts.
func (hc *HTTPClient) DoGenericRequest(ctx context.Context, config GenericRequestConfig) (*GenericResponse, error) {
	// Tolerate a nil context for callers written while ctx was still unused.
	if ctx == nil {
		ctx = context.Background()
	}

	// Default to POST when no method is provided.
	if config.Method == "" {
		config.Method = "POST"
	}

	// Retry loop.
	var lastErr error
	for attempt := 0; attempt <= hc.retryCount; attempt++ {
		if attempt > 0 {
			hc.logger.Info("Retrying generic request",
				zap.String("url", config.URL),
				zap.Int("attempt", attempt),
				zap.Int("max", hc.retryCount))
		}
		if err := hc.waitForAttempt(ctx, attempt); err != nil {
			return nil, fmt.Errorf("generic request cancelled: %w", err)
		}

		resp, err := hc.doGenericRequest(ctx, config)
		if err != nil {
			lastErr = err
			continue
		}
		return resp, nil
	}

	return nil, fmt.Errorf("generic request failed after %d retries: %w", hc.retryCount, lastErr)
}

// doGenericRequest performs a single generic request and copies the response
// out of the pooled fasthttp objects before they are released.
func (hc *HTTPClient) doGenericRequest(ctx context.Context, config GenericRequestConfig) (*GenericResponse, error) {
	req := fasthttp.AcquireRequest()
	resp := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseRequest(req)
	defer fasthttp.ReleaseResponse(resp)

	// Set the request URL and method.
	req.SetRequestURI(config.URL)
	req.Header.SetMethod(config.Method)

	// Apply a default Content-Type for methods that carry a body; custom
	// headers set below may still override it.
	switch config.Method {
	case "POST", "PUT", "PATCH":
		if len(req.Header.ContentType()) == 0 {
			req.Header.SetContentType("application/json")
		}
	}

	// Set the request body.
	if config.Body != nil {
		req.SetBody(config.Body)
	}

	// Set custom headers.
	for key, value := range config.Headers {
		req.Header.Set(key, value)
	}

	// Set the Authorization header.
	if config.AccessToken != "" {
		req.Header.Set("Authorization", "Bearer "+config.AccessToken)
	}

	hc.logger.Debug("Sending generic HTTP request",
		zap.String("method", config.Method),
		zap.String("url", config.URL))

	// Send the request.
	if err := hc.client.DoTimeout(req, resp, hc.attemptTimeout(ctx)); err != nil {
		return nil, fmt.Errorf("request failed: %w", err)
	}

	// Build the response. Body and headers are copied because resp goes back
	// to the pool when this function returns.
	body := resp.Body()
	genericResp := &GenericResponse{
		StatusCode: resp.StatusCode(),
		Body:       make([]byte, len(body)),
		Headers:    make(map[string]string),
	}
	copy(genericResp.Body, body)

	// Repeated header names overwrite each other; the last value wins.
	resp.Header.VisitAll(func(key, value []byte) {
		genericResp.Headers[string(key)] = string(value)
	})

	hc.logger.Debug("Generic HTTP request completed",
		zap.String("method", config.Method),
		zap.String("url", config.URL),
		zap.Int("status_code", resp.StatusCode()))

	return genericResp, nil
}

// PollEvents polls for events (forward HTTP) every interval until ctx is
// done, then returns ctx.Err(). Fetch failures are logged and polling
// continues. A non-positive interval is rejected with an error instead of
// panicking inside time.NewTicker.
func (hc *HTTPClient) PollEvents(ctx context.Context, interval time.Duration) error {
	if interval <= 0 {
		return fmt.Errorf("invalid polling interval: %v", interval)
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	hc.logger.Info("Starting event polling", zap.Duration("interval", interval))

	for {
		select {
		case <-ticker.C:
			if err := hc.fetchEvents(ctx); err != nil {
				hc.logger.Error("Failed to fetch events", zap.Error(err))
			}
		case <-ctx.Done():
			hc.logger.Info("Event polling stopped")
			return ctx.Err()
		}
	}
}

// fetchEvents GETs baseURL + "/events", decodes the body as a list of
// protocol.BaseEvent and publishes each event on the event bus.
func (hc *HTTPClient) fetchEvents(ctx context.Context) error {
	req := fasthttp.AcquireRequest()
	resp := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseRequest(req)
	defer fasthttp.ReleaseResponse(resp)

	url := hc.baseURL + "/events"
	req.SetRequestURI(url)
	req.Header.SetMethod("GET")

	if err := hc.client.DoTimeout(req, resp, hc.attemptTimeout(ctx)); err != nil {
		return fmt.Errorf("request failed: %w", err)
	}

	if resp.StatusCode() != fasthttp.StatusOK {
		return fmt.Errorf("unexpected status code: %d", resp.StatusCode())
	}

	// Parse the event list.
	var events []protocol.BaseEvent
	if err := sonic.Unmarshal(resp.Body(), &events); err != nil {
		return fmt.Errorf("failed to parse events: %w", err)
	}

	// Publish the events to the event bus.
	for i := range events {
		hc.logger.Debug("Event received",
			zap.String("type", string(events[i].Type)),
			zap.String("detail_type", events[i].DetailType))

		hc.eventBus.Publish(&events[i])
	}

	return nil
}

// HTTPWebhookServer is an HTTP webhook server used for reverse HTTP
// connections. Handlers are looked up by request path.
type HTTPWebhookServer struct {
	server   *fasthttp.Server
	logger   *zap.Logger
	eventBus *engine.EventBus
	handlers map[string]*WebhookHandler
	mu       sync.RWMutex
}

// SimpleWebhookConfig configures a SimpleWebhookServer.
type SimpleWebhookConfig struct {
	Addr         string
	AccessToken  string      // optional access token to validate
	EventChannel chan []byte // carries the raw event payloads
}

// SimpleWebhookHandler is the handler function type for the simple webhook
// server.
type SimpleWebhookHandler func(ctx *fasthttp.RequestCtx)

// SimpleWebhookServer is a simplified webhook server that receives events
// pushed by the protocol side and exposes them as a channel of raw bytes.
type SimpleWebhookServer struct {
	server      *fasthttp.Server
	logger      *zap.Logger
	addr        string
	accessToken string
	eventChan   chan []byte
	handler     SimpleWebhookHandler
}

// WebhookHandler describes how requests to one webhook path are handled.
type WebhookHandler struct {
	BotID     string
	Secret    string
	Validator func([]byte, string) bool
}

// NewHTTPWebhookServer creates an HTTP webhook server with no registered
// handlers.
func NewHTTPWebhookServer(logger *zap.Logger, eventBus *engine.EventBus) *HTTPWebhookServer {
	return &HTTPWebhookServer{
		logger:   logger.Named("webhook-server"),
		eventBus: eventBus,
		handlers: make(map[string]*WebhookHandler),
	}
}

// RegisterWebhook registers a webhook handler for path, replacing any handler
// already registered there.
func (hws *HTTPWebhookServer) RegisterWebhook(path string, handler *WebhookHandler) {
	hws.mu.Lock()
	defer hws.mu.Unlock()

	hws.handlers[path] = handler
	hws.logger.Info("Webhook registered",
		zap.String("path", path),
		zap.String("bot_id", handler.BotID))
}

// UnregisterWebhook removes the webhook handler registered for path.
func (hws *HTTPWebhookServer) UnregisterWebhook(path string) {
	hws.mu.Lock()
	defer hws.mu.Unlock()

	delete(hws.handlers, path)
	hws.logger.Info("Webhook unregistered", zap.String("path", path))
}

// Start starts the webhook server on addr in a background goroutine.
//
// It always returns nil: a failure to listen or serve is only reported
// through the logger.
func (hws *HTTPWebhookServer) Start(addr string) error {
	srv := &fasthttp.Server{
		Handler: hws.handleWebhook,
	}
	hws.server = srv

	hws.logger.Info("Starting webhook server", zap.String("address", addr))

	// Serve on the local srv so the goroutine never re-reads the struct field.
	go func() {
		if err := srv.ListenAndServe(addr); err != nil {
			hws.logger.Error("Webhook server error", zap.Error(err))
		}
	}()

	return nil
}

// Stop shuts the webhook server down. It is a no-op if Start was never
// called.
func (hws *HTTPWebhookServer) Stop() error {
	if hws.server != nil {
		hws.logger.Info("Stopping webhook server")
		return hws.server.Shutdown()
	}
	return nil
}

// NewSimpleWebhookServer creates a simplified webhook server.
func NewSimpleWebhookServer(config SimpleWebhookConfig, logger *zap.Logger) *SimpleWebhookServer {
	return &SimpleWebhookServer{
		addr:        config.Addr,
		accessToken: config.AccessToken,
		eventChan:   config.EventChannel,
		logger:      logger.Named("simple-webhook-server"),
	}
}

// Start starts the simplified webhook server in a background goroutine.
//
// It always returns nil: a failure to listen or serve is only reported
// through the logger.
func (s *SimpleWebhookServer) Start() error {
	srv := &fasthttp.Server{
		Handler:            s.handleRequest,
		MaxConnsPerIP:      1000,
		MaxRequestsPerConn: 1000,
	}
	s.server = srv

	s.logger.Info("Starting simple webhook server", zap.String("addr", s.addr))

	// Serve on the local srv so the goroutine never re-reads the struct field.
	go func() {
		if err := srv.ListenAndServe(s.addr); err != nil {
			s.logger.Error("Simple webhook server error", zap.Error(err))
		}
	}()

	return nil
}

// acceptsContentType reports whether the Content-Type header value denotes
// application/json, ignoring case and parameters such as "charset=utf-8".
func (s *SimpleWebhookServer) acceptsContentType(contentType string) bool {
	mediaType, _, err := mime.ParseMediaType(contentType)
	if err != nil {
		return false
	}
	return mediaType == "application/json"
}

// authorized reports whether the Authorization header value equals
// "Bearer <accessToken>". The comparison is constant-time so the response
// time does not reveal how much of the token matched.
func (s *SimpleWebhookServer) authorized(authHeader []byte) bool {
	expected := []byte("Bearer " + s.accessToken)
	return subtle.ConstantTimeCompare(authHeader, expected) == 1
}

// handleRequest handles a webhook request: it accepts only JSON POSTs,
// validates the bearer token when one is configured, and forwards the raw
// body on the event channel.
func (s *SimpleWebhookServer) handleRequest(ctx *fasthttp.RequestCtx) {
	// Only POST requests are accepted.
	if !ctx.IsPost() {
		s.logger.Warn("Received non-POST request", zap.String("method", string(ctx.Method())))
		ctx.Error("Method Not Allowed", fasthttp.StatusMethodNotAllowed)
		return
	}

	// Check the Content-Type.
	contentType := string(ctx.Request.Header.ContentType())
	if !s.acceptsContentType(contentType) {
		s.logger.Warn("Invalid content type", zap.String("content_type", contentType))
		ctx.Error("Unsupported Media Type", fasthttp.StatusUnsupportedMediaType)
		return
	}

	// Validate the access token, if one is configured.
	if s.accessToken != "" {
		authHeader := ctx.Request.Header.Peek("Authorization")
		if len(authHeader) == 0 {
			s.logger.Warn("Missing authorization header")
			ctx.Error("Unauthorized", fasthttp.StatusUnauthorized)
			return
		}

		// Validate the Bearer token.
		if !s.authorized(authHeader) {
			s.logger.Warn("Invalid authorization token")
			ctx.Error("Unauthorized", fasthttp.StatusUnauthorized)
			return
		}
	}

	// Read the request body.
	body := ctx.PostBody()
	if len(body) == 0 {
		s.logger.Warn("Empty request body")
		ctx.Error("Bad Request", fasthttp.StatusBadRequest)
		return
	}

	s.logger.Debug("Received webhook event", zap.Int("body_length", len(body)))

	// The bytes returned by PostBody are only valid until this handler
	// returns, while the channel consumer reads them later, so hand over a
	// private copy.
	payload := append([]byte(nil), body...)

	// Send to the event channel without blocking; the event is dropped when
	// the channel cannot accept it.
	select {
	case s.eventChan <- payload:
	default:
		s.logger.Warn("Event channel full, dropping event")
	}

	// Return the success response.
	ctx.SetContentType("application/json")
	ctx.SetStatusCode(fasthttp.StatusOK)
	ctx.SetBodyString(`{"status":"ok"}`)
}

// Events returns the receive side of the event channel.
func (s *SimpleWebhookServer) Events() <-chan []byte {
	return s.eventChan
}

// Stop shuts the simplified webhook server down. It is a no-op if Start was
// never called.
func (s *SimpleWebhookServer) Stop() error {
	if s.server != nil {
		s.logger.Info("Stopping simple webhook server")
		if err := s.server.Shutdown(); err != nil {
			return fmt.Errorf("failed to shutdown: %w", err)
		}
	}
	return nil
}

// handleWebhook handles a webhook request: it looks up the handler for the
// request path, validates the signature when configured, decodes the event,
// fills in missing fields and publishes it on the event bus.
func (hws *HTTPWebhookServer) handleWebhook(ctx *fasthttp.RequestCtx) {
	path := string(ctx.Path())

	hws.mu.RLock()
	handler, exists := hws.handlers[path]
	hws.mu.RUnlock()

	if !exists {
		ctx.Error("Webhook not found", fasthttp.StatusNotFound)
		return
	}

	// Validate the signature, if configured.
	// NOTE(review): a handler with a Secret but no Validator is accepted
	// without any signature check — confirm this is intended.
	if handler.Secret != "" && handler.Validator != nil {
		signature := string(ctx.Request.Header.Peek("X-Signature"))
		if !handler.Validator(ctx.PostBody(), signature) {
			hws.logger.Warn("Invalid webhook signature",
				zap.String("path", path),
				zap.String("bot_id", handler.BotID))
			ctx.Error("Invalid signature", fasthttp.StatusUnauthorized)
			return
		}
	}

	// Parse the event.
	var event protocol.BaseEvent
	if err := sonic.Unmarshal(ctx.PostBody(), &event); err != nil {
		hws.logger.Error("Failed to parse webhook event",
			zap.Error(err),
			zap.String("path", path))
		ctx.Error("Invalid event format", fasthttp.StatusBadRequest)
		return
	}

	// Fill in the bot ID.
	if event.SelfID == "" {
		event.SelfID = handler.BotID
	}

	// Fill in the timestamp.
	if event.Timestamp == 0 {
		event.Timestamp = time.Now().Unix()
	}

	// Make sure the Data field is never nil.
	if event.Data == nil {
		event.Data = make(map[string]interface{})
	}

	hws.logger.Info("Webhook event received",
		zap.String("path", path),
		zap.String("bot_id", handler.BotID),
		zap.String("type", string(event.Type)),
		zap.String("detail_type", event.DetailType))

	// Publish to the event bus.
	hws.eventBus.Publish(&event)

	// Return the success response.
	ctx.SetContentType("application/json")
	ctx.SetBodyString(`{"success":true}`)
}