|
- package mhayaDiscovery
- import (
- "context"
- "fmt"
- jsoniter "github.com/json-iterator/go"
- cfacade "github.com/mhaya/facade"
- clog "github.com/mhaya/logger"
- cproto "github.com/mhaya/net/proto"
- cprofile "github.com/mhaya/profile"
- "go.etcd.io/etcd/api/v3/mvccpb"
- clientv3 "go.etcd.io/etcd/client/v3"
- "go.etcd.io/etcd/client/v3/namespace"
- "strings"
- "time"
- )
- // file from https://github.com/mhaya/tree/master/components/etcd
- var (
- keyPrefix = "/mhaya/node/"
- registerKeyFormat = keyPrefix + "%s"
- )
- // ETCD etcd方式发现服务
- type ETCD struct {
- app cfacade.IApplication
- DiscoveryDefault
- prefix string
- config clientv3.Config
- ttl int64
- cli *clientv3.Client // etcd client
- leaseID clientv3.LeaseID // get lease id
- }
- func (p *ETCD) Name() string {
- return "etcd"
- }
- func (p *ETCD) Load(app cfacade.IApplication) {
- p.DiscoveryDefault.PreInit()
- p.app = app
- p.ttl = 10
- clusterConfig := cprofile.GetConfig("cluster").GetConfig(p.Name())
- if clusterConfig.LastError() != nil {
- clog.Fatalf("etcd config not found. err = %v", clusterConfig.LastError())
- return
- }
- p.loadConfig(clusterConfig)
- p.init()
- p.getLeaseId()
- p.register()
- p.watch()
- clog.Infof("[etcd] init complete! [endpoints = %v] [leaseId = %d]", p.config.Endpoints, p.leaseID)
- }
- func (p *ETCD) OnStop() {
- key := fmt.Sprintf(registerKeyFormat, p.app.NodeId())
- _, err := p.cli.Delete(context.Background(), key)
- clog.Infof("etcd stopping! err = %v", err)
- err = p.cli.Close()
- if err != nil {
- clog.Warnf("etcd stopping error! err = %v", err)
- }
- }
- func getDialTimeout(config jsoniter.Any) time.Duration {
- t := time.Duration(config.Get("dial_timeout_second").ToInt64()) * time.Second
- if t < 1*time.Second {
- t = 3 * time.Second
- }
- return t
- }
- func getEndPoints(config jsoniter.Any) []string {
- return strings.Split(config.Get("end_points").ToString(), ",")
- }
- func (p *ETCD) loadConfig(config cfacade.ProfileJSON) {
- p.config = clientv3.Config{
- Logger: clog.DefaultLogger.Desugar(),
- }
- p.config.Endpoints = getEndPoints(config)
- p.config.DialTimeout = getDialTimeout(config)
- p.config.Username = config.GetString("user")
- p.config.Password = config.GetString("password")
- p.ttl = config.GetInt64("ttl", 5)
- p.prefix = config.GetString("prefix", "mhaya")
- }
- func (p *ETCD) init() {
- var err error
- p.cli, err = clientv3.New(p.config)
- if err != nil {
- clog.Fatalf("etcd connect fail. err = %v", err)
- return
- }
- // set namespace
- p.cli.KV = namespace.NewKV(p.cli.KV, p.prefix)
- p.cli.Watcher = namespace.NewWatcher(p.cli.Watcher, p.prefix)
- p.cli.Lease = namespace.NewLease(p.cli.Lease, p.prefix)
- }
- func (p *ETCD) getLeaseId() {
- var err error
- //设置租约时间
- resp, err := p.cli.Grant(context.Background(), p.ttl)
- if err != nil {
- clog.Fatal(err)
- return
- }
- p.leaseID = resp.ID
- //设置续租 定期发送需求请求
- keepaliveChan, err := p.cli.KeepAlive(context.Background(), resp.ID)
- if err != nil {
- clog.Fatal(err)
- return
- }
- go func() {
- for {
- select {
- case <-keepaliveChan:
- {
- }
- case die := <-p.app.DieChan():
- {
- if die {
- return
- }
- }
- }
- }
- }()
- }
- func (p *ETCD) register() {
- registerMember := &cproto.Member{
- NodeId: p.app.NodeId(),
- NodeType: p.app.NodeType(),
- Address: p.app.RpcAddress(),
- Settings: make(map[string]string),
- }
- jsonString, err := jsoniter.MarshalToString(registerMember)
- if err != nil {
- clog.Fatal(err)
- return
- }
- key := fmt.Sprintf(registerKeyFormat, p.app.NodeId())
- _, err = p.cli.Put(context.Background(), key, jsonString, clientv3.WithLease(p.leaseID))
- if err != nil {
- clog.Fatal(err)
- return
- }
- }
- func (p *ETCD) watch() {
- resp, err := p.cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix())
- if err != nil {
- clog.Fatal(err)
- return
- }
- for _, ev := range resp.Kvs {
- p.addMember(ev.Value)
- }
- watchChan := p.cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
- go func() {
- for rsp := range watchChan {
- for _, ev := range rsp.Events {
- switch ev.Type {
- case mvccpb.PUT:
- {
- p.addMember(ev.Kv.Value)
- }
- case mvccpb.DELETE:
- {
- p.removeMember(ev.Kv)
- }
- }
- }
- }
- }()
- }
- type Member struct {
- NodeId string
- NodeType string
- Address string
- Settings map[string]string
- }
- func (p *ETCD) addMember(data []byte) {
- member := &cproto.Member{}
- err := jsoniter.Unmarshal(data, member)
- if err != nil {
- return
- }
- p.AddMember(member)
- }
- func (p *ETCD) removeMember(kv *mvccpb.KeyValue) {
- key := string(kv.Key)
- nodeId := strings.ReplaceAll(key, keyPrefix, "")
- if nodeId == "" {
- clog.Warn("remove member nodeId is empty!")
- }
- p.RemoveMember(nodeId)
- }
|