Files
backend/internal/service/push_service.go

576 lines
17 KiB
Go
Raw Permalink Normal View History

package service
import (
"context"
"errors"
"fmt"
"time"
"carrot_bbs/internal/dto"
"carrot_bbs/internal/model"
"carrot_bbs/internal/pkg/websocket"
"carrot_bbs/internal/repository"
)
// 推送相关常量
const (
// DefaultPushTimeout 默认推送超时时间
DefaultPushTimeout = 30 * time.Second
// MaxRetryCount 最大重试次数
MaxRetryCount = 3
// DefaultExpiredTime 默认消息过期时间24小时
DefaultExpiredTime = 24 * time.Hour
// PushQueueSize 推送队列大小
PushQueueSize = 1000
)
// PushPriority 推送优先级
type PushPriority int
const (
PriorityLow PushPriority = 1 // 低优先级(营销消息等)
PriorityNormal PushPriority = 5 // 普通优先级(系统通知)
PriorityHigh PushPriority = 8 // 高优先级(聊天消息)
PriorityCritical PushPriority = 10 // 最高优先级(重要系统通知)
)
// PushService 推送服务接口
type PushService interface {
// 推送核心方法
PushMessage(ctx context.Context, userID string, message *model.Message) error
PushToUser(ctx context.Context, userID string, message *model.Message, priority int) error
// 系统消息推送
PushSystemMessage(ctx context.Context, userID string, msgType, title, content string, data map[string]interface{}) error
PushNotification(ctx context.Context, userID string, notification *websocket.NotificationMessage) error
PushAnnouncement(ctx context.Context, announcement *websocket.AnnouncementMessage) error
// 系统通知推送(新接口,使用独立的 SystemNotification 模型)
PushSystemNotification(ctx context.Context, userID string, notification *model.SystemNotification) error
// 设备管理
RegisterDevice(ctx context.Context, userID string, deviceID string, deviceType model.DeviceType, pushToken string) error
UnregisterDevice(ctx context.Context, deviceID string) error
UpdateDeviceToken(ctx context.Context, deviceID string, newPushToken string) error
// 推送记录管理
CreatePushRecord(ctx context.Context, userID string, messageID string, channel model.PushChannel) (*model.PushRecord, error)
GetPendingPushes(ctx context.Context, userID string) ([]*model.PushRecord, error)
// 后台任务
StartPushWorker(ctx context.Context)
StopPushWorker()
}
// pushServiceImpl 推送服务实现
type pushServiceImpl struct {
pushRepo *repository.PushRecordRepository
deviceRepo *repository.DeviceTokenRepository
messageRepo *repository.MessageRepository
wsManager *websocket.WebSocketManager
// 推送队列
pushQueue chan *pushTask
stopChan chan struct{}
}
// pushTask 推送任务
type pushTask struct {
userID string
message *model.Message
priority int
}
// NewPushService 创建推送服务
func NewPushService(
pushRepo *repository.PushRecordRepository,
deviceRepo *repository.DeviceTokenRepository,
messageRepo *repository.MessageRepository,
wsManager *websocket.WebSocketManager,
) PushService {
return &pushServiceImpl{
pushRepo: pushRepo,
deviceRepo: deviceRepo,
messageRepo: messageRepo,
wsManager: wsManager,
pushQueue: make(chan *pushTask, PushQueueSize),
stopChan: make(chan struct{}),
}
}
// PushMessage 推送消息给用户
func (s *pushServiceImpl) PushMessage(ctx context.Context, userID string, message *model.Message) error {
return s.PushToUser(ctx, userID, message, int(PriorityNormal))
}
// PushToUser 带优先级的推送
func (s *pushServiceImpl) PushToUser(ctx context.Context, userID string, message *model.Message, priority int) error {
// 首先尝试WebSocket推送实时推送
if s.pushViaWebSocket(ctx, userID, message) {
// WebSocket推送成功记录推送状态
record, err := s.CreatePushRecord(ctx, userID, message.ID, model.PushChannelWebSocket)
if err != nil {
return fmt.Errorf("failed to create push record: %w", err)
}
record.MarkPushed()
if err := s.pushRepo.Update(record); err != nil {
return fmt.Errorf("failed to update push record: %w", err)
}
return nil
}
// WebSocket推送失败加入推送队列等待移动端推送
select {
case s.pushQueue <- &pushTask{
userID: userID,
message: message,
priority: priority,
}:
return nil
default:
// 队列已满,直接创建待推送记录
_, err := s.CreatePushRecord(ctx, userID, message.ID, model.PushChannelFCM)
if err != nil {
return fmt.Errorf("failed to create pending push record: %w", err)
}
return errors.New("push queue is full, message queued for later delivery")
}
}
// pushViaWebSocket 通过WebSocket推送消息
// 返回true表示推送成功false表示用户不在线
func (s *pushServiceImpl) pushViaWebSocket(ctx context.Context, userID string, message *model.Message) bool {
if s.wsManager == nil {
return false
}
if !s.wsManager.IsUserOnline(userID) {
return false
}
// 判断是否为系统消息/通知消息
if message.IsSystemMessage() || message.Category == model.CategoryNotification {
// 使用 NotificationMessage 格式推送系统通知
// 从 segments 中提取文本内容
content := dto.ExtractTextContentFromModel(message.Segments)
notification := &websocket.NotificationMessage{
ID: fmt.Sprintf("%s", message.ID),
Type: string(message.SystemType),
Content: content,
Extra: make(map[string]interface{}),
CreatedAt: message.CreatedAt.UnixMilli(),
}
// 填充额外数据
if message.ExtraData != nil {
notification.Extra["actor_id"] = message.ExtraData.ActorID
notification.Extra["actor_name"] = message.ExtraData.ActorName
notification.Extra["avatar_url"] = message.ExtraData.AvatarURL
notification.Extra["target_id"] = message.ExtraData.TargetID
notification.Extra["target_type"] = message.ExtraData.TargetType
notification.Extra["action_url"] = message.ExtraData.ActionURL
notification.Extra["action_time"] = message.ExtraData.ActionTime
// 设置触发用户信息
if message.ExtraData.ActorID > 0 {
notification.TriggerUser = &websocket.NotificationUser{
ID: fmt.Sprintf("%d", message.ExtraData.ActorID),
Username: message.ExtraData.ActorName,
Avatar: message.ExtraData.AvatarURL,
}
}
}
wsMsg := websocket.CreateWSMessage(websocket.MessageTypeNotification, notification)
s.wsManager.SendToUser(userID, wsMsg)
return true
}
// 构建普通聊天消息的 WebSocket 消息 - 使用新的 WSEventResponse 格式
// 获取会话类型 (private/group)
detailType := "private"
if message.ConversationID != "" {
// 从会话中获取类型,需要查询数据库或从消息中判断
// 这里暂时默认为 privategroup 类型需要额外逻辑
}
// 直接使用 message.Segments
segments := message.Segments
event := &dto.WSEventResponse{
ID: fmt.Sprintf("%s", message.ID),
Time: message.CreatedAt.UnixMilli(),
Type: "message",
DetailType: detailType,
Seq: fmt.Sprintf("%d", message.Seq),
Segments: segments,
SenderID: message.SenderID,
}
wsMsg := websocket.CreateWSMessage(websocket.MessageTypeMessage, event)
s.wsManager.SendToUser(userID, wsMsg)
return true
}
// pushViaFCM 通过FCM推送预留接口
func (s *pushServiceImpl) pushViaFCM(ctx context.Context, deviceToken *model.DeviceToken, message *model.Message) error {
// TODO: 实现FCM推送
// 1. 构建FCM消息
// 2. 调用Firebase Admin SDK发送消息
// 3. 处理发送结果
return errors.New("FCM push not implemented")
}
// pushViaAPNs 通过APNs推送预留接口
func (s *pushServiceImpl) pushViaAPNs(ctx context.Context, deviceToken *model.DeviceToken, message *model.Message) error {
// TODO: 实现APNs推送
// 1. 构建APNs消息
// 2. 调用APNs SDK发送消息
// 3. 处理发送结果
return errors.New("APNs push not implemented")
}
// RegisterDevice 注册设备
func (s *pushServiceImpl) RegisterDevice(ctx context.Context, userID string, deviceID string, deviceType model.DeviceType, pushToken string) error {
deviceToken := &model.DeviceToken{
UserID: userID,
DeviceID: deviceID,
DeviceType: deviceType,
PushToken: pushToken,
IsActive: true,
}
deviceToken.UpdateLastUsed()
return s.deviceRepo.Upsert(deviceToken)
}
// UnregisterDevice 注销设备
func (s *pushServiceImpl) UnregisterDevice(ctx context.Context, deviceID string) error {
return s.deviceRepo.Deactivate(deviceID)
}
// UpdateDeviceToken 更新设备Token
func (s *pushServiceImpl) UpdateDeviceToken(ctx context.Context, deviceID string, newPushToken string) error {
deviceToken, err := s.deviceRepo.GetByDeviceID(deviceID)
if err != nil {
return fmt.Errorf("device not found: %w", err)
}
deviceToken.PushToken = newPushToken
deviceToken.Activate()
return s.deviceRepo.Update(deviceToken)
}
// CreatePushRecord 创建推送记录
func (s *pushServiceImpl) CreatePushRecord(ctx context.Context, userID string, messageID string, channel model.PushChannel) (*model.PushRecord, error) {
expiredAt := time.Now().Add(DefaultExpiredTime)
record := &model.PushRecord{
UserID: userID,
MessageID: messageID,
PushChannel: channel,
PushStatus: model.PushStatusPending,
MaxRetry: MaxRetryCount,
ExpiredAt: &expiredAt,
}
if err := s.pushRepo.Create(record); err != nil {
return nil, fmt.Errorf("failed to create push record: %w", err)
}
return record, nil
}
// GetPendingPushes 获取待推送记录
func (s *pushServiceImpl) GetPendingPushes(ctx context.Context, userID string) ([]*model.PushRecord, error) {
return s.pushRepo.GetByUserID(userID, 100, 0)
}
// StartPushWorker 启动推送工作协程
func (s *pushServiceImpl) StartPushWorker(ctx context.Context) {
go s.processPushQueue()
go s.retryFailedPushes()
}
// StopPushWorker 停止推送工作协程
func (s *pushServiceImpl) StopPushWorker() {
close(s.stopChan)
}
// processPushQueue 处理推送队列
func (s *pushServiceImpl) processPushQueue() {
for {
select {
case <-s.stopChan:
return
case task := <-s.pushQueue:
s.processPushTask(task)
}
}
}
// processPushTask 处理单个推送任务
func (s *pushServiceImpl) processPushTask(task *pushTask) {
ctx, cancel := context.WithTimeout(context.Background(), DefaultPushTimeout)
defer cancel()
// 获取用户活跃设备
devices, err := s.deviceRepo.GetActiveByUserID(task.userID)
if err != nil || len(devices) == 0 {
// 没有可用设备,创建待推送记录
s.CreatePushRecord(ctx, task.userID, task.message.ID, model.PushChannelFCM)
return
}
// 对每个设备创建推送记录并尝试推送
for _, device := range devices {
record, err := s.CreatePushRecord(ctx, task.userID, task.message.ID, s.getChannelForDevice(device))
if err != nil {
continue
}
var pushErr error
switch {
case device.IsIOS():
pushErr = s.pushViaAPNs(ctx, device, task.message)
case device.IsAndroid():
pushErr = s.pushViaFCM(ctx, device, task.message)
default:
// Web设备只支持WebSocket
continue
}
if pushErr != nil {
record.MarkFailed(pushErr.Error())
} else {
record.MarkPushed()
}
s.pushRepo.Update(record)
}
}
// getChannelForDevice 根据设备类型获取推送通道
func (s *pushServiceImpl) getChannelForDevice(device *model.DeviceToken) model.PushChannel {
switch device.DeviceType {
case model.DeviceTypeIOS:
return model.PushChannelAPNs
case model.DeviceTypeAndroid:
return model.PushChannelFCM
default:
return model.PushChannelWebSocket
}
}
// retryFailedPushes 重试失败的推送
func (s *pushServiceImpl) retryFailedPushes() {
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {
select {
case <-s.stopChan:
return
case <-ticker.C:
s.doRetry()
}
}
}
// doRetry 执行重试
func (s *pushServiceImpl) doRetry() {
ctx := context.Background()
// 获取失败待重试的推送
records, err := s.pushRepo.GetFailedPushesForRetry(100)
if err != nil {
return
}
for _, record := range records {
// 检查是否过期
if record.IsExpired() {
record.MarkExpired()
s.pushRepo.Update(record)
continue
}
// 获取消息
message, err := s.messageRepo.GetMessageByID(record.MessageID)
if err != nil {
record.MarkFailed("message not found")
s.pushRepo.Update(record)
continue
}
// 尝试WebSocket推送
if s.pushViaWebSocket(ctx, record.UserID, message) {
record.MarkDelivered()
s.pushRepo.Update(record)
continue
}
// 获取设备并尝试移动端推送
if record.DeviceToken != "" {
device, err := s.deviceRepo.GetByPushToken(record.DeviceToken)
if err != nil {
record.MarkFailed("device not found")
s.pushRepo.Update(record)
continue
}
var pushErr error
switch {
case device.IsIOS():
pushErr = s.pushViaAPNs(ctx, device, message)
case device.IsAndroid():
pushErr = s.pushViaFCM(ctx, device, message)
}
if pushErr != nil {
record.MarkFailed(pushErr.Error())
} else {
record.MarkPushed()
}
s.pushRepo.Update(record)
}
}
}
// PushSystemMessage 推送系统消息
func (s *pushServiceImpl) PushSystemMessage(ctx context.Context, userID string, msgType, title, content string, data map[string]interface{}) error {
// 首先尝试WebSocket推送
if s.pushSystemViaWebSocket(ctx, userID, msgType, title, content, data) {
return nil
}
// 用户不在线,创建待推送记录(移动端上线后可通过其他方式获取)
// 系统消息通常不需要离线推送,客户端上线后会主动拉取
return errors.New("user is offline, system message will be available on next sync")
}
// pushSystemViaWebSocket 通过WebSocket推送系统消息
func (s *pushServiceImpl) pushSystemViaWebSocket(ctx context.Context, userID string, msgType, title, content string, data map[string]interface{}) bool {
if s.wsManager == nil {
return false
}
if !s.wsManager.IsUserOnline(userID) {
return false
}
sysMsg := &websocket.SystemMessage{
Type: msgType,
Title: title,
Content: content,
Data: data,
CreatedAt: time.Now().UnixMilli(),
}
wsMsg := websocket.CreateWSMessage(websocket.MessageTypeSystem, sysMsg)
s.wsManager.SendToUser(userID, wsMsg)
return true
}
// PushNotification 推送通知消息
func (s *pushServiceImpl) PushNotification(ctx context.Context, userID string, notification *websocket.NotificationMessage) error {
// 首先尝试WebSocket推送
if s.pushNotificationViaWebSocket(ctx, userID, notification) {
return nil
}
// 用户不在线,创建待推送记录
// 通知消息可以等用户上线后拉取
return errors.New("user is offline, notification will be available on next sync")
}
// pushNotificationViaWebSocket 通过WebSocket推送通知消息
func (s *pushServiceImpl) pushNotificationViaWebSocket(ctx context.Context, userID string, notification *websocket.NotificationMessage) bool {
if s.wsManager == nil {
return false
}
if !s.wsManager.IsUserOnline(userID) {
return false
}
if notification.CreatedAt == 0 {
notification.CreatedAt = time.Now().UnixMilli()
}
wsMsg := websocket.CreateWSMessage(websocket.MessageTypeNotification, notification)
s.wsManager.SendToUser(userID, wsMsg)
return true
}
// PushAnnouncement 广播公告消息
func (s *pushServiceImpl) PushAnnouncement(ctx context.Context, announcement *websocket.AnnouncementMessage) error {
if s.wsManager == nil {
return errors.New("websocket manager not available")
}
if announcement.CreatedAt == 0 {
announcement.CreatedAt = time.Now().UnixMilli()
}
wsMsg := websocket.CreateWSMessage(websocket.MessageTypeAnnouncement, announcement)
s.wsManager.Broadcast(wsMsg)
return nil
}
// PushSystemNotification 推送系统通知(使用独立的 SystemNotification 模型)
func (s *pushServiceImpl) PushSystemNotification(ctx context.Context, userID string, notification *model.SystemNotification) error {
// 首先尝试WebSocket推送
if s.pushSystemNotificationViaWebSocket(ctx, userID, notification) {
return nil
}
// 用户不在线,系统通知已存储在数据库中,用户上线后会主动拉取
return nil
}
// pushSystemNotificationViaWebSocket 通过WebSocket推送系统通知
func (s *pushServiceImpl) pushSystemNotificationViaWebSocket(ctx context.Context, userID string, notification *model.SystemNotification) bool {
if s.wsManager == nil {
return false
}
if !s.wsManager.IsUserOnline(userID) {
return false
}
// 构建 WebSocket 通知消息
wsNotification := &websocket.NotificationMessage{
ID: fmt.Sprintf("%d", notification.ID),
Type: string(notification.Type),
Title: notification.Title,
Content: notification.Content,
Extra: make(map[string]interface{}),
CreatedAt: notification.CreatedAt.UnixMilli(),
}
// 填充额外数据
if notification.ExtraData != nil {
wsNotification.Extra["actor_id_str"] = notification.ExtraData.ActorIDStr
wsNotification.Extra["actor_name"] = notification.ExtraData.ActorName
wsNotification.Extra["avatar_url"] = notification.ExtraData.AvatarURL
wsNotification.Extra["target_id"] = notification.ExtraData.TargetID
wsNotification.Extra["target_type"] = notification.ExtraData.TargetType
wsNotification.Extra["action_url"] = notification.ExtraData.ActionURL
wsNotification.Extra["action_time"] = notification.ExtraData.ActionTime
// 设置触发用户信息
if notification.ExtraData.ActorIDStr != "" {
wsNotification.TriggerUser = &websocket.NotificationUser{
ID: notification.ExtraData.ActorIDStr,
Username: notification.ExtraData.ActorName,
Avatar: notification.ExtraData.AvatarURL,
}
}
}
wsMsg := websocket.CreateWSMessage(websocket.MessageTypeNotification, wsNotification)
s.wsManager.SendToUser(userID, wsMsg)
return true
}