Quellcode durchsuchen

update 新增logstash RPC接口

Alvin vor 10 Monaten
Ursprung
Commit
c7aa448a7b
1 geänderte Dateien mit 74 neuen und 0 gelöschten Zeilen
  1. 74 0
      game/game_cluster/internal/rpc/logstash/logstash.go

+ 74 - 0
game/game_cluster/internal/rpc/logstash/logstash.go

@@ -0,0 +1,74 @@
+package logstash
+
+import (
+	"errors"
+	"strconv"
+
+	cfacade "github.com/mhaya/facade"
+	"github.com/mhaya/game/game_cluster/internal/code"
+	"github.com/mhaya/game/game_cluster/internal/param"
+	"github.com/mhaya/game/game_cluster/internal/pb"
+	clog "github.com/mhaya/logger"
+)
+
+// node type
+const (
+	logstashType = "logstash"
+)
+
+// actor id
+const (
+	opsActor       = ".ops"
+	logrecordActor = ".logrecord"
+)
+
+// funcName
+const (
+	ping            = "ping"
+	handleLogRecore = "handlelog"
+)
+
+const (
+	sourcePath = ".system"
+)
+
+// Ping 访问logstash节点,确认logstash已启动
+func Ping(app cfacade.IApplication) bool {
+	nodeId := getLogstashNodeID(app)
+	if nodeId == "" {
+		return false
+	}
+
+	rsp := &pb.Bool{}
+	targetPath := nodeId + opsActor
+	errCode := app.ActorSystem().CallWait(sourcePath, targetPath, ping, nil, rsp)
+	if code.IsFail(errCode) {
+		return false
+	}
+
+	return rsp.Value
+}
+
+func HandleLogRecord(app cfacade.IApplication, req *param.HandleLogReq) error {
+	targetPath := getTargetPath(app, logrecordActor)
+	errCode := app.ActorSystem().CallWait(sourcePath, targetPath, handleLogRecore, req, nil)
+	if code.IsFail(errCode) {
+		clog.Warnf("[HandleLogRecord] errCode = %v", errCode)
+		return errors.New(strconv.FormatInt(int64(errCode), 10))
+	}
+
+	return nil
+}
+
+func getLogstashNodeID(app cfacade.IApplication) string {
+	list := app.Discovery().ListByType(logstashType)
+	if len(list) > 0 {
+		return list[0].GetNodeId()
+	}
+	return ""
+}
+
+func getTargetPath(app cfacade.IApplication, actorID string) string {
+	nodeId := getLogstashNodeID(app)
+	return nodeId + actorID
+}