Files
frontend/src/stores/messageManager.ts
lan be84c01abd Migrate frontend realtime messaging to SSE.
Switch service integrations and screen/store consumers from websocket events to SSE, and ignore generated dist-web artifacts.

Made-with: Cursor
2026-03-10 12:58:23 +08:00

2187 lines
70 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
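/**
* MessageManager - core message management module
*
* Single source of truth: all message-related state is managed here.
* Core problems addressed:
* 1. Race conditions: incoming messages are processed immediately and synced to all subscribers
* 2. State sync: read status stays consistent across all components
*
* Architecture:
* - Owns the conversations and messages state
* - Handles all realtime push messages in one place (delivered through sseService; the WS* type names are carried over from the earlier WebSocket transport)
* - Handles all local database reads and writes in one place
* - Provides a subscription mechanism for React components
*/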
import { ConversationResponse, MessageResponse, MessageSegment, UserDTO } from '../types/dto';
import { messageService } from '../services/messageService';
import {
sseService,
WSChatMessage,
WSGroupChatMessage,
WSReadMessage,
WSGroupReadMessage,
WSRecallMessage,
WSGroupRecallMessage,
WSGroupTypingMessage,
WSGroupNoticeMessage,
GroupNoticeType,
} from '../services/sseService';
import {
saveMessage,
saveMessagesBatch,
getMessagesByConversation,
getMaxSeq,
getMinSeq,
getMessagesBeforeSeq,
markConversationAsRead,
updateConversationCacheUnreadCount,
CachedMessage,
getUserCache,
saveUserCache,
updateMessageStatus,
deleteConversation as deleteConversationFromDb,
} from '../services/database';
import { api } from '../services/api';
import { useAuthStore } from './authStore';
// ==================== 类型定义 ====================
/**
* Names of the events that MessageManager publishes to its subscribers.
*/
export type MessageEventType =
| 'conversations_updated'
| 'messages_updated'
| 'unread_count_updated'
| 'connection_changed'
| 'message_sent'
| 'message_read'
| 'message_received'
| 'message_recalled'
| 'typing_status'
| 'group_notice'
| 'error';
/**
* Envelope handed to every subscriber callback.
*/
export interface MessageEvent {
// Which kind of event this is.
type: MessageEventType;
// Event-specific data; its shape varies by `type` and is not typed here.
payload: any;
// The notify call sites visible in this file fill this with Date.now().
timestamp: number;
}
/** Callback signature for subscribers; it receives each published MessageEvent. */
export type MessageSubscriber = (event: MessageEvent) => void;
// ==================== 内部状态接口 ====================
/**
* In-memory state owned by MessageManager.
*/
interface MessageManagerState {
// Conversations
conversations: Map<string, ConversationResponse>;
conversationList: ConversationResponse[];
// Messages, stored per conversation ID
messagesMap: Map<string, MessageResponse[]>;
// Unread counts
totalUnreadCount: number;
systemUnreadCount: number;
// Connection status
isWebSocketConnected: boolean;
// ID of the currently active conversation (the one the user is viewing)
currentConversationId: string | null;
// Loading flags
isLoadingConversations: boolean;
loadingMessagesSet: Set<string>; // IDs of conversations whose messages are currently being loaded
// Initialization flag
isInitialized: boolean;
// Subscribers
subscribers: Set<MessageSubscriber>;
// Typing status: per group ID, the IDs of the users currently typing
typingUsersMap: Map<string, string[]>;
// Mute status of the current user, stored per group ID
mutedStatusMap: Map<string, boolean>;
}
// ==================== 已读状态保护机制常量 ====================
/**
* Read-state protection delay, in milliseconds.
* After the read API call completes, protection continues for this long so that
* stale data returned by fetchConversations does not overwrite the read state.
*/
const READ_STATE_PROTECTION_DELAY = 5000; // 5 seconds
/**
* Record describing the read state of one conversation.
*/
interface ReadStateRecord {
/** Timestamp at which the conversation was marked as read */
timestamp: number;
/** State version number, used to keep older data from overwriting newer data */
version: number;
/** The read sequence number that has already been reported (used for de-duplication) */
lastReadSeq: number;
/** ID of the timer that clears the protection */
clearTimer?: NodeJS.Timeout;
}
// ==================== MessageManager 类 ====================
/**
* Holds conversation/message state, reacts to sseService events and publishes
* MessageEvent notifications to subscribers.
*/
class MessageManager {
// All in-memory state; built in the constructor.
private state: MessageManagerState;
// Teardown callback invoked by destroy() and initWebSocketListeners() when set.
// NOTE(review): no assignment of a non-null value is visible in this part of the file — confirm.
private wsUnsubscribe: (() => void) | null = null;
// Cached ID of the logged-in user; refreshed from the auth store.
private currentUserId: string | null = null;
// Unsubscribe handle returned by useAuthStore.subscribe (set in initAuthListener).
private authUnsubscribe: (() => void) | null = null;
// Date.now() of the last post-connect resync; used to throttle resyncs in the onConnect handler.
private lastReconnectSyncAt: number = 0;
// The in-flight initialize() run, if any, so concurrent callers can await the same run.
private initializePromise: Promise<void> | null = null;
// Presumably in-flight activateConversation runs keyed by conversation ID — its use is outside this view; confirm.
private activatingConversationTasks: Map<string, Promise<void>> = new Map();
/**
* Read requests that are in flight (or still inside their protection window).
* Used to keep server data from fetchConversations from overwriting the optimistic read state.
* key: conversationId, value: read-state record (carries a timestamp and a version number)
*/
private pendingReadMap: Map<string, ReadStateRecord> = new Map();
/**
* Global read-state version number, incremented each time something is marked as read.
* Used to decide whether server data is older than the local state.
*/
private readStateVersion: number = 0;
/**
 * Starts from an empty state and then attaches to the auth store.
 */
constructor() {
  const emptyState: MessageManagerState = {
    conversations: new Map(),
    conversationList: [],
    messagesMap: new Map(),
    totalUnreadCount: 0,
    systemUnreadCount: 0,
    isWebSocketConnected: false,
    currentConversationId: null,
    isLoadingConversations: false,
    loadingMessagesSet: new Set(),
    isInitialized: false,
    subscribers: new Set(),
    typingUsersMap: new Map(),
    mutedStatusMap: new Map(),
  };
  this.state = emptyState;

  // Watch for authentication changes from now on.
  this.initAuthListener();
}
// ==================== 私有工具方法 ====================
/**
 * Reads the current user from the auth store, initializes immediately when a
 * user is already present, and subscribes (once) to later auth changes.
 */
private initAuthListener() {
  // Take the user ID that is available right now.
  this.currentUserId = useAuthStore.getState().currentUser?.id || null;

  // Already logged in: initialize straight away.
  if (this.currentUserId && !this.state.isInitialized) {
    this.initialize();
  }

  // Subscribe only once.
  if (this.authUnsubscribe) {
    return;
  }

  // Watching login state covers a cold start where currentUser becomes available late.
  this.authUnsubscribe = useAuthStore.subscribe((authState) => {
    const previousUserId = this.currentUserId;
    const incomingUserId = authState.currentUser?.id || null;
    this.currentUserId = incomingUserId;

    if (incomingUserId) {
      if (!this.state.isInitialized) {
        this.initialize();
      }
      // If the user info arrived after a conversation was opened, re-activate that
      // conversation so the first entry into Chat does not miss its initial fetch.
      const activeConversationId = this.state.currentConversationId;
      if (activeConversationId) {
        this.activateConversation(activeConversationId, { forceSync: true }).catch(error => {
          console.error('[MessageManager] 登录态就绪后重激活会话失败:', error);
        });
      }
      return;
    }

    // Logged out: tear down so no stale data lingers.
    if (previousUserId && this.state.isInitialized) {
      this.destroy();
    }
  });
}
/**
 * Returns the cached user ID, falling back to (and caching) the value
 * currently held by the auth store when nothing is cached yet.
 */
private getCurrentUserId(): string | null {
  if (this.currentUserId) {
    return this.currentUserId;
  }
  this.currentUserId = useAuthStore.getState().currentUser?.id || null;
  return this.currentUserId;
}
/**
 * Converts a conversation ID to its string form; null and undefined become ''.
 */
private normalizeConversationId(conversationId: string | number | null | undefined): string {
  if (conversationId === null || conversationId === undefined) {
    return '';
  }
  return String(conversationId);
}
/**
 * Delivers an event to every subscriber. A subscriber that throws is logged
 * and does not prevent delivery to the remaining subscribers.
 */
private notifySubscribers(event: MessageEvent) {
  for (const deliver of this.state.subscribers) {
    try {
      deliver(event);
    } catch (error) {
      console.error('[MessageManager] 订阅者执行失败:', error);
    }
  }
}
/**
 * Rebuilds the ordered conversation list from the conversations map:
 * pinned conversations first, then most recent activity first.
 */
private updateConversationList() {
  const pinRank = (conv: ConversationResponse): number => (conv.is_pinned ? 1 : 0);
  const activityTime = (conv: ConversationResponse): number =>
    new Date(conv.last_message_at || conv.updated_at || 0).getTime();

  const ordered = Array.from(this.state.conversations.values());
  ordered.sort((left, right) => {
    const leftPin = pinRank(left);
    const rightPin = pinRank(right);
    if (leftPin !== rightPin) {
      return rightPin - leftPin;
    }
    return activityTime(right) - activityTime(left);
  });

  this.state.conversationList = ordered;
}
// ==================== 初始化与销毁 ====================
/**
 * One-time setup: registers realtime listeners, loads conversations and
 * unread counts, then announces the conversation list.
 *
 * Calls made while a run is in flight wait for that run instead of starting
 * another. Failures are logged and published as an 'error' event rather than
 * thrown. When no user is logged in, the run ends without marking the manager
 * as initialized.
 */
async initialize(): Promise<void> {
  if (this.state.isInitialized) {
    return;
  }

  const runInFlight = this.initializePromise;
  if (runInFlight) {
    await runInFlight;
    return;
  }

  const bootstrap = async (): Promise<void> => {
    try {
      // 1. Resolve the current user
      this.currentUserId = this.getCurrentUserId();
      if (!this.currentUserId) {
        console.warn('[MessageManager] 用户未登录,延迟初始化');
        return;
      }

      // 2. Register realtime listeners
      this.initWebSocketListeners();

      // 3. Load the conversation list
      await this.fetchConversations();

      // 4. Load unread counts
      await this.fetchUnreadCount();

      this.state.isInitialized = true;
      this.notifySubscribers({
        type: 'conversations_updated',
        payload: { conversations: this.state.conversationList },
        timestamp: Date.now(),
      });

      // A conversation is already active (user is on the Chat screen): sync it right away.
      const activeConversationId = this.state.currentConversationId;
      if (activeConversationId) {
        await this.fetchMessages(activeConversationId);
      }
    } catch (error) {
      console.error('[MessageManager] 初始化失败:', error);
      this.notifySubscribers({
        type: 'error',
        payload: { error, context: 'initialize' },
        timestamp: Date.now(),
      });
    }
  };

  this.initializePromise = bootstrap();
  try {
    await this.initializePromise;
  } finally {
    this.initializePromise = null;
  }
}
/**
 * Tears the manager down. The auth listener calls this on logout.
 *
 * Besides detaching listeners and subscribers, this drops the per-user data
 * held in memory (conversations, messages, unread counters, typing/mute
 * status and the processed-message de-duplication sets) so that nothing
 * from the previous session remains after logout.
 *
 * Left untouched on purpose: pendingReadMap and pendingUserRequests (their
 * owning code paths clean them up themselves) and currentConversationId.
 */
destroy(): void {
  if (this.wsUnsubscribe) {
    this.wsUnsubscribe();
    this.wsUnsubscribe = null;
  }
  this.state.subscribers.clear();
  this.state.isInitialized = false;
  this.initializePromise = null;
  this.activatingConversationTasks.clear();

  // Drop user-specific data so it cannot leak into the next session.
  this.state.conversations.clear();
  this.state.conversationList = [];
  this.state.messagesMap.clear();
  this.state.totalUnreadCount = 0;
  this.state.systemUnreadCount = 0;
  this.state.typingUsersMap.clear();
  this.state.mutedStatusMap.clear();

  // Forget which message IDs were already handled.
  this.processedMessageIds.clear();
  this.processedMessageIdsExpiry.clear();
}
// ==================== WebSocket 处理 ====================
/**
 * Registers handlers for every realtime event type on sseService, plus
 * connect/disconnect handlers that publish 'connection_changed'.
 *
 * NOTE(review): wsUnsubscribe is invoked here when set, but nothing in this
 * method assigns it, and the values returned by sseService.on are ignored —
 * confirm whether repeated calls stack duplicate handlers.
 */
private initWebSocketListeners(): void {
  if (this.wsUnsubscribe) {
    this.wsUnsubscribe();
  }

  // New messages: private chat and group chat share one handler.
  sseService.on('chat', (payload: WSChatMessage) => {
    this.handleNewMessage(payload);
  });
  sseService.on('group_message', (payload: WSGroupChatMessage) => {
    this.handleNewMessage(payload);
  });

  // Read receipts: private and group.
  sseService.on('read', (payload: WSReadMessage) => {
    this.handleReadReceipt(payload);
  });
  sseService.on('group_read', (payload: WSGroupReadMessage) => {
    this.handleGroupReadReceipt(payload);
  });

  // Recalls: private and group.
  sseService.on('recall', (payload: WSRecallMessage) => {
    this.handleRecallMessage(payload);
  });
  sseService.on('group_recall', (payload: WSGroupRecallMessage) => {
    this.handleGroupRecallMessage(payload);
  });

  // Group typing status.
  sseService.on('group_typing', (payload: WSGroupTypingMessage) => {
    this.handleGroupTyping(payload);
  });

  // Group notices.
  sseService.on('group_notice', (payload: WSGroupNoticeMessage) => {
    this.handleGroupNotice(payload);
  });

  // Records the connection flag and tells subscribers about it.
  const publishConnectionState = (connected: boolean): void => {
    this.state.isWebSocketConnected = connected;
    this.notifySubscribers({
      type: 'connection_changed',
      payload: { connected },
      timestamp: Date.now(),
    });
  };

  sseService.onConnect(() => {
    publishConnectionState(true);

    // Cold-start / reconnect safety net: once connected, resync the conversation
    // list and the active conversation so the first screen is not empty.
    // Resyncs are throttled to one per 1500 ms.
    const connectedAt = Date.now();
    const sinceLastSync = connectedAt - this.lastReconnectSyncAt;
    if (sinceLastSync > 1500) {
      this.lastReconnectSyncAt = connectedAt;
      this.fetchConversations(true).catch(error => {
        console.error('[MessageManager] 连接后同步会话失败:', error);
      });
      if (this.state.currentConversationId) {
        this.fetchMessages(this.state.currentConversationId).catch(error => {
          console.error('[MessageManager] 连接后同步当前会话消息失败:', error);
        });
      }
    }
  });

  sseService.onDisconnect(() => {
    publishConnectionState(false);
  });
}
// ==================== 消息处理核心方法 ====================
// IDs of messages that were already processed; used for de-duplication so that an ACK message and the regular message are not both handled
private processedMessageIds: Set<string> = new Set();
// Per message ID, the Date.now() value recorded when it was marked as processed; cleanupProcessedMessageIds reads it to expire old entries
private processedMessageIdsExpiry: Map<string, number> = new Map();
/**
 * Drops processed-message IDs recorded more than five minutes ago, so the
 * de-duplication sets do not grow without bound.
 */
private cleanupProcessedMessageIds(): void {
  const maxAgeMs = 5 * 60 * 1000; // entries expire after 5 minutes
  const checkedAt = Date.now();
  this.processedMessageIdsExpiry.forEach((recordedAt, messageId) => {
    if (checkedAt - recordedAt > maxAgeMs) {
      this.processedMessageIds.delete(messageId);
      this.processedMessageIdsExpiry.delete(messageId);
    }
  });
}
/**
 * Tells whether the given message ID has already been marked as processed.
 */
private isMessageProcessed(messageId: string): boolean {
  const alreadySeen = this.processedMessageIds.has(messageId);
  return alreadySeen;
}
/**
 * Records a message ID as processed together with the current time, and
 * triggers a cleanup pass once more than 1000 IDs are being tracked.
 */
private markMessageAsProcessed(messageId: string): void {
  const seenAt = Date.now();
  this.processedMessageIds.add(messageId);
  this.processedMessageIdsExpiry.set(messageId, seenAt);

  const overBudget = this.processedMessageIds.size > 1000;
  if (overBudget) {
    this.cleanupProcessedMessageIds();
  }
}
// In-flight user-info requests keyed by user ID (used to de-duplicate concurrent lookups)
private pendingUserRequests: Map<string, Promise<UserDTO | null>> = new Map();
/**
 * Produces the system-notice text for a group notice.
 *
 * @param noticeType kind of group notice
 * @param data notice payload; only `username` is read, with '用户' as the fallback
 * @returns the notice text, or '' for a notice type that has no text
 */
private buildGroupNoticeText(noticeType: GroupNoticeType, data: any): string {
  const username = data?.username || '用户';
  const textByType = new Map<string, string>([
    ['member_join', `"${username}" 加入了群聊`],
    ['member_leave', `"${username}" 退出了群聊`],
    ['member_removed', `"${username}" 被移出群聊`],
    ['muted', `"${username}" 已被管理员禁言`],
    ['unmuted', `"${username}" 已被管理员解除禁言`],
  ]);
  return textByType.get(noticeType) || '';
}
/**
 * Looks up the conversation whose group has the given ID.
 *
 * @returns the ID of the first matching conversation in the sorted list, or null
 */
private findConversationIdByGroupId(groupId: string): string | null {
  const match = this.state.conversationList.find(
    conv => String(conv.group?.id || '') === groupId
  );
  return match ? match.id : null;
}
/**
 * Fills in the `sender` of messages in the background. Once the lookups
 * finish, the conversation's stored messages are updated and subscribers are
 * notified — but only if at least one message actually gained a sender.
 *
 * Messages from the current user, from sender '10000', or that already carry
 * a sender are not looked up.
 */
private enrichMessagesWithSenderInfo(conversationId: string, messages: MessageResponse[]): void {
  const selfId = this.getCurrentUserId();

  // Collect each sender that still needs a lookup, once.
  const wanted = new Set<string>();
  for (const msg of messages) {
    const needsLookup =
      msg.sender_id && msg.sender_id !== selfId && msg.sender_id !== '10000' && !msg.sender;
    if (needsLookup) {
      wanted.add(msg.sender_id);
    }
  }
  const senderIds = [...wanted];
  if (senderIds.length === 0) return;

  // Look all of them up concurrently.
  const lookups = senderIds.map(id => this.getSenderInfo(id).then(user => ({ id, user })));
  Promise.all(lookups)
    .then(resolved => {
      const usersById = new Map<string, UserDTO>();
      for (const { id, user } of resolved) {
        if (user) usersById.set(id, user);
      }
      if (usersById.size === 0) return;

      // Work on whatever is stored now; it may have changed during the lookups.
      const stored = this.state.messagesMap.get(conversationId);
      if (!stored) return;

      let anyFilled = false;
      const enriched = stored.map(msg => {
        const fillable = !msg.sender && msg.sender_id && usersById.has(msg.sender_id);
        if (!fillable) {
          return msg;
        }
        anyFilled = true;
        return { ...msg, sender: usersById.get(msg.sender_id) };
      });
      if (!anyFilled) return;

      this.state.messagesMap.set(conversationId, enriched);
      this.notifySubscribers({
        type: 'messages_updated',
        payload: { conversationId, messages: [...enriched] },
        timestamp: Date.now(),
      });
    })
    .catch(error => {
      console.error('[MessageManager] 批量获取 sender 信息失败:', error);
    });
}
/**
 * Resolves a user's info, preferring the local cache and sharing one server
 * request between concurrent callers asking for the same user.
 *
 * @returns the user, or null when the server lookup yields nothing
 */
private async getSenderInfo(userId: string): Promise<UserDTO | null> {
  // Local cache first.
  const fromCache = await getUserCache(userId);
  if (fromCache) {
    return fromCache;
  }

  // Join a request that is already running for this user.
  const inFlight = this.pendingUserRequests.get(userId);
  if (inFlight) {
    return inFlight;
  }

  // Otherwise start one and register it until it settles.
  const lookup = this.fetchUserInfo(userId);
  this.pendingUserRequests.set(userId, lookup);
  try {
    return await lookup;
  } finally {
    this.pendingUserRequests.delete(userId);
  }
}
/**
 * Fetches a user's info from the server and stores it in the local cache.
 *
 * Writing the cache is a side effect: if it fails, the failure is logged and
 * the fetched user is still returned, so a local storage problem does not
 * discard data the server delivered successfully.
 *
 * @param userId ID of the user to fetch
 * @returns the user, or null when the request fails or returns no data
 */
private async fetchUserInfo(userId: string): Promise<UserDTO | null> {
  let user: UserDTO | null = null;
  try {
    const response = await api.get<UserDTO>(`/users/${userId}`);
    if (response.code === 0 && response.data) {
      user = response.data;
    }
  } catch (error) {
    console.error(`[MessageManager] 获取用户信息失败: ${userId}`, error);
    return null;
  }

  if (!user) {
    return null;
  }

  // Cache locally; a cache failure must not hide the fetched user.
  try {
    await saveUserCache(user);
  } catch (error) {
    console.error(`[MessageManager] 缓存用户信息失败: ${userId}`, error);
  }
  return user;
}
/**
 * Handles a newly pushed message (private or group).
 *
 * The body contains no `await`, so everything up to the final notification
 * runs synchronously: the message is in memory and subscribers are notified
 * before any background work (sender lookup, local persistence) completes.
 *
 * Steps: de-duplicate by message ID, start the sender lookup for group
 * messages, append to the conversation's messages, update the conversation
 * summary, adjust unread / read state, persist locally, and finally publish
 * 'message_received'.
 *
 * @param message pushed message; `_isAck` marks a send confirmation, which
 *   is still merged into the message list but never counts as unread
 */
private async handleNewMessage(message: (WSChatMessage | WSGroupChatMessage) & { _isAck?: boolean }): Promise<void> {
  const { conversation_id, id, sender_id, seq, segments, created_at, _isAck } = message;
  const normalizedConversationId = this.normalizeConversationId(conversation_id);
  const currentUserId = this.getCurrentUserId();

  // 1. De-duplicate: an ACK and the regular push may carry the same message.
  if (this.isMessageProcessed(id)) {
    return;
  }
  this.markMessageAsProcessed(id);

  // 2. Group messages from other users: look up the sender in the background.
  //    The callback runs only after the synchronous code below has stored the message.
  if (message.type === 'group_message' && sender_id && sender_id !== currentUserId && sender_id !== '10000') {
    this.getSenderInfo(sender_id).then(user => {
      if (!user) {
        return;
      }
      const messages = this.state.messagesMap.get(normalizedConversationId);
      if (!messages) {
        return;
      }
      // New array and new object so React detects the change.
      const updatedMessages = messages.map(m =>
        m.id === id ? { ...m, sender: user } : m
      );
      this.state.messagesMap.set(normalizedConversationId, updatedMessages);
      this.notifySubscribers({
        type: 'messages_updated',
        payload: {
          conversationId: normalizedConversationId,
          messages: [...updatedMessages],
        },
        timestamp: Date.now(),
      });
    }).catch(error => {
      console.error('[MessageManager][ERROR] 获取发送者信息失败:', { userId: sender_id, error });
    });
  }

  // 3. Build the message object.
  const newMessage: MessageResponse = {
    id,
    conversation_id: normalizedConversationId,
    sender_id,
    seq,
    segments: segments || [],
    created_at,
    status: 'normal',
  };

  // 4. Store it in memory and notify immediately so the chat screen sees it at once.
  const existingMessages = this.state.messagesMap.get(normalizedConversationId) || [];
  const messageExists = existingMessages.some(m => m.id === id);
  if (!messageExists) {
    const updatedMessages = [...existingMessages, newMessage].sort((a, b) => a.seq - b.seq);
    this.state.messagesMap.set(normalizedConversationId, updatedMessages);
    this.notifySubscribers({
      type: 'messages_updated',
      payload: {
        conversationId: normalizedConversationId,
        messages: updatedMessages,
        newMessage,
      },
      timestamp: Date.now(),
    });
  }

  // 5. Update the conversation summary.
  this.updateConversationWithNewMessage(normalizedConversationId, newMessage, created_at);

  // 6. Unread count: only for messages from others, outside the active conversation,
  //    with a known current user (avoids undefined === undefined), and never for ACKs.
  const isCurrentUserMessage = sender_id === currentUserId;
  const isActiveConversation = normalizedConversationId === this.state.currentConversationId;
  const shouldIncrementUnread = !isCurrentUserMessage && !isActiveConversation && !!currentUserId && !_isAck;
  if (shouldIncrementUnread) {
    this.incrementUnreadCount(normalizedConversationId);
  }

  // 7. The user is looking at this conversation: mark as read right away.
  if (isActiveConversation && !isCurrentUserMessage) {
    this.markAsRead(normalizedConversationId, seq);
  }

  // 8. Persist locally in the background.
  //    toISOString() throws RangeError on an invalid date; fall back to the current
  //    time so a malformed timestamp cannot abort the remaining steps.
  const createdAtDate = new Date(created_at);
  const createdAtIso = Number.isNaN(createdAtDate.getTime())
    ? new Date().toISOString()
    : createdAtDate.toISOString();
  const textContent = segments?.filter((s: any) => s.type === 'text').map((s: any) => s.data?.text || '').join('') || '';
  saveMessage({
    id,
    conversationId: normalizedConversationId,
    senderId: sender_id || '',
    content: textContent,
    type: segments?.find((s: any) => s.type === 'image') ? 'image' : 'text',
    isRead: isActiveConversation, // messages in the active conversation are stored as read
    createdAt: createdAtIso,
    seq,
    status: 'normal',
    segments,
  }).catch(error => {
    console.error('[MessageManager] 保存消息到本地失败:', error);
  });

  // 9. Announce the new message, using the normalized ID like every other event.
  this.notifySubscribers({
    type: 'message_received',
    payload: {
      conversationId: normalizedConversationId,
      message: newMessage,
      isCurrentUser: isCurrentUserMessage,
    },
    timestamp: Date.now(),
  });
}
/**
 * Applies a new message to its conversation's summary fields, re-sorts the
 * list and publishes 'conversations_updated'.
 *
 * For a conversation that is not known yet, its detail is requested in the
 * background instead (the request is not awaited).
 */
private updateConversationWithNewMessage(
  conversationId: string,
  message: MessageResponse,
  createdAt: string
): void {
  const known = this.state.conversations.get(conversationId);
  if (!known) {
    // Unknown conversation: fetch its detail asynchronously.
    this.fetchConversationDetail(conversationId);
  } else {
    const refreshed: ConversationResponse = {
      ...known,
      last_seq: message.seq,
      last_message: message,
      last_message_at: createdAt,
      updated_at: createdAt,
    };
    this.state.conversations.set(conversationId, refreshed);
  }

  // Re-sort, then tell subscribers.
  this.updateConversationList();
  this.notifySubscribers({
    type: 'conversations_updated',
    payload: { conversations: this.state.conversationList },
    timestamp: Date.now(),
  });
}
/**
* Handle a read receipt for a private chat.
* Currently a no-op: the body contains no statements.
*/
private handleReadReceipt(message: WSReadMessage): void {
// Placeholder: the peer's "read" status update could be handled here
}
/**
* Handle a read receipt for a group chat.
* Currently a no-op: the body is empty.
*/
private handleGroupReadReceipt(message: WSGroupReadMessage): void {
}
/**
 * Flags a stored message as recalled, keeping it in place as a placeholder
 * with its segments emptied. Subscribers are notified only when something
 * actually changed.
 */
private markMessageAsRecalled(conversationId: string, messageId: string): void {
  const key = this.normalizeConversationId(conversationId);
  const stored = this.state.messagesMap.get(key);
  if (!stored) {
    return;
  }

  const targetId = String(messageId);
  const next: MessageResponse[] = [];
  let touched = false;
  for (const entry of stored) {
    const needsRecall = String(entry.id) === targetId && entry.status !== 'recalled';
    if (needsRecall) {
      touched = true;
      next.push({
        ...entry,
        status: 'recalled' as MessageResponse['status'],
        segments: [],
      });
    } else {
      next.push(entry);
    }
  }
  if (!touched) {
    return;
  }

  this.state.messagesMap.set(key, next);
  this.notifySubscribers({
    type: 'messages_updated',
    payload: { conversationId: key, messages: next },
    timestamp: Date.now(),
  });
}
/**
 * When the recalled message is the conversation's last message, marks that
 * last_message as recalled and publishes the refreshed conversation list.
 */
private syncConversationLastMessageOnRecall(conversationId: string, messageId: string): void {
  const key = this.normalizeConversationId(conversationId);
  const conversation = this.state.conversations.get(key);
  const lastMessage = conversation?.last_message;
  if (!conversation || !lastMessage) {
    return;
  }

  const isLastMessage = String(lastMessage.id) === String(messageId);
  if (!isLastMessage || lastMessage.status === 'recalled') {
    return;
  }

  const refreshed: ConversationResponse = {
    ...conversation,
    last_message: {
      ...lastMessage,
      status: 'recalled',
    },
  };
  this.state.conversations.set(key, refreshed);
  this.updateConversationList();
  this.notifySubscribers({
    type: 'conversations_updated',
    payload: { conversations: this.state.conversationList },
    timestamp: Date.now(),
  });
}
/**
 * Handles the recall of a private-chat message: updates memory, publishes
 * 'message_recalled', and records the recall in the local database.
 */
private handleRecallMessage(message: WSRecallMessage): void {
  const recalledId = message.message_id;
  const key = this.normalizeConversationId(message.conversation_id);

  this.markMessageAsRecalled(key, recalledId);
  this.syncConversationLastMessageOnRecall(key, recalledId);

  // Tell subscribers the message was recalled.
  this.notifySubscribers({
    type: 'message_recalled',
    payload: { conversationId: key, messageId: recalledId },
    timestamp: Date.now(),
  });

  // Keep the local database in step so the recall placeholder survives a cold start.
  updateMessageStatus(recalledId, 'recalled', true).catch(error => {
    console.error('[MessageManager] 更新本地消息撤回状态失败:', error);
  });
}
/**
 * Handles the recall of a group-chat message: updates memory, publishes
 * 'message_recalled' (flagged with isGroup), and records the recall in the
 * local database.
 */
private handleGroupRecallMessage(message: WSGroupRecallMessage): void {
  const recalledId = message.message_id;
  const key = this.normalizeConversationId(message.conversation_id);

  this.markMessageAsRecalled(key, recalledId);
  this.syncConversationLastMessageOnRecall(key, recalledId);

  // Tell subscribers the message was recalled.
  this.notifySubscribers({
    type: 'message_recalled',
    payload: { conversationId: key, messageId: recalledId, isGroup: true },
    timestamp: Date.now(),
  });

  // Keep the local database in step so the recall placeholder survives a cold start.
  updateMessageStatus(recalledId, 'recalled', true).catch(error => {
    console.error('[MessageManager] 更新本地消息撤回状态失败:', error);
  });
}
/**
 * Tracks who is typing in a group and publishes 'typing_status' whenever the
 * list of typing users for that group changes.
 */
private handleGroupTyping(message: WSGroupTypingMessage): void {
  const groupKey = String(message.group_id);
  const typingUserId = message.user_id;
  const before = this.state.typingUsersMap.get(groupKey) || [];

  let after: string[];
  if (!message.is_typing) {
    // Stopped typing: remove the user.
    after = before.filter(id => id !== typingUserId);
  } else if (before.includes(typingUserId)) {
    // Already listed: nothing to add.
    after = before;
  } else {
    // Started typing: add the user.
    after = [...before, typingUserId];
  }

  // Each event adds or removes at most one user, so equal length means no change.
  if (after.length === before.length) {
    return;
  }

  this.state.typingUsersMap.set(groupKey, after);
  this.notifySubscribers({
    type: 'typing_status',
    payload: { groupId: groupKey, typingUsers: after },
    timestamp: Date.now(),
  });
}
/**
 * Handles a group notice:
 * - mute/unmute notices aimed at the current user update the local mute status;
 * - notices that carry a message_id and a positive seq are appended to the
 *   group's conversation as a system message (sender '10000'), so the UI
 *   renders them as system notices rather than as normal user messages;
 * - every notice is finally published as a 'group_notice' event.
 */
private handleGroupNotice(message: WSGroupNoticeMessage): void {
  const { notice_type, group_id, data, timestamp, message_id, seq } = message;
  const groupIdStr = String(group_id);

  // Mute / unmute: remember the state when it concerns the current user.
  if (notice_type === 'muted' || notice_type === 'unmuted') {
    const affectedUserId = data?.user_id;
    const selfId = this.getCurrentUserId();
    if (affectedUserId && affectedUserId === selfId) {
      this.state.mutedStatusMap.set(groupIdStr, notice_type === 'muted');
    }
  }

  // Adds the notice to the conversation's message stream when it qualifies.
  const appendNoticeToStream = (): void => {
    if (!message_id || typeof seq !== 'number' || !(seq > 0)) {
      return;
    }
    const conversationId = this.findConversationIdByGroupId(groupIdStr);
    if (!conversationId) {
      return;
    }
    const existingMessages = this.state.messagesMap.get(conversationId) || [];
    const alreadyPresent = existingMessages.some(m => String(m.id) === String(message_id));
    if (alreadyPresent) {
      return;
    }

    const noticeText = this.buildGroupNoticeText(notice_type, data);
    const systemNoticeMessage: MessageResponse = {
      id: String(message_id),
      conversation_id: conversationId,
      sender_id: '10000',
      seq,
      segments: noticeText
        ? [{ type: 'text', data: { text: noticeText } as any }]
        : [],
      status: 'normal',
      category: 'notification',
      created_at: new Date(timestamp || Date.now()).toISOString(),
    };

    const updatedMessages = [...existingMessages, systemNoticeMessage].sort((a, b) => a.seq - b.seq);
    this.state.messagesMap.set(conversationId, updatedMessages);
    this.notifySubscribers({
      type: 'messages_updated',
      payload: {
        conversationId,
        messages: updatedMessages,
        newMessage: systemNoticeMessage,
        source: 'group_notice',
      },
      timestamp: Date.now(),
    });
  };
  appendNoticeToStream();

  // Always forward the notice itself.
  this.notifySubscribers({
    type: 'group_notice',
    payload: {
      groupId: groupIdStr,
      noticeType: notice_type,
      data,
      timestamp,
      messageId: message_id,
      seq,
    },
    timestamp: Date.now(),
  });
}
// ==================== 数据获取方法 ====================
/**
 * Loads the conversation list and replaces the in-memory conversations.
 *
 * Conversations that have an entry in pendingReadMap keep unread_count = 0
 * even when the server still reports unread messages, so an optimistic
 * "read" state is not overwritten by stale server data. The total unread
 * count is recomputed from the protected data and both
 * 'conversations_updated' and 'unread_count_updated' are published.
 *
 * @param forceRefresh when true, runs even if another load is in progress
 *   and is passed through to messageService.getConversations
 */
async fetchConversations(forceRefresh = false): Promise<void> {
  if (!forceRefresh && this.state.isLoadingConversations) {
    return;
  }
  this.state.isLoadingConversations = true;

  try {
    const response = await messageService.getConversations(1, 20, forceRefresh);
    const conversations = response.list || [];

    // Rebuild the map, shielding conversations with a pending read from stale unread counts.
    this.state.conversations.clear();
    for (const conv of conversations) {
      const hasReadRecord = Boolean(this.pendingReadMap.get(conv.id));
      const keepLocalRead = hasReadRecord && conv.unread_count > 0;
      this.state.conversations.set(conv.id, keepLocalRead ? { ...conv, unread_count: 0 } : conv);
    }

    // Refresh the sorted list.
    this.updateConversationList();

    // Total unread, computed from the protected data.
    let totalUnread = 0;
    for (const conv of this.state.conversations.values()) {
      totalUnread += conv.unread_count || 0;
    }
    this.state.totalUnreadCount = totalUnread;

    // Per the original note, the refresh has already written the server's older
    // numbers into the local cache; write unread_count = 0 back for every pending
    // conversation so the next cold start does not read the old count.
    this.pendingReadMap.forEach((_record, convId) => {
      updateConversationCacheUnreadCount(convId, 0).catch(error => {
        console.error('[MessageManager] 修复本地缓存未读数失败:', convId, error);
      });
    });

    // Publish the results.
    this.notifySubscribers({
      type: 'conversations_updated',
      payload: { conversations: this.state.conversationList },
      timestamp: Date.now(),
    });
    this.notifySubscribers({
      type: 'unread_count_updated',
      payload: {
        totalUnreadCount: this.state.totalUnreadCount,
        systemUnreadCount: this.state.systemUnreadCount,
      },
      timestamp: Date.now(),
    });
  } catch (error) {
    console.error('[MessageManager] 获取会话列表失败:', error);
    this.notifySubscribers({
      type: 'error',
      payload: { error, context: 'fetchConversations' },
      timestamp: Date.now(),
    });
  } finally {
    this.state.isLoadingConversations = false;
  }
}
/**
 * Fetches one conversation's detail, stores it and publishes
 * 'conversations_updated'.
 *
 * When the conversation has an entry in pendingReadMap, a server-reported
 * unread_count > 0 is replaced by 0 so the optimistic "read" state survives,
 * and the local cache is corrected the same way.
 *
 * @param conversationId ID of the conversation to fetch
 * @returns the conversation exactly as stored (including the unread-count
 *   protection), or null when nothing was found or the request failed
 */
async fetchConversationDetail(conversationId: string): Promise<ConversationResponse | null> {
  try {
    const detail = await messageService.getConversationById(conversationId);
    if (!detail) {
      return null;
    }
    const serverDetail = detail as ConversationResponse;

    // A pending read request (or its protection window) means local state wins.
    const isPending = !!this.pendingReadMap.get(conversationId);
    const shouldPreserveLocalRead = isPending && (serverDetail.unread_count || 0) > 0;
    const safeDetail: ConversationResponse = shouldPreserveLocalRead
      ? { ...serverDetail, unread_count: 0 }
      : serverDetail;

    this.state.conversations.set(conversationId, safeDetail);
    this.updateConversationList();

    // Per the original note, getConversationById caches the server's data locally;
    // correct the cached unread count while a read is pending.
    if (isPending) {
      updateConversationCacheUnreadCount(conversationId, 0).catch(error => {
        console.error('[MessageManager] 修复本地缓存未读数失败:', conversationId, error);
      });
    }

    this.notifySubscribers({
      type: 'conversations_updated',
      payload: { conversations: this.state.conversationList },
      timestamp: Date.now(),
    });

    // Return what was stored, so callers see the same unread count as subscribers.
    return safeDetail;
  } catch (error) {
    console.error('[MessageManager] 获取会话详情失败:', error);
    return null;
  }
}
/**
 * Loads and synchronizes the messages of one conversation, publishing
 * 'messages_updated' after every stage so the UI always has something to show.
 *
 * Without `afterSeq`:
 *   1. Local database first. If it has rows they are shown at once; if not,
 *      an empty list is published and one page of the latest messages is
 *      fetched from the server.
 *   2. A server snapshot of the latest messages is always fetched and merged.
 *   3. If the snapshot reaches beyond the local max seq, everything after the
 *      local max seq is fetched and merged as well.
 * With `afterSeq`: messages after that seq are fetched from the server and merged.
 *
 * Only one load per conversation runs at a time. Local persistence is a side
 * effect and never blocks the UI. Errors are logged; an error escaping the
 * staged handlers is also published as an 'error' event.
 */
async fetchMessages(conversationId: string, afterSeq?: number): Promise<void> {
  // Skip when this conversation is already loading.
  if (this.state.loadingMessagesSet.has(conversationId)) {
    return;
  }
  this.state.loadingMessagesSet.add(conversationId);

  // Union of two lists keyed by stringified id (later entries win), ordered by seq.
  const mergeMessages = (base: MessageResponse[], incoming: MessageResponse[]): MessageResponse[] => {
    if (incoming.length === 0) return base;
    const byId = new Map<string, MessageResponse>();
    for (const msg of base) byId.set(String(msg.id), msg);
    for (const msg of incoming) byId.set(String(msg.id), msg);
    return Array.from(byId.values()).sort((a, b) => a.seq - b.seq);
  };

  // Publishes a 'messages_updated' event carrying the given payload.
  const publish = (payload: Record<string, unknown>): void => {
    this.notifySubscribers({
      type: 'messages_updated',
      payload,
      timestamp: Date.now(),
    });
  };

  // Writes server messages to the local database without waiting for the result.
  const persist = (batch: MessageResponse[], failureLog: string): void => {
    saveMessagesBatch(batch.map((m: any) => ({
      id: m.id,
      conversationId: m.conversation_id || conversationId,
      senderId: m.sender_id,
      content: m.content,
      type: m.type || 'text',
      isRead: m.is_read || false,
      createdAt: m.created_at,
      seq: m.seq,
      status: m.status || 'normal',
      segments: m.segments,
    }))).catch(error => {
      console.error(failureLog, error);
    });
  };

  // Merges server messages into memory, publishes, enriches senders, persists.
  const mergeIntoStore = (incoming: MessageResponse[], source: string, failureLog: string): void => {
    const current = this.state.messagesMap.get(conversationId) || [];
    const merged = mergeMessages(current, incoming);
    this.state.messagesMap.set(conversationId, merged);
    publish({
      conversationId,
      messages: merged,
      newMessages: incoming,
      source,
    });
    this.enrichMessagesWithSenderInfo(conversationId, merged);
    persist(incoming, failureLog);
  };

  try {
    if (afterSeq) {
      // An explicit afterSeq goes straight to the server.
      const response = await messageService.getMessages(conversationId, afterSeq);
      if (response?.messages && response.messages.length > 0) {
        mergeIntoStore(response.messages, 'server', '[MessageManager] 保存 afterSeq 消息到本地失败:');
      }
      return;
    }

    // 1. Local database first, so there is data to show immediately.
    let localRows: any[] = [];
    let localMaxSeq = 0;
    try {
      localRows = await getMessagesByConversation(conversationId, 20);
      localMaxSeq = await getMaxSeq(conversationId);
    } catch (error) {
      // A local storage failure must not block the online sync.
      console.warn('[MessageManager] 读取本地消息失败,回退到服务端同步:', error);
    }

    if (localRows.length > 0) {
      // Convert cached rows to the response shape.
      const fromLocal: MessageResponse[] = localRows.map(row => ({
        id: row.id,
        conversation_id: row.conversationId,
        sender_id: row.senderId,
        seq: row.seq,
        segments: row.segments || [],
        status: row.status as any,
        created_at: row.createdAt,
      }));
      this.state.messagesMap.set(conversationId, fromLocal);
      publish({
        conversationId,
        messages: fromLocal,
        source: 'local',
      });
      this.enrichMessagesWithSenderInfo(conversationId, fromLocal);
    } else {
      // Nothing local: publish an empty list first so the chat screen does not sit in loading.
      this.state.messagesMap.set(conversationId, []);
      publish({
        conversationId,
        messages: [],
        source: 'local_empty',
      });

      // Then pull one page of the latest messages directly, without relying on a cached last_seq.
      try {
        const latestResponse = await messageService.getMessages(conversationId, undefined, undefined, 20);
        const latestMessages = latestResponse?.messages || [];
        if (latestMessages.length > 0) {
          const latestBySeq = [...latestMessages].sort((a, b) => a.seq - b.seq);
          this.state.messagesMap.set(conversationId, latestBySeq);
          publish({
            conversationId,
            messages: latestBySeq,
            newMessages: latestBySeq,
            source: 'server_bootstrap',
          });
          this.enrichMessagesWithSenderInfo(conversationId, latestBySeq);
          persist(latestMessages, '[MessageManager] 保存最新消息到本地失败:');
        }
      } catch (error) {
        console.error('[MessageManager] 冷启动拉取最新消息失败:', error);
      }
    }

    // 2. Server snapshot plus incremental catch-up.
    try {
      // 2.1 Always take a snapshot of the latest page as the authoritative source.
      const snapshotResp = await messageService.getMessages(conversationId, undefined, undefined, 50);
      const snapshotMessages = snapshotResp?.messages || [];
      if (snapshotMessages.length > 0) {
        mergeIntoStore(snapshotMessages, 'server_snapshot', '[MessageManager] 保存快照消息到本地失败:');
      }

      // 2.2 Fill in from localMaxSeq onwards in case the snapshot window was too small.
      let snapshotMaxSeq = 0;
      for (const msg of snapshotMessages) {
        snapshotMaxSeq = Math.max(snapshotMaxSeq, msg.seq || 0);
      }
      if (snapshotMaxSeq > localMaxSeq) {
        const incrementalResp = await messageService.getMessages(conversationId, localMaxSeq);
        const newMessages = incrementalResp?.messages || [];
        if (newMessages.length > 0) {
          mergeIntoStore(newMessages, 'server_incremental', '[MessageManager] 保存增量消息到本地失败:');
        }
      }
    } catch (error) {
      console.error('[MessageManager] 快照/增量同步失败:', error);
    }
  } catch (error) {
    console.error('[MessageManager] 获取消息失败:', error);
    this.notifySubscribers({
      type: 'error',
      payload: { error, context: 'fetchMessages', conversationId },
      timestamp: Date.now(),
    });
  } finally {
    this.state.loadingMessagesSet.delete(conversationId);
  }
}
/**
 * Load one page of older history for a conversation and merge it into memory.
 *
 * The local database is consulted first; the server is only queried when the
 * local result holds fewer than `limit` rows. The loaded page is merged into
 * `state.messagesMap` (ids that are already in memory are skipped, so
 * overlapping or repeated calls cannot duplicate entries), subscribers get a
 * `messages_updated` event, and sender enrichment is started.
 *
 * @param conversationId - Conversation whose history is extended.
 * @param beforeSeq - Sequence bound handed to the local and server history queries.
 * @param limit - Page size; also the threshold for treating the local result as sufficient.
 * @returns The page that was loaded (local rows converted to `MessageResponse`,
 *          or the server messages); `[]` when nothing was found or the load failed.
 */
async loadMoreMessages(conversationId: string, beforeSeq: number, limit = 20): Promise<MessageResponse[]> {
  // Merge a history page into the in-memory list, publish it and start enrichment.
  const publishHistory = (page: MessageResponse[], source: 'local_history' | 'server_history'): void => {
    const current = this.state.messagesMap.get(conversationId) || [];
    // Keep the in-memory copy when an id is already known; it may carry data
    // (e.g. sender info) that the freshly loaded copy lacks.
    const knownIds = new Set(current.map(m => m.id));
    const fresh = page.filter(m => {
      if (knownIds.has(m.id)) {
        return false;
      }
      knownIds.add(m.id);
      return true;
    });
    const merged = [...fresh, ...current].sort((a, b) => a.seq - b.seq);
    this.state.messagesMap.set(conversationId, merged);
    this.notifySubscribers({
      type: 'messages_updated',
      payload: { conversationId, messages: merged, source },
      timestamp: Date.now(),
    });
    // Sender info is filled in asynchronously.
    this.enrichMessagesWithSenderInfo(conversationId, merged);
  };

  try {
    // Local database first.
    const localMessages = await getMessagesBeforeSeq(conversationId, beforeSeq, limit);
    if (localMessages.length >= limit) {
      // A full page is cached locally; no server round-trip needed.
      const formattedMessages: MessageResponse[] = localMessages.map(m => ({
        id: m.id,
        conversation_id: m.conversationId,
        sender_id: m.senderId,
        seq: m.seq,
        segments: m.segments || [],
        status: m.status as any,
        created_at: m.createdAt,
      }));
      publishHistory(formattedMessages, 'local_history');
      return formattedMessages;
    }

    // Not enough local data: ask the server.
    const response = await messageService.getMessages(conversationId, undefined, beforeSeq, limit);
    if (response?.messages && response.messages.length > 0) {
      const serverMessages = response.messages;
      publishHistory(serverMessages, 'server_history');
      // Persistence is a side effect: it must neither delay the UI nor turn a
      // successful fetch into an empty result when the local write fails.
      saveMessagesBatch(serverMessages.map((m: any) => ({
        id: m.id,
        conversationId: m.conversation_id || conversationId,
        senderId: m.sender_id,
        content: m.content,
        type: m.type || 'text',
        isRead: m.is_read || false,
        createdAt: m.created_at,
        seq: m.seq,
        status: m.status || 'normal',
        segments: m.segments,
      }))).catch(error => {
        console.error('[MessageManager] 保存历史消息到本地失败:', error);
      });
      return serverMessages;
    }
    return [];
  } catch (error) {
    console.error('[MessageManager] 加载更多消息失败:', error);
    return [];
  }
}
/**
 * Refresh the conversation and system unread counters from the server.
 *
 * Both requests are awaited together. Missing fields fall back to 0. On
 * success an `unread_count_updated` event is broadcast; on failure the error
 * is logged and the method resolves normally.
 */
async fetchUnreadCount(): Promise<void> {
  try {
    const conversationRequest = messageService.getUnreadCount();
    const systemRequest = messageService.getSystemUnreadCount();
    const [conversationUnread, systemUnread] = await Promise.all([conversationRequest, systemRequest]);

    const { state } = this;
    state.totalUnreadCount = conversationUnread?.total_unread_count ?? 0;
    state.systemUnreadCount = systemUnread?.unread_count ?? 0;

    const payload = {
      totalUnreadCount: state.totalUnreadCount,
      systemUnreadCount: state.systemUnreadCount,
    };
    this.notifySubscribers({ type: 'unread_count_updated', payload, timestamp: Date.now() });
  } catch (error) {
    console.error('[MessageManager] 获取未读数失败:', error);
  }
}
// ==================== 数据操作方法 ====================
/**
 * Send a message through the API and publish it locally.
 *
 * On a truthy API response the message is placed in the in-memory list,
 * `messages_updated` (source `send`) and `message_sent` are broadcast,
 * `updateConversationWithNewMessage` is invoked, and the message is written
 * to the local database without awaiting the write.
 *
 * @param conversationId - Target conversation.
 * @param segments - Message content segments, sent as-is.
 * @param options - Optional `replyToId`, forwarded to the API as `reply_to_id`.
 * @returns The locally built message, or `null` when the API returned a falsy
 *          response or the send failed (an `error` event is emitted on failure).
 */
async sendMessage(
  conversationId: string,
  segments: MessageSegment[],
  options?: { replyToId?: string }
): Promise<MessageResponse | null> {
  try {
    const response = await messageService.sendMessage(conversationId, {
      segments,
      reply_to_id: options?.replyToId,
    });
    if (!response) {
      return null;
    }

    const senderId = this.getCurrentUserId() || '';
    const newMessage: MessageResponse = {
      id: response.id,
      conversation_id: conversationId,
      sender_id: senderId,
      seq: response.seq,
      segments,
      created_at: new Date().toISOString(),
      status: 'normal',
    };

    const existingMessages = this.state.messagesMap.get(conversationId) || [];
    // If the same id is already in memory (e.g. a realtime echo was processed
    // while the HTTP request was still pending), do not append it a second time.
    const alreadyListed = existingMessages.some(m => m.id === newMessage.id);
    const updatedMessages = alreadyListed
      ? [...existingMessages]
      : [...existingMessages, newMessage].sort((a, b) => a.seq - b.seq);
    this.state.messagesMap.set(conversationId, updatedMessages);

    // Broadcast the list right away so the chat view shows the message immediately.
    this.notifySubscribers({
      type: 'messages_updated',
      payload: {
        conversationId,
        messages: updatedMessages,
        newMessage,
        source: 'send',
      },
      timestamp: Date.now(),
    });

    this.updateConversationWithNewMessage(conversationId, newMessage, newMessage.created_at);

    this.notifySubscribers({
      type: 'message_sent',
      payload: { conversationId, message: newMessage },
      timestamp: Date.now(),
    });

    // Local persistence: plain-text content is the concatenation of all text
    // segments; the stored type is 'image' when any image segment exists.
    const textContent = segments?.filter((s: any) => s.type === 'text').map((s: any) => s.data?.text || '').join('') || '';
    saveMessage({
      id: response.id,
      conversationId,
      senderId,
      content: textContent,
      type: segments?.find((s: any) => s.type === 'image') ? 'image' : 'text',
      isRead: true,
      createdAt: newMessage.created_at,
      seq: response.seq,
      status: 'normal',
      segments,
    }).catch(console.error);

    return newMessage;
  } catch (error) {
    console.error('[MessageManager] 发送消息失败:', error);
    this.notifySubscribers({
      type: 'error',
      payload: { error, context: 'sendMessage' },
      timestamp: Date.now(),
    });
    return null;
  }
}
/**
 * Mark a conversation as read up to `seq`, optimistically and then on the server.
 *
 * Flow:
 * 1. Skip when a pending record already covers `seq` (same or larger seq).
 * 2. Register a versioned record in `pendingReadMap`. The original notes say
 *    `fetchConversations` uses this record to avoid overwriting the optimistic
 *    state with older server data (that code is outside this method).
 * 3. Zero the conversation's unread count locally, update the local database
 *    and broadcast `message_read`, `conversations_updated` and
 *    `unread_count_updated`.
 * 4. Call the API. On success the record is kept for
 *    `READ_STATE_PROTECTION_DELAY` ms and then removed. On failure the
 *    optimistic change is undone and the same three events are broadcast.
 *
 * A request that has been superseded by a newer `markAsRead` call for the same
 * conversation never touches the newer request's record and does not roll back.
 *
 * @param conversationId - Conversation to mark as read.
 * @param seq - Highest sequence number that has been read.
 */
async markAsRead(conversationId: string, seq: number): Promise<void> {
  const conversation = this.state.conversations.get(conversationId);
  if (!conversation) {
    console.warn('[MessageManager] 会话不存在:', conversationId);
    return;
  }
  const prevUnreadCount = conversation.unread_count || 0;
  const existingRecord = this.pendingReadMap.get(conversationId);
  // De-duplicate by seq: nothing to do when an equal or larger seq was already reported.
  if (existingRecord && seq <= existingRecord.lastReadSeq) {
    return;
  }
  // Drop the expiry timer of the record that is about to be replaced.
  if (existingRecord?.clearTimer) {
    clearTimeout(existingRecord.clearTimer);
  }

  this.readStateVersion++;
  const currentVersion = this.readStateVersion;
  // True while no newer markAsRead call has replaced this call's record.
  const isLatestRequest = (): boolean =>
    this.pendingReadMap.get(conversationId)?.version === currentVersion;
  // Emit the three events that describe a conversation's read state.
  const broadcastReadState = (unreadCount: number): void => {
    this.notifySubscribers({
      type: 'message_read',
      payload: {
        conversationId,
        unreadCount,
        totalUnreadCount: this.state.totalUnreadCount,
      },
      timestamp: Date.now(),
    });
    this.notifySubscribers({
      type: 'conversations_updated',
      payload: { conversations: this.state.conversationList },
      timestamp: Date.now(),
    });
    this.notifySubscribers({
      type: 'unread_count_updated',
      payload: {
        totalUnreadCount: this.state.totalUnreadCount,
        systemUnreadCount: this.state.systemUnreadCount,
      },
      timestamp: Date.now(),
    });
  };

  // 1. Register the in-flight read request.
  this.pendingReadMap.set(conversationId, {
    timestamp: Date.now(),
    version: currentVersion,
    lastReadSeq: seq,
  });

  // 2. Optimistic local update; remember exactly how much was deducted so a
  //    rollback restores the previous total even when the subtraction was clamped.
  const updatedConv: ConversationResponse = {
    ...conversation,
    unread_count: 0,
  };
  this.state.conversations.set(conversationId, updatedConv);
  const totalBefore = this.state.totalUnreadCount;
  this.state.totalUnreadCount = Math.max(0, totalBefore - prevUnreadCount);
  const deductedFromTotal = totalBefore - this.state.totalUnreadCount;
  this.updateConversationList();

  // 3. Local database (not awaited).
  markConversationAsRead(conversationId).catch(console.error);

  // 4. Tell every subscriber immediately.
  broadcastReadState(0);

  // 5. Server call.
  try {
    await messageService.markAsRead(conversationId, seq);
  } catch (error) {
    console.error('[MessageManager] 标记已读API失败:', error);
    if (!isLatestRequest()) {
      // A newer request with a larger seq owns the read state now; undoing this
      // one would fight its optimistic update and remove its protection record.
      return;
    }
    // Remove the protection right away so a later fetch can restore server state.
    this.pendingReadMap.delete(conversationId);

    const latest = this.state.conversations.get(conversationId);
    if (!latest) {
      // The conversation was removed while the request was pending; do not resurrect it.
      return;
    }
    // Undo only the unread change, on top of the latest conversation object, so
    // updates that arrived during the request are not reverted.
    const restoredUnread = (latest.unread_count || 0) + prevUnreadCount;
    this.state.conversations.set(conversationId, { ...latest, unread_count: restoredUnread });
    this.state.totalUnreadCount += deductedFromTotal;
    this.updateConversationList();
    broadcastReadState(restoredUnread);
    return;
  }

  if (!isLatestRequest()) {
    // A newer request replaced the record; leave its version and seq untouched.
    return;
  }
  // Keep the protection for a while after success, then clear it, but only if
  // the record still belongs to this request.
  const clearTimer = setTimeout(() => {
    const record = this.pendingReadMap.get(conversationId);
    if (record && record.version === currentVersion) {
      this.pendingReadMap.delete(conversationId);
    }
  }, READ_STATE_PROTECTION_DELAY);
  // Store the timer id so a later call can cancel it.
  this.pendingReadMap.set(conversationId, {
    timestamp: Date.now(),
    version: currentVersion,
    lastReadSeq: seq,
    clearTimer,
  });
}
/**
 * Mark every conversation (and all system messages) as read.
 *
 * The local state is updated optimistically, then the server is told: first
 * `markAllSystemMessagesRead`, then one `markAsRead` per conversation that had
 * unread messages before the optimistic update. On failure the previous
 * conversation map and total are restored and subscribers are notified again.
 */
async markAllAsRead(): Promise<void> {
  const prevConversations = new Map(this.state.conversations);
  const prevTotalUnread = this.state.totalUnreadCount;

  // Capture the targets BEFORE zeroing anything. `new Map(...)` is a shallow
  // copy, so the conversation objects must not be mutated in place, otherwise
  // the snapshot (and this list) would already read 0.
  const unreadTargets = Array.from(prevConversations.values())
    .filter(conv => (conv.unread_count || 0) > 0)
    .map(conv => ({ id: conv.id, lastSeq: conv.last_seq }));

  // Optimistic update with fresh objects; the snapshot keeps the originals.
  prevConversations.forEach((conv, id) => {
    this.state.conversations.set(id, { ...conv, unread_count: 0 });
  });
  this.state.totalUnreadCount = 0;
  this.updateConversationList();

  // NOTE(review): systemUnreadCount is not reset here although system messages
  // are marked read below — confirm whether callers handle that separately.
  this.notifySubscribers({
    type: 'unread_count_updated',
    payload: {
      totalUnreadCount: 0,
      systemUnreadCount: this.state.systemUnreadCount,
    },
    timestamp: Date.now(),
  });
  this.notifySubscribers({
    type: 'conversations_updated',
    payload: { conversations: this.state.conversationList },
    timestamp: Date.now(),
  });

  try {
    // System messages first, then every conversation that had unread messages.
    await messageService.markAllSystemMessagesRead();
    await Promise.all(
      unreadTargets.map(target => messageService.markAsRead(target.id, target.lastSeq))
    );
  } catch (error) {
    console.error('[MessageManager] 标记所有已读失败:', error);
    // Roll back to the snapshot and republish both views of the state.
    this.state.conversations = prevConversations;
    this.state.totalUnreadCount = prevTotalUnread;
    this.updateConversationList();
    this.notifySubscribers({
      type: 'unread_count_updated',
      payload: {
        totalUnreadCount: prevTotalUnread,
        systemUnreadCount: this.state.systemUnreadCount,
      },
      timestamp: Date.now(),
    });
    this.notifySubscribers({
      type: 'conversations_updated',
      payload: { conversations: this.state.conversationList },
      timestamp: Date.now(),
    });
  }
}
/**
 * Create a private conversation with the given user.
 *
 * The conversation returned by the service is stored in memory, the ordered
 * list is rebuilt and `conversations_updated` is broadcast.
 *
 * @param userId - The other participant.
 * @returns The created conversation, or `null` on failure (an `error` event
 *          with context `createConversation` is emitted in that case).
 */
async createConversation(userId: string): Promise<ConversationResponse | null> {
  let created: ConversationResponse | null = null;
  try {
    const conversation = await messageService.createConversation(userId);

    // Register the conversation and rebuild the list view.
    this.state.conversations.set(conversation.id, conversation);
    this.updateConversationList();

    const conversations = this.state.conversationList;
    this.notifySubscribers({
      type: 'conversations_updated',
      payload: { conversations },
      timestamp: Date.now(),
    });
    created = conversation;
  } catch (error) {
    console.error('[MessageManager] 创建会话失败:', error);
    const payload = { error, context: 'createConversation' };
    this.notifySubscribers({ type: 'error', payload, timestamp: Date.now() });
  }
  return created;
}
/**
 * Apply a partial update to a conversation held in memory.
 *
 * Logs a warning and does nothing when the conversation is unknown. Otherwise
 * a new object (current fields overlaid with `updates`) replaces the stored
 * one, the list is rebuilt and `conversations_updated` is broadcast.
 *
 * @param conversationId - Key of the conversation in `state.conversations`.
 * @param updates - Fields to overwrite.
 */
updateConversation(conversationId: string, updates: Partial<ConversationResponse>): void {
  const { conversations } = this.state;
  const current = conversations.get(conversationId);
  if (!current) {
    console.warn('[MessageManager] 更新会话失败,会话不存在:', conversationId);
    return;
  }

  const patched: ConversationResponse = { ...current, ...updates };
  conversations.set(conversationId, patched);
  this.updateConversationList();

  // Publish the rebuilt list.
  const payload = { conversations: this.state.conversationList };
  this.notifySubscribers({ type: 'conversations_updated', payload, timestamp: Date.now() });
}
/**
 * Drop a conversation from local state after the user deleted it for themselves.
 *
 * Removes the conversation and its messages from memory, subtracts its unread
 * count from the total (never below 0), clears the active conversation when it
 * is the one removed, broadcasts `conversations_updated` and
 * `unread_count_updated`, and deletes the conversation from the local
 * database without awaiting the result.
 *
 * @param conversationId - Conversation id; normalized before use.
 */
removeConversation(conversationId: string): void {
  const key = this.normalizeConversationId(conversationId);
  const { state } = this;

  const unreadOfRemoved = state.conversations.get(key)?.unread_count || 0;
  state.conversations.delete(key);
  state.messagesMap.delete(key);
  state.totalUnreadCount = Math.max(0, state.totalUnreadCount - unreadOfRemoved);

  const wasActive = state.currentConversationId === key;
  if (wasActive) {
    state.currentConversationId = null;
  }
  this.updateConversationList();

  this.notifySubscribers({
    type: 'conversations_updated',
    payload: { conversations: state.conversationList },
    timestamp: Date.now(),
  });
  this.notifySubscribers({
    type: 'unread_count_updated',
    payload: {
      totalUnreadCount: state.totalUnreadCount,
      systemUnreadCount: state.systemUnreadCount,
    },
    timestamp: Date.now(),
  });

  // Local database cleanup; failures are only logged.
  const onDbError = (error: unknown): void => {
    console.error('[MessageManager] 删除本地会话失败:', error);
  };
  deleteConversationFromDb(key).catch(onDbError);
}
/**
 * Overwrite the system-message unread counter and broadcast `unread_count_updated`.
 *
 * @param count - New system unread count, stored as given.
 */
setSystemUnreadCount(count: number): void {
  const { state } = this;
  state.systemUnreadCount = count;

  const payload = {
    totalUnreadCount: state.totalUnreadCount,
    systemUnreadCount: state.systemUnreadCount,
  };
  this.notifySubscribers({ type: 'unread_count_updated', payload, timestamp: Date.now() });
}
/**
 * Add one to the system-message unread counter and broadcast `unread_count_updated`.
 */
incrementSystemUnreadCount(): void {
  const { state } = this;
  state.systemUnreadCount += 1;

  const payload = {
    totalUnreadCount: state.totalUnreadCount,
    systemUnreadCount: state.systemUnreadCount,
  };
  this.notifySubscribers({ type: 'unread_count_updated', payload, timestamp: Date.now() });
}
/**
 * Lower the system-message unread counter (never below 0) and broadcast
 * `unread_count_updated`.
 *
 * @param count - Amount to subtract; defaults to 1.
 */
decrementSystemUnreadCount(count = 1): void {
  const { state } = this;
  const remaining = state.systemUnreadCount - count;
  state.systemUnreadCount = Math.max(0, remaining);

  const payload = {
    totalUnreadCount: state.totalUnreadCount,
    systemUnreadCount: state.systemUnreadCount,
  };
  this.notifySubscribers({ type: 'unread_count_updated', payload, timestamp: Date.now() });
}
/**
 * Set the unread count of a single conversation.
 *
 * The total unread count is shifted by the difference between the new and the
 * previous value (never below 0). Broadcasts `unread_count_updated` followed
 * by `conversations_updated`. Logs a warning and does nothing when the
 * conversation is unknown.
 *
 * @param conversationId - Key of the conversation in `state.conversations`.
 * @param count - New unread count for that conversation.
 */
updateUnreadCount(conversationId: string, count: number): void {
  const { state } = this;
  const current = state.conversations.get(conversationId);
  if (!current) {
    console.warn('[MessageManager] 更新未读数失败,会话不存在:', conversationId);
    return;
  }

  // How far the per-conversation value moves; the total moves by the same amount.
  const delta = count - (current.unread_count || 0);

  const replaced: ConversationResponse = { ...current, unread_count: count };
  state.conversations.set(conversationId, replaced);
  state.totalUnreadCount = Math.max(0, state.totalUnreadCount + delta);
  this.updateConversationList();

  this.notifySubscribers({
    type: 'unread_count_updated',
    payload: {
      totalUnreadCount: state.totalUnreadCount,
      systemUnreadCount: state.systemUnreadCount,
    },
    timestamp: Date.now(),
  });
  this.notifySubscribers({
    type: 'conversations_updated',
    payload: { conversations: state.conversationList },
    timestamp: Date.now(),
  });
}
/**
 * Empty the conversation map and list and reset both unread counters to 0,
 * then broadcast `conversations_updated` (empty list) and `unread_count_updated`.
 */
clearConversations(): void {
  const { state } = this;
  state.conversations.clear();
  state.conversationList = [];
  state.totalUnreadCount = 0;
  state.systemUnreadCount = 0;

  const emptyList = { conversations: [] };
  this.notifySubscribers({ type: 'conversations_updated', payload: emptyList, timestamp: Date.now() });

  const zeroCounts = { totalUnreadCount: 0, systemUnreadCount: 0 };
  this.notifySubscribers({ type: 'unread_count_updated', payload: zeroCounts, timestamp: Date.now() });
}
/**
 * Cache invalidation hook kept for compatibility with the previous API.
 *
 * This method performs no work: per the original note, MessageManager keeps
 * its state in memory and has no external cache to invalidate.
 *
 * @param type - Accepted for API compatibility only; ignored.
 */
invalidateCache(type: 'all' | 'list' | 'unread' = 'all'): void {
  // Intentionally a no-op; the argument is evaluated and discarded.
  void type;
}
/**
 * Add one unread message to a conversation and to the total.
 *
 * Does nothing when the conversation is unknown. Otherwise broadcasts
 * `unread_count_updated` followed by `conversations_updated`.
 *
 * @param conversationId - Key of the conversation in `state.conversations`.
 */
private incrementUnreadCount(conversationId: string): void {
  const { state } = this;
  const current = state.conversations.get(conversationId);
  if (!current) {
    return;
  }

  // Replace the stored object rather than mutating it.
  const bumped: ConversationResponse = {
    ...current,
    unread_count: (current.unread_count || 0) + 1,
  };
  state.conversations.set(conversationId, bumped);
  state.totalUnreadCount += 1;
  // unread_count changed, so the list view is rebuilt.
  this.updateConversationList();

  this.notifySubscribers({
    type: 'unread_count_updated',
    payload: {
      totalUnreadCount: state.totalUnreadCount,
      systemUnreadCount: state.systemUnreadCount,
    },
    timestamp: Date.now(),
  });
  this.notifySubscribers({
    type: 'conversations_updated',
    payload: { conversations: state.conversationList },
    timestamp: Date.now(),
  });
}
// ==================== 状态查询方法 ====================
/**
 * Return the current conversation list (the stored array itself, not a copy).
 */
getConversations(): ConversationResponse[] {
  const { conversationList } = this.state;
  return conversationList;
}
/**
 * Look up one conversation by id.
 *
 * @param conversationId - Conversation id; normalized before the lookup.
 * @returns The stored conversation, or `null` when none is stored.
 */
getConversation(conversationId: string): ConversationResponse | null {
  const key = this.normalizeConversationId(conversationId);
  const found = this.state.conversations.get(key);
  return found ? found : null;
}
/**
 * Return the in-memory messages of a conversation.
 *
 * @param conversationId - Conversation id; normalized before the lookup.
 * @returns The stored array, or a new empty array when nothing is stored.
 */
getMessages(conversationId: string): MessageResponse[] {
  const key = this.normalizeConversationId(conversationId);
  const stored = this.state.messagesMap.get(key);
  return stored ? stored : [];
}
/**
 * Return the current unread counters.
 *
 * @returns `total` (conversation unread total) and `system` (system-message unread count).
 */
getUnreadCount(): { total: number; system: number } {
  const { totalUnreadCount: total, systemUnreadCount: system } = this.state;
  return { total, system };
}
/**
 * Return the realtime connection flag stored in `state.isWebSocketConnected`.
 */
isConnected(): boolean {
  const { isWebSocketConnected } = this.state;
  return isWebSocketConnected;
}
/**
 * Return the conversation-list loading flag (`state.isLoadingConversations`).
 */
isLoading(): boolean {
  const { isLoadingConversations } = this.state;
  return isLoadingConversations;
}
/**
 * Tell whether the given conversation is currently in `state.loadingMessagesSet`.
 *
 * @param conversationId - Conversation id; normalized before the check.
 */
isLoadingMessages(conversationId: string): boolean {
  const key = this.normalizeConversationId(conversationId);
  const { loadingMessagesSet } = this.state;
  return loadingMessagesSet.has(key);
}
// ==================== 活动会话管理 ====================
/**
 * Record which conversation is currently active (called when a chat screen opens).
 *
 * @param conversationId - Conversation id (normalized before storing); a falsy
 *        value stores `null`.
 */
setActiveConversation(conversationId: string | null): void {
  let activeId: string | null = null;
  if (conversationId) {
    activeId = this.normalizeConversationId(conversationId);
  }
  this.state.currentConversationId = activeId;
}
/**
 * Activate a conversation: initialize the manager, mark the conversation as
 * active, publish the current in-memory messages and sync via `fetchMessages`.
 *
 * Concurrent activations of the same conversation share one in-flight sync
 * unless `forceSync` is set, in which case a new sync is started and tracked.
 *
 * @param conversationId - Conversation id; normalized. A falsy normalized id
 *        makes the call a no-op.
 * @param options - `forceSync: true` starts a new sync even if one is running.
 */
async activateConversation(conversationId: string, options?: { forceSync?: boolean }): Promise<void> {
  const targetId = this.normalizeConversationId(conversationId);
  if (!targetId) {
    return;
  }

  await this.initialize();
  this.setActiveConversation(targetId);

  // Publish what is already in memory so the UI has a deterministic starting point.
  const inMemory = this.getMessages(targetId);
  this.notifySubscribers({
    type: 'messages_updated',
    payload: {
      conversationId: targetId,
      messages: [...inMemory],
      source: 'activation_snapshot',
    },
    timestamp: Date.now(),
  });

  // Join a running sync instead of starting a second one, unless forced.
  const tasks = this.activatingConversationTasks;
  const inFlight = tasks.get(targetId);
  if (inFlight && !options?.forceSync) {
    await inFlight;
    return;
  }

  const runSync = async (): Promise<void> => {
    await this.fetchMessages(targetId);
  };
  const syncTask = runSync();
  tasks.set(targetId, syncTask);
  try {
    await syncTask;
  } finally {
    // Only untrack the task if it has not been replaced in the meantime.
    const stillTracked = tasks.get(targetId) === syncTask;
    if (stillTracked) {
      tasks.delete(targetId);
    }
  }
}
/**
 * Return the id of the currently active conversation, or `null` when none is set.
 */
getActiveConversation(): string | null {
  const { currentConversationId } = this.state;
  return currentConversationId;
}
// ==================== 订阅机制 ====================
/**
 * Register a subscriber for message events.
 *
 * The subscriber is synchronously sent the current state right away:
 * `conversations_updated`, `unread_count_updated`, `connection_changed`, and,
 * when an active conversation has messages in memory, `messages_updated` with
 * source `initial_sync`.
 *
 * @param subscriber - Callback invoked for every event.
 * @returns A function that removes the subscriber.
 */
subscribe(subscriber: MessageSubscriber): () => void {
  const { state } = this;
  state.subscribers.add(subscriber);

  // Deliver one event to the new subscriber only.
  const deliver = (type: MessageEventType, payload: any): void => {
    subscriber({ type, payload, timestamp: Date.now() });
  };

  deliver('conversations_updated', { conversations: state.conversationList });
  deliver('unread_count_updated', {
    totalUnreadCount: state.totalUnreadCount,
    systemUnreadCount: state.systemUnreadCount,
  });
  deliver('connection_changed', { connected: state.isWebSocketConnected });

  // Hand over the active conversation's messages too, so the new subscriber
  // does not have to wait for the next update.
  const activeId = state.currentConversationId;
  if (activeId) {
    const activeMessages = state.messagesMap.get(activeId);
    if (activeMessages && activeMessages.length > 0) {
      deliver('messages_updated', {
        conversationId: activeId,
        messages: [...activeMessages],
        source: 'initial_sync',
      });
    }
  }

  const unsubscribe = (): void => {
    state.subscribers.delete(subscriber);
  };
  return unsubscribe;
}
/**
 * Subscribe to `messages_updated` events of a single conversation.
 *
 * An event matches when its `conversationId` equals either the id as passed
 * in or its normalized form. Other methods of this class (for example
 * `activateConversation`) publish events under the normalized id, so matching
 * the raw id alone would miss them whenever the two forms differ.
 *
 * @param conversationId - Conversation to watch.
 * @param callback - Receives the full message list carried by each matching event.
 * @returns A function that removes the subscription.
 */
subscribeToMessages(conversationId: string, callback: (messages: MessageResponse[]) => void): () => void {
  const normalizedConversationId = this.normalizeConversationId(conversationId);
  const subscriber: MessageSubscriber = (event) => {
    if (event.type !== 'messages_updated') {
      return;
    }
    const eventConversationId = event.payload.conversationId;
    if (eventConversationId === conversationId || eventConversationId === normalizedConversationId) {
      callback(event.payload.messages);
    }
  };
  return this.subscribe(subscriber);
}
// ==================== 群聊相关状态查询 ====================
/**
 * Return the typing-user list stored for a group.
 *
 * @param groupId - Key in `state.typingUsersMap`.
 * @returns The stored array, or a new empty array when nothing is stored.
 */
getTypingUsers(groupId: string): string[] {
  const typing = this.state.typingUsersMap.get(groupId);
  return typing ? typing : [];
}
/**
 * Return the locally stored mute flag of a group.
 *
 * @param groupId - Key in `state.mutedStatusMap`.
 * @returns The stored flag, or `false` when nothing is stored.
 */
isMuted(groupId: string): boolean {
  const muted = this.state.mutedStatusMap.get(groupId);
  return muted ? muted : false;
}
/**
 * Store the mute flag of a group in local state only (no event is emitted).
 *
 * @param groupId - Key in `state.mutedStatusMap`.
 * @param isMuted - Flag to store.
 */
setMutedStatus(groupId: string, isMuted: boolean): void {
  const { mutedStatusMap } = this.state;
  mutedStatusMap.set(groupId, isMuted);
}
}
// ==================== Singleton export ====================
// One shared MessageManager instance, available as a named and as the default export.
const messageManager = new MessageManager();
export { messageManager };
export default messageManager;