Просмотр исходного кода

update 新增一个logstash节点

Alvin 8 месяцев назад
Родитель
Сommit
f528850703

+ 20 - 0
game/config/profile-gc.json

@@ -54,6 +54,19 @@
         "enable": true
       }
     ],
+    "logstash": [
+      {
+        "node_id": "m-logstash",
+        "address": "",
+        "__settings__": {
+          "db_id_list": {
+            "game_db_id": "game_db_1"
+          },
+          "ref_logger": "logstash_log"
+        },
+        "enable": true
+      }
+    ],
     "gate": [
       {
         "node_id": "m-gate-1",
@@ -164,6 +177,13 @@
       "file_link_path": "logs/center.log",
       "file_path_format": "logs/center_%Y%m%d%H%M.log"
     },
+    "logstash_log": {
+      "level": "debug",
+      "enable_console": true,
+      "enable_write_file": true,
+      "file_link_path": "logs/logstash.log",
+      "file_path_format": "logs/logstash_%Y%m%d%H%M.log"
+    },
     "gate_log": {
       "level": "debug",
       "enable_console": true,

+ 7 - 0
game/game_cluster/internal/param/logstashparam.go

@@ -0,0 +1,7 @@
+package param
+
+type HandleLogReq struct {
+	ServerId    string `json:"serverId"`    // 服务id或者nodeid
+	EventName   string `json:"eventName"`   // 事件名称
+	JsonContent string `json:"jsonContent"` // json格式的日志记录
+}

+ 33 - 0
game/game_cluster/nodes/logstash/logstash.go

@@ -0,0 +1,33 @@
+package main
+
+import (
+	"github.com/mhaya"
+	mhayaCron "github.com/mhaya/components/cron"
+	mhayaMongo "github.com/mhaya/components/mongo"
+	"github.com/mhaya/game/game_cluster/internal/data"
+	"github.com/mhaya/game/game_cluster/internal/mdb"
+	"github.com/mhaya/game/game_cluster/nodes/center/module/ops"
+	"github.com/mhaya/game/game_cluster/nodes/logstash/module/handlelog"
+)
+
+func main() {
+	app := mhaya.Configure(
+		"./game/config/profile-gc.json",
+		"m-logstash",
+		false,
+		mhaya.Cluster,
+	)
+
+	app.Register(mhayaCron.New())
+	app.Register(data.New())
+	// 注册db组件
+	app.Register(mhayaMongo.NewComponent())
+
+	app.AddActors(
+		&handlelog.HandleLogObject{},
+		&ops.ActorOps{},
+		&mdb.ActorDB{},
+	)
+
+	app.Startup()
+}

+ 33 - 0
game/game_cluster/nodes/logstash/logstash/logstash.go

@@ -0,0 +1,33 @@
+package logstash
+
+import (
+	"github.com/mhaya"
+	mhayaCron "github.com/mhaya/components/cron"
+	mhayaMongo "github.com/mhaya/components/mongo"
+	"github.com/mhaya/game/game_cluster/internal/data"
+	"github.com/mhaya/game/game_cluster/internal/mdb"
+	"github.com/mhaya/game/game_cluster/nodes/center/module/ops"
+	"github.com/mhaya/game/game_cluster/nodes/logstash/module/handlelog"
+)
+
+func Run(profileFilePath, nodeId string) {
+	app := mhaya.Configure(
+		profileFilePath,
+		nodeId,
+		false,
+		mhaya.Cluster,
+	)
+
+	app.Register(mhayaCron.New())
+	app.Register(data.New())
+	// 注册db组件
+	app.Register(mhayaMongo.NewComponent())
+
+	app.AddActors(
+		&handlelog.HandleLogObject{},
+		&ops.ActorOps{},
+		&mdb.ActorDB{},
+	)
+
+	app.Startup()
+}

+ 53 - 0
game/game_cluster/nodes/logstash/module/event/enent.go

@@ -0,0 +1,53 @@
+package event
+
+import (
+	"sync"
+
+	mhayaLogger "github.com/mhaya/logger"
+)
+
+type EventManage struct {
+	eventMap sync.Map
+}
+
+var (
+	instance *EventManage
+	once     sync.Once
+)
+
+func GetEventIdMap() *EventManage {
+	return instance
+}
+
+func init() {
+	once.Do(func() {
+		instance = &EventManage{
+			eventMap: sync.Map{},
+		}
+	})
+
+	instance.AddEvent(new(PlayerLoginEventContent))
+}
+
+func (em *EventManage) AddEvent(e Eventer) {
+	em.eventMap.Store(e.EventName(), e)
+}
+
+func (em *EventManage) GetEvent(eventName string) Eventer {
+	value, exist := em.eventMap.Load(eventName)
+	if !exist {
+		return nil
+	}
+
+	return value.(Eventer)
+}
+
+func (em *EventManage) PrintAllEvent() {
+	mhayaLogger.Info("-------------------------------------------------")
+	em.eventMap.Range(func(key, value any) bool {
+		mhayaLogger.Infof("[event name = %s]", key)
+		mhayaLogger.Infof("[event content = %v]", value)
+		return true
+	})
+	mhayaLogger.Info("-------------------------------------------------")
+}

+ 17 - 0
game/game_cluster/nodes/logstash/module/event/eventContent.go

@@ -0,0 +1,17 @@
+package event
+
+type Eventer interface {
+	EventName() string
+}
+
+var (
+	_ Eventer = &PlayerLoginEventContent{}
+)
+
+type PlayerLoginEventContent struct {
+	// TODO 具体埋点数据
+}
+
+func (e *PlayerLoginEventContent) EventName() string {
+	return "PlayerLogin"
+}

+ 60 - 0
game/game_cluster/nodes/logstash/module/handlelog/actor_handle_log.go

@@ -0,0 +1,60 @@
+package handlelog
+
+import (
+	"encoding/json"
+	"strings"
+
+	"github.com/mhaya/game/game_cluster/internal/code"
+	"github.com/mhaya/game/game_cluster/internal/param"
+	"github.com/mhaya/game/game_cluster/nodes/logstash/module/event"
+	mhayaLogger "github.com/mhaya/logger"
+	cactor "github.com/mhaya/net/actor"
+)
+
+type (
+	HandleLogObject struct {
+		cactor.Base
+	}
+)
+
+func (p *HandleLogObject) AliasID() string {
+	return "handlelog"
+}
+
+// OnInit logstash为后端节点,不直接与客户端通信,所以注册了一些remote函数,供RPC调用
+func (p *HandleLogObject) OnInit() {
+	p.Remote().Register("handlelog", p.handlelog)
+}
+
+// handlelog 处理各个节点发送的日志记录
+func (p *HandleLogObject) handlelog(req *param.HandleLogReq) int32 {
+	mhayaLogger.Warnf("handlelog req:%v", req)
+
+	if strings.TrimSpace(req.JsonContent) == "" || strings.TrimSpace(req.ServerId) == "" || strings.TrimSpace(req.EventName) == "" {
+		return code.Error
+	}
+
+	e := event.GetEventIdMap().GetEvent(req.EventName)
+	if e == nil {
+		mhayaLogger.Warnf("handlelog unknow event:%s", req.EventName)
+		return code.Error
+	}
+
+	return p.handlelogContent(req, e)
+}
+
+func (p *HandleLogObject) handlelogContent(req *param.HandleLogReq, e event.Eventer) int32 {
+	switch e.(type) {
+	case *event.PlayerLoginEventContent:
+		content := &event.PlayerLoginEventContent{}
+		err := json.Unmarshal([]byte(req.JsonContent), &content)
+		if err != nil {
+			mhayaLogger.Warnf("handlelog Unmarshal err:%v", err)
+			return code.Error
+		}
+
+		// TODO 将流水日志写入clickhouse等等
+	}
+
+	return code.OK
+}

+ 32 - 0
game/game_cluster/nodes/logstash/module/ops/actor_ops.go

@@ -0,0 +1,32 @@
+package ops
+
+import (
+	"github.com/mhaya/game/game_cluster/internal/code"
+	"github.com/mhaya/game/game_cluster/internal/pb"
+	cactor "github.com/mhaya/net/actor"
+)
+
+var (
+	pingReturn = &pb.Bool{Value: true}
+)
+
+type (
+	ActorOps struct {
+		cactor.Base
+	}
+)
+
+func (p *ActorOps) AliasID() string {
+	return "ops"
+}
+
+// OnInit 注册remote函数
+func (p *ActorOps) OnInit() {
+	p.Remote().Register("ping", p.ping)
+
+}
+
+// ping 请求logstash是否响应
+func (p *ActorOps) ping() (*pb.Bool, int32) {
+	return pingReturn, code.OK
+}