actor.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. package pomelo
  2. import (
  3. "net"
  4. "time"
  5. ccode "github.com/mhaya/code"
  6. cfacade "github.com/mhaya/facade"
  7. clog "github.com/mhaya/logger"
  8. cactor "github.com/mhaya/net/actor"
  9. pomeloMessage "github.com/mhaya/net/parser/pomelo/message"
  10. ppacket "github.com/mhaya/net/parser/pomelo/packet"
  11. cproto "github.com/mhaya/net/proto"
  12. "github.com/nats-io/nuid"
  13. "go.uber.org/zap/zapcore"
  14. )
  15. type (
  16. actor struct {
  17. cactor.Base
  18. agentActorID string
  19. connectors []cfacade.IConnector
  20. onNewAgentFunc OnNewAgentFunc
  21. onInitFunc func()
  22. }
  23. OnNewAgentFunc func(newAgent *Agent)
  24. )
  25. func NewActor(agentActorID string) *actor {
  26. if agentActorID == "" {
  27. panic("agentActorID is empty.")
  28. }
  29. parser := &actor{
  30. agentActorID: agentActorID,
  31. connectors: make([]cfacade.IConnector, 0),
  32. onInitFunc: nil,
  33. }
  34. return parser
  35. }
  36. // OnInit Actor初始化前触发该函数
  37. func (p *actor) OnInit() {
  38. p.Remote().Register(ResponseFuncName, p.response)
  39. p.Remote().Register(PushFuncName, p.push)
  40. p.Remote().Register(KickFuncName, p.kick)
  41. p.Remote().Register(BroadcastName, p.broadcast)
  42. if p.onInitFunc != nil {
  43. p.onInitFunc()
  44. }
  45. }
  46. func (p *actor) SetOnInitFunc(fn func()) {
  47. p.onInitFunc = fn
  48. }
  49. func (p *actor) Load(app cfacade.IApplication) {
  50. if len(p.connectors) < 1 {
  51. panic("connectors is nil. Please call the AddConnector(...) method add IConnector.")
  52. }
  53. cmd.init(app)
  54. // Create agent actor
  55. if _, err := app.ActorSystem().CreateActor(p.agentActorID, p); err != nil {
  56. clog.Panicf("Create agent actor fail. err = %+v", err)
  57. }
  58. for _, connector := range p.connectors {
  59. connector.OnConnect(p.defaultOnConnectFunc)
  60. go connector.Start() // start connector!
  61. }
  62. }
  63. func (p *actor) AddConnector(connector cfacade.IConnector) {
  64. p.connectors = append(p.connectors, connector)
  65. }
  66. func (p *actor) Connectors() []cfacade.IConnector {
  67. return p.connectors
  68. }
  69. // defaultOnConnectFunc 创建新连接时,通过当前agentActor创建child agent actor
  70. func (p *actor) defaultOnConnectFunc(conn net.Conn) {
  71. session := &cproto.Session{
  72. Sid: nuid.Next(),
  73. AgentPath: p.Path().String(),
  74. Data: map[string]string{},
  75. }
  76. agent := NewAgent(p.App(), conn, session)
  77. if p.onNewAgentFunc != nil {
  78. p.onNewAgentFunc(&agent)
  79. }
  80. BindSID(&agent)
  81. agent.Run()
  82. }
  83. func (*actor) SetDictionary(dict map[string]uint16) {
  84. pomeloMessage.SetDictionary(dict)
  85. }
  86. func (*actor) SetDataCompression(compression bool) {
  87. pomeloMessage.SetDataCompression(compression)
  88. }
  89. func (*actor) SetWriteBacklog(size int) {
  90. cmd.writeBacklog = size
  91. }
  92. func (*actor) SetHeartbeat(t time.Duration) {
  93. if t.Seconds() < 1 {
  94. t = 60 * time.Second
  95. }
  96. cmd.heartbeatTime = t
  97. }
  98. func (*actor) SetSysData(key string, value interface{}) {
  99. cmd.sysData[key] = value
  100. }
  101. func (p *actor) SetOnNewAgent(fn OnNewAgentFunc) {
  102. p.onNewAgentFunc = fn
  103. }
  104. func (*actor) SetOnDataRoute(fn DataRouteFunc) {
  105. if fn != nil {
  106. cmd.onDataRouteFunc = fn
  107. }
  108. }
  109. func (*actor) SetOnPacket(typ ppacket.Type, fn PacketFunc) {
  110. cmd.onPacketFuncMap[typ] = fn
  111. }
  112. func (p *actor) response(rsp *cproto.PomeloResponse) {
  113. agent, found := GetAgent(rsp.Sid)
  114. if !found {
  115. if clog.PrintLevel(zapcore.DebugLevel) {
  116. clog.Debugf("[response] Not found agent. [rsp = %+v]", rsp)
  117. }
  118. return
  119. }
  120. if ccode.IsOK(rsp.Code) {
  121. agent.ResponseMID(rsp.Mid, rsp.Data, false)
  122. } else {
  123. errRsp := &cproto.Response{
  124. Code: rsp.Code,
  125. }
  126. agent.ResponseMID(rsp.Mid, errRsp, true)
  127. }
  128. }
  129. func (p *actor) push(rsp *cproto.PomeloPush) {
  130. agent, found := GetAgent(rsp.Sid)
  131. if !found {
  132. if clog.PrintLevel(zapcore.DebugLevel) {
  133. clog.Debugf("[push] Not found agent. [rsp = %+v]", rsp)
  134. }
  135. return
  136. }
  137. agent.Push(rsp.Route, rsp.Data)
  138. }
  139. func (p *actor) kick(rsp *cproto.PomeloKick) {
  140. agent, found := GetAgentWithUID(rsp.Uid)
  141. if !found {
  142. agent, found = GetAgent(rsp.Sid)
  143. }
  144. if found {
  145. agent.Kick(rsp.Reason, rsp.Close)
  146. }
  147. }
  148. func (p *actor) broadcast(rsp *cproto.PomeloBroadcastPush) {
  149. if rsp.AllUID {
  150. ForeachAgent(func(agent *Agent) {
  151. if agent.IsBind() {
  152. agent.Push(rsp.Route, rsp.Data)
  153. }
  154. })
  155. } else {
  156. for _, uid := range rsp.UidList {
  157. if agent, found := GetAgentWithUID(uid); found {
  158. agent.Push(rsp.Route, rsp.Data)
  159. }
  160. }
  161. }
  162. }