golang cron v3 定时任务实现

golang cron v3 定时任务实现

Anderyly
2021-07-10 / 0 评论 / 12 阅读 / 正在检测是否收录...
温馨提示:
本文最后更新于2024年02月27日,已超过221天没有更新,若内容或图片失效,请留言反馈。

最近需要在 golang 中使用定时任务功能,用到了一个 cron 库,当前是 v3 版本,网上挺多都是 v2 的教程,记录一下使用方法。

在旧版本的库中默认的 cron 表达式不是标准格式,第一个位是秒级的定义。
现在 v3 版本直接用标准 cron 表示式即可,主要看 godoc 文档部分

cron 表示式


推荐使用在线工具来看自己写的 cron 对不对,简单的表达式直接写一般问题不大。这里推荐 crontab.guru,可以通过可视化的方式来查看你编写的定时规则。

以下内容摘自 维基百科-Cron

文件格式說明

┌──分鐘(0 - 59)
│  ┌──小時(0 - 23)
│  │  ┌──日(1 - 31)
│  │  │  ┌─月(1 - 12)
│  │  │  │  ┌─星期(0 - 6,表示从周日到周六)
│  │  │  │  │
*  *  *  *  * 被執行的命令

注:
在某些系统里,星期日也可以为 7

不很直观的用法:如果日期和星期同时被设定,那么其中的一个条件被满足时,指令便会被执行。请参考下例。
前 5 个域称之分时日月周,可方便个人记忆。
从第六个域起,指明要执行的命令。

安装


现在都是用的 Go module 进行模块的管理,直接在 goland 中使用 alt + 回车即可同步对应的包 “github.com/robfig/cron/v3”

使用 go get 安装方式如下

go get github.com/robfig/cron/v3

创建配置
建议使用标准的 cron 表达式

// 使用默认的配置
c := cron.New()
// 可以配置如果当前任务正在进行,那么跳过
c := cron.New(cron.WithChain(cron.SkipIfStillRunning(logger)))
// 官方也提供了旧版本的秒级的定义,这个注意你需要传入的 cron 表达式不再是标准 cron 表达式
c := cron.New(cron.WithSeconds())
在上面的代码中出现了一个 logger,我使用的是 logrus,在源码中可以看到 cron 需要的 logger 的定义

// Logger is the interface used in this package for logging, so that any backend
// can be plugged in. It is a subset of the github.com/go-logr/logr interface.
type Logger interface {
    // Info logs routine messages about cron's operation.
    Info(msg string, keysAndValues ...interface{})
    // Error logs an error condition.
    Error(err error, msg string, keysAndValues ...interface{})
}
那么我们定义了一个 Clog 结构体,实现对应的接口就行了

import (
    "github.com/robfig/cron/v3"
    log "github.com/sirupsen/logrus"
)

type CLog struct {
    clog *log.Logger
}

func (l *CLog) Info(msg string, keysAndValues ...interface{}) {
    l.clog.WithFields(log.Fields{
        "data": keysAndValues,
    }).Info(msg)
}

func (l *CLog) Error(err error, msg string, keysAndValues ...interface{}) {
    l.clog.WithFields(log.Fields{
        "msg":  msg,
        "data": keysAndValues,
    }).Warn(msg)
}
添加任务
启动定时任务有两种方法,分别是传入函数和传入任务。

传入函数
我们看到文档中给出的范例,可以看到任务的添加是通过 c.AddFunc() 这个函数来进行的,直接传入一个函数即可,可以看到定义是 func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error)。

# Runs at 6am in time.Local
cron.New().AddFunc("0 6 * * ?", ...)

# Runs at 6am in America/New_York
nyc, _ := time.LoadLocation("America/New_York")
c := cron.New(cron.WithLocation(nyc))
c.AddFunc("0 6 * * ?", ...)

// AddFunc adds a func to the Cron to be run on the given schedule.
// The spec is parsed using the time zone of this Cron instance as the default.
// An opaque ID is returned that can be used to later remove it.
func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
    return c.AddJob(spec, FuncJob(cmd))
}

举个例子,如果你传入的任务仅仅就是一个简单函数进行执行,使用 AddFunc() 就行了,同时也可以通过闭包来引用函数外面的变量,下面是一个完整的例子。

package main

import (
    "fmt"
    "github.com/robfig/cron/v3"
    "time"
)

func TestCron() {
    c := cron.New()
    i := 1
    c.AddFunc("*/1 * * * *", func() {
        fmt.Println("每分钟执行一次", i)
        i++
    })
    c.Start()
    time.Sleep(time.Minute * 5)
}
func main() {
    TestCron()
}

/* output

每分钟执行一次 1
每分钟执行一次 2
每分钟执行一次 3
每分钟执行一次 4
每分钟执行一次 5
*/
传入任务
但是如果我们定义的任务里面还需要留存其他信息呢,可以使用 AddJob() 这个函数,追溯一下源码定义。

// AddJob adds a Job to the Cron to be run on the given schedule.
// The spec is parsed using the time zone of this Cron instance as the default.
// An opaque ID is returned that can be used to later remove it.
func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
    schedule, err := c.parser.Parse(spec)
    if err != nil {
        return 0, err
    }
    return c.Schedule(schedule, cmd), nil
}

// 可以看到需要传入两个参数,`spec` 就是 cron 表达式,Job 类型我们好像还没见过,点进去看
// Job is an interface for submitted cron jobs.
type Job interface {
    Run()
}
现在知道我们的定时任务只需要实现 Run() 这个函数就行了,所以我们可以给出自己的 Job 定义

type Job struct {
    A    int      `json:"a"`
    B    int      `json:"b"`
    C    string   `json:"c"`
    Shut chan int `json:"shut"`
}

// implement Run() interface to start rsync job
func (this Job) Run() {
    this.A++
    fmt.Printf("A: %d\n", this.A)
    *this.B++
    fmt.Printf("B: %d\n", *this.B)
    *this.C += "str"
    fmt.Printf("C: %s\n", *this.C)
}

代码例子


给出一个完整代码的示例,我封装了一个 StartJob 函数,方便自己的管理,当然在 c.AddJob() 处可添加多个任务,都会 cron 的要求执行

package main

import (
    "fmt"
    "github.com/robfig/cron/v3"
    log "github.com/sirupsen/logrus"
    "time"
)

// 定时任务计划
/*
- spec,传入 cron 时间设置
- job,对应执行的任务
*/
func StartJob(spec string, job Job) {
    logger := &CLog{clog: log.New()}
    logger.clog.SetFormatter(&log.TextFormatter{
        FullTimestamp:   true,
        TimestampFormat: "2006-01-02 15:04:05",
    })
    c := cron.New(cron.WithChain(cron.SkipIfStillRunning(logger)))

    c.AddJob(spec, &job)

    // 启动执行任务
    c.Start()
    // 退出时关闭计划任务
    defer c.Stop()

    // 如果使用 select{} 那么就一直会循环
    select {
    case <-job.Shut:
        return
    }
}

func StopJob(shut chan int) {
    shut <- 0
}

type CLog struct {
    clog *log.Logger
}

func (l *CLog) Info(msg string, keysAndValues ...interface{}) {
    l.clog.WithFields(log.Fields{
        "data": keysAndValues,
    }).Info(msg)
}

func (l *CLog) Error(err error, msg string, keysAndValues ...interface{}) {
    l.clog.WithFields(log.Fields{
        "msg":  msg,
        "data": keysAndValues,
    }).Warn(msg)
}

type Job struct {
    A    int      `json:"a"`
    B    int      `json:"b"`
    C    string   `json:"c"`
    Shut chan int `json:"shut"`
}

// implement Run() interface to start job
func (j *Job) Run() {
    j.A++
    fmt.Printf("A: %d\n", j.A)
    j.B++
    fmt.Printf("B: %d\n", j.B)
    j.C += "str"
    fmt.Printf("C: %s\n", j.C)
}

func main() {
    job1 := Job{
        A:    0,
        B:    1,
        C:    "",
        Shut: make(chan int, 1),
    }
    // 每分钟执行一次
    go StartJob("*/1 * * * *", job1)
    time.Sleep(time.Minute * 3)
}
/*
output

A: 1
B: 2
C: str
A: 2
B: 3
C: strstr
A: 3
B: 4
C: strstrstr
*/

总结


这个 cron 库的 v3 版本直接使用标准 cron 表达式即可
启动 cron 任务有传入函数和传入任务两种方法,如果需要管理建议实现自己的 Job 类
参考资料
robfig/cron
godoc-cron
crontab.guru

0

评论 (0)

取消