discovery_nats.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. package mhayaDiscovery
  2. import (
  3. "fmt"
  4. "time"
  5. cfacade "github.com/mhaya/facade"
  6. clog "github.com/mhaya/logger"
  7. cnats "github.com/mhaya/net/nats"
  8. cproto "github.com/mhaya/net/proto"
  9. cprofile "github.com/mhaya/profile"
  10. "github.com/nats-io/nats.go"
  11. )
  12. // DiscoveryNATS master节点模式(master为单节点)
  13. // 先启动一个master节点
  14. // 其他节点启动时Request(mhaya.discovery.register),到master节点注册
  15. // master节点subscribe(mhaya.discovery.register),返回已注册节点列表
  16. // master节点publish(mhaya.discovery.addMember),当前已注册的节点到
  17. // 所有客户端节点subscribe(mhaya.discovery.addMember),接收新节点
  18. // 所有节点subscribe(mhaya.discovery.unregister),退出时注销节点
  19. type DiscoveryNATS struct {
  20. DiscoveryDefault
  21. app cfacade.IApplication
  22. thisMember cfacade.IMember
  23. thisMemberBytes []byte
  24. masterMember cfacade.IMember
  25. registerSubject string
  26. unregisterSubject string
  27. addSubject string
  28. checkSubject string
  29. }
  30. func (m *DiscoveryNATS) Name() string {
  31. return "nats"
  32. }
  33. func (m *DiscoveryNATS) isMaster() bool {
  34. return m.app.NodeId() == m.masterMember.GetNodeId()
  35. }
  36. func (m *DiscoveryNATS) isClient() bool {
  37. return m.app.NodeId() != m.masterMember.GetNodeId()
  38. }
  39. func (m *DiscoveryNATS) buildSubject(subject string) string {
  40. return fmt.Sprintf(subject, m.masterMember.GetNodeId())
  41. }
  42. func (m *DiscoveryNATS) Load(app cfacade.IApplication) {
  43. m.DiscoveryDefault.PreInit()
  44. m.app = app
  45. m.loadMember()
  46. m.init()
  47. }
  48. func (m *DiscoveryNATS) loadMember() {
  49. m.thisMember = &cproto.Member{
  50. NodeId: m.app.NodeId(),
  51. NodeType: m.app.NodeType(),
  52. Address: m.app.RpcAddress(),
  53. Settings: make(map[string]string),
  54. }
  55. memberBytes, err := m.app.Serializer().Marshal(m.thisMember)
  56. if err != nil {
  57. clog.Warnf("err = %s", err)
  58. return
  59. }
  60. m.thisMemberBytes = memberBytes
  61. //get nats config
  62. config := cprofile.GetConfig("cluster").GetConfig(m.Name())
  63. if config.LastError() != nil {
  64. clog.Fatalf("nats config parameter not found. err = %v", config.LastError())
  65. }
  66. // get master node id
  67. masterId := config.GetString("master_node_id")
  68. if masterId == "" {
  69. clog.Fatal("master node id not in config.")
  70. }
  71. // load master node config
  72. masterNode, err := cprofile.LoadNode(masterId)
  73. if err != nil {
  74. clog.Fatal(err)
  75. }
  76. m.masterMember = &cproto.Member{
  77. NodeId: masterNode.NodeId(),
  78. NodeType: masterNode.NodeType(),
  79. Address: masterNode.RpcAddress(),
  80. Settings: make(map[string]string),
  81. }
  82. }
  83. func (m *DiscoveryNATS) init() {
  84. m.registerSubject = m.buildSubject("mhaya.discovery.%s.register")
  85. m.unregisterSubject = m.buildSubject("mhaya.discovery.%s.unregister")
  86. m.addSubject = m.buildSubject("mhaya.discovery.%s.addMember")
  87. m.checkSubject = m.buildSubject("mhaya.discovery.%s.check")
  88. m.subscribe(m.unregisterSubject, func(msg *nats.Msg) {
  89. unregisterMember := &cproto.Member{}
  90. err := m.app.Serializer().Unmarshal(msg.Data, unregisterMember)
  91. if err != nil {
  92. clog.Warnf("err = %s", err)
  93. return
  94. }
  95. if unregisterMember.NodeId == m.app.NodeId() {
  96. return
  97. }
  98. // remove member
  99. m.RemoveMember(unregisterMember.NodeId)
  100. })
  101. m.serverInit()
  102. m.clientInit()
  103. clog.Infof("[discovery = %s] is running.", m.Name())
  104. }
  105. func (m *DiscoveryNATS) serverInit() {
  106. if !m.isMaster() {
  107. return
  108. }
  109. //addMember master node
  110. m.AddMember(m.masterMember)
  111. // subscribe register message
  112. m.subscribe(m.registerSubject, func(msg *nats.Msg) {
  113. newMember := &cproto.Member{}
  114. err := m.app.Serializer().Unmarshal(msg.Data, newMember)
  115. if err != nil {
  116. clog.Warnf("IMember Unmarshal[name = %s] error. dataLen = %+v, err = %s",
  117. m.app.Serializer().Name(),
  118. len(msg.Data),
  119. err,
  120. )
  121. return
  122. }
  123. // addMember new member
  124. m.AddMember(newMember)
  125. // response member list
  126. memberList := &cproto.MemberList{}
  127. m.memberMap.Range(func(key, value any) bool {
  128. protoMember := value.(*cproto.Member)
  129. if protoMember.NodeId != newMember.NodeId {
  130. memberList.List = append(memberList.List, protoMember)
  131. }
  132. return true
  133. })
  134. rspData, err := m.app.Serializer().Marshal(memberList)
  135. if err != nil {
  136. clog.Warnf("marshal fail. err = %s", err)
  137. return
  138. }
  139. // response member list
  140. err = msg.Respond(rspData)
  141. if err != nil {
  142. clog.Warnf("respond fail. err = %s", err)
  143. return
  144. }
  145. // publish addMember new node
  146. err = cnats.Get().Publish(m.addSubject, msg.Data)
  147. if err != nil {
  148. clog.Warnf("publish fail. err = %s", err)
  149. return
  150. }
  151. })
  152. // subscribe check message
  153. m.subscribe(m.checkSubject, func(msg *nats.Msg) {
  154. msg.Respond(nil)
  155. })
  156. }
  157. func (m *DiscoveryNATS) clientInit() {
  158. if !m.isClient() {
  159. return
  160. }
  161. // receive registered node
  162. m.subscribe(m.addSubject, func(msg *nats.Msg) {
  163. addMember := &cproto.Member{}
  164. err := m.app.Serializer().Unmarshal(msg.Data, addMember)
  165. if err != nil {
  166. clog.Warnf("err = %s", err)
  167. return
  168. }
  169. if _, ok := m.GetMember(addMember.NodeId); !ok {
  170. m.AddMember(addMember)
  171. }
  172. })
  173. go m.checkMaster()
  174. }
  175. func (m *DiscoveryNATS) checkMaster() {
  176. for {
  177. _, found := m.GetMember(m.masterMember.GetNodeId())
  178. if !found {
  179. m.registerToMaster()
  180. }
  181. time.Sleep(cnats.Get().ReconnectDelay())
  182. }
  183. }
  184. func (m *DiscoveryNATS) registerToMaster() {
  185. // register current node to master
  186. rsp, err := cnats.Get().Request(m.registerSubject, m.thisMemberBytes)
  187. if err != nil {
  188. clog.Warnf("register node to [master = %s] fail. [address = %s] [err = %s]",
  189. m.masterMember.GetNodeId(),
  190. cnats.Get().Address(),
  191. err,
  192. )
  193. return
  194. }
  195. clog.Infof("register node to [master = %s]. [member = %s]",
  196. m.masterMember.GetNodeId(),
  197. m.thisMember,
  198. )
  199. memberList := cproto.MemberList{}
  200. err = m.app.Serializer().Unmarshal(rsp.Data, &memberList)
  201. if err != nil {
  202. clog.Warnf("err = %s", err)
  203. return
  204. }
  205. for _, member := range memberList.GetList() {
  206. m.AddMember(member)
  207. }
  208. }
  209. func (m *DiscoveryNATS) Stop() {
  210. err := cnats.Get().Publish(m.unregisterSubject, m.thisMemberBytes)
  211. if err != nil {
  212. clog.Warnf("publish fail. err = %s", err)
  213. return
  214. }
  215. clog.Debugf("[nodeId = %s] unregister node to [master = %s]",
  216. m.app.NodeId(),
  217. m.masterMember.GetNodeId(),
  218. )
  219. }
  220. func (m *DiscoveryNATS) subscribe(subject string, cb nats.MsgHandler) {
  221. _, err := cnats.Get().Subscribe(subject, cb)
  222. if err != nil {
  223. clog.Warnf("subscribe fail. err = %s", err)
  224. return
  225. }
  226. }