package engine

import (
	"context"
	"crypto/rand"
	"encoding/hex"
	"sync"
	"sync/atomic"
	"time"

	"cellbot/internal/protocol"
	"go.uber.org/zap"
)

const (
	// eventBusDefaultBufferSize is the bus queue capacity used when the
	// caller passes a non-positive size to NewEventBus.
	eventBusDefaultBufferSize = 1000

	// subscriptionDefaultBufferSize is the per-subscriber channel capacity
	// used by Subscribe and when SubscribeWithBuffer gets a non-positive size.
	subscriptionDefaultBufferSize = 100
)

// fallbackIDCounter feeds randomString when crypto/rand is unavailable so
// that consecutive fallback IDs still differ from each other.
var fallbackIDCounter uint64

// enqueueOutcome describes what happened to an event handed to the bus queue.
type enqueueOutcome int

const (
	enqueueQueued     enqueueOutcome = iota // event was placed on the queue
	enqueueBusStopped                       // bus context is cancelled, event dropped
	enqueueBufferFull                       // queue is full, event dropped
)

// Subscription holds the state of a single subscriber.
type Subscription struct {
	ID         string
	Chan       chan protocol.Event
	Filter     func(protocol.Event) bool
	CreatedAt  time.Time
	EventCount int64 // Number of events delivered; accessed with sync/atomic.

	// closed is set once Chan has been closed. It is guarded by the owning
	// EventBus.mu so the dispatcher never sends on a closed channel.
	closed bool
}

// EventBusMetrics is a snapshot of the event bus counters.
type EventBusMetrics struct {
	PublishedTotal  int64 // Total number of events accepted into the queue.
	DispatchedTotal int64 // Total number of deliveries to subscribers.
	DroppedTotal    int64 // Total number of dropped events (queue-level and subscriber-level).
	SubscriberTotal int64 // Current number of subscribers.
	LastEventTime   int64 // Time of the last accepted event (Unix seconds).
}

// EventBus is a channel-based publish/subscribe event bus.
type EventBus struct {
	// metrics is kept as the first field so its int64 counters are 64-bit
	// aligned for sync/atomic on 32-bit platforms.
	metrics EventBusMetrics

	subscriptions map[string][]*Subscription
	mu            sync.RWMutex
	logger        *zap.Logger
	eventChan     chan protocol.Event
	wg            sync.WaitGroup
	ctx           context.Context
	cancel        context.CancelFunc
	bufferSize    int
}

// NewEventBus creates an event bus whose queue holds up to bufferSize events.
// A non-positive bufferSize selects the default of 1000. A nil logger is
// replaced by a no-op logger instead of panicking.
func NewEventBus(logger *zap.Logger, bufferSize int) *EventBus {
	if logger == nil {
		logger = zap.NewNop()
	}
	if bufferSize <= 0 {
		bufferSize = eventBusDefaultBufferSize
	}
	ctx, cancel := context.WithCancel(context.Background())
	return &EventBus{
		subscriptions: make(map[string][]*Subscription),
		logger:        logger.Named("eventbus"),
		eventChan:     make(chan protocol.Event, bufferSize),
		ctx:           ctx,
		cancel:        cancel,
		bufferSize:    bufferSize,
	}
}

// Start launches the dispatch goroutine.
func (eb *EventBus) Start() {
	eb.wg.Add(1)
	go eb.dispatch()
	eb.logger.Info("Event bus started")
}

// Stop cancels the bus context and waits for the dispatch goroutine to exit.
//
// The event queue is deliberately NOT closed: publishers may still be running
// concurrently, and sending on a closed channel panics. Publishers observe the
// cancelled context instead and drop their events. Stop is safe to call more
// than once.
func (eb *EventBus) Stop() {
	eb.cancel()
	eb.wg.Wait()
	eb.logger.Info("Event bus stopped")
}

// enqueue tries to place event on the queue without blocking and updates the
// publish/drop counters accordingly.
func (eb *EventBus) enqueue(event protocol.Event) enqueueOutcome {
	// Check for shutdown before the send: once the context is cancelled no
	// dispatcher drains the queue, and a select with both cases ready would
	// pick between them at random.
	if eb.ctx.Err() != nil {
		atomic.AddInt64(&eb.metrics.DroppedTotal, 1)
		return enqueueBusStopped
	}

	select {
	case eb.eventChan <- event:
		atomic.AddInt64(&eb.metrics.PublishedTotal, 1)
		atomic.StoreInt64(&eb.metrics.LastEventTime, time.Now().Unix())
		return enqueueQueued
	default:
		// Queue is full: drop rather than block the publisher.
		atomic.AddInt64(&eb.metrics.DroppedTotal, 1)
		return enqueueBufferFull
	}
}

// Publish queues event for dispatch without blocking. The event is dropped
// (and counted in DroppedTotal) when the bus is stopped or the queue is full.
func (eb *EventBus) Publish(event protocol.Event) {
	eventType := string(event.GetType())
	eb.logger.Info("Publishing event to channel",
		zap.String("event_type", eventType),
		zap.String("detail_type", event.GetDetailType()),
		zap.Int("channel_len", len(eb.eventChan)),
		zap.Int("channel_cap", cap(eb.eventChan)))

	switch eb.enqueue(event) {
	case enqueueQueued:
		eb.logger.Info("Event successfully queued",
			zap.String("event_type", eventType))
	case enqueueBusStopped:
		eb.logger.Warn("Event bus is shutting down, event dropped",
			zap.String("type", eventType))
	case enqueueBufferFull:
		eb.logger.Warn("Event channel full, event dropped",
			zap.String("type", eventType),
			zap.Int("buffer_size", eb.bufferSize),
			zap.Int("channel_len", len(eb.eventChan)))
	}
}

// PublishBatch publishes each event in order via Publish.
func (eb *EventBus) PublishBatch(events []protocol.Event) {
	for _, event := range events {
		eb.Publish(event)
	}
}

// PublishAsync publishes event from a new goroutine.
func (eb *EventBus) PublishAsync(event protocol.Event) {
	go eb.Publish(event)
}

// TryPublish queues event without blocking and without logging. It reports
// whether the event was accepted; rejected events are counted in DroppedTotal.
func (eb *EventBus) TryPublish(event protocol.Event) bool {
	return eb.enqueue(event) == enqueueQueued
}

// Subscribe registers a subscriber for eventType with the default channel
// buffer of 100. A nil filter accepts every event of that type.
func (eb *EventBus) Subscribe(eventType protocol.EventType, filter func(protocol.Event) bool) chan protocol.Event {
	return eb.SubscribeWithBuffer(eventType, filter, subscriptionDefaultBufferSize)
}

// SubscribeWithBuffer registers a subscriber for eventType whose channel holds
// up to bufferSize events (non-positive selects the default of 100). The
// returned channel is closed by Unsubscribe or Clear.
func (eb *EventBus) SubscribeWithBuffer(eventType protocol.EventType, filter func(protocol.Event) bool, bufferSize int) chan protocol.Event {
	if bufferSize <= 0 {
		bufferSize = subscriptionDefaultBufferSize
	}

	eb.mu.Lock()
	defer eb.mu.Unlock()

	sub := &Subscription{
		ID:        generateSubscriptionID(),
		Chan:      make(chan protocol.Event, bufferSize),
		Filter:    filter,
		CreatedAt: time.Now(),
	}

	key := string(eventType)
	eb.subscriptions[key] = append(eb.subscriptions[key], sub)
	atomic.AddInt64(&eb.metrics.SubscriberTotal, 1)

	eb.logger.Debug("New subscription added",
		zap.String("event_type", key),
		zap.String("sub_id", sub.ID),
		zap.Int("buffer_size", bufferSize))

	return sub.Chan
}

// closeSubscriptionLocked marks sub as closed and closes its channel.
// The caller must hold eb.mu for writing.
func (eb *EventBus) closeSubscriptionLocked(sub *Subscription) {
	if sub.closed {
		return
	}
	sub.closed = true
	close(sub.Chan)
}

// Unsubscribe removes the subscriber that owns ch from eventType and closes
// ch. It does nothing when no such subscriber is registered.
func (eb *EventBus) Unsubscribe(eventType protocol.EventType, ch chan protocol.Event) {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	key := string(eventType)
	subs := eb.subscriptions[key]
	for i, sub := range subs {
		if sub.Chan != ch {
			continue
		}

		eb.closeSubscriptionLocked(sub)

		// Remove element i and clear the vacated tail slot so the backing
		// array does not keep the subscription reachable.
		last := len(subs) - 1
		copy(subs[i:], subs[i+1:])
		subs[last] = nil
		subs = subs[:last]
		if len(subs) == 0 {
			delete(eb.subscriptions, key)
		} else {
			eb.subscriptions[key] = subs
		}

		atomic.AddInt64(&eb.metrics.SubscriberTotal, -1)

		eb.logger.Debug("Subscription removed",
			zap.String("event_type", key),
			zap.String("sub_id", sub.ID),
			// EventCount is written atomically by the dispatcher.
			zap.Int64("event_count", atomic.LoadInt64(&sub.EventCount)),
			zap.Duration("lifetime", time.Since(sub.CreatedAt)))
		return
	}
}

// dispatch is the dispatcher loop: it forwards queued events to subscribers
// until the bus context is cancelled.
func (eb *EventBus) dispatch() {
	defer eb.wg.Done()

	eb.logger.Info("Event bus dispatch loop started")

	for {
		select {
		case event := <-eb.eventChan:
			eb.logger.Debug("Received event from channel",
				zap.String("event_type", string(event.GetType())))
			eb.dispatchEvent(event)
		case <-eb.ctx.Done():
			eb.logger.Info("Context cancelled, stopping dispatch")
			return
		}
	}
}

// deliver attempts a non-blocking send of event to sub. delivered reports a
// successful send; closed reports that the subscription was already closed.
// Both false means the subscriber's channel was full.
func (eb *EventBus) deliver(sub *Subscription, event protocol.Event) (delivered, closed bool) {
	// The read lock excludes Unsubscribe/Clear, which close channels under
	// the write lock, so the send below can never hit a closed channel.
	eb.mu.RLock()
	defer eb.mu.RUnlock()

	if sub.closed {
		return false, true
	}
	select {
	case sub.Chan <- event:
		return true, false
	default:
		return false, false
	}
}

// dispatchEvent delivers one event to every matching subscriber of its type.
// Deliveries never block: a subscriber whose channel is full misses the event.
func (eb *EventBus) dispatchEvent(event protocol.Event) {
	key := string(event.GetType())

	// Snapshot the subscriber list so filters run without holding the lock
	// (a filter is caller-supplied code and may call back into the bus).
	eb.mu.RLock()
	registered := eb.subscriptions[key]
	subs := make([]*Subscription, len(registered))
	copy(subs, registered)
	eb.mu.RUnlock()

	eb.logger.Info("Dispatching event",
		zap.String("event_type", key),
		zap.String("detail_type", event.GetDetailType()),
		zap.Int("subscriber_count", len(subs)))

	if len(subs) == 0 {
		eb.logger.Warn("No subscribers for event",
			zap.String("event_type", key),
			zap.String("detail_type", event.GetDetailType()))
		return
	}

	dispatched := 0
	for _, sub := range subs {
		if sub.Filter != nil && !sub.Filter(event) {
			continue
		}

		delivered, closed := eb.deliver(sub, event)
		switch {
		case delivered:
			atomic.AddInt64(&sub.EventCount, 1)
			dispatched++
			eb.logger.Debug("Event dispatched to subscriber",
				zap.String("sub_id", sub.ID))
		case closed:
			// Unsubscribed after the snapshot was taken; nothing to do.
		default:
			// Subscriber channel is full: drop the event for this subscriber.
			atomic.AddInt64(&eb.metrics.DroppedTotal, 1)
			eb.logger.Warn("Subscription channel full, event dropped",
				zap.String("sub_id", sub.ID),
				zap.String("event_type", key))
		}
	}

	if dispatched > 0 {
		atomic.AddInt64(&eb.metrics.DispatchedTotal, int64(dispatched))
		eb.logger.Info("Event dispatched successfully",
			zap.String("event_type", key),
			zap.Int("dispatched_count", dispatched))
		return
	}

	// Subscribers exist but none took the event (filtered out, full or gone).
	eb.logger.Debug("Event not delivered to any subscriber",
		zap.String("event_type", key),
		zap.String("detail_type", event.GetDetailType()))
}

// GetSubscriptionCount returns the number of subscribers for eventType.
func (eb *EventBus) GetSubscriptionCount(eventType protocol.EventType) int {
	eb.mu.RLock()
	defer eb.mu.RUnlock()
	return len(eb.subscriptions[string(eventType)])
}

// Clear removes every subscription, closes their channels and lowers
// SubscriberTotal by the number of subscriptions removed.
func (eb *EventBus) Clear() {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	removed := 0
	for eventType, subs := range eb.subscriptions {
		for _, sub := range subs {
			eb.closeSubscriptionLocked(sub)
			removed++
		}
		delete(eb.subscriptions, eventType)
	}
	if removed > 0 {
		atomic.AddInt64(&eb.metrics.SubscriberTotal, -int64(removed))
	}

	eb.logger.Info("All subscriptions cleared")
}

// GetMetrics returns a snapshot of the bus counters. Each counter is loaded
// atomically, but the snapshot as a whole is not taken at a single instant.
func (eb *EventBus) GetMetrics() EventBusMetrics {
	return EventBusMetrics{
		PublishedTotal:  atomic.LoadInt64(&eb.metrics.PublishedTotal),
		DispatchedTotal: atomic.LoadInt64(&eb.metrics.DispatchedTotal),
		DroppedTotal:    atomic.LoadInt64(&eb.metrics.DroppedTotal),
		SubscriberTotal: atomic.LoadInt64(&eb.metrics.SubscriberTotal),
		LastEventTime:   atomic.LoadInt64(&eb.metrics.LastEventTime),
	}
}

// GetAllSubscriptions returns the subscriber count per event type.
func (eb *EventBus) GetAllSubscriptions() map[string]int {
	eb.mu.RLock()
	defer eb.mu.RUnlock()

	result := make(map[string]int, len(eb.subscriptions))
	for eventType, subs := range eb.subscriptions {
		result[eventType] = len(subs)
	}
	return result
}

// GetBufferUsage returns the fill ratio of the event queue in [0, 1].
// It returns 0 when the queue has no capacity.
func (eb *EventBus) GetBufferUsage() float64 {
	capacity := cap(eb.eventChan)
	if capacity == 0 {
		return 0
	}
	return float64(len(eb.eventChan)) / float64(capacity)
}

// IsHealthy reports whether the bus is operating normally: the queue is at
// most 90% full and dropped events do not exceed 10% of published events.
func (eb *EventBus) IsHealthy() bool {
	if eb.GetBufferUsage() > 0.9 {
		return false
	}

	metrics := eb.GetMetrics()
	if metrics.PublishedTotal == 0 {
		// Nothing accepted yet: any drop means every attempt so far failed.
		return metrics.DroppedTotal == 0
	}

	dropRate := float64(metrics.DroppedTotal) / float64(metrics.PublishedTotal)
	return dropRate <= 0.1
}

// LogMetrics writes the current counters, buffer usage and per-type
// subscriber counts to the log at Info level.
func (eb *EventBus) LogMetrics() {
	metrics := eb.GetMetrics()
	subs := eb.GetAllSubscriptions()

	eb.logger.Info("EventBus metrics",
		zap.Int64("published_total", metrics.PublishedTotal),
		zap.Int64("dispatched_total", metrics.DispatchedTotal),
		zap.Int64("dropped_total", metrics.DroppedTotal),
		zap.Int64("subscriber_total", metrics.SubscriberTotal),
		zap.Float64("buffer_usage", eb.GetBufferUsage()),
		zap.Any("subscriptions", subs))
}

// generateSubscriptionID returns a new subscription ID of the form "sub-"
// followed by 8 hexadecimal characters.
func generateSubscriptionID() string {
	return "sub-" + randomString(8)
}

// randomString returns a lowercase hexadecimal string of exactly length
// characters, or "" when length is not positive.
func randomString(length int) string {
	if length <= 0 {
		return ""
	}

	// Each byte yields two hex characters; round up so odd lengths are
	// covered, then trim to the requested size.
	b := make([]byte, (length+1)/2)
	if _, err := rand.Read(b); err != nil {
		// Best-effort fallback: not unpredictable, but mixing the clock with
		// a process-wide counter keeps successive IDs distinct.
		seed := uint64(time.Now().UnixNano()) + atomic.AddUint64(&fallbackIDCounter, 1)*0x9E3779B97F4A7C15
		for i := range b {
			b[i] = byte(seed >> (8 * uint(i%8)))
		}
	}
	return hex.EncodeToString(b)[:length]
}