agent.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. package simple
  2. import (
  3. "fmt"
  4. "net"
  5. "sync/atomic"
  6. "time"
  7. cnet "github.com/mhaya/extend/net"
  8. cutils "github.com/mhaya/extend/utils"
  9. cfacade "github.com/mhaya/facade"
  10. clog "github.com/mhaya/logger"
  11. cproto "github.com/mhaya/net/proto"
  12. "go.uber.org/zap/zapcore"
  13. )
  14. const (
  15. AgentInit int32 = 0
  16. AgentClosed int32 = 3
  17. )
  18. type (
  19. Agent struct {
  20. cfacade.IApplication // app
  21. conn net.Conn // low-level conn fd
  22. state int32 // current agent state
  23. session *cproto.Session // session
  24. chDie chan struct{} // wait for close
  25. chPending chan *pendingMessage // push message queue
  26. chWrite chan []byte // push bytes queue
  27. lastAt int64 // last heartbeat unix time stamp
  28. onCloseFunc []OnCloseFunc // on close agent
  29. }
  30. pendingMessage struct {
  31. mid uint32
  32. payload interface{}
  33. }
  34. OnCloseFunc func(*Agent)
  35. )
  36. func NewAgent(app cfacade.IApplication, conn net.Conn, session *cproto.Session) Agent {
  37. agent := Agent{
  38. IApplication: app,
  39. conn: conn,
  40. state: AgentInit,
  41. session: session,
  42. chDie: make(chan struct{}),
  43. chPending: make(chan *pendingMessage, writeBacklog),
  44. chWrite: make(chan []byte, writeBacklog),
  45. lastAt: 0,
  46. onCloseFunc: nil,
  47. }
  48. agent.session.Ip = agent.RemoteAddr()
  49. agent.SetLastAt()
  50. if clog.PrintLevel(zapcore.DebugLevel) {
  51. clog.Debugf("[sid = %s,uid = %d] Agent create. [count = %d, ip = %s]",
  52. agent.SID(),
  53. agent.UID(),
  54. Count(),
  55. agent.RemoteAddr(),
  56. )
  57. }
  58. return agent
  59. }
  60. func (a *Agent) State() int32 {
  61. return a.state
  62. }
  63. func (a *Agent) SetState(state int32) bool {
  64. oldValue := atomic.SwapInt32(&a.state, state)
  65. return oldValue != state
  66. }
  67. func (a *Agent) Session() *cproto.Session {
  68. return a.session
  69. }
  70. func (a *Agent) UID() cfacade.UID {
  71. return a.session.Uid
  72. }
  73. func (a *Agent) SID() cfacade.SID {
  74. return a.session.Sid
  75. }
  76. func (a *Agent) Bind(uid cfacade.UID) error {
  77. return BindUID(a.SID(), uid)
  78. }
  79. func (a *Agent) Unbind() {
  80. Unbind(a.SID())
  81. }
  82. func (a *Agent) SetLastAt() {
  83. atomic.StoreInt64(&a.lastAt, time.Now().Unix())
  84. }
  85. func (a *Agent) SendRaw(bytes []byte) {
  86. a.chWrite <- bytes
  87. }
  88. func (a *Agent) Close() {
  89. if a.SetState(AgentClosed) {
  90. select {
  91. case <-a.chDie:
  92. default:
  93. close(a.chDie)
  94. }
  95. }
  96. }
  97. func (a *Agent) Run() {
  98. go a.writeChan()
  99. go a.readChan()
  100. }
  101. func (a *Agent) readChan() {
  102. defer func() {
  103. if clog.PrintLevel(zapcore.DebugLevel) {
  104. clog.Debugf("[sid = %s,uid = %d] Agent read chan exit.",
  105. a.SID(),
  106. a.UID(),
  107. )
  108. }
  109. a.Close()
  110. }()
  111. for {
  112. msg, isBreak, err := ReadMessage(a.conn)
  113. if isBreak || err != nil {
  114. return
  115. }
  116. a.processPacket(&msg)
  117. }
  118. }
  119. func (a *Agent) writeChan() {
  120. ticker := time.NewTicker(heartbeatTime)
  121. defer func() {
  122. if clog.PrintLevel(zapcore.DebugLevel) {
  123. clog.Debugf("[sid = %s,uid = %d] Agent write chan exit.", a.SID(), a.UID())
  124. }
  125. ticker.Stop()
  126. a.closeProcess()
  127. a.Close()
  128. }()
  129. for {
  130. select {
  131. case <-a.chDie:
  132. {
  133. return
  134. }
  135. case <-ticker.C:
  136. {
  137. deadline := time.Now().Add(-heartbeatTime).Unix()
  138. if a.lastAt < deadline {
  139. if clog.PrintLevel(zapcore.DebugLevel) {
  140. clog.Debugf("[sid = %s,uid = %d] Check heartbeat timeout.", a.SID(), a.UID())
  141. }
  142. return
  143. }
  144. }
  145. case pending := <-a.chPending:
  146. {
  147. a.processPending(pending)
  148. }
  149. case bytes := <-a.chWrite:
  150. {
  151. a.write(bytes)
  152. }
  153. }
  154. }
  155. }
  156. func (a *Agent) closeProcess() {
  157. cutils.Try(func() {
  158. for _, fn := range a.onCloseFunc {
  159. fn(a)
  160. }
  161. }, func(errString string) {
  162. clog.Warn(errString)
  163. })
  164. a.Unbind()
  165. if err := a.conn.Close(); err != nil {
  166. clog.Debugf("[sid = %s,uid = %d] Agent connect closed. [error = %s]",
  167. a.SID(),
  168. a.UID(),
  169. err,
  170. )
  171. }
  172. if clog.PrintLevel(zapcore.DebugLevel) {
  173. clog.Debugf("[sid = %s,uid = %d] Agent closed. [count = %d, ip = %s]",
  174. a.SID(),
  175. a.UID(),
  176. Count(),
  177. a.RemoteAddr(),
  178. )
  179. }
  180. close(a.chPending)
  181. close(a.chWrite)
  182. }
  183. func (a *Agent) write(bytes []byte) {
  184. _, err := a.conn.Write(bytes)
  185. if err != nil {
  186. clog.Warn(err)
  187. }
  188. }
  189. func (a *Agent) processPacket(msg *Message) {
  190. nodeRoute, found := GetNodeRoute(msg.MID)
  191. if !found {
  192. if clog.PrintLevel(zapcore.DebugLevel) {
  193. clog.Warnf("[sid = %s,uid = %d] Route not found, close connect! [message = %+v]",
  194. a.SID(),
  195. a.UID(),
  196. msg,
  197. )
  198. }
  199. a.Close()
  200. return
  201. }
  202. onDataRouteFunc(a, msg, nodeRoute)
  203. // update last time
  204. a.SetLastAt()
  205. }
  206. func (a *Agent) RemoteAddr() string {
  207. if a.conn != nil {
  208. return cnet.GetIPV4(a.conn.RemoteAddr())
  209. }
  210. return ""
  211. }
  212. func (p *pendingMessage) String() string {
  213. return fmt.Sprintf("mid = %d, payload = %v", p.mid, p.payload)
  214. }
  215. func (a *Agent) processPending(pending *pendingMessage) {
  216. data, err := a.Serializer().Marshal(pending.payload)
  217. if err != nil {
  218. clog.Warnf("[sid = %s,uid = %d] Payload marshal error. [data = %s]",
  219. a.SID(),
  220. a.UID(),
  221. pending.String(),
  222. )
  223. return
  224. }
  225. // encode packet
  226. pkg, err := pack(pending.mid, data)
  227. if err != nil {
  228. clog.Warn(err)
  229. return
  230. }
  231. a.SendRaw(pkg)
  232. }
  233. func (a *Agent) sendPending(mid uint32, payload interface{}) {
  234. if a.state == AgentClosed {
  235. clog.Warnf("[sid = %s,uid = %d] Session is closed. [mid = %d, payload = %+v]",
  236. a.SID(),
  237. a.UID(),
  238. mid,
  239. payload,
  240. )
  241. return
  242. }
  243. if len(a.chPending) >= writeBacklog {
  244. clog.Warnf("[sid = %s,uid = %d] send buffer exceed. [mid = %d, payload = %+v]",
  245. a.SID(),
  246. a.UID(),
  247. mid,
  248. payload,
  249. )
  250. return
  251. }
  252. pending := &pendingMessage{
  253. mid: mid,
  254. payload: payload,
  255. }
  256. a.chPending <- pending
  257. }
  258. func (a *Agent) Response(mid uint32, v interface{}) {
  259. a.sendPending(mid, v)
  260. if clog.PrintLevel(zapcore.DebugLevel) {
  261. clog.Debugf("[sid = %s,uid = %d] Response ok. [mid = %d, val = %+v]",
  262. a.SID(),
  263. a.UID(),
  264. mid,
  265. v,
  266. )
  267. }
  268. }
  269. func (a *Agent) AddOnClose(fn OnCloseFunc) {
  270. if fn != nil {
  271. a.onCloseFunc = append(a.onCloseFunc, fn)
  272. }
  273. }