package redis import ( "context" "errors" "fmt" "sync" "time" "carrotskin/pkg/config" "github.com/redis/go-redis/v9" "go.uber.org/zap" ) // Client Redis客户端包装(包含连接池统计和健康检查) type Client struct { *redis.Client // 嵌入原始Redis客户端 logger *zap.Logger // 日志记录器 stats *RedisStats // 连接池统计信息 healthCheckDone chan struct{} // 健康检查完成信号 closeCh chan struct{} // 关闭信号通道 wg sync.WaitGroup // 等待组 } // RedisStats Redis连接池统计信息 type RedisStats struct { PoolSize int // 连接池大小 IdleConns int // 空闲连接数 ActiveConns int // 活跃连接数 StaleConns int // 过期连接数 TotalConns int // 总连接数 LastPingTime time.Time // 上次探活时间 LastPingSuccess bool // 上次探活是否成功 mu sync.RWMutex // 保护统计信息 } // New 创建Redis客户端(带健康检查和优化配置) func New(cfg config.RedisConfig, logger *zap.Logger) (*Client, error) { // 设置默认值 poolSize := cfg.PoolSize if poolSize <= 0 { poolSize = 16 // 优化:提高默认连接池大小 } minIdleConns := cfg.MinIdleConns if minIdleConns <= 0 { minIdleConns = 8 // 优化:提高最小空闲连接数 } maxRetries := cfg.MaxRetries if maxRetries <= 0 { maxRetries = 3 } dialTimeout := cfg.DialTimeout if dialTimeout <= 0 { dialTimeout = 5 * time.Second } readTimeout := cfg.ReadTimeout if readTimeout <= 0 { readTimeout = 3 * time.Second } writeTimeout := cfg.WriteTimeout if writeTimeout <= 0 { writeTimeout = 3 * time.Second } poolTimeout := cfg.PoolTimeout if poolTimeout <= 0 { poolTimeout = 4 * time.Second } connMaxIdleTime := cfg.ConnMaxIdleTime if connMaxIdleTime <= 0 { connMaxIdleTime = 10 * time.Minute // 优化:减少空闲连接超时 } connMaxLifetime := cfg.ConnMaxLifetime if connMaxLifetime <= 0 { connMaxLifetime = 30 * time.Minute // 新增:连接最大生命周期 } // 创建Redis客户端(带优化配置) rdb := redis.NewClient(&redis.Options{ Addr: fmt.Sprintf("%s:%d", cfg.Host, cfg.Port), Password: cfg.Password, DB: cfg.Database, PoolSize: poolSize, MinIdleConns: minIdleConns, MaxRetries: maxRetries, DialTimeout: dialTimeout, ReadTimeout: readTimeout, WriteTimeout: writeTimeout, PoolTimeout: poolTimeout, ConnMaxIdleTime: connMaxIdleTime, ConnMaxLifetime: connMaxLifetime, }) // 测试连接(带重试机制) if err := pingWithRetry(rdb, 3, 2*time.Second); err != nil { return nil, fmt.Errorf("Redis连接失败: %w", err) } // 创建客户端包装 client := &Client{ Client: rdb, logger: logger, stats: &RedisStats{}, healthCheckDone: make(chan struct{}), closeCh: make(chan struct{}), } // 初始化统计信息 client.updateStats() // 启动定期健康检查 healthCheckInterval := cfg.HealthCheckInterval if healthCheckInterval <= 0 { healthCheckInterval = 30 * time.Second } client.startHealthCheck(healthCheckInterval) logger.Info("Redis连接成功", zap.String("host", cfg.Host), zap.Int("port", cfg.Port), zap.Int("database", cfg.Database), zap.Int("pool_size", poolSize), zap.Int("min_idle_conns", minIdleConns), ) return client, nil } // pingWithRetry 带重试的Ping操作 func pingWithRetry(rdb *redis.Client, maxRetries int, retryInterval time.Duration) error { var err error for i := 0; i < maxRetries; i++ { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) err = rdb.Ping(ctx).Err() cancel() if err == nil { return nil } if i < maxRetries-1 { time.Sleep(retryInterval) } } return err } // startHealthCheck 启动定期健康检查 func (c *Client) startHealthCheck(interval time.Duration) { c.wg.Add(1) go func() { defer c.wg.Done() ticker := time.NewTicker(interval) defer ticker.Stop() for { select { case <-ticker.C: c.doHealthCheck() case <-c.closeCh: return } } }() } // doHealthCheck 执行健康检查 func (c *Client) doHealthCheck() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // 更新统计信息 c.updateStats() // 执行Ping检查 err := c.Client.Ping(ctx).Err() c.stats.mu.Lock() c.stats.LastPingTime = time.Now() c.stats.LastPingSuccess = err == nil c.stats.mu.Unlock() if err != nil { c.logger.Warn("Redis健康检查失败", zap.Error(err)) } else { c.logger.Debug("Redis健康检查成功") } } // updateStats 更新连接池统计信息 func (c *Client) updateStats() { // 获取底层连接池统计信息 stats := c.Client.PoolStats() c.stats.mu.Lock() c.stats.PoolSize = c.Client.Options().PoolSize c.stats.IdleConns = int(stats.IdleConns) c.stats.ActiveConns = int(stats.TotalConns) - int(stats.IdleConns) c.stats.TotalConns = int(stats.TotalConns) c.stats.StaleConns = int(stats.StaleConns) c.stats.mu.Unlock() } // GetStats 获取连接池统计信息 func (c *Client) GetStats() RedisStats { c.stats.mu.RLock() defer c.stats.mu.RUnlock() return RedisStats{ PoolSize: c.stats.PoolSize, IdleConns: c.stats.IdleConns, ActiveConns: c.stats.ActiveConns, StaleConns: c.stats.StaleConns, TotalConns: c.stats.TotalConns, LastPingTime: c.stats.LastPingTime, LastPingSuccess: c.stats.LastPingSuccess, } } // LogStats 记录连接池状态日志 func (c *Client) LogStats() { stats := c.GetStats() c.logger.Info("Redis连接池状态", zap.Int("pool_size", stats.PoolSize), zap.Int("idle_conns", stats.IdleConns), zap.Int("active_conns", stats.ActiveConns), zap.Int("total_conns", stats.TotalConns), zap.Int("stale_conns", stats.StaleConns), zap.Bool("last_ping_success", stats.LastPingSuccess), ) } // Ping 验证Redis连接(带超时控制) func (c *Client) Ping(ctx context.Context) error { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() return c.Client.Ping(ctx).Err() } // Close 关闭Redis连接 func (c *Client) Close() error { // 停止健康检查 close(c.closeCh) c.wg.Wait() c.logger.Info("正在关闭Redis连接") c.LogStats() // 关闭前记录最终状态 return c.Client.Close() } // ===== 以下是封装的便捷方法,用于返回 (value, error) 格式 ===== // Set 设置键值对(带过期时间)- 封装版本 func (c *Client) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error { return c.Client.Set(ctx, key, value, expiration).Err() } // Get 获取键值 - 封装版本 func (c *Client) Get(ctx context.Context, key string) (string, error) { return c.Client.Get(ctx, key).Result() } // Del 删除键 - 封装版本 func (c *Client) Del(ctx context.Context, keys ...string) error { return c.Client.Del(ctx, keys...).Err() } // Exists 检查键是否存在 - 封装版本 func (c *Client) Exists(ctx context.Context, keys ...string) (int64, error) { return c.Client.Exists(ctx, keys...).Result() } // Expire 设置键的过期时间 - 封装版本 func (c *Client) Expire(ctx context.Context, key string, expiration time.Duration) error { return c.Client.Expire(ctx, key, expiration).Err() } // TTL 获取键的剩余过期时间 - 封装版本 func (c *Client) TTL(ctx context.Context, key string) (time.Duration, error) { return c.Client.TTL(ctx, key).Result() } // Incr 自增 - 封装版本 func (c *Client) Incr(ctx context.Context, key string) (int64, error) { return c.Client.Incr(ctx, key).Result() } // Decr 自减 - 封装版本 func (c *Client) Decr(ctx context.Context, key string) (int64, error) { return c.Client.Decr(ctx, key).Result() } // HSet 设置哈希字段 - 封装版本 func (c *Client) HSet(ctx context.Context, key string, values ...interface{}) error { return c.Client.HSet(ctx, key, values...).Err() } // HGet 获取哈希字段 - 封装版本 func (c *Client) HGet(ctx context.Context, key, field string) (string, error) { return c.Client.HGet(ctx, key, field).Result() } // HGetAll 获取所有哈希字段 - 封装版本 func (c *Client) HGetAll(ctx context.Context, key string) (map[string]string, error) { return c.Client.HGetAll(ctx, key).Result() } // HDel 删除哈希字段 - 封装版本 func (c *Client) HDel(ctx context.Context, key string, fields ...string) error { return c.Client.HDel(ctx, key, fields...).Err() } // SAdd 添加集合成员 - 封装版本 func (c *Client) SAdd(ctx context.Context, key string, members ...interface{}) error { return c.Client.SAdd(ctx, key, members...).Err() } // SMembers 获取集合所有成员 - 封装版本 func (c *Client) SMembers(ctx context.Context, key string) ([]string, error) { return c.Client.SMembers(ctx, key).Result() } // SRem 删除集合成员 - 封装版本 func (c *Client) SRem(ctx context.Context, key string, members ...interface{}) error { return c.Client.SRem(ctx, key, members...).Err() } // SIsMember 检查是否是集合成员 - 封装版本 func (c *Client) SIsMember(ctx context.Context, key string, member interface{}) (bool, error) { return c.Client.SIsMember(ctx, key, member).Result() } // ZAdd 添加有序集合成员 - 封装版本 func (c *Client) ZAdd(ctx context.Context, key string, members ...redis.Z) error { return c.Client.ZAdd(ctx, key, members...).Err() } // ZRange 获取有序集合范围内的成员 - 封装版本 func (c *Client) ZRange(ctx context.Context, key string, start, stop int64) ([]string, error) { return c.Client.ZRange(ctx, key, start, stop).Result() } // ZRem 删除有序集合成员 - 封装版本 func (c *Client) ZRem(ctx context.Context, key string, members ...interface{}) error { return c.Client.ZRem(ctx, key, members...).Err() } // Pipeline 创建管道 func (c *Client) Pipeline() redis.Pipeliner { return c.Client.Pipeline() } // TxPipeline 创建事务管道 func (c *Client) TxPipeline() redis.Pipeliner { return c.Client.TxPipeline() } // Nil 检查错误是否为Nil(key不存在) func (c *Client) Nil(err error) bool { return errors.Is(err, redis.Nil) } // GetBytes 从Redis读取key对应的字节数据,统一处理错误 func (c *Client) GetBytes(ctx context.Context, key string) ([]byte, error) { val, err := c.Client.Get(ctx, key).Bytes() if err != nil { if errors.Is(err, redis.Nil) { // 处理key不存在的情况(返回nil,无错误) return nil, nil } return nil, err // 其他错误(如连接失败) } return val, nil }