콘텐츠로 이동

2024 07 18

2024-07-18

Filebeat Config

filebeat.inputs:  # 파일비트가 읽을 로그 설정

  - type: log     # 로그 파일을 읽을거라고 알려줌
    paths:        # 로그 파일 경로

      - /opc/home/twm/log/application.log
    multiline.pattern: ^20[1-9][0-9]    # 2019로 시작하는 라인을 찾아서 멀티라인으로 처리
    multiline.negate: true              # 멀티라인 패턴이 한 줄의 시작에서 발견되면 신규 라인으로 처리
    multiline.match: after              # 파일비트가 뒤에 나오는 라인을 앞선 것과 패턴 맞으면 합침
    fields:
      tag: twm-application     # 이벤트의 커스텀 메타데이터
      type: application        # 이벤트의 커스텀 메타데이터

  - type: log
    paths:

      - /opc/home/twm/log/access.log
    fields:
      tag: twm-access
      type: access
output.logstash:
  hosts: twm-logstash:20500     # 어디로 파일비트 산출물을 보낼까요
  loadbalanace: true

Logstash Config

# 여러개의 로그 데이터 인풋 소스를 정의.
input {
  # beats가 데이터를 20500 포트로 전송
  beats {
    port => 20500
    client_inactivity_timeout => 0
  }
  # 얘는 헬스체크용
  heartbeat {
    type => "heartbeat"
  }
}

# 들어온 데이터를 가공하는 곳
filter {
  # 여러개의 태스크를 진행할 수 있음 - 여기선 logstashHost 필드 추가
  mutate {
    add_field => { "logstashHost" => "${HOSTNAME}" }
  }
  if [agent] {
    # rename도 가능하고
    mutate {
      rename => { "agent" => "beat" }
    }
  }
  if [log] {
    mutate {
      rename => { "log" => "beatRegistry" }
    }
  }

  # fields.env
  if [fields] and [fields][env] {
    # 필드 추가/제거
    mutate {
      add_field => {"[@metadata][index_prefix]" => "%{[fields][env]}-"}
      remove_field => ["[fields][env]"]
    }
  } else {
    mutate {
      add_field => {"[@metadata][index_prefix]" => "#{env}-"}
    }
  }

  # fields.type
  if [fields] and [fields][type] {
    mutate {
      add_field => { "type" => "%{[fields][type]}" }
      remove_field => [ "[fields][type]", "[input]", "[log]", "[prospector]", "[ecs]" ]
    }
  }

  if [host] and [host][name] {
    # 필드 대체도 가능
    mutate {
      replace => {
        "host" => "%{[host][name]}"
      }
    }
  }

  # 넘어온 타입에 따라서 어떻게 가공할것인가
  if [type] == "access" {
    grok {
      timeout_millis => 3000
      match => { message => [
        "^%{TIMESTAMP_ISO8601:timestamp}%{SPACE}\[%{DATA:trackingId}\]%{SPACE}%{IP:clientip}%{SPACE}%{URIPATHPARAM:url}%{SPACE}%{WORD:method}%{SPACE}took%{SPACE}%{INT:elapsedTime}%{SPACE}ms.$"
      ]}
    }

    mutate {
      convert => {
        "elapsedTime" => "integer"
      }
    }

    date {
      match => ["timestamp", "yyyy-MM-dd HH:mm:ss,SSS"]
      target => "@timestamp"
      timezone => "Asia/Seoul"
      add_field => {
        "debug" => "timestampMatched"
      }
    }
  }

  else if [type] == "application" {
    grok {
      timeout_millis => 1000
      match => { message => [
        "^%{TIMESTAMP_ISO8601:timestamp}%{SPACE}%{LOGLEVEL:loglevel}%{SPACE}\[%{DATA:trackingId}\]%{SPACE}%{DATA:thread}%{SPACE}\[%{DATA:logger}\]%{SPACE}%{DATA:log}\n%{GREEDYDATA:stacktrace}$",
        "^%{TIMESTAMP_ISO8601:timestamp}%{SPACE}%{LOGLEVEL:loglevel}%{SPACE}\[%{DATA:trackingId}\]%{SPACE}%{DATA:thread}%{SPACE}\[%{DATA:logger}\]%{SPACE}%{GREEDYDATA:log}$"
      ]}
    }

    if [timestamp] {
      mutate {
        remove_field => ["message"]
      }

      date {
        match => ["timestamp", "yyyy-MM-dd HH:mm:ss,SSS"]
        target => "@timestamp"
        timezone => "Asia/Seoul"
        add_field => {
          "debug" => "timestampMatched"
        }
      }
    }
  } # > application
}

# Output destination 설정 - 특정 ES 인덱스로 꽂아주세요.
output {
  if [type] == "access" {
    elasticsearch {
      #{hosts:bzm-moa-log:9200}
      index => "%{[@metadata][index_prefix]}bzm-moa-access-%{[@metadata][index_date]}"
    }
  }

  else if [type] == "application" {
    elasticsearch {
      #{hosts:bzm-moa-log:9200}
      index => "%{[@metadata][index_prefix]}bzm-moa-application-%{[@metadata][index_date]}"
    }
  }

  else if [type] == "heartbeat" {
    file {
      path => "/home/goldmine/logstash/logs/heartbeat.bzm-moa.%{dt}.log"
      codec => line { format => "%{ts} %{message}" }
    }
  }

  else {
    elasticsearch {
      #{hosts:bzm-moa-log:9200}
      index => "%{[@metadata][index_prefix]}bzm-moa-log-%{[@metadata][index_date]}"
    }
  }
}