first commit
This commit is contained in:
171
crontask/cron_manager.go
Normal file
171
crontask/cron_manager.go
Normal file
@@ -0,0 +1,171 @@
|
||||
// Package crontask/cron_manager.go 主管理器文件
|
||||
package crontask
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/robfig/cron/v3"
|
||||
)
|
||||
|
||||
type JobHandler func() error
|
||||
|
||||
type CronManager struct {
|
||||
cron *cron.Cron
|
||||
db *sqlx.DB
|
||||
handlers map[string]JobHandler
|
||||
entries map[int]cron.EntryID
|
||||
cancel context.CancelFunc
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func NewCronManager(db *sqlx.DB) *CronManager {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cm := &CronManager{
|
||||
cron: cron.New(),
|
||||
db: db,
|
||||
handlers: make(map[string]JobHandler),
|
||||
entries: make(map[int]cron.EntryID),
|
||||
cancel: cancel,
|
||||
ctx: ctx,
|
||||
}
|
||||
|
||||
// 注册默认任务处理器
|
||||
cm.registerDefaultHandlers()
|
||||
|
||||
// 加载数据库中的定时任务
|
||||
if err := cm.LoadJobs(); err != nil {
|
||||
log.Printf("Failed to load cron jobs: %v", err)
|
||||
}
|
||||
|
||||
return cm
|
||||
}
|
||||
|
||||
// Start 开始所有定时任务 OK
|
||||
func (cm *CronManager) Start() {
|
||||
cm.cron.Start()
|
||||
log.Println("定时任务已启动")
|
||||
}
|
||||
|
||||
// Stop 停止所有定时任务 OK
|
||||
func (cm *CronManager) Stop() error {
|
||||
log.Println("调度器正在停止...")
|
||||
if cm.cron != nil && cm.cancel != nil {
|
||||
// 取消所有任务的上下文
|
||||
cm.cancel()
|
||||
|
||||
// 停止调度器
|
||||
cm.cron.Stop()
|
||||
|
||||
// 移除所有已注册的任务条目
|
||||
for _, entryID := range cm.entries {
|
||||
cm.cron.Remove(entryID)
|
||||
}
|
||||
log.Printf("已移除 %d 个任务条目", len(cm.entries))
|
||||
|
||||
// 清空条目映射
|
||||
cm.entries = make(map[int]cron.EntryID)
|
||||
|
||||
// 重新创建上下文,为下次启动做准备
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cm.ctx = ctx
|
||||
cm.cancel = cancel
|
||||
}
|
||||
log.Println("调度器停止完成")
|
||||
return nil
|
||||
}
|
||||
|
||||
// RemoveJob 移除指定任务 OK
|
||||
func (cm *CronManager) RemoveJob(jobID int) error {
|
||||
if entryID, exists := cm.entries[jobID]; exists {
|
||||
cm.cron.Remove(entryID)
|
||||
delete(cm.entries, jobID)
|
||||
log.Printf("已移除任务 ID: %d", jobID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddJobByID 添加指定任务 OK
|
||||
func (cm *CronManager) AddJobByID(jobID int) error {
|
||||
var job CronJob
|
||||
err := cm.db.Get(&job, "SELECT id, name, schedule, handler, enabled FROM admin_cron_jobs WHERE id = ? AND enabled = 1 AND isdel = 0", jobID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("job with ID %d not found", jobID)
|
||||
}
|
||||
|
||||
// 检查 handler 是否存在
|
||||
handler, exists := cm.handlers[job.Handler]
|
||||
if !exists {
|
||||
return fmt.Errorf("handler %s not found for job %d", job.Handler, jobID)
|
||||
}
|
||||
|
||||
// 如果该任务已存在,先移除旧的任务
|
||||
if entryID, exists := cm.entries[jobID]; exists {
|
||||
cm.cron.Remove(entryID)
|
||||
delete(cm.entries, jobID)
|
||||
log.Printf("移除已存在的任务: %s (ID: %d)", job.Name, jobID)
|
||||
}
|
||||
|
||||
// 添加新任务
|
||||
entryID, err := cm.cron.AddFunc(job.Schedule, func() {
|
||||
cm.executeJob(job, handler)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cm.entries[jobID] = entryID
|
||||
log.Printf("Added job %s with ID %d", job.Name, jobID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Restart 重启定时任务管理器(移除当前所有任务并停止后再次启动) OK
|
||||
func (cm *CronManager) Restart() error {
|
||||
log.Println("调度器正在重启...")
|
||||
|
||||
// 停止当前所有任务
|
||||
if err := cm.Stop(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 清空处理器和条目映射
|
||||
cm.handlers = make(map[string]JobHandler)
|
||||
cm.entries = make(map[int]cron.EntryID)
|
||||
|
||||
// 重新初始化 cron 实例
|
||||
cm.cron = cron.New()
|
||||
|
||||
// 重新注册默认处理器
|
||||
cm.registerDefaultHandlers()
|
||||
|
||||
// 重新加载数据库中的任务
|
||||
if err := cm.LoadJobs(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 启动任务(如果未暂停)
|
||||
cm.Start()
|
||||
|
||||
log.Println("调度器重启完成")
|
||||
return nil
|
||||
}
|
||||
|
||||
// RefreshJobs 刷新任务列表 OK
|
||||
func (cm *CronManager) RefreshJobs() error {
|
||||
// 停止当前所有任务
|
||||
if err := cm.Stop(); err != nil {
|
||||
return err
|
||||
}
|
||||
cm.handlers = make(map[string]JobHandler)
|
||||
cm.entries = make(map[int]cron.EntryID)
|
||||
cm.cron = cron.New()
|
||||
cm.registerDefaultHandlers()
|
||||
err := cm.LoadJobs()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cm.Start()
|
||||
return nil
|
||||
}
|
||||
55
crontask/job_handlers.go
Normal file
55
crontask/job_handlers.go
Normal file
@@ -0,0 +1,55 @@
|
||||
// Package crontask/job_handlers 任务处理器文件
|
||||
package crontask
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os/exec"
|
||||
)
|
||||
|
||||
func (cm *CronManager) registerDefaultHandlers() {
|
||||
// mysql数据库备份任务
|
||||
cm.RegisterHandler("backup_data", func() error {
|
||||
err := ExecuteShellCommand("backup_data", "./backup_mysql.sh start")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = ExecuteShellCommand("backup_mysql", "./backup_mysql.sh clean")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
// 系统监控
|
||||
cm.RegisterHandler("system_monitor", func() error {
|
||||
log.Printf("system_monitor !!!")
|
||||
return nil
|
||||
})
|
||||
|
||||
// 执行日志轮转任务
|
||||
cm.RegisterHandler("cleanup_log", func() error {
|
||||
return ExecuteShellCommand("cleanup_log", "./gin-app.sh cleanlogs")
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
// ExecuteShellCommand 执行shell命令的公共方法
|
||||
func ExecuteShellCommand(name, command string) error {
|
||||
cmd := exec.Command("/bin/bash", "-c", command)
|
||||
output, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
log.Printf("[%s] Shell command execution failed: %v", name, err)
|
||||
log.Printf("[%s] Command output: %s", name, output)
|
||||
return err
|
||||
}
|
||||
log.Printf("[%s] Shell command executed successfully", name)
|
||||
log.Printf("[%s] Command output: %s", name, output)
|
||||
return nil
|
||||
}
|
||||
|
||||
// RegisterHandler 注册任务处理器
|
||||
func (cm *CronManager) RegisterHandler(name string, handler JobHandler) {
|
||||
cm.handlers[name] = handler
|
||||
}
|
||||
108
crontask/job_manager.go
Normal file
108
crontask/job_manager.go
Normal file
@@ -0,0 +1,108 @@
|
||||
// Package crontask/job_manager.go 任务管理文件
|
||||
package crontask
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
)
|
||||
|
||||
// LoadJobs 加载数据库中的定时任务
|
||||
func (cm *CronManager) LoadJobs() error {
|
||||
const query = `
|
||||
SELECT id, name, schedule, handler, enabled
|
||||
FROM admin_cron_jobs
|
||||
WHERE enabled = ?
|
||||
AND isdel = 0
|
||||
`
|
||||
|
||||
var jobs []CronJob
|
||||
err := cm.db.Select(&jobs, query, 1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Printf("Loaded %d enabled cron jobs", len(jobs))
|
||||
|
||||
for _, job := range jobs {
|
||||
if err := cm.AddJob(job); err != nil {
|
||||
log.Printf("Failed to add job %s: %v", job.Name, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddJob 添加定时任务
|
||||
func (cm *CronManager) AddJob(job CronJob) error {
|
||||
// 如果该任务已存在,先移除旧的任务
|
||||
if entryID, exists := cm.entries[job.ID]; exists {
|
||||
cm.cron.Remove(entryID)
|
||||
delete(cm.entries, job.ID)
|
||||
log.Printf("移除已存在的任务: %s (ID: %d)", job.Name, job.ID)
|
||||
}
|
||||
|
||||
handler, exists := cm.handlers[job.Handler]
|
||||
if !exists {
|
||||
return fmt.Errorf("handler %s not found", job.Handler)
|
||||
}
|
||||
|
||||
entryID, err := cm.cron.AddFunc(job.Schedule, func() {
|
||||
cm.executeJob(job, handler)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cm.entries[job.ID] = entryID
|
||||
log.Printf("Added job %s with ID %d", job.Name, job.ID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// executeJob 执行定时任务
|
||||
func (cm *CronManager) executeJob(job CronJob, handler JobHandler) {
|
||||
// 检查上下文是否已取消
|
||||
select {
|
||||
case <-cm.ctx.Done():
|
||||
log.Printf("Job %s cancelled before execution", job.Name)
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
status := "success"
|
||||
var message string
|
||||
|
||||
// 执行任务(如果任务支持上下文取消,应传递 cm.ctx)
|
||||
err := handler()
|
||||
if err != nil {
|
||||
status = "failed"
|
||||
message = err.Error()
|
||||
log.Printf("Job %s execution failed: %v", job.Name, err)
|
||||
}
|
||||
|
||||
// 检查任务是否在执行过程中被取消
|
||||
select {
|
||||
case <-cm.ctx.Done():
|
||||
log.Printf("Job %s was cancelled during execution", job.Name)
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// 创建 endTime 变量以便获取其地址
|
||||
endTime := time.Now()
|
||||
|
||||
// 记录执行日志
|
||||
logEntry := CronJobLog{
|
||||
JobID: job.ID,
|
||||
Status: status,
|
||||
Message: message,
|
||||
StartTime: &startTime,
|
||||
EndTime: &endTime,
|
||||
}
|
||||
|
||||
_, err = cm.db.NamedExec(`INSERT INTO admin_cron_job_logs (job_id, status, message, start_time, end_time)
|
||||
VALUES (:job_id, :status, :message, :start_time, :end_time)`, logEntry)
|
||||
if err != nil {
|
||||
log.Printf("Failed to log job execution: %v", err)
|
||||
}
|
||||
}
|
||||
26
crontask/models.go
Normal file
26
crontask/models.go
Normal file
@@ -0,0 +1,26 @@
|
||||
// Package crontask/models.go 数据模型文件
|
||||
package crontask
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
type CronJob struct {
|
||||
ID int `db:"id"`
|
||||
Name string `db:"name"`
|
||||
Schedule string `db:"schedule"`
|
||||
Handler string `db:"handler"`
|
||||
Enabled int `db:"enabled"`
|
||||
Description string `db:"description"`
|
||||
CreatedAt *time.Time `db:"create_time"`
|
||||
UpdatedAt *time.Time `db:"update_time"`
|
||||
}
|
||||
|
||||
type CronJobLog struct {
|
||||
ID int `db:"id"`
|
||||
JobID int `db:"job_id"`
|
||||
Status string `db:"status"`
|
||||
Message string `db:"message"`
|
||||
StartTime *time.Time `db:"start_time"`
|
||||
EndTime *time.Time `db:"end_time"`
|
||||
}
|
||||
Reference in New Issue
Block a user