invoke.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. package mhayaActor
  2. import (
  3. "reflect"
  4. "google.golang.org/protobuf/proto"
  5. ccode "github.com/mhaya/code"
  6. cerror "github.com/mhaya/error"
  7. creflect "github.com/mhaya/extend/reflect"
  8. cutils "github.com/mhaya/extend/utils"
  9. cfacade "github.com/mhaya/facade"
  10. clog "github.com/mhaya/logger"
  11. cproto "github.com/mhaya/net/proto"
  12. )
  13. func InvokeLocalFunc(app cfacade.IApplication, fi *creflect.FuncInfo, m *cfacade.Message) {
  14. if app == nil {
  15. clog.Errorf("[InvokeLocalFunc] app is nil. [message = %+v]", m)
  16. return
  17. }
  18. EncodeLocalArgs(app, fi, m)
  19. values := make([]reflect.Value, 2)
  20. values[0] = reflect.ValueOf(m.Session) // session
  21. values[1] = reflect.ValueOf(m.Args) // args
  22. fi.Value.Call(values)
  23. }
  24. func InvokeRemoteFunc(app cfacade.IApplication, fi *creflect.FuncInfo, m *cfacade.Message) {
  25. if app == nil {
  26. clog.Errorf("[InvokeRemoteFunc] app is nil. [message = %+v]", m)
  27. return
  28. }
  29. EncodeRemoteArgs(app, fi, m)
  30. values := make([]reflect.Value, fi.InArgsLen)
  31. if fi.InArgsLen > 0 {
  32. values[0] = reflect.ValueOf(m.Args) // args
  33. }
  34. if m.IsCluster {
  35. cutils.Try(func() {
  36. rets := fi.Value.Call(values)
  37. rspCode, rspData := retValue(app.Serializer(), rets)
  38. retResponse(m.ClusterReply, &cproto.Response{
  39. Code: rspCode,
  40. Data: rspData,
  41. })
  42. }, func(errString string) {
  43. retResponse(m.ClusterReply, &cproto.Response{
  44. Code: ccode.RPCRemoteExecuteError,
  45. })
  46. clog.Errorf("[InvokeRemoteFunc] invoke error. [message = %+v, err = %s]", m, errString)
  47. })
  48. } else {
  49. cutils.Try(func() {
  50. if m.ChanResult == nil {
  51. fi.Value.Call(values)
  52. } else {
  53. rets := fi.Value.Call(values)
  54. rspCode, rspData := retValue(app.Serializer(), rets)
  55. m.ChanResult <- &cproto.Response{
  56. Code: rspCode,
  57. Data: rspData,
  58. }
  59. }
  60. }, func(errString string) {
  61. if m.ChanResult != nil {
  62. m.ChanResult <- nil
  63. }
  64. clog.Errorf("[remote] invoke error.[source = %s, target = %s -> %s, funcType = %v, err = %+v]",
  65. m.Source,
  66. m.Target,
  67. m.FuncName,
  68. fi.InArgs,
  69. errString,
  70. )
  71. })
  72. }
  73. }
  74. func EncodeRemoteArgs(app cfacade.IApplication, fi *creflect.FuncInfo, m *cfacade.Message) error {
  75. if m.IsCluster {
  76. if fi.InArgsLen == 0 {
  77. return nil
  78. }
  79. return EncodeArgs(app, fi, 0, m)
  80. }
  81. return nil
  82. }
  83. func EncodeLocalArgs(app cfacade.IApplication, fi *creflect.FuncInfo, m *cfacade.Message) error {
  84. return EncodeArgs(app, fi, 1, m)
  85. }
  86. func EncodeArgs(app cfacade.IApplication, fi *creflect.FuncInfo, index int, m *cfacade.Message) error {
  87. argBytes, ok := m.Args.([]byte)
  88. if !ok {
  89. return cerror.Errorf("Encode args error.[source = %s, target = %s -> %s, funcType = %v]",
  90. m.Source,
  91. m.Target,
  92. m.FuncName,
  93. fi.InArgs,
  94. )
  95. }
  96. argValue := reflect.New(fi.InArgs[index].Elem()).Interface()
  97. err := app.Serializer().Unmarshal(argBytes, argValue)
  98. if err != nil {
  99. return cerror.Errorf("Encode args unmarshal error.[source = %s, target = %s -> %s, funcType = %v]",
  100. m.Source,
  101. m.Target,
  102. m.FuncName,
  103. fi.InArgs,
  104. )
  105. }
  106. m.Args = argValue
  107. return nil
  108. }
  109. func retValue(serializer cfacade.ISerializer, rets []reflect.Value) (int32, []byte) {
  110. var (
  111. retsLen = len(rets)
  112. rspCode = ccode.OK
  113. rspData []byte
  114. )
  115. if retsLen == 1 {
  116. if val := rets[0].Interface(); val != nil {
  117. if c, ok := val.(int32); ok {
  118. rspCode = c
  119. }
  120. }
  121. } else if retsLen == 2 {
  122. if !rets[0].IsNil() {
  123. data, err := serializer.Marshal(rets[0].Interface())
  124. if err != nil {
  125. rspCode = ccode.RPCRemoteExecuteError
  126. clog.Warn(err)
  127. } else {
  128. rspData = data
  129. }
  130. }
  131. if val := rets[1].Interface(); val != nil {
  132. if c, ok := val.(int32); ok {
  133. rspCode = c
  134. }
  135. }
  136. }
  137. return rspCode, rspData
  138. }
  139. func retResponse(reply cfacade.IRespond, rsp *cproto.Response) {
  140. if reply != nil {
  141. rspData, _ := proto.Marshal(rsp)
  142. err := reply.Respond(rspData)
  143. if err != nil {
  144. clog.Warn(err)
  145. }
  146. }
  147. }