Files
cellbot/internal/engine/eventbus.go

183 lines
4.1 KiB
Go
Raw Normal View History

package engine
import (
	"context"
	"math/rand"
	"sync"

	"cellbot/internal/protocol"
	"go.uber.org/zap"
)
// Subscription holds the state of one subscriber registered on the event bus.
type Subscription struct {
// ID identifies the subscription; it is attached to the bus's log entries.
ID string
// Chan is the channel on which events are delivered to the subscriber.
Chan chan protocol.Event
// Filter, when non-nil, is called for every candidate event; the event is
// delivered only if it returns true. A nil Filter accepts every event.
Filter func(protocol.Event) bool
}
// EventBus is the event bus.
// It is a high-performance publish/subscribe implementation built on channels.
type EventBus struct {
// subscriptions maps an event type (converted to string) to its subscribers.
subscriptions map[string][]*Subscription
// mu guards subscriptions.
mu sync.RWMutex
// logger is the bus's logger, named "eventbus" by NewEventBus.
logger *zap.Logger
// eventChan queues published events until the dispatch goroutine reads them.
eventChan chan protocol.Event
// wg tracks the dispatch goroutine so that Stop can wait for it to exit.
wg sync.WaitGroup
// ctx is cancelled through cancel to signal shutdown to Publish and dispatch.
ctx context.Context
cancel context.CancelFunc
}
// NewEventBus creates an event bus. The bus does not deliver anything until
// Start is called.
//
// logger receives the bus's diagnostics under the "eventbus" name. A nil
// logger is replaced with a no-op logger so construction and later logging do
// not dereference a nil pointer.
//
// bufferSize is the capacity of the internal publish queue. Negative values
// are treated as 0 (an unbuffered queue), because creating a channel with a
// negative size panics.
func NewEventBus(logger *zap.Logger, bufferSize int) *EventBus {
	if logger == nil {
		logger = zap.NewNop()
	}
	if bufferSize < 0 {
		bufferSize = 0
	}

	ctx, cancel := context.WithCancel(context.Background())
	return &EventBus{
		subscriptions: make(map[string][]*Subscription),
		logger:        logger.Named("eventbus"),
		eventChan:     make(chan protocol.Event, bufferSize),
		ctx:           ctx,
		cancel:        cancel,
	}
}
// Start launches the background goroutine that dispatches published events
// to subscribers, then logs that the bus is running.
func (eb *EventBus) Start() {
	// The goroutine is registered with the WaitGroup before it is spawned;
	// dispatch marks it done when it returns.
	eb.wg.Add(1)
	go func() {
		eb.dispatch()
	}()

	eb.logger.Info("Event bus started")
}
// Stop shuts the event bus down: it cancels the bus context, waits for the
// dispatch goroutine to exit, and logs the shutdown. Events still queued at
// that point are not delivered. Calling Stop more than once is safe.
func (eb *EventBus) Stop() {
	eb.cancel()
	eb.wg.Wait()

	// eventChan is deliberately NOT closed. Publish is called from arbitrary
	// goroutines, and a send on a closed channel panics; a publisher that is
	// inside (or later enters) its select could pick the send case. Closing
	// here would also panic on a second Stop call. The cancelled context
	// already tells both Publish and dispatch to stop, and the channel is
	// reclaimed by the garbage collector once unreferenced.
	eb.logger.Info("Event bus stopped")
}
// Publish queues event for delivery to subscribers.
//
// While the bus is running, Publish blocks when the internal queue is full
// until space becomes available or the bus is stopped. Once the bus has been
// stopped the event is dropped and a warning is logged.
func (eb *EventBus) Publish(event protocol.Event) {
	// select chooses at random among ready cases, so without this check a
	// Publish issued after shutdown could still take the send case and queue
	// an event that nobody will ever dispatch. Checking first makes the
	// post-shutdown outcome deterministic: the event is always dropped.
	if eb.ctx.Err() == nil {
		select {
		case eb.eventChan <- event:
			return
		case <-eb.ctx.Done():
			// Shutdown began while waiting for queue space; fall through.
		}
	}

	eb.logger.Warn("Event bus is shutting down, event dropped",
		zap.String("type", string(event.GetType())))
}
// Subscribe registers a new subscriber for eventType and returns the channel
// on which its events will arrive. The channel is buffered with room for 100
// events. filter may be nil; when set, only events for which it returns true
// are delivered.
func (eb *EventBus) Subscribe(eventType protocol.EventType, filter func(protocol.Event) bool) chan protocol.Event {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	topic := string(eventType)
	entry := new(Subscription)
	entry.ID = generateSubscriptionID()
	entry.Chan = make(chan protocol.Event, 100)
	entry.Filter = filter

	registered := eb.subscriptions[topic]
	eb.subscriptions[topic] = append(registered, entry)

	eb.logger.Debug("New subscription added",
		zap.String("event_type", topic),
		zap.String("sub_id", entry.ID))

	return entry.Chan
}
// Unsubscribe removes the subscription for eventType whose channel is ch and
// closes that channel. If no subscription for eventType uses ch, it does
// nothing.
func (eb *EventBus) Unsubscribe(eventType protocol.EventType, ch chan protocol.Event) {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	key := string(eventType)
	subs := eb.subscriptions[key]
	for i, sub := range subs {
		if sub.Chan != ch {
			continue
		}

		close(sub.Chan)

		// Shift the tail left and nil the vacated last slot so the backing
		// array does not keep the removed subscription (and its buffered
		// channel) reachable.
		last := len(subs) - 1
		copy(subs[i:], subs[i+1:])
		subs[last] = nil

		if last == 0 {
			// No subscribers left for this type: drop the map entry rather
			// than keeping an empty slice around forever.
			delete(eb.subscriptions, key)
		} else {
			eb.subscriptions[key] = subs[:last]
		}

		eb.logger.Debug("Subscription removed",
			zap.String("event_type", key),
			zap.String("sub_id", sub.ID))
		return
	}
}
// dispatch is the body of the background goroutine: it reads events from the
// publish queue and hands each one to dispatchEvent. It returns when the bus
// context is cancelled or the queue channel is closed, and marks the
// goroutine as done on the WaitGroup on the way out.
func (eb *EventBus) dispatch() {
	defer eb.wg.Done()

	shutdown := eb.ctx.Done()
	for {
		select {
		case <-shutdown:
			return
		case ev, open := <-eb.eventChan:
			if open {
				eb.dispatchEvent(ev)
				continue
			}
			return
		}
	}
}
// dispatchEvent delivers a single event to every subscriber of its type whose
// Filter is nil or returns true for the event.
//
// Delivery never blocks: if a subscriber's channel is full, the event is
// dropped for that subscriber and a warning is logged.
//
// The read lock is held for the whole delivery loop. Unsubscribe and Clear
// close subscriber channels while holding the write lock, so sending under
// the read lock guarantees a channel cannot be closed between the lookup and
// the send (which would panic with "send on closed channel"). Because the
// sends are non-blocking the lock is only held briefly. As a consequence,
// Filter functions must not call back into methods of this bus that take the
// lock (Subscribe, Unsubscribe, Clear), or they will deadlock.
func (eb *EventBus) dispatchEvent(event protocol.Event) {
	key := string(event.GetType())

	eb.mu.RLock()
	defer eb.mu.RUnlock()

	for _, sub := range eb.subscriptions[key] {
		if sub.Filter != nil && !sub.Filter(event) {
			continue
		}

		select {
		case sub.Chan <- event:
		default:
			// The subscriber's channel is full; drop the event.
			eb.logger.Warn("Subscription channel full, event dropped",
				zap.String("sub_id", sub.ID),
				zap.String("event_type", key))
		}
	}
}
// GetSubscriptionCount reports how many subscribers are currently registered
// for eventType. It returns 0 for a type that has none.
func (eb *EventBus) GetSubscriptionCount(eventType protocol.EventType) int {
	topic := string(eventType)

	eb.mu.RLock()
	count := len(eb.subscriptions[topic])
	eb.mu.RUnlock()

	return count
}
// Clear removes every subscription of every event type, closing each
// subscriber's channel, and logs the result.
func (eb *EventBus) Clear() {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	for topic := range eb.subscriptions {
		entries := eb.subscriptions[topic]
		for idx := range entries {
			close(entries[idx].Chan)
		}
		// Deleting the key currently being visited is safe during a range.
		delete(eb.subscriptions, topic)
	}

	eb.logger.Info("All subscriptions cleared")
}
// generateSubscriptionID builds a subscription ID: the prefix "sub-" followed
// by an 8-character string obtained from randomString.
func generateSubscriptionID() string {
	const (
		prefix    = "sub-"
		suffixLen = 8
	)
	suffix := randomString(suffixLen)
	return prefix + suffix
}
// randomString returns a string of the given length whose characters are
// picked at random from the lowercase ASCII letters and the digits 0-9.
// A non-positive length yields the empty string.
//
// The characters come from math/rand, which is adequate for telling
// subscriptions apart in logs but is not suitable for secrets.
func randomString(length int) string {
	const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
	if length <= 0 {
		return ""
	}

	b := make([]byte, length)
	for i := range b {
		// Pick a random charset position. Indexing by the loop counter (as
		// the code previously did) produced the same string on every call.
		b[i] = charset[rand.Intn(len(charset))]
	}
	return string(b)
}