实现一个时间轮

// 包 timewheel 提供了一个简单的时间轮调度器的实现。
package timewheel

import (
	"container/list" // 引入 list 包来使用双向链表
	"context"        // 引入 context 包来进行上下文控制
	"time"           // 引入 time 包来处理时间相关的功能
)

// Task 接口定义了需要时间轮执行的任务,它必须实现 Execute 方法。
type Task interface {
	Execute()
}

// TimeNode 结构体表示时间轮上的一个节点。
type TimeNode struct {
	task    Task      // 任务实例
	excTime time.Time // 预定的执行时间
	cycle   int       // 时间轮需要转动多少圈后执行任务
}

// TimeWheel 结构体表示时间轮。
type TimeWheel struct {
	startTime  time.Time     // 时间轮的启动时间
	interval   time.Duration // 时间轮的时间间隔
	currentPos int           // 当前的槽位置
	ticker     *time.Ticker  // 定时触发器
	slots      []*list.List  // 时间轮的槽数组,每个槽点是一个链表
	tasks      chan Task     // 任务channel,用于执行任务
	stop       chan struct{} // 停止channel,用于停止时间轮
	cancel     context.CancelFunc // 上下文取消函数
}

// NewTimeWheel 创建并初始化一个时间轮实例。
func NewTimeWheel(interval time.Duration, slotNum int) *TimeWheel {
	slots := make([]*list.List, slotNum)
	for i := range slots {
		slots[i] = list.New()
	}
	return &TimeWheel{
		startTime:  time.Now().UTC(),
		interval:   interval,
		currentPos: 0,
		ticker:     time.NewTicker(interval),
		slots:      slots,
		tasks:      make(chan Task, 1000),
		stop:       make(chan struct{}),
	}
}

// DefaultTimeWheel 创建一个默认配置的时间轮实例。
func DefaultTimeWheel() *TimeWheel {
	return NewTimeWheel(time.Second, 60)
}

// Start 启动时间轮,使其开始调度任务。
func (t *TimeWheel) Start() {
	ctx := context.Background()
	ctx, cancelFunc := context.WithCancel(ctx)
	t.cancel = cancelFunc
	go t.Loop(ctx)
	go func() {
		for {
			select {
			case task := <-t.tasks: // 当任务channel中有任务时,执行任务
				task.Execute()
			case <-t.stop: // 当接收到停止信号时,停止定时器并退出
				t.ticker.Stop()
				return
			}
		}

	}()
}

// Stop 停止时间轮,取消所有调度。
func (t *TimeWheel) Stop() {
	t.stop <- struct{}{} // 发送停止信号
	t.cancel()           // 调用上下文的取消函数
}

// Loop 是时间轮的核心调度循环。
func (t *TimeWheel) Loop(ctx context.Context) {
	for {
		select {
		case <-ctx.Done(): // 当上下文被取消时退出循环
			return
		case <-t.ticker.C: // 定时器触发时执行
			t.currentPos = (t.currentPos + 1) % len(t.slots) // 计算当前槽位置
			l := t.slots[t.currentPos]
			for e := l.Front(); e != nil; {
				node := e.Value.(*TimeNode)
				if node.cycle > 0 {
					node.cycle-- // 减少圈数
					e = e.Next()
					continue
				}
				t.tasks <- node.task // 将任务发送到任务channel
				next := e.Next()
				l.Remove(e) // 移除已执行的任务
				e = next
			}

		}
	}
}

// AddTask 添加任务到时间轮。
func (t *TimeWheel) AddTask(task Task, delay time.Duration) {
	pos := (t.currentPos + int(delay/t.interval)) % len(t.slots) // 计算任务应该放入的槽位置
	cycle := int(delay/t.interval) / len(t.slots) // 计算任务需要等待的圈数

	if t.slots[pos] == nil {
		t.slots[pos] = list.New()
	}
	t.slots[pos].PushBack(
		&TimeNode{
			task:    task,
			excTime: time.Now().Add(delay),
			cycle:   cycle,
		},
	)
}

说明文档

1. 概述

时间轮 (TimeWheel) 是一个用于任务调度的数据结构,它允许你以固定的时间间隔调度任务。这段Go代码提供了一个简单的时间轮实现。

2. 结构体和接口

  • Task​: 一个接口,所有希望被时间轮调度的任务都应该实现这个接口的 Execute​ 方法。
  • TimeNode​: 代表时间轮上的节点,持有一个任务和任务的执行时间以及剩余圈数。
  • TimeWheel​: 时间轮的主体,包含了时间轮的各项配置,如时间间隔、槽数组等,并且可以开始和停止任务调度。

3. 方法

  • NewTimeWheel​: 创建并返回一个新的时间轮实例。
  • DefaultTimeWheel​: 创建一个拥有默认配置(一秒钟的间隔,60个槽)的时间轮实例。
  • Start​: 启动时间轮,开始调度任务。
  • Stop​: 停止时间轮,取消所有调度任务。
  • Loop​: 作为时间轮的核心调度循环,负责移动槽位置,执行到期的任务。
  • AddTask​: 向时间轮添加任务,可以指定延迟时间。

4. 使用方式

创建时间轮实例,通过 AddTask​ 方法添加任务,然后调用 Start​ 方法开始调度。当不再需要时间轮时,可以调用 Stop​ 方法来停止所有调度。

5. 注意事项

  • 时间轮初始化时会创建一个任务通道 tasks​,其大小为1000,因此它可以在不阻塞的情况下缓存多达1000个待执行的任务。
  • 时间轮的 Loop​ 方法使用了Go的 select​ 语句来高效的处理任务的执行和时间轮的停止。
  • 在停止时间轮时,应确保所有的任务都已经完成或不需要执行,以避免资源泄露或未完成的任务。