pomelo.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package pomelo
  2. import (
  3. cfacade "github.com/mhaya/facade"
  4. clog "github.com/mhaya/logger"
  5. pmessage "github.com/mhaya/net/parser/pomelo/message"
  6. cproto "github.com/mhaya/net/proto"
  7. )
  8. // DefaultDataRoute 默认的消息路由
  9. func DefaultDataRoute(agent *Agent, route *pmessage.Route, msg *pmessage.Message) {
  10. session := BuildSession(agent, msg)
  11. // current node
  12. if agent.NodeType() == route.NodeType() {
  13. targetPath := cfacade.NewChildPath(agent.NodeId(), route.HandleName(), session.Sid)
  14. LocalDataRoute(agent, session, route, msg, targetPath)
  15. return
  16. }
  17. if !session.IsBind() {
  18. clog.Warnf("[sid = %s,uid = %d] Session is not bind with UID. failed to forward message.[route = %s]",
  19. agent.SID(),
  20. agent.UID(),
  21. msg.Route,
  22. )
  23. return
  24. }
  25. member, found := agent.Discovery().Random(route.NodeType())
  26. if !found {
  27. return
  28. }
  29. targetPath := cfacade.NewPath(member.GetNodeId(), route.HandleName())
  30. err := ClusterLocalDataRoute(agent, session, route, msg, member.GetNodeId(), targetPath)
  31. if err != nil {
  32. clog.Warnf("[sid = %s,uid = %d,route = %s] cluster local data error. err= %v",
  33. agent.SID(),
  34. agent.UID(),
  35. msg.Route,
  36. err,
  37. )
  38. }
  39. }
  40. func LocalDataRoute(agent *Agent, session *cproto.Session, route *pmessage.Route, msg *pmessage.Message, targetPath string) {
  41. message := cfacade.GetMessage()
  42. message.Source = session.AgentPath
  43. message.Target = targetPath
  44. message.FuncName = route.Method()
  45. message.Session = session
  46. message.Args = msg.Data
  47. agent.ActorSystem().PostLocal(&message)
  48. }
  49. func ClusterLocalDataRoute(agent *Agent, session *cproto.Session, route *pmessage.Route, msg *pmessage.Message, nodeID, targetPath string) error {
  50. clusterPacket := cproto.GetClusterPacket()
  51. clusterPacket.SourcePath = session.AgentPath
  52. clusterPacket.TargetPath = targetPath
  53. clusterPacket.FuncName = route.Method()
  54. clusterPacket.Session = session // agent session
  55. clusterPacket.ArgBytes = msg.Data // packet -> message -> data
  56. return agent.Cluster().PublishLocal(nodeID, clusterPacket)
  57. }
  58. func BuildSession(agent *Agent, msg *pmessage.Message) *cproto.Session {
  59. agent.session.Mid = uint32(msg.ID)
  60. return agent.session
  61. }