logstash.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. package logstash
  2. import (
  3. "errors"
  4. "strconv"
  5. cfacade "github.com/mhaya/facade"
  6. "github.com/mhaya/game/game_cluster/internal/code"
  7. "github.com/mhaya/game/game_cluster/internal/param"
  8. "github.com/mhaya/game/game_cluster/internal/pb"
  9. clog "github.com/mhaya/logger"
  10. )
  11. // node type
  12. const (
  13. logstashType = "logstash"
  14. )
  15. // actor id
  16. const (
  17. opsActor = ".ops"
  18. logrecordActor = ".logrecord"
  19. )
  20. // funcName
  21. const (
  22. ping = "ping"
  23. handleLogRecore = "handlelog"
  24. )
  25. const (
  26. sourcePath = ".system"
  27. )
  28. // Ping 访问logstash节点,确认logstash已启动
  29. func Ping(app cfacade.IApplication) bool {
  30. nodeId := getLogstashNodeID(app)
  31. if nodeId == "" {
  32. return false
  33. }
  34. rsp := &pb.Bool{}
  35. targetPath := nodeId + opsActor
  36. errCode := app.ActorSystem().CallWait(sourcePath, targetPath, ping, nil, rsp)
  37. if code.IsFail(errCode) {
  38. return false
  39. }
  40. return rsp.Value
  41. }
  42. func HandleLogRecord(app cfacade.IApplication, req *param.HandleLogReq) error {
  43. targetPath := getTargetPath(app, logrecordActor)
  44. errCode := app.ActorSystem().CallWait(sourcePath, targetPath, handleLogRecore, req, nil)
  45. if code.IsFail(errCode) {
  46. clog.Warnf("[HandleLogRecord] errCode = %v", errCode)
  47. return errors.New(strconv.FormatInt(int64(errCode), 10))
  48. }
  49. return nil
  50. }
  51. func getLogstashNodeID(app cfacade.IApplication) string {
  52. list := app.Discovery().ListByType(logstashType)
  53. if len(list) > 0 {
  54. return list[0].GetNodeId()
  55. }
  56. return ""
  57. }
  58. func getTargetPath(app cfacade.IApplication, actorID string) string {
  59. nodeId := getLogstashNodeID(app)
  60. return nodeId + actorID
  61. }