实现一个时间轮
// 包 timewheel 提供了一个简单的时间轮调度器的实现。
package timewheel
import (
"container/list" // 引入 list 包来使用双向链表
"context" // 引入 context 包来进行上下文控制
"time" // 引入 time 包来处理时间相关的功能
)
// Task 接口定义了需要时间轮执行的任务,它必须实现 Execute 方法。
type Task interface {
Execute()
}
// TimeNode 结构体表示时间轮上的一个节点。
type TimeNode struct {
task Task // 任务实例
excTime time.Time // 预定的执行时间
cycle int // 时间轮需要转动多少圈后执行任务
}
// TimeWheel 结构体表示时间轮。
type TimeWheel struct {
startTime time.Time // 时间轮的启动时间
interval time.Duration // 时间轮的时间间隔
currentPos int // 当前的槽位置
ticker *time.Ticker // 定时触发器
slots []*list.List // 时间轮的槽数组,每个槽点是一个链表
tasks chan Task // 任务channel,用于执行任务
stop chan struct{} // 停止channel,用于停止时间轮
cancel context.CancelFunc // 上下文取消函数
}
// NewTimeWheel 创建并初始化一个时间轮实例。
func NewTimeWheel(interval time.Duration, slotNum int) *TimeWheel {
slots := make([]*list.List, slotNum)
for i := range slots {
slots[i] = list.New()
}
return &TimeWheel{
startTime: time.Now().UTC(),
interval: interval,
currentPos: 0,
ticker: time.NewTicker(interval),
slots: slots,
tasks: make(chan Task, 1000),
stop: make(chan struct{}),
}
}
// DefaultTimeWheel 创建一个默认配置的时间轮实例。
func DefaultTimeWheel() *TimeWheel {
return NewTimeWheel(time.Second, 60)
}
// Start 启动时间轮,使其开始调度任务。
func (t *TimeWheel) Start() {
ctx := context.Background()
ctx, cancelFunc := context.WithCancel(ctx)
t.cancel = cancelFunc
go t.Loop(ctx)
go func() {
for {
select {
case task := <-t.tasks: // 当任务channel中有任务时,执行任务
task.Execute()
case <-t.stop: // 当接收到停止信号时,停止定时器并退出
t.ticker.Stop()
return
}
}
}()
}
// Stop 停止时间轮,取消所有调度。
func (t *TimeWheel) Stop() {
t.stop <- struct{}{} // 发送停止信号
t.cancel() // 调用上下文的取消函数
}
// Loop 是时间轮的核心调度循环。
func (t *TimeWheel) Loop(ctx context.Context) {
for {
select {
case <-ctx.Done(): // 当上下文被取消时退出循环
return
case <-t.ticker.C: // 定时器触发时执行
t.currentPos = (t.currentPos + 1) % len(t.slots) // 计算当前槽位置
l := t.slots[t.currentPos]
for e := l.Front(); e != nil; {
node := e.Value.(*TimeNode)
if node.cycle > 0 {
node.cycle-- // 减少圈数
e = e.Next()
continue
}
t.tasks <- node.task // 将任务发送到任务channel
next := e.Next()
l.Remove(e) // 移除已执行的任务
e = next
}
}
}
}
// AddTask 添加任务到时间轮。
func (t *TimeWheel) AddTask(task Task, delay time.Duration) {
pos := (t.currentPos + int(delay/t.interval)) % len(t.slots) // 计算任务应该放入的槽位置
cycle := int(delay/t.interval) / len(t.slots) // 计算任务需要等待的圈数
if t.slots[pos] == nil {
t.slots[pos] = list.New()
}
t.slots[pos].PushBack(
&TimeNode{
task: task,
excTime: time.Now().Add(delay),
cycle: cycle,
},
)
}
说明文档
1. 概述
时间轮 (TimeWheel) 是一个用于任务调度的数据结构,它允许你以固定的时间间隔调度任务。这段Go代码提供了一个简单的时间轮实现。
2. 结构体和接口
-
Task
: 一个接口,所有希望被时间轮调度的任务都应该实现这个接口的Execute
方法。 -
TimeNode
: 代表时间轮上的节点,持有一个任务和任务的执行时间以及剩余圈数。 -
TimeWheel
: 时间轮的主体,包含了时间轮的各项配置,如时间间隔、槽数组等,并且可以开始和停止任务调度。
3. 方法
-
NewTimeWheel
: 创建并返回一个新的时间轮实例。 -
DefaultTimeWheel
: 创建一个拥有默认配置(一秒钟的间隔,60个槽)的时间轮实例。 -
Start
: 启动时间轮,开始调度任务。 -
Stop
: 停止时间轮,取消所有调度任务。 -
Loop
: 作为时间轮的核心调度循环,负责移动槽位置,执行到期的任务。 -
AddTask
: 向时间轮添加任务,可以指定延迟时间。
4. 使用方式
创建时间轮实例,通过 AddTask
方法添加任务,然后调用 Start
方法开始调度。当不再需要时间轮时,可以调用 Stop
方法来停止所有调度。
5. 注意事项
- 时间轮初始化时会创建一个任务通道
tasks
,其大小为1000,因此它可以在不阻塞的情况下缓存多达1000个待执行的任务。 - 时间轮的
Loop
方法使用了Go的select
语句来高效的处理任务的执行和时间轮的停止。 - 在停止时间轮时,应确保所有的任务都已经完成或不需要执行,以避免资源泄露或未完成的任务。