actor.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. package simple
  2. import (
  3. "encoding/binary"
  4. "net"
  5. "time"
  6. cfacade "github.com/mhaya/facade"
  7. clog "github.com/mhaya/logger"
  8. cactor "github.com/mhaya/net/actor"
  9. cproto "github.com/mhaya/net/proto"
  10. "github.com/nats-io/nuid"
  11. "go.uber.org/zap/zapcore"
  12. )
  13. type (
  14. actor struct {
  15. cactor.Base
  16. agentActorID string
  17. connectors []cfacade.IConnector
  18. onNewAgentFunc OnNewAgentFunc
  19. }
  20. OnNewAgentFunc func(newAgent *Agent)
  21. )
  22. func NewActor(agentActorID string) *actor {
  23. if agentActorID == "" {
  24. panic("agentActorID is empty.")
  25. }
  26. parser := &actor{
  27. agentActorID: agentActorID,
  28. connectors: make([]cfacade.IConnector, 0),
  29. }
  30. return parser
  31. }
  32. // OnInit Actor初始化前触发该函数
  33. func (p *actor) OnInit() {
  34. p.Remote().Register(ResponseFuncName, p.response)
  35. }
  36. func (p *actor) Load(app cfacade.IApplication) {
  37. if len(p.connectors) < 1 {
  38. panic("Connectors is nil. Please call the AddConnector(...) method add IConnector.")
  39. }
  40. // Create agent actor
  41. if _, err := app.ActorSystem().CreateActor(p.agentActorID, p); err != nil {
  42. clog.Panicf("Create agent actor fail. err = %+v", err)
  43. }
  44. for _, connector := range p.connectors {
  45. connector.OnConnect(p.defaultOnConnectFunc)
  46. go connector.Start() // start connector!
  47. }
  48. }
  49. func (p *actor) AddConnector(connector cfacade.IConnector) {
  50. p.connectors = append(p.connectors, connector)
  51. }
  52. func (p *actor) Connectors() []cfacade.IConnector {
  53. return p.connectors
  54. }
  55. func (p *actor) AddNodeRoute(mid uint32, nodeRoute *NodeRoute) {
  56. AddNodeRoute(mid, nodeRoute)
  57. }
  58. // defaultOnConnectFunc 创建新连接时,通过当前agentActor创建child agent actor
  59. func (p *actor) defaultOnConnectFunc(conn net.Conn) {
  60. session := &cproto.Session{
  61. Sid: nuid.Next(),
  62. AgentPath: p.Path().String(),
  63. Data: map[string]string{},
  64. }
  65. agent := NewAgent(p.App(), conn, session)
  66. if p.onNewAgentFunc != nil {
  67. p.onNewAgentFunc(&agent)
  68. }
  69. BindSID(&agent)
  70. agent.Run()
  71. }
  72. func (p *actor) SetOnNewAgent(fn OnNewAgentFunc) {
  73. p.onNewAgentFunc = fn
  74. }
  75. func (p *actor) SetHeartbeatTime(t time.Duration) {
  76. SetHeartbeatTime(t)
  77. }
  78. func (p *actor) SetWriteBacklog(backlog int) {
  79. SetWriteBacklog(backlog)
  80. }
  81. func (p *actor) SetEndian(e binary.ByteOrder) {
  82. SetEndian(e)
  83. }
  84. func (*actor) SetOnDataRoute(fn DataRouteFunc) {
  85. if fn != nil {
  86. onDataRouteFunc = fn
  87. }
  88. }
  89. func (p *actor) response(rsp *cproto.PomeloResponse) {
  90. agent, found := GetAgent(rsp.Sid)
  91. if !found {
  92. if clog.PrintLevel(zapcore.DebugLevel) {
  93. clog.Debugf("[response] Not found agent. [rsp = %+v]", rsp)
  94. }
  95. return
  96. }
  97. agent.Response(rsp.Mid, rsp.Data)
  98. }