subject.go 579 B

123456789101112131415161718192021222324252627282930
  1. package mhayaNatsCluster
  2. import (
  3. clog "github.com/mhaya/logger"
  4. "github.com/nats-io/nats.go"
  5. )
  6. type (
  7. natsSubject struct {
  8. ch chan *nats.Msg
  9. subject string
  10. subscription *nats.Subscription
  11. }
  12. )
  13. func newNatsSubject(subject string, size int) *natsSubject {
  14. return &natsSubject{
  15. ch: make(chan *nats.Msg, size),
  16. subject: subject,
  17. subscription: nil,
  18. }
  19. }
  20. func (p *natsSubject) stop() {
  21. err := p.subscription.Unsubscribe()
  22. if err != nil {
  23. clog.Warnf("Unsubscribe error. [subject = %s, err = %v]", p.subject, err)
  24. }
  25. close(p.ch)
  26. }