system.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419
  1. package mhayaActor
  2. import (
  3. "strings"
  4. "sync"
  5. "time"
  6. ccode "github.com/mhaya/code"
  7. cutils "github.com/mhaya/extend/utils"
  8. cfacade "github.com/mhaya/facade"
  9. clog "github.com/mhaya/logger"
  10. cproto "github.com/mhaya/net/proto"
  11. )
  12. type (
  13. // System Actor系统
  14. System struct {
  15. app cfacade.IApplication
  16. actorMap *sync.Map // key:actorID, value:*actor
  17. localInvokeFunc cfacade.InvokeFunc // default local func
  18. remoteInvokeFunc cfacade.InvokeFunc // default remote func
  19. wg *sync.WaitGroup // wait group
  20. callTimeout time.Duration // call调用超时
  21. arrivalTimeOut int64 // message到达超时(毫秒)
  22. executionTimeout int64 // 消息执行超时(毫秒)
  23. }
  24. )
  25. func NewSystem() *System {
  26. system := &System{
  27. actorMap: &sync.Map{},
  28. localInvokeFunc: InvokeLocalFunc,
  29. remoteInvokeFunc: InvokeRemoteFunc,
  30. wg: &sync.WaitGroup{},
  31. callTimeout: 60 * time.Second,
  32. arrivalTimeOut: 30000,
  33. executionTimeout: 30000,
  34. }
  35. return system
  36. }
  37. func (p *System) SetApp(app cfacade.IApplication) {
  38. p.app = app
  39. }
  40. func (p *System) NodeId() string {
  41. if p.app == nil {
  42. return ""
  43. }
  44. return p.app.NodeId()
  45. }
  46. func (p *System) Stop() {
  47. p.actorMap.Range(func(key, value any) bool {
  48. actor, ok := value.(*Actor)
  49. if ok {
  50. cutils.Try(func() {
  51. actor.Exit()
  52. }, func(err string) {
  53. clog.Warnf("[OnStop] - [actorID = %s, err = %s]", actor.path, err)
  54. })
  55. }
  56. return true
  57. })
  58. clog.Info("actor system stopping!")
  59. p.wg.Wait()
  60. clog.Info("actor system stopped!")
  61. }
  62. // GetIActor 根据ActorID获取IActor
  63. func (p *System) GetIActor(id string) (cfacade.IActor, bool) {
  64. return p.GetActor(id)
  65. }
  66. // GetActor 根据ActorID获取*actor
  67. func (p *System) GetActor(id string) (*Actor, bool) {
  68. actorValue, found := p.actorMap.Load(id)
  69. if !found {
  70. return nil, false
  71. }
  72. actor, found := actorValue.(*Actor)
  73. return actor, found
  74. }
  75. func (p *System) GetChildActor(actorID, childID string) (*Actor, bool) {
  76. parentActor, found := p.GetActor(actorID)
  77. if !found {
  78. return nil, found
  79. }
  80. return parentActor.child.GetActor(childID)
  81. }
  82. func (p *System) removeActor(actorID string) {
  83. p.actorMap.Delete(actorID)
  84. }
  85. // CreateActor 创建Actor
  86. func (p *System) CreateActor(id string, handler cfacade.IActorHandler) (cfacade.IActor, error) {
  87. if strings.TrimSpace(id) == "" {
  88. return nil, ErrActorIDIsNil
  89. }
  90. if actor, found := p.GetIActor(id); found {
  91. return actor, nil
  92. }
  93. thisActor, err := newActor(id, "", handler, p)
  94. if err != nil {
  95. return nil, err
  96. }
  97. p.actorMap.Store(id, &thisActor) // add to map
  98. go thisActor.run() // new actor is running!
  99. return &thisActor, nil
  100. }
  101. // Call 发送远程消息(不回复)
  102. func (p *System) Call(source, target, funcName string, arg interface{}) int32 {
  103. if target == "" {
  104. clog.Warnf("[Call] Target path is nil. [source = %s, target = %s, funcName = %s]",
  105. source,
  106. target,
  107. funcName,
  108. )
  109. return ccode.ActorPathIsNil
  110. }
  111. if len(funcName) < 1 {
  112. clog.Warnf("[Call] FuncName error. [source = %s, target = %s, funcName = %s]",
  113. source,
  114. target,
  115. funcName,
  116. )
  117. return ccode.ActorFuncNameError
  118. }
  119. targetPath, err := cfacade.ToActorPath(target)
  120. if err != nil {
  121. clog.Warnf("[Call] Target path error. [source = %s, target = %s, funcName = %s, err = %v]",
  122. source,
  123. target,
  124. funcName,
  125. err,
  126. )
  127. return ccode.ActorConvertPathError
  128. }
  129. if targetPath.NodeID != "" && targetPath.NodeID != p.NodeId() {
  130. clusterPacket := cproto.GetClusterPacket()
  131. clusterPacket.SourcePath = source
  132. clusterPacket.TargetPath = target
  133. clusterPacket.FuncName = funcName
  134. if arg != nil {
  135. argsBytes, err := p.app.Serializer().Marshal(arg)
  136. if err != nil {
  137. clog.Warnf("[Call] Marshal arg error. [targetPath = %s, error = %s]",
  138. target,
  139. err,
  140. )
  141. return ccode.ActorMarshalError
  142. }
  143. clusterPacket.ArgBytes = argsBytes
  144. }
  145. err = p.app.Cluster().PublishRemote(targetPath.NodeID, clusterPacket)
  146. if err != nil {
  147. clog.Warnf("[Call] Publish remote fail. [source = %s, target = %s, funcName = %s, err = %v]",
  148. source,
  149. target,
  150. funcName,
  151. err,
  152. )
  153. return ccode.ActorPublishRemoteError
  154. }
  155. } else {
  156. remoteMsg := cfacade.GetMessage()
  157. remoteMsg.Source = source
  158. remoteMsg.Target = target
  159. remoteMsg.FuncName = funcName
  160. remoteMsg.Args = arg
  161. if !p.PostRemote(&remoteMsg) {
  162. clog.Warnf("[Call] Post remote fail. [source = %s, target = %s, funcName = %s]", source, target, funcName)
  163. return ccode.ActorCallFail
  164. }
  165. }
  166. return ccode.OK
  167. }
  168. // CallWait 发送远程消息(等待回复)
  169. func (p *System) CallWait(source, target, funcName string, arg interface{}, reply interface{}) int32 {
  170. sourcePath, err := cfacade.ToActorPath(source)
  171. if err != nil {
  172. clog.Warnf("[CallWait] Source path error. [source = %s, target = %s, funcName = %s, err = %v]",
  173. source,
  174. target,
  175. funcName,
  176. err,
  177. )
  178. return ccode.ActorConvertPathError
  179. }
  180. targetPath, err := cfacade.ToActorPath(target)
  181. if err != nil {
  182. clog.Warnf("[CallWait] Target path error. [source = %s, target = %s, funcName = %s, err = %v]",
  183. source,
  184. target,
  185. funcName,
  186. err,
  187. )
  188. return ccode.ActorConvertPathError
  189. }
  190. if source == target {
  191. clog.Warnf("[CallWait] Source path is equal target. [source = %s, target = %s, funcName = %s]",
  192. source,
  193. target,
  194. funcName,
  195. )
  196. return ccode.ActorSourceEqualTarget
  197. }
  198. if len(funcName) < 1 {
  199. clog.Warnf("[CallWait] FuncName error. [source = %s, target = %s, funcName = %s]",
  200. source,
  201. target,
  202. funcName,
  203. )
  204. return ccode.ActorFuncNameError
  205. }
  206. // forward to remote actor
  207. if targetPath.NodeID != "" && targetPath.NodeID != sourcePath.NodeID {
  208. clusterPacket := cproto.BuildClusterPacket(source, target, funcName)
  209. if arg != nil {
  210. argsBytes, err := p.app.Serializer().Marshal(arg)
  211. if err != nil {
  212. clog.Warnf("[CallWait] Marshal arg error. [targetPath = %s, error = %s]", target, err)
  213. return ccode.ActorMarshalError
  214. }
  215. clusterPacket.ArgBytes = argsBytes
  216. }
  217. rsp := p.app.Cluster().RequestRemote(targetPath.NodeID, clusterPacket, p.callTimeout)
  218. if ccode.IsFail(rsp.Code) {
  219. return rsp.Code
  220. }
  221. if reply != nil {
  222. if err = p.app.Serializer().Unmarshal(rsp.Data, reply); err != nil {
  223. clog.Warnf("[CallWait] Marshal reply error. [targetPath = %s, error = %s]", target, err)
  224. return ccode.ActorMarshalError
  225. }
  226. }
  227. } else {
  228. message := cfacade.GetMessage()
  229. message.Source = source
  230. message.Target = target
  231. message.FuncName = funcName
  232. message.Args = arg
  233. message.ChanResult = make(chan interface{})
  234. var result interface{}
  235. if sourcePath.ActorID == targetPath.ActorID {
  236. if sourcePath.ChildID == targetPath.ChildID {
  237. return ccode.ActorSourceEqualTarget
  238. }
  239. childActor, found := p.GetChildActor(targetPath.ActorID, targetPath.ChildID)
  240. if !found {
  241. return ccode.ActorChildIDNotFound
  242. }
  243. childActor.PostRemote(&message)
  244. result = <-message.ChanResult
  245. } else {
  246. if !p.PostRemote(&message) {
  247. clog.Warnf("[CallWait] Post remote fail. [source = %s, target = %s, funcName = %s]", source, target, funcName)
  248. return ccode.ActorCallFail
  249. }
  250. result = <-message.ChanResult
  251. }
  252. if result != nil {
  253. rsp := result.(*cproto.Response)
  254. if rsp == nil {
  255. clog.Warnf("[CallWait] Response is nil. [targetPath = %s]",
  256. target,
  257. )
  258. return ccode.ActorCallFail
  259. }
  260. if ccode.IsFail(rsp.Code) {
  261. return rsp.Code
  262. }
  263. if reply != nil {
  264. if rsp.Data == nil {
  265. clog.Warnf("[CallWait] rsp.Data is nil. [targetPath = %s, error = %s]",
  266. target,
  267. err,
  268. )
  269. }
  270. err = p.app.Serializer().Unmarshal(rsp.Data, reply)
  271. if err != nil {
  272. clog.Warnf("[CallWait] Unmarshal reply error. [targetPath = %s, error = %s]",
  273. target,
  274. err,
  275. )
  276. return ccode.ActorUnmarshalError
  277. }
  278. }
  279. }
  280. }
  281. return ccode.OK
  282. }
  283. // PostRemote 提交远程消息
  284. func (p *System) PostRemote(m *cfacade.Message) bool {
  285. if m == nil {
  286. clog.Error("Message is nil.")
  287. return false
  288. }
  289. if targetActor, found := p.GetActor(m.TargetPath().ActorID); found {
  290. if targetActor.state == WorkerState {
  291. targetActor.PostRemote(m)
  292. }
  293. return true
  294. }
  295. clog.Warnf("[PostRemote] actor not found. [source = %s, target = %s -> %s]",
  296. m.Source,
  297. m.Target,
  298. m.FuncName,
  299. )
  300. return false
  301. }
  302. // PostLocal 提交本地消息
  303. func (p *System) PostLocal(m *cfacade.Message) bool {
  304. if m == nil {
  305. clog.Error("Message is nil.")
  306. return false
  307. }
  308. if targetActor, found := p.GetActor(m.TargetPath().ActorID); found {
  309. if targetActor.state == WorkerState {
  310. targetActor.PostLocal(m)
  311. }
  312. return true
  313. }
  314. clog.Warnf("[PostLocal] actor not found. [source = %s, target = %s -> %s]",
  315. m.Source,
  316. m.Target,
  317. m.FuncName,
  318. )
  319. return false
  320. }
  321. // PostEvent 提交事件
  322. func (p *System) PostEvent(data cfacade.IEventData) {
  323. if data == nil {
  324. clog.Error("[PostEvent] Event is nil.")
  325. return
  326. }
  327. p.actorMap.Range(func(key, value any) bool {
  328. if thisActor, found := value.(*Actor); found {
  329. if thisActor.state == WorkerState {
  330. thisActor.event.Push(data)
  331. }
  332. }
  333. return true
  334. })
  335. }
  336. func (p *System) SetLocalInvoke(fn cfacade.InvokeFunc) {
  337. if fn != nil {
  338. p.localInvokeFunc = fn
  339. }
  340. }
  341. func (p *System) SetRemoteInvoke(fn cfacade.InvokeFunc) {
  342. if fn != nil {
  343. p.remoteInvokeFunc = fn
  344. }
  345. }
  346. func (p *System) SetCallTimeout(d time.Duration) {
  347. p.callTimeout = d
  348. }
  349. func (p *System) SetArrivalTimeout(t int64) {
  350. if t > 1 {
  351. p.arrivalTimeOut = t
  352. }
  353. }
  354. func (p *System) SetExecutionTimeout(t int64) {
  355. if t > 1 {
  356. p.executionTimeout = t
  357. }
  358. }