// Package crontask/cron_manager.go 主管理器文件 package crontask import ( "context" "fmt" "log" "github.com/jmoiron/sqlx" "github.com/robfig/cron/v3" ) type JobHandler func() error type CronManager struct { cron *cron.Cron db *sqlx.DB handlers map[string]JobHandler entries map[int]cron.EntryID cancel context.CancelFunc ctx context.Context } func NewCronManager(db *sqlx.DB) *CronManager { ctx, cancel := context.WithCancel(context.Background()) cm := &CronManager{ cron: cron.New(), db: db, handlers: make(map[string]JobHandler), entries: make(map[int]cron.EntryID), cancel: cancel, ctx: ctx, } // 注册默认任务处理器 cm.registerDefaultHandlers() // 加载数据库中的定时任务 if err := cm.LoadJobs(); err != nil { log.Printf("Failed to load cron jobs: %v", err) } return cm } // Start 开始所有定时任务 OK func (cm *CronManager) Start() { cm.cron.Start() log.Println("定时任务已启动") } // Stop 停止所有定时任务 OK func (cm *CronManager) Stop() error { log.Println("调度器正在停止...") if cm.cron != nil && cm.cancel != nil { // 取消所有任务的上下文 cm.cancel() // 停止调度器 cm.cron.Stop() // 移除所有已注册的任务条目 for _, entryID := range cm.entries { cm.cron.Remove(entryID) } log.Printf("已移除 %d 个任务条目", len(cm.entries)) // 清空条目映射 cm.entries = make(map[int]cron.EntryID) // 重新创建上下文,为下次启动做准备 ctx, cancel := context.WithCancel(context.Background()) cm.ctx = ctx cm.cancel = cancel } log.Println("调度器停止完成") return nil } // RemoveJob 移除指定任务 OK func (cm *CronManager) RemoveJob(jobID int) error { if entryID, exists := cm.entries[jobID]; exists { cm.cron.Remove(entryID) delete(cm.entries, jobID) log.Printf("已移除任务 ID: %d", jobID) } return nil } // AddJobByID 添加指定任务 OK func (cm *CronManager) AddJobByID(jobID int) error { var job CronJob err := cm.db.Get(&job, "SELECT id, name, schedule, handler, enabled FROM admin_cron_jobs WHERE id = ? AND enabled = 1 AND isdel = 0", jobID) if err != nil { return fmt.Errorf("job with ID %d not found", jobID) } // 检查 handler 是否存在 handler, exists := cm.handlers[job.Handler] if !exists { return fmt.Errorf("handler %s not found for job %d", job.Handler, jobID) } // 如果该任务已存在,先移除旧的任务 if entryID, exists := cm.entries[jobID]; exists { cm.cron.Remove(entryID) delete(cm.entries, jobID) log.Printf("移除已存在的任务: %s (ID: %d)", job.Name, jobID) } // 添加新任务 entryID, err := cm.cron.AddFunc(job.Schedule, func() { cm.executeJob(job, handler) }) if err != nil { return err } cm.entries[jobID] = entryID log.Printf("Added job %s with ID %d", job.Name, jobID) return nil } // Restart 重启定时任务管理器(移除当前所有任务并停止后再次启动) OK func (cm *CronManager) Restart() error { log.Println("调度器正在重启...") // 停止当前所有任务 if err := cm.Stop(); err != nil { return err } // 清空处理器和条目映射 cm.handlers = make(map[string]JobHandler) cm.entries = make(map[int]cron.EntryID) // 重新初始化 cron 实例 cm.cron = cron.New() // 重新注册默认处理器 cm.registerDefaultHandlers() // 重新加载数据库中的任务 if err := cm.LoadJobs(); err != nil { return err } // 启动任务(如果未暂停) cm.Start() log.Println("调度器重启完成") return nil } // RefreshJobs 刷新任务列表 OK func (cm *CronManager) RefreshJobs() error { // 停止当前所有任务 if err := cm.Stop(); err != nil { return err } cm.handlers = make(map[string]JobHandler) cm.entries = make(map[int]cron.EntryID) cm.cron = cron.New() cm.registerDefaultHandlers() err := cm.LoadJobs() if err != nil { return err } cm.Start() return nil }