// Source file: cellbot/internal/engine/dispatcher.go

package engine
import (
"context"
"runtime/debug"
"sort"
"sync"
"sync/atomic"
"time"
"cellbot/internal/protocol"
"go.uber.org/zap"
)
// DispatcherMetrics holds the dispatcher's runtime counters.
// GetMetrics returns a copy of this struct to callers.
type DispatcherMetrics struct {
ProcessedTotal int64 // total number of events processed
SuccessTotal int64 // number of events processed successfully
FailedTotal int64 // number of events whose processing failed
PanicTotal int64 // number of panics recovered while processing events
AvgProcessTime float64 // average processing time per event, in milliseconds
LastProcessTime int64 // Unix timestamp (seconds) of the most recent processing
}
// Dispatcher 事件分发器
// 管理事件处理器并按照优先级分发事件
type Dispatcher struct {
handlers []protocol.EventHandler
middlewares []protocol.Middleware
logger *zap.Logger
eventBus *EventBus
scheduler *Scheduler
metrics DispatcherMetrics
mu sync.RWMutex
totalTime int64 // 总处理时间(纳秒)
}
// NewDispatcher builds a Dispatcher bound to eventBus with empty handler and
// middleware lists. The supplied logger is scoped under the name "dispatcher".
// No scheduler is attached.
func NewDispatcher(eventBus *EventBus, logger *zap.Logger) *Dispatcher {
	d := new(Dispatcher)
	d.handlers = make([]protocol.EventHandler, 0)
	d.middlewares = make([]protocol.Middleware, 0)
	d.logger = logger.Named("dispatcher")
	d.eventBus = eventBus
	return d
}
// NewDispatcherWithScheduler builds a Dispatcher exactly like NewDispatcher
// and additionally attaches scheduler, which Start and Stop will then start
// and stop alongside the dispatcher.
func NewDispatcherWithScheduler(eventBus *EventBus, logger *zap.Logger, scheduler *Scheduler) *Dispatcher {
	d := NewDispatcher(eventBus, logger)
	d.scheduler = scheduler
	return d
}
// RegisterHandler adds handler to the dispatcher and keeps the handler list
// ordered by ascending Priority() (a smaller value means a higher priority).
//
// The sort is stable, so handlers that share a priority keep the order in
// which they were registered instead of being shuffled on every registration.
func (d *Dispatcher) RegisterHandler(handler protocol.EventHandler) {
	d.mu.Lock()
	defer d.mu.Unlock()

	d.handlers = append(d.handlers, handler)

	// sort.Slice gives no ordering guarantee for equal elements; SliceStable
	// preserves registration order among handlers with the same priority.
	sort.SliceStable(d.handlers, func(i, j int) bool {
		return d.handlers[i].Priority() < d.handlers[j].Priority()
	})

	d.logger.Debug("Handler registered",
		zap.Int("priority", handler.Priority()),
		zap.Int("total_handlers", len(d.handlers)))
}
// UnregisterHandler removes the first registered handler that compares equal
// to handler. The relative order of the remaining handlers is unchanged.
// If no such handler is registered the list is left untouched.
func (d *Dispatcher) UnregisterHandler(handler protocol.EventHandler) {
	d.mu.Lock()
	defer d.mu.Unlock()

	for i, h := range d.handlers {
		if h != handler {
			continue
		}
		last := len(d.handlers) - 1
		// Shift the tail left by one, then clear the vacated slot so the
		// backing array does not keep a stale reference to a handler alive.
		copy(d.handlers[i:], d.handlers[i+1:])
		d.handlers[last] = nil
		d.handlers = d.handlers[:last]
		break
	}

	d.logger.Debug("Handler unregistered",
		zap.Int("total_handlers", len(d.handlers)))
}
// RegisterMiddleware appends middleware to the end of the middleware list.
// Middlewares are kept in registration order.
func (d *Dispatcher) RegisterMiddleware(middleware protocol.Middleware) {
	d.mu.Lock()
	defer d.mu.Unlock()

	d.middlewares = append(d.middlewares, middleware)
	total := len(d.middlewares)

	d.logger.Debug("Middleware registered", zap.Int("total_middlewares", total))
}
// Start subscribes to the message, notice, request and meta event types on
// the event bus and launches one eventLoop goroutine per subscription.
// If a scheduler is attached it is started as well; a scheduler start
// failure is logged and does not prevent the dispatcher from starting.
func (d *Dispatcher) Start(ctx context.Context) {
	// One subscription (and one event loop) per event type.
	subscribed := [...]protocol.EventType{
		protocol.EventTypeMessage,
		protocol.EventTypeNotice,
		protocol.EventTypeRequest,
		protocol.EventTypeMeta,
	}
	for i := range subscribed {
		ch := d.eventBus.Subscribe(subscribed[i], nil)
		go d.eventLoop(ctx, ch)
	}

	// Bring up the optional scheduler.
	if d.scheduler != nil {
		err := d.scheduler.Start()
		if err == nil {
			d.logger.Info("Scheduler started")
		} else {
			d.logger.Error("Failed to start scheduler", zap.Error(err))
		}
	}

	d.logger.Info("Dispatcher started")
}
// Stop stops the attached scheduler, if any, logging the outcome.
// It does not itself terminate the event loops launched by Start.
func (d *Dispatcher) Stop() {
	if d.scheduler != nil {
		err := d.scheduler.Stop()
		if err == nil {
			d.logger.Info("Scheduler stopped")
		} else {
			d.logger.Error("Failed to stop scheduler", zap.Error(err))
		}
	}

	d.logger.Info("Dispatcher stopped")
}
// GetScheduler returns the scheduler attached to the dispatcher, or nil when
// none has been set.
func (d *Dispatcher) GetScheduler() *Scheduler {
	sched := d.scheduler
	return sched
}
// eventLoop receives events from eventChan and hands each one to handleEvent
// in its own goroutine. The loop returns only when eventChan is closed.
//
// Cancelling ctx does not stop the loop: cancellation is logged once and the
// loop then keeps draining eventChan until it is closed, so events already
// queued at shutdown are still processed.
func (d *Dispatcher) eventLoop(ctx context.Context, eventChan chan protocol.Event) {
	// done is set to nil once cancellation has been observed. A closed Done
	// channel is always ready to receive, so leaving it in the select would
	// make the loop spin at full CPU whenever eventChan is empty. Receiving
	// from a nil channel blocks forever, which leaves eventChan as the only
	// live case after shutdown.
	done := ctx.Done()
	shutdown := false

	for {
		select {
		case event, ok := <-eventChan:
			if !ok {
				d.logger.Info("Event channel closed, stopping event loop")
				return
			}

			d.logger.Debug("Event received in eventLoop",
				zap.String("type", string(event.GetType())),
				zap.String("detail_type", event.GetDetailType()),
				zap.String("self_id", event.GetSelfID()),
				zap.Bool("shutdown", shutdown))

			// Each event gets a context detached from ctx so that application
			// shutdown does not cancel in-flight handlers; the timeout bounds
			// how long a single event may take.
			handlerCtx, handlerCancel := context.WithTimeout(context.Background(), 5*time.Minute)

			// Process the event concurrently so a slow handler does not
			// block the receipt of subsequent events.
			go func(e protocol.Event) {
				defer handlerCancel()
				d.handleEvent(handlerCtx, e)
			}(event)

		case <-done:
			// Application shutdown: note it, then wait only for the channel
			// to be closed.
			d.logger.Info("Context cancelled, will continue processing events until channel closes")
			shutdown = true
			done = nil
		}
	}
}
// handleEvent processes a single event: it runs the event through the
// registered middlewares (if any) and then through the handler chain, and
// updates the dispatcher metrics when processing finishes.
//
// A panic that reaches this function is recovered, logged with a stack trace
// and counted in PanicTotal and FailedTotal; otherwise SuccessTotal is
// incremented. ProcessedTotal, the cumulative time, LastProcessTime and
// AvgProcessTime are updated in both cases.
//
// handleEvent is invoked from one goroutine per event, so all metric updates
// must be safe for concurrent use.
func (d *Dispatcher) handleEvent(ctx context.Context, event protocol.Event) {
	startTime := time.Now()

	defer func() {
		// Recover from a panic raised anywhere during processing.
		if r := recover(); r != nil {
			atomic.AddInt64(&d.metrics.PanicTotal, 1)
			atomic.AddInt64(&d.metrics.FailedTotal, 1)
			d.logger.Error("Panic in event handler",
				zap.Any("panic", r),
				zap.String("stack", string(debug.Stack())),
				zap.String("event_type", string(event.GetType())))
		}

		// Update the counters.
		duration := time.Since(startTime)
		atomic.AddInt64(&d.metrics.ProcessedTotal, 1)
		atomic.AddInt64(&d.totalTime, duration.Nanoseconds())
		atomic.StoreInt64(&d.metrics.LastProcessTime, time.Now().Unix())

		// AvgProcessTime is a plain float64 and cannot be written with the
		// sync/atomic functions, so concurrent writers are serialised with
		// the mutex. The inputs are re-read inside the critical section so
		// that the last writer always stores the most recent average.
		d.mu.Lock()
		if processed := atomic.LoadInt64(&d.metrics.ProcessedTotal); processed > 0 {
			avgNs := atomic.LoadInt64(&d.totalTime) / processed
			d.metrics.AvgProcessTime = float64(avgNs) / 1e6 // nanoseconds -> milliseconds
		}
		d.mu.Unlock()
	}()

	d.logger.Info("Processing event",
		zap.String("type", string(event.GetType())),
		zap.String("detail_type", event.GetDetailType()),
		zap.String("self_id", event.GetSelfID()))

	// Snapshot the middleware list; RegisterMiddleware only ever appends, so
	// the elements visible through this slice header are never modified.
	d.mu.RLock()
	middlewares := d.middlewares
	d.mu.RUnlock()

	next := d.createHandlerChain(ctx, event)

	// Run the handler chain, wrapped in the middleware chain when present.
	if len(middlewares) > 0 {
		d.executeMiddlewares(ctx, event, middlewares, func(ctx context.Context, e protocol.Event) error {
			next(ctx, e)
			return nil
		})
	} else {
		next(ctx, event)
	}

	atomic.AddInt64(&d.metrics.SuccessTotal, 1)
}
// createHandlerChain returns a function that offers an event to every
// registered handler in priority order and calls Handle on each handler whose
// Match reports true.
//
// The returned function matches and handles the event it is called with, so
// an event substituted by a middleware is matched consistently with what the
// handlers actually receive. The ctx and event parameters of
// createHandlerChain itself are kept for signature compatibility only.
//
// A panic or error from a handler's Handle is logged and does not prevent the
// remaining handlers from running.
func (d *Dispatcher) createHandlerChain(ctx context.Context, event protocol.Event) func(context.Context, protocol.Event) {
	return func(ctx context.Context, e protocol.Event) {
		// Work on a private copy so handlers can be registered or removed
		// while the chain is running.
		d.mu.RLock()
		handlers := make([]protocol.EventHandler, len(d.handlers))
		copy(handlers, d.handlers)
		d.mu.RUnlock()

		for i, handler := range handlers {
			// Match against e, the event that will be passed to Handle.
			matched := handler.Match(e)
			d.logger.Debug("Checking handler",
				zap.Int("handler_index", i),
				zap.String("handler_name", handler.Name()),
				zap.Int("priority", handler.Priority()),
				zap.Bool("matched", matched))
			if !matched {
				continue
			}

			d.logger.Info("Handler matched, calling Handle",
				zap.Int("handler_index", i),
				zap.String("handler_name", handler.Name()),
				zap.String("handler_description", handler.Description()))

			// Run the handler in its own function so a panic is contained to
			// this handler and the loop continues with the next one.
			func() {
				defer func() {
					if r := recover(); r != nil {
						d.logger.Error("Panic in handler",
							zap.Any("panic", r),
							zap.String("stack", string(debug.Stack())),
							zap.String("event_type", string(e.GetType())))
					}
				}()
				if err := handler.Handle(ctx, e); err != nil {
					d.logger.Error("Handler execution failed",
						zap.Error(err),
						zap.String("event_type", string(e.GetType())))
				}
			}()
		}
	}
}
// executeMiddlewares runs event through middlewares in slice order, with next
// as the innermost step. Each middleware is invoked through a wrapper that
// logs any panic or error it produces and always reports nil to its caller.
func (d *Dispatcher) executeMiddlewares(ctx context.Context, event protocol.Event, middlewares []protocol.Middleware, next func(context.Context, protocol.Event) error) {
	// wrap turns one middleware plus its downstream step into a single step.
	wrap := func(mw protocol.Middleware, downstream func(context.Context, protocol.Event) error) func(context.Context, protocol.Event) error {
		return func(ctx context.Context, e protocol.Event) error {
			defer func() {
				r := recover()
				if r == nil {
					return
				}
				d.logger.Error("Panic in middleware",
					zap.Any("panic", r),
					zap.String("stack", string(debug.Stack())),
					zap.String("event_type", string(e.GetType())))
			}()

			err := mw.Process(ctx, e, downstream)
			if err != nil {
				d.logger.Error("Middleware execution failed",
					zap.Error(err),
					zap.String("event_type", string(e.GetType())))
			}
			return nil
		}
	}

	// Assemble the chain from the last middleware back to the first, so the
	// first registered middleware ends up outermost.
	chain := next
	for idx := len(middlewares); idx > 0; idx-- {
		chain = wrap(middlewares[idx-1], chain)
	}

	// Kick off the assembled chain.
	_ = chain(ctx, event)
}
// GetHandlerCount reports how many handlers are currently registered.
func (d *Dispatcher) GetHandlerCount() int {
	d.mu.RLock()
	n := len(d.handlers)
	d.mu.RUnlock()
	return n
}
// GetMiddlewareCount reports how many middlewares are currently registered.
func (d *Dispatcher) GetMiddlewareCount() int {
	d.mu.RLock()
	n := len(d.middlewares)
	d.mu.RUnlock()
	return n
}
// GetMetrics returns a snapshot of the dispatcher metrics.
//
// Every value is obtained through atomic loads. AvgProcessTime is derived
// here from the atomically maintained cumulative time and processed count
// rather than read from the stored float64 field, which is written by
// concurrent event goroutines and cannot be loaded atomically. It is 0 until
// at least one event has been processed.
func (d *Dispatcher) GetMetrics() DispatcherMetrics {
	processed := atomic.LoadInt64(&d.metrics.ProcessedTotal)

	var avgMs float64
	if processed > 0 {
		avgNs := atomic.LoadInt64(&d.totalTime) / processed
		avgMs = float64(avgNs) / 1e6 // nanoseconds -> milliseconds
	}

	return DispatcherMetrics{
		ProcessedTotal:  processed,
		SuccessTotal:    atomic.LoadInt64(&d.metrics.SuccessTotal),
		FailedTotal:     atomic.LoadInt64(&d.metrics.FailedTotal),
		PanicTotal:      atomic.LoadInt64(&d.metrics.PanicTotal),
		AvgProcessTime:  avgMs,
		LastProcessTime: atomic.LoadInt64(&d.metrics.LastProcessTime),
	}
}
// LogMetrics writes the current metrics snapshot, together with the handler
// and middleware counts, to the dispatcher's logger at Info level.
func (d *Dispatcher) LogMetrics() {
	snapshot := d.GetMetrics()

	fields := []zap.Field{
		zap.Int64("processed_total", snapshot.ProcessedTotal),
		zap.Int64("success_total", snapshot.SuccessTotal),
		zap.Int64("failed_total", snapshot.FailedTotal),
		zap.Int64("panic_total", snapshot.PanicTotal),
		zap.Float64("avg_process_time_ms", snapshot.AvgProcessTime),
		zap.Int("handler_count", d.GetHandlerCount()),
		zap.Int("middleware_count", d.GetMiddlewareCount()),
	}

	d.logger.Info("Dispatcher metrics", fields...)
}