bucket.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package mhayaTimeWheel
  2. import (
  3. "container/list"
  4. "sync"
  5. "sync/atomic"
  6. )
  7. type bucket struct {
  8. // 64-bit atomic operations require 64-bit alignment, but 32-bit
  9. // compilers do not ensure it. So we must keep the 64-bit field
  10. // as the first field of the struct.
  11. //
  12. // For more explanations, see https://golang.org/pkg/sync/atomic/#pkg-note-BUG
  13. // and https://go101.org/article/memory-layout.html.
  14. expiration int64
  15. mu sync.Mutex
  16. timers *list.List
  17. }
  18. func newBucket() *bucket {
  19. return &bucket{
  20. timers: list.New(),
  21. expiration: -1,
  22. }
  23. }
  24. func (b *bucket) Expiration() int64 {
  25. return atomic.LoadInt64(&b.expiration)
  26. }
  27. func (b *bucket) SetExpiration(expiration int64) bool {
  28. return atomic.SwapInt64(&b.expiration, expiration) != expiration
  29. }
  30. func (b *bucket) Add(t *Timer) {
  31. b.mu.Lock()
  32. e := b.timers.PushBack(t)
  33. t.setBucket(b)
  34. t.element = e
  35. b.mu.Unlock()
  36. }
  37. func (b *bucket) remove(t *Timer) bool {
  38. if t.getBucket() != b {
  39. // If remove is called from t.Stop, and this happens just after the timing wheel's goroutine has:
  40. // 1. removed t from b (through b.Flush -> b.remove)
  41. // 2. moved t from b to another bucket ab (through b.Flush -> b.remove and ab.Add)
  42. // then t.getBucket will return nil for case 1, or ab (non-nil) for case 2.
  43. // In either case, the returned value does not equal to b.
  44. return false
  45. }
  46. b.timers.Remove(t.element)
  47. t.setBucket(nil)
  48. t.element = nil
  49. return true
  50. }
  51. func (b *bucket) Remove(t *Timer) bool {
  52. b.mu.Lock()
  53. defer b.mu.Unlock()
  54. return b.remove(t)
  55. }
  56. func (b *bucket) Flush(reinsert func(*Timer)) {
  57. b.mu.Lock()
  58. e := b.timers.Front()
  59. for e != nil {
  60. next := e.Next()
  61. t := e.Value.(*Timer)
  62. b.remove(t)
  63. reinsert(t)
  64. e = next
  65. }
  66. b.mu.Unlock()
  67. b.SetExpiration(-1)
  68. }