refactor: Remove Token management and integrate Redis for authentication
- Deleted the Token model and its repository, transitioning to a Redis-based token management system. - Updated the service layer to utilize Redis for token storage, enhancing performance and scalability. - Refactored the container to remove TokenRepository and integrate the new token service. - Cleaned up the Dockerfile and other files by removing unnecessary whitespace and comments. - Enhanced error handling and logging for Redis initialization and usage.
This commit is contained in:
168
internal/task/runner.go
Normal file
168
internal/task/runner.go
Normal file
@@ -0,0 +1,168 @@
|
||||
package task
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"runtime/debug"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Task 定义可调度任务
|
||||
type Task interface {
|
||||
Name() string
|
||||
Interval() time.Duration
|
||||
Run(ctx context.Context) error
|
||||
}
|
||||
|
||||
// Runner 简单的周期任务调度器
|
||||
type Runner struct {
|
||||
tasks []Task
|
||||
logger *zap.Logger
|
||||
wg sync.WaitGroup
|
||||
startImmediately bool
|
||||
jitterPercent float64
|
||||
}
|
||||
|
||||
// NewRunner 创建任务调度器
|
||||
func NewRunner(logger *zap.Logger, tasks ...Task) *Runner {
|
||||
return NewRunnerWithOptions(logger, tasks)
|
||||
}
|
||||
|
||||
// RunnerOption 运行器配置项
|
||||
type RunnerOption func(r *Runner)
|
||||
|
||||
// WithStartImmediately 是否启动后立即执行一次(默认 true)
|
||||
func WithStartImmediately(start bool) RunnerOption {
|
||||
return func(r *Runner) {
|
||||
r.startImmediately = start
|
||||
}
|
||||
}
|
||||
|
||||
// WithJitter 为执行间隔增加 0~percent 之间的随机抖动(percent=0 关闭,默认0)
|
||||
// 可降低多个任务同时触发的概率
|
||||
func WithJitter(percent float64) RunnerOption {
|
||||
return func(r *Runner) {
|
||||
if percent < 0 {
|
||||
percent = 0
|
||||
}
|
||||
r.jitterPercent = percent
|
||||
}
|
||||
}
|
||||
|
||||
// NewRunnerWithOptions 支持可选配置的创建函数
|
||||
func NewRunnerWithOptions(logger *zap.Logger, tasks []Task, opts ...RunnerOption) *Runner {
|
||||
r := &Runner{
|
||||
tasks: tasks,
|
||||
logger: logger,
|
||||
startImmediately: true,
|
||||
jitterPercent: 0,
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt(r)
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
// Start 启动所有任务(异步)
|
||||
func (r *Runner) Start(ctx context.Context) {
|
||||
for _, t := range r.tasks {
|
||||
task := t
|
||||
r.wg.Add(1)
|
||||
go func() {
|
||||
defer r.wg.Done()
|
||||
defer r.recoverPanic(task)
|
||||
|
||||
interval := r.normalizeInterval(task.Interval())
|
||||
|
||||
// 可选:立即执行一次
|
||||
if r.startImmediately {
|
||||
r.runOnce(ctx, task)
|
||||
}
|
||||
|
||||
// 周期执行
|
||||
for {
|
||||
wait := r.applyJitter(interval)
|
||||
if !r.wait(ctx, wait) {
|
||||
return
|
||||
}
|
||||
|
||||
// 每轮读取最新的 interval,允许任务动态调整间隔
|
||||
interval = r.normalizeInterval(task.Interval())
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
r.runOnce(ctx, task)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// Wait 等待所有任务退出
|
||||
func (r *Runner) Wait() {
|
||||
r.wg.Wait()
|
||||
}
|
||||
|
||||
func (r *Runner) runOnce(ctx context.Context, task Task) {
|
||||
if err := task.Run(ctx); err != nil && r.logger != nil {
|
||||
r.logger.Warn("任务执行失败", zap.String("task", task.Name()), zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
// normalizeInterval 确保间隔为正值
|
||||
func (r *Runner) normalizeInterval(d time.Duration) time.Duration {
|
||||
if d <= 0 {
|
||||
return time.Minute
|
||||
}
|
||||
return d
|
||||
}
|
||||
|
||||
// applyJitter 在基础间隔上添加最多 jitterPercent 的随机抖动
|
||||
func (r *Runner) applyJitter(base time.Duration) time.Duration {
|
||||
if r.jitterPercent <= 0 {
|
||||
return base
|
||||
}
|
||||
maxJitter := time.Duration(float64(base) * r.jitterPercent)
|
||||
if maxJitter <= 0 {
|
||||
return base
|
||||
}
|
||||
return base + time.Duration(rand.Int63n(int64(maxJitter)))
|
||||
}
|
||||
|
||||
// wait 封装带 context 的 sleep
|
||||
func (r *Runner) wait(ctx context.Context, d time.Duration) bool {
|
||||
if d <= 0 {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
default:
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
timer := time.NewTimer(d)
|
||||
defer timer.Stop()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
case <-timer.C:
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// recoverPanic 防止任务 panic 导致 goroutine 退出
|
||||
func (r *Runner) recoverPanic(task Task) {
|
||||
if rec := recover(); rec != nil && r.logger != nil {
|
||||
r.logger.Error("任务发生panic",
|
||||
zap.String("task", task.Name()),
|
||||
zap.Any("panic", rec),
|
||||
zap.ByteString("stack", debug.Stack()),
|
||||
)
|
||||
}
|
||||
}
|
||||
65
internal/task/runner_test.go
Normal file
65
internal/task/runner_test.go
Normal file
@@ -0,0 +1,65 @@
|
||||
package task
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type mockTask struct {
|
||||
name string
|
||||
interval time.Duration
|
||||
err error
|
||||
runCount *atomic.Int32
|
||||
}
|
||||
|
||||
func (m *mockTask) Name() string { return m.name }
|
||||
func (m *mockTask) Interval() time.Duration { return m.interval }
|
||||
func (m *mockTask) Run(ctx context.Context) error {
|
||||
if m.runCount != nil {
|
||||
m.runCount.Add(1)
|
||||
}
|
||||
return m.err
|
||||
}
|
||||
|
||||
func TestRunner_StartAndWait(t *testing.T) {
|
||||
runCount := &atomic.Int32{}
|
||||
task := &mockTask{name: "ok", interval: 20 * time.Millisecond, runCount: runCount}
|
||||
runner := NewRunner(zap.NewNop(), task)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
runner.Start(ctx)
|
||||
|
||||
time.Sleep(60 * time.Millisecond)
|
||||
cancel()
|
||||
runner.Wait()
|
||||
|
||||
if runCount.Load() == 0 {
|
||||
t.Fatalf("expected task to run at least once")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunner_RunErrorLogged(t *testing.T) {
|
||||
runCount := &atomic.Int32{}
|
||||
task := &mockTask{name: "err", interval: 10 * time.Millisecond, err: errors.New("boom"), runCount: runCount}
|
||||
runner := NewRunner(zap.NewNop(), task)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
runner.Start(ctx)
|
||||
time.Sleep(25 * time.Millisecond)
|
||||
cancel()
|
||||
runner.Wait()
|
||||
|
||||
if runCount.Load() == 0 {
|
||||
t.Fatalf("expected task to be attempted")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user