Files
backend/internal/pkg/sse/hub.go
lan 86ef150fec Replace websocket flow with SSE support in backend.
Update handlers, services, router, and data conversion logic to support server-sent events and related message pipeline changes.

Made-with: Cursor
2026-03-10 12:58:23 +08:00

153 lines
2.9 KiB
Go

package sse
import (
"encoding/json"
"strconv"
"sync"
"sync/atomic"
"time"
)
const (
// defaultUserBufferSize is the capacity of the buffered channel created for
// each subscriber in Subscribe.
defaultUserBufferSize = 128
// maxReplayEvents caps how many of the most recent events publish keeps in
// a user's history for replay.
maxReplayEvents = 200
)
// Event is a single message delivered to subscribers and kept in the per-user
// replay history.
type Event struct {
// ID is the hub-assigned sequence number (see Hub.NextID); Subscribe compares
// it against afterID to decide which history entries to replay.
ID uint64 `json:"event_id"`
// Event is the event name passed to PublishToUser.
Event string `json:"event"`
// TS is the publish time in Unix milliseconds.
TS int64 `json:"ts"`
// Payload is the caller-supplied body; EncodeData marshals it to JSON.
Payload interface{} `json:"payload"`
}
// subscriber is one registered listener for a user's events.
type subscriber struct {
// id is the key of this subscriber inside the per-user subscriber map.
id uint64
// ch carries published events to the listener; it is buffered and is closed
// by the cancel function returned from Subscribe.
ch chan Event
// quit is closed by the cancel function to signal that the subscriber has
// gone away.
quit chan struct{}
}
// Hub fans events out to per-user subscribers and keeps a bounded per-user
// history so that late subscribers can replay what they missed.
type Hub struct {
// seq is the counter behind NextID; it is only touched through sync/atomic.
// NOTE(review): presumably kept as the first field so that it stays 64-bit
// aligned for atomic access on 32-bit platforms — do not reorder without
// confirming.
seq uint64
// mu guards subscribers and history.
mu sync.RWMutex
// subscribers maps userID -> subscriber id -> subscriber.
subscribers map[string]map[uint64]*subscriber
// history maps userID -> the most recent events published to that user
// (at most maxReplayEvents entries).
history map[string][]Event
}
// NewHub returns an empty Hub whose subscriber and history maps are
// initialised and ready for use.
func NewHub() *Hub {
	hub := new(Hub)
	hub.subscribers = map[string]map[uint64]*subscriber{}
	hub.history = map[string][]Event{}
	return hub
}
// NextID atomically advances the hub's sequence counter and returns the new
// value. The first call on a fresh Hub returns 1. The same counter is used
// for event IDs and for subscriber IDs.
func (h *Hub) NextID() uint64 {
	const step uint64 = 1
	return atomic.AddUint64(&h.seq, step)
}
// ParseEventID converts a decimal event ID string into a uint64.
// It returns 0 when raw is empty or is not a valid base-10 unsigned 64-bit
// integer (including values that overflow uint64).
func ParseEventID(raw string) uint64 {
	if len(raw) == 0 {
		return 0
	}
	if parsed, err := strconv.ParseUint(raw, 10, 64); err == nil {
		return parsed
	}
	return 0
}
// Subscribe registers a new listener for userID.
//
// It returns:
//   - the buffered channel on which future events for userID are delivered;
//   - a cancel function that unregisters the listener and closes the channel
//     (calling it again after the listener is gone is a no-op);
//   - the events from the user's history whose ID is greater than afterID, in
//     history order. The slice is empty but non-nil when nothing matches.
//
// Registration and the history snapshot happen under the same lock.
func (h *Hub) Subscribe(userID string, afterID uint64) (chan Event, func(), []Event) {
	subID := h.NextID()
	listener := &subscriber{
		id:   subID,
		ch:   make(chan Event, defaultUserBufferSize),
		quit: make(chan struct{}),
	}

	h.mu.Lock()
	group, found := h.subscribers[userID]
	if !found {
		group = make(map[uint64]*subscriber)
		h.subscribers[userID] = group
	}
	group[subID] = listener

	missed := []Event{}
	for _, past := range h.history[userID] {
		if past.ID <= afterID {
			continue
		}
		missed = append(missed, past)
	}
	h.mu.Unlock()

	unsubscribe := func() {
		h.mu.Lock()
		defer h.mu.Unlock()

		// Look the group up again: it may have been removed and recreated
		// since this listener was registered.
		current, ok := h.subscribers[userID]
		if !ok {
			return
		}
		if registered, present := current[subID]; present {
			close(registered.quit)
			delete(current, subID)
			close(registered.ch)
		}
		if len(current) == 0 {
			delete(h.subscribers, userID)
		}
	}

	return listener.ch, unsubscribe, missed
}
// HasSubscribers reports whether at least one listener is currently
// registered for userID.
func (h *Hub) HasSubscribers(userID string) bool {
	h.mu.RLock()
	count := len(h.subscribers[userID])
	h.mu.RUnlock()
	return count != 0
}
// PublishToUser builds an Event named eventName carrying payload, stamps it
// with a fresh ID and the current time in Unix milliseconds, publishes it to
// userID, and returns the event that was published.
func (h *Hub) PublishToUser(userID string, eventName string, payload interface{}) Event {
	var out Event
	out.ID = h.NextID()
	out.Event = eventName
	out.TS = time.Now().UnixMilli()
	out.Payload = payload

	h.publish(userID, out)
	return out
}
// PublishToUsers publishes the same event name and payload to every user in
// userIDs, in order. Each user receives a separately created event (its own
// ID and timestamp) via PublishToUser.
func (h *Hub) PublishToUsers(userIDs []string, eventName string, payload interface{}) {
	for i := 0; i < len(userIDs); i++ {
		h.PublishToUser(userIDs[i], eventName, payload)
	}
}
// publish appends ev to userID's replay history (trimmed to the newest
// maxReplayEvents entries) and delivers it to every listener currently
// registered for that user.
//
// Delivery is performed while h.mu is held. The cancel function returned by
// Subscribe closes a listener's channel under the same lock, so holding the
// lock across the sends guarantees that a send can never hit a channel that
// has just been closed (which would panic). Every send is non-blocking, so
// the lock is never held while waiting on a consumer.
func (h *Hub) publish(userID string, ev Event) {
	h.mu.Lock()
	defer h.mu.Unlock()

	history := append(h.history[userID], ev)
	if len(history) > maxReplayEvents {
		history = history[len(history)-maxReplayEvents:]
	}
	h.history[userID] = history

	// Any subscriber still present in the map has not been cancelled, so its
	// channel is guaranteed to be open here.
	for _, s := range h.subscribers[userID] {
		select {
		case s.ch <- ev:
		default:
			// Slow consumer: drop this single message. The client can
			// compensate via Last-Event-ID + HTTP sync.
		}
	}
}
// EncodeData marshals only ev.Payload to JSON and returns it as a string.
// On a marshalling failure it returns the empty string and the error.
func EncodeData(ev Event) (string, error) {
	encoded, err := json.Marshal(ev.Payload)
	if err == nil {
		return string(encoded), nil
	}
	return "", err
}