一、Fluentd采集组件

1.1 前言

Fluentd 是一个高效的日志聚合器,是用 Ruby 编写的,并且可以很好地扩展。对于大 部分企业来说,Fluentd 足够高效并且消耗的资源相对较少。

另外一个工具 Fluent-bit 更轻量级,占用资源更少,但是插件相对 Fluentd 来说不够丰富。

Fluentd 更加成熟,使用更加广泛,所以这里我们使用 Fluentd 来作为日志收集工具。

1.2 工作原理

Fluentd 通过一组给定的数据源抓取日志数据,处理后(转换成结构化的数据格式)将 它们转发给其他服务,比如 Elasticsearch、对象存储等等。

Fluentd 支持超过 300 个日志存储和分析服务,在这方面是非常灵活的。

主要运行步骤如下:

  • 首先 Fluentd 从多个日志源获取数据;
  • 结构化并且标记这些数据;
  • 然后根据匹配的标签将数据发送到多个目标服务去;

Day09-可观察性-ELK&Loki-图5

1.3 日志源配置

收集 Kubernetes 节点上的所有容器日志,就需要做如下的日志源配置:

<source>
  @id fluentd-containers.log
  @type tail                             # Fluentd 内置的输入方式,其原理是不停地从源文件中获取新的日志。
  path /var/log/containers/*.log         # 挂载的宿主机容器日志地址
  pos_file /var/log/es-containers.log.pos
  tag raw.kubernetes.*                   # 设置日志标签
  read_from_head true
  <parse>                                # 多行格式化成JSON
    @type multi_format                   # 使用 multi-format-parser 解析器插件
    <pattern>
      format json                        # JSON 解析器
      time_key time                      # 指定事件时间的时间字段
      time_format %Y-%m-%dT%H:%M:%S.%NZ  # 时间格式
    </pattern>
    <pattern>
      format /^(?<time>.+) (?<stream>stdout|stderr) [^ ]* (?<log>.*)$/
      time_format %Y-%m-%dT%H:%M:%S.%N%:z
    </pattern>
  </parse>
</source>

上面配置部分参数说明如下:

  • id:表示引用该日志源的唯一标识符,该标识可用于进一步过滤和路由结构化日志数据
  • type:Fluentd 内置的指令, tail 表示 Fluentd 从上次读取的位置通过 tail 不断获 取数据,另外一个是http 表示通过一个 GET 请求来收集数据。
  • path: tail 类型下的特定参数,告诉 Fluentd 采集 /var/log/containers 目录 下的所有日志,这是docker 在 Kubernetes 节点上用来存储运行容器 stdout 输出 日志数据的目录。
  • pos_file:检查点,如果 Fluentd 程序重新启动了,它将使用此文件中的位置来恢复日志数据收集。
  • tag:用来将日志源与目标或者过滤器匹配的自定义字符串,Fluentd 匹配源/目标标签来路由日志数据。

1.4 路由配置

配置将日志数据发送到 Elasticsearch:

<match **>
  @id elasticsearch
  @type elasticsearch
  @log_level info
  include_tag_key true
  type_name fluentd
  host "#{ENV['OUTPUT_HOST']}"
  port "#{ENV['OUTPUT_PORT']}"
  logstash_format true
  <buffer>
    @type file
    path /var/log/fluentd-buffers/kubernetes.system.buffer
    flush_mode interval
    retry_type exponential_backoff
    flush_thread_count 2
    flush_interval 5s
    retry_forever
    retry_max_interval 30
    chunk_limit_size "#{ENV['OUTPUT_BUFFER_CHUNK_LIMIT']}"
    queue_limit_length "#{ENV['OUTPUT_BUFFER_QUEUE_LIMIT']}"
    overflow_action block
  </buffer>
</match>
  • match:标识一个目标标签,后面是一个匹配日志源的正则表达式,我们这里想要 捕获所有的日志并将它们发送给 Elasticsearch,所以需要配置成 **。
  • id:目标的一个唯一标识符。
  • type:支持的输出插件标识符,我们这里要输出到 Elasticsearch,所以配置成 elasticsearch,这是 Fluentd 的一个内置插件。
  • log_level:指定要捕获的日志级别,我们这里配置成 info,表示任何该级别或者该级别以上(INFO、WARNING、ERROR)的日志都将被路由到 Elsasticsearch。
  • host/port:定义 Elasticsearch 的地址,也可以配置认证信息,我们的 Elasticsearch 不需要认证,所以这里直接指定 host 和 port 即可。
  • logstash_format:Elasticsearch 服务对日志数据构建反向索引进行搜索,将 logstash_format 设置为 true,Fluentd 将会以 logstash 格式来转发结构化的日志数据。
  • Buffer: Fluentd 允许在目标不可用时进行缓存,比如,如果网络出现故障或者 Elasticsearch 不可用的时候。缓冲区配置也有助于降低磁盘的 IO。

1.5 过滤

由于 Kubernetes 集群中应用太多,也有很多历史数据,所以我们可以只将某些应用的 日志进行收集,比如我们只采集具有 logging=true 这个 Label 标签的 Pod 日志,这 个时候就需要使用filter,如下所示:

# 删除无用的属性
<filter kubernetes.**>
  @type record_transformer
  remove_keys $.docker.container_id,$.kubernetes.container_image_id,$.kubernetes.pod_id,$.kubernetes.namespace_id,$.kubernetes.master_url,$.kubernetes.labels.pod-template-hash
</filter>
# 只保留具有logging=true标签的Pod日志
<filter kubernetes.**>
  @id filter_log
  @type grep
  <regexp>
    key $.kubernetes.labels.logging
    pattern ^true$
  </regexp>
</filter>

二、Fluentd部署配置

要收集 Kubernetes 集群的日志,直接用 DasemonSet 控制器来部署 Fluentd 应用,它 就可以从 Kubernetes 节点上采集日志,确保在集群中的每个节点上始终运行一个 Fluentd 容器。

当然,也可以直接使用 Helm 来进行一键安装。不过为了能够了解更多实现细节,我们 这里还是采用手动方法来进行安装。

可以直接使用官方的对于 Kubernetes 集群的安装文档:https://docs.fluentd.org/container-deployment/kubernetes

首先,我们通过 ConfigMap 对象来指定 Fluentd 配置文件,新建 fluentd-configmap.yaml文件,文件内容如下:

[root@master01 9]# vim fluentd-configmap.yaml
kind: ConfigMap
apiVersion: v1
metadata:
  name: fluentd-conf
  namespace: logging
data:
  system.conf: |-
    <system>
      root_dir /tmp/fluentd-buffers/
    </system>
  fluent.conf: |-
    <source>
      @id fluentd-containers.log
      @type tail                              # Fluentd 内置的输入方式,其原理是不停地从源文件中获取新的日志。
      path /var/log/containers/*.log          # 挂载的服务器Docker容器日志地址
      pos_file /var/log/es-containers.log.pos
      tag raw.kubernetes.*                    # 设置日志标签
      read_from_head true
      <parse>                                 # 多行格式化成JSON
        @type multi_format                    # 使用 multi-format-parser 解析器插件
        <pattern>
          format json                         # JSON解析器
          time_key time                       # 指定事件时间的时间字段
          time_format %Y-%m-%dT%H:%M:%S.%NZ   # 时间格式
        </pattern>
        <pattern>
          format /^(?<time>.+) (?<stream>stdout|stderr) [^ ]* (?<log>.*)$/
          time_format %Y-%m-%dT%H:%M:%S.%N%:z
        </pattern>
      </parse>
    </source>
    # 在日志输出中检测异常,并将其作为一条日志转发
    # https://github.com/GoogleCloudPlatform/fluent-plugin-detect-exceptions
    <match raw.kubernetes.**>           # 匹配tag为raw.kubernetes.**日志信息
      @id kubernetes
      @type detect_exceptions           # 使用detect-exceptions插件处理异常栈信息
      remove_tag_prefix raw             # 移除 raw 前缀
      message log
      stream stream
      multiline_flush_interval 5
      max_bytes 500000
      max_lines 1000
    </match>
    <filter **>  # 拼接日志
      @id filter_concat
      @type concat                # Fluentd Filter 插件,用于连接多个事件中分隔的多行日志。
      key message
      multiline_end_regexp /\n$/  # 以换行符“\n”拼接
      separator ""
    </filter>
    # 添加 Kubernetes metadata 数据
    <filter kubernetes.**>
      @id filter_kubernetes_metadata
      @type kubernetes_metadata
    </filter>
    # 修复 ES 中的 JSON 字段
    # 插件地址:https://github.com/repeatedly/fluent-plugin-multi-format-parser
    <filter kubernetes.**>
      @id filter_parser
      @type parser                # multi-format-parser多格式解析器插件
      key_name log                # 在要解析的记录中指定字段名称。
      reserve_data true           # 在解析结果中保留原始键值对。
      remove_key_name_field true  # key_name 解析成功后删除字段。
      <parse>
        @type multi_format
        <pattern>
          format json
        </pattern>
        <pattern>
          format none
        </pattern>
      </parse>
    </filter>
    # 删除一些多余的属性
    <filter kubernetes.**>
      @type record_transformer
      remove_keys $.kubernetes.namespace_labels.project,$.kubernetes.pod_ip,$.kubernetes.labels.app,$.docker.container_id,$.kubernetes.container_image_id,$.kubernetes.pod_id,$.kubernetes.namespace_id,$.kubernetes.master_url,$.kubernetes.labels.pod-template-hash
    </filter>
    # 只保留具有logging=true标签的Pod日志
    <filter kubernetes.**>
      @id filter_log
      @type grep
      <regexp>
        key $.kubernetes.labels.logging
        pattern ^true$
      </regexp>
    </filter>

    <match **>
      @id elasticsearch
      @type elasticsearch
      @log_level info
      include_tag_key true
      host elasticsearch-master.logging.svc
      port 9200
      logstash_format true
      logstash_prefix k8slog  # 设置 index 前缀为 k8slog
      request_timeout    30s
      <buffer>
        @type file
        path /var/log/fluentd-buffers/kubernetes.system.buffer
        flush_mode interval
        retry_type exponential_backoff
        flush_thread_count 2
        flush_interval 5s
        retry_forever
        retry_max_interval 30
        chunk_limit_size 2M
        queue_limit_length 8
        overflow_action block
      </buffer>
    </match>

上面配置文件中我们只配置了 docker 容器日志目录,收集到数据经过处理后发送到elasticsearch-client:9200 服务。

注意:挂载的日志目录出现 unreadable 说明日志软连接有问题,无法读取日志。 如果有小伙伴,更改了docker的数据目录,这个时候需要更改为对应的数据目录,尤其是/var/log/pods/:

在我的 ds.yaml 中,必须挂载/data/docker/containers 而不是/var/lib/containers/

        volumeMounts:
        - name: fluentconfig
          mountPath: /etc/fluent/config.d
        - name: varlog
          mountPath: /var/log
        - name: varlogpods
          mountPath: /var/log/pods
        - name: datadockercontainers
          mountPath: /data/docker/containers
      volumes:
      - name: fluentconfig
        configMap:
          name: fluentd-conf
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlogpods
        hostPath:
          path: /var/log/pods
      - name: datadockercontainers
        hostPath:
          path: /data/docker/containers

然后新建一个 fluentd-daemonset.yaml 的文件,文件内容如下:

[root@master01 9]# vim fluentd-daemonset.yaml 
apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluentd-es
  namespace: logging
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
rules:
  - apiGroups:
      - ""
    resources:
      - "namespaces"
      - "pods"
    verbs:
      - "get"
      - "watch"
      - "list"
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: fluentd-es
  labels:
    k8s-app: fluentd-es
    kubernetes.io/cluster-service: "true"
    addonmanager.kubernetes.io/mode: Reconcile
subjects:
  - kind: ServiceAccount
    name: fluentd-es
    namespace: logging
    apiGroup: ""
roleRef:
  kind: ClusterRole
  name: fluentd-es
  apiGroup: ""
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd
  namespace: logging
  labels:
    app: fluentd
    kubernetes.io/cluster-service: "true"
spec:
  selector:
    matchLabels:
      app: fluentd
  template:
    metadata:
      labels:
        app: fluentd
        kubernetes.io/cluster-service: "true"
    spec:
      tolerations:
        - key: node-role.kubernetes.io/master
          effect: NoSchedule
      serviceAccountName: fluentd-es
      containers:
      - name: fluentd
        image: registry.cn-hangzhou.aliyuncs.com/abroad_images/fluentd:v3.4.0
        volumeMounts:
        - name: fluentconfig
          mountPath: /etc/fluent/config.d
        - name: varlog
          mountPath: /var/log
        - name: varlogpods
          mountPath: /var/log/pods
      volumes:
      - name: fluentconfig
        configMap:
          name: fluentd-conf
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlogpods
        hostPath:
          path: /var/log/pods

我们将上面创建的 fluentd-config 这个 ConfigMap 对象通过 volumes 挂载到了 Fluentd 容器中。

场景:为了能够灵活控制哪些节点的日志可以被收集,还可以添加了一个 nodSelector 属性:

哪台节点上的日志需要采集,那么我们就需要给节点打上上面的标签。

说明:下面实验没有使用nodeSelector

nodeSelector:
  beta.kubernetes.io/fluentd-ds-ready: "true"

如果你需要在其他节点上采集日志,则需要给对应节点打上标签,使用如下命令:

# 命令格式
$ kubectl label nodes <node_name> beta.kubernetes.io/fluentd-ds-ready=true

# 给node02节点添加标签
kubectl label nodes node02 beta.kubernetes.io/fluentd-ds-ready=true

另外集群使用的是 kubeadm 搭建的,默认情况下 master 节点有污点,所以如果想要也收集 master 节点的日志,则需要添加上容忍:

# 方式一:添加如下配置
tolerations:
  - operator: Exists

# 方式二:删除污点
kubectl taint node <node_name> <污点的key>-

分别创建上面的 ConfigMap 对象和 DaemonSet:

$ kubectl create -f fluentd-configmap.yaml

$ kubectl create -f fluentd-daemonset.yaml

创建完成后,查看对应的 Pods 列表,检查是否部署成功:

[root@master01 9]# kubectl get pods -n logging | grep fluentd
fluentd-4pkms                   1/1     Running   0          101s
fluentd-ctpxm                   1/1     Running   0          101s
fluentd-hpmz2                   1/1     Running   0          101s
fluentd-jwc9w                   1/1     Running   0          101s
fluentd-z2sl4                   1/1     Running   0          101s

Fluentd 启动成功后,就可以发送日志到 ES 了,但是我们这里是过滤了只采集具有logging=true 标签的 Pod 日志,所以现在还没有任何数据会被采集。

下面我们部署一个简单的测试应用, 新建 counterlog.yaml 文件,文件内容如下:

[root@master01 9]# vim counterlog.yaml
apiVersion: v1
kind: Pod
metadata:
  name: counterlog
  labels:
    logging: "true" # 一定要具有该标签才会被采集
spec:
  containers:
    - name: count
      image: registry.cn-hangzhou.aliyuncs.com/abroad_images/busybox:1.30
      args:
        [
          /bin/sh,
          -c,
          'i=0; while true; do echo "$i: $(date)"; i=$((i+1)); sleep 1; done',
        ]

该 Pod 只是简单将日志信息打印到 stdout,所以正常来说 Fluentd 会收集到这个日志数据,在Kibana 中也就可以找到对应的日志数据了,使用 kubectl 工具创建该 Pod:

[root@master01 9]# kubectl create -f counterlog.yaml

# 验证
[root@master01 9]# kubectl get pods | grep counterlog
counterlog                                 1/1     Running   0              21s

也可以手动测试下,是否kibana成功绑定了es集群;

手动推送一条测试数据

curl -X POST \
     -H "Content-Type: application/json" \
     -d '{
            "message": "This is a test log message",
            "timestamp": "2023-07-04T10:00:00",
            "source": "kubernets.cn"
        }' \
     http://192.168.216.174:9200/mylog/_doc

# 执行命令后回显内容
{"_index":"mylog","_type":"_doc","_id":"oCcjN5YB7kllP2l5XDn1","_version":1,"result":"created","_shards":{"total":2,"successful":2,"failed":0},"_seq_no":0,"_primary_term":1}

1、首先选择"Management",再选择"Stack Management"(堆栈管理)。

image-20250415095311203

2、在堆栈管理界面中,您将看到一个"Data"(数据)菜单。在该菜单中,选择"Index Management"(索引管理)。这将显示 Elasticsearch 中的索引列表。

image-20250415095523293

image-20250415095652733

Pod 创建并运行后,回到 Kibana Dashboard 页面,点击左侧最下面的 Management -> Stack Management,进入管理页面,点击左侧 Kibana 下面的 索引模式 ,点击 创建索引模式 开始导入索引数据:

Day09-可观察性-ELK&Loki-图7

在这里可以配置我们需要的 Elasticsearch 索引,前面 Fluentd 配置文件中我们采集的日 志使用的是 logstash 格式,定义了一个 k8s 的前缀,所以这里只需要在文本框中输入k8slog-* 即可匹配到 Elasticsearch 集群中采集的 Kubernetes 集群日志数据,时间戳字段选择"@timestamp"

image-20250415100013154

然后点击"创建索引模式",进入以下页面:

image-20250415100321206

点击左侧导航菜单中的 Discover,然后就可以看到一些直方图和最近采集到的日志数据了:

image-20250415100531644

现在的数据就是上面 Counter 应用的日志,如果还有其他的应用,我们也可以筛选过滤:

Day09-可观察性-ELK&Loki-图10

我们也可以通过其他元数据来过滤日志数据,比如您可以单击任何日志条目以查看其他元数据,如容器名称,Kubernetes 节点,命名空间等。

停止采集

[root@master01 9]# k delete -f  counterlog.yaml

三、总结

在 Kubernetes(K8S)架构下使用 EFK(Elasticsearch + Fluentd + Kibana)服务,具 有如下优势:

  1. 日志集中管理:EFK 提供了一个集中的平台来收集、存储和管理容器化应用程序的日志。它能够从多个容器和节点收集日志,并将其发送到集中的 Elasticsearch 数据 存储中。
  2. 可扩展性:Elasticsearch 和 Fluentd 支持水平扩展,可以根据需要增加节点或副本数量,以适应日志量的增长。
  3. 实时日志分析:通过 Kibana 可视化界面,EFK 提供了实时的日志分析和查询功能。 可以轻松地搜索、过滤和分析大量的日志数据,以便进行故障排除、性能优化和监控。
  4. 灵活的日志解析:Fluentd 是一个高度可配置的日志收集代理,可以轻松进行日志 解析、过滤和转换。它支持多种输入和输出插件,可以适应各种应用程序和日志格式。
  5. 强大的搜索和聚合功能:Elasticsearch 是一个分布式搜索和分析引擎,具有强大的 搜索和聚合功能。它可以通过全文搜索、关键字过滤、聚合和可视化等功能,帮助用户快速定位和分析关键日志信息。
  6. 集成与生态系统:EFK 与 Kubernetes 生态系统紧密集成,支持通过标签过滤、命名空间隔离和动态配置等方式来灵活管理日志收集和展示。此外,EFK 还可以与其他监控和告警工具集成,以提供全面的应用程序运行时监控和报警能力。