component.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package mhayaMongo
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "fmt"
  6. "time"
  7. cfacade "github.com/mhaya/facade"
  8. clog "github.com/mhaya/logger"
  9. cprofile "github.com/mhaya/profile"
  10. "go.mongodb.org/mongo-driver/mongo"
  11. "go.mongodb.org/mongo-driver/mongo/options"
  12. "go.mongodb.org/mongo-driver/mongo/readpref"
  13. )
  // Name is the component name reported by (*Component).Name.
  const Name = "mongo_component"
  17. type (
  18. Component struct {
  19. cfacade.Component
  20. dbMap map[string]map[string]*mongo.Database
  21. }
  22. // HashDb hash by group id
  23. HashDb func(dbMaps map[string]*mongo.Database) string
  24. )
  25. func NewComponent() *Component {
  26. return &Component{
  27. dbMap: make(map[string]map[string]*mongo.Database),
  28. }
  29. }
  30. func (*Component) Name() string {
  31. return Name
  32. }
  33. func (s *Component) Init() {
  34. // load only the database contained in the `db_id_list`
  35. mongoIdList := s.App().Settings().Get("db_id_list")
  36. if mongoIdList.LastError() != nil || mongoIdList.Size() < 1 {
  37. clog.Warnf("[nodeId = %s] `mongo_id_list` property not exists.", s.App().NodeId())
  38. return
  39. }
  40. mongoConfig := cprofile.GetConfig("mongo")
  41. if mongoConfig.LastError() != nil {
  42. panic("`mongo` property not exists in profile file.")
  43. }
  44. for _, groupId := range mongoConfig.Keys() {
  45. s.dbMap[groupId] = make(map[string]*mongo.Database)
  46. dbGroup := mongoConfig.GetConfig(groupId)
  47. for i := 0; i < dbGroup.Size(); i++ {
  48. item := dbGroup.GetConfig(i)
  49. var (
  50. enable = item.GetBool("enable", true)
  51. id = item.GetString("db_id")
  52. dbName = item.GetString("db_name")
  53. uri = item.GetString("uri")
  54. timeout = time.Duration(item.GetInt64("timeout", 10)) * time.Second
  55. tlsEnable = item.GetInt("tls")
  56. maxPoolSize = item.GetInt("maxPoolSize")
  57. minPoolSize = item.GetInt("minPoolSize")
  58. maxConnIdleTime = item.GetInt("maxConnIdleTime")
  59. connectTimeout = item.GetInt("connectTimeout")
  60. socketTimeout = item.GetInt("socketTimeout")
  61. setReplicaSet = item.GetString("setReplicaSet")
  62. )
  63. for _, key := range mongoIdList.Keys() {
  64. dbId := mongoIdList.Get(key).ToString()
  65. if id != dbId {
  66. continue
  67. }
  68. if !enable {
  69. panic(fmt.Sprintf("[dbName = %s] is disabled!", dbName))
  70. }
  71. db, err := CreateDatabase(uri, setReplicaSet, dbName, tlsEnable, uint64(maxPoolSize), uint64(minPoolSize), maxConnIdleTime, connectTimeout, socketTimeout, timeout)
  72. if err != nil {
  73. panic(fmt.Sprintf("[dbName = %s] create mongodb fail. error = %s", dbName, err))
  74. }
  75. s.dbMap[groupId][id] = db
  76. clog.Infof("[dbGroup =%s, dbName = %s] is connected.", groupId, id)
  77. }
  78. }
  79. }
  80. }
  81. func CreateDatabase(uri, setReplicaSet, dbName string, tlsEnable int, maxPoolSize uint64, minPoolSize uint64, maxConnIdleTime int, connectTimeout, socketTimeout int, timeout ...time.Duration) (*mongo.Database, error) {
  82. tt := 5 * time.Second
  83. if len(timeout) > 0 && timeout[0].Seconds() > 3 {
  84. tt = timeout[0]
  85. }
  86. var o *options.ClientOptions
  87. if tlsEnable == 1 {
  88. tlsConfig := &tls.Config{
  89. //MinVersion: tls.VersionTLS12,
  90. //PreferServerCipherSuites: true,
  91. InsecureSkipVerify: true,
  92. }
  93. o = options.Client().ApplyURI(uri).SetReplicaSet(setReplicaSet).SetMaxPoolSize(maxPoolSize). //最大连接
  94. SetMinPoolSize(minPoolSize). //最小连接
  95. SetMaxConnIdleTime(time.Duration(maxConnIdleTime) * time.Second). //连接空闲时间
  96. SetConnectTimeout(time.Duration(connectTimeout) * time.Second). //连接超时时间
  97. SetSocketTimeout(time.Duration(socketTimeout) * time.Second).SetTLSConfig(tlsConfig) //套接字超时时间
  98. } else {
  99. o = options.Client().ApplyURI(uri).SetMaxPoolSize(maxPoolSize). //最大连接
  100. SetMinPoolSize(minPoolSize). //最小连接
  101. SetMaxConnIdleTime(time.Duration(maxConnIdleTime) * time.Second). //连接空闲时间
  102. SetConnectTimeout(time.Duration(connectTimeout) * time.Second). //连接超时时间
  103. SetSocketTimeout(time.Duration(socketTimeout) * time.Second) //套接字超时时间
  104. }
  105. if err := o.Validate(); err != nil {
  106. return nil, err
  107. }
  108. ctx, cancel := context.WithTimeout(context.Background(), tt)
  109. defer cancel()
  110. client, err := mongo.Connect(ctx, o)
  111. if err != nil {
  112. return nil, err
  113. }
  114. err = client.Ping(context.Background(), readpref.Primary())
  115. if err != nil {
  116. return nil, err
  117. }
  118. clog.Infof("ping database [uri = %s] is ok", uri)
  119. return client.Database(dbName), nil
  120. }
  121. func (s *Component) GetDb(id string) *mongo.Database {
  122. for _, group := range s.dbMap {
  123. for k, v := range group {
  124. if k == id {
  125. return v
  126. }
  127. }
  128. }
  129. return nil
  130. }
  131. func (s *Component) GetHashDb(groupId string, hashFn HashDb) (*mongo.Database, bool) {
  132. dbGroup, found := s.GetDbMap(groupId)
  133. if !found {
  134. clog.Warnf("groupId = %s not found.", groupId)
  135. return nil, false
  136. }
  137. dbId := hashFn(dbGroup)
  138. db, found := dbGroup[dbId]
  139. return db, found
  140. }
  141. func (s *Component) GetDbMap(groupId string) (map[string]*mongo.Database, bool) {
  142. dbGroup, found := s.dbMap[groupId]
  143. return dbGroup, found
  144. }