message.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. package pomeloMessage
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. cerr "github.com/mhaya/error"
  6. ccompress "github.com/mhaya/extend/compress"
  7. )
  8. var (
  9. nilMessage = Message{}
  10. )
  11. // Message represents a unmarshaled message or a message which to be marshaled
  12. // message协议的主要作用是封装消息头,包括route和消息类型两部分,
  13. // 不同的消息类型有着不同的消息头,在消息头里面可能要打入message id(即requestId)和route信息。
  14. // 由于可能会有route压缩,而且对于服务端push的消息,message id为空,对于客户端请求的响应,route为空
  15. // 消息头分为三部分,flag,message id,route。
  16. // 如下图所示:
  17. // flag(1byte) + message id(0~5byte) + route(0~256bytes)
  18. // flag位是必须的,占用一个byte,它决定了后面的消息类型和内容的格式;
  19. // message id和route则是可选的。
  20. // 其中message id采用varints 128变长编码方式,根据值的大小,长度在0~5byte之间。
  21. // route则根据消息类型以及内容的大小,长度在0~255byte之间。
  22. //
  23. // flag占用message头的第一个byte,其内容如下
  24. // preserved(4bits) + message type(3 bits) + route(1bit)
  25. // 现在只用到了其中的4个bit,这四个bit包括两部分,占用3个bit的message type字段和占用1个bit的route标识,其中:
  26. // message type用来标识消息类型,范围为0~7,
  27. //
  28. // 消息类型: 不同类型的消息,对应不同消息头,消息类型通过flag字段的第2-4位来确定,其对应关系以及相应的消息头如下图:
  29. //
  30. // 现在消息共有四类,request,notify,response,push,值的范围是0~3。
  31. // 不同的消息类型有着不同的消息内容,下面会有详细分析。
  32. // 最后一位的route表示route是否压缩,影响route字段的长度。 这两部分之间相互独立,互不影响。
  33. // request ----000- <message id> <route>
  34. // notify ----001- <route>
  35. // response ----010- <message id>
  36. // push ----011- <route>
  37. //
  38. // 路由压缩标志
  39. // 上图是不同的flag标志对应的route字段的内容:
  40. // flag的最后一位为1时,表示路由压缩,需要通过查询字典来获取route;
  41. // flag最后一位为0是,后面route则由一个uInt8的byte,用来表示route的字节长度。
  42. // 之后是通过utf8编码后的route字 符串,其长度就是前面一位byte的uInt8的值,因此route的长度最大支持256B。
  43. type Message struct {
  44. Type Type // message type 4中消息类型
  45. ID uint // unique id, zero while notify mode 消息id(request response)
  46. Route string // route for locating service 消息路由
  47. Data []byte // payload 消息体的原始数据
  48. routeCompressed bool // is route Compressed 是否启用路由压缩
  49. Error bool // response error
  50. }
  51. func New() Message {
  52. return Message{}
  53. }
  54. func (t *Message) String() string {
  55. return fmt.Sprintf(
  56. "Type: %s, ID: %d, Route: %s, RouteCompressed: %t, Data: %v, BodyLength: %d, Error:%v",
  57. t.Type.String(),
  58. t.ID,
  59. t.Route,
  60. t.routeCompressed,
  61. t.Data,
  62. len(t.Data),
  63. t.Error)
  64. }
  65. // Encode marshals message to binary format. Different message types is corresponding to
  66. // different message header, message types is identified by 2-4 bit of flag field. The
  67. // relationship between message types and message header is presented as follows:
  68. // ------------------------------------------
  69. // | type | flag | other |
  70. // |----------|--------|--------------------|
  71. // | request |----000-|<message id>|<route>|
  72. // | notify |----001-|<route> |
  73. // | response |----010-|<message id> |
  74. // | push |----011-|<route> |
  75. // ------------------------------------------
  76. // The figure above indicates that the bit does not affect the type of message.
  77. // See ref: https://github.com/lonnng/nano/blob/master/docs/communication_protocol.md
  78. // See ref: https://github.com/NetEase/pomelo/wiki/%E5%8D%8F%E8%AE%AE%E6%A0%BC%E5%BC%8F
  79. func Encode(m *Message) ([]byte, error) {
  80. if InvalidType(m.Type) {
  81. return nil, cerr.MessageWrongType
  82. }
  83. buf := make([]byte, 0)
  84. flag := byte(m.Type) << 1
  85. code, compressed := GetCode(m.Route)
  86. if compressed {
  87. flag |= RouteCompressMask
  88. }
  89. if m.Error {
  90. flag |= ErrorMask
  91. }
  92. buf = append(buf, flag)
  93. if m.Type == Request || m.Type == Response {
  94. n := m.ID
  95. // variant length encode
  96. for {
  97. b := byte(n % 128)
  98. n >>= 7
  99. if n != 0 {
  100. buf = append(buf, b+128)
  101. } else {
  102. buf = append(buf, b)
  103. break
  104. }
  105. }
  106. }
  107. if Routable(m.Type) {
  108. if compressed {
  109. buf = append(buf, byte((code>>8)&0xFF))
  110. buf = append(buf, byte(code&0xFF))
  111. } else {
  112. buf = append(buf, byte(len(m.Route)))
  113. buf = append(buf, []byte(m.Route)...)
  114. }
  115. }
  116. if IsDataCompression() {
  117. d, err := ccompress.DeflateData(m.Data)
  118. if err != nil {
  119. return nil, err
  120. }
  121. if len(d) < len(m.Data) {
  122. m.Data = d
  123. buf[0] |= GZIPMask
  124. }
  125. }
  126. buf = append(buf, m.Data...)
  127. return buf, nil
  128. }
  129. // Decode unmarshal the bytes slice to a message
  130. // See ref: https://github.com/lonnng/nano/blob/master/docs/communication_protocol.md
  131. func Decode(data []byte) (Message, error) {
  132. if len(data) < MsgHeadLength {
  133. return nilMessage, cerr.MessageInvalid
  134. }
  135. m := New()
  136. flag := data[0]
  137. offset := 1
  138. m.Type = Type((flag >> 1) & TypeMask)
  139. if InvalidType(m.Type) {
  140. return nilMessage, cerr.MessageWrongType
  141. }
  142. if m.Type == Request || m.Type == Response {
  143. id := uint(0)
  144. // little end byte order
  145. // WARNING: must can be stored in 64 bits integer
  146. // variant length encode
  147. for i := offset; i < len(data); i++ {
  148. b := data[i]
  149. id += uint(b&0x7F) << uint(7*(i-offset))
  150. if b < 128 {
  151. offset = i + 1
  152. break
  153. }
  154. }
  155. m.ID = id
  156. }
  157. if offset > len(data) {
  158. return nilMessage, cerr.MessageInvalid
  159. }
  160. m.Error = flag&ErrorMask == ErrorMask
  161. if Routable(m.Type) {
  162. if flag&RouteCompressMask == 1 {
  163. m.routeCompressed = true
  164. code := binary.BigEndian.Uint16(data[offset:(offset + 2)])
  165. route, found := GetRoute(code)
  166. if !found {
  167. return nilMessage, cerr.MessageRouteNotFound
  168. }
  169. m.Route = route
  170. offset += 2
  171. } else {
  172. m.routeCompressed = false
  173. rl := data[offset]
  174. offset++
  175. m.Route = string(data[offset:(offset + int(rl))])
  176. offset += int(rl)
  177. }
  178. }
  179. if offset > len(data) {
  180. return nilMessage, cerr.MessageInvalid
  181. }
  182. m.Data = data[offset:]
  183. var err error
  184. if flag&GZIPMask == GZIPMask {
  185. m.Data, err = ccompress.InflateData(m.Data)
  186. if err != nil {
  187. return nilMessage, err
  188. }
  189. }
  190. return m, nil
  191. }