conn.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. package mhayaNats
  2. import (
  3. "time"
  4. clog "github.com/mhaya/logger"
  5. "github.com/nats-io/nats.go"
  6. )
  7. type (
  8. Conn struct {
  9. *nats.Conn
  10. options
  11. running bool
  12. }
  13. options struct {
  14. address string
  15. maxReconnects int
  16. reconnectDelay time.Duration
  17. requestTimeout time.Duration
  18. user string
  19. password string
  20. }
  21. OptionFunc func(o *options)
  22. )
  23. func New(opts ...OptionFunc) *Conn {
  24. conn := &Conn{}
  25. if len(opts) > 0 {
  26. for _, opt := range opts {
  27. opt(&conn.options)
  28. }
  29. }
  30. return conn
  31. }
  32. func (p *Conn) Connect() {
  33. if p.running {
  34. return
  35. }
  36. for {
  37. conn, err := nats.Connect(p.address, p.natsOptions()...)
  38. if err != nil {
  39. clog.Warnf("nats connect fail! retrying in 3 seconds. err = %s", err)
  40. time.Sleep(3 * time.Second)
  41. continue
  42. }
  43. p.Conn = conn
  44. p.running = true
  45. clog.Infof("nats is connected! [address = %s]", p.address)
  46. break
  47. }
  48. }
  49. func (p *Conn) Close() {
  50. if p.running {
  51. p.running = false
  52. p.Conn.Close()
  53. clog.Infof("nats connect execute Close()")
  54. }
  55. }
  56. func (p *Conn) Request(subj string, data []byte, timeout ...time.Duration) (*nats.Msg, error) {
  57. if len(timeout) > 0 && timeout[0] > 0 {
  58. return p.Conn.Request(subj, data, timeout[0])
  59. }
  60. return p.Conn.Request(subj, data, p.requestTimeout)
  61. }
  62. func (p *Conn) ChanExecute(subject string, msgChan chan *nats.Msg, process func(msg *nats.Msg)) {
  63. _, chanErr := p.ChanSubscribe(subject, msgChan)
  64. if chanErr != nil {
  65. clog.Error("subscribe fail. [subject = %s, err = %s]", subject, chanErr)
  66. return
  67. }
  68. for msg := range msgChan {
  69. process(msg)
  70. }
  71. }
  72. func (p *options) natsOptions() []nats.Option {
  73. var opts []nats.Option
  74. if p.reconnectDelay > 0 {
  75. opts = append(opts, nats.ReconnectWait(p.reconnectDelay))
  76. }
  77. if p.maxReconnects > 0 {
  78. opts = append(opts, nats.MaxReconnects(p.maxReconnects))
  79. }
  80. opts = append(opts, nats.DisconnectErrHandler(func(conn *nats.Conn, err error) {
  81. if err != nil {
  82. clog.Warnf("Disconnect error. [error = %v]", err)
  83. }
  84. }))
  85. opts = append(opts, nats.ReconnectHandler(func(nc *nats.Conn) {
  86. clog.Warnf("Reconnected [%s]", nc.ConnectedUrl())
  87. }))
  88. opts = append(opts, nats.ClosedHandler(func(nc *nats.Conn) {
  89. clog.Infof("Nats exiting... %s", p.address)
  90. if nc.LastError() != nil {
  91. clog.Infof("error = %v", nc.LastError())
  92. }
  93. }))
  94. opts = append(opts, nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
  95. clog.Warnf("IsConnect = %v. %s on connection for subscription on %q",
  96. nc.IsConnected(),
  97. err.Error(),
  98. sub.Subject,
  99. )
  100. }))
  101. if p.user != "" {
  102. opts = append(opts, nats.UserInfo(p.user, p.password))
  103. }
  104. return opts
  105. }
  106. func (p *options) Address() string {
  107. return p.address
  108. }
  109. func (p *options) MaxReconnects() int {
  110. return p.maxReconnects
  111. }
  112. func (p *options) ReconnectDelay() time.Duration {
  113. return p.reconnectDelay
  114. }
  115. func (p *options) RequestTimeout() time.Duration {
  116. return p.requestTimeout
  117. }
  118. func WithAddress(address string) OptionFunc {
  119. return func(opts *options) {
  120. opts.address = address
  121. }
  122. }
  123. func WithParams(maxReconnects int, reconnectDelay int, requestTimeout int) OptionFunc {
  124. return func(opts *options) {
  125. opts.maxReconnects = maxReconnects
  126. opts.reconnectDelay = time.Duration(reconnectDelay) * time.Second
  127. opts.requestTimeout = time.Duration(requestTimeout) * time.Second
  128. }
  129. }
  130. func WithAuth(user, password string) OptionFunc {
  131. return func(opts *options) {
  132. opts.user = user
  133. opts.password = password
  134. }
  135. }