Files
Quincy_admin/crontask/cron_manager.go
2026-03-26 22:13:03 +08:00

172 lines
3.9 KiB
Go

// Package crontask/cron_manager.go 主管理器文件
package crontask
import (
"context"
"fmt"
"log"
"github.com/jmoiron/sqlx"
"github.com/robfig/cron/v3"
)
type JobHandler func() error
type CronManager struct {
cron *cron.Cron
db *sqlx.DB
handlers map[string]JobHandler
entries map[int]cron.EntryID
cancel context.CancelFunc
ctx context.Context
}
func NewCronManager(db *sqlx.DB) *CronManager {
ctx, cancel := context.WithCancel(context.Background())
cm := &CronManager{
cron: cron.New(),
db: db,
handlers: make(map[string]JobHandler),
entries: make(map[int]cron.EntryID),
cancel: cancel,
ctx: ctx,
}
// 注册默认任务处理器
cm.registerDefaultHandlers()
// 加载数据库中的定时任务
if err := cm.LoadJobs(); err != nil {
log.Printf("Failed to load cron jobs: %v", err)
}
return cm
}
// Start 开始所有定时任务 OK
func (cm *CronManager) Start() {
cm.cron.Start()
log.Println("定时任务已启动")
}
// Stop 停止所有定时任务 OK
func (cm *CronManager) Stop() error {
log.Println("调度器正在停止...")
if cm.cron != nil && cm.cancel != nil {
// 取消所有任务的上下文
cm.cancel()
// 停止调度器
cm.cron.Stop()
// 移除所有已注册的任务条目
for _, entryID := range cm.entries {
cm.cron.Remove(entryID)
}
log.Printf("已移除 %d 个任务条目", len(cm.entries))
// 清空条目映射
cm.entries = make(map[int]cron.EntryID)
// 重新创建上下文,为下次启动做准备
ctx, cancel := context.WithCancel(context.Background())
cm.ctx = ctx
cm.cancel = cancel
}
log.Println("调度器停止完成")
return nil
}
// RemoveJob 移除指定任务 OK
func (cm *CronManager) RemoveJob(jobID int) error {
if entryID, exists := cm.entries[jobID]; exists {
cm.cron.Remove(entryID)
delete(cm.entries, jobID)
log.Printf("已移除任务 ID: %d", jobID)
}
return nil
}
// AddJobByID 添加指定任务 OK
func (cm *CronManager) AddJobByID(jobID int) error {
var job CronJob
err := cm.db.Get(&job, "SELECT id, name, schedule, handler, enabled FROM admin_cron_jobs WHERE id = ? AND enabled = 1 AND isdel = 0", jobID)
if err != nil {
return fmt.Errorf("job with ID %d not found", jobID)
}
// 检查 handler 是否存在
handler, exists := cm.handlers[job.Handler]
if !exists {
return fmt.Errorf("handler %s not found for job %d", job.Handler, jobID)
}
// 如果该任务已存在,先移除旧的任务
if entryID, exists := cm.entries[jobID]; exists {
cm.cron.Remove(entryID)
delete(cm.entries, jobID)
log.Printf("移除已存在的任务: %s (ID: %d)", job.Name, jobID)
}
// 添加新任务
entryID, err := cm.cron.AddFunc(job.Schedule, func() {
cm.executeJob(job, handler)
})
if err != nil {
return err
}
cm.entries[jobID] = entryID
log.Printf("Added job %s with ID %d", job.Name, jobID)
return nil
}
// Restart 重启定时任务管理器(移除当前所有任务并停止后再次启动) OK
func (cm *CronManager) Restart() error {
log.Println("调度器正在重启...")
// 停止当前所有任务
if err := cm.Stop(); err != nil {
return err
}
// 清空处理器和条目映射
cm.handlers = make(map[string]JobHandler)
cm.entries = make(map[int]cron.EntryID)
// 重新初始化 cron 实例
cm.cron = cron.New()
// 重新注册默认处理器
cm.registerDefaultHandlers()
// 重新加载数据库中的任务
if err := cm.LoadJobs(); err != nil {
return err
}
// 启动任务(如果未暂停)
cm.Start()
log.Println("调度器重启完成")
return nil
}
// RefreshJobs 刷新任务列表 OK
func (cm *CronManager) RefreshJobs() error {
// 停止当前所有任务
if err := cm.Stop(); err != nil {
return err
}
cm.handlers = make(map[string]JobHandler)
cm.entries = make(map[int]cron.EntryID)
cm.cron = cron.New()
cm.registerDefaultHandlers()
err := cm.LoadJobs()
if err != nil {
return err
}
cm.Start()
return nil
}