Files

187 lines
4.1 KiB
Go
Raw Permalink Normal View History

package onebot11
import (
"context"
"fmt"
"sync"
"time"
"github.com/bytedance/sonic"
"github.com/google/uuid"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
)
// HTTPClient OneBot11 HTTP客户端
type HTTPClient struct {
baseURL string
accessToken string
httpClient *fasthttp.Client
logger *zap.Logger
timeout time.Duration
}
// NewHTTPClient 创建HTTP客户端
func NewHTTPClient(baseURL, accessToken string, timeout time.Duration, logger *zap.Logger) *HTTPClient {
if timeout == 0 {
timeout = 30 * time.Second
}
return &HTTPClient{
baseURL: baseURL,
accessToken: accessToken,
httpClient: &fasthttp.Client{
ReadTimeout: timeout,
WriteTimeout: timeout,
MaxConnsPerHost: 100,
},
logger: logger.Named("http-client"),
timeout: timeout,
}
}
// Call 调用API
func (c *HTTPClient) Call(ctx context.Context, action string, params map[string]interface{}) (*OB11Response, error) {
// 构建请求数据
reqData := map[string]interface{}{
"action": action,
"params": params,
}
data, err := sonic.Marshal(reqData)
if err != nil {
return nil, fmt.Errorf("failed to marshal request: %w", err)
}
// 构建URL
url := fmt.Sprintf("%s/%s", c.baseURL, action)
c.logger.Debug("Calling HTTP API",
zap.String("action", action),
zap.String("url", url))
req := fasthttp.AcquireRequest()
resp := fasthttp.AcquireResponse()
defer fasthttp.ReleaseRequest(req)
defer fasthttp.ReleaseResponse(resp)
// 设置请求
req.SetRequestURI(url)
req.Header.SetMethod("POST")
req.Header.SetContentType("application/json")
req.SetBody(data)
// 设置访问令牌
if c.accessToken != "" {
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.accessToken))
}
// 发送请求
if err := c.httpClient.DoTimeout(req, resp, c.timeout); err != nil {
return nil, fmt.Errorf("request failed: %w", err)
}
// 检查HTTP状态码
statusCode := resp.StatusCode()
if statusCode != 200 {
return nil, fmt.Errorf("unexpected status code: %d", statusCode)
}
// 解析响应
var ob11Resp OB11Response
if err := sonic.Unmarshal(resp.Body(), &ob11Resp); err != nil {
return nil, fmt.Errorf("failed to parse response: %w", err)
}
// 检查业务状态
if ob11Resp.Status != "ok" && ob11Resp.Status != "async" {
return &ob11Resp, fmt.Errorf("API error (retcode=%d)", ob11Resp.RetCode)
}
c.logger.Debug("HTTP API call succeeded",
zap.String("action", action),
zap.String("status", ob11Resp.Status))
return &ob11Resp, nil
}
// Close 关闭客户端
func (c *HTTPClient) Close() error {
// fasthttp.Client 不需要显式关闭
return nil
}
// WSResponseWaiter WebSocket响应等待器
type WSResponseWaiter struct {
pending map[string]chan *OB11Response
mu sync.RWMutex
logger *zap.Logger
timeout time.Duration
}
// NewWSResponseWaiter 创建WebSocket响应等待器
func NewWSResponseWaiter(timeout time.Duration, logger *zap.Logger) *WSResponseWaiter {
if timeout == 0 {
timeout = 30 * time.Second
}
return &WSResponseWaiter{
pending: make(map[string]chan *OB11Response),
logger: logger.Named("ws-waiter"),
timeout: timeout,
}
}
// Wait 等待响应
func (w *WSResponseWaiter) Wait(echo string) (*OB11Response, error) {
w.mu.Lock()
ch := make(chan *OB11Response, 1)
w.pending[echo] = ch
w.mu.Unlock()
defer func() {
w.mu.Lock()
delete(w.pending, echo)
close(ch)
w.mu.Unlock()
}()
select {
case resp := <-ch:
return resp, nil
case <-time.After(w.timeout):
return nil, fmt.Errorf("timeout waiting for response (echo=%s)", echo)
}
}
// Notify 通知响应到达
func (w *WSResponseWaiter) Notify(resp *OB11Response) {
if resp.Echo == "" {
return
}
w.mu.RLock()
ch, ok := w.pending[resp.Echo]
w.mu.RUnlock()
if !ok {
w.logger.Warn("Received response for unknown echo",
zap.String("echo", resp.Echo))
return
}
select {
case ch <- resp:
w.logger.Debug("Notified response",
zap.String("echo", resp.Echo))
default:
w.logger.Warn("Failed to notify response: channel full",
zap.String("echo", resp.Echo))
}
}
// GenerateEcho 生成唯一的echo标识
func GenerateEcho() string {
return uuid.New().String()
}