Logstash to ES

logstash 配置日志发送 ES

日志收集的架构如下所示:

┌────────────┐
│Java logback│\
└────────────┘ \
                  ┌─────┐      ┌────────┐      ┌──────┐      ┌────────┐
                  │kafka│ ───> │logstash│ ───> │  ES  │ ───> │ kibana │
                  └─────┘      └────────┘      └──────┘      └────────┘
┌────────────┐ /
│Java logback│/
└────────────┘

java 应用日志通过 logback 发送给 kafka,logstash 从 kafka 消费日志,并将日志转发给 ES。一开始一个应用一个 kafka topic,logstash 消费了之后根据 topic 来确定 ES 的索引。

logback 的配置:

logback.xml
<appender name="KAFKA" class="com.github.danielwegener.logback.kafka.KafkaAppender">
  <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"  charset="UTF-8" >
      <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
  </encoder>
  <topic>spring-boot-demo</topic>
  <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"/>
  <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
  <producerConfig>bootstrap.servers=192.168.0.107:9092</producerConfig>
    <producerConfig>retries=1</producerConfig>
    <producerConfig>batch-size=16384</producerConfig>
    <producerConfig>buffer-memory=33554432</producerConfig>
    <producerConfig>properties.max.request.size==2097152</producerConfig>
</appender>
<logger name="com.cheon.demo" level="INFO" additivity="false">
    <appender-ref ref="KAFKA" />
</logger>

pom 文件依赖:

<dependency>
  <groupId>com.github.danielwegener</groupId>
  <artifactId>logback-kafka-appender</artifactId>
  <version>0.2.0-RC2</version>
</dependency>

logstash 配置:

logstash.conf
input {
    kafka {
        id => "spring-boot-demo"
        bootstrap_servers => "192.168.0.107:9092"
        group_id => "spring-boot-demo"
        topics_pattern => "spring-boot-demo"
        consumer_threads => 3
        decorate_events => true
        auto_offset_reset => "earliest"
    }
}

filter {
    ruby {
        code => "event.set('timestamp', event.get('@timestamp').time.localtime)"
    }
    ruby {
        code => "event.set('@timestamp', event.get('timestamp'))"
    }

    mutate {
        remove_field => ["timestamp"]
    }
}

output {
    stdout{
    }
    elasticsearch {
       hosts => "http://192.168.0.112:9200"
       index => "%{[@metadata][kafka][topic]}-%{+YYYY-MM-dd}"
    }
}

正常运行了一段时间之后,日志发送 kafka 报错了。查看了才发现是 kafka topic 数量达到限制了。改变方案,将同一项目下应用的日志发送给一个 topic,在日志开头添加索引字段用于区分 ES 索引。

修改后的 logback 的配置:

logback.xml
<appender name="KAFKA" class="com.github.danielwegener.logback.kafka.KafkaAppender">
  <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"  charset="UTF-8" >
      <pattern>[spring-app1-demo] %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
  </encoder>
  <topic>spring-boot-demo</topic>
  <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"/>
  <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
  <producerConfig>bootstrap.servers=192.168.0.107:9092</producerConfig>
    <producerConfig>retries=1</producerConfig>
    <producerConfig>batch-size=16384</producerConfig>
    <producerConfig>buffer-memory=33554432</producerConfig>
    <producerConfig>properties.max.request.size==2097152</producerConfig>
</appender>
<logger name="com.cheon.demo" level="INFO" additivity="false">
    <appender-ref ref="KAFKA" />
</logger>

配置中 pattern 最开始 [spring-app1-demo] 的字段即为用于区分 ES 索引的字段。这一部分内容由 logstash 的 grok 模块正则匹配出来。

修改后的 logstash 配置:

logstash.conf
input {
    kafka {
        id => "spring-boot-demo"
        bootstrap_servers => "192.168.0.107:9092"
        group_id => "spring-boot-demo"
        topics_pattern => "spring-boot-demo"
        consumer_threads => 3
        decorate_events => true
        auto_offset_reset => "earliest"
    }
}

filter {
    ruby {
        code => "event.set('timestamp', event.get('@timestamp').time.localtime)"
    }
    ruby {
        code => "event.set('@timestamp', event.get('timestamp'))"
    }

    mutate {
        remove_field => ["timestamp"]
    }

    grok {
        match => {
            "message" => "\[(?<index_name>[^ ]*)\]"
        }
    }

}

output {
    stdout{
    }
    elasticsearch {
       hosts => "http://192.168.0.112:9200"
       index => "%{[@metadata][kafka][topic]}-%{index_name}-%{+YYYY-MM-dd}"
    }
}

根据配置可以预测,ES 的索引应为 spring-boot-demo-spring-app1-demo-2019-07-04

运行一段时间后,项目组有新的需求,需要将日志各个字段解析以便于做统计分析。这里就需要修改grok的正则。并且由于一个topic中有多个应用的日志,每个应用的日志格式可能不一样,所以可以写多个正则表达式,匹配不到就用下一个正则。测试 grok 正则可以用 grok debugger 网站。

最终的配置:

input {
    kafka {
        id => "spring-boot-demo"
        bootstrap_servers => "192.168.0.107:9092"
        group_id => "spring-boot-demo"
        topics_pattern => "spring-boot-demo"
        consumer_threads => 3
        decorate_events => true
        auto_offset_reset => "earliest"
    }
}

filter {
    ruby {
        code => "event.set('timestamp', event.get('@timestamp').time.localtime)"
    }
    ruby {
        code => "event.set('@timestamp', event.get('timestamp'))"
    }

    mutate {
        remove_field => ["timestamp"]
    }

    grok {
        match => {
            "message" => ["\[(?<index_name>[A-Za-z\-_0-9]*)\] (?<date>[0-9]{4}-[0-9]{1,2}-[0-9]{1,2} [0-9]{1,2}:[0-9]{1,2}:[0-9]{1,2}.[0-9]{3}) (?<level>[A-Z]*) \[[^ ]*\] \- \[[^ ]*\] \[(?<api_id>[^ ]*)\] \[(?<request_addr>[^,]*),(?<request_start>[0-9]*),(?<request_end>[0-9]*),(?<response_time>[0-9]*)\] (?<status>[0-9]*) : (?<request_and_response>.*)","\[(?<index_name>[a-zA-Z\-1-9]*)\]"]
        }
    }

    mutate {
        convert => ["response_time", "integer"]
    }
}

output {
    stdout{
    }
    elasticsearch {
       hosts => "http://192.168.0.112:9200"
       index => "%{[@metadata][kafka][topic]}-%{index_name}-%{+YYYY-MM-dd}"
    }
}

注意配置中正则解析结束后还将response_time做了数据类型转化,转成了整数类型,便于分析。

Thu Jul 4, 2019 cheon