谈谈定时任务的原理和应用

定时任务的原理

  • 本质是开一个线程,无限循环,检查本地的当前时间,是否符合执行的条件

  • Redis: 比如redis-5.0.8,起了一个线程不停的循环检查,比对当前时间(gettimeofday)判断是否需要执行

  • Golang: 起一个goroutine无限循环,从最小堆中取符合时间的任务

  • Java: 单线程, 无限循环,最小堆: java.util.TimerThread#mainLoop

定时任务类型

  • 定时任务一般分为 本地定时任务 和 分布式定时任务

  • 比如定时加载数据到本地缓存的时候,这时一般就使用本地定时任务

  • 而有些任务定时只需要执行一次, 现在很多服务都不是单点了,这就需要分布式定时任务来进行调度了

分布式定时任务使用示例

Saturn

  • 如果是Java服务,可以很方便的接入Saturn,可以直接通知到服务
  • 但如果是其他语言,则是通过执行shell命令间接实现的

使用域信号通知通知服务

  • 这里Go应用为例子,通过域信号结合shell命令,从而方便使用saturn定时任务

  • client端:提供shell命令,供saturn定时调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
const SockPath = "/tmp/notify-os.sock"

func notifyRun(task *notifyTask) {

httpc := http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", SockPath)
},
},
}
response, err := httpc.Get("http://unix/" + task.name + "?" + task.args)
if err != nil {
panic(err)
}
var buf = make([]byte, 1024)
response.Body.Read(buf)
logger.Infof("finish notify task: %s, args:%s, resp: %s", task.name, task.args, string(buf))
}

  • server端:提供uds服务,介绍请求触发执行相应的任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

type ser struct{}

func (s ser) ServeHTTP(rw http.ResponseWriter, r *http.Request) {

defer func() {
if err := recover(); err != nil {
logger.Errorf("notify server panic, r: %s, err:%s", r.RequestURI, err)
}
}()

name := r.URL.Path

if strings.HasPrefix(name, "/") {
name = name[1:]
}

if job, ok := jobs[name]; ok {
args := map[string]string{}
for k, v := range r.URL.Query() {
args[k] = v[0]
}
if job.Handler(args) {
rw.Write([]byte("ok"))
logger.Infof("job run success, name:%s, args: %s", name, args)
} else {
rw.Write([]byte("fail"))
logger.Errorf("job run fail, name:%s, args: %s", name, args)
}
return
}

rw.Write([]byte("not exist"))
logger.Warnf("job not exist, name:%s", name)

}
func NotifyServe() {
sockPath := utils.BizConfig("notify_uds_file", "")

if sockPath == "" {
panic("sockPath is nil")
}

logger.Info("Unix NotifyServe ...")
os.Remove(sockPath)
server := http.Server{
Handler: ser{},
}
unixListener, err := net.Listen("unix", sockPath)
if err != nil {
panic(err)
}
server.Serve(unixListener)
}

  • windows 平台没有uds,可以使用普通的HTTP接口代替