123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- package mhayaNats
- import (
- "time"
- clog "github.com/mhaya/logger"
- "github.com/nats-io/nats.go"
- )
- type (
- Conn struct {
- *nats.Conn
- options
- running bool
- }
- options struct {
- address string
- maxReconnects int
- reconnectDelay time.Duration
- requestTimeout time.Duration
- user string
- password string
- }
- OptionFunc func(o *options)
- )
- func New(opts ...OptionFunc) *Conn {
- conn := &Conn{}
- if len(opts) > 0 {
- for _, opt := range opts {
- opt(&conn.options)
- }
- }
- return conn
- }
- func (p *Conn) Connect() {
- if p.running {
- return
- }
- for {
- conn, err := nats.Connect(p.address, p.natsOptions()...)
- if err != nil {
- clog.Warnf("nats connect fail! retrying in 3 seconds. err = %s", err)
- time.Sleep(3 * time.Second)
- continue
- }
- p.Conn = conn
- p.running = true
- clog.Infof("nats is connected! [address = %s]", p.address)
- break
- }
- }
- func (p *Conn) Close() {
- if p.running {
- p.running = false
- p.Conn.Close()
- clog.Infof("nats connect execute Close()")
- }
- }
- func (p *Conn) Request(subj string, data []byte, timeout ...time.Duration) (*nats.Msg, error) {
- if len(timeout) > 0 && timeout[0] > 0 {
- return p.Conn.Request(subj, data, timeout[0])
- }
- return p.Conn.Request(subj, data, p.requestTimeout)
- }
- func (p *Conn) ChanExecute(subject string, msgChan chan *nats.Msg, process func(msg *nats.Msg)) {
- _, chanErr := p.ChanSubscribe(subject, msgChan)
- if chanErr != nil {
- clog.Error("subscribe fail. [subject = %s, err = %s]", subject, chanErr)
- return
- }
- for msg := range msgChan {
- process(msg)
- }
- }
- func (p *options) natsOptions() []nats.Option {
- var opts []nats.Option
- if p.reconnectDelay > 0 {
- opts = append(opts, nats.ReconnectWait(p.reconnectDelay))
- }
- if p.maxReconnects > 0 {
- opts = append(opts, nats.MaxReconnects(p.maxReconnects))
- }
- opts = append(opts, nats.DisconnectErrHandler(func(conn *nats.Conn, err error) {
- if err != nil {
- clog.Warnf("Disconnect error. [error = %v]", err)
- }
- }))
- opts = append(opts, nats.ReconnectHandler(func(nc *nats.Conn) {
- clog.Warnf("Reconnected [%s]", nc.ConnectedUrl())
- }))
- opts = append(opts, nats.ClosedHandler(func(nc *nats.Conn) {
- clog.Infof("Nats exiting... %s", p.address)
- if nc.LastError() != nil {
- clog.Infof("error = %v", nc.LastError())
- }
- }))
- opts = append(opts, nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) {
- clog.Warnf("IsConnect = %v. %s on connection for subscription on %q",
- nc.IsConnected(),
- err.Error(),
- sub.Subject,
- )
- }))
- if p.user != "" {
- opts = append(opts, nats.UserInfo(p.user, p.password))
- }
- return opts
- }
- func (p *options) Address() string {
- return p.address
- }
- func (p *options) MaxReconnects() int {
- return p.maxReconnects
- }
- func (p *options) ReconnectDelay() time.Duration {
- return p.reconnectDelay
- }
- func (p *options) RequestTimeout() time.Duration {
- return p.requestTimeout
- }
- func WithAddress(address string) OptionFunc {
- return func(opts *options) {
- opts.address = address
- }
- }
- func WithParams(maxReconnects int, reconnectDelay int, requestTimeout int) OptionFunc {
- return func(opts *options) {
- opts.maxReconnects = maxReconnects
- opts.reconnectDelay = time.Duration(reconnectDelay) * time.Second
- opts.requestTimeout = time.Duration(requestTimeout) * time.Second
- }
- }
- func WithAuth(user, password string) OptionFunc {
- return func(opts *options) {
- opts.user = user
- opts.password = password
- }
- }
|