- Added new dependencies for SQLite support and improved HTTP client functionality in go.mod and go.sum. - Refactored webhook server implementation to utilize a simplified version, enhancing code maintainability. - Updated API client to leverage a generic request method, streamlining API interactions. - Modified configuration to include access token for webhook server, improving security. - Enhanced event handling and request processing in the API client for better performance.
554 lines
14 KiB
Go
554 lines
14 KiB
Go
package net
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"sync"
|
||
"time"
|
||
|
||
"cellbot/internal/engine"
|
||
"cellbot/internal/protocol"
|
||
|
||
"github.com/bytedance/sonic"
|
||
"github.com/valyala/fasthttp"
|
||
"go.uber.org/zap"
|
||
)
|
||
|
||
// HTTPClient HTTP客户端(用于正向HTTP连接)
|
||
type HTTPClient struct {
|
||
client *fasthttp.Client
|
||
logger *zap.Logger
|
||
eventBus *engine.EventBus
|
||
botID string
|
||
baseURL string
|
||
timeout time.Duration
|
||
retryCount int
|
||
}
|
||
|
||
// GenericRequestConfig describes one request issued through
// HTTPClient.DoGenericRequest.
type GenericRequestConfig struct {
	// URL is the full request URI. Method is the HTTP verb (GET, POST, etc.);
	// DoGenericRequest substitutes POST when it is empty.
	URL, Method string

	// Headers are applied to the request after the default Content-Type.
	Headers map[string]string

	// Body is the raw request payload; nil sends no body.
	Body []byte

	// AccessToken, when non-empty, is sent as "Authorization: Bearer <token>".
	AccessToken string
}
|
||
|
||
// GenericResponse is the result of a generic HTTP request. Its contents are
// copied out of the fasthttp response by doGenericRequest.
type GenericResponse struct {
	// StatusCode is the HTTP status code of the response.
	StatusCode int

	// Headers maps each response header name to its value.
	Headers map[string]string

	// Body is a copy of the response payload.
	Body []byte
}
|
||
|
||
// HTTPClientConfig carries the settings NewHTTPClient uses to build an
// HTTPClient.
type HTTPClientConfig struct {
	// BotID identifies the bot; BaseURL is the prefix of the remote endpoint.
	BotID, BaseURL string

	// Timeout bounds each request; zero selects the 30-second default.
	Timeout time.Duration

	// RetryCount is the number of retries after the first attempt; zero
	// selects the default of 3.
	RetryCount int
}
|
||
|
||
// NewHTTPClient 创建HTTP客户端
|
||
func NewHTTPClient(config HTTPClientConfig, logger *zap.Logger, eventBus *engine.EventBus) *HTTPClient {
|
||
if config.Timeout == 0 {
|
||
config.Timeout = 30 * time.Second
|
||
}
|
||
if config.RetryCount == 0 {
|
||
config.RetryCount = 3
|
||
}
|
||
|
||
return &HTTPClient{
|
||
client: &fasthttp.Client{
|
||
ReadTimeout: config.Timeout,
|
||
WriteTimeout: config.Timeout,
|
||
MaxConnsPerHost: 100,
|
||
},
|
||
logger: logger.Named("http-client"),
|
||
eventBus: eventBus,
|
||
botID: config.BotID,
|
||
baseURL: config.BaseURL,
|
||
timeout: config.Timeout,
|
||
retryCount: config.RetryCount,
|
||
}
|
||
}
|
||
|
||
// SendAction 发送动作请求(正向HTTP)
|
||
func (hc *HTTPClient) SendAction(ctx context.Context, action protocol.Action) (map[string]interface{}, error) {
|
||
// 序列化动作为JSON
|
||
data, err := sonic.Marshal(action)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to marshal action: %w", err)
|
||
}
|
||
|
||
req := fasthttp.AcquireRequest()
|
||
resp := fasthttp.AcquireResponse()
|
||
defer fasthttp.ReleaseRequest(req)
|
||
defer fasthttp.ReleaseResponse(resp)
|
||
|
||
url := hc.baseURL + "/action"
|
||
req.SetRequestURI(url)
|
||
req.Header.SetMethod("POST")
|
||
req.Header.SetContentType("application/json")
|
||
req.SetBody(data)
|
||
|
||
hc.logger.Debug("Sending action",
|
||
zap.String("url", url),
|
||
zap.String("action", string(action.GetType())))
|
||
|
||
// 重试机制
|
||
var lastErr error
|
||
for i := 0; i <= hc.retryCount; i++ {
|
||
if i > 0 {
|
||
hc.logger.Info("Retrying action request",
|
||
zap.Int("attempt", i),
|
||
zap.Int("max", hc.retryCount))
|
||
time.Sleep(time.Duration(i) * time.Second)
|
||
}
|
||
|
||
err := hc.client.DoTimeout(req, resp, hc.timeout)
|
||
if err != nil {
|
||
lastErr = fmt.Errorf("request failed: %w", err)
|
||
continue
|
||
}
|
||
|
||
if resp.StatusCode() != fasthttp.StatusOK {
|
||
lastErr = fmt.Errorf("unexpected status code: %d", resp.StatusCode())
|
||
continue
|
||
}
|
||
|
||
// 解析响应
|
||
var result map[string]interface{}
|
||
if err := sonic.Unmarshal(resp.Body(), &result); err != nil {
|
||
lastErr = fmt.Errorf("failed to parse response: %w", err)
|
||
continue
|
||
}
|
||
|
||
hc.logger.Info("Action sent successfully",
|
||
zap.String("action", string(action.GetType())))
|
||
|
||
return result, nil
|
||
}
|
||
|
||
return nil, fmt.Errorf("action failed after %d retries: %w", hc.retryCount, lastErr)
|
||
}
|
||
|
||
// DoGenericRequest 执行通用HTTP请求
|
||
// 用于支持适配器的特定API调用需求
|
||
func (hc *HTTPClient) DoGenericRequest(ctx context.Context, config GenericRequestConfig) (*GenericResponse, error) {
|
||
// 如果未提供 Method,默认为 POST
|
||
if config.Method == "" {
|
||
config.Method = "POST"
|
||
}
|
||
|
||
// 重试机制
|
||
var lastErr error
|
||
for i := 0; i <= hc.retryCount; i++ {
|
||
if i > 0 {
|
||
hc.logger.Info("Retrying generic request",
|
||
zap.String("url", config.URL),
|
||
zap.Int("attempt", i),
|
||
zap.Int("max", hc.retryCount))
|
||
time.Sleep(time.Duration(i) * time.Second)
|
||
}
|
||
|
||
resp, err := hc.doGenericRequest(ctx, config)
|
||
if err != nil {
|
||
lastErr = err
|
||
continue
|
||
}
|
||
|
||
return resp, nil
|
||
}
|
||
|
||
return nil, fmt.Errorf("generic request failed after %d retries: %w", hc.retryCount, lastErr)
|
||
}
|
||
|
||
// doGenericRequest 执行单次通用请求
|
||
func (hc *HTTPClient) doGenericRequest(ctx context.Context, config GenericRequestConfig) (*GenericResponse, error) {
|
||
req := fasthttp.AcquireRequest()
|
||
resp := fasthttp.AcquireResponse()
|
||
defer fasthttp.ReleaseRequest(req)
|
||
defer fasthttp.ReleaseResponse(resp)
|
||
|
||
// 设置请求URL和方法
|
||
req.SetRequestURI(config.URL)
|
||
req.Header.SetMethod(config.Method)
|
||
|
||
// 设置默认 Content-Type
|
||
if config.Method == "POST" || config.Method == "PUT" || config.Method == "PATCH" {
|
||
if req.Header.ContentType() == nil || len(req.Header.ContentType()) == 0 {
|
||
req.Header.SetContentType("application/json")
|
||
}
|
||
}
|
||
|
||
// 设置请求体
|
||
if config.Body != nil {
|
||
req.SetBody(config.Body)
|
||
}
|
||
|
||
// 设置自定义 Headers
|
||
for key, value := range config.Headers {
|
||
req.Header.Set(key, value)
|
||
}
|
||
|
||
// 设置 Authorization 头
|
||
if config.AccessToken != "" {
|
||
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", config.AccessToken))
|
||
}
|
||
|
||
hc.logger.Debug("Sending generic HTTP request",
|
||
zap.String("method", config.Method),
|
||
zap.String("url", config.URL))
|
||
|
||
// 发送请求
|
||
err := hc.client.DoTimeout(req, resp, hc.timeout)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("request failed: %w", err)
|
||
}
|
||
|
||
// 构建响应
|
||
genericResp := &GenericResponse{
|
||
StatusCode: resp.StatusCode(),
|
||
Body: make([]byte, len(resp.Body())),
|
||
Headers: make(map[string]string),
|
||
}
|
||
|
||
// 复制响应体
|
||
copy(genericResp.Body, resp.Body())
|
||
|
||
// 复制响应头
|
||
resp.Header.VisitAll(func(key, value []byte) {
|
||
genericResp.Headers[string(key)] = string(value)
|
||
})
|
||
|
||
hc.logger.Debug("Generic HTTP request completed",
|
||
zap.String("method", config.Method),
|
||
zap.String("url", config.URL),
|
||
zap.Int("status_code", resp.StatusCode()))
|
||
|
||
return genericResp, nil
|
||
}
|
||
|
||
// PollEvents 轮询事件(正向HTTP)
|
||
func (hc *HTTPClient) PollEvents(ctx context.Context, interval time.Duration) error {
|
||
ticker := time.NewTicker(interval)
|
||
defer ticker.Stop()
|
||
|
||
hc.logger.Info("Starting event polling",
|
||
zap.Duration("interval", interval))
|
||
|
||
for {
|
||
select {
|
||
case <-ticker.C:
|
||
if err := hc.fetchEvents(ctx); err != nil {
|
||
hc.logger.Error("Failed to fetch events", zap.Error(err))
|
||
}
|
||
|
||
case <-ctx.Done():
|
||
hc.logger.Info("Event polling stopped")
|
||
return ctx.Err()
|
||
}
|
||
}
|
||
}
|
||
|
||
// fetchEvents 获取事件
|
||
func (hc *HTTPClient) fetchEvents(ctx context.Context) error {
|
||
req := fasthttp.AcquireRequest()
|
||
resp := fasthttp.AcquireResponse()
|
||
defer fasthttp.ReleaseRequest(req)
|
||
defer fasthttp.ReleaseResponse(resp)
|
||
|
||
url := hc.baseURL + "/events"
|
||
req.SetRequestURI(url)
|
||
req.Header.SetMethod("GET")
|
||
|
||
err := hc.client.DoTimeout(req, resp, hc.timeout)
|
||
if err != nil {
|
||
return fmt.Errorf("request failed: %w", err)
|
||
}
|
||
|
||
if resp.StatusCode() != fasthttp.StatusOK {
|
||
return fmt.Errorf("unexpected status code: %d", resp.StatusCode())
|
||
}
|
||
|
||
// 解析事件列表
|
||
var events []protocol.BaseEvent
|
||
if err := sonic.Unmarshal(resp.Body(), &events); err != nil {
|
||
return fmt.Errorf("failed to parse events: %w", err)
|
||
}
|
||
|
||
// 发布事件到事件总线
|
||
for i := range events {
|
||
hc.logger.Debug("Event received",
|
||
zap.String("type", string(events[i].Type)),
|
||
zap.String("detail_type", events[i].DetailType))
|
||
|
||
hc.eventBus.Publish(&events[i])
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// HTTPWebhookServer HTTP Webhook服务器(用于反向HTTP连接)
|
||
type HTTPWebhookServer struct {
|
||
server *fasthttp.Server
|
||
logger *zap.Logger
|
||
eventBus *engine.EventBus
|
||
handlers map[string]*WebhookHandler
|
||
mu sync.RWMutex
|
||
}
|
||
|
||
// SimpleWebhookConfig configures a SimpleWebhookServer.
type SimpleWebhookConfig struct {
	// Addr is the listen address. AccessToken is optional; when non-empty,
	// requests must carry a matching "Authorization: Bearer" header.
	Addr, AccessToken string

	// EventChannel receives the raw body of every accepted event.
	EventChannel chan []byte
}
|
||
|
||
// SimpleWebhookHandler 简化版 Webhook 处理函数
|
||
type SimpleWebhookHandler func(ctx *fasthttp.RequestCtx)
|
||
|
||
// SimpleWebhookServer 简化版 Webhook 服务器
|
||
// 用于接收协议端推送的事件,提供原始字节流通道
|
||
type SimpleWebhookServer struct {
|
||
server *fasthttp.Server
|
||
logger *zap.Logger
|
||
addr string
|
||
accessToken string
|
||
eventChan chan []byte
|
||
handler SimpleWebhookHandler
|
||
}
|
||
|
||
// WebhookHandler describes how HTTPWebhookServer treats one webhook path.
type WebhookHandler struct {
	// BotID is written to an event's SelfID when the event does not carry
	// one. Secret enables signature checking when it is non-empty and
	// Validator is set.
	BotID, Secret string

	// Validator receives the request body and the X-Signature header value
	// and reports whether the signature is acceptable.
	Validator func([]byte, string) bool
}
|
||
|
||
// NewHTTPWebhookServer 创建HTTP Webhook服务器
|
||
func NewHTTPWebhookServer(logger *zap.Logger, eventBus *engine.EventBus) *HTTPWebhookServer {
|
||
return &HTTPWebhookServer{
|
||
logger: logger.Named("webhook-server"),
|
||
eventBus: eventBus,
|
||
handlers: make(map[string]*WebhookHandler),
|
||
}
|
||
}
|
||
|
||
// RegisterWebhook 注册Webhook处理器
|
||
func (hws *HTTPWebhookServer) RegisterWebhook(path string, handler *WebhookHandler) {
|
||
hws.mu.Lock()
|
||
defer hws.mu.Unlock()
|
||
|
||
hws.handlers[path] = handler
|
||
hws.logger.Info("Webhook registered",
|
||
zap.String("path", path),
|
||
zap.String("bot_id", handler.BotID))
|
||
}
|
||
|
||
// UnregisterWebhook 注销Webhook处理器
|
||
func (hws *HTTPWebhookServer) UnregisterWebhook(path string) {
|
||
hws.mu.Lock()
|
||
defer hws.mu.Unlock()
|
||
|
||
delete(hws.handlers, path)
|
||
hws.logger.Info("Webhook unregistered", zap.String("path", path))
|
||
}
|
||
|
||
// Start 启动Webhook服务器
|
||
func (hws *HTTPWebhookServer) Start(addr string) error {
|
||
hws.server = &fasthttp.Server{
|
||
Handler: hws.handleWebhook,
|
||
}
|
||
|
||
hws.logger.Info("Starting webhook server", zap.String("address", addr))
|
||
|
||
go func() {
|
||
if err := hws.server.ListenAndServe(addr); err != nil {
|
||
hws.logger.Error("Webhook server error", zap.Error(err))
|
||
}
|
||
}()
|
||
|
||
return nil
|
||
}
|
||
|
||
// Stop 停止Webhook服务器
|
||
func (hws *HTTPWebhookServer) Stop() error {
|
||
if hws.server != nil {
|
||
hws.logger.Info("Stopping webhook server")
|
||
return hws.server.Shutdown()
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// NewSimpleWebhookServer 创建简化版 Webhook 服务器
|
||
func NewSimpleWebhookServer(config SimpleWebhookConfig, logger *zap.Logger) *SimpleWebhookServer {
|
||
return &SimpleWebhookServer{
|
||
addr: config.Addr,
|
||
accessToken: config.AccessToken,
|
||
eventChan: config.EventChannel,
|
||
logger: logger.Named("simple-webhook-server"),
|
||
}
|
||
}
|
||
|
||
// Start 启动简化版 Webhook 服务器
|
||
func (s *SimpleWebhookServer) Start() error {
|
||
s.server = &fasthttp.Server{
|
||
Handler: s.handleRequest,
|
||
MaxConnsPerIP: 1000,
|
||
MaxRequestsPerConn: 1000,
|
||
}
|
||
|
||
s.logger.Info("Starting simple webhook server", zap.String("addr", s.addr))
|
||
|
||
go func() {
|
||
if err := s.server.ListenAndServe(s.addr); err != nil {
|
||
s.logger.Error("Simple webhook server error", zap.Error(err))
|
||
}
|
||
}()
|
||
|
||
return nil
|
||
}
|
||
|
||
// handleRequest 处理 Webhook 请求
|
||
func (s *SimpleWebhookServer) handleRequest(ctx *fasthttp.RequestCtx) {
|
||
// 只接受 POST 请求
|
||
if !ctx.IsPost() {
|
||
s.logger.Warn("Received non-POST request",
|
||
zap.String("method", string(ctx.Method())))
|
||
ctx.Error("Method Not Allowed", fasthttp.StatusMethodNotAllowed)
|
||
return
|
||
}
|
||
|
||
// 检查 Content-Type
|
||
contentType := string(ctx.Request.Header.ContentType())
|
||
if contentType != "application/json" {
|
||
s.logger.Warn("Invalid content type",
|
||
zap.String("content_type", contentType))
|
||
ctx.Error("Unsupported Media Type", fasthttp.StatusUnsupportedMediaType)
|
||
return
|
||
}
|
||
|
||
// 验证访问令牌(如果配置了)
|
||
if s.accessToken != "" {
|
||
authHeader := string(ctx.Request.Header.Peek("Authorization"))
|
||
if authHeader == "" {
|
||
s.logger.Warn("Missing authorization header")
|
||
ctx.Error("Unauthorized", fasthttp.StatusUnauthorized)
|
||
return
|
||
}
|
||
|
||
// 验证 Bearer token
|
||
expectedAuth := "Bearer " + s.accessToken
|
||
if authHeader != expectedAuth {
|
||
s.logger.Warn("Invalid authorization token")
|
||
ctx.Error("Unauthorized", fasthttp.StatusUnauthorized)
|
||
return
|
||
}
|
||
}
|
||
|
||
// 获取请求体
|
||
body := ctx.PostBody()
|
||
if len(body) == 0 {
|
||
s.logger.Warn("Empty request body")
|
||
ctx.Error("Bad Request", fasthttp.StatusBadRequest)
|
||
return
|
||
}
|
||
|
||
s.logger.Debug("Received webhook event",
|
||
zap.Int("body_length", len(body)))
|
||
|
||
// 发送到事件通道
|
||
select {
|
||
case s.eventChan <- body:
|
||
default:
|
||
s.logger.Warn("Event channel full, dropping event")
|
||
}
|
||
|
||
// 返回成功响应
|
||
ctx.SetContentType("application/json")
|
||
ctx.SetStatusCode(fasthttp.StatusOK)
|
||
ctx.SetBodyString(`{"status":"ok"}`)
|
||
}
|
||
|
||
// Events 获取事件通道
|
||
func (s *SimpleWebhookServer) Events() <-chan []byte {
|
||
return s.eventChan
|
||
}
|
||
|
||
// Stop 停止简化版 Webhook 服务器
|
||
func (s *SimpleWebhookServer) Stop() error {
|
||
if s.server != nil {
|
||
s.logger.Info("Stopping simple webhook server")
|
||
if err := s.server.Shutdown(); err != nil {
|
||
return fmt.Errorf("failed to shutdown: %w", err)
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// handleWebhook 处理Webhook请求
|
||
func (hws *HTTPWebhookServer) handleWebhook(ctx *fasthttp.RequestCtx) {
|
||
path := string(ctx.Path())
|
||
|
||
hws.mu.RLock()
|
||
handler, exists := hws.handlers[path]
|
||
hws.mu.RUnlock()
|
||
|
||
if !exists {
|
||
ctx.Error("Webhook not found", fasthttp.StatusNotFound)
|
||
return
|
||
}
|
||
|
||
// 验证签名(如果配置了)
|
||
if handler.Secret != "" && handler.Validator != nil {
|
||
signature := string(ctx.Request.Header.Peek("X-Signature"))
|
||
if !handler.Validator(ctx.PostBody(), signature) {
|
||
hws.logger.Warn("Invalid webhook signature",
|
||
zap.String("path", path),
|
||
zap.String("bot_id", handler.BotID))
|
||
ctx.Error("Invalid signature", fasthttp.StatusUnauthorized)
|
||
return
|
||
}
|
||
}
|
||
|
||
// 解析事件
|
||
var event protocol.BaseEvent
|
||
if err := sonic.Unmarshal(ctx.PostBody(), &event); err != nil {
|
||
hws.logger.Error("Failed to parse webhook event",
|
||
zap.Error(err),
|
||
zap.String("path", path))
|
||
ctx.Error("Invalid event format", fasthttp.StatusBadRequest)
|
||
return
|
||
}
|
||
|
||
// 设置BotID
|
||
if event.SelfID == "" {
|
||
event.SelfID = handler.BotID
|
||
}
|
||
|
||
// 设置时间戳
|
||
if event.Timestamp == 0 {
|
||
event.Timestamp = time.Now().Unix()
|
||
}
|
||
|
||
// 确保Data字段不为nil
|
||
if event.Data == nil {
|
||
event.Data = make(map[string]interface{})
|
||
}
|
||
|
||
hws.logger.Info("Webhook event received",
|
||
zap.String("path", path),
|
||
zap.String("bot_id", handler.BotID),
|
||
zap.String("type", string(event.Type)),
|
||
zap.String("detail_type", event.DetailType))
|
||
|
||
// 发布到事件总线
|
||
hws.eventBus.Publish(&event)
|
||
|
||
// 返回成功响应
|
||
ctx.SetContentType("application/json")
|
||
ctx.SetBodyString(`{"success":true}`)
|
||
}
|