123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183 |
- package mhayaTimeWheel
- import (
- "container/heap"
- "sync"
- "sync/atomic"
- "time"
- )
- // The start of PriorityQueue implementation.
- // Borrowed from https://github.com/nsqio/nsq/blob/master/internal/pqueue/pqueue.go
// item is one entry of the priority queue.
type item struct {
	// Value is the caller-supplied payload.
	Value interface{}
	// Priority is the ordering key; the smallest Priority sits at the head.
	Priority int64
	// Index is the entry's current position in the heap slice. It is kept up
	// to date by Swap and Push, and set to -1 by Pop once the entry has left
	// the queue.
	Index int
}

// priorityQueue is a priority queue implemented as a min heap, i.e. the 0th
// element is the one with the *lowest* Priority. Together with its methods it
// satisfies heap.Interface, so it must be modified through container/heap
// (heap.Push, heap.Remove, ...) to keep the heap ordering intact.
type priorityQueue []*item

// newPriorityQueue returns an empty queue with room for capacity entries.
func newPriorityQueue(capacity int) priorityQueue {
	return make(priorityQueue, 0, capacity)
}

// Len reports the number of entries in the queue.
func (pq priorityQueue) Len() int {
	return len(pq)
}

// Less reports whether entry i must be ordered before entry j.
func (pq priorityQueue) Less(i, j int) bool {
	return pq[i].Priority < pq[j].Priority
}

// Swap exchanges entries i and j and refreshes their recorded indexes.
func (pq priorityQueue) Swap(i, j int) {
	pq[i], pq[j] = pq[j], pq[i]
	pq[i].Index = i
	pq[j].Index = j
}

// Push appends x, which must be an *item, to the end of the slice. It is the
// heap.Interface hook; use heap.Push to insert while preserving heap order.
func (pq *priorityQueue) Push(x interface{}) {
	n := len(*pq)
	c := cap(*pq)
	if n+1 > c {
		// Grow by doubling, but never to less than what the new entry needs:
		// doubling a zero capacity would leave it at zero and make the
		// reslice below panic.
		grown := c * 2
		if grown < n+1 {
			grown = n + 1
		}
		npq := make(priorityQueue, n, grown)
		copy(npq, *pq)
		*pq = npq
	}
	*pq = (*pq)[0 : n+1]
	value := x.(*item)
	value.Index = n
	(*pq)[n] = value
}

// Pop removes and returns the last entry of the slice. It is the
// heap.Interface hook; use heap.Pop or heap.Remove to take entries out in
// priority order. The backing array is halved once fewer than half of its
// slots are in use and its capacity exceeds 25.
func (pq *priorityQueue) Pop() interface{} {
	n := len(*pq)
	c := cap(*pq)
	if n < (c/2) && c > 25 {
		npq := make(priorityQueue, n, c/2)
		copy(npq, *pq)
		*pq = npq
	}
	value := (*pq)[n-1]
	value.Index = -1
	// Clear the vacated slot so the backing array does not keep the removed
	// entry (and its Value) reachable.
	(*pq)[n-1] = nil
	*pq = (*pq)[0 : n-1]
	return value
}

// PeekAndShift inspects the head of the queue.
//
// If the queue is empty it returns (nil, 0). If the head's Priority is greater
// than maxValue, the head stays in the queue and the result is
// (nil, Priority-maxValue). Otherwise the head is removed from the queue and
// returned together with 0.
func (pq *priorityQueue) PeekAndShift(maxValue int64) (*item, int64) {
	if pq.Len() == 0 {
		return nil, 0
	}

	value := (*pq)[0]
	if value.Priority > maxValue {
		return nil, value.Priority - maxValue
	}
	heap.Remove(pq, 0)

	return value, 0
}
- // The end of PriorityQueue implementation.
- // DelayQueue is an unbounded blocking queue of *Delayed* elements, in which
- // an element can only be taken when its delay has expired. The head of the
- // queue is the *Delayed* element whose delay expired furthest in the past.
- type DelayQueue struct {
- C chan interface{}
- mu sync.Mutex
- pq priorityQueue
- sleeping int32 // Similar to the sleeping state of runtime.timers.
- wakeupC chan struct{}
- }
- // NewDelayQueue creates an instance of delayQueue with the specified size.
- func NewDelayQueue(size int) *DelayQueue {
- return &DelayQueue{
- C: make(chan interface{}),
- pq: newPriorityQueue(size),
- wakeupC: make(chan struct{}),
- }
- }
- // Offer inserts the element into the current queue.
- func (dq *DelayQueue) Offer(elem interface{}, expiration int64) {
- value := &item{Value: elem, Priority: expiration}
- dq.mu.Lock()
- heap.Push(&dq.pq, value)
- index := value.Index
- dq.mu.Unlock()
- if index == 0 {
- // A new item with the earliest expiration is added.
- if atomic.CompareAndSwapInt32(&dq.sleeping, 1, 0) {
- dq.wakeupC <- struct{}{}
- }
- }
- }
- // Poll starts an infinite loop, in which it continually waits for an element
- // to expire and then send the expired element to the channel C.
- func (dq *DelayQueue) Poll(exitC chan struct{}, nowF func() int64) {
- for {
- now := nowF()
- dq.mu.Lock()
- value, delta := dq.pq.PeekAndShift(now)
- if value == nil {
- // No items left or at least one item is pending.
- // We must ensure the atomicity of the whole operation, which is
- // composed of the above PeekAndShift and the following StoreInt32,
- // to avoid possible race conditions between Offer and Poll.
- atomic.StoreInt32(&dq.sleeping, 1)
- }
- dq.mu.Unlock()
- if value == nil {
- if delta == 0 {
- // No items left.
- select {
- case <-dq.wakeupC:
- // Wait until a new item is added.
- continue
- case <-exitC:
- goto exit
- }
- } else if delta > 0 {
- // At least one item is pending.
- select {
- case <-dq.wakeupC:
- // A new item with an "earlier" expiration than the current "earliest" one is added.
- continue
- case <-time.After(time.Duration(delta) * time.Millisecond):
- // The current "earliest" item expires.
- // Reset the sleeping state since there's no need to receive from wakeupC.
- if atomic.SwapInt32(&dq.sleeping, 0) == 0 {
- // A caller of Offer() is being blocked on sending to wakeupC,
- // drain wakeupC to unblock the caller.
- <-dq.wakeupC
- }
- continue
- case <-exitC:
- goto exit
- }
- }
- }
- select {
- case dq.C <- value.Value:
- // The expired element has been sent out successfully.
- case <-exitC:
- goto exit
- }
- }
- exit:
- // Reset the states
- atomic.StoreInt32(&dq.sleeping, 0)
- }
|