Go语言中时间轮的实现

2021-02-23 网络
浏览
[科技新闻]Go语言中时间轮的实现

原题目:Go语言中时间轮的实现

作者 | luozhiyun 责编 | 张文

泉源 | https://www.luozhiyun.com/archives/444

最近在工作中有一个需求,简朴来说就是在短时间内会建立上百万个准时义务,建立的时刻会将对应的金额相加,防止超售,需要过半个小时再去核对数据,若是数据对不上就需要将加上的金额再减回去。

这个需求若是用 Go 内置的 Timer 来做的话性能对照低下,由于 Timer 是使用最小堆来实现的,建立和删除的时间复杂度都为 O(log n)。若是使用时间轮的话则是 O(1)性能会好许多。

对于时间轮来说,我以前写过一篇 java 版的时间轮算法剖析:https://www.luozhiyun.com/archives/59,这次来看看 Go 语言的时间轮实现,顺便人人有兴趣的也可以对比一下两者的区别,以及我写文章的水平和一年多前有没有提升,哈哈哈。

时间轮的运用其实是异常的普遍的,在 Netty、Akka、Quartz、ZooKeeper、Kafka 等组件中都存在时间轮的踪影。下面用 Go 实现的时间轮是以 Kafka 的代码为原型来实现的。

完整代码:https://github.com/devYun/timingwheel。

先容

简朴时间轮

在时间轮中存储义务的是一个环形行列,底层接纳数组实现,数组中的每个元素可以存放一个准时义务列表。准时义务列表是一个环形的双向链表,链表中的每一项示意的都是准时义务项,其中封装了真正的准时义务。

时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickMs)。时间轮的时间格个数是牢固的,可用 wheelSize 来示意,那么整个时间轮的总体时间跨度(interval)可以通过公式 tickMs×wheelSize 盘算得出。

时间轮另有一个 表盘指针(currentTime),用来示意时间轮当前所处的时间,currentTime 是 tickMs 的整数倍。currentTime 指向的地方是示意到期的时间格,示意需要处置的时间格所对应的链表中的所有义务。

如下图是一个 tickMs 为 1s,wheelSize 即是 10 的时间轮,每一格内里放的是一个准时义务链表,链表内里存有真正的义务项:

taskList

初始情况下表盘指针 currentTime 指向时间格0,若时间轮的 tickMs 为 1ms 且 wheelSize 即是10,那么 interval 则即是 10s。如下图此时有一个准时为2s的义务插进来会存放到时间格为 2 的义务链表中,用红色符号。随着时间的不停推移,指针 currentTime 不停向前推进,若是过了 2s,那么 currentTime 会指向时间格 2 的位置,会将此时间格的义务链表获取出来处置。

timewheel

若是当前的指针 currentTime 指向的是 2,此时若是插入一个 9s 的义务进来,那么新来的义务会服用原来的时间格链表,会存放到时间格 1 中

timewheelAdd9S

这里所讲的时间轮都是简朴时间轮,只有一层,总体时间局限在 currentTime 和 currentTime interval 之间。若是现在有一个 15s 的准时义务是需要重新开启一个时间轮,设置一个时间跨度至少为 15s 的时间轮才够用。然则这样扩充是没有底线的,若是需要一个 1 万秒的时间轮,那么就需要一个这么大的数组去存放,不仅占用很大的内存空间,而且也会由于需要遍历这么大的数组从而拉低效率。

因此引入了层级时间轮的观点。

层级时间轮

如图是一个两层的时间轮,第二层时间轮也是由 10 个时间格组成,每个时间格的跨度是 10s。第二层的时间轮的 tickMs 为第一层时间轮的 interval,即 10s。每一层时间轮的 wheelSize 是牢固的,都是 10,那么第二层的时间轮的总体时间跨度 interval 为 100s。

图中展示了每个时间格对应的过时时间局限, 我们可以清晰地看到, 第二层时间轮的第 0 个时间格的过时时间局限是 [0,9]。也就是说, 第二层时间轮的一个时间格就可以示意第一层时间轮的所有(10 个)时间格;

若是向该时间轮中添加一个 15s 的义务,那么当第一层时间轮容纳不下时,进入第二层时间轮,并插入到过时时间为[10,19]的时间格中。

timewheellevel2

随着时间的流逝,当原本 15s 的义务还剩下 5s 的时刻,这里就有一个时间轮降级的操作,此时第一层时间轮的总体时间跨度已足够,此义务被添加到第一层时间轮到期时间为 5 的时间格中,之后再履历 5s 后,此义务真正到期,最终执行响应的到期操作。

代码实现

由于我们这个 Go 语言版本的时间轮代码是模仿 Kafka 写的,以是在详细实现时间轮 TimingWheel 时另有一些小细节:

  • 时间轮的时间格中每个链表会有一个 root 节点用于简化边界条件。它是一个附加的链表节点,该节点作为第一个节点,它的值域中并不存储任何东西,只是为了操作的利便而引入的;

  • 除了第一层时间轮,其余高层时间轮的起始时间(startMs)都设置为建立此层时间轮时前面第一轮的 currentTime。每一层的 currentTime 都必须是 tickMs 的整数倍,若是不满足则会将 currentTime 修剪为 tickMs 的整数倍。修剪方式为:currentTime = startMs - (startMs % tickMs);

  • Kafka 中的准时器只需持有 TimingWheel 的第一层时间轮的引用,并不会直接持有其他高层的时间轮,但每一层时间轮都市有一个引用(overflowWheel)指向更高一层的应用;

  • Kafka 中的准时器使用了 DelayQueue 来协助推进时间轮。在操作中会将每个使用到的时间格中每个链表都加入 DelayQueue,DelayQueue 会凭据时间轮对应的过时时间 expiration 来排序,最短 expiration 的义务会被排在 DelayQueue 的队头,通过单独线程来获取 DelayQueue 中到期的义务;

结构体

type TimingWheel struct {// 时间跨度,单元是毫秒tick int64 // in milliseconds// 时间轮个数wheelSize int64// 总跨度interval int64 // in milliseconds// 当前指针指向时间currentTime int64 // in milliseconds// 时间格列表buckets []*bucket// 延迟行列queue *delayqueue.DelayQueue // 上级的时间轮引用overflowWheel unsafe.Pointer // type: *TimingWheel

exitC chan struct{}waitGroup waitGroupWrapper}

tick、wheelSize、interval、currentTime 都对照好明白,buckets 字段代表的是时间格列表,queue 是一个延迟行列,所有的义务都是通过延迟行列来举行触发,overflowWheel 是上层时间轮的引用。

type bucket struct {// 义务的过时时间expiration int64

mu sync.Mutex// 相同过时时间的义务行列timers *list.List}

bucket 内里实际上封装的是时间格内里的义务行列,内里放入的是相同过时时间的义务,到期后会将行列 timers 拿出来举行处置。这里有个有意思的地方是由于会有多个线程并发的接见 bucket,以是需要用到原子类来获取 int64 位的值,为了保证 32 位系统上面读取 64 位数据的一致性,需要举行 64 位对齐。详细的可以看这篇:https://www.luozhiyun.com/archives/429,讲的是对内存对齐的思索。

type Timer struct {// 到期时间expiration int64 // in milliseconds// 要被执行的详细义务task func// Timer所在bucket的指针b unsafe.Pointer // type: *bucket// bucket列表中对应的元素element *list.Element}

Timer 是时间轮的最小执行单元,是准时义务的封装,到期后会挪用 task 来执行义务。

,科技前沿,

Group 37

初始化时间轮

例如现在初始化一个tick是1s,wheelSize是10的时间轮:

funcmain{ tw := timingwheel.NewTimingWheel(time.Second, 10) tw.Start } funcNewTimingWheel(tick time.Duration, wheelSize int64) * TimingWheel{ // 将传入的tick转化成毫秒 tickMs := int64(tick / time.Millisecond) // 若是小于零,那么panic if tickMs <= 0 { panic(errors.New("tick must be greater than or equal to 1ms")) } // 设置最先时间 startMs := timeToMs(time.Now.UTC) // 初始化TimingWheel return newTimingWheel( tickMs, wheelSize, startMs, delayqueue.New(int(wheelSize)), )}func newTimingWheel(tickMs int64, wheelSize int64, startMs int64, queue *delayqueue.DelayQueue) *TimingWheel { // 初始化buckets的巨细 buckets := make([]*bucket, wheelSize) for i := range buckets { buckets[i] = newBucket } // 实例化TimingWheel return &TimingWheel{ tick: tickMs, wheelSize: wheelSize, // currentTime必须是tickMs的倍数,以是这里使用truncate举行修剪 currentTime: truncate(startMs, tickMs), interval: tickMs * wheelSize, buckets: buckets, queue: queue, exitC: make(chan struct{}), }}

初始化十分简朴,人人可以看看上面的代码注释即可。

启动时间轮

下面我们看看 start 方式:

func(tw *TimingWheel)Start{ // Poll会执行一个无限循环,将到期的元素放入到queue的C管道中 tw.waitGroup.Wrap(func { tw.queue.Poll(tw.exitC, func int64 { return timeToMs(time.Now.UTC) }) }) // 开启无限循环获取queue中C的数据 tw.waitGroup.Wrap(func { for { select { // 从行列内里出来的数据都是到期的bucket case elem := <-tw.queue.C: b := elem.(*bucket) // 时间轮会将当前时间 currentTime 往前移动到 bucket的到期时间 tw.advanceClock(b.Expiration) // 取出bucket行列的数据,并挪用addOrRun方式执行 b.Flush(tw.addOrRun) case <-tw.exitC: return } } })}

这里使用了 util 封装的一个 Wrap 方式,这个方式会起一个 goroutines 异步执行传入的函数,详细的可以到我上面给出的链接去看源码。

Start 方式会启动两个 goroutines。第一个goroutines用来挪用延迟行列的 queue 的 Poll 方式,这个方式会一直循环获取行列内里的数据,然后将到期的数据放入到 queue 的 C 管道中;第二个 goroutines 会无限循环获取 queue 中 C 的数据,若是 C 中有数据示意已经到期,那么会先挪用 advanceClock 方式将当前时间 currentTime 往前移动到 bucket的到期时间,然后再挪用 Flush 方式取出 bucket 中的行列,并挪用 addOrRun 方式执行。

func (tw *TimingWheel) advanceClock(expiration int64) {currentTime := atomic.LoadInt64(&tw.currentTime)// 过时时间大于即是(当前时间 tick)if expiration >= currentTime tw.tick {// 将currentTime设置为expiration,从而推进currentTimecurrentTime = truncate(expiration, tw.tick)atomic.StoreInt64(&tw.currentTime, currentTime)

// Try to advance the clock of the overflow wheel if present// 若是有上层时间轮,那么递归挪用上层时间轮的引用overflowWheel := atomic.LoadPointer(&tw.overflowWheel)if overflowWheel != nil {(*TimingWheel)(overflowWheel).advanceClock(currentTime)}}}

advanceClock 方式会凭据到期时间来重新设置 currentTime,从而推进时间轮前进。

func (b *bucket) Flush(reinsert func(*Timer)) {var ts []*Timer

b.mu.Lock// 循环获取bucket行列节点for e := b.timers.Front; e != nil; {next := e.Next

t := e.Value.(*Timer)// 将头节点移除bucket行列b.remove(t)ts = append(ts, t)

e = next}b.mu.Unlock

b.SetExpiration(-1) // TODO: Improve the coordination with b.Add

for _, t := range ts {reinsert(t)}}

Flush 方式会凭据 bucket 内里 timers 列表举行遍历插入到 ts 数组中,然后挪用 reinsert 方式,这里是挪用的 addOrRun 方式。

func (tw *TimingWheel) addOrRun(t *Timer) {// 若是已经过时,那么直接执行if !tw.add(t) { // 异步执行准时义务go t.task}}

addOrRun 会挪用 add 方式检查传入的准时义务 Timer 是否已经到期,若是到期那么异步挪用 task 方式直接执行。add 方式我们下面会接着剖析。

整个 start 执行流程如图:

timewheel_start

  1. start 方式会启动一个 goroutines 挪用 poll 来处置 DelayQueue 中到期的数据,并将数据放入到管道 C 中;

  2. start 方式启动第二个 goroutines 方式会循环获取 DelayQueue 中管道C的数据,管道 C 中实际上存放的是一个 bucket,然后遍历bucket的timers列表,若是义务已经到期,那么异步执行,没有到期则重新放入到 DelayQueue 中。

add task

func main {tw := timingwheel.NewTimingWheel(time.Second, 10)tw.Start // 添加义务tw.AfterFunc(time.Second*15, func {fmt.Println("The timer fires")exitC <- time.Now.UTC})}

我们通过 AfterFunc 方式添加一个 15s 的准时义务,若是到期了,那么执行传入的函数。

func (tw *TimingWheel) AfterFunc(d time.Duration, f func) *Timer {t := &Timer{expiration: timeToMs(time.Now.UTC.Add(d)),task: f,}tw.addOrRun(t)return t}

AfterFunc 方式会凭据传入的义务到期时间,以及到期需要执行的函数封装成 Timer,挪用 addOrRun 方式。addOrRun 方式我们上面已经看过了,会凭据到期时间来决议是否需要执行准时义务。

下面我们来看一下 add 方式:

func (tw *TimingWheel) add(t *Timer) bool {currentTime := atomic.LoadInt64(&tw.currentTime)// 已经过时if t.expiration < currentTime tw.tick {// Already expiredreturn false// 到期时间在第一层环内} else if t.expiration < currentTime tw.interval {// Put it into its own bucket// 获取时间轮的位置virtualID := t.expiration / tw.tickb := tw.buckets[virtualID%tw.wheelSize]// 将义务放入到bucket行列中b.Add(t) // 若是是相同的时间,那么返回false,防止被多次插入到行列中if b.SetExpiration(virtualID * tw.tick) { // 将该bucket加入到延迟行列中tw.queue.Offer(b, b.Expiration)}

return true} else {// Out of the interval. Put it into the overflow wheel// 若是放入的到期时间跨越第一层时间轮,那么放到上一层中去overflowWheel := atomic.LoadPointer(&tw.overflowWheel)if overflowWheel == nil {atomic.CompareAndSwapPointer(&tw.overflowWheel,nil,// 需要注重的是,这里tick变成了intervalunsafe.Pointer(newTimingWheel(tw.interval,tw.wheelSize,currentTime,tw.queue,)),)overflowWheel = atomic.LoadPointer(&tw.overflowWheel)}// 往上递归return (*TimingWheel)(overflowWheel).add(t)}}

add 方式凭据到期时间来分成了三部门,第一部门是小于当前时间 tick,示意已经到期,那么返回 false 执行义务即可;

第二部门的判断会凭据 expiration 是否小于时间轮的跨度,若是小于的话示意该准时义务可以放入到当前时间轮中,通过取模找到 buckets 对应的时间格并放入到 bucket 行列中,SetExpiration 方式会凭据传入的参数来判断是否已经执行过延迟行列的 Offer 方式,防止重复插入;

第三部门示意该准时义务的时间跨度跨越了当前时间轮,需要升级到上一层的时间轮中。需要注重的是,上一层的时间轮的 tick 是当前时间轮的 interval,延迟行列照样同一个,然后设置为指针 overflowWheel,并挪用 add 方式往上层递归。

末端

到这里时间轮已经讲完了,不外另有需要注重的地方,我们在用上面的时间轮实现中,使用了 DelayQueue 加环形行列的方式实现了时间轮。对准时义务项的插入和删除操作而言,TimingWheel 时间复杂度为 O(1),在 DelayQueue 中的行列使用的是优先行列,时间复杂度是 O(log n),然则由于 buckets 列表实际上是异常小的,以是并不会影响性能。

Reference

  • https://github.com/RussellLuo/timingwheel

  • https://zhuanlan.zhihu.com/p/121483218

股价狂涨 500 亿,小米手机营业与造车能否兼得?

RISC-V 没你想象的那么好

30 周岁的 Python,“虐”我 20 年

刚刚,毅力号火星车着陆的第一视角视频公布 华裔工程师:梦寐以求

23日凌晨3时许,NASA公布了毅力号火星车着陆的第一视角视频,记录了进入、下降和着陆的过程。 华裔工程师陈艾伦(Allen “Ai” Chen,音译),是毅力号下降和着陆阶段负责人,他在NA…