discovery_etcd.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. package mhayaETCD
  2. import (
  3. "context"
  4. "fmt"
  5. "strings"
  6. "time"
  7. jsoniter "github.com/json-iterator/go"
  8. cfacade "github.com/mhaya/facade"
  9. clog "github.com/mhaya/logger"
  10. cdiscovery "github.com/mhaya/net/discovery"
  11. cproto "github.com/mhaya/net/proto"
  12. cprofile "github.com/mhaya/profile"
  13. "go.etcd.io/etcd/api/v3/mvccpb"
  14. clientv3 "go.etcd.io/etcd/client/v3"
  15. "go.etcd.io/etcd/client/v3/namespace"
  16. )
  17. var (
  18. keyPrefix = "/mhaya/node/"
  19. registerKeyFormat = keyPrefix + "%s"
  20. )
  21. // ETCD etcd方式发现服务
  22. type ETCD struct {
  23. app cfacade.IApplication
  24. cdiscovery.DiscoveryDefault
  25. prefix string
  26. config clientv3.Config
  27. ttl int64
  28. cli *clientv3.Client // etcd client
  29. leaseID clientv3.LeaseID // get lease id
  30. }
  31. func New() *ETCD {
  32. return &ETCD{}
  33. }
  34. func (p *ETCD) Name() string {
  35. return "etcd"
  36. }
  37. func (p *ETCD) Load(app cfacade.IApplication) {
  38. p.DiscoveryDefault.PreInit()
  39. p.app = app
  40. p.ttl = 10
  41. clusterConfig := cprofile.GetConfig("cluster").GetConfig(p.Name())
  42. if clusterConfig.LastError() != nil {
  43. clog.Fatalf("etcd config not found. err = %v", clusterConfig.LastError())
  44. return
  45. }
  46. p.loadConfig(clusterConfig)
  47. p.init()
  48. p.getLeaseId()
  49. p.register()
  50. p.watch()
  51. clog.Infof("[etcd] init complete! [endpoints = %v] [leaseId = %d]", p.config.Endpoints, p.leaseID)
  52. }
  53. func (p *ETCD) OnStop() {
  54. key := fmt.Sprintf(registerKeyFormat, p.app.NodeId())
  55. _, err := p.cli.Delete(context.Background(), key)
  56. clog.Infof("etcd stopping! err = %v", err)
  57. err = p.cli.Close()
  58. if err != nil {
  59. clog.Warnf("etcd stopping error! err = %v", err)
  60. }
  61. }
  62. func getDialTimeout(config jsoniter.Any) time.Duration {
  63. t := time.Duration(config.Get("dial_timeout_second").ToInt64()) * time.Second
  64. if t < 1*time.Second {
  65. t = 3 * time.Second
  66. }
  67. return t
  68. }
  69. func getEndPoints(config jsoniter.Any) []string {
  70. return strings.Split(config.Get("end_points").ToString(), ",")
  71. }
  72. func (p *ETCD) loadConfig(config cfacade.ProfileJSON) {
  73. p.config = clientv3.Config{
  74. Logger: clog.DefaultLogger.Desugar(),
  75. }
  76. p.config.Endpoints = getEndPoints(config)
  77. p.config.DialTimeout = getDialTimeout(config)
  78. p.config.Username = config.GetString("user")
  79. p.config.Password = config.GetString("password")
  80. p.ttl = config.GetInt64("ttl", 5)
  81. p.prefix = config.GetString("prefix", "mhaya")
  82. }
  83. func (p *ETCD) init() {
  84. var err error
  85. p.cli, err = clientv3.New(p.config)
  86. if err != nil {
  87. clog.Fatalf("etcd connect fail. err = %v", err)
  88. return
  89. }
  90. // set namespace
  91. p.cli.KV = namespace.NewKV(p.cli.KV, p.prefix)
  92. p.cli.Watcher = namespace.NewWatcher(p.cli.Watcher, p.prefix)
  93. p.cli.Lease = namespace.NewLease(p.cli.Lease, p.prefix)
  94. }
  95. func (p *ETCD) getLeaseId() {
  96. var err error
  97. //设置租约时间
  98. resp, err := p.cli.Grant(context.Background(), p.ttl)
  99. if err != nil {
  100. clog.Fatal(err)
  101. return
  102. }
  103. p.leaseID = resp.ID
  104. //设置续租 定期发送需求请求
  105. keepaliveChan, err := p.cli.KeepAlive(context.Background(), resp.ID)
  106. if err != nil {
  107. clog.Fatal(err)
  108. return
  109. }
  110. go func() {
  111. for {
  112. select {
  113. case <-keepaliveChan:
  114. {
  115. }
  116. case die := <-p.app.DieChan():
  117. {
  118. if die {
  119. return
  120. }
  121. }
  122. }
  123. }
  124. }()
  125. }
  126. func (p *ETCD) register() {
  127. registerMember := &cproto.Member{
  128. NodeId: p.app.NodeId(),
  129. NodeType: p.app.NodeType(),
  130. Address: p.app.RpcAddress(),
  131. Settings: make(map[string]string),
  132. }
  133. jsonString, err := jsoniter.MarshalToString(registerMember)
  134. if err != nil {
  135. clog.Fatal(err)
  136. return
  137. }
  138. key := fmt.Sprintf(registerKeyFormat, p.app.NodeId())
  139. _, err = p.cli.Put(context.Background(), key, jsonString, clientv3.WithLease(p.leaseID))
  140. if err != nil {
  141. clog.Fatal(err)
  142. return
  143. }
  144. }
  145. func (p *ETCD) watch() {
  146. resp, err := p.cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix())
  147. if err != nil {
  148. clog.Fatal(err)
  149. return
  150. }
  151. for _, ev := range resp.Kvs {
  152. p.addMember(ev.Value)
  153. }
  154. watchChan := p.cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
  155. go func() {
  156. for rsp := range watchChan {
  157. for _, ev := range rsp.Events {
  158. switch ev.Type {
  159. case mvccpb.PUT:
  160. {
  161. p.addMember(ev.Kv.Value)
  162. }
  163. case mvccpb.DELETE:
  164. {
  165. p.removeMember(ev.Kv)
  166. }
  167. }
  168. }
  169. }
  170. }()
  171. }
  172. func (p *ETCD) addMember(data []byte) {
  173. member := &cproto.Member{}
  174. err := jsoniter.Unmarshal(data, member)
  175. if err != nil {
  176. return
  177. }
  178. p.AddMember(member)
  179. }
  180. func (p *ETCD) removeMember(kv *mvccpb.KeyValue) {
  181. key := string(kv.Key)
  182. nodeId := strings.ReplaceAll(key, keyPrefix, "")
  183. if nodeId == "" {
  184. clog.Warn("remove member nodeId is empty!")
  185. }
  186. p.RemoveMember(nodeId)
  187. }