123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322 |
- package simple
- import (
- "fmt"
- "net"
- "sync/atomic"
- "time"
- cnet "github.com/mhaya/extend/net"
- cutils "github.com/mhaya/extend/utils"
- cfacade "github.com/mhaya/facade"
- clog "github.com/mhaya/logger"
- cproto "github.com/mhaya/net/proto"
- "go.uber.org/zap/zapcore"
- )
- const (
- AgentInit int32 = 0
- AgentClosed int32 = 3
- )
- type (
- Agent struct {
- cfacade.IApplication // app
- conn net.Conn // low-level conn fd
- state int32 // current agent state
- session *cproto.Session // session
- chDie chan struct{} // wait for close
- chPending chan *pendingMessage // push message queue
- chWrite chan []byte // push bytes queue
- lastAt int64 // last heartbeat unix time stamp
- onCloseFunc []OnCloseFunc // on close agent
- }
- pendingMessage struct {
- mid uint32
- payload interface{}
- }
- OnCloseFunc func(*Agent)
- )
- func NewAgent(app cfacade.IApplication, conn net.Conn, session *cproto.Session) Agent {
- agent := Agent{
- IApplication: app,
- conn: conn,
- state: AgentInit,
- session: session,
- chDie: make(chan struct{}),
- chPending: make(chan *pendingMessage, writeBacklog),
- chWrite: make(chan []byte, writeBacklog),
- lastAt: 0,
- onCloseFunc: nil,
- }
- agent.session.Ip = agent.RemoteAddr()
- agent.SetLastAt()
- if clog.PrintLevel(zapcore.DebugLevel) {
- clog.Debugf("[sid = %s,uid = %d] Agent create. [count = %d, ip = %s]",
- agent.SID(),
- agent.UID(),
- Count(),
- agent.RemoteAddr(),
- )
- }
- return agent
- }
- func (a *Agent) State() int32 {
- return a.state
- }
- func (a *Agent) SetState(state int32) bool {
- oldValue := atomic.SwapInt32(&a.state, state)
- return oldValue != state
- }
- func (a *Agent) Session() *cproto.Session {
- return a.session
- }
- func (a *Agent) UID() cfacade.UID {
- return a.session.Uid
- }
- func (a *Agent) SID() cfacade.SID {
- return a.session.Sid
- }
- func (a *Agent) Bind(uid cfacade.UID) error {
- return BindUID(a.SID(), uid)
- }
- func (a *Agent) Unbind() {
- Unbind(a.SID())
- }
- func (a *Agent) SetLastAt() {
- atomic.StoreInt64(&a.lastAt, time.Now().Unix())
- }
- func (a *Agent) SendRaw(bytes []byte) {
- a.chWrite <- bytes
- }
- func (a *Agent) Close() {
- if a.SetState(AgentClosed) {
- select {
- case <-a.chDie:
- default:
- close(a.chDie)
- }
- }
- }
- func (a *Agent) Run() {
- go a.writeChan()
- go a.readChan()
- }
- func (a *Agent) readChan() {
- defer func() {
- if clog.PrintLevel(zapcore.DebugLevel) {
- clog.Debugf("[sid = %s,uid = %d] Agent read chan exit.",
- a.SID(),
- a.UID(),
- )
- }
- a.Close()
- }()
- for {
- msg, isBreak, err := ReadMessage(a.conn)
- if isBreak || err != nil {
- return
- }
- a.processPacket(&msg)
- }
- }
- func (a *Agent) writeChan() {
- ticker := time.NewTicker(heartbeatTime)
- defer func() {
- if clog.PrintLevel(zapcore.DebugLevel) {
- clog.Debugf("[sid = %s,uid = %d] Agent write chan exit.", a.SID(), a.UID())
- }
- ticker.Stop()
- a.closeProcess()
- a.Close()
- }()
- for {
- select {
- case <-a.chDie:
- {
- return
- }
- case <-ticker.C:
- {
- deadline := time.Now().Add(-heartbeatTime).Unix()
- if a.lastAt < deadline {
- if clog.PrintLevel(zapcore.DebugLevel) {
- clog.Debugf("[sid = %s,uid = %d] Check heartbeat timeout.", a.SID(), a.UID())
- }
- return
- }
- }
- case pending := <-a.chPending:
- {
- a.processPending(pending)
- }
- case bytes := <-a.chWrite:
- {
- a.write(bytes)
- }
- }
- }
- }
- func (a *Agent) closeProcess() {
- cutils.Try(func() {
- for _, fn := range a.onCloseFunc {
- fn(a)
- }
- }, func(errString string) {
- clog.Warn(errString)
- })
- a.Unbind()
- if err := a.conn.Close(); err != nil {
- clog.Debugf("[sid = %s,uid = %d] Agent connect closed. [error = %s]",
- a.SID(),
- a.UID(),
- err,
- )
- }
- if clog.PrintLevel(zapcore.DebugLevel) {
- clog.Debugf("[sid = %s,uid = %d] Agent closed. [count = %d, ip = %s]",
- a.SID(),
- a.UID(),
- Count(),
- a.RemoteAddr(),
- )
- }
- close(a.chPending)
- close(a.chWrite)
- }
- func (a *Agent) write(bytes []byte) {
- _, err := a.conn.Write(bytes)
- if err != nil {
- clog.Warn(err)
- }
- }
- func (a *Agent) processPacket(msg *Message) {
- nodeRoute, found := GetNodeRoute(msg.MID)
- if !found {
- if clog.PrintLevel(zapcore.DebugLevel) {
- clog.Warnf("[sid = %s,uid = %d] Route not found, close connect! [message = %+v]",
- a.SID(),
- a.UID(),
- msg,
- )
- }
- a.Close()
- return
- }
- onDataRouteFunc(a, msg, nodeRoute)
- // update last time
- a.SetLastAt()
- }
- func (a *Agent) RemoteAddr() string {
- if a.conn != nil {
- return cnet.GetIPV4(a.conn.RemoteAddr())
- }
- return ""
- }
- func (p *pendingMessage) String() string {
- return fmt.Sprintf("mid = %d, payload = %v", p.mid, p.payload)
- }
- func (a *Agent) processPending(pending *pendingMessage) {
- data, err := a.Serializer().Marshal(pending.payload)
- if err != nil {
- clog.Warnf("[sid = %s,uid = %d] Payload marshal error. [data = %s]",
- a.SID(),
- a.UID(),
- pending.String(),
- )
- return
- }
- // encode packet
- pkg, err := pack(pending.mid, data)
- if err != nil {
- clog.Warn(err)
- return
- }
- a.SendRaw(pkg)
- }
- func (a *Agent) sendPending(mid uint32, payload interface{}) {
- if a.state == AgentClosed {
- clog.Warnf("[sid = %s,uid = %d] Session is closed. [mid = %d, payload = %+v]",
- a.SID(),
- a.UID(),
- mid,
- payload,
- )
- return
- }
- if len(a.chPending) >= writeBacklog {
- clog.Warnf("[sid = %s,uid = %d] send buffer exceed. [mid = %d, payload = %+v]",
- a.SID(),
- a.UID(),
- mid,
- payload,
- )
- return
- }
- pending := &pendingMessage{
- mid: mid,
- payload: payload,
- }
- a.chPending <- pending
- }
- func (a *Agent) Response(mid uint32, v interface{}) {
- a.sendPending(mid, v)
- if clog.PrintLevel(zapcore.DebugLevel) {
- clog.Debugf("[sid = %s,uid = %d] Response ok. [mid = %d, val = %+v]",
- a.SID(),
- a.UID(),
- mid,
- v,
- )
- }
- }
- func (a *Agent) AddOnClose(fn OnCloseFunc) {
- if fn != nil {
- a.onCloseFunc = append(a.onCloseFunc, fn)
- }
- }
|