123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- package pomelo
- import (
- "net"
- "time"
- ccode "github.com/mhaya/code"
- cfacade "github.com/mhaya/facade"
- clog "github.com/mhaya/logger"
- cactor "github.com/mhaya/net/actor"
- pomeloMessage "github.com/mhaya/net/parser/pomelo/message"
- ppacket "github.com/mhaya/net/parser/pomelo/packet"
- cproto "github.com/mhaya/net/proto"
- "github.com/nats-io/nuid"
- "go.uber.org/zap/zapcore"
- )
- type (
- actor struct {
- cactor.Base
- agentActorID string
- connectors []cfacade.IConnector
- onNewAgentFunc OnNewAgentFunc
- onInitFunc func()
- }
- OnNewAgentFunc func(newAgent *Agent)
- )
- func NewActor(agentActorID string) *actor {
- if agentActorID == "" {
- panic("agentActorID is empty.")
- }
- parser := &actor{
- agentActorID: agentActorID,
- connectors: make([]cfacade.IConnector, 0),
- onInitFunc: nil,
- }
- return parser
- }
- // OnInit Actor初始化前触发该函数
- func (p *actor) OnInit() {
- p.Remote().Register(ResponseFuncName, p.response)
- p.Remote().Register(PushFuncName, p.push)
- p.Remote().Register(KickFuncName, p.kick)
- p.Remote().Register(BroadcastName, p.broadcast)
- if p.onInitFunc != nil {
- p.onInitFunc()
- }
- }
- func (p *actor) SetOnInitFunc(fn func()) {
- p.onInitFunc = fn
- }
- func (p *actor) Load(app cfacade.IApplication) {
- if len(p.connectors) < 1 {
- panic("connectors is nil. Please call the AddConnector(...) method add IConnector.")
- }
- cmd.init(app)
- // Create agent actor
- if _, err := app.ActorSystem().CreateActor(p.agentActorID, p); err != nil {
- clog.Panicf("Create agent actor fail. err = %+v", err)
- }
- for _, connector := range p.connectors {
- connector.OnConnect(p.defaultOnConnectFunc)
- go connector.Start() // start connector!
- }
- }
- func (p *actor) AddConnector(connector cfacade.IConnector) {
- p.connectors = append(p.connectors, connector)
- }
- func (p *actor) Connectors() []cfacade.IConnector {
- return p.connectors
- }
- // defaultOnConnectFunc 创建新连接时,通过当前agentActor创建child agent actor
- func (p *actor) defaultOnConnectFunc(conn net.Conn) {
- session := &cproto.Session{
- Sid: nuid.Next(),
- AgentPath: p.Path().String(),
- Data: map[string]string{},
- }
- agent := NewAgent(p.App(), conn, session)
- if p.onNewAgentFunc != nil {
- p.onNewAgentFunc(&agent)
- }
- BindSID(&agent)
- agent.Run()
- }
- func (*actor) SetDictionary(dict map[string]uint16) {
- pomeloMessage.SetDictionary(dict)
- }
- func (*actor) SetDataCompression(compression bool) {
- pomeloMessage.SetDataCompression(compression)
- }
- func (*actor) SetWriteBacklog(size int) {
- cmd.writeBacklog = size
- }
- func (*actor) SetHeartbeat(t time.Duration) {
- if t.Seconds() < 1 {
- t = 60 * time.Second
- }
- cmd.heartbeatTime = t
- }
- func (*actor) SetSysData(key string, value interface{}) {
- cmd.sysData[key] = value
- }
- func (p *actor) SetOnNewAgent(fn OnNewAgentFunc) {
- p.onNewAgentFunc = fn
- }
- func (*actor) SetOnDataRoute(fn DataRouteFunc) {
- if fn != nil {
- cmd.onDataRouteFunc = fn
- }
- }
- func (*actor) SetOnPacket(typ ppacket.Type, fn PacketFunc) {
- cmd.onPacketFuncMap[typ] = fn
- }
- func (p *actor) response(rsp *cproto.PomeloResponse) {
- agent, found := GetAgent(rsp.Sid)
- if !found {
- if clog.PrintLevel(zapcore.DebugLevel) {
- clog.Debugf("[response] Not found agent. [rsp = %+v]", rsp)
- }
- return
- }
- if ccode.IsOK(rsp.Code) {
- agent.ResponseMID(rsp.Mid, rsp.Data, false)
- } else {
- errRsp := &cproto.Response{
- Code: rsp.Code,
- }
- agent.ResponseMID(rsp.Mid, errRsp, true)
- }
- }
- func (p *actor) push(rsp *cproto.PomeloPush) {
- agent, found := GetAgent(rsp.Sid)
- if !found {
- if clog.PrintLevel(zapcore.DebugLevel) {
- clog.Debugf("[push] Not found agent. [rsp = %+v]", rsp)
- }
- return
- }
- agent.Push(rsp.Route, rsp.Data)
- }
- func (p *actor) kick(rsp *cproto.PomeloKick) {
- agent, found := GetAgentWithUID(rsp.Uid)
- if !found {
- agent, found = GetAgent(rsp.Sid)
- }
- if found {
- agent.Kick(rsp.Reason, rsp.Close)
- }
- }
- func (p *actor) broadcast(rsp *cproto.PomeloBroadcastPush) {
- if rsp.AllUID {
- ForeachAgent(func(agent *Agent) {
- if agent.IsBind() {
- agent.Push(rsp.Route, rsp.Data)
- }
- })
- } else {
- for _, uid := range rsp.UidList {
- if agent, found := GetAgentWithUID(uid); found {
- agent.Push(rsp.Route, rsp.Data)
- }
- }
- }
- }
|