123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
- package mhayaTimeWheel
- import (
- "container/list"
- "sync"
- "sync/atomic"
- )
- type bucket struct {
- // 64-bit atomic operations require 64-bit alignment, but 32-bit
- // compilers do not ensure it. So we must keep the 64-bit field
- // as the first field of the struct.
- //
- // For more explanations, see https://golang.org/pkg/sync/atomic/#pkg-note-BUG
- // and https://go101.org/article/memory-layout.html.
- expiration int64
- mu sync.Mutex
- timers *list.List
- }
- func newBucket() *bucket {
- return &bucket{
- timers: list.New(),
- expiration: -1,
- }
- }
- func (b *bucket) Expiration() int64 {
- return atomic.LoadInt64(&b.expiration)
- }
- func (b *bucket) SetExpiration(expiration int64) bool {
- return atomic.SwapInt64(&b.expiration, expiration) != expiration
- }
- func (b *bucket) Add(t *Timer) {
- b.mu.Lock()
- e := b.timers.PushBack(t)
- t.setBucket(b)
- t.element = e
- b.mu.Unlock()
- }
- func (b *bucket) remove(t *Timer) bool {
- if t.getBucket() != b {
- // If remove is called from t.Stop, and this happens just after the timing wheel's goroutine has:
- // 1. removed t from b (through b.Flush -> b.remove)
- // 2. moved t from b to another bucket ab (through b.Flush -> b.remove and ab.Add)
- // then t.getBucket will return nil for case 1, or ab (non-nil) for case 2.
- // In either case, the returned value does not equal to b.
- return false
- }
- b.timers.Remove(t.element)
- t.setBucket(nil)
- t.element = nil
- return true
- }
- func (b *bucket) Remove(t *Timer) bool {
- b.mu.Lock()
- defer b.mu.Unlock()
- return b.remove(t)
- }
- func (b *bucket) Flush(reinsert func(*Timer)) {
- b.mu.Lock()
- e := b.timers.Front()
- for e != nil {
- next := e.Next()
- t := e.Value.(*Timer)
- b.remove(t)
- reinsert(t)
- e = next
- }
- b.mu.Unlock()
- b.SetExpiration(-1)
- }
|