Files
cellbot/internal/engine/eventbus.go
lafay fb5fae1524 chore: update project structure and enhance plugin functionality
- Added new entries to .gitignore for database files.
- Updated go.mod and go.sum to include new indirect dependencies for database and ORM support.
- Refactored event handling to improve message reply functionality in the protocol.
- Enhanced the dispatcher to allow for better event processing and logging.
- Removed outdated plugin documentation and unnecessary files to streamline the codebase.
- Improved welcome message formatting and screenshot options for better user experience.
2026-01-05 05:14:31 +08:00

373 lines
10 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package engine
import (
"context"
"crypto/rand"
"encoding/hex"
"sync"
"sync/atomic"
"time"
"cellbot/internal/protocol"
"go.uber.org/zap"
)
// Subscription 订阅信息
type Subscription struct {
ID string
Chan chan protocol.Event
Filter func(protocol.Event) bool
CreatedAt time.Time
EventCount int64 // 接收的事件数量
}
// EventBusMetrics 事件总线指标
type EventBusMetrics struct {
PublishedTotal int64 // 发布的事件总数
DispatchedTotal int64 // 分发的事件总数
DroppedTotal int64 // 丢弃的事件总数
SubscriberTotal int64 // 订阅者总数
LastEventTime int64 // 最后一次事件时间Unix时间戳
}
// EventBus 事件总线
// 基于channel的高性能发布订阅实现
type EventBus struct {
subscriptions map[string][]*Subscription
mu sync.RWMutex
logger *zap.Logger
eventChan chan protocol.Event
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
metrics EventBusMetrics
bufferSize int
}
// NewEventBus 创建事件总线
func NewEventBus(logger *zap.Logger, bufferSize int) *EventBus {
if bufferSize <= 0 {
bufferSize = 1000
}
ctx, cancel := context.WithCancel(context.Background())
return &EventBus{
subscriptions: make(map[string][]*Subscription),
logger: logger.Named("eventbus"),
eventChan: make(chan protocol.Event, bufferSize),
ctx: ctx,
cancel: cancel,
bufferSize: bufferSize,
}
}
// Start 启动事件总线
func (eb *EventBus) Start() {
eb.wg.Add(1)
go eb.dispatch()
eb.logger.Info("Event bus started")
}
// Stop 停止事件总线
func (eb *EventBus) Stop() {
eb.cancel()
eb.wg.Wait()
close(eb.eventChan)
eb.logger.Info("Event bus stopped")
}
// Publish 发布事件
func (eb *EventBus) Publish(event protocol.Event) {
eb.logger.Debug("Publishing event to channel",
zap.String("event_type", string(event.GetType())),
zap.String("detail_type", event.GetDetailType()),
zap.String("self_id", event.GetSelfID()),
zap.Int("channel_len", len(eb.eventChan)),
zap.Int("channel_cap", cap(eb.eventChan)))
select {
case eb.eventChan <- event:
atomic.AddInt64(&eb.metrics.PublishedTotal, 1)
atomic.StoreInt64(&eb.metrics.LastEventTime, time.Now().Unix())
eb.logger.Info("Event published successfully",
zap.String("event_type", string(event.GetType())),
zap.String("detail_type", event.GetDetailType()),
zap.String("self_id", event.GetSelfID()))
case <-eb.ctx.Done():
atomic.AddInt64(&eb.metrics.DroppedTotal, 1)
eb.logger.Warn("Event bus is shutting down, event dropped",
zap.String("type", string(event.GetType())))
default:
// 如果channel满了也丢弃事件
atomic.AddInt64(&eb.metrics.DroppedTotal, 1)
eb.logger.Warn("Event channel full, event dropped",
zap.String("type", string(event.GetType())),
zap.Int("buffer_size", eb.bufferSize),
zap.Int("channel_len", len(eb.eventChan)))
}
}
// PublishBatch 批量发布事件
func (eb *EventBus) PublishBatch(events []protocol.Event) {
for _, event := range events {
eb.Publish(event)
}
}
// PublishAsync 异步发布事件(不阻塞)
func (eb *EventBus) PublishAsync(event protocol.Event) {
go eb.Publish(event)
}
// TryPublish 尝试发布事件(非阻塞)
func (eb *EventBus) TryPublish(event protocol.Event) bool {
select {
case eb.eventChan <- event:
atomic.AddInt64(&eb.metrics.PublishedTotal, 1)
atomic.StoreInt64(&eb.metrics.LastEventTime, time.Now().Unix())
return true
case <-eb.ctx.Done():
atomic.AddInt64(&eb.metrics.DroppedTotal, 1)
return false
default:
atomic.AddInt64(&eb.metrics.DroppedTotal, 1)
return false
}
}
// Subscribe 订阅事件
func (eb *EventBus) Subscribe(eventType protocol.EventType, filter func(protocol.Event) bool) chan protocol.Event {
return eb.SubscribeWithBuffer(eventType, filter, 100)
}
// SubscribeWithBuffer 订阅事件(指定缓冲区大小)
func (eb *EventBus) SubscribeWithBuffer(eventType protocol.EventType, filter func(protocol.Event) bool, bufferSize int) chan protocol.Event {
eb.mu.Lock()
defer eb.mu.Unlock()
if bufferSize <= 0 {
bufferSize = 100
}
sub := &Subscription{
ID: generateSubscriptionID(),
Chan: make(chan protocol.Event, bufferSize),
Filter: filter,
CreatedAt: time.Now(),
}
key := string(eventType)
eb.subscriptions[key] = append(eb.subscriptions[key], sub)
atomic.AddInt64(&eb.metrics.SubscriberTotal, 1)
eb.logger.Debug("New subscription added",
zap.String("event_type", key),
zap.String("sub_id", sub.ID),
zap.Int("buffer_size", bufferSize))
return sub.Chan
}
// Unsubscribe 取消订阅
func (eb *EventBus) Unsubscribe(eventType protocol.EventType, ch chan protocol.Event) {
eb.mu.Lock()
defer eb.mu.Unlock()
key := string(eventType)
subs := eb.subscriptions[key]
for i, sub := range subs {
if sub.Chan == ch {
close(sub.Chan)
eb.subscriptions[key] = append(subs[:i], subs[i+1:]...)
atomic.AddInt64(&eb.metrics.SubscriberTotal, -1)
eb.logger.Debug("Subscription removed",
zap.String("event_type", key),
zap.String("sub_id", sub.ID),
zap.Int64("event_count", sub.EventCount),
zap.Duration("lifetime", time.Since(sub.CreatedAt)))
return
}
}
}
// dispatch 分发事件到订阅者
func (eb *EventBus) dispatch() {
defer eb.wg.Done()
eb.logger.Info("Event bus dispatch loop started")
for {
select {
case event, ok := <-eb.eventChan:
if !ok {
eb.logger.Info("Event channel closed, stopping dispatch")
return
}
eb.logger.Debug("Received event from channel",
zap.String("event_type", string(event.GetType())))
eb.dispatchEvent(event)
case <-eb.ctx.Done():
eb.logger.Info("Context cancelled, stopping dispatch")
return
}
}
}
// dispatchEvent 分发单个事件
func (eb *EventBus) dispatchEvent(event protocol.Event) {
eb.mu.RLock()
key := string(event.GetType())
subs := eb.subscriptions[key]
// 复制订阅者列表避免锁竞争
subsCopy := make([]*Subscription, len(subs))
copy(subsCopy, subs)
eb.mu.RUnlock()
eb.logger.Info("Dispatching event",
zap.String("event_type", key),
zap.String("detail_type", event.GetDetailType()),
zap.Int("subscriber_count", len(subsCopy)))
dispatched := 0
for _, sub := range subsCopy {
if sub.Filter == nil || sub.Filter(event) {
select {
case sub.Chan <- event:
atomic.AddInt64(&sub.EventCount, 1)
dispatched++
eb.logger.Debug("Event dispatched to subscriber",
zap.String("sub_id", sub.ID))
default:
// 订阅者channel已满,丢弃事件
atomic.AddInt64(&eb.metrics.DroppedTotal, 1)
eb.logger.Warn("Subscription channel full, event dropped",
zap.String("sub_id", sub.ID),
zap.String("event_type", key),
zap.String("detail_type", event.GetDetailType()),
zap.String("raw_message", func() string {
if data := event.GetData(); data != nil {
if msg, ok := data["raw_message"].(string); ok {
return msg
}
}
return ""
}()))
}
}
}
if dispatched > 0 {
atomic.AddInt64(&eb.metrics.DispatchedTotal, int64(dispatched))
eb.logger.Info("Event dispatched successfully",
zap.String("event_type", key),
zap.Int("dispatched_count", dispatched))
} else {
eb.logger.Warn("No subscribers for event",
zap.String("event_type", key),
zap.String("detail_type", event.GetDetailType()))
}
}
// GetSubscriptionCount 获取订阅者数量
func (eb *EventBus) GetSubscriptionCount(eventType protocol.EventType) int {
eb.mu.RLock()
defer eb.mu.RUnlock()
return len(eb.subscriptions[string(eventType)])
}
// Clear 清空所有订阅
func (eb *EventBus) Clear() {
eb.mu.Lock()
defer eb.mu.Unlock()
for eventType, subs := range eb.subscriptions {
for _, sub := range subs {
close(sub.Chan)
}
delete(eb.subscriptions, eventType)
}
eb.logger.Info("All subscriptions cleared")
}
// GetMetrics 获取事件总线指标
func (eb *EventBus) GetMetrics() EventBusMetrics {
return EventBusMetrics{
PublishedTotal: atomic.LoadInt64(&eb.metrics.PublishedTotal),
DispatchedTotal: atomic.LoadInt64(&eb.metrics.DispatchedTotal),
DroppedTotal: atomic.LoadInt64(&eb.metrics.DroppedTotal),
SubscriberTotal: atomic.LoadInt64(&eb.metrics.SubscriberTotal),
LastEventTime: atomic.LoadInt64(&eb.metrics.LastEventTime),
}
}
// GetAllSubscriptions 获取所有订阅信息
func (eb *EventBus) GetAllSubscriptions() map[string]int {
eb.mu.RLock()
defer eb.mu.RUnlock()
result := make(map[string]int)
for eventType, subs := range eb.subscriptions {
result[eventType] = len(subs)
}
return result
}
// GetBufferUsage 获取缓冲区使用情况
func (eb *EventBus) GetBufferUsage() float64 {
return float64(len(eb.eventChan)) / float64(eb.bufferSize)
}
// IsHealthy 检查事件总线健康状态
func (eb *EventBus) IsHealthy() bool {
// 检查缓冲区使用率是否过高
if eb.GetBufferUsage() > 0.9 {
return false
}
// 检查是否有过多的丢弃事件
metrics := eb.GetMetrics()
if metrics.PublishedTotal > 0 {
dropRate := float64(metrics.DroppedTotal) / float64(metrics.PublishedTotal)
if dropRate > 0.1 { // 丢弃率超过10%
return false
}
}
return true
}
// LogMetrics 记录指标日志
func (eb *EventBus) LogMetrics() {
metrics := eb.GetMetrics()
subs := eb.GetAllSubscriptions()
eb.logger.Info("EventBus metrics",
zap.Int64("published_total", metrics.PublishedTotal),
zap.Int64("dispatched_total", metrics.DispatchedTotal),
zap.Int64("dropped_total", metrics.DroppedTotal),
zap.Int64("subscriber_total", metrics.SubscriberTotal),
zap.Float64("buffer_usage", eb.GetBufferUsage()),
zap.Any("subscriptions", subs))
}
// generateSubscriptionID 生成订阅ID
func generateSubscriptionID() string {
return "sub-" + randomString(8)
}
// randomString 生成随机字符串
func randomString(length int) string {
b := make([]byte, length/2)
if _, err := rand.Read(b); err != nil {
// 降级到简单实现
const charset = "abcdefghijklmnopqrstuvwxyz0123456789"
result := make([]byte, length)
for i := range result {
result[i] = charset[i%len(charset)]
}
return string(result)
}
return hex.EncodeToString(b)
}