queue.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. package mhayaActor
  2. import (
  3. "sync/atomic"
  4. "unsafe"
  5. )
  6. type (
  7. queue struct {
  8. head, tail *queueNode
  9. C chan int32
  10. count int32
  11. }
  12. queueNode struct {
  13. next *queueNode
  14. val interface{}
  15. }
  16. )
  17. func newQueue() queue {
  18. stub := &queueNode{}
  19. q := queue{
  20. head: stub,
  21. tail: stub,
  22. C: make(chan int32, 1),
  23. count: 0,
  24. }
  25. return q
  26. }
  27. func (p *queue) Push(v interface{}) {
  28. if v == nil {
  29. return
  30. }
  31. n := new(queueNode)
  32. n.val = v
  33. // current producer acquires head node
  34. prev := (*queueNode)(atomic.SwapPointer((*unsafe.Pointer)(unsafe.Pointer(&p.head)), unsafe.Pointer(n)))
  35. // release node to consumer
  36. atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&prev.next)), unsafe.Pointer(n))
  37. p._setCount(1)
  38. }
  39. func (p *queue) Pop() interface{} {
  40. tail := p.tail
  41. next := (*queueNode)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&tail.next)))) // acquire
  42. if next != nil {
  43. p.tail = next
  44. v := next.val
  45. next.val = nil
  46. p._setCount(-1)
  47. return v
  48. }
  49. return nil
  50. }
  51. func (p *queue) Empty() bool {
  52. tail := p.tail
  53. next := (*queueNode)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&tail.next))))
  54. return next == nil
  55. }
  56. func (p *queue) Count() int32 {
  57. return atomic.LoadInt32(&p.count)
  58. }
  59. func (p *queue) _setCount(delta int32) {
  60. count := atomic.AddInt32(&p.count, delta)
  61. if count > 0 {
  62. select {
  63. case p.C <- count:
  64. default:
  65. }
  66. }
  67. }
  68. func (p *queue) Destroy() {
  69. close(p.C)
  70. p.head = nil
  71. p.tail = nil
  72. p.count = 0
  73. }