conn.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package mhayaNats
  2. import (
  3. "time"
  4. clog "github.com/mhaya/logger"
  5. "github.com/nats-io/nats.go"
  6. )
  7. type (
  8. Conn struct {
  9. *nats.Conn
  10. options
  11. running bool
  12. }
  13. options struct {
  14. address string
  15. maxReconnects int
  16. reconnectDelay time.Duration
  17. requestTimeout time.Duration
  18. user string
  19. password string
  20. maxPending int
  21. }
  22. OptionFunc func(o *options)
  23. )
  24. func New(opts ...OptionFunc) *Conn {
  25. conn := &Conn{}
  26. if len(opts) > 0 {
  27. for _, opt := range opts {
  28. opt(&conn.options)
  29. }
  30. }
  31. return conn
  32. }
  33. func (p *Conn) Connect() {
  34. if p.running {
  35. return
  36. }
  37. for {
  38. conn, err := nats.Connect(p.address, p.natsOptions()...)
  39. if err != nil {
  40. clog.Warnf("nats connect fail! retrying in 3 seconds. err = %s", err)
  41. time.Sleep(3 * time.Second)
  42. continue
  43. }
  44. p.Conn = conn
  45. p.running = true
  46. clog.Infof("nats is connected! [address = %s]", p.address)
  47. break
  48. }
  49. }
  50. func (p *Conn) Close() {
  51. if p.running {
  52. p.running = false
  53. p.Conn.Close()
  54. clog.Infof("nats connect execute Close()")
  55. }
  56. }
  57. func (p *Conn) Request(subj string, data []byte, timeout ...time.Duration) (*nats.Msg, error) {
  58. if len(timeout) > 0 && timeout[0] > 0 {
  59. return p.Conn.Request(subj, data, timeout[0])
  60. }
  61. return p.Conn.Request(subj, data, p.requestTimeout)
  62. }
  63. func (p *Conn) ChanExecute(subject string, msgChan chan *nats.Msg, process func(msg *nats.Msg)) {
  64. _, chanErr := p.ChanSubscribe(subject, msgChan)
  65. if chanErr != nil {
  66. clog.Error("subscribe fail. [subject = %s, err = %s]", subject, chanErr)
  67. return
  68. }
  69. for msg := range msgChan {
  70. process(msg)
  71. }
  72. }
  73. func (p *options) natsOptions() []nats.Option {
  74. var opts []nats.Option
  75. if p.reconnectDelay > 0 {
  76. opts = append(opts, nats.ReconnectWait(p.reconnectDelay))
  77. }
  78. if p.maxReconnects > 0 {
  79. opts = append(opts, nats.MaxReconnects(p.maxReconnects))
  80. }
  81. opts = append(opts, nats.DisconnectErrHandler(func(conn *nats.Conn, err error) {
  82. if err != nil {
  83. clog.Warnf("Disconnect error. [error = %v]", err)
  84. }
  85. }))
  86. opts = append(opts, nats.ReconnectHandler(func(nc *nats.Conn) {
  87. clog.Warnf("Reconnected [%s]", nc.ConnectedUrl())
  88. }))
  89. opts = append(opts, nats.ClosedHandler(func(nc *nats.Conn) {
  90. clog.Infof("Nats exiting... %s", p.address)
  91. if nc.LastError() != nil {
  92. clog.Infof("error = %v", nc.LastError())
  93. }
  94. }))
  95. opts = append(opts, nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
  96. clog.Warnf("IsConnect = %v. %s on connection for subscription on %q",
  97. nc.IsConnected(),
  98. err.Error(),
  99. sub.Subject,
  100. )
  101. }))
  102. if p.user != "" {
  103. opts = append(opts, nats.UserInfo(p.user, p.password))
  104. }
  105. return opts
  106. }
  107. func (p *options) Address() string {
  108. return p.address
  109. }
  110. func (p *options) MaxReconnects() int {
  111. return p.maxReconnects
  112. }
  113. func (p *options) ReconnectDelay() time.Duration {
  114. return p.reconnectDelay
  115. }
  116. func (p *options) RequestTimeout() time.Duration {
  117. return p.requestTimeout
  118. }
  119. func WithAddress(address string) OptionFunc {
  120. return func(opts *options) {
  121. opts.address = address
  122. }
  123. }
  124. func WithParams(maxReconnects int, reconnectDelay int, requestTimeout int) OptionFunc {
  125. return func(opts *options) {
  126. opts.maxReconnects = maxReconnects
  127. opts.reconnectDelay = time.Duration(reconnectDelay) * time.Second
  128. opts.requestTimeout = time.Duration(requestTimeout) * time.Second
  129. }
  130. }
  131. func WithAuth(user, password string) OptionFunc {
  132. return func(opts *options) {
  133. opts.user = user
  134. opts.password = password
  135. }
  136. }