123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271 |
- package mhayaDiscovery
- import (
- "fmt"
- "time"
- cfacade "github.com/mhaya/facade"
- clog "github.com/mhaya/logger"
- cnats "github.com/mhaya/net/nats"
- cproto "github.com/mhaya/net/proto"
- cprofile "github.com/mhaya/profile"
- "github.com/nats-io/nats.go"
- )
- // DiscoveryNATS master节点模式(master为单节点)
- // 先启动一个master节点
- // 其他节点启动时Request(mhaya.discovery.register),到master节点注册
- // master节点subscribe(mhaya.discovery.register),返回已注册节点列表
- // master节点publish(mhaya.discovery.addMember),当前已注册的节点到
- // 所有客户端节点subscribe(mhaya.discovery.addMember),接收新节点
- // 所有节点subscribe(mhaya.discovery.unregister),退出时注销节点
- type DiscoveryNATS struct {
- DiscoveryDefault
- app cfacade.IApplication
- thisMember cfacade.IMember
- thisMemberBytes []byte
- masterMember cfacade.IMember
- registerSubject string
- unregisterSubject string
- addSubject string
- checkSubject string
- }
- func (m *DiscoveryNATS) Name() string {
- return "nats"
- }
- func (m *DiscoveryNATS) isMaster() bool {
- return m.app.NodeId() == m.masterMember.GetNodeId()
- }
- func (m *DiscoveryNATS) isClient() bool {
- return m.app.NodeId() != m.masterMember.GetNodeId()
- }
- func (m *DiscoveryNATS) buildSubject(subject string) string {
- return fmt.Sprintf(subject, m.masterMember.GetNodeId())
- }
- func (m *DiscoveryNATS) Load(app cfacade.IApplication) {
- m.DiscoveryDefault.PreInit()
- m.app = app
- m.loadMember()
- m.init()
- }
- func (m *DiscoveryNATS) loadMember() {
- m.thisMember = &cproto.Member{
- NodeId: m.app.NodeId(),
- NodeType: m.app.NodeType(),
- Address: m.app.RpcAddress(),
- Settings: make(map[string]string),
- }
- memberBytes, err := m.app.Serializer().Marshal(m.thisMember)
- if err != nil {
- clog.Warnf("err = %s", err)
- return
- }
- m.thisMemberBytes = memberBytes
- //get nats config
- config := cprofile.GetConfig("cluster").GetConfig(m.Name())
- if config.LastError() != nil {
- clog.Fatalf("nats config parameter not found. err = %v", config.LastError())
- }
- // get master node id
- masterId := config.GetString("master_node_id")
- if masterId == "" {
- clog.Fatal("master node id not in config.")
- }
- // load master node config
- masterNode, err := cprofile.LoadNode(masterId)
- if err != nil {
- clog.Fatal(err)
- }
- m.masterMember = &cproto.Member{
- NodeId: masterNode.NodeId(),
- NodeType: masterNode.NodeType(),
- Address: masterNode.RpcAddress(),
- Settings: make(map[string]string),
- }
- }
- func (m *DiscoveryNATS) init() {
- m.registerSubject = m.buildSubject("mhaya.discovery.%s.register")
- m.unregisterSubject = m.buildSubject("mhaya.discovery.%s.unregister")
- m.addSubject = m.buildSubject("mhaya.discovery.%s.addMember")
- m.checkSubject = m.buildSubject("mhaya.discovery.%s.check")
- m.subscribe(m.unregisterSubject, func(msg *nats.Msg) {
- unregisterMember := &cproto.Member{}
- err := m.app.Serializer().Unmarshal(msg.Data, unregisterMember)
- if err != nil {
- clog.Warnf("err = %s", err)
- return
- }
- if unregisterMember.NodeId == m.app.NodeId() {
- return
- }
- // remove member
- m.RemoveMember(unregisterMember.NodeId)
- })
- m.serverInit()
- m.clientInit()
- clog.Infof("[discovery = %s] is running.", m.Name())
- }
- func (m *DiscoveryNATS) serverInit() {
- if !m.isMaster() {
- return
- }
- //addMember master node
- m.AddMember(m.masterMember)
- // subscribe register message
- m.subscribe(m.registerSubject, func(msg *nats.Msg) {
- newMember := &cproto.Member{}
- err := m.app.Serializer().Unmarshal(msg.Data, newMember)
- if err != nil {
- clog.Warnf("IMember Unmarshal[name = %s] error. dataLen = %+v, err = %s",
- m.app.Serializer().Name(),
- len(msg.Data),
- err,
- )
- return
- }
- // addMember new member
- m.AddMember(newMember)
- // response member list
- memberList := &cproto.MemberList{}
- m.memberMap.Range(func(key, value any) bool {
- protoMember := value.(*cproto.Member)
- if protoMember.NodeId != newMember.NodeId {
- memberList.List = append(memberList.List, protoMember)
- }
- return true
- })
- rspData, err := m.app.Serializer().Marshal(memberList)
- if err != nil {
- clog.Warnf("marshal fail. err = %s", err)
- return
- }
- // response member list
- err = msg.Respond(rspData)
- if err != nil {
- clog.Warnf("respond fail. err = %s", err)
- return
- }
- // publish addMember new node
- err = cnats.Get().Publish(m.addSubject, msg.Data)
- if err != nil {
- clog.Warnf("publish fail. err = %s", err)
- return
- }
- })
- // subscribe check message
- m.subscribe(m.checkSubject, func(msg *nats.Msg) {
- msg.Respond(nil)
- })
- }
- func (m *DiscoveryNATS) clientInit() {
- if !m.isClient() {
- return
- }
- // receive registered node
- m.subscribe(m.addSubject, func(msg *nats.Msg) {
- addMember := &cproto.Member{}
- err := m.app.Serializer().Unmarshal(msg.Data, addMember)
- if err != nil {
- clog.Warnf("err = %s", err)
- return
- }
- if _, ok := m.GetMember(addMember.NodeId); !ok {
- m.AddMember(addMember)
- }
- })
- go m.checkMaster()
- }
- func (m *DiscoveryNATS) checkMaster() {
- for {
- _, found := m.GetMember(m.masterMember.GetNodeId())
- if !found {
- m.registerToMaster()
- }
- time.Sleep(cnats.Get().ReconnectDelay())
- }
- }
- func (m *DiscoveryNATS) registerToMaster() {
- // register current node to master
- rsp, err := cnats.Get().Request(m.registerSubject, m.thisMemberBytes)
- if err != nil {
- clog.Warnf("register node to [master = %s] fail. [address = %s] [err = %s]",
- m.masterMember.GetNodeId(),
- cnats.Get().Address(),
- err,
- )
- return
- }
- clog.Infof("register node to [master = %s]. [member = %s]",
- m.masterMember.GetNodeId(),
- m.thisMember,
- )
- memberList := cproto.MemberList{}
- err = m.app.Serializer().Unmarshal(rsp.Data, &memberList)
- if err != nil {
- clog.Warnf("err = %s", err)
- return
- }
- for _, member := range memberList.GetList() {
- m.AddMember(member)
- }
- }
- func (m *DiscoveryNATS) Stop() {
- err := cnats.Get().Publish(m.unregisterSubject, m.thisMemberBytes)
- if err != nil {
- clog.Warnf("publish fail. err = %s", err)
- return
- }
- clog.Debugf("[nodeId = %s] unregister node to [master = %s]",
- m.app.NodeId(),
- m.masterMember.GetNodeId(),
- )
- }
- func (m *DiscoveryNATS) subscribe(subject string, cb nats.MsgHandler) {
- _, err := cnats.Get().Subscribe(subject, cb)
- if err != nil {
- clog.Warnf("subscribe fail. err = %s", err)
- return
- }
- }
|