command.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package pomelo
  2. import (
  3. "time"
  4. jsoniter "github.com/json-iterator/go"
  5. cfacade "github.com/mhaya/facade"
  6. clog "github.com/mhaya/logger"
  7. pmessage "github.com/mhaya/net/parser/pomelo/message"
  8. ppacket "github.com/mhaya/net/parser/pomelo/packet"
  9. "go.uber.org/zap/zapcore"
  10. )
  11. type (
  12. Command struct {
  13. writeBacklog int
  14. sysData map[string]interface{}
  15. heartbeatTime time.Duration
  16. handshakeBytes []byte
  17. heartbeatBytes []byte
  18. onPacketFuncMap map[ppacket.Type]PacketFunc
  19. onDataRouteFunc DataRouteFunc
  20. }
  21. PacketFunc func(agent *Agent, packet *ppacket.Packet)
  22. DataRouteFunc func(agent *Agent, route *pmessage.Route, msg *pmessage.Message)
  23. )
  24. const (
  25. DataHeartbeat = "heartbeat"
  26. DataDict = "dict"
  27. DataSerializer = "serializer"
  28. )
  29. var (
  30. cmd = Command{
  31. writeBacklog: 64,
  32. sysData: make(map[string]interface{}),
  33. heartbeatTime: 60 * time.Second,
  34. handshakeBytes: make([]byte, 0),
  35. heartbeatBytes: make([]byte, 0),
  36. onPacketFuncMap: make(map[ppacket.Type]PacketFunc, 4),
  37. onDataRouteFunc: DefaultDataRoute,
  38. }
  39. )
  40. func (p *Command) init(app cfacade.IApplication) {
  41. p.setData(DataHeartbeat, p.heartbeatTime.Seconds())
  42. p.setData(DataDict, pmessage.GetDictionary())
  43. p.setData(DataSerializer, app.Serializer().Name())
  44. p.setHandshakeBytes()
  45. p.setHeartbeatBytes()
  46. p.setOnPacketFunc()
  47. }
  48. func (p *Command) setData(name string, value interface{}) {
  49. if _, found := p.sysData[name]; !found {
  50. p.sysData[name] = value
  51. }
  52. }
  53. func (p *Command) setHandshakeBytes() {
  54. handshakeData := map[string]interface{}{
  55. "code": 200,
  56. "sys": p.sysData,
  57. }
  58. handshakeBytes, err := jsoniter.Marshal(handshakeData)
  59. if err != nil {
  60. clog.Error(err)
  61. return
  62. }
  63. p.handshakeBytes, err = ppacket.Encode(ppacket.Handshake, handshakeBytes)
  64. if err != nil {
  65. clog.Error(err)
  66. return
  67. }
  68. clog.Infof("[initCommand] handshake data = %v", handshakeData)
  69. }
  70. func (p *Command) setHeartbeatBytes() {
  71. heartbeatBytes, err := ppacket.Encode(ppacket.Heartbeat, nil)
  72. if err != nil {
  73. clog.Error(err)
  74. return
  75. }
  76. p.heartbeatBytes = heartbeatBytes
  77. }
  78. func (p *Command) setOnPacketFunc() {
  79. packetFuncMaps := map[ppacket.Type]PacketFunc{
  80. ppacket.Handshake: handshakeCommand,
  81. ppacket.HandshakeAck: handshakeACKCommand,
  82. ppacket.Heartbeat: heartbeatCommand,
  83. ppacket.Data: dataCommand,
  84. }
  85. for name, packetFunc := range packetFuncMaps {
  86. _, found := p.onPacketFuncMap[name]
  87. if !found {
  88. p.onPacketFuncMap[name] = packetFunc
  89. }
  90. }
  91. }
  92. func handshakeCommand(agent *Agent, _ *ppacket.Packet) {
  93. agent.SetState(AgentWaitAck)
  94. agent.SendRaw(cmd.handshakeBytes)
  95. if clog.PrintLevel(zapcore.DebugLevel) {
  96. clog.Debugf("[sid = %s,uid = %d] Request handshake. [address = %s]",
  97. agent.SID(),
  98. agent.UID(),
  99. agent.RemoteAddr(),
  100. )
  101. }
  102. }
  103. func handshakeACKCommand(agent *Agent, _ *ppacket.Packet) {
  104. agent.SetState(AgentWorking)
  105. if clog.PrintLevel(zapcore.DebugLevel) {
  106. clog.Debugf("[sid = %s,uid = %d] request handshakeACK. [address = %s]",
  107. agent.SID(),
  108. agent.UID(),
  109. agent.RemoteAddr(),
  110. )
  111. }
  112. }
  113. func heartbeatCommand(agent *Agent, _ *ppacket.Packet) {
  114. agent.SendRaw(cmd.heartbeatBytes)
  115. }
  116. func dataCommand(agent *Agent, pkg *ppacket.Packet) {
  117. if agent.State() != AgentWorking {
  118. if clog.PrintLevel(zapcore.DebugLevel) {
  119. clog.Warnf("[sid = %s,uid = %d] Data State is not working. [state = %d]",
  120. agent.SID(),
  121. agent.UID(),
  122. agent.State(),
  123. )
  124. }
  125. return
  126. }
  127. msg, err := pmessage.Decode(pkg.Data())
  128. if err != nil {
  129. if clog.PrintLevel(zapcore.DebugLevel) {
  130. clog.Warnf("[sid = %s,uid = %d] Data message decode error. [data = %s, error = %s]",
  131. agent.SID(),
  132. agent.UID(),
  133. pkg.Data(),
  134. err,
  135. )
  136. }
  137. return
  138. }
  139. route, err := pmessage.DecodeRoute(msg.Route)
  140. if err != nil {
  141. if clog.PrintLevel(zapcore.DebugLevel) {
  142. clog.Warnf("[sid = %s,uid = %d] Data Message decode route error. [data = %s, error = %s]",
  143. agent.SID(),
  144. agent.UID(),
  145. pkg.Data(),
  146. err,
  147. )
  148. }
  149. return
  150. }
  151. cmd.onDataRouteFunc(agent, route, &msg)
  152. }