cluster.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. package mhayaNatsCluster
  2. import (
  3. "time"
  4. "google.golang.org/protobuf/proto"
  5. ccode "github.com/mhaya/code"
  6. cerr "github.com/mhaya/error"
  7. cfacade "github.com/mhaya/facade"
  8. clog "github.com/mhaya/logger"
  9. cnats "github.com/mhaya/net/nats"
  10. cproto "github.com/mhaya/net/proto"
  11. cprofile "github.com/mhaya/profile"
  12. "github.com/nats-io/nats.go"
  13. "go.uber.org/zap/zapcore"
  14. )
  15. type (
  16. Cluster struct {
  17. app cfacade.IApplication
  18. bufferSize int
  19. prefix string
  20. local *natsSubject
  21. remote *natsSubject
  22. }
  23. OptionFunc func(o *Cluster)
  24. )
  25. func New(app cfacade.IApplication, options ...OptionFunc) cfacade.ICluster {
  26. cluster := &Cluster{
  27. app: app,
  28. bufferSize: 1024,
  29. }
  30. for _, option := range options {
  31. option(cluster)
  32. }
  33. cluster.loadConfig()
  34. return cluster
  35. }
  36. func (p *Cluster) loadConfig() {
  37. natsConfig := cprofile.GetConfig("cluster").GetConfig("nats")
  38. if natsConfig.LastError() != nil {
  39. panic("cluster->nats config not found.")
  40. }
  41. natsConn := cnats.NewFromConfig(natsConfig)
  42. cnats.SetInstance(natsConn)
  43. p.prefix = natsConfig.GetString("prefix", "node")
  44. localSubject := getLocalSubject(p.prefix, p.app.NodeType(), p.app.NodeId())
  45. p.local = newNatsSubject(localSubject, p.bufferSize)
  46. remoteSubject := getRemoteSubject(p.prefix, p.app.NodeType(), p.app.NodeId())
  47. p.remote = newNatsSubject(remoteSubject, p.bufferSize)
  48. }
  49. func (p *Cluster) Init() {
  50. cnats.Get().Connect()
  51. go p.localProcess()
  52. go p.remoteProcess()
  53. clog.Info("nats cluster execute OnInit().")
  54. }
  55. func (p *Cluster) Stop() {
  56. p.local.stop()
  57. p.remote.stop()
  58. cnats.Get().Close()
  59. clog.Info("nats cluster execute OnStop().")
  60. }
  61. func (p *Cluster) localProcess() {
  62. var err error
  63. p.local.subscription, err = cnats.Get().ChanSubscribe(p.local.subject, p.local.ch)
  64. if err != nil {
  65. clog.Errorf("[localProcess] Subscribe fail. [subject = %s, err = %s]", p.local.subject, err)
  66. return
  67. }
  68. process := func(natsMsg *nats.Msg) {
  69. if dropped, err := p.local.subscription.Dropped(); err != nil {
  70. clog.Errorf("[localProcess] Dropped messages. [subject = %s, dropped = %d, err = %v]",
  71. p.local.subject,
  72. dropped,
  73. err,
  74. )
  75. }
  76. packet := cproto.GetClusterPacket()
  77. defer packet.Recycle()
  78. err = proto.Unmarshal(natsMsg.Data, packet)
  79. if err != nil {
  80. clog.Warnf("[localProcess] Unmarshal fail. [subject = %s, %s, err = %s]",
  81. natsMsg.Subject,
  82. packet.PrintLog(),
  83. err,
  84. )
  85. return
  86. }
  87. message := cfacade.GetMessage()
  88. message.BuildTime = packet.BuildTime
  89. message.Source = packet.SourcePath
  90. message.Target = packet.TargetPath
  91. message.FuncName = packet.FuncName
  92. message.IsCluster = true
  93. message.Session = packet.Session
  94. message.Args = packet.ArgBytes
  95. p.app.ActorSystem().PostLocal(&message)
  96. }
  97. for msg := range p.local.ch {
  98. process(msg)
  99. }
  100. }
  101. func (p *Cluster) remoteProcess() {
  102. var err error
  103. p.remote.subscription, err = cnats.Get().ChanSubscribe(p.remote.subject, p.remote.ch)
  104. if err != nil {
  105. clog.Errorf("[remoteProcess] Subscribe fail. [subject = %s, err = %s]", p.remote.subject, err)
  106. return
  107. }
  108. process := func(natsMsg *nats.Msg) {
  109. if dropped, err := p.remote.subscription.Dropped(); err != nil {
  110. clog.Errorf("[remoteProcess] Dropped messages. [subject = %s, dropped = %d, err = %v]",
  111. p.remote.subject,
  112. dropped,
  113. err,
  114. )
  115. }
  116. packet := cproto.GetClusterPacket()
  117. defer packet.Recycle()
  118. err = proto.Unmarshal(natsMsg.Data, packet)
  119. if err != nil {
  120. clog.Warnf("[remoteProcess] Unmarshal fail. [subject = %s, %s, err = %v]",
  121. natsMsg.Subject,
  122. packet.PrintLog(),
  123. err,
  124. )
  125. return
  126. }
  127. message := cfacade.GetMessage()
  128. message.BuildTime = packet.BuildTime
  129. message.Source = packet.SourcePath
  130. message.Target = packet.TargetPath
  131. message.FuncName = packet.FuncName
  132. if packet.ArgBytes != nil {
  133. message.Args = packet.ArgBytes
  134. }
  135. message.IsCluster = true
  136. if len(natsMsg.Reply) > 0 {
  137. message.ClusterReply = natsMsg
  138. }
  139. p.app.ActorSystem().PostRemote(&message)
  140. }
  141. for msg := range p.remote.ch {
  142. process(msg)
  143. }
  144. }
  145. func (p *Cluster) PublishLocal(nodeId string, request *cproto.ClusterPacket) error {
  146. defer request.Recycle()
  147. nodeType, err := p.app.Discovery().GetType(nodeId)
  148. if err != nil {
  149. clog.Debugf("[PublishLocal] get node type fail. [nodeId = %s, %s]",
  150. nodeId,
  151. request.PrintLog(),
  152. )
  153. return err
  154. }
  155. subject := getLocalSubject(p.prefix, nodeType, nodeId)
  156. bytes, err := proto.Marshal(request)
  157. if err != nil {
  158. return err
  159. }
  160. err = p.Publish(subject, bytes)
  161. if clog.PrintLevel(zapcore.DebugLevel) {
  162. clog.Debugf("[PublishLocal] [nodeId = %s, %s]",
  163. nodeId,
  164. request.PrintLog(),
  165. )
  166. }
  167. return err
  168. }
  169. func (p *Cluster) PublishRemote(nodeId string, request *cproto.ClusterPacket) error {
  170. defer request.Recycle()
  171. nodeType, err := p.app.Discovery().GetType(nodeId)
  172. if err != nil {
  173. clog.Debugf("[PublishRemote] Get node type fail. [nodeId = %s, %s, err = %v]",
  174. nodeId,
  175. request.PrintLog(),
  176. err,
  177. )
  178. return err
  179. }
  180. subject := getRemoteSubject(p.prefix, nodeType, nodeId)
  181. bytes, err := proto.Marshal(request)
  182. if err != nil {
  183. clog.Warn(err)
  184. return err
  185. }
  186. err = p.Publish(subject, bytes)
  187. return err
  188. }
  189. func (p *Cluster) RequestRemote(nodeId string, request *cproto.ClusterPacket, timeout ...time.Duration) cproto.Response {
  190. defer request.Recycle()
  191. rsp := cproto.Response{}
  192. nodeType, err := p.app.Discovery().GetType(nodeId)
  193. if err != nil {
  194. clog.Debugf("[PublishRemote] Get node type fail. [nodeId = %s, %s, err = %v]",
  195. nodeId,
  196. request.PrintLog(),
  197. err,
  198. )
  199. rsp.Code = ccode.DiscoveryNotFoundNode
  200. return rsp
  201. }
  202. msg, err := proto.Marshal(request)
  203. if err != nil {
  204. clog.Debugf("[PublishRemote] Marshal fail. [nodeId = %s, %s, err = %v]",
  205. nodeId,
  206. request.PrintLog(),
  207. err,
  208. )
  209. rsp.Code = ccode.RPCMarshalError
  210. return rsp
  211. }
  212. subject := getRemoteSubject(p.prefix, nodeType, nodeId)
  213. natsMsg, err := cnats.Get().Request(subject, msg, timeout...)
  214. if err != nil {
  215. clog.Warnf("[RequestRemote] nats request fail. [nodeId = %s, %s, err = %v]",
  216. nodeId,
  217. request.PrintLog(),
  218. err,
  219. )
  220. rsp.Code = ccode.RPCNetError
  221. return rsp
  222. }
  223. if err = proto.Unmarshal(natsMsg.Data, &rsp); err != nil {
  224. clog.Warnf("[RequestRemote] unmarshal fail. [nodeId = %s, %s, rsp = %v, err = %v]",
  225. nodeId,
  226. request.PrintLog(),
  227. rsp,
  228. err,
  229. )
  230. rsp.Code = ccode.RPCUnmarshalError
  231. return rsp
  232. }
  233. return rsp
  234. }
  235. func (p *Cluster) Publish(subject string, data []byte) error {
  236. if !p.app.Running() {
  237. return cerr.ClusterRPCClientIsStop
  238. }
  239. return cnats.Get().Publish(subject, data)
  240. }
  241. func WithBufferSize(size int) OptionFunc {
  242. return func(o *Cluster) {
  243. o.bufferSize = size
  244. }
  245. }