delay_queue.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. package mhayaTimeWheel
  2. import (
  3. "container/heap"
  4. "sync"
  5. "sync/atomic"
  6. "time"
  7. )
  8. // The start of PriorityQueue implementation.
  9. // Borrowed from https://github.com/nsqio/nsq/blob/master/internal/pqueue/pqueue.go
  10. type item struct {
  11. Value interface{}
  12. Priority int64
  13. Index int
  14. }
  15. // this is a priority queue as implemented by a min heap
  16. // ie. the 0th element is the *lowest* value
  17. type priorityQueue []*item
  18. func newPriorityQueue(capacity int) priorityQueue {
  19. return make(priorityQueue, 0, capacity)
  20. }
  21. func (pq priorityQueue) Len() int {
  22. return len(pq)
  23. }
  24. func (pq priorityQueue) Less(i, j int) bool {
  25. return pq[i].Priority < pq[j].Priority
  26. }
  27. func (pq priorityQueue) Swap(i, j int) {
  28. pq[i], pq[j] = pq[j], pq[i]
  29. pq[i].Index = i
  30. pq[j].Index = j
  31. }
  32. func (pq *priorityQueue) Push(x interface{}) {
  33. n := len(*pq)
  34. c := cap(*pq)
  35. if n+1 > c {
  36. npq := make(priorityQueue, n, c*2)
  37. copy(npq, *pq)
  38. *pq = npq
  39. }
  40. *pq = (*pq)[0 : n+1]
  41. value := x.(*item)
  42. value.Index = n
  43. (*pq)[n] = value
  44. }
  45. func (pq *priorityQueue) Pop() interface{} {
  46. n := len(*pq)
  47. c := cap(*pq)
  48. if n < (c/2) && c > 25 {
  49. npq := make(priorityQueue, n, c/2)
  50. copy(npq, *pq)
  51. *pq = npq
  52. }
  53. value := (*pq)[n-1]
  54. value.Index = -1
  55. *pq = (*pq)[0 : n-1]
  56. return value
  57. }
  58. func (pq *priorityQueue) PeekAndShift(maxValue int64) (*item, int64) {
  59. if pq.Len() == 0 {
  60. return nil, 0
  61. }
  62. value := (*pq)[0]
  63. if value.Priority > maxValue {
  64. return nil, value.Priority - maxValue
  65. }
  66. heap.Remove(pq, 0)
  67. return value, 0
  68. }
  69. // The end of PriorityQueue implementation.
  70. // DelayQueue is an unbounded blocking queue of *Delayed* elements, in which
  71. // an element can only be taken when its delay has expired. The head of the
  72. // queue is the *Delayed* element whose delay expired furthest in the past.
  73. type DelayQueue struct {
  74. C chan interface{}
  75. mu sync.Mutex
  76. pq priorityQueue
  77. sleeping int32 // Similar to the sleeping state of runtime.timers.
  78. wakeupC chan struct{}
  79. }
  80. // NewDelayQueue creates an instance of delayQueue with the specified size.
  81. func NewDelayQueue(size int) *DelayQueue {
  82. return &DelayQueue{
  83. C: make(chan interface{}),
  84. pq: newPriorityQueue(size),
  85. wakeupC: make(chan struct{}),
  86. }
  87. }
  88. // Offer inserts the element into the current queue.
  89. func (dq *DelayQueue) Offer(elem interface{}, expiration int64) {
  90. value := &item{Value: elem, Priority: expiration}
  91. dq.mu.Lock()
  92. heap.Push(&dq.pq, value)
  93. index := value.Index
  94. dq.mu.Unlock()
  95. if index == 0 {
  96. // A new item with the earliest expiration is added.
  97. if atomic.CompareAndSwapInt32(&dq.sleeping, 1, 0) {
  98. dq.wakeupC <- struct{}{}
  99. }
  100. }
  101. }
  102. // Poll starts an infinite loop, in which it continually waits for an element
  103. // to expire and then send the expired element to the channel C.
  104. func (dq *DelayQueue) Poll(exitC chan struct{}, nowF func() int64) {
  105. for {
  106. now := nowF()
  107. dq.mu.Lock()
  108. value, delta := dq.pq.PeekAndShift(now)
  109. if value == nil {
  110. // No items left or at least one item is pending.
  111. // We must ensure the atomicity of the whole operation, which is
  112. // composed of the above PeekAndShift and the following StoreInt32,
  113. // to avoid possible race conditions between Offer and Poll.
  114. atomic.StoreInt32(&dq.sleeping, 1)
  115. }
  116. dq.mu.Unlock()
  117. if value == nil {
  118. if delta == 0 {
  119. // No items left.
  120. select {
  121. case <-dq.wakeupC:
  122. // Wait until a new item is added.
  123. continue
  124. case <-exitC:
  125. goto exit
  126. }
  127. } else if delta > 0 {
  128. // At least one item is pending.
  129. select {
  130. case <-dq.wakeupC:
  131. // A new item with an "earlier" expiration than the current "earliest" one is added.
  132. continue
  133. case <-time.After(time.Duration(delta) * time.Millisecond):
  134. // The current "earliest" item expires.
  135. // Reset the sleeping state since there's no need to receive from wakeupC.
  136. if atomic.SwapInt32(&dq.sleeping, 0) == 0 {
  137. // A caller of Offer() is being blocked on sending to wakeupC,
  138. // drain wakeupC to unblock the caller.
  139. <-dq.wakeupC
  140. }
  141. continue
  142. case <-exitC:
  143. goto exit
  144. }
  145. }
  146. }
  147. select {
  148. case dq.C <- value.Value:
  149. // The expired element has been sent out successfully.
  150. case <-exitC:
  151. goto exit
  152. }
  153. }
  154. exit:
  155. // Reset the states
  156. atomic.StoreInt32(&dq.sleeping, 0)
  157. }