agent.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436
  1. package pomelo
  2. import (
  3. "fmt"
  4. "net"
  5. "sync/atomic"
  6. "time"
  7. cnet "github.com/mhaya/extend/net"
  8. cutils "github.com/mhaya/extend/utils"
  9. cfacade "github.com/mhaya/facade"
  10. clog "github.com/mhaya/logger"
  11. pomeloMessage "github.com/mhaya/net/parser/pomelo/message"
  12. pomeloPacket "github.com/mhaya/net/parser/pomelo/packet"
  13. cproto "github.com/mhaya/net/proto"
  14. "go.uber.org/zap/zapcore"
  15. )
  16. const (
  17. AgentInit int32 = 0
  18. AgentWaitAck int32 = 1
  19. AgentWorking int32 = 2
  20. AgentClosed int32 = 3
  21. )
  22. type (
  23. Agent struct {
  24. cfacade.IApplication // app
  25. conn net.Conn // low-level conn fd
  26. state int32 // current agent state
  27. session *cproto.Session // session
  28. chDie chan struct{} // wait for close
  29. chPending chan *pendingMessage // push message queue
  30. chWrite chan []byte // push bytes queue
  31. lastAt int64 // last heartbeat unix time stamp
  32. onCloseFunc []OnCloseFunc // on close agent
  33. }
  34. pendingMessage struct {
  35. typ pomeloMessage.Type // message type
  36. route string // message route(push)
  37. mid uint // response message id(response)
  38. payload interface{} // payload
  39. err bool // if it's an error
  40. }
  41. OnCloseFunc func(*Agent)
  42. )
  43. func NewAgent(app cfacade.IApplication, conn net.Conn, session *cproto.Session) Agent {
  44. agent := Agent{
  45. IApplication: app,
  46. conn: conn,
  47. state: AgentInit,
  48. session: session,
  49. chDie: make(chan struct{}),
  50. chPending: make(chan *pendingMessage, cmd.writeBacklog),
  51. chWrite: make(chan []byte, cmd.writeBacklog),
  52. lastAt: 0,
  53. onCloseFunc: nil,
  54. }
  55. agent.session.Ip = agent.RemoteAddr()
  56. agent.SetLastAt()
  57. if clog.PrintLevel(zapcore.DebugLevel) {
  58. clog.Debugf("[sid = %s,uid = %d] Agent create. [count = %d, ip = %s]",
  59. agent.SID(),
  60. agent.UID(),
  61. Count(),
  62. agent.RemoteAddr(),
  63. )
  64. }
  65. return agent
  66. }
  67. func (a *Agent) State() int32 {
  68. return a.state
  69. }
  70. func (a *Agent) SetState(state int32) bool {
  71. oldValue := atomic.SwapInt32(&a.state, state)
  72. return oldValue != state
  73. }
  74. func (a *Agent) Session() *cproto.Session {
  75. return a.session
  76. }
  77. func (a *Agent) UID() cfacade.UID {
  78. return a.session.Uid
  79. }
  80. func (a *Agent) SID() cfacade.SID {
  81. return a.session.Sid
  82. }
  83. func (a *Agent) Bind(uid cfacade.UID) error {
  84. return BindUID(a.SID(), uid)
  85. }
  86. func (a *Agent) IsBind() bool {
  87. return a.session.Uid > 0
  88. }
  89. func (a *Agent) Unbind() {
  90. Unbind(a.SID())
  91. }
  92. func (a *Agent) SetLastAt() {
  93. atomic.StoreInt64(&a.lastAt, time.Now().Unix())
  94. }
  95. func (a *Agent) SendRaw(bytes []byte) {
  96. a.chWrite <- bytes
  97. }
  98. func (a *Agent) SendPacket(typ pomeloPacket.Type, data []byte) {
  99. pkg, err := pomeloPacket.Encode(typ, data)
  100. if err != nil {
  101. clog.Warn(err)
  102. return
  103. }
  104. a.SendRaw(pkg)
  105. }
  106. func (a *Agent) Close() {
  107. if a.SetState(AgentClosed) {
  108. select {
  109. case <-a.chDie:
  110. default:
  111. close(a.chDie)
  112. }
  113. }
  114. }
  115. func (a *Agent) Run() {
  116. go a.writeChan()
  117. go a.readChan()
  118. }
  119. func (a *Agent) readChan() {
  120. defer func() {
  121. if clog.PrintLevel(zapcore.DebugLevel) {
  122. clog.Debugf("[sid = %s,uid = %d] Agent read chan exit.",
  123. a.SID(),
  124. a.UID(),
  125. )
  126. }
  127. a.Close()
  128. }()
  129. for {
  130. packets, isBreak, err := pomeloPacket.Read(a.conn)
  131. if isBreak || err != nil {
  132. return
  133. }
  134. if len(packets) < 1 {
  135. continue
  136. }
  137. for _, packet := range packets {
  138. a.processPacket(packet)
  139. }
  140. }
  141. }
  142. func (a *Agent) writeChan() {
  143. ticker := time.NewTicker(cmd.heartbeatTime)
  144. defer func() {
  145. if clog.PrintLevel(zapcore.DebugLevel) {
  146. clog.Debugf("[sid = %s,uid = %d] Agent write chan exit.", a.SID(), a.UID())
  147. }
  148. ticker.Stop()
  149. a.closeProcess()
  150. a.Close()
  151. }()
  152. var lastAt, deadline int64
  153. for {
  154. select {
  155. case <-a.chDie:
  156. {
  157. return
  158. }
  159. case <-ticker.C:
  160. {
  161. lastAt = atomic.LoadInt64(&a.lastAt)
  162. deadline = time.Now().Add(-cmd.heartbeatTime).Unix()
  163. if lastAt < deadline {
  164. if clog.PrintLevel(zapcore.DebugLevel) {
  165. clog.Debugf("[sid = %s,uid = %d] Check heartbeat timeout.", a.SID(), a.UID())
  166. }
  167. return
  168. }
  169. }
  170. case pending := <-a.chPending:
  171. {
  172. a.processPending(pending)
  173. }
  174. case bytes := <-a.chWrite:
  175. {
  176. a.write(bytes)
  177. }
  178. }
  179. }
  180. }
  181. func (a *Agent) closeProcess() {
  182. cutils.Try(func() {
  183. for _, fn := range a.onCloseFunc {
  184. fn(a)
  185. }
  186. }, func(errString string) {
  187. clog.Warn(errString)
  188. })
  189. a.Unbind()
  190. if err := a.conn.Close(); err != nil {
  191. clog.Debugf("[sid = %s,uid = %d] Agent connect closed. [error = %s]",
  192. a.SID(),
  193. a.UID(),
  194. err,
  195. )
  196. }
  197. if clog.PrintLevel(zapcore.DebugLevel) {
  198. clog.Debugf("[sid = %s,uid = %d] Agent closed. [count = %d, ip = %s]",
  199. a.SID(),
  200. a.UID(),
  201. Count(),
  202. a.RemoteAddr(),
  203. )
  204. }
  205. close(a.chPending)
  206. close(a.chWrite)
  207. }
  208. func (a *Agent) write(bytes []byte) {
  209. _, err := a.conn.Write(bytes)
  210. if err != nil {
  211. clog.Warn(err)
  212. }
  213. }
  214. func (a *Agent) processPacket(packet *pomeloPacket.Packet) {
  215. process, found := cmd.onPacketFuncMap[packet.Type()]
  216. if !found {
  217. if clog.PrintLevel(zapcore.DebugLevel) {
  218. clog.Warnf("[sid = %s,uid = %d] Packet type not found, close connect! [packet = %+v]",
  219. a.SID(),
  220. a.UID(),
  221. packet,
  222. )
  223. }
  224. a.Close()
  225. return
  226. }
  227. process(a, packet)
  228. // update last time
  229. a.SetLastAt()
  230. }
  231. func (a *Agent) RemoteAddr() string {
  232. if a.conn != nil {
  233. return cnet.GetIPV4(a.conn.RemoteAddr())
  234. }
  235. return ""
  236. }
  237. func (p *pendingMessage) String() string {
  238. return fmt.Sprintf("typ = %d, route = %s, mid = %d, payload = %v", p.typ, p.route, p.mid, p.payload)
  239. }
  240. func (a *Agent) processPending(data *pendingMessage) {
  241. payload, err := a.Serializer().Marshal(data.payload)
  242. if err != nil {
  243. clog.Warnf("[sid = %s,uid = %d] Payload marshal error. [data = %s]",
  244. a.SID(),
  245. a.UID(),
  246. data.String(),
  247. )
  248. return
  249. }
  250. // construct message and encode
  251. m := &pomeloMessage.Message{
  252. Type: data.typ,
  253. ID: data.mid,
  254. Route: data.route,
  255. Data: payload,
  256. Error: data.err,
  257. }
  258. // encode message
  259. em, err := pomeloMessage.Encode(m)
  260. if err != nil {
  261. clog.Warn(err)
  262. return
  263. }
  264. // encode packet
  265. a.SendPacket(pomeloPacket.Data, em)
  266. }
  267. func (a *Agent) sendPending(typ pomeloMessage.Type, route string, mid uint32, v interface{}, isError bool) {
  268. if a.state == AgentClosed {
  269. clog.Warnf("[sid = %s,uid = %d] Session is closed. [typ = %v, route = %s, mid = %d, val = %+v, err = %v]",
  270. a.SID(),
  271. a.UID(),
  272. typ,
  273. route,
  274. mid,
  275. v,
  276. isError,
  277. )
  278. return
  279. }
  280. if len(a.chPending) >= cmd.writeBacklog {
  281. clog.Warnf("[sid = %s,uid = %d] send buffer exceed. [typ = %v, route = %s, mid = %d, val = %+v, err = %v]",
  282. a.SID(),
  283. a.UID(),
  284. typ,
  285. route,
  286. mid,
  287. v,
  288. isError,
  289. )
  290. return
  291. }
  292. pending := &pendingMessage{
  293. typ: typ,
  294. mid: uint(mid),
  295. route: route,
  296. payload: v,
  297. err: isError,
  298. }
  299. a.chPending <- pending
  300. }
  301. func (a *Agent) Response(session *cproto.Session, v interface{}, isError ...bool) {
  302. a.ResponseMID(session.Mid, v, isError...)
  303. }
  304. func (a *Agent) ResponseCode(session *cproto.Session, statusCode int32, isError ...bool) {
  305. rsp := &cproto.Response{
  306. Code: statusCode,
  307. }
  308. a.ResponseMID(session.Mid, rsp, isError...)
  309. }
  310. func (a *Agent) ResponseMID(mid uint32, v interface{}, isError ...bool) {
  311. isErr := false
  312. if len(isError) > 0 {
  313. isErr = isError[0]
  314. }
  315. a.sendPending(pomeloMessage.Response, "", mid, v, isErr)
  316. if clog.PrintLevel(zapcore.DebugLevel) {
  317. clog.Debugf("[sid = %s,uid = %d] Response ok. [mid = %d, isError = %v]",
  318. a.SID(),
  319. a.UID(),
  320. mid,
  321. isErr,
  322. )
  323. }
  324. }
  325. func (a *Agent) Push(route string, val interface{}) {
  326. a.sendPending(pomeloMessage.Push, route, 0, val, false)
  327. if clog.PrintLevel(zapcore.DebugLevel) {
  328. clog.Debugf("[sid = %s,uid = %d] Push ok. [route = %s]",
  329. a.SID(),
  330. a.UID(),
  331. route,
  332. )
  333. }
  334. }
  335. func (a *Agent) Kick(reason interface{}, closed bool) {
  336. bytes, err := a.Serializer().Marshal(reason)
  337. if err != nil {
  338. clog.Warnf("[sid = %s,uid = %d] Kick marshal fail. [reason = {%+v}, err = %s]",
  339. a.SID(),
  340. a.UID(),
  341. reason,
  342. err,
  343. )
  344. }
  345. pkg, err := pomeloPacket.Encode(pomeloPacket.Kick, bytes)
  346. if err != nil {
  347. clog.Warnf("[sid = %s,uid = %d] Kick packet encode error.[reason = %+v, err = %s]",
  348. a.SID(),
  349. a.UID(),
  350. reason,
  351. err,
  352. )
  353. return
  354. }
  355. if clog.PrintLevel(zapcore.DebugLevel) {
  356. clog.Debugf("[sid = %s,uid = %d] Kick ok. [reason = %+v, closed = %v]",
  357. a.SID(),
  358. a.UID(),
  359. reason,
  360. closed,
  361. )
  362. }
  363. // 不进入pending chan,直接踢了
  364. a.write(pkg)
  365. if closed {
  366. a.Close()
  367. }
  368. }
  369. func (a *Agent) AddOnClose(fn OnCloseFunc) {
  370. if fn != nil {
  371. a.onCloseFunc = append(a.onCloseFunc, fn)
  372. }
  373. }