ws_connector.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package mhayaConnector
  2. import (
  3. "io"
  4. "net/http"
  5. "time"
  6. "github.com/gorilla/websocket"
  7. cfacade "github.com/mhaya/facade"
  8. clog "github.com/mhaya/logger"
  9. )
  10. type (
  11. WSConnector struct {
  12. cfacade.Component
  13. Connector
  14. Options
  15. upgrade *websocket.Upgrader
  16. }
  17. // WSConn is an adapter to t.INetConn, which implements all INetConn
  18. // interface base on *websocket.INetConn
  19. WSConn struct {
  20. *websocket.Conn
  21. typ int // message type
  22. reader io.Reader
  23. }
  24. )
  25. func (*WSConnector) Name() string {
  26. return "websocket_connector"
  27. }
  28. func (w *WSConnector) OnAfterInit() {
  29. }
  30. func (w *WSConnector) OnStop() {
  31. w.Stop()
  32. }
  33. func NewWS(address string, opts ...Option) *WSConnector {
  34. if address == "" {
  35. clog.Warn("create websocket fail. address is null.")
  36. return nil
  37. }
  38. ws := &WSConnector{
  39. Options: Options{
  40. address: address,
  41. certFile: "",
  42. keyFile: "",
  43. chanSize: 256,
  44. },
  45. upgrade: &websocket.Upgrader{
  46. ReadBufferSize: 1024,
  47. WriteBufferSize: 1024,
  48. CheckOrigin: func(_ *http.Request) bool {
  49. return true
  50. },
  51. },
  52. }
  53. for _, opt := range opts {
  54. opt(&ws.Options)
  55. }
  56. ws.Connector = NewConnector(ws.chanSize)
  57. return ws
  58. }
  59. func (w *WSConnector) Start() {
  60. listener, err := w.GetListener(w.certFile, w.keyFile, w.address)
  61. if err != nil {
  62. clog.Fatalf("failed to listen: %s", err)
  63. }
  64. clog.Infof("Websocket connector listening at Address %s", w.address)
  65. if w.certFile != "" || w.keyFile != "" {
  66. clog.Infof("certFile = %s, keyFile = %s", w.certFile, w.keyFile)
  67. }
  68. w.Connector.Start()
  69. http.Serve(listener, w)
  70. }
  71. func (w *WSConnector) Stop() {
  72. w.Connector.Stop()
  73. }
  74. func (w *WSConnector) SetUpgrade(upgrade *websocket.Upgrader) {
  75. if upgrade != nil {
  76. w.upgrade = upgrade
  77. }
  78. }
  79. func (w *WSConnector) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
  80. wsConn, err := w.upgrade.Upgrade(rw, r, nil)
  81. if err != nil {
  82. clog.Infof("Upgrade failure, URI=%s, Error=%s", r.RequestURI, err.Error())
  83. return
  84. }
  85. conn := NewWSConn(wsConn)
  86. w.InChan(&conn)
  87. }
  88. // NewWSConn return an initialized *WSConn
  89. func NewWSConn(conn *websocket.Conn) WSConn {
  90. c := WSConn{
  91. Conn: conn,
  92. }
  93. return c
  94. }
  95. func (c *WSConn) Read(b []byte) (int, error) {
  96. if c.reader == nil {
  97. t, r, err := c.NextReader()
  98. if err != nil {
  99. return 0, err
  100. }
  101. c.typ = t
  102. c.reader = r
  103. }
  104. n, err := c.reader.Read(b)
  105. if err != nil && err != io.EOF {
  106. return n, err
  107. } else if err == io.EOF {
  108. _, r, err := c.NextReader()
  109. if err != nil {
  110. return 0, err
  111. }
  112. c.reader = r
  113. }
  114. return n, nil
  115. }
  116. func (c *WSConn) Write(b []byte) (int, error) {
  117. err := c.WriteMessage(websocket.BinaryMessage, b)
  118. if err != nil {
  119. return 0, err
  120. }
  121. return len(b), nil
  122. }
  123. func (c *WSConn) SetDeadline(t time.Time) error {
  124. if err := c.SetReadDeadline(t); err != nil {
  125. return err
  126. }
  127. return c.SetWriteDeadline(t)
  128. }