一、案例分享

1.1 场景一

  • 将 /root/my.txt文件中的日志数据按照指定的格式解析后,通过HTTP POST方法 发送到http://localhost:9090/地址。
  • 发送的数据格式为JSON,每3秒发送一次。

fluentd的配置文件如下:

<source>
  @type tail
  path /root/my.txt
  pos_file /root/my.txt.pos
  tag mylog
  <parse>
    @type regexp
    expression /(?<key>\w+)\s(?<value>\w+)/
  </parse>
</source>

<match mylog>
  @type http
  endpoint http://localhost:9090/
  open_timeout 2
  http_method post

  <format>
    @type json
  </format>
  <buffer>
    flush_interval 3s
  </buffer>
</match>

1.2 场景二

  • 读取指定路径(可以是环境变量或默认路径)的日志文件,不对源数据进行解析, 然后向每条记录中添加主机名信息,并将处理后的数据输出到标准输出。
<source>
  @type tail
  # 这里使用HISTFILE环境变量,如果没有设置,使用默认值/root/.bash_history
  path "#{ENV["HISTFILE"] || /root/.bash_history}"
  pos_file /root/.bash_history.pos
  tag history
  <parse>
    @type none
  </parse>
</source>

<filter history>
  @type record_transformer
  <record>
    hostname ${hostname}
  </record>
</filter>

<match history>
  @type stdout
</match>

1.3 场景三

  • 收集用户操作记录转发到另一个fluentd节点,同时将数据发送到Kafka和存入HDFS。

  • 数据流为:fluentd采集端 -> fluentd收集端 -> kafka和HDFS

  • 示例用户操作记录数据为:

root pts/1 2023-06-26 10:59 (10.180.206.1):root 2023-06-26 11:00:09 130  tail -f /var/log/command.his.log

采集节点的配置:

<source>
  @type tail
  path /var/log/command.his.log
  pos_file /var/log/command.his.log.pos
  tag history
  <parse>
    @type regexp
    # 使用正则解析日志文件
    expression /^(?<who_user>\w+)\s(?<pts>\S+)\s(?<who_time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2})\s\((?<remote_ip>\d+\.\d+\.\d+\.\d+)\):(?<user>\w+)\s(?<time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})\s(?<res>\d+)\s(?<command>.+)$/
    time_key time
  </parse>
</source>
<filter history>
  @type record_transformer
  <record>
    # event内容增加hostname这一行
    hostname ${hostname}
  </record>
</filter>

<match history>
  @type forward
  send_timeout 60s
  recover_wait 10s
  hard_timeout 60s
  <buffer>
    # 1秒钟向另一个fluentd节点转发一次
    flush_interval 1s
  </buffer>
  <server>
    name myserver1
    host 10.180.xxx.xxx
    port 24225
    weight 60
  </server>
</match>

fluentd收集节点的配置:

<source>
  @type forward
  port 24225
  bind 0.0.0.0
  tag remote
</source>

<match remote>
  # 使用copy方式,分两路输出
  @type copy
  <store>
    @type kafka2
    brokers 10.180.xxx.xxx:9092
    use_event_time true

    <buffer topic>
        @type file
        path /var/log/td-agent/buffer/td
        flush_interval 3s
    </buffer>

    <format>
        @type json
    </format>

    default_topic history
    required_acks -1
  </store>
  <store>
    @type webhdfs
    host 10.180.xxx.xxx
    port 50070
    path "/history/access.log.%Y%m%d_%H.#{Socket.gethostname}.log"
    <buffer>
        flush_interval 60s
    </buffer>
  </store>
</match>