为开源项目 go-gin-api 增加后台任务模块
任务管理界面 (WEB)
支持在 WEB 界面
中对任务进行管理,例如:新增任务、编辑任务、启用/禁用任务、手动执行任务 等。
任务的属性包括:
任务名称
执行方式
SHELL
HTTP
表达式(*/5 * * * *)
命令
超时时间(秒)
重试次数
重试间隔(秒)
执行结束是否通知
不通知
失败通知
结束通知
结果关键字匹配通知
状态
备注
当执行方式为 HTTP
时,支持选择请求方式 GET
或 POST
;
当设置执行结束通知时,支持选择通知方式 邮件 或 Webhook
;
当设置邮件通知时,支持输入邮箱地址多个用,分割;
当设置结果关键字匹配通知时,支持输入关键字多个用,分割;
任务增加完成后,会把任务数据持久化到 MySQL
中。
任务调度器
参考了两个开源组件:
最终选择使用 jakecoffman/cron ,后者是在前者的基础上做了一定的补充,例如 AddFunc()
增加了 name
参数,同时还增加了 RemoveJob(name string)
支持删除特定的任务。
// AddFunc adds a func to the Cron to be run on the given schedule. func (c *Cron) AddFunc(spec string, cmd func(), name string) { c.AddJob(spec, FuncJob(cmd), name) } ... // RemoveJob removes a Job from the Cron based on name. func (c *Cron) RemoveJob(name string) { if !c.running { i := c.entries.pos(name) if i == -1 { return } c.entries = c.entries[:i+copy(c.entries[i:], c.entries[i+1:])] return } c.remove <- name }
对其简单封装下就可以使用了,下面是封装的方法,方法的具体实现与使用从 go-gin-api 中获取。
type Server interface { i() // Start 启动 cron 服务 Start() // Stop 停止 cron 服务 Stop() // AddTask 增加定时任务 AddTask(task *cron_task_repo.CronTask) // RemoveTask 删除定时任务 RemoveTask(taskId int) // AddJob 增加定时任务执行的工作内容 AddJob(task *cron_task_repo.CronTask) cron.FuncJob }
当调用 Start()
启动服务时,会把 MySQL
中的任务列表加载到调度器中。
通过以上方法,当从 WEB 界面
操作 新增、编辑、启用/禁用、手动执行任务时,可以动态的对调度器中的任务进行管理。
任务执行器
任务执行器指的是任务真实执行所在的机器。
我的思路是使用 Kafka
的发布与订阅功能,当调度器发现需要执行的任务时,将任务信息写到 Kafka
的 Topic
中,任务执行器订阅相关的 Topic
获取任务信息然后执行任务。
如果任务的执行方式为 HTTP
,那么任务执行器可以为一组集群,专门处理调用 HTTP
任务,这里可以为一个消费组(Consumer Group
),也可适具体场景而定。
如果任务的执行方式为 SHELL
,那么任务执行器必须在脚本所在的宿主机上,这里可以为一个具体任务的消费者。
如果任务量过多,可以考虑根据业务场景多设置几个 Topic
。
在项目中为了便于演示,不写入到 Kafka
中,仅记录了日志。
func (s *server) AddJob(task *cron_task_repo.CronTask) cron.FuncJob { return func() { s.taskCount.Add() defer s.taskCount.Done() msg := fmt.Sprintf("开始执行任务:(%d)%s [%s]", task.Id, task.Name, task.Spec) s.logger.Info(msg) } }
日志目录:/logs/go-gin-api-cron.log
。
小结
本文纯属抛砖引玉,有问题,欢迎批评指正。
http://www.jcdi.cn/