package mhayaTimeWheel import ( "container/heap" "sync" "sync/atomic" "time" ) // The start of PriorityQueue implementation. // Borrowed from https://github.com/nsqio/nsq/blob/master/internal/pqueue/pqueue.go type item struct { Value interface{} Priority int64 Index int } // this is a priority queue as implemented by a min heap // ie. the 0th element is the *lowest* value type priorityQueue []*item func newPriorityQueue(capacity int) priorityQueue { return make(priorityQueue, 0, capacity) } func (pq priorityQueue) Len() int { return len(pq) } func (pq priorityQueue) Less(i, j int) bool { return pq[i].Priority < pq[j].Priority } func (pq priorityQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] pq[i].Index = i pq[j].Index = j } func (pq *priorityQueue) Push(x interface{}) { n := len(*pq) c := cap(*pq) if n+1 > c { npq := make(priorityQueue, n, c*2) copy(npq, *pq) *pq = npq } *pq = (*pq)[0 : n+1] value := x.(*item) value.Index = n (*pq)[n] = value } func (pq *priorityQueue) Pop() interface{} { n := len(*pq) c := cap(*pq) if n < (c/2) && c > 25 { npq := make(priorityQueue, n, c/2) copy(npq, *pq) *pq = npq } value := (*pq)[n-1] value.Index = -1 *pq = (*pq)[0 : n-1] return value } func (pq *priorityQueue) PeekAndShift(maxValue int64) (*item, int64) { if pq.Len() == 0 { return nil, 0 } value := (*pq)[0] if value.Priority > maxValue { return nil, value.Priority - maxValue } heap.Remove(pq, 0) return value, 0 } // The end of PriorityQueue implementation. // DelayQueue is an unbounded blocking queue of *Delayed* elements, in which // an element can only be taken when its delay has expired. The head of the // queue is the *Delayed* element whose delay expired furthest in the past. type DelayQueue struct { C chan interface{} mu sync.Mutex pq priorityQueue sleeping int32 // Similar to the sleeping state of runtime.timers. wakeupC chan struct{} } // NewDelayQueue creates an instance of delayQueue with the specified size. func NewDelayQueue(size int) *DelayQueue { return &DelayQueue{ C: make(chan interface{}), pq: newPriorityQueue(size), wakeupC: make(chan struct{}), } } // Offer inserts the element into the current queue. func (dq *DelayQueue) Offer(elem interface{}, expiration int64) { value := &item{Value: elem, Priority: expiration} dq.mu.Lock() heap.Push(&dq.pq, value) index := value.Index dq.mu.Unlock() if index == 0 { // A new item with the earliest expiration is added. if atomic.CompareAndSwapInt32(&dq.sleeping, 1, 0) { dq.wakeupC <- struct{}{} } } } // Poll starts an infinite loop, in which it continually waits for an element // to expire and then send the expired element to the channel C. func (dq *DelayQueue) Poll(exitC chan struct{}, nowF func() int64) { for { now := nowF() dq.mu.Lock() value, delta := dq.pq.PeekAndShift(now) if value == nil { // No items left or at least one item is pending. // We must ensure the atomicity of the whole operation, which is // composed of the above PeekAndShift and the following StoreInt32, // to avoid possible race conditions between Offer and Poll. atomic.StoreInt32(&dq.sleeping, 1) } dq.mu.Unlock() if value == nil { if delta == 0 { // No items left. select { case <-dq.wakeupC: // Wait until a new item is added. continue case <-exitC: goto exit } } else if delta > 0 { // At least one item is pending. select { case <-dq.wakeupC: // A new item with an "earlier" expiration than the current "earliest" one is added. continue case <-time.After(time.Duration(delta) * time.Millisecond): // The current "earliest" item expires. // Reset the sleeping state since there's no need to receive from wakeupC. if atomic.SwapInt32(&dq.sleeping, 0) == 0 { // A caller of Offer() is being blocked on sending to wakeupC, // drain wakeupC to unblock the caller. <-dq.wakeupC } continue case <-exitC: goto exit } } } select { case dq.C <- value.Value: // The expired element has been sent out successfully. case <-exitC: goto exit } } exit: // Reset the states atomic.StoreInt32(&dq.sleeping, 0) }