discovery_default.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package mhayaDiscovery
  2. import (
  3. "math/rand"
  4. "sync"
  5. cerr "github.com/mhaya/error"
  6. cslice "github.com/mhaya/extend/slice"
  7. cfacade "github.com/mhaya/facade"
  8. clog "github.com/mhaya/logger"
  9. cproto "github.com/mhaya/net/proto"
  10. cprofile "github.com/mhaya/profile"
  11. )
  12. // DiscoveryDefault 默认方式,通过读取profile文件的节点信息
  13. //
  14. // 该类型发现服务仅用于开发测试使用,直接读取profile.json->node配置
  15. type DiscoveryDefault struct {
  16. memberMap sync.Map // key:nodeId,value:cfacade.IMember
  17. onAddListener []cfacade.MemberListener
  18. onRemoveListener []cfacade.MemberListener
  19. }
  20. func (n *DiscoveryDefault) PreInit() {
  21. n.memberMap = sync.Map{}
  22. }
  23. func (n *DiscoveryDefault) Load(_ cfacade.IApplication) {
  24. // load node info from profile file
  25. nodeConfig := cprofile.GetConfig("node")
  26. if nodeConfig.LastError() != nil {
  27. clog.Error("`node` property not found in profile file.")
  28. return
  29. }
  30. for _, nodeType := range nodeConfig.Keys() {
  31. typeJson := nodeConfig.Get(nodeType)
  32. for i := 0; i < typeJson.Size(); i++ {
  33. item := typeJson.Get(i)
  34. nodeId := item.Get("node_id").ToString()
  35. if nodeId == "" {
  36. clog.Errorf("nodeId is empty in nodeType = %s", nodeType)
  37. break
  38. }
  39. if _, found := n.GetMember(nodeId); found {
  40. clog.Errorf("nodeType = %s, nodeId = %s, duplicate nodeId", nodeType, nodeId)
  41. break
  42. }
  43. member := &cproto.Member{
  44. NodeId: nodeId,
  45. NodeType: nodeType,
  46. Address: item.Get("rpc_address").ToString(),
  47. Settings: make(map[string]string),
  48. }
  49. settings := item.Get("__settings__")
  50. for _, key := range settings.Keys() {
  51. member.Settings[key] = settings.Get(key).ToString()
  52. }
  53. n.memberMap.Store(member.NodeId, member)
  54. }
  55. }
  56. }
  57. func (n *DiscoveryDefault) Name() string {
  58. return "default"
  59. }
  60. func (n *DiscoveryDefault) Map() map[string]cfacade.IMember {
  61. memberMap := map[string]cfacade.IMember{}
  62. n.memberMap.Range(func(key, value any) bool {
  63. if member, ok := value.(cfacade.IMember); ok {
  64. memberMap[member.GetNodeId()] = member
  65. }
  66. return true
  67. })
  68. return memberMap
  69. }
  70. func (n *DiscoveryDefault) ListByType(nodeType string, filterNodeId ...string) []cfacade.IMember {
  71. var memberList []cfacade.IMember
  72. n.memberMap.Range(func(key, value any) bool {
  73. member := value.(cfacade.IMember)
  74. if member.GetNodeType() == nodeType {
  75. if _, ok := cslice.StringIn(member.GetNodeId(), filterNodeId); !ok {
  76. memberList = append(memberList, member)
  77. }
  78. }
  79. return true
  80. })
  81. return memberList
  82. }
  83. func (n *DiscoveryDefault) Random(nodeType string) (cfacade.IMember, bool) {
  84. memberList := n.ListByType(nodeType)
  85. memberLen := len(memberList)
  86. if memberLen < 1 {
  87. return nil, false
  88. }
  89. if memberLen == 1 {
  90. return memberList[0], true
  91. }
  92. return memberList[rand.Intn(len(memberList))], true
  93. }
  94. func (n *DiscoveryDefault) GetType(nodeId string) (nodeType string, err error) {
  95. member, found := n.GetMember(nodeId)
  96. if !found {
  97. return "", cerr.Errorf("nodeId = %s not found.", nodeId)
  98. }
  99. return member.GetNodeType(), nil
  100. }
  101. func (n *DiscoveryDefault) GetMember(nodeId string) (cfacade.IMember, bool) {
  102. if nodeId == "" {
  103. return nil, false
  104. }
  105. value, found := n.memberMap.Load(nodeId)
  106. if !found {
  107. return nil, false
  108. }
  109. return value.(cfacade.IMember), found
  110. }
  111. func (n *DiscoveryDefault) AddMember(member cfacade.IMember) {
  112. _, loaded := n.memberMap.LoadOrStore(member.GetNodeId(), member)
  113. if loaded {
  114. clog.Warnf("duplicate nodeId. [nodeType = %s], [nodeId = %s], [address = %s]",
  115. member.GetNodeType(),
  116. member.GetNodeId(),
  117. member.GetAddress(),
  118. )
  119. return
  120. }
  121. for _, listener := range n.onAddListener {
  122. listener(member)
  123. }
  124. clog.Debugf("addMember new member. [member = %s]", member)
  125. }
  126. func (n *DiscoveryDefault) RemoveMember(nodeId string) {
  127. value, loaded := n.memberMap.LoadAndDelete(nodeId)
  128. if loaded {
  129. member := value.(cfacade.IMember)
  130. clog.Debugf("remove member. [member = %s]", member)
  131. for _, listener := range n.onRemoveListener {
  132. listener(member)
  133. }
  134. }
  135. }
  136. func (n *DiscoveryDefault) OnAddMember(listener cfacade.MemberListener) {
  137. if listener == nil {
  138. return
  139. }
  140. n.onAddListener = append(n.onAddListener, listener)
  141. }
  142. func (n *DiscoveryDefault) OnRemoveMember(listener cfacade.MemberListener) {
  143. if listener == nil {
  144. return
  145. }
  146. n.onRemoveListener = append(n.onRemoveListener, listener)
  147. }
  148. func (n *DiscoveryDefault) Stop() {
  149. }