// File: frontend/src/stores/messageManager.ts (2255 lines, 74 KiB, TypeScript)

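/**
 * MessageManager - central manager for conversation and message state.
 *
 * Responsibilities:
 * - Holds the `conversations` and `messages` state
 * - Handles incoming WebSocket messages
 * - Publishes change events to subscribers
 * - Exposes that state for React components to consume
 */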
import { ConversationResponse, MessageResponse, MessageSegment, UserDTO } from '../types/dto';
import { messageService } from '../services/messageService';
import {
websocketService,
WSChatMessage,
WSGroupChatMessage,
WSReadMessage,
WSGroupReadMessage,
WSRecallMessage,
WSGroupRecallMessage,
WSGroupTypingMessage,
WSGroupNoticeMessage,
GroupNoticeType,
} from '../services/websocketService';
import {
saveMessage,
saveMessagesBatch,
getMessagesByConversation,
getMaxSeq,
getMinSeq,
getMessagesBeforeSeq,
markConversationAsRead,
updateConversationCacheUnreadCount,
CachedMessage,
getUserCache,
saveUserCache,
deleteMessage as deleteMessageFromDb,
deleteConversation as deleteConversationFromDb,
} from '../services/database';
import { api } from '../services/api';
import { useAuthStore } from './authStore';
// ==================== 类型定义 ====================
/** Names of the events that the manager publishes to its subscribers. */
export type MessageEventType =
  'conversations_updated' |
  'messages_updated' |
  'unread_count_updated' |
  'connection_changed' |
  'message_sent' |
  'message_read' |
  'message_received' |
  'message_recalled' |
  'typing_status' |
  'group_notice' |
  'error';

/** Envelope handed to every subscriber. */
export interface MessageEvent {
  /** Which kind of event this is. */
  type: MessageEventType;
  /** Event-specific data; its shape differs per event type. */
  payload: any;
  /** Creation time of the event; publishers in this file pass `Date.now()`. */
  timestamp: number;
}

/** Callback signature for event subscribers. */
export type MessageSubscriber = (event: MessageEvent) => void;
// ==================== 内部状态接口 ====================
/** Shape of the in-memory state owned by `MessageManager`. */
interface MessageManagerState {
  /** Conversations keyed by conversation id. */
  conversations: Map<string, ConversationResponse>;
  /** The same conversations as a sorted array for list rendering. */
  conversationList: ConversationResponse[];

  /** Messages grouped by conversation id. */
  messagesMap: Map<string, MessageResponse[]>;

  /** Unread counters. */
  totalUnreadCount: number;
  systemUnreadCount: number;

  /** Whether the WebSocket is currently connected. */
  isWebSocketConnected: boolean;

  /** Id of the conversation the user is currently viewing, if any. */
  currentConversationId: string | null;

  /** Loading flags. */
  isLoadingConversations: boolean;
  /** Ids of conversations whose messages are being loaded right now. */
  loadingMessagesSet: Set<string>;

  /** Set once `initialize()` has completed. */
  isInitialized: boolean;

  /** Registered event subscribers. */
  subscribers: Set<MessageSubscriber>;

  /** Typing state: ids of users currently typing, keyed by group id. */
  typingUsersMap: Map<string, string[]>;

  /** Mute state of the current user, keyed by group id. */
  mutedStatusMap: Map<string, boolean>;
}
// ==================== 已读状态保护机制常量 ====================
/**
 * Length of the read-state protection window, in milliseconds (5 seconds).
 *
 * NOTE(review): the constant is not referenced in the visible part of this
 * file. Judging by its name it presumably bounds how long a conversation stays
 * protected against stale `fetchConversations` results after being marked as
 * read - confirm at the usage site.
 */
const READ_STATE_PROTECTION_DELAY = 5 * 1000;
/**
 * Bookkeeping entry stored per conversation in `pendingReadMap`.
 */
interface ReadStateRecord {
  /** Timestamp at which the conversation was marked as read. */
  timestamp: number;

  /** State version number, used to keep stale data from overwriting newer data. */
  version: number;

  /** Read sequence number that has already been reported (used for de-duplication). */
  lastReadSeq: number;

  /** Id of the timer that clears the protection, when one is scheduled. */
  clearTimer?: NodeJS.Timeout;
}
// ==================== MessageManager 类 ====================
class MessageManager {
/** All mutable in-memory state owned by this manager. */
private state: MessageManagerState;

/** Callback that removes the WebSocket listeners; invoked by `destroy()` and before re-registering. */
private wsUnsubscribe: (() => void) | null = null;

/** Cached id of the signed-in user, or null when nobody is signed in. */
private currentUserId: string | null = null;

/** Unsubscribe handle returned by `useAuthStore.subscribe`. */
private authUnsubscribe: (() => void) | null = null;

/** `Date.now()` of the last sync triggered by a WebSocket connect; used to throttle those syncs. */
private lastReconnectSyncAt = 0;

/** The `initialize()` run currently in flight, so concurrent callers can await the same run. */
private initializePromise: Promise<void> | null = null;

/**
 * Conversation activations keyed by conversation id.
 * NOTE(review): entries are added outside the visible part of this file.
 */
private activatingConversationTasks = new Map<string, Promise<void>>();

/**
 * Read-state records for conversations whose read status must be protected.
 * While a conversation id is present here, `fetchConversations` and
 * `fetchConversationDetail` keep its `unread_count` at 0 even if the server
 * still reports unread messages.
 * key: conversationId, value: read-state record
 */
private pendingReadMap = new Map<string, ReadStateRecord>();

/**
 * Counter backing read-state versions.
 * NOTE(review): presumably incremented when a conversation is marked as read
 * to fill `ReadStateRecord.version` - the code doing so is outside this view.
 */
private readStateVersion = 0;
/**
 * Builds the empty initial state and then hooks the manager up to the auth
 * store, which may immediately start initialization when a user is signed in.
 */
constructor() {
  const emptyState: MessageManagerState = {
    conversations: new Map(),
    conversationList: [],
    messagesMap: new Map(),
    totalUnreadCount: 0,
    systemUnreadCount: 0,
    isWebSocketConnected: false,
    currentConversationId: null,
    isLoadingConversations: false,
    loadingMessagesSet: new Set(),
    isInitialized: false,
    subscribers: new Set(),
    typingUsersMap: new Map(),
    mutedStatusMap: new Map(),
  };
  this.state = emptyState;

  // Start watching authentication changes.
  this.initAuthListener();
}
// ==================== 私有工具方法 ====================
/**
 * Wires the manager to the auth store.
 *
 * - Reads the current user id once and initializes right away when a user is
 *   already signed in.
 * - Subscribes (once) to auth store changes so that a user arriving late on
 *   cold start triggers initialization, and a sign-out triggers `destroy()`.
 *
 * The active conversation is force-resynced only when the user id actually
 * changes. The store listener runs on every store update, so re-activating
 * unconditionally would refetch the open conversation on each unrelated
 * auth-state change.
 */
private initAuthListener() {
  // Snapshot the current user id.
  const authState = useAuthStore.getState();
  this.currentUserId = authState.currentUser?.id || null;

  // Already signed in: initialize immediately.
  if (this.currentUserId && !this.state.isInitialized) {
    this.initialize();
  }

  // Register the store listener only once.
  if (!this.authUnsubscribe) {
    this.authUnsubscribe = useAuthStore.subscribe((state) => {
      const nextUserId = state.currentUser?.id || null;
      const prevUserId = this.currentUserId;
      const userChanged = nextUserId !== prevUserId;
      this.currentUserId = nextUserId;

      if (nextUserId && !this.state.isInitialized) {
        this.initialize();
      }

      // When the user becomes available (or changes) while a conversation is
      // open, re-activate it so the first entry into the chat screen does not
      // miss its initial fetch.
      if (nextUserId && userChanged && this.state.currentConversationId) {
        this.activateConversation(this.state.currentConversationId, { forceSync: true }).catch(error => {
          console.error('[MessageManager] 登录态就绪后重激活会话失败:', error);
        });
      }

      // Signed out: release resources so no stale data lingers.
      if (!nextUserId && prevUserId && this.state.isInitialized) {
        this.destroy();
      }
    });
  }
}
/**
 * Returns the cached user id, refreshing the cache from the auth store when
 * no id is cached yet.
 *
 * @returns The current user id, or null when nobody is signed in.
 */
private getCurrentUserId(): string | null {
  if (this.currentUserId) {
    return this.currentUserId;
  }
  const fromStore = useAuthStore.getState().currentUser?.id || null;
  this.currentUserId = fromStore;
  return this.currentUserId;
}
/**
 * Converts a conversation id of any accepted runtime type to a string key.
 *
 * @param conversationId Raw id; may be a string, a number, null or undefined.
 * @returns The id as a string, or '' for null/undefined.
 */
private normalizeConversationId(conversationId: string | number | null | undefined): string {
  if (conversationId === null || conversationId === undefined) {
    return '';
  }
  return String(conversationId);
}
/**
 * Delivers an event to every registered subscriber.
 * A subscriber that throws is logged and does not stop delivery to the rest.
 *
 * @param event The event to publish.
 */
private notifySubscribers(event: MessageEvent) {
  for (const deliver of this.state.subscribers) {
    try {
      deliver(event);
    } catch (error) {
      console.error('[MessageManager] 订阅者执行失败:', error);
    }
  }
}
/**
 * Rebuilds `state.conversationList` from the conversations map.
 * Order: pinned conversations first, then by most recent activity
 * (`last_message_at`, falling back to `updated_at`), newest first.
 */
private updateConversationList() {
  const activityTime = (conv: ConversationResponse): number =>
    new Date(conv.last_message_at || conv.updated_at || 0).getTime();
  const pinRank = (conv: ConversationResponse): number => (conv.is_pinned ? 1 : 0);

  const ordered = [...this.state.conversations.values()];
  ordered.sort((left, right) => {
    const pinDelta = pinRank(right) - pinRank(left);
    if (pinDelta !== 0) {
      return pinDelta;
    }
    return activityTime(right) - activityTime(left);
  });

  this.state.conversationList = ordered;
}
// ==================== 初始化与销毁 ====================
/**
 * Initializes the manager: registers WebSocket listeners, loads the
 * conversation list and the unread counters, and publishes the result.
 *
 * Calling it again after completion is a no-op; calling it while a run is in
 * flight awaits that run instead of starting another. Failures are logged and
 * published as an `error` event rather than thrown.
 */
async initialize(): Promise<void> {
  if (this.state.isInitialized) {
    console.log('[MessageManager] 已经初始化,跳过');
    return;
  }
  if (this.initializePromise) {
    // Another caller already started initialization - share its outcome.
    await this.initializePromise;
    return;
  }

  console.log('[MessageManager] 开始初始化');

  const runInitialization = async (): Promise<void> => {
    try {
      // Step 1: resolve the current user id.
      this.currentUserId = this.getCurrentUserId();
      if (!this.currentUserId) {
        console.warn('[MessageManager] 用户未登录,延迟初始化');
        return;
      }

      // Step 2: register WebSocket listeners.
      this.initWebSocketListeners();

      // Step 3: load the conversation list.
      await this.fetchConversations();

      // Step 4: load the unread counters.
      await this.fetchUnreadCount();

      this.state.isInitialized = true;
      console.log('[MessageManager] 初始化完成');
      this.notifySubscribers({
        type: 'conversations_updated',
        payload: { conversations: this.state.conversationList },
        timestamp: Date.now(),
      });

      // A conversation is already active (user is on the chat screen):
      // bring its messages up to date straight away.
      if (this.state.currentConversationId) {
        await this.fetchMessages(this.state.currentConversationId);
      }
    } catch (error) {
      console.error('[MessageManager] 初始化失败:', error);
      this.notifySubscribers({
        type: 'error',
        payload: { error, context: 'initialize' },
        timestamp: Date.now(),
      });
    }
  };

  this.initializePromise = runInitialization();
  try {
    await this.initializePromise;
  } finally {
    // Allow a later retry regardless of how this run ended.
    this.initializePromise = null;
  }
}
/**
 * Releases resources and wipes the in-memory session data.
 *
 * The auth listener calls this on sign-out. Besides detaching listeners and
 * subscribers it clears everything that belongs to the signed-out user, so
 * that nothing leaks into the next session:
 * - conversations, messages and unread counters,
 * - per-group typing and muted flags (keyed by group id only, so a stale
 *   entry would otherwise apply to the next user),
 * - the processed-message dedup set (a stale id would make the next user skip
 *   a legitimately pushed message),
 * - pending read-state records, including their scheduled timers.
 *
 * `currentConversationId` is intentionally left untouched.
 */
destroy(): void {
  console.log('[MessageManager] 销毁资源');

  if (this.wsUnsubscribe) {
    this.wsUnsubscribe();
    this.wsUnsubscribe = null;
  }

  // Cancel protection timers before dropping their records.
  for (const record of this.pendingReadMap.values()) {
    if (record.clearTimer) {
      clearTimeout(record.clearTimer);
    }
  }
  this.pendingReadMap.clear();

  // Drop all user-specific data.
  this.state.conversations.clear();
  this.state.conversationList = [];
  this.state.messagesMap.clear();
  this.state.totalUnreadCount = 0;
  this.state.systemUnreadCount = 0;
  this.state.isLoadingConversations = false;
  this.state.loadingMessagesSet.clear();
  this.state.typingUsersMap.clear();
  this.state.mutedStatusMap.clear();
  this.processedMessageIds.clear();
  this.processedMessageIdsExpiry.clear();

  this.state.subscribers.clear();
  this.state.isInitialized = false;
  this.initializePromise = null;
  this.activatingConversationTasks.clear();
}
// ==================== WebSocket 处理 ====================
/**
 * Registers every WebSocket handler the manager needs: chat and group
 * messages, read receipts, recalls, typing, group notices, and the
 * connect/disconnect callbacks.
 *
 * On connect the manager also re-syncs the conversation list and the active
 * conversation, at most once per 1500 ms.
 *
 * NOTE(review): `wsUnsubscribe` is never assigned in the visible code, so the
 * guard below currently does nothing and calling this method again would
 * register the handlers a second time unless `websocketService` de-duplicates
 * them - confirm.
 */
private initWebSocketListeners(): void {
  if (this.wsUnsubscribe) {
    this.wsUnsubscribe();
  }

  console.log('[MessageManager] 初始化WebSocket监听');

  // Publishes the connection flag to state and subscribers.
  const publishConnection = (connected: boolean): void => {
    this.state.isWebSocketConnected = connected;
    this.notifySubscribers({
      type: 'connection_changed',
      payload: { connected },
      timestamp: Date.now(),
    });
  };

  // Private chat messages.
  websocketService.on('chat', (incoming: WSChatMessage) => {
    console.log('[MessageManager] 收到私聊消息:', incoming.id);
    this.handleNewMessage(incoming);
  });

  // Group chat messages.
  websocketService.on('group_message', (incoming: WSGroupChatMessage) => {
    console.log('[MessageManager] 收到群聊消息:', incoming.id);
    this.handleNewMessage(incoming);
  });

  // Private chat read receipts.
  websocketService.on('read', (receipt: WSReadMessage) => {
    console.log('[MessageManager] 收到已读回执:', receipt);
    this.handleReadReceipt(receipt);
  });

  // Group chat read receipts.
  websocketService.on('group_read', (receipt: WSGroupReadMessage) => {
    console.log('[MessageManager] 收到群聊已读回执:', receipt);
    this.handleGroupReadReceipt(receipt);
  });

  // Private chat recalls.
  websocketService.on('recall', (recall: WSRecallMessage) => {
    console.log('[MessageManager] 收到私聊撤回:', recall);
    this.handleRecallMessage(recall);
  });

  // Group chat recalls.
  websocketService.on('group_recall', (recall: WSGroupRecallMessage) => {
    console.log('[MessageManager] 收到群聊撤回:', recall);
    this.handleGroupRecallMessage(recall);
  });

  // Group typing indicators.
  websocketService.on('group_typing', (typing: WSGroupTypingMessage) => {
    this.handleGroupTyping(typing);
  });

  // Group notices.
  websocketService.on('group_notice', (notice: WSGroupNoticeMessage) => {
    console.log('[MessageManager] 收到群通知:', notice);
    this.handleGroupNotice(notice);
  });

  // Connection established.
  websocketService.onConnect(() => {
    console.log('[MessageManager] WebSocket已连接');
    publishConnection(true);

    // Cold start / reconnect safety net: once the connection is back, sync
    // the conversation list and the active conversation so the first screen
    // is not empty. Throttled to one sync per 1500 ms.
    const connectedAt = Date.now();
    if (connectedAt - this.lastReconnectSyncAt <= 1500) {
      return;
    }
    this.lastReconnectSyncAt = connectedAt;

    this.fetchConversations(true).catch(error => {
      console.error('[MessageManager] 连接后同步会话失败:', error);
    });

    if (this.state.currentConversationId) {
      this.fetchMessages(this.state.currentConversationId).catch(error => {
        console.error('[MessageManager] 连接后同步当前会话消息失败:', error);
      });
    }
  });

  // Connection lost.
  websocketService.onDisconnect(() => {
    console.log('[MessageManager] WebSocket已断开');
    publishConnection(false);
  });
}
// ==================== 消息处理核心方法 ====================
/**
 * Ids of messages that were already handled. Used for de-duplication so that
 * an ACK and the regular push of the same message are not processed twice.
 */
private processedMessageIds = new Set<string>();

/** `Date.now()` at which each processed id was recorded; drives expiry. */
private processedMessageIdsExpiry = new Map<string, number>();
/**
 * Drops processed-message ids that were recorded more than five minutes ago.
 */
private cleanupProcessedMessageIds(): void {
  const fiveMinutesMs = 5 * 60 * 1000;
  const cutoff = Date.now() - fiveMinutesMs;

  this.processedMessageIdsExpiry.forEach((recordedAt, messageId) => {
    if (recordedAt < cutoff) {
      this.processedMessageIds.delete(messageId);
      this.processedMessageIdsExpiry.delete(messageId);
    }
  });
}
/**
 * Tells whether a message id has already been handled.
 *
 * @param messageId Id of the message to look up.
 * @returns true when the id is in the processed set.
 */
private isMessageProcessed(messageId: string): boolean {
  const alreadySeen = this.processedMessageIds.has(messageId);
  return alreadySeen;
}
/**
 * Records a message id as handled, together with the time it was recorded.
 * Once more than 1000 ids are tracked, expired ids are purged.
 *
 * @param messageId Id of the message that has just been handled.
 */
private markMessageAsProcessed(messageId: string): void {
  this.processedMessageIdsExpiry.set(messageId, Date.now());
  this.processedMessageIds.add(messageId);

  const needsCleanup = this.processedMessageIds.size > 1000;
  if (!needsCleanup) {
    return;
  }
  this.cleanupProcessedMessageIds();
}
/**
 * User-info requests currently in flight, keyed by user id, so that concurrent
 * lookups of the same user share one request.
 */
private pendingUserRequests = new Map<string, Promise<UserDTO | null>>();
/**
 * Builds the display text of a group notice.
 *
 * @param noticeType Kind of notice.
 * @param data Notice payload; `data.username` is used when present, otherwise
 *   a generic placeholder name.
 * @returns The notice text, or '' for notice types that have no text.
 */
private buildGroupNoticeText(noticeType: GroupNoticeType, data: any): string {
  const username = data?.username || '用户';

  const actionByType = new Map<string, string>([
    ['member_join', '加入了群聊'],
    ['member_leave', '退出了群聊'],
    ['member_removed', '被移出群聊'],
    ['muted', '已被管理员禁言'],
    ['unmuted', '已被管理员解除禁言'],
  ]);

  const action = actionByType.get(noticeType);
  if (action === undefined) {
    return '';
  }
  return `"${username}" ${action}`;
}
/**
 * Looks up the conversation that belongs to a group.
 *
 * @param groupId Group id as a string.
 * @returns The id of the first conversation in the sorted list whose group id
 *   matches, or null when there is none.
 */
private findConversationIdByGroupId(groupId: string): string | null {
  const match = this.state.conversationList.find(
    candidate => String(candidate.group?.id || '') === groupId
  );
  return match ? match.id : null;
}
/**
 * Fills in the `sender` field of messages in the background.
 *
 * Looks up every distinct sender that is missing from the given messages
 * (skipping the current user and the system sender '10000'), then patches the
 * conversation's current in-memory list and publishes `messages_updated` if
 * anything changed. Returns immediately; failures are only logged.
 *
 * @param conversationId Conversation whose stored messages are patched.
 * @param messages Messages used to decide which senders to look up.
 */
private enrichMessagesWithSenderInfo(conversationId: string, messages: MessageResponse[]): void {
  const currentUserId = this.getCurrentUserId();

  // Distinct sender ids that still need resolving.
  const missingSenderIds = [...new Set(
    messages
      .filter(msg => msg.sender_id && msg.sender_id !== currentUserId && msg.sender_id !== '10000' && !msg.sender)
      .map(msg => msg.sender_id)
  )];
  if (missingSenderIds.length === 0) {
    return;
  }

  const resolveAndApply = async (): Promise<void> => {
    // Resolve all senders concurrently.
    const lookups = await Promise.all(
      missingSenderIds.map(async id => ({ id, user: await this.getSenderInfo(id) }))
    );

    const resolved = new Map<string, UserDTO>();
    for (const { id, user } of lookups) {
      if (user) {
        resolved.set(id, user);
      }
    }
    if (resolved.size === 0) {
      return;
    }

    // Patch whatever list is stored now - it may have changed meanwhile.
    const stored = this.state.messagesMap.get(conversationId);
    if (!stored) {
      return;
    }

    let touched = false;
    const patched = stored.map(msg => {
      if (msg.sender || !msg.sender_id || !resolved.has(msg.sender_id)) {
        return msg;
      }
      touched = true;
      return { ...msg, sender: resolved.get(msg.sender_id) };
    });
    if (!touched) {
      return;
    }

    this.state.messagesMap.set(conversationId, patched);
    this.notifySubscribers({
      type: 'messages_updated',
      payload: { conversationId, messages: [...patched] },
      timestamp: Date.now(),
    });
  };

  resolveAndApply().catch(error => {
    console.error('[MessageManager] 批量获取 sender 信息失败:', error);
  });
}
/**
 * Resolves a user's profile: local cache first, then the server.
 * Concurrent lookups of the same user share a single server request.
 *
 * A failing cache read is logged and treated as a cache miss, so a broken
 * local store never prevents the server lookup.
 *
 * @param userId Id of the user to resolve.
 * @returns The user, or null when the server lookup yields nothing.
 */
private async getSenderInfo(userId: string): Promise<UserDTO | null> {
  // 1. Local cache (best effort).
  try {
    const cachedUser = await getUserCache(userId);
    if (cachedUser) {
      return cachedUser;
    }
  } catch (error) {
    console.warn('[MessageManager] 读取用户缓存失败,回退到服务端请求:', userId, error);
  }

  // 2. Join a request that is already in flight.
  const pendingRequest = this.pendingUserRequests.get(userId);
  if (pendingRequest) {
    return pendingRequest;
  }

  // 3. Start a new request and register it for de-duplication.
  const request = this.fetchUserInfo(userId);
  this.pendingUserRequests.set(userId, request);
  try {
    return await request;
  } finally {
    // Always unregister, whether the request succeeded or not.
    this.pendingUserRequests.delete(userId);
  }
}
/**
 * Fetches a user's profile from the server and stores it in the local cache.
 *
 * Caching is best effort: when writing the cache fails, the failure is logged
 * and the fetched user is still returned.
 *
 * @param userId Id of the user to fetch.
 * @returns The user, or null when the request fails or the response carries
 *   no usable data (`code !== 0` or empty `data`).
 */
private async fetchUserInfo(userId: string): Promise<UserDTO | null> {
  try {
    const response = await api.get<UserDTO>(`/users/${userId}`);
    if (response.code !== 0 || !response.data) {
      return null;
    }

    const user = response.data;
    try {
      // Persist to the local database for later lookups.
      await saveUserCache(user);
    } catch (cacheError) {
      console.warn(`[MessageManager] 缓存用户信息失败: ${userId}`, cacheError);
    }
    return user;
  } catch (error) {
    console.error(`[MessageManager] 获取用户信息失败: ${userId}`, error);
    return null;
  }
}
/**
 * Handles a chat message pushed over the WebSocket (private or group),
 * including send acknowledgements flagged with `_isAck`.
 *
 * Steps, all synchronous up to the database write so subscribers see the
 * message immediately:
 * 1. skip messages whose id was already processed,
 * 2. for group messages from other users, resolve the sender in the background,
 * 3. insert the message into the in-memory list and publish `messages_updated`,
 * 4. update the owning conversation,
 * 5. bump the unread counter unless the message is the user's own, an ACK, or
 *    belongs to the active conversation,
 * 6. auto-mark as read when the active conversation receives someone else's message,
 * 7. persist the message locally (fire and forget),
 * 8. publish `message_received`.
 *
 * The normalized conversation id is used everywhere, including the
 * `message_received` payload, so subscribers always get the same string key.
 * An unparsable `created_at` falls back to the current time for persistence
 * instead of throwing.
 *
 * @param message The pushed message; `_isAck` marks a send acknowledgement.
 */
private async handleNewMessage(message: (WSChatMessage | WSGroupChatMessage) & { _isAck?: boolean }): Promise<void> {
  const { conversation_id, id, sender_id, seq, segments, created_at, _isAck } = message;
  const normalizedConversationId = this.normalizeConversationId(conversation_id);
  const currentUserId = this.getCurrentUserId();

  // Debug trace: message origin and duplicate tracking.
  console.log('[MessageManager][DEBUG] 处理新消息:', {
    messageId: id,
    conversationId: normalizedConversationId,
    senderId: sender_id,
    seq,
    currentUserId,
    messageType: message.type,
    isAck: _isAck,
    currentConversationId: this.state.currentConversationId,
    isActiveConversation: normalizedConversationId === this.state.currentConversationId,
    timestamp: Date.now(),
  });

  // An ACK is a send confirmation, not a new message: it never increases the
  // unread counter, but it still carries the full message, so processing
  // continues below.
  if (_isAck) {
    console.log('[MessageManager][DEBUG] ACK消息跳过未读数处理:', id);
  }

  // 1. De-duplicate: an ACK and the regular push may carry the same message.
  if (this.isMessageProcessed(id)) {
    console.log('[MessageManager][DEBUG] 消息已处理,跳过:', id);
    return;
  }
  this.markMessageAsProcessed(id);

  // 2. Group messages from other users: resolve the sender in the background
  //    without delaying the display of the message.
  if (message.type === 'group_message' && sender_id && sender_id !== currentUserId && sender_id !== '10000') {
    this.getSenderInfo(sender_id).then(user => {
      if (user) {
        console.log('[MessageManager][DEBUG] 获取到发送者信息:', { userId: sender_id, nickname: user.nickname });
        const messages = this.state.messagesMap.get(normalizedConversationId);
        if (messages) {
          // New array and new object so React notices the change.
          const updatedMessages = messages.map(m =>
            m.id === id ? { ...m, sender: user } : m
          );
          this.state.messagesMap.set(normalizedConversationId, updatedMessages);
          console.log('[MessageManager][DEBUG] 更新消息发送者信息:', { messageId: id, sender: user.nickname });
          this.notifySubscribers({
            type: 'messages_updated',
            payload: {
              conversationId: normalizedConversationId,
              messages: [...updatedMessages],
            },
            timestamp: Date.now(),
          });
        }
      } else {
        console.log('[MessageManager][DEBUG] 未能获取到发送者信息:', { userId: sender_id });
      }
    }).catch(error => {
      console.error('[MessageManager][DEBUG] 获取发送者信息失败:', { userId: sender_id, error });
    });
  }

  // 3. Build the message object and insert it into the in-memory list.
  const newMessage: MessageResponse = {
    id,
    conversation_id: normalizedConversationId,
    sender_id,
    seq,
    segments: segments || [],
    created_at,
    status: 'normal',
  };

  const existingMessages = this.state.messagesMap.get(normalizedConversationId) || [];
  const messageExists = existingMessages.some(m => m.id === id);
  if (!messageExists) {
    const updatedMessages = [...existingMessages, newMessage].sort((a, b) => a.seq - b.seq);
    this.state.messagesMap.set(normalizedConversationId, updatedMessages);
    // Publish right away so the chat screen sees the message without delay.
    this.notifySubscribers({
      type: 'messages_updated',
      payload: {
        conversationId: normalizedConversationId,
        messages: updatedMessages,
        newMessage,
      },
      timestamp: Date.now(),
    });
  }

  // 4. Update the owning conversation.
  this.updateConversationWithNewMessage(normalizedConversationId, newMessage, created_at);

  // 5. Unread counter.
  const isCurrentUserMessage = sender_id === currentUserId;
  const isActiveConversation = normalizedConversationId === this.state.currentConversationId;
  console.log('[MessageManager][DEBUG] 未读数判断:', {
    messageId: id,
    sender_id,
    currentUserId,
    isCurrentUserMessage,
    isActiveConversation,
    currentConversationId: this.state.currentConversationId,
    shouldIncrement: !isCurrentUserMessage && !isActiveConversation,
  });

  // Requiring a known current user guards against the comparison above being
  // meaningless when no user id is available; ACKs never count as unread.
  const shouldIncrementUnread = !isCurrentUserMessage && !isActiveConversation && !!currentUserId && !_isAck;
  if (shouldIncrementUnread) {
    console.log('[MessageManager][DEBUG] 增加未读数:', { conversationId: normalizedConversationId, messageId: id });
    this.incrementUnreadCount(normalizedConversationId);
  } else {
    console.log('[MessageManager][DEBUG] 跳过未读数增加:', {
      isCurrentUserMessage,
      isActiveConversation,
      hasCurrentUserId: !!currentUserId,
      isAck: _isAck,
    });
  }

  // 6. Someone else's message in the active conversation is read immediately.
  if (isActiveConversation && !isCurrentUserMessage) {
    this.markAsRead(normalizedConversationId, seq);
  }

  // 7. Persist locally without blocking the UI update.
  const textContent = segments?.filter((s: any) => s.type === 'text').map((s: any) => s.data?.text || '').join('') || '';
  // `toISOString()` throws on an invalid date; fall back to "now" so a
  // malformed timestamp cannot abort persistence and the final notification.
  const createdAtDate = new Date(created_at);
  const createdAtIso = Number.isNaN(createdAtDate.getTime())
    ? new Date().toISOString()
    : createdAtDate.toISOString();
  saveMessage({
    id,
    conversationId: normalizedConversationId,
    senderId: sender_id || '',
    content: textContent,
    type: segments?.find((s: any) => s.type === 'image') ? 'image' : 'text',
    isRead: isActiveConversation, // messages of the active conversation are stored as read
    createdAt: createdAtIso,
    seq,
    status: 'normal',
    segments,
  }).catch(error => {
    console.error('[MessageManager] 保存消息到本地失败:', error);
  });

  // 8. Announce the received message.
  this.notifySubscribers({
    type: 'message_received',
    payload: {
      conversationId: normalizedConversationId,
      message: newMessage,
      isCurrentUser: isCurrentUserMessage,
    },
    timestamp: Date.now(),
  });
}
/**
 * Reflects a newly arrived message in its conversation.
 *
 * A known conversation gets its `last_seq`, `last_message`, `last_message_at`
 * and `updated_at` replaced; an unknown one is fetched from the server in the
 * background. In both cases the list is re-sorted and `conversations_updated`
 * is published.
 *
 * @param conversationId Id of the conversation the message belongs to.
 * @param message The new message.
 * @param createdAt Creation time of the message, stored as the conversation's
 *   last-activity time.
 */
private updateConversationWithNewMessage(
  conversationId: string,
  message: MessageResponse,
  createdAt: string
): void {
  const known = this.state.conversations.get(conversationId);

  if (!known) {
    // Unknown conversation: load its details asynchronously.
    this.fetchConversationDetail(conversationId);
  } else {
    console.log('[MessageManager][DEBUG] updateConversationWithNewMessage - 更新前:', {
      conversationId,
      prevUnreadCount: known.unread_count,
      conversationRef: known,
    });

    // Replace the stored conversation with an updated copy.
    const refreshed: ConversationResponse = {
      ...known,
      last_seq: message.seq,
      last_message: message,
      last_message_at: createdAt,
      updated_at: createdAt,
    };
    this.state.conversations.set(conversationId, refreshed);

    console.log('[MessageManager][DEBUG] updateConversationWithNewMessage - 更新后:', {
      conversationId,
      newUnreadCount: refreshed.unread_count,
      updatedConvRef: refreshed,
      conversationInMap: this.state.conversations.get(conversationId),
      isSameRef: refreshed === this.state.conversations.get(conversationId),
    });
  }

  // Re-sort and announce the list.
  this.updateConversationList();
  this.notifySubscribers({
    type: 'conversations_updated',
    payload: { conversations: this.state.conversationList },
    timestamp: Date.now(),
  });
}
/**
 * Handles a private-chat read receipt.
 * Currently only logs it; this is the place to update "read by peer" state.
 *
 * @param message The read receipt.
 */
private handleReadReceipt(message: WSReadMessage): void {
  const receipt = message;
  console.log('[MessageManager] 处理已读回执:', receipt);
}
/**
 * Handles a group-chat read receipt. Currently only logs it.
 *
 * @param message The group read receipt.
 */
private handleGroupReadReceipt(message: WSGroupReadMessage): void {
  const receipt = message;
  console.log('[MessageManager] 处理群聊已读回执:', receipt);
}
/**
 * Handles the recall of a private-chat message: removes it from the in-memory
 * list, publishes `messages_updated` (when something was removed) and
 * `message_recalled`, and deletes it from the local database.
 *
 * The conversation id is normalized before the lookup because `messagesMap`
 * is keyed by the normalized string id, and message ids are compared as
 * strings, matching how the rest of the manager compares them.
 *
 * @param message The recall notification.
 */
private handleRecallMessage(message: WSRecallMessage): void {
  const { conversation_id, message_id } = message;
  const conversationId = this.normalizeConversationId(conversation_id);
  const recalledId = String(message_id);

  // Remove the recalled message from the in-memory list.
  const messages = this.state.messagesMap.get(conversationId);
  if (messages) {
    const updatedMessages = messages.filter(m => String(m.id) !== recalledId);
    if (updatedMessages.length !== messages.length) {
      this.state.messagesMap.set(conversationId, updatedMessages);
      this.notifySubscribers({
        type: 'messages_updated',
        payload: { conversationId, messages: updatedMessages },
        timestamp: Date.now(),
      });
    }
  }

  // Announce the recall.
  this.notifySubscribers({
    type: 'message_recalled',
    payload: { conversationId, messageId: message_id },
    timestamp: Date.now(),
  });

  // Delete from the local database (fire and forget).
  deleteMessageFromDb(message_id).catch(error => {
    console.error('[MessageManager] 删除本地消息失败:', error);
  });
}
/**
 * Handles the recall of a group-chat message: removes it from the in-memory
 * list, publishes `messages_updated` (when something was removed) and
 * `message_recalled` with `isGroup: true`, and deletes it from the local
 * database.
 *
 * The conversation id is normalized before the lookup because `messagesMap`
 * is keyed by the normalized string id, and message ids are compared as
 * strings, matching how the rest of the manager compares them.
 *
 * @param message The group recall notification.
 */
private handleGroupRecallMessage(message: WSGroupRecallMessage): void {
  const { conversation_id, message_id } = message;
  const conversationId = this.normalizeConversationId(conversation_id);
  const recalledId = String(message_id);

  // Remove the recalled message from the in-memory list.
  const messages = this.state.messagesMap.get(conversationId);
  if (messages) {
    const updatedMessages = messages.filter(m => String(m.id) !== recalledId);
    if (updatedMessages.length !== messages.length) {
      this.state.messagesMap.set(conversationId, updatedMessages);
      this.notifySubscribers({
        type: 'messages_updated',
        payload: { conversationId, messages: updatedMessages },
        timestamp: Date.now(),
      });
    }
  }

  // Announce the recall.
  this.notifySubscribers({
    type: 'message_recalled',
    payload: { conversationId, messageId: message_id, isGroup: true },
    timestamp: Date.now(),
  });

  // Delete from the local database (fire and forget).
  deleteMessageFromDb(message_id).catch(error => {
    console.error('[MessageManager] 删除本地消息失败:', error);
  });
}
/**
 * Tracks which users are typing in a group and publishes `typing_status`
 * whenever the list for that group gains or loses a user.
 *
 * @param message Typing notification carrying the group, the user, and
 *   whether the user started or stopped typing.
 */
private handleGroupTyping(message: WSGroupTypingMessage): void {
  const { group_id, user_id, is_typing } = message;
  const groupKey = String(group_id);
  const before = this.state.typingUsersMap.get(groupKey) || [];

  let after: string[];
  if (!is_typing) {
    // Stopped typing: drop the user.
    after = before.filter(id => id !== user_id);
  } else if (before.includes(user_id)) {
    // Already listed: nothing to add.
    after = before;
  } else {
    // Started typing: append the user.
    after = [...before, user_id];
  }

  // An add or a remove always changes the length, so equal length means no change.
  if (after.length === before.length) {
    return;
  }

  this.state.typingUsersMap.set(groupKey, after);
  this.notifySubscribers({
    type: 'typing_status',
    payload: { groupId: groupKey, typingUsers: after },
    timestamp: Date.now(),
  });
}
/**
 * Handles a group notice.
 *
 * - `muted` / `unmuted` notices that target the current user update the
 *   per-group muted flag.
 * - Notices that carry a `message_id` and a positive numeric `seq` are added
 *   to the group's conversation as a system message (sender '10000',
 *   category 'notification'), unless a message with that id already exists,
 *   and `messages_updated` is published.
 * - Every notice is finally published as a `group_notice` event.
 *
 * @param message The group notice.
 */
private handleGroupNotice(message: WSGroupNoticeMessage): void {
  const { notice_type, group_id, data, timestamp, message_id, seq } = message;
  const groupIdStr = String(group_id);

  // Mute / unmute: keep the current user's local muted flag in sync.
  const isMuteChange = notice_type === 'muted' || notice_type === 'unmuted';
  if (isMuteChange) {
    const targetUserId = data?.user_id;
    const selfId = this.getCurrentUserId();
    if (targetUserId && targetUserId === selfId) {
      this.state.mutedStatusMap.set(groupIdStr, notice_type === 'muted');
    }
  }

  // Notices that carry message_id/seq become a system message in the
  // conversation's message stream, so the UI renders them as system notices
  // rather than as ordinary user messages.
  if (message_id && typeof seq === 'number' && seq > 0) {
    const conversationId = this.findConversationIdByGroupId(groupIdStr);
    const timeline = conversationId ? this.state.messagesMap.get(conversationId) || [] : [];
    const alreadyPresent = timeline.some(m => String(m.id) === String(message_id));

    if (conversationId && !alreadyPresent) {
      const noticeText = this.buildGroupNoticeText(notice_type, data);
      const systemNoticeMessage: MessageResponse = {
        id: String(message_id),
        conversation_id: conversationId,
        sender_id: '10000',
        seq,
        segments: noticeText
          ? [{ type: 'text', data: { text: noticeText } as any }]
          : [],
        status: 'normal',
        category: 'notification',
        created_at: new Date(timestamp || Date.now()).toISOString(),
      };

      const nextTimeline = [...timeline, systemNoticeMessage].sort((a, b) => a.seq - b.seq);
      this.state.messagesMap.set(conversationId, nextTimeline);
      this.notifySubscribers({
        type: 'messages_updated',
        payload: {
          conversationId,
          messages: nextTimeline,
          newMessage: systemNoticeMessage,
          source: 'group_notice',
        },
        timestamp: Date.now(),
      });
    }
  }

  // Always forward the notice itself.
  this.notifySubscribers({
    type: 'group_notice',
    payload: {
      groupId: groupIdStr,
      noticeType: notice_type,
      data,
      timestamp,
      messageId: message_id,
      seq,
    },
    timestamp: Date.now(),
  });
}
// ==================== 数据获取方法 ====================
/**
 * Loads the first page (20 entries) of conversations from the message service
 * and replaces the in-memory conversation map with it.
 *
 * Conversations that have an entry in `pendingReadMap` keep `unread_count` 0
 * even when the server still reports unread messages (a mark-as-read request
 * may have been sent but not completed yet), and their cached unread count is
 * rewritten to 0 as well. Publishes `conversations_updated` and
 * `unread_count_updated` on success, `error` on failure.
 *
 * @param forceRefresh When true, the call proceeds even if another load is in
 *   progress, and the flag is passed on to the message service.
 */
async fetchConversations(forceRefresh = false): Promise<void> {
  const alreadyLoading = this.state.isLoadingConversations;
  if (alreadyLoading && !forceRefresh) {
    console.log('[MessageManager] 会话列表正在加载中,跳过');
    return;
  }

  this.state.isLoadingConversations = true;
  console.log('[MessageManager] 获取会话列表');

  try {
    const response = await messageService.getConversations(1, 20, forceRefresh);
    const conversations = response.list || [];

    // Rebuild the map, shielding conversations with a pending read record
    // from stale server-side unread counts.
    this.state.conversations.clear();
    for (const conv of conversations) {
      const readRecord = this.pendingReadMap.get(conv.id);
      if (readRecord && conv.unread_count > 0) {
        console.log('[MessageManager] fetchConversations 保护已读状态:', {
          conversationId: conv.id,
          serverUnread: conv.unread_count,
          readTimestamp: readRecord.timestamp,
          version: readRecord.version,
        });
        this.state.conversations.set(conv.id, { ...conv, unread_count: 0 });
        continue;
      }
      this.state.conversations.set(conv.id, conv);
    }

    // Refresh the sorted list.
    this.updateConversationList();

    // Total unread count, computed from the protected data.
    let totalUnread = 0;
    for (const conv of this.state.conversations.values()) {
      totalUnread += conv.unread_count || 0;
    }
    this.state.totalUnreadCount = totalUnread;

    // Repair the local cache: the refresh has written the server's stale
    // unread counts to it, so write 0 back for every protected conversation
    // to keep the next cold start from reading old values.
    for (const convId of this.pendingReadMap.keys()) {
      updateConversationCacheUnreadCount(convId, 0).catch(error => {
        console.error('[MessageManager] 修复本地缓存未读数失败:', convId, error);
      });
    }

    console.log('[MessageManager] 会话列表获取完成:', {
      count: conversations.length,
      totalUnread,
    });

    // Publish the results.
    const publishedAt = (): number => Date.now();
    this.notifySubscribers({
      type: 'conversations_updated',
      payload: { conversations: this.state.conversationList },
      timestamp: publishedAt(),
    });
    this.notifySubscribers({
      type: 'unread_count_updated',
      payload: {
        totalUnreadCount: this.state.totalUnreadCount,
        systemUnreadCount: this.state.systemUnreadCount,
      },
      timestamp: publishedAt(),
    });
  } catch (error) {
    console.error('[MessageManager] 获取会话列表失败:', error);
    this.notifySubscribers({
      type: 'error',
      payload: { error, context: 'fetchConversations' },
      timestamp: Date.now(),
    });
  } finally {
    this.state.isLoadingConversations = false;
  }
}
/**
 * Loads one conversation from the message service, stores it, re-sorts the
 * list and publishes `conversations_updated`.
 *
 * When the conversation has an entry in `pendingReadMap` and the server still
 * reports unread messages, the stored copy gets `unread_count` 0 and the
 * cached unread count is rewritten to 0.
 *
 * The returned object is the same protected copy that is stored in state, so
 * callers never see an unread count that the manager itself has overridden.
 *
 * @param conversationId Id of the conversation to load.
 * @returns The stored conversation, or null when the service returns nothing
 *   or the request fails.
 */
async fetchConversationDetail(conversationId: string): Promise<ConversationResponse | null> {
  try {
    const detail = await messageService.getConversationById(conversationId);
    if (!detail) {
      return null;
    }

    // A pending read record means a mark-as-read request is in flight or the
    // conversation is still inside its protection window.
    const readRecord = this.pendingReadMap.get(conversationId);
    const isPending = !!readRecord;

    // Keep the local "read" state when the server still reports unread messages.
    const shouldPreserveLocalRead = isPending && (detail.unread_count || 0) > 0;
    const safeDetail = shouldPreserveLocalRead
      ? { ...detail as ConversationResponse, unread_count: 0 }
      : detail as ConversationResponse;

    this.state.conversations.set(conversationId, safeDetail);
    this.updateConversationList();

    // The service call caches the server's (possibly stale) data, so the
    // cached unread count has to be repaired for protected conversations.
    if (isPending) {
      updateConversationCacheUnreadCount(conversationId, 0).catch(error => {
        console.error('[MessageManager] 修复本地缓存未读数失败:', conversationId, error);
      });
    }

    this.notifySubscribers({
      type: 'conversations_updated',
      payload: { conversations: this.state.conversationList },
      timestamp: Date.now(),
    });

    return safeDetail;
  } catch (error) {
    console.error('[MessageManager] 获取会话详情失败:', error);
    return null;
  }
}
/**
*
*
*/
async fetchMessages(conversationId: string, afterSeq?: number): Promise<void> {
// 防止重复加载
if (this.state.loadingMessagesSet.has(conversationId)) {
console.log('[MessageManager] 消息正在加载中,跳过:', conversationId);
return;
}
this.state.loadingMessagesSet.add(conversationId);
console.log('[MessageManager][DEBUG] 获取消息开始:', { conversationId, afterSeq, currentConversationId: this.state.currentConversationId, timestamp: Date.now() });
try {
const mergeMessages = (base: MessageResponse[], incoming: MessageResponse[]): MessageResponse[] => {
if (incoming.length === 0) return base;
const merged = new Map<string, MessageResponse>();
[...base, ...incoming].forEach(msg => {
merged.set(String(msg.id), msg);
});
return Array.from(merged.values()).sort((a, b) => a.seq - b.seq);
};
// 1. 先从本地数据库加载(确保立即有数据展示)
if (!afterSeq) {
let localMessages: any[] = [];
let localMaxSeq = 0;
try {
localMessages = await getMessagesByConversation(conversationId, 20);
localMaxSeq = await getMaxSeq(conversationId);
} catch (error) {
// 架构原则:本地存储异常不阻断在线同步
console.warn('[MessageManager] 读取本地消息失败,回退到服务端同步:', error);
}
console.log('[MessageManager] 本地消息:', { count: localMessages.length, maxSeq: localMaxSeq });
if (localMessages.length > 0) {
// 转换格式
const formattedMessages: MessageResponse[] = localMessages.map(m => ({
id: m.id,
conversation_id: m.conversationId,
sender_id: m.senderId,
seq: m.seq,
segments: m.segments || [],
status: m.status as any,
created_at: m.createdAt,
}));
// 立即更新内存和通知(关键:解决竞态条件)
this.state.messagesMap.set(conversationId, formattedMessages);
console.log('[MessageManager][DEBUG] 本地消息已加载,通知订阅者:', { conversationId, messageCount: formattedMessages.length, timestamp: Date.now() });
this.notifySubscribers({
type: 'messages_updated',
payload: {
conversationId,
messages: formattedMessages,
source: 'local',
},
timestamp: Date.now(),
});
// 异步填充 sender 信息
this.enrichMessagesWithSenderInfo(conversationId, formattedMessages);
} else {
// 冷启动兜底:先下发空列表事件,避免 ChatScreen 长时间停留在 loading
this.state.messagesMap.set(conversationId, []);
this.notifySubscribers({
type: 'messages_updated',
payload: {
conversationId,
messages: [],
source: 'local_empty',
},
timestamp: Date.now(),
});
// 再直接向服务端拉一页最新消息,不依赖 detail.last_seq避免缓存 last_seq 过旧导致首屏空白
try {
const latestResponse = await messageService.getMessages(conversationId, undefined, undefined, 20);
const latestMessages = latestResponse?.messages || [];
if (latestMessages.length > 0) {
const sortedLatestMessages = [...latestMessages].sort((a, b) => a.seq - b.seq);
this.state.messagesMap.set(conversationId, sortedLatestMessages);
this.notifySubscribers({
type: 'messages_updated',
payload: {
conversationId,
messages: sortedLatestMessages,
newMessages: sortedLatestMessages,
source: 'server_bootstrap',
},
timestamp: Date.now(),
});
this.enrichMessagesWithSenderInfo(conversationId, sortedLatestMessages);
// 持久化是副作用,不阻断 UI
saveMessagesBatch(latestMessages.map((m: any) => ({
id: m.id,
conversationId: m.conversation_id || conversationId,
senderId: m.sender_id,
content: m.content,
type: m.type || 'text',
isRead: m.is_read || false,
createdAt: m.created_at,
seq: m.seq,
status: m.status || 'normal',
segments: m.segments,
}))).catch(error => {
console.error('[MessageManager] 保存最新消息到本地失败:', error);
});
}
} catch (error) {
console.error('[MessageManager] 冷启动拉取最新消息失败:', error);
}
}
// 2. 服务端快照 + 增量同步(架构修复:不再依赖 detail.last_seq 单点判断)
try {
// 2.1 总是先拉一页最新快照,确保 Chat 首次激活有权威来源
const snapshotResp = await messageService.getMessages(conversationId, undefined, undefined, 50);
const snapshotMessages = snapshotResp?.messages || [];
if (snapshotMessages.length > 0) {
const existingMessages = this.state.messagesMap.get(conversationId) || [];
const mergedSnapshot = mergeMessages(existingMessages, snapshotMessages);
this.state.messagesMap.set(conversationId, mergedSnapshot);
this.notifySubscribers({
type: 'messages_updated',
payload: {
conversationId,
messages: mergedSnapshot,
newMessages: snapshotMessages,
source: 'server_snapshot',
},
timestamp: Date.now(),
});
this.enrichMessagesWithSenderInfo(conversationId, mergedSnapshot);
saveMessagesBatch(snapshotMessages.map((m: any) => ({
id: m.id,
conversationId: m.conversation_id || conversationId,
senderId: m.sender_id,
content: m.content,
type: m.type || 'text',
isRead: m.is_read || false,
createdAt: m.created_at,
seq: m.seq,
status: m.status || 'normal',
segments: m.segments,
}))).catch(error => {
console.error('[MessageManager] 保存快照消息到本地失败:', error);
});
}
// 2.2 再基于 localMaxSeq 做增量补齐(防止快照窗口不足导致漏更老的新消息)
const snapshotMaxSeq = snapshotMessages.reduce((max, m) => Math.max(max, m.seq || 0), 0);
if (snapshotMaxSeq > localMaxSeq) {
console.log('[MessageManager] 增量补齐:', { localMaxSeq, snapshotMaxSeq });
const incrementalResp = await messageService.getMessages(conversationId, localMaxSeq);
const newMessages = incrementalResp?.messages || [];
if (newMessages.length > 0) {
const existingMessages = this.state.messagesMap.get(conversationId) || [];
const mergedMessages = mergeMessages(existingMessages, newMessages);
this.state.messagesMap.set(conversationId, mergedMessages);
this.notifySubscribers({
type: 'messages_updated',
payload: {
conversationId,
messages: mergedMessages,
newMessages,
source: 'server_incremental',
},
timestamp: Date.now(),
});
this.enrichMessagesWithSenderInfo(conversationId, mergedMessages);
saveMessagesBatch(newMessages.map((m: any) => ({
id: m.id,
conversationId: m.conversation_id || conversationId,
senderId: m.sender_id,
content: m.content,
type: m.type || 'text',
isRead: m.is_read || false,
createdAt: m.created_at,
seq: m.seq,
status: m.status || 'normal',
segments: m.segments,
}))).catch(error => {
console.error('[MessageManager] 保存增量消息到本地失败:', error);
});
}
}
} catch (error) {
console.error('[MessageManager] 快照/增量同步失败:', error);
}
} else {
// 指定了afterSeq直接请求服务端
const response = await messageService.getMessages(conversationId, afterSeq);
if (response?.messages && response.messages.length > 0) {
const newMessages = response.messages;
// 合并消息
const existingMessages = this.state.messagesMap.get(conversationId) || [];
const mergedMessages = mergeMessages(existingMessages, newMessages);
this.state.messagesMap.set(conversationId, mergedMessages);
this.notifySubscribers({
type: 'messages_updated',
payload: {
conversationId,
messages: mergedMessages,
newMessages,
source: 'server',
},
timestamp: Date.now(),
});
// 异步填充 sender 信息
this.enrichMessagesWithSenderInfo(conversationId, mergedMessages);
// 持久化是副作用,不阻断 UI
saveMessagesBatch(newMessages.map((m: any) => ({
id: m.id,
conversationId: m.conversation_id || conversationId,
senderId: m.sender_id,
content: m.content,
type: m.type || 'text',
isRead: m.is_read || false,
createdAt: m.created_at,
seq: m.seq,
status: m.status || 'normal',
segments: m.segments,
}))).catch(error => {
console.error('[MessageManager] 保存 afterSeq 消息到本地失败:', error);
});
}
}
} catch (error) {
console.error('[MessageManager] 获取消息失败:', error);
this.notifySubscribers({
type: 'error',
payload: { error, context: 'fetchMessages', conversationId },
timestamp: Date.now(),
});
} finally {
this.state.loadingMessagesSet.delete(conversationId);
}
}
/**
 * Loads one page of history older than `beforeSeq` for a conversation.
 *
 * The local database is queried first and used when it yields at least
 * `limit` rows; otherwise the page is requested from the server. The page is
 * merged (de-duplicated by message id) into the in-memory list for the
 * conversation and a `messages_updated` event is emitted with source
 * `local_history` or `server_history`.
 *
 * @param conversationId - Conversation whose history is being paged.
 * @param beforeSeq - Upper seq bound handed to the local and server queries.
 * @param limit - Requested page size; defaults to 20.
 * @returns The messages of the loaded page, or `[]` when the server returned
 *   nothing or any step threw (the error is logged, not rethrown).
 */
async loadMoreMessages(conversationId: string, beforeSeq: number, limit = 20): Promise<MessageResponse[]> {
  console.log('[MessageManager] 加载更多历史消息:', { conversationId, beforeSeq, limit });

  // Merges a history page into the in-memory list. Entries are keyed by id so
  // that repeated or overlapping page loads cannot produce duplicate messages.
  const publishPage = (page: MessageResponse[], source: 'local_history' | 'server_history'): void => {
    const byId = new Map<MessageResponse['id'], MessageResponse>();
    for (const message of page) {
      byId.set(message.id, message);
    }
    // In-memory entries are written last so they win over the freshly loaded copy.
    for (const message of this.state.messagesMap.get(conversationId) || []) {
      byId.set(message.id, message);
    }
    const mergedMessages = Array.from(byId.values()).sort((a, b) => a.seq - b.seq);
    this.state.messagesMap.set(conversationId, mergedMessages);
    this.notifySubscribers({
      type: 'messages_updated',
      payload: { conversationId, messages: mergedMessages, source },
      timestamp: Date.now(),
    });
    // Sender details are filled in asynchronously.
    this.enrichMessagesWithSenderInfo(conversationId, mergedMessages);
  };

  try {
    // Try the local database first.
    const localMessages = await getMessagesBeforeSeq(conversationId, beforeSeq, limit);
    if (localMessages.length >= limit) {
      // The local cache holds a full page.
      const formattedMessages: MessageResponse[] = localMessages.map(m => ({
        id: m.id,
        conversation_id: m.conversationId,
        sender_id: m.senderId,
        seq: m.seq,
        segments: m.segments || [],
        status: m.status as any,
        created_at: m.createdAt,
      }));
      publishPage(formattedMessages, 'local_history');
      return formattedMessages;
    }

    // Not enough local data: ask the server.
    const response = await messageService.getMessages(conversationId, undefined, beforeSeq, limit);
    const serverMessages = response?.messages;
    if (!serverMessages || serverMessages.length === 0) {
      return [];
    }

    publishPage(serverMessages, 'server_history');

    // Persistence is a side effect: a failing local write must neither delay
    // the UI update nor discard a page that was fetched successfully.
    saveMessagesBatch(serverMessages.map((m: any) => ({
      id: m.id,
      conversationId: m.conversation_id || conversationId,
      senderId: m.sender_id,
      content: m.content,
      type: m.type || 'text',
      isRead: m.is_read || false,
      createdAt: m.created_at,
      seq: m.seq,
      status: m.status || 'normal',
      segments: m.segments,
    }))).catch(error => {
      console.error('[MessageManager] 保存历史消息到本地失败:', error);
    });

    return serverMessages;
  } catch (error) {
    console.error('[MessageManager] 加载更多消息失败:', error);
    return [];
  }
}
/**
 * Refreshes the conversation and system unread counters from the server.
 *
 * Both counters are requested in parallel, stored in the manager state
 * (missing values fall back to 0) and broadcast through an
 * `unread_count_updated` event. Failures are logged and swallowed.
 */
async fetchUnreadCount(): Promise<void> {
  try {
    const pendingRequests = [
      messageService.getUnreadCount(),
      messageService.getSystemUnreadCount(),
    ] as const;
    const [unreadData, systemUnreadData] = await Promise.all(pendingRequests);

    const total = unreadData?.total_unread_count ?? 0;
    const system = systemUnreadData?.unread_count ?? 0;
    this.state.totalUnreadCount = total;
    this.state.systemUnreadCount = system;

    console.log('[MessageManager] 未读数更新:', { total, system });

    this.notifySubscribers({
      type: 'unread_count_updated',
      payload: { totalUnreadCount: total, systemUnreadCount: system },
      timestamp: Date.now(),
    });
  } catch (error) {
    console.error('[MessageManager] 获取未读数失败:', error);
  }
}
// ==================== 数据操作方法 ====================
/**
 * Sends a message through the API and mirrors it into local state.
 *
 * On success the message is added to the in-memory list of the conversation,
 * `messages_updated` (source `send`) and `message_sent` events are emitted,
 * the conversation entry is refreshed and the message is written to the local
 * database without blocking the caller.
 *
 * @param conversationId - Target conversation.
 * @param segments - Message content segments.
 * @param options - Optional settings; `replyToId` is forwarded as `reply_to_id`.
 * @returns The locally constructed message, or `null` when the API returned
 *   nothing or the request failed (an `error` event is emitted in that case).
 */
async sendMessage(
  conversationId: string,
  segments: MessageSegment[],
  options?: { replyToId?: string }
): Promise<MessageResponse | null> {
  console.log('[MessageManager] 发送消息:', { conversationId, segmentCount: segments.length });
  try {
    const response = await messageService.sendMessage(conversationId, {
      segments,
      reply_to_id: options?.replyToId,
    });
    if (!response) {
      return null;
    }

    const senderId = this.getCurrentUserId() || '';
    const newMessage: MessageResponse = {
      id: response.id,
      conversation_id: conversationId,
      sender_id: senderId,
      seq: response.seq,
      segments,
      created_at: new Date().toISOString(),
      status: 'normal',
    };

    // Drop any entry that already carries this id before appending, so the
    // list never holds the same message twice.
    // NOTE(review): this guards against a real-time echo arriving while the
    // request was in flight; the WebSocket handler is not visible here — confirm.
    const existingMessages = this.state.messagesMap.get(conversationId) || [];
    const updatedMessages = [
      ...existingMessages.filter(m => m.id !== newMessage.id),
      newMessage,
    ].sort((a, b) => a.seq - b.seq);
    this.state.messagesMap.set(conversationId, updatedMessages);

    // Broadcast the list right away so the chat view shows the new message.
    this.notifySubscribers({
      type: 'messages_updated',
      payload: {
        conversationId,
        messages: updatedMessages,
        newMessage,
        source: 'send',
      },
      timestamp: Date.now(),
    });

    // Refresh the conversation entry with the new message.
    this.updateConversationWithNewMessage(conversationId, newMessage, newMessage.created_at);

    this.notifySubscribers({
      type: 'message_sent',
      payload: { conversationId, message: newMessage },
      timestamp: Date.now(),
    });

    // Persist locally; failures are only logged.
    const textContent = segments
      .filter((s: any) => s.type === 'text')
      .map((s: any) => s.data?.text || '')
      .join('');
    const hasImage = segments.some((s: any) => s.type === 'image');
    saveMessage({
      id: response.id,
      conversationId,
      senderId,
      content: textContent,
      type: hasImage ? 'image' : 'text',
      isRead: true,
      createdAt: newMessage.created_at,
      seq: response.seq,
      status: 'normal',
      segments,
    }).catch(console.error);

    return newMessage;
  } catch (error) {
    console.error('[MessageManager] 发送消息失败:', error);
    this.notifySubscribers({
      type: 'error',
      payload: { error, context: 'sendMessage' },
      timestamp: Date.now(),
    });
    return null;
  }
}
/**
 * Marks a conversation as read up to `seq`, updating local state optimistically.
 *
 * Coordination relies on `pendingReadMap` and `readStateVersion`:
 * 1. A pending record (timestamp, version, lastReadSeq) is stored before the
 *    optimistic update; a call whose `seq` is not greater than the recorded
 *    `lastReadSeq` returns early without reporting again.
 * 2. After the API call succeeds the record is kept for
 *    `READ_STATE_PROTECTION_DELAY` and then removed by a timer, provided its
 *    version still matches.
 * 3. NOTE(review): the record is presumably consulted by fetchConversations so
 *    that stale server data does not overwrite the optimistic state — that
 *    code is not visible here; confirm.
 *
 * On API failure the previously cleared unread count is added back onto the
 * conversation's current state and subscribers are notified.
 *
 * @param conversationId - Conversation to mark as read.
 * @param seq - Highest message seq that has been read.
 */
async markAsRead(conversationId: string, seq: number): Promise<void> {
  console.log('[MessageManager] 标记已读:', { conversationId, seq });
  const conversation = this.state.conversations.get(conversationId);
  if (!conversation) {
    console.warn('[MessageManager] 会话不存在:', conversationId);
    return;
  }
  const prevUnreadCount = conversation.unread_count || 0;
  const existingRecord = this.pendingReadMap.get(conversationId);
  // De-duplicate by seq: skip when an equal or larger seq was already reported.
  if (existingRecord && seq <= existingRecord.lastReadSeq) {
    console.log('[MessageManager] 已读seq未前进跳过重复上报:', {
      conversationId,
      seq,
      lastReadSeq: existingRecord.lastReadSeq,
    });
    return;
  }
  // Cancel the protection timer of the record this request supersedes.
  if (existingRecord?.clearTimer) {
    clearTimeout(existingRecord.clearTimer);
  }
  // Bump the global version; it identifies this request from here on.
  this.readStateVersion++;
  const currentVersion = this.readStateVersion;

  // 1. Register the in-flight read request.
  this.pendingReadMap.set(conversationId, {
    timestamp: Date.now(),
    version: currentVersion,
    lastReadSeq: seq,
  });
  console.log('[MessageManager] 已读保护开始:', { conversationId, version: currentVersion });

  // 2. Optimistic local update so the UI reacts immediately.
  const updatedConv: ConversationResponse = {
    ...conversation,
    unread_count: 0,
  };
  this.state.conversations.set(conversationId, updatedConv);
  this.state.totalUnreadCount = Math.max(0, this.state.totalUnreadCount - prevUnreadCount);
  this.updateConversationList();

  // 3. Update the local database (fire-and-forget).
  markConversationAsRead(conversationId).catch(console.error);

  // 4. Notify subscribers.
  this.notifySubscribers({
    type: 'message_read',
    payload: {
      conversationId,
      unreadCount: 0,
      totalUnreadCount: this.state.totalUnreadCount,
    },
    timestamp: Date.now(),
  });
  this.notifySubscribers({
    type: 'conversations_updated',
    payload: { conversations: this.state.conversationList },
    timestamp: Date.now(),
  });
  this.notifySubscribers({
    type: 'unread_count_updated',
    payload: {
      totalUnreadCount: this.state.totalUnreadCount,
      systemUnreadCount: this.state.systemUnreadCount,
    },
    timestamp: Date.now(),
  });

  // 5. Report to the server.
  try {
    await messageService.markAsRead(conversationId, seq);
    console.log('[MessageManager] 标记已读API调用成功:', { conversationId, version: currentVersion });
  } catch (error) {
    console.error('[MessageManager] 标记已读API失败:', error);
    // Roll back onto the conversation's *current* state. Writing back the
    // snapshot captured before the await would discard whatever changed while
    // the request was in flight, and would resurrect a conversation that was
    // removed in the meantime.
    const latest = this.state.conversations.get(conversationId);
    if (latest) {
      const restoredUnreadCount = (latest.unread_count || 0) + prevUnreadCount;
      this.state.conversations.set(conversationId, {
        ...latest,
        unread_count: restoredUnreadCount,
      });
      this.state.totalUnreadCount += prevUnreadCount;
      this.updateConversationList();
      this.notifySubscribers({
        type: 'message_read',
        payload: {
          conversationId,
          unreadCount: restoredUnreadCount,
          totalUnreadCount: this.state.totalUnreadCount,
        },
        timestamp: Date.now(),
      });
      // Mirror the optimistic path so list and badge subscribers see the rollback too.
      this.notifySubscribers({
        type: 'conversations_updated',
        payload: { conversations: this.state.conversationList },
        timestamp: Date.now(),
      });
      this.notifySubscribers({
        type: 'unread_count_updated',
        payload: {
          totalUnreadCount: this.state.totalUnreadCount,
          systemUnreadCount: this.state.systemUnreadCount,
        },
        timestamp: Date.now(),
      });
    }
    // Drop the protection record only if it still belongs to this request;
    // a newer request may have replaced it while this one was awaiting.
    const failedRecord = this.pendingReadMap.get(conversationId);
    if (failedRecord && failedRecord.version === currentVersion) {
      this.pendingReadMap.delete(conversationId);
    }
    return;
  }

  // A newer request owns the record now: leave its seq, version and timer alone.
  const activeRecord = this.pendingReadMap.get(conversationId);
  if (activeRecord && activeRecord.version !== currentVersion) {
    return;
  }

  // Keep the protection record for a while after success, then remove it.
  const clearTimer = setTimeout(() => {
    const record = this.pendingReadMap.get(conversationId);
    // Only clear the record created by this request.
    if (record && record.version === currentVersion) {
      this.pendingReadMap.delete(conversationId);
      console.log('[MessageManager] 已读保护结束:', { conversationId, version: currentVersion });
    }
  }, READ_STATE_PROTECTION_DELAY);
  // Store the timer id so a later request can cancel it.
  this.pendingReadMap.set(conversationId, {
    timestamp: Date.now(),
    version: currentVersion,
    lastReadSeq: seq,
    clearTimer,
  });
}
/**
 * Marks system messages and every conversation with unread messages as read.
 *
 * Local state is updated optimistically (all conversation unread counts and
 * the total are set to 0) and subscribers are notified. The server is then
 * told to mark system messages read and each previously unread conversation
 * read up to its `last_seq`. If any request fails, the previous conversation
 * map and total are restored and subscribers are notified again.
 *
 * NOTE(review): `systemUnreadCount` is not changed here even though system
 * messages are marked read on the server — presumably callers adjust it via
 * `setSystemUnreadCount`; confirm.
 */
async markAllAsRead(): Promise<void> {
  console.log('[MessageManager] 标记所有已读');

  // Snapshot for rollback and for deciding which conversations to report.
  // `new Map(...)` copies entries only, not the conversation objects, so the
  // optimistic update below must REPLACE entries instead of mutating them;
  // mutating in place would also zero the snapshot, leaving nothing to send
  // to the server and nothing to roll back to.
  const prevConversations = new Map(this.state.conversations);
  const prevTotalUnread = this.state.totalUnreadCount;
  const unreadConversations = Array.from(prevConversations.values())
    .filter(conv => (conv.unread_count || 0) > 0);

  // Optimistic update.
  prevConversations.forEach((conv, id) => {
    this.state.conversations.set(id, { ...conv, unread_count: 0 });
  });
  this.state.totalUnreadCount = 0;
  this.updateConversationList();

  this.notifySubscribers({
    type: 'unread_count_updated',
    payload: {
      totalUnreadCount: 0,
      systemUnreadCount: this.state.systemUnreadCount,
    },
    timestamp: Date.now(),
  });
  this.notifySubscribers({
    type: 'conversations_updated',
    payload: { conversations: this.state.conversationList },
    timestamp: Date.now(),
  });

  try {
    // Mark system messages as read.
    await messageService.markAllSystemMessagesRead();
    // Mark every previously unread conversation as read.
    await Promise.all(
      unreadConversations.map(conv => messageService.markAsRead(conv.id, conv.last_seq))
    );
  } catch (error) {
    console.error('[MessageManager] 标记所有已读失败:', error);
    // Roll back to the untouched snapshot and refresh the derived list.
    this.state.conversations = prevConversations;
    this.state.totalUnreadCount = prevTotalUnread;
    this.updateConversationList();
    this.notifySubscribers({
      type: 'unread_count_updated',
      payload: {
        totalUnreadCount: prevTotalUnread,
        systemUnreadCount: this.state.systemUnreadCount,
      },
      timestamp: Date.now(),
    });
    this.notifySubscribers({
      type: 'conversations_updated',
      payload: { conversations: this.state.conversationList },
      timestamp: Date.now(),
    });
  }
}
/**
 * Creates a private conversation with the given user.
 *
 * The conversation returned by the service is stored in the manager state and
 * a `conversations_updated` event is emitted.
 *
 * @param userId - The other participant.
 * @returns The created conversation, or `null` on failure (an `error` event
 *   with context `createConversation` is emitted in that case).
 */
async createConversation(userId: string): Promise<ConversationResponse | null> {
  console.log('[MessageManager] 创建私聊会话:', userId);
  try {
    const created = await messageService.createConversation(userId);

    // Register the conversation and rebuild the list.
    this.state.conversations.set(created.id, created);
    this.updateConversationList();

    const conversations = this.state.conversationList;
    this.notifySubscribers({
      type: 'conversations_updated',
      payload: { conversations },
      timestamp: Date.now(),
    });
    return created;
  } catch (error) {
    console.error('[MessageManager] 创建会话失败:', error);
    const payload = { error, context: 'createConversation' };
    this.notifySubscribers({ type: 'error', payload, timestamp: Date.now() });
    return null;
  }
}
/**
 * Applies a partial update to a known conversation.
 *
 * The stored entry is replaced by a new object combining the current values
 * with `updates`, the list is rebuilt and `conversations_updated` is emitted.
 * Unknown conversation ids only produce a warning.
 *
 * @param conversationId - Conversation to update.
 * @param updates - Fields to overwrite.
 */
updateConversation(conversationId: string, updates: Partial<ConversationResponse>): void {
  const current = this.state.conversations.get(conversationId);
  if (current) {
    const merged: ConversationResponse = { ...current, ...updates };
    this.state.conversations.set(conversationId, merged);
    this.updateConversationList();

    // Tell subscribers about the new list.
    const conversations = this.state.conversationList;
    this.notifySubscribers({
      type: 'conversations_updated',
      payload: { conversations },
      timestamp: Date.now(),
    });
    return;
  }
  console.warn('[MessageManager] 更新会话失败,会话不存在:', conversationId);
}
/**
 * Removes a conversation and its messages from memory and the local database.
 *
 * The conversation's unread count is subtracted from the total (never below
 * 0), the active conversation is cleared when it is the removed one, and
 * `conversations_updated` plus `unread_count_updated` events are emitted.
 *
 * @param conversationId - Conversation to remove; normalized before use.
 */
removeConversation(conversationId: string): void {
  const id = this.normalizeConversationId(conversationId);
  const { state } = this;

  const removedUnread = state.conversations.get(id)?.unread_count || 0;
  state.conversations.delete(id);
  state.messagesMap.delete(id);
  state.totalUnreadCount = Math.max(0, state.totalUnreadCount - removedUnread);

  const wasActive = state.currentConversationId === id;
  if (wasActive) {
    state.currentConversationId = null;
  }

  this.updateConversationList();

  this.notifySubscribers({
    type: 'conversations_updated',
    payload: { conversations: this.state.conversationList },
    timestamp: Date.now(),
  });
  this.notifySubscribers({
    type: 'unread_count_updated',
    payload: {
      totalUnreadCount: this.state.totalUnreadCount,
      systemUnreadCount: this.state.systemUnreadCount,
    },
    timestamp: Date.now(),
  });

  // Local database cleanup is best-effort.
  deleteConversationFromDb(id).catch(error => {
    console.error('[MessageManager] 删除本地会话失败:', error);
  });
}
/**
 * Sets the system-message unread counter and emits `unread_count_updated`.
 *
 * @param count - New system unread count.
 */
setSystemUnreadCount(count: number): void {
  console.log('[MessageManager] 设置系统未读数:', count);
  this.state.systemUnreadCount = count;

  const { totalUnreadCount, systemUnreadCount } = this.state;
  this.notifySubscribers({
    type: 'unread_count_updated',
    payload: { totalUnreadCount, systemUnreadCount },
    timestamp: Date.now(),
  });
}
/**
 * Increases the system-message unread counter by one and emits
 * `unread_count_updated`.
 */
incrementSystemUnreadCount(): void {
  this.state.systemUnreadCount += 1;
  const { totalUnreadCount, systemUnreadCount } = this.state;
  console.log('[MessageManager] 系统未读数增加:', systemUnreadCount);

  this.notifySubscribers({
    type: 'unread_count_updated',
    payload: { totalUnreadCount, systemUnreadCount },
    timestamp: Date.now(),
  });
}
/**
 * Decreases the system-message unread counter, never below 0, and emits
 * `unread_count_updated`.
 *
 * @param count - Amount to subtract; defaults to 1.
 */
decrementSystemUnreadCount(count = 1): void {
  const remaining = this.state.systemUnreadCount - count;
  this.state.systemUnreadCount = Math.max(0, remaining);

  const { totalUnreadCount, systemUnreadCount } = this.state;
  console.log('[MessageManager] 系统未读数减少:', systemUnreadCount);

  this.notifySubscribers({
    type: 'unread_count_updated',
    payload: { totalUnreadCount, systemUnreadCount },
    timestamp: Date.now(),
  });
}
/**
 * Sets the unread count of one conversation and adjusts the total by the
 * difference (the total never drops below 0).
 *
 * Emits `unread_count_updated` followed by `conversations_updated`. Unknown
 * conversation ids only produce a warning.
 *
 * @param conversationId - Conversation to update.
 * @param count - New unread count for that conversation.
 */
updateUnreadCount(conversationId: string, count: number): void {
  const current = this.state.conversations.get(conversationId);
  if (!current) {
    console.warn('[MessageManager] 更新未读数失败,会话不存在:', conversationId);
    return;
  }

  const delta = count - (current.unread_count || 0);

  // Replace the entry with a copy carrying the new count.
  this.state.conversations.set(conversationId, { ...current, unread_count: count });

  // Shift the total by the same amount.
  this.state.totalUnreadCount = Math.max(0, this.state.totalUnreadCount + delta);
  this.updateConversationList();

  const { totalUnreadCount, systemUnreadCount } = this.state;
  this.notifySubscribers({
    type: 'unread_count_updated',
    payload: { totalUnreadCount, systemUnreadCount },
    timestamp: Date.now(),
  });
  this.notifySubscribers({
    type: 'conversations_updated',
    payload: { conversations: this.state.conversationList },
    timestamp: Date.now(),
  });
}
/**
 * Empties the conversation map and list, resets both unread counters to 0 and
 * emits `conversations_updated` and `unread_count_updated` with empty values.
 */
clearConversations(): void {
  console.log('[MessageManager] 清空会话列表');

  const { state } = this;
  state.conversations.clear();
  state.conversationList = [];
  state.totalUnreadCount = 0;
  state.systemUnreadCount = 0;

  this.notifySubscribers({
    type: 'conversations_updated',
    payload: { conversations: [] },
    timestamp: Date.now(),
  });
  this.notifySubscribers({
    type: 'unread_count_updated',
    payload: { totalUnreadCount: 0, systemUnreadCount: 0 },
    timestamp: Date.now(),
  });
}
/**
 * Compatibility no-op: logs the requested cache type and does nothing else.
 *
 * @param type - Which cache the caller wants invalidated; defaults to `all`.
 */
invalidateCache(type: 'all' | 'list' | 'unread' = 'all'): void {
  // The manager keeps no external cache; this method exists only so callers
  // written against a cache-invalidation API keep working.
  const logPrefix = '[MessageManager] 使缓存失效:';
  console.log(logPrefix, type);
}
/**
 * Increases the unread count of one conversation and the total by one.
 *
 * Does nothing when the conversation is unknown. Otherwise the entry is
 * replaced by a copy with the new count, the list is rebuilt and
 * `unread_count_updated` followed by `conversations_updated` is emitted.
 *
 * @param conversationId - Conversation that received an unread message.
 */
private incrementUnreadCount(conversationId: string): void {
  const conversation = this.state.conversations.get(conversationId);
  if (!conversation) {
    return;
  }

  const prevUnreadCount = conversation.unread_count || 0;
  const newUnreadCount = prevUnreadCount + 1;
  const conversationInMap = this.state.conversations.get(conversationId);
  console.log('[MessageManager][DEBUG] incrementUnreadCount:', {
    conversationId,
    prevUnreadCount,
    newUnreadCount,
    conversationRef: conversation,
    conversationInMap,
    isSameRef: conversation === conversationInMap,
  });

  // Store a fresh object rather than mutating the existing entry.
  this.state.conversations.set(conversationId, {
    ...conversation,
    unread_count: newUnreadCount,
  });
  this.state.totalUnreadCount += 1;

  // Rebuild the list because unread_count changed.
  this.updateConversationList();

  const { totalUnreadCount, systemUnreadCount } = this.state;
  this.notifySubscribers({
    type: 'unread_count_updated',
    payload: { totalUnreadCount, systemUnreadCount },
    timestamp: Date.now(),
  });
  this.notifySubscribers({
    type: 'conversations_updated',
    payload: { conversations: this.state.conversationList },
    timestamp: Date.now(),
  });
}
// ==================== 状态查询方法 ====================
/**
 * Returns the current conversation list held in state (the same array
 * instance, not a copy).
 */
getConversations(): ConversationResponse[] {
  const { conversationList } = this.state;
  return conversationList;
}
/**
 * Looks up a single conversation by id.
 *
 * @param conversationId - Conversation id; normalized before the lookup.
 * @returns The stored conversation, or `null` when it is unknown.
 */
getConversation(conversationId: string): ConversationResponse | null {
  const key = this.normalizeConversationId(conversationId);
  const found = this.state.conversations.get(key);
  return found ?? null;
}
/**
 * Returns the in-memory messages of a conversation.
 *
 * @param conversationId - Conversation id; normalized before the lookup.
 * @returns The stored message array, or a new empty array when none is stored.
 */
getMessages(conversationId: string): MessageResponse[] {
  const key = this.normalizeConversationId(conversationId);
  const stored = this.state.messagesMap.get(key);
  return stored ?? [];
}
/**
 * Returns the current unread counters.
 *
 * @returns `total` (conversation unread total) and `system` (system-message
 *   unread count) as currently held in state.
 */
getUnreadCount(): { total: number; system: number } {
  const { totalUnreadCount: total, systemUnreadCount: system } = this.state;
  return { total, system };
}
/**
 * Reports the WebSocket connection flag currently held in state.
 */
isConnected(): boolean {
  const { isWebSocketConnected } = this.state;
  return isWebSocketConnected;
}
/**
 * Reports whether the conversation list is currently being loaded.
 */
isLoading(): boolean {
  const { isLoadingConversations } = this.state;
  return isLoadingConversations;
}
/**
 * Reports whether messages of the given conversation are being loaded.
 *
 * @param conversationId - Conversation id; normalized before the lookup.
 * @returns `true` when the id is in the set of conversations being loaded.
 */
isLoadingMessages(conversationId: string): boolean {
  const key = this.normalizeConversationId(conversationId);
  const { loadingMessagesSet } = this.state;
  return loadingMessagesSet.has(key);
}
// ==================== 活动会话管理 ====================
/**
 * Records which conversation is currently active.
 *
 * @param conversationId - Conversation id (normalized before being stored);
 *   a falsy value clears the active conversation.
 */
setActiveConversation(conversationId: string | null): void {
  const activeId = !conversationId ? null : this.normalizeConversationId(conversationId);
  console.log('[MessageManager] 设置活动会话:', activeId);
  this.state.currentConversationId = activeId;
}
/**
 * Activates a conversation: initialize, mark active, emit a snapshot, sync.
 *
 * After `initialize()` resolves, the conversation becomes the active one and
 * a copy of its in-memory messages is emitted as `messages_updated` (source
 * `activation_snapshot`). Then `fetchMessages` runs; while a fetch for the
 * same conversation is registered, later calls await it instead of starting
 * another one unless `forceSync` is set.
 *
 * @param conversationId - Conversation to activate; normalized first. Nothing
 *   happens when the normalized id is falsy.
 * @param options - `forceSync` starts a new fetch even if one is registered.
 */
async activateConversation(conversationId: string, options?: { forceSync?: boolean }): Promise<void> {
  const targetId = this.normalizeConversationId(conversationId);
  if (!targetId) {
    return;
  }

  await this.initialize();
  this.setActiveConversation(targetId);

  // Emit what is already in memory so the UI has a defined starting point.
  const inMemory = this.getMessages(targetId);
  this.notifySubscribers({
    type: 'messages_updated',
    payload: {
      conversationId: targetId,
      messages: inMemory.slice(),
      source: 'activation_snapshot',
    },
    timestamp: Date.now(),
  });

  const registeredTask = this.activatingConversationTasks.get(targetId);
  const mustResync = Boolean(options?.forceSync);
  if (registeredTask && !mustResync) {
    await registeredTask;
    return;
  }

  const runFetch = async (): Promise<void> => {
    await this.fetchMessages(targetId);
  };
  const task = runFetch();
  this.activatingConversationTasks.set(targetId, task);
  try {
    await task;
  } finally {
    // Unregister only if a newer task has not replaced this one.
    const stillRegistered = this.activatingConversationTasks.get(targetId) === task;
    if (stillRegistered) {
      this.activatingConversationTasks.delete(targetId);
    }
  }
}
/**
 * Returns the id of the currently active conversation, or `null` when none
 * is set.
 */
getActiveConversation(): string | null {
  const { currentConversationId } = this.state;
  return currentConversationId;
}
// ==================== 订阅机制 ====================
/**
 * Registers a subscriber and immediately replays the current state to it.
 *
 * The new subscriber synchronously receives `conversations_updated`,
 * `unread_count_updated` and `connection_changed`; when there is an active
 * conversation with at least one in-memory message it also receives
 * `messages_updated` (source `initial_sync`) with a copy of those messages.
 *
 * @param subscriber - Callback invoked for every event.
 * @returns A function that removes the subscriber again.
 */
subscribe(subscriber: MessageSubscriber): () => void {
  this.state.subscribers.add(subscriber);
  console.log('[MessageManager] 新订阅者,当前数量:', this.state.subscribers.size);

  // Delivers one event to the new subscriber only; the payload is built by
  // the caller right before delivery so it reflects the state at that moment.
  const replay = (type: MessageEventType, payload: any): void => {
    subscriber({ type, payload, timestamp: Date.now() });
  };

  replay('conversations_updated', { conversations: this.state.conversationList });
  replay('unread_count_updated', {
    totalUnreadCount: this.state.totalUnreadCount,
    systemUnreadCount: this.state.systemUnreadCount,
  });
  replay('connection_changed', { connected: this.state.isWebSocketConnected });

  // Hand over the messages of the active conversation, if any, so the new
  // subscriber does not have to wait for the next update.
  const activeId = this.state.currentConversationId;
  if (activeId) {
    const currentMessages = this.state.messagesMap.get(activeId);
    if (currentMessages && currentMessages.length > 0) {
      console.log('[MessageManager][DEBUG] 新订阅者,发送当前活动会话消息:', {
        conversationId: activeId,
        messageCount: currentMessages.length,
      });
      replay('messages_updated', {
        conversationId: activeId,
        messages: [...currentMessages],
        source: 'initial_sync',
      });
    }
  }

  // Unsubscribe handle.
  return () => {
    this.state.subscribers.delete(subscriber);
    console.log('[MessageManager] 订阅者移除,当前数量:', this.state.subscribers.size);
  };
}
/**
 * Subscribes to message-list updates of a single conversation.
 *
 * The callback is invoked with `payload.messages` of every `messages_updated`
 * event whose `payload.conversationId` strictly equals `conversationId`; all
 * other events are ignored.
 *
 * @param conversationId - Conversation to watch (compared as given).
 * @param callback - Receives the message list of each matching event.
 * @returns A function that cancels the subscription.
 */
subscribeToMessages(conversationId: string, callback: (messages: MessageResponse[]) => void): () => void {
  const forwardMatching: MessageSubscriber = (event) => {
    if (event.type !== 'messages_updated') {
      return;
    }
    if (event.payload.conversationId !== conversationId) {
      return;
    }
    callback(event.payload.messages);
  };
  return this.subscribe(forwardMatching);
}
// ==================== 群聊相关状态查询 ====================
/**
 * Returns the typing-user entries stored for a group.
 *
 * @param groupId - Group to look up.
 * @returns The stored array, or a new empty array when nothing is stored.
 */
getTypingUsers(groupId: string): string[] {
  const typing = this.state.typingUsersMap.get(groupId);
  return typing ?? [];
}
/**
 * Returns the muted flag stored for a group.
 *
 * @param groupId - Group to look up.
 * @returns The stored flag, or `false` when nothing is stored.
 */
isMuted(groupId: string): boolean {
  const muted = this.state.mutedStatusMap.get(groupId);
  return muted ? muted : false;
}
/**
 * Stores the muted flag for a group. No event is emitted.
 *
 * @param groupId - Group whose flag is stored.
 * @param isMuted - New muted flag.
 */
setMutedStatus(groupId: string, isMuted: boolean): void {
  const { mutedStatusMap } = this.state;
  mutedStatusMap.set(groupId, isMuted);
}
}
// ==================== Singleton export ====================
// Single MessageManager instance, constructed when this module is first loaded.
export const messageManager = new MessageManager();
// Default export: the same singleton instance as the named export above.
export default messageManager;