discovery_etcd.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. package mhayaDiscovery
  2. import (
  3. "context"
  4. "fmt"
  5. jsoniter "github.com/json-iterator/go"
  6. cfacade "github.com/mhaya/facade"
  7. clog "github.com/mhaya/logger"
  8. cproto "github.com/mhaya/net/proto"
  9. cprofile "github.com/mhaya/profile"
  10. "go.etcd.io/etcd/api/v3/mvccpb"
  11. clientv3 "go.etcd.io/etcd/client/v3"
  12. "go.etcd.io/etcd/client/v3/namespace"
  13. "strings"
  14. "time"
  15. )
  16. // file from https://github.com/mhaya/tree/master/components/etcd
  // keyPrefix is the etcd key prefix under which every node registers itself.
  var keyPrefix = "/mhaya/node/"

  // registerKeyFormat is the fmt pattern for one node's registration key; the
  // single %s verb is filled with the node id.
  var registerKeyFormat = keyPrefix + "%s"
  21. // ETCD etcd方式发现服务
  22. type ETCD struct {
  23. app cfacade.IApplication
  24. DiscoveryDefault
  25. prefix string
  26. config clientv3.Config
  27. ttl int64
  28. cli *clientv3.Client // etcd client
  29. leaseID clientv3.LeaseID // get lease id
  30. }
  31. func (p *ETCD) Name() string {
  32. return "etcd"
  33. }
  34. func (p *ETCD) Load(app cfacade.IApplication) {
  35. p.DiscoveryDefault.PreInit()
  36. p.app = app
  37. p.ttl = 10
  38. clusterConfig := cprofile.GetConfig("cluster").GetConfig(p.Name())
  39. if clusterConfig.LastError() != nil {
  40. clog.Fatalf("etcd config not found. err = %v", clusterConfig.LastError())
  41. return
  42. }
  43. p.loadConfig(clusterConfig)
  44. p.init()
  45. p.getLeaseId()
  46. p.register()
  47. p.watch()
  48. clog.Infof("[etcd] init complete! [endpoints = %v] [leaseId = %d]", p.config.Endpoints, p.leaseID)
  49. }
  50. func (p *ETCD) OnStop() {
  51. key := fmt.Sprintf(registerKeyFormat, p.app.NodeId())
  52. _, err := p.cli.Delete(context.Background(), key)
  53. clog.Infof("etcd stopping! err = %v", err)
  54. err = p.cli.Close()
  55. if err != nil {
  56. clog.Warnf("etcd stopping error! err = %v", err)
  57. }
  58. }
  59. func getDialTimeout(config jsoniter.Any) time.Duration {
  60. t := time.Duration(config.Get("dial_timeout_second").ToInt64()) * time.Second
  61. if t < 1*time.Second {
  62. t = 3 * time.Second
  63. }
  64. return t
  65. }
  66. func getEndPoints(config jsoniter.Any) []string {
  67. return strings.Split(config.Get("end_points").ToString(), ",")
  68. }
  69. func (p *ETCD) loadConfig(config cfacade.ProfileJSON) {
  70. p.config = clientv3.Config{
  71. Logger: clog.DefaultLogger.Desugar(),
  72. }
  73. p.config.Endpoints = getEndPoints(config)
  74. p.config.DialTimeout = getDialTimeout(config)
  75. p.config.Username = config.GetString("user")
  76. p.config.Password = config.GetString("password")
  77. p.ttl = config.GetInt64("ttl", 5)
  78. p.prefix = config.GetString("prefix", "mhaya")
  79. }
  80. func (p *ETCD) init() {
  81. var err error
  82. p.cli, err = clientv3.New(p.config)
  83. if err != nil {
  84. clog.Fatalf("etcd connect fail. err = %v", err)
  85. return
  86. }
  87. // set namespace
  88. p.cli.KV = namespace.NewKV(p.cli.KV, p.prefix)
  89. p.cli.Watcher = namespace.NewWatcher(p.cli.Watcher, p.prefix)
  90. p.cli.Lease = namespace.NewLease(p.cli.Lease, p.prefix)
  91. }
  92. func (p *ETCD) getLeaseId() {
  93. var err error
  94. //设置租约时间
  95. resp, err := p.cli.Grant(context.Background(), p.ttl)
  96. if err != nil {
  97. clog.Fatal(err)
  98. return
  99. }
  100. p.leaseID = resp.ID
  101. //设置续租 定期发送需求请求
  102. keepaliveChan, err := p.cli.KeepAlive(context.Background(), resp.ID)
  103. if err != nil {
  104. clog.Fatal(err)
  105. return
  106. }
  107. go func() {
  108. for {
  109. select {
  110. case <-keepaliveChan:
  111. {
  112. }
  113. case die := <-p.app.DieChan():
  114. {
  115. if die {
  116. return
  117. }
  118. }
  119. }
  120. }
  121. }()
  122. }
  123. func (p *ETCD) register() {
  124. registerMember := &cproto.Member{
  125. NodeId: p.app.NodeId(),
  126. NodeType: p.app.NodeType(),
  127. Address: p.app.RpcAddress(),
  128. Settings: make(map[string]string),
  129. }
  130. jsonString, err := jsoniter.MarshalToString(registerMember)
  131. if err != nil {
  132. clog.Fatal(err)
  133. return
  134. }
  135. key := fmt.Sprintf(registerKeyFormat, p.app.NodeId())
  136. _, err = p.cli.Put(context.Background(), key, jsonString, clientv3.WithLease(p.leaseID))
  137. if err != nil {
  138. clog.Fatal(err)
  139. return
  140. }
  141. }
  142. func (p *ETCD) watch() {
  143. resp, err := p.cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix())
  144. if err != nil {
  145. clog.Fatal(err)
  146. return
  147. }
  148. for _, ev := range resp.Kvs {
  149. p.addMember(ev.Value)
  150. }
  151. watchChan := p.cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
  152. go func() {
  153. for rsp := range watchChan {
  154. for _, ev := range rsp.Events {
  155. switch ev.Type {
  156. case mvccpb.PUT:
  157. {
  158. p.addMember(ev.Kv.Value)
  159. }
  160. case mvccpb.DELETE:
  161. {
  162. p.removeMember(ev.Kv)
  163. }
  164. }
  165. }
  166. }
  167. }()
  168. }
  // Member is a plain description of a cluster node: its id, its type, its
  // address and a free-form string settings map.
  type Member struct {
  	NodeId, NodeType, Address string
  	Settings                  map[string]string
  }
  175. func (p *ETCD) addMember(data []byte) {
  176. member := &cproto.Member{}
  177. err := jsoniter.Unmarshal(data, member)
  178. if err != nil {
  179. return
  180. }
  181. p.AddMember(member)
  182. }
  183. func (p *ETCD) removeMember(kv *mvccpb.KeyValue) {
  184. key := string(kv.Key)
  185. nodeId := strings.ReplaceAll(key, keyPrefix, "")
  186. if nodeId == "" {
  187. clog.Warn("remove member nodeId is empty!")
  188. }
  189. p.RemoveMember(nodeId)
  190. }