actor.go 8.5 KB


  1. package mhayaActor
  2. import (
  3. "strings"
  4. "time"
  5. cutils "github.com/mhaya/extend/utils"
  6. cfacade "github.com/mhaya/facade"
  7. clog "github.com/mhaya/logger"
  8. "go.uber.org/zap/zapcore"
  9. )
  10. /**
  11. - 每个Actor独立运行在一个goroutine中,所有的逻辑都是串行处理
  12. - Actor接收三种消息:本地消息(Local)、远程消息(Remote)、事件消息(Event)
  13. - 三种消息都有自己的队列(Queue),每个队列依据FIFO原则进行消费
  14. - 本地消息(Local),用于接收游戏客户端发送过来的本地消息
  15. - 远程消息(Remote),用于Actor之间调用的远程消息
  16. - 事件消息(Event),通过订阅/发布进行的事件消息
  17. - Actor可以创建多个子Actor(ChildActor),子Actor的消息由父Actor进行路由转发
  18. - Actor可以创建多个定时器(Timer)进行定时业务的处理
  19. - 通过cluster集群组件、discovery发现服务组件,进行跨节点的actor通信
  20. */
  21. var (
  22. _nilActor = Actor{}
  23. )
  24. var (
  25. InitState State = 0
  26. WorkerState State = 1
  27. FreeState State = 2
  28. StopState State = 3
  29. )
  30. type (
  31. State int
  32. Actor struct {
  33. system *System // actor system
  34. path *cfacade.ActorPath // actor path
  35. state State // actor state
  36. close chan struct{} // close flag
  37. handler cfacade.IActorHandler // actor handler
  38. localMail *mailbox // local message mailbox
  39. remoteMail *mailbox // remote message mailbox
  40. event *actorEvent // event
  41. child *actorChild // child actor
  42. timer *actorTimer // timer
  43. lastAt int64 // last process time
  44. arrivalElapsed int64 // arrival elapsed for message
  45. executionElapsed int64 // execution elapsed for message
  46. }
  47. )
  48. func (p *Actor) run() {
  49. p.onInit()
  50. defer p.onStop()
  51. for {
  52. if p.loop() {
  53. break
  54. }
  55. }
  56. }
  57. func (p *Actor) loop() bool {
  58. if p.state == StopState {
  59. if p.localMail.Count() < 1 &&
  60. p.remoteMail.Count() < 1 &&
  61. p.event.Count() < 1 {
  62. return true
  63. }
  64. }
  65. select {
  66. case <-p.localMail.C:
  67. {
  68. p.processLocal()
  69. }
  70. case <-p.remoteMail.C:
  71. {
  72. p.processRemote()
  73. }
  74. case <-p.event.C:
  75. {
  76. p.processEvent()
  77. }
  78. case <-p.close:
  79. {
  80. p.state = StopState
  81. }
  82. }
  83. return false
  84. }
  85. func (p *Actor) processLocal() {
  86. m := p.localMail.Pop()
  87. if m == nil {
  88. return
  89. }
  90. p.lastAt = time.Now().Unix()
  91. next, invoke := p.handler.OnLocalReceived(m)
  92. if invoke {
  93. p.invokeFunc(p.localMail, p.App(), p.system.localInvokeFunc, m)
  94. }
  95. if !next {
  96. return
  97. }
  98. if m.TargetPath().IsChild() {
  99. if p.path.IsChild() {
  100. p.invokeFunc(p.localMail, p.App(), p.system.localInvokeFunc, m)
  101. } else {
  102. if childActor, foundChild := p.findChildActor(m); foundChild {
  103. childActor.PostLocal(m)
  104. } else {
  105. clog.Warnf("Child actor not found. path = %s", m.Target)
  106. }
  107. }
  108. } else {
  109. p.invokeFunc(p.localMail, p.App(), p.system.localInvokeFunc, m)
  110. }
  111. }
  112. func (p *Actor) processRemote() {
  113. m := p.remoteMail.Pop()
  114. if m == nil {
  115. return
  116. }
  117. p.lastAt = time.Now().Unix()
  118. next, invoke := p.handler.OnRemoteReceived(m)
  119. if invoke {
  120. p.invokeFunc(p.remoteMail, p.App(), p.system.remoteInvokeFunc, m)
  121. }
  122. if !next {
  123. return
  124. }
  125. if m.TargetPath().IsChild() {
  126. if p.path.IsChild() {
  127. p.invokeFunc(p.remoteMail, p.App(), p.system.remoteInvokeFunc, m)
  128. } else {
  129. if childActor, foundChild := p.findChildActor(m); foundChild {
  130. childActor.PostRemote(m)
  131. } else {
  132. clog.Warnf("Child actor not found. path = %s", m.Target)
  133. }
  134. }
  135. } else {
  136. p.invokeFunc(p.remoteMail, p.App(), p.system.remoteInvokeFunc, m)
  137. }
  138. }
  139. func (p *Actor) processEvent() {
  140. eventData := p.event.Pop()
  141. if eventData == nil {
  142. return
  143. }
  144. p.lastAt = time.Now().Unix()
  145. p.event.funcInvoke(eventData)
  146. }
  147. func (p *Actor) invokeFunc(mb *mailbox, app cfacade.IApplication, fn cfacade.InvokeFunc, m *cfacade.Message) {
  148. funcInfo, found := mb.funcMap[m.FuncName]
  149. if !found {
  150. clog.Warnf("[%s] Function not found. [source = %s, target = %s -> %s]",
  151. mb.name,
  152. m.Source,
  153. m.Target,
  154. m.FuncName,
  155. )
  156. return
  157. }
  158. p.arrivalElapsed = m.PostTime - m.BuildTime
  159. if p.arrivalElapsed > p.system.arrivalTimeOut {
  160. clog.Warnf("[%s] Invoke timeout.[path = %s -> %s -> %s, postTime = %d, buildTime = %d, arrival = %dms]",
  161. mb.name,
  162. m.Source,
  163. m.Target,
  164. m.FuncName,
  165. m.PostTime,
  166. m.BuildTime,
  167. p.arrivalElapsed,
  168. )
  169. }
  170. now := time.Now().UnixMilli()
  171. defer func() {
  172. p.executionElapsed = time.Now().UnixMilli() - now
  173. if p.executionElapsed > p.system.executionTimeout {
  174. clog.Warnf("[%s] Invoke timeout.[source = %s, target = %s->%s, execution = %dms]",
  175. mb.name,
  176. m.Source,
  177. m.Target,
  178. m.FuncName,
  179. p.executionElapsed,
  180. )
  181. }
  182. if rev := recover(); rev != nil {
  183. clog.Errorf("[%s] Invoke error. [source = %s, target = %s->%s, type = %v]",
  184. mb.name,
  185. m.Source,
  186. m.Target,
  187. m.FuncName,
  188. funcInfo.InArgs,
  189. )
  190. }
  191. }()
  192. fn(app, funcInfo, m)
  193. }
  194. func (p *Actor) findChildActor(m *cfacade.Message) (*Actor, bool) {
  195. // 如果当前actor为子actor,则终止本次消息处理
  196. if p.path.IsChild() {
  197. clog.Warnf("[findChildActor] Child actor cannot be created again。[target = %s->%s]",
  198. m.Target,
  199. m.FuncName,
  200. )
  201. return nil, false
  202. }
  203. // 寻找childActor
  204. childActor, found := p.child.Get(m.TargetPath().ChildID)
  205. if !found {
  206. childActor, found = p.handler.OnFindChild(m)
  207. }
  208. if found {
  209. if cActor, ok := childActor.(*Actor); ok {
  210. return cActor, true
  211. }
  212. }
  213. return nil, false
  214. }
  215. func (p *Actor) onInit() {
  216. p.state = WorkerState
  217. p.handler.OnInit()
  218. }
  219. func (p *Actor) onStop() {
  220. cutils.Try(func() {
  221. close(p.close)
  222. if p.path.IsParent() {
  223. p.system.removeActor(p.ActorID())
  224. p.child.onStop()
  225. } else {
  226. if parent, found := p.system.GetActor(p.path.ActorID); found {
  227. parent.child.Remove(p.path.ChildID)
  228. }
  229. }
  230. p.handler.OnStop()
  231. p.timer.onStop()
  232. p.event.onStop()
  233. p.localMail.onStop()
  234. p.remoteMail.onStop()
  235. }, func(errString string) {
  236. clog.Error(errString)
  237. })
  238. p.system.wg.Done()
  239. }
  240. func (p *Actor) State() State {
  241. return p.state
  242. }
  243. func (p *Actor) App() cfacade.IApplication {
  244. return p.system.app
  245. }
  246. func (p *Actor) ActorID() string {
  247. if p.path.IsChild() {
  248. return p.path.ChildID
  249. }
  250. return p.path.ActorID
  251. }
  252. func (p *Actor) Path() *cfacade.ActorPath {
  253. return p.path
  254. }
  255. func (p *Actor) PathString() string {
  256. return p.path.String()
  257. }
  258. func (p *Actor) Call(targetPath, funcName string, arg interface{}) int32 {
  259. return p.system.Call(p.path.String(), targetPath, funcName, arg)
  260. }
  261. func (p *Actor) CallWait(targetPath, funcName string, arg interface{}, reply interface{}) int32 {
  262. return p.system.CallWait(p.path.String(), targetPath, funcName, arg, reply)
  263. }
  264. // LastAt second
  265. func (p *Actor) LastAt() int64 {
  266. return p.lastAt
  267. }
  268. func (p *Actor) Exit() {
  269. p.close <- struct{}{}
  270. if clog.PrintLevel(zapcore.DebugLevel) {
  271. clog.Debugf("[Exit] actor exit! path = %s", p.path)
  272. }
  273. }
  274. func (p *Actor) System() *System {
  275. return p.system
  276. }
  277. func (p *Actor) Local() IMailBox {
  278. return p.localMail
  279. }
  280. func (p *Actor) Remote() IMailBox {
  281. return p.remoteMail
  282. }
  283. func (p *Actor) Event() IEvent {
  284. return p.event
  285. }
  286. func (p *Actor) Child() cfacade.IActorChild {
  287. return p.child
  288. }
  289. func (p *Actor) Timer() ITimer {
  290. return p.timer
  291. }
  292. func (p *Actor) PostRemote(m *cfacade.Message) {
  293. p.remoteMail.Push(m)
  294. }
  295. func (p *Actor) PostLocal(m *cfacade.Message) {
  296. p.localMail.Push(m)
  297. }
  298. func (p *Actor) PostEvent(data cfacade.IEventData) {
  299. p.system.PostEvent(data)
  300. }
  301. func newActor(actorID, childID string, handler cfacade.IActorHandler, c *System) (Actor, error) {
  302. if strings.TrimSpace(actorID) == "" {
  303. clog.Error("[newActor] actor id is nil.")
  304. return _nilActor, ErrActorIDIsNil
  305. }
  306. thisActor := Actor{
  307. path: &cfacade.ActorPath{
  308. NodeID: c.NodeId(),
  309. ActorID: actorID,
  310. ChildID: childID,
  311. },
  312. state: InitState,
  313. system: c,
  314. close: make(chan struct{}, 1),
  315. handler: handler,
  316. lastAt: time.Now().Unix(),
  317. }
  318. localMailbox := newMailbox(LocalName)
  319. thisActor.localMail = &localMailbox
  320. remoteMailbox := newMailbox(RemoteName)
  321. thisActor.remoteMail = &remoteMailbox
  322. event := newEvent(&thisActor)
  323. thisActor.event = &event
  324. child := newChild(&thisActor)
  325. thisActor.child = &child
  326. timer := newTimer(&thisActor)
  327. thisActor.timer = &timer
  328. // register update timer func
  329. thisActor.remoteMail.Register(updateTimerFuncName, thisActor.timer._updateTimer_)
  330. // spawn load!
  331. actorLoad, ok := handler.(IActorLoader)
  332. if ok {
  333. actorLoad.load(thisActor)
  334. }
  335. c.wg.Add(1)
  336. return thisActor, nil
  337. }