package service import ( "context" "errors" "log" "math" "os" "sort" "strconv" "time" "github.com/mhaya/game/game_cluster/internal/constant" "github.com/mhaya/game/game_cluster/internal/mdb" "github.com/mhaya/game/game_cluster/internal/mdb/models" "github.com/mhaya/game/game_cluster/nodes/webadmin/entity" "github.com/spf13/cast" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) type Synthesis struct { db *mongo.Database } func NewSynthesis() *Synthesis { return &Synthesis{ db: mdb.MDB, } } func (s *Synthesis) FindMDBUserLogDaily(req *entity.UserLogDailyReq) ([]entity.UserLogDailyResp, error) { // 定义上下文 ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second) defer cancel() // 指定集合 collection := mdb.MDB.Collection("playerDailyRecord") // 构建查询条件 - 如果查询值为空那就不添加查询条件 filter := bson.M{} if req.StartTime != 0 { filter["daily"] = bson.M{ "$gte": req.StartTime, "$lte": req.EndTime, } } if req.Platform != "" && req.Platform != "all" { filter["platform"] = req.Platform } if req.Channel != "" { filter["channel"] = req.Channel } // 分页参数 skip := (req.Page - 1) * req.Size req.Total = 0 // 计算总数 count, err := collection.CountDocuments(ctx, filter) if err != nil { return nil, err } req.Total = count // 执行查询 opts := options.Find() opts.SetSkip(int64(skip)) opts.SetLimit(int64(req.Size)) opts.SetSort(bson.D{{"daily", -1}}) cursor, err := collection.Find(ctx, filter, opts) if err != nil { return nil, err } defer cursor.Close(ctx) // 解析查询结果 var results []entity.UserLogDailyResp for cursor.Next(ctx) { var result entity.UserLogDailyResp err := cursor.Decode(&result) if err != nil { return nil, err } results = append(results, result) } // 同一天的数据全部放到platform= ALl 且数据累加 // 如果没有platform=all 的那就新增一个 同一个时间段只能有一个 platform=all // 将同一天的数据累加到platform=all的记录中 allPlatformRecordMap := make(map[int64]*entity.UserLogDailyResp) for _, result := range results { allPlatformRecordMap[result.Timestamp] = &entity.UserLogDailyResp{ Timestamp: result.Timestamp, Platform: "all", } } for _, v := range results { if v.Timestamp == allPlatformRecordMap[v.Timestamp].Timestamp { allPlatformRecordMap[v.Timestamp].Registered += v.Registered allPlatformRecordMap[v.Timestamp].LoggedIn += v.LoggedIn allPlatformRecordMap[v.Timestamp].NewActive += v.NewActive allPlatformRecordMap[v.Timestamp].OldActive += v.OldActive allPlatformRecordMap[v.Timestamp].TotalPoints += v.TotalPoints allPlatformRecordMap[v.Timestamp].UProduced += v.UProduced allPlatformRecordMap[v.Timestamp].UCashout += v.UCashout allPlatformRecordMap[v.Timestamp].NewLogin += v.NewLogin allPlatformRecordMap[v.Timestamp].OldLogin += v.OldLogin } } // 替换原有结果 for _, record := range allPlatformRecordMap { results = append(results, *record) } return results, nil } // FindWithdrawal 根据请求查询提现记录 func (s *Synthesis) FindWithdrawal(req *entity.UserWithdrawalReq) ([]*entity.UserWithdrawalResp, int64, error) { ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second) defer cancel() collection := mdb.MDB.Collection(constant.CNameCashOutRecord) // 构建过滤器 filter := bson.M{} if req.UserName != "" { filter["userName"] = req.UserName } if req.NickName != "" { filter["nickName"] = req.NickName } if req.ID != "" { filter["_id"], _ = primitive.ObjectIDFromHex(req.ID) } if req.StartTime != 0 { filter["createAt"] = bson.M{ "$gte": req.StartTime, "$lte": req.EndTime, } } if req.Withdrawal != "" { withdrawal, err := strconv.Atoi(req.Withdrawal) if err != nil { return nil, 0, os.ErrInvalid } filter["withdrawal"] = withdrawal } if req.Address != "" { filter["address"] = req.Address } if req.Status != "" { filter["status"] = req.Status } // 设置分页选项 findOptions := options.Find() findOptions.SetSkip(int64((req.Page - 1) * req.Size)) findOptions.SetLimit(int64(req.Size)) findOptions.SetSort(bson.D{{"createAt", -1}}) // 获取总数total count, err := collection.CountDocuments(ctx, filter) if err != nil { return nil, 0, err } // 查询数据 var results []*entity.UserWithdrawalResp cursor, err := collection.Find(ctx, filter, findOptions) if err != nil { return nil, 0, err } defer cursor.Close(ctx) // 解析结果 for cursor.Next(ctx) { var result entity.UserWithdrawalResp if err := cursor.Decode(&result); err != nil { return nil, 0, err } results = append(results, &result) } if err := cursor.Err(); err != nil { return nil, 0, err } return results, count, nil } // WithdrawalStatus 更新提现状态 func (s *Synthesis) WithdrawalStatus(req *entity.UserWithdrawalStatus) error { // ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second) // defer cancel() collection := mdb.MDB.Collection(constant.CNameCashOutRecord) // 更新条件 updateCondition := bson.M{"userName": req.UserName} // 更新内容 updateContent := bson.M{"$set": bson.M{"status": req.Status}} // 设置更新选项 updateOptions := options.Update() // 设置 upsert 选项 // 执行更新操作 _, err := collection.UpdateOne(context.TODO(), updateCondition, updateContent, updateOptions) if err != nil { return err } return nil } // WithdrawalStatusBatch 更新提现状态 func (s *Synthesis) WithdrawalStatusBatch(req *entity.UserWithdrawalStatusBatch) error { collection := mdb.MDB.Collection(constant.CNameCashOutRecord) if len(req.ID) == 0 { return errors.New("id 不能为空") } for _, id := range req.ID { objID := primitive.ObjectID{} objID, _ = primitive.ObjectIDFromHex(id) updateCondition := bson.M{"_id": objID} updateContent := bson.M{} withdrawal := models.CashOutRecord{} err := collection.FindOne(context.TODO(), updateCondition).Decode(&withdrawal) if err != nil { log.Printf("查询提现记录失败: %v", err) continue } if req.Withdrawal != 0 { if withdrawal.Status == 1 { updateContent = bson.M{"$set": bson.M{"withdrawal": req.Withdrawal}} } else { continue } } if req.Status > 0 { if withdrawal.Status != 0 { continue } updateContent = bson.M{"$set": bson.M{"status": req.Status}} } updateOptions := options.Update().SetUpsert(true) _, err = collection.UpdateOne(context.TODO(), updateCondition, updateContent, updateOptions) if err != nil { continue } } return nil } // FindUserCountryCount 查询用户国家分布 // // 返回值为 UserCountryResp 的切片和错误。 func (s *Synthesis) FindUserCountryCount() ([]*entity.UserCountryResp, error) { // 选择数据库和集合 collection := mdb.MDB.Collection("playerCountryStat") // 定义聚合管道 // 定义聚合管道 pipeline := []bson.D{ { {Key: "$project", Value: bson.D{ {Key: "playerRegisterCountry", Value: bson.D{{Key: "$objectToArray", Value: "$playerRegisterCountry"}}}, }}, }, { {Key: "$unwind", Value: "$playerRegisterCountry"}, }, { {Key: "$group", Value: bson.D{ {Key: "_id", Value: "$playerRegisterCountry.k"}, {Key: "totalValue", Value: bson.D{{Key: "$sum", Value: "$playerRegisterCountry.v"}}}, }}, }, { {Key: "$project", Value: bson.D{ {Key: "_id", Value: 0}, {Key: "countryKey", Value: "$_id"}, {Key: "totalValue", Value: 1}, }}, }, } // 执行聚合查询 cursor, err := collection.Aggregate(context.TODO(), pipeline) if err != nil { return nil, err } defer cursor.Close(context.TODO()) // 遍历查询结果 var results []bson.M if err := cursor.All(context.TODO(), &results); err != nil { return nil, err } var totalIPCount int64 var data []*entity.UserCountryResp // 将结果转换为 UserCountryResp for _, r := range results { var resp entity.UserCountryResp resp.Country = r["countryKey"].(string) resp.IPCount = int(r["totalValue"].(int32)) totalIPCount += int64(resp.IPCount) data = append(data, &resp) } for _, v := range data { // 保留小数点后两位 v.Percentage = math.Round(float64(v.IPCount)/float64(totalIPCount)*10000) / 100 } // 根据阈值过滤结果 otherCount := 0 otherPercentage := 0.00 filteredResults := make([]*entity.UserCountryResp, 0) threshold := 1.00 for _, r := range data { if r.Percentage >= threshold { filteredResults = append(filteredResults, r) // 保留小数点后两位 r.Percentage = math.Round(r.Percentage*100) / 100 otherPercentage += r.Percentage } else { otherCount += r.IPCount } } // 将其他国家添加到过滤后的结果中 if otherCount > 0 { p := 100.00 - math.Round(otherPercentage*100)/100 filteredResults = append(filteredResults, &entity.UserCountryResp{ Country: "other", IPCount: otherCount, Percentage: math.Round(p*100) / 100, }) } return filteredResults, nil } // FindUserRetention UserRetentionResp 用户留存率 // 1. 获取指定日期范围内的注册用户数量 // 2. 获取指定日期范围内的活跃用户数量 // 3. 计算留存率 // 4. 返回结果 func (s *Synthesis) FindUserRetention(req *entity.UserRetentionReq) ([]*entity.UserRetentionResp, error) { playerPreserve := models.GetPlayerPreserve(req.StartTime, req.EndTime) // 查询数据 var results []*entity.UserRetentionResp for key, v := range playerPreserve { var resp entity.UserRetentionResp var retention entity.Retention for _, vv := range v { if vv.ID == 1 { retention.Day1 = entity.DayRetention{ LoggedIn: vv.Ratio, LoginDate: key, } } if vv.ID == 3 { retention.Day3 = entity.DayRetention{ LoggedIn: vv.Ratio, LoginDate: key, } } if vv.ID == 7 { retention.Day7 = entity.DayRetention{ LoggedIn: vv.Ratio, LoginDate: key, } } if vv.ID == 14 { retention.Day14 = entity.DayRetention{ LoggedIn: vv.Ratio, LoginDate: key, } } if vv.ID == 30 { retention.Day30 = entity.DayRetention{ LoggedIn: vv.Ratio, LoginDate: key, } } } resp.RegistrationDate = key resp.RetentionData = retention results = append(results, &resp) } return results, nil } // FindUserLevel 用户等级统计 func (s *Synthesis) FindUserLevel() ([]*entity.UserLevelCountResp, error) { ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second) defer cancel() collection := mdb.MDB.Collection("playerLevelStat") // 查询所有文档 cursor, err := collection.Find(ctx, bson.M{}) if err != nil { return nil, err } defer cursor.Close(ctx) var results []*entity.UserLevelCountResp var reData []*models.PlayerLevelStat for cursor.Next(ctx) { var result models.PlayerLevelStat if err := cursor.Decode(&result); err != nil { return nil, err } reData = append(reData, &result) } if err := cursor.Err(); err != nil { return nil, err } dataMap := make(map[string]int) for _, v := range reData { for key, v1 := range v.ServerLevel { dataMap[key] += v1 } } keys := make([]string, 0, len(dataMap)) for k := range dataMap { keys = append(keys, k) } sort.Strings(keys) for _, v := range keys { results = append(results, &entity.UserLevelCountResp{ Level: cast.ToInt(v), UserCount: dataMap[v], }) } return results, nil }