time_wheel.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
// Package mhayaTimeWheel implements hierarchical timing wheels. It is adapted from https://github.com/RussellLuo/timingwheel
  2. package mhayaTimeWheel
  3. import (
  4. "sync/atomic"
  5. "time"
  6. "unsafe"
  7. cutils "github.com/mhaya/extend/utils"
  8. clog "github.com/mhaya/logger"
  9. )
  10. // TimeWheel is an implementation of Hierarchical Timing Wheels.
  11. type TimeWheel struct {
  12. tick int64 // in milliseconds
  13. wheelSize int64 // wheel size
  14. interval int64 // in milliseconds
  15. currentTime int64 // in milliseconds
  16. buckets []*bucket // bucket list
  17. queue *DelayQueue // delay queue
  18. overflowWheel unsafe.Pointer // type: *TimeWheel The higher-level overflow wheel.
  19. exitC chan struct{} // exit chan
  20. waitGroup waitGroupWrapper // wait group
  21. }
  22. // NewTimeWheel creates an instance of TimeWheel with the given tick and wheelSize.
  23. func NewTimeWheel(tick time.Duration, wheelSize int64) *TimeWheel {
  24. tickMs := int64(tick / time.Millisecond)
  25. if tickMs <= 0 {
  26. clog.Error("tick must be greater than or equal to 1ms")
  27. return nil
  28. }
  29. startMs := TimeToMS(time.Now().UTC())
  30. return newTimingWheel(
  31. tickMs,
  32. wheelSize,
  33. startMs,
  34. NewDelayQueue(int(wheelSize)),
  35. )
  36. }
  37. // newTimingWheel is an internal helper function that really creates an instance of TimeWheel.
  38. func newTimingWheel(tickMs int64, wheelSize int64, startMs int64, queue *DelayQueue) *TimeWheel {
  39. buckets := make([]*bucket, wheelSize)
  40. for i := range buckets {
  41. buckets[i] = newBucket()
  42. }
  43. return &TimeWheel{
  44. tick: tickMs,
  45. wheelSize: wheelSize,
  46. currentTime: truncate(startMs, tickMs),
  47. interval: tickMs * wheelSize,
  48. buckets: buckets,
  49. queue: queue,
  50. exitC: make(chan struct{}),
  51. }
  52. }
  53. // add inserts the timer t into the current timing wheel.
  54. func (tw *TimeWheel) add(t *Timer) bool {
  55. currentTime := atomic.LoadInt64(&tw.currentTime)
  56. if t.expiration < currentTime+tw.tick {
  57. // Already expired
  58. return false
  59. }
  60. if t.expiration < currentTime+tw.interval {
  61. // Put it into its own bucket
  62. virtualID := t.expiration / tw.tick
  63. b := tw.buckets[virtualID%tw.wheelSize]
  64. b.Add(t)
  65. // Set the bucket expiration time
  66. if b.SetExpiration(virtualID * tw.tick) {
  67. // The bucket needs to be enqueued since it was an expired bucket.
  68. // We only need to enqueue the bucket when its expiration time has changed,
  69. // i.e. the wheel has advanced and this bucket get reused with a new expiration.
  70. // Any further calls to set the expiration within the same wheel cycle will
  71. // pass in the same value and hence return false, thus the bucket with the
  72. // same expiration will not be enqueued multiple times.
  73. tw.queue.Offer(b, b.Expiration())
  74. }
  75. return true
  76. } else {
  77. // Out of the interval. Put it into the overflow wheel
  78. overflowWheel := atomic.LoadPointer(&tw.overflowWheel)
  79. if overflowWheel == nil {
  80. atomic.CompareAndSwapPointer(
  81. &tw.overflowWheel,
  82. nil,
  83. unsafe.Pointer(newTimingWheel(
  84. tw.interval,
  85. tw.wheelSize,
  86. currentTime,
  87. tw.queue,
  88. )),
  89. )
  90. overflowWheel = atomic.LoadPointer(&tw.overflowWheel)
  91. }
  92. return (*TimeWheel)(overflowWheel).add(t)
  93. }
  94. }
  95. // addOrRun inserts the timer t into the current timing wheel, or run the
  96. // timer's task if it has already expired.
  97. func (tw *TimeWheel) addOrRun(t *Timer) {
  98. if !tw.add(t) {
  99. // Already expired
  100. // Like the standard time.AfterFunc (https://golang.org/pkg/time/#AfterFunc),
  101. // always execute the timer's task in its own goroutine.
  102. if t.isAsync {
  103. go t.task()
  104. } else {
  105. t.task()
  106. }
  107. }
  108. }
  109. func (tw *TimeWheel) advanceClock(expiration int64) {
  110. currentTime := atomic.LoadInt64(&tw.currentTime)
  111. if expiration >= currentTime+tw.tick {
  112. currentTime = truncate(expiration, tw.tick)
  113. atomic.StoreInt64(&tw.currentTime, currentTime)
  114. // Try to advance the clock of the overflow wheel if present
  115. overflowWheel := atomic.LoadPointer(&tw.overflowWheel)
  116. if overflowWheel != nil {
  117. (*TimeWheel)(overflowWheel).advanceClock(currentTime)
  118. }
  119. }
  120. }
  121. // Start starts the current timing wheel.
  122. func (tw *TimeWheel) Start() {
  123. tw.waitGroup.Wrap(func() {
  124. tw.queue.Poll(tw.exitC, func() int64 {
  125. return TimeToMS(time.Now().UTC())
  126. })
  127. })
  128. tw.waitGroup.Wrap(func() {
  129. for {
  130. select {
  131. case elem := <-tw.queue.C:
  132. b := elem.(*bucket)
  133. tw.advanceClock(b.Expiration())
  134. b.Flush(tw.addOrRun)
  135. case <-tw.exitC:
  136. return
  137. }
  138. }
  139. })
  140. }
  141. // Stop stops the current timing wheel.
  142. //
  143. // If there is any timer's task being running in its own goroutine, Stop does
  144. // not wait for the task to complete before returning. If the caller needs to
  145. // know whether the task is completed, it must coordinate with the task explicitly.
  146. func (tw *TimeWheel) Stop() {
  147. close(tw.exitC)
  148. tw.waitGroup.Wait()
  149. }
  150. // AfterFunc waits for the duration to elapse and then calls f in its own goroutine.
  151. // It returns a Timer that can be used to cancel the call using its Stop method.
  152. func (tw *TimeWheel) AfterFunc(id uint64, d time.Duration, f func(), async ...bool) *Timer {
  153. t := &Timer{
  154. id: id,
  155. expiration: TimeToMS(time.Now().UTC().Add(d)),
  156. task: f,
  157. isAsync: getAsyncValue(async...),
  158. }
  159. tw.addOrRun(t)
  160. return t
  161. }
  162. func (tw *TimeWheel) AddEveryFunc(id uint64, d time.Duration, f func(), async ...bool) *Timer {
  163. return tw.ScheduleFunc(id, &EverySchedule{Interval: d}, f, async...)
  164. }
  165. func (tw *TimeWheel) BuildAfterFunc(d time.Duration, f func()) *Timer {
  166. id := NextId()
  167. return tw.AfterFunc(id, d, f)
  168. }
  169. func (tw *TimeWheel) BuildEveryFunc(d time.Duration, f func(), async ...bool) *Timer {
  170. id := NextId()
  171. return tw.AddEveryFunc(id, d, f, async...)
  172. }
  173. // ScheduleFunc calls f (in its own goroutine) according to the execution
  174. // plan scheduled by s. It returns a Timer that can be used to cancel the
  175. // call using its Stop method.
  176. //
  177. // If the caller want to terminate the execution plan halfway, it must
  178. // stop the timer and ensure that the timer is stopped actually, since in
  179. // the current implementation, there is a gap between the expiring and the
  180. // restarting of the timer. The wait time for ensuring is short since the
  181. // gap is very small.
  182. //
  183. // Internally, ScheduleFunc will ask the first execution time (by calling
  184. // s.Next()) initially, and create a timer if the execution time is non-zero.
  185. // Afterwards, it will ask the next execution time each time f is about to
  186. // be executed, and f will be called at the next execution time if the time
  187. // is non-zero.
  188. func (tw *TimeWheel) ScheduleFunc(id uint64, s Scheduler, f func(), async ...bool) *Timer {
  189. expiration := s.Next(time.Now())
  190. if expiration.IsZero() {
  191. // No time is scheduled, return nil.
  192. return nil
  193. }
  194. t := &Timer{
  195. id: id,
  196. expiration: TimeToMS(expiration),
  197. isAsync: getAsyncValue(async...),
  198. }
  199. t.task = func() {
  200. // Schedule the task to execute at the next time if possible.
  201. nextExpiration := s.Next(MSToTime(t.expiration))
  202. if !expiration.IsZero() {
  203. t.expiration = TimeToMS(nextExpiration)
  204. tw.addOrRun(t)
  205. }
  206. // Actually execute the task.
  207. cutils.Try(f, func(errString string) {
  208. clog.Warn(errString)
  209. })
  210. }
  211. tw.addOrRun(t)
  212. return t
  213. }
  214. func (tw *TimeWheel) NextId() uint64 {
  215. return NextId()
  216. }
  217. func getAsyncValue(asyncTask ...bool) bool {
  218. if len(asyncTask) > 0 {
  219. return asyncTask[0]
  220. }
  221. return false
  222. }