Files
cellbot/internal/engine/scheduler.go
lafay 64cd81b7f1 feat: enhance event handling and add scheduling capabilities
- Introduced a new scheduler to manage timed tasks within the event dispatcher.
- Updated the dispatcher to support the new scheduler, allowing for improved event processing.
- Enhanced action serialization in the OneBot11 adapter to convert message chains to the appropriate format.
- Added new dependencies for cron scheduling and other indirect packages in go.mod and go.sum.
- Improved logging for event publishing and handler matching, providing better insights during execution.
- Refactored plugin loading to include scheduled job management.
2026-01-05 04:33:30 +08:00

632 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package engine
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/robfig/cron/v3"
"go.uber.org/zap"
)
// Job 定时任务接口
type Job interface {
// ID 返回任务唯一标识
ID() string
// Start 启动任务
Start(ctx context.Context) error
// Stop 停止任务
Stop() error
// IsRunning 检查任务是否正在运行
IsRunning() bool
// NextRun 返回下次执行时间
NextRun() time.Time
}
// JobFunc 任务执行函数类型
type JobFunc func(ctx context.Context) error
// Scheduler 定时任务调度器
type Scheduler struct {
jobs map[string]Job
mu sync.RWMutex
logger *zap.Logger
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
running int32
}
// NewScheduler 创建新的调度器
func NewScheduler(logger *zap.Logger) *Scheduler {
ctx, cancel := context.WithCancel(context.Background())
return &Scheduler{
jobs: make(map[string]Job),
logger: logger.Named("scheduler"),
ctx: ctx,
cancel: cancel,
}
}
// Start 启动调度器
func (s *Scheduler) Start() error {
if !atomic.CompareAndSwapInt32(&s.running, 0, 1) {
return fmt.Errorf("scheduler is already running")
}
s.mu.RLock()
defer s.mu.RUnlock()
// 启动所有任务
for id, job := range s.jobs {
if err := job.Start(s.ctx); err != nil {
s.logger.Error("Failed to start job",
zap.String("job_id", id),
zap.Error(err))
continue
}
s.logger.Info("Job started", zap.String("job_id", id))
}
s.logger.Info("Scheduler started", zap.Int("job_count", len(s.jobs)))
return nil
}
// Stop 停止调度器
func (s *Scheduler) Stop() error {
if !atomic.CompareAndSwapInt32(&s.running, 1, 0) {
return fmt.Errorf("scheduler is not running")
}
s.cancel()
s.mu.RLock()
defer s.mu.RUnlock()
// 停止所有任务
for id, job := range s.jobs {
if err := job.Stop(); err != nil {
s.logger.Error("Failed to stop job",
zap.String("job_id", id),
zap.Error(err))
continue
}
s.logger.Info("Job stopped", zap.String("job_id", id))
}
s.wg.Wait()
s.logger.Info("Scheduler stopped")
return nil
}
// AddJob 添加任务
func (s *Scheduler) AddJob(job Job) error {
s.mu.Lock()
defer s.mu.Unlock()
id := job.ID()
if _, exists := s.jobs[id]; exists {
return fmt.Errorf("job with id %s already exists", id)
}
s.jobs[id] = job
// 如果调度器正在运行,立即启动任务
if atomic.LoadInt32(&s.running) == 1 {
if err := job.Start(s.ctx); err != nil {
delete(s.jobs, id)
return fmt.Errorf("failed to start job: %w", err)
}
s.logger.Info("Job added and started", zap.String("job_id", id))
} else {
s.logger.Info("Job added", zap.String("job_id", id))
}
return nil
}
// RemoveJob 移除任务
func (s *Scheduler) RemoveJob(id string) error {
s.mu.Lock()
defer s.mu.Unlock()
job, exists := s.jobs[id]
if !exists {
return fmt.Errorf("job with id %s not found", id)
}
if err := job.Stop(); err != nil {
s.logger.Error("Failed to stop job during removal",
zap.String("job_id", id),
zap.Error(err))
}
delete(s.jobs, id)
s.logger.Info("Job removed", zap.String("job_id", id))
return nil
}
// GetJob 获取任务
func (s *Scheduler) GetJob(id string) (Job, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
job, exists := s.jobs[id]
return job, exists
}
// GetAllJobs 获取所有任务
func (s *Scheduler) GetAllJobs() map[string]Job {
s.mu.RLock()
defer s.mu.RUnlock()
result := make(map[string]Job, len(s.jobs))
for id, job := range s.jobs {
result[id] = job
}
return result
}
// IsRunning 检查调度器是否正在运行
func (s *Scheduler) IsRunning() bool {
return atomic.LoadInt32(&s.running) == 1
}
// ============================================================================
// Job 实现
// ============================================================================
// CronJob 基于 Cron 表达式的任务
type CronJob struct {
id string
spec string
handler JobFunc
cron *cron.Cron
logger *zap.Logger
running int32
nextRun time.Time
mu sync.RWMutex
}
// NewCronJob 创建 Cron 任务
func NewCronJob(id, spec string, handler JobFunc, logger *zap.Logger) (*CronJob, error) {
parser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)
c := cron.New(cron.WithParser(parser), cron.WithChain(cron.Recover(cron.DefaultLogger)))
job := &CronJob{
id: id,
spec: spec,
handler: handler,
cron: c,
logger: logger.Named("cron-job").With(zap.String("job_id", id)),
}
// 添加任务到 cron
_, err := c.AddFunc(spec, func() {
ctx := context.Background()
if err := handler(ctx); err != nil {
job.logger.Error("Cron job execution failed", zap.Error(err))
}
// 更新下次执行时间
entries := c.Entries()
job.mu.Lock()
if len(entries) > 0 {
// 找到最近的执行时间
next := entries[0].Next
for _, entry := range entries {
if entry.Next.Before(next) {
next = entry.Next
}
}
job.nextRun = next
}
job.mu.Unlock()
})
if err != nil {
return nil, fmt.Errorf("invalid cron spec: %w", err)
}
// 计算初始下次执行时间(需要先启动 cron 才能计算)
// 这里先设置为零值,在 Start 时再计算
job.mu.Lock()
job.nextRun = time.Time{}
job.mu.Unlock()
return job, nil
}
func (j *CronJob) ID() string {
return j.id
}
func (j *CronJob) Start(ctx context.Context) error {
if !atomic.CompareAndSwapInt32(&j.running, 0, 1) {
return fmt.Errorf("job is already running")
}
j.cron.Start()
j.logger.Info("Cron job started", zap.String("spec", j.spec))
// 更新下次执行时间
entries := j.cron.Entries()
if len(entries) > 0 {
j.mu.Lock()
// 找到最近的执行时间
next := entries[0].Next
for _, entry := range entries {
if entry.Next.Before(next) {
next = entry.Next
}
}
j.nextRun = next
j.mu.Unlock()
}
return nil
}
func (j *CronJob) Stop() error {
if !atomic.CompareAndSwapInt32(&j.running, 1, 0) {
return fmt.Errorf("job is not running")
}
ctx := j.cron.Stop()
<-ctx.Done()
j.logger.Info("Cron job stopped")
return nil
}
func (j *CronJob) IsRunning() bool {
return atomic.LoadInt32(&j.running) == 1
}
func (j *CronJob) NextRun() time.Time {
j.mu.RLock()
defer j.mu.RUnlock()
return j.nextRun
}
// IntervalJob 固定间隔的任务
type IntervalJob struct {
id string
interval time.Duration
handler JobFunc
logger *zap.Logger
running int32
nextRun time.Time
mu sync.RWMutex
ticker *time.Ticker
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
// NewIntervalJob 创建固定间隔任务
func NewIntervalJob(id string, interval time.Duration, handler JobFunc, logger *zap.Logger) *IntervalJob {
return &IntervalJob{
id: id,
interval: interval,
handler: handler,
logger: logger.Named("interval-job").With(zap.String("job_id", id)),
}
}
func (j *IntervalJob) ID() string {
return j.id
}
func (j *IntervalJob) Start(ctx context.Context) error {
if !atomic.CompareAndSwapInt32(&j.running, 0, 1) {
return fmt.Errorf("job is already running")
}
j.ctx, j.cancel = context.WithCancel(ctx)
j.ticker = time.NewTicker(j.interval)
j.mu.Lock()
j.nextRun = time.Now().Add(j.interval)
j.mu.Unlock()
j.wg.Add(1)
go j.run()
j.logger.Info("Interval job started", zap.Duration("interval", j.interval))
return nil
}
func (j *IntervalJob) run() {
defer j.wg.Done()
// 立即执行一次(可选,根据需求调整)
// if err := j.handler(j.ctx); err != nil {
// j.logger.Error("Interval job execution failed", zap.Error(err))
// }
for {
select {
case <-j.ticker.C:
j.mu.Lock()
j.nextRun = time.Now().Add(j.interval)
j.mu.Unlock()
if err := j.handler(j.ctx); err != nil {
j.logger.Error("Interval job execution failed", zap.Error(err))
}
case <-j.ctx.Done():
return
}
}
}
func (j *IntervalJob) Stop() error {
if !atomic.CompareAndSwapInt32(&j.running, 1, 0) {
return fmt.Errorf("job is not running")
}
if j.cancel != nil {
j.cancel()
}
if j.ticker != nil {
j.ticker.Stop()
}
j.wg.Wait()
j.logger.Info("Interval job stopped")
return nil
}
func (j *IntervalJob) IsRunning() bool {
return atomic.LoadInt32(&j.running) == 1
}
func (j *IntervalJob) NextRun() time.Time {
j.mu.RLock()
defer j.mu.RUnlock()
return j.nextRun
}
// OnceJob 单次延迟执行的任务
type OnceJob struct {
id string
delay time.Duration
handler JobFunc
logger *zap.Logger
running int32
nextRun time.Time
mu sync.RWMutex
timer *time.Timer
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
// NewOnceJob 创建单次延迟执行任务
func NewOnceJob(id string, delay time.Duration, handler JobFunc, logger *zap.Logger) *OnceJob {
return &OnceJob{
id: id,
delay: delay,
handler: handler,
logger: logger.Named("once-job").With(zap.String("job_id", id)),
}
}
func (j *OnceJob) ID() string {
return j.id
}
func (j *OnceJob) Start(ctx context.Context) error {
if !atomic.CompareAndSwapInt32(&j.running, 0, 1) {
return fmt.Errorf("job is already running")
}
j.ctx, j.cancel = context.WithCancel(ctx)
j.timer = time.NewTimer(j.delay)
j.mu.Lock()
j.nextRun = time.Now().Add(j.delay)
j.mu.Unlock()
j.wg.Add(1)
go j.run()
j.logger.Info("Once job started", zap.Duration("delay", j.delay))
return nil
}
func (j *OnceJob) run() {
defer j.wg.Done()
select {
case <-j.timer.C:
if err := j.handler(j.ctx); err != nil {
j.logger.Error("Once job execution failed", zap.Error(err))
}
atomic.StoreInt32(&j.running, 0)
case <-j.ctx.Done():
if !j.timer.Stop() {
<-j.timer.C
}
return
}
}
func (j *OnceJob) Stop() error {
if !atomic.CompareAndSwapInt32(&j.running, 1, 0) {
return fmt.Errorf("job is not running")
}
if j.cancel != nil {
j.cancel()
}
if j.timer != nil {
if !j.timer.Stop() {
<-j.timer.C
}
}
j.wg.Wait()
j.logger.Info("Once job stopped")
return nil
}
func (j *OnceJob) IsRunning() bool {
return atomic.LoadInt32(&j.running) == 1
}
func (j *OnceJob) NextRun() time.Time {
j.mu.RLock()
defer j.mu.RUnlock()
return j.nextRun
}
// ============================================================================
// 全局调度器 API链式风格延迟注册
// ============================================================================
var (
globalJobRegistry = make([]JobBuilder, 0)
globalJobMu sync.RWMutex
jobCounter int64
)
// JobBuilder 任务构建器接口(延迟注册)
type JobBuilder interface {
// Build 构建任务实例(由依赖注入系统调用)
Build(logger *zap.Logger) (Job, error)
}
// generateJobID 生成任务 ID
func generateJobID(prefix string) string {
counter := atomic.AddInt64(&jobCounter, 1)
return fmt.Sprintf("%s_%d", prefix, counter)
}
// CronJobBuilder Cron 任务构建器
type CronJobBuilder struct {
id string
spec string
handler JobFunc
}
// Cron 创建 Cron 任务构建器(在 init 函数中调用)
func Cron(spec string) *CronJobBuilder {
return &CronJobBuilder{
id: generateJobID("cron"),
spec: spec,
}
}
// Handle 设置处理函数并注册到全局注册表(延迟注册)
func (b *CronJobBuilder) Handle(handler JobFunc) {
b.handler = handler
if b.handler == nil {
panic("scheduler: handler cannot be nil")
}
globalJobMu.Lock()
defer globalJobMu.Unlock()
globalJobRegistry = append(globalJobRegistry, b)
}
// Build 构建 Cron 任务
func (b *CronJobBuilder) Build(logger *zap.Logger) (Job, error) {
return NewCronJob(b.id, b.spec, b.handler, logger)
}
// IntervalJobBuilder 固定间隔任务构建器
type IntervalJobBuilder struct {
id string
interval time.Duration
handler JobFunc
}
// Interval 创建固定间隔任务构建器(在 init 函数中调用)
func Interval(interval time.Duration) *IntervalJobBuilder {
return &IntervalJobBuilder{
id: generateJobID("interval"),
interval: interval,
}
}
// Handle 设置处理函数并注册到全局注册表(延迟注册)
func (b *IntervalJobBuilder) Handle(handler JobFunc) {
b.handler = handler
if b.handler == nil {
panic("scheduler: handler cannot be nil")
}
globalJobMu.Lock()
defer globalJobMu.Unlock()
globalJobRegistry = append(globalJobRegistry, b)
}
// Build 构建固定间隔任务
func (b *IntervalJobBuilder) Build(logger *zap.Logger) (Job, error) {
return NewIntervalJob(b.id, b.interval, b.handler, logger), nil
}
// OnceJobBuilder 单次延迟任务构建器
type OnceJobBuilder struct {
id string
delay time.Duration
handler JobFunc
}
// Once 创建单次延迟任务构建器(在 init 函数中调用)
func Once(delay time.Duration) *OnceJobBuilder {
return &OnceJobBuilder{
id: generateJobID("once"),
delay: delay,
}
}
// Handle 设置处理函数并注册到全局注册表(延迟注册)
func (b *OnceJobBuilder) Handle(handler JobFunc) {
b.handler = handler
if b.handler == nil {
panic("scheduler: handler cannot be nil")
}
globalJobMu.Lock()
defer globalJobMu.Unlock()
globalJobRegistry = append(globalJobRegistry, b)
}
// Build 构建单次延迟任务
func (b *OnceJobBuilder) Build(logger *zap.Logger) (Job, error) {
return NewOnceJob(b.id, b.delay, b.handler, logger), nil
}
// LoadAllJobs 加载所有已注册的任务(由依赖注入系统调用)
func LoadAllJobs(scheduler *Scheduler, logger *zap.Logger) error {
globalJobMu.RLock()
defer globalJobMu.RUnlock()
for i, builder := range globalJobRegistry {
job, err := builder.Build(logger)
if err != nil {
logger.Error("Failed to build job",
zap.Int("index", i),
zap.Error(err))
continue
}
if err := scheduler.AddJob(job); err != nil {
logger.Error("Failed to add job to scheduler",
zap.String("job_id", job.ID()),
zap.Error(err))
continue
}
logger.Debug("Job loaded",
zap.String("job_id", job.ID()))
}
logger.Info("All scheduled jobs loaded",
zap.Int("job_count", len(globalJobRegistry)))
return nil
}