一、前言

1.1 产生背景

ElastAlert最初由Yelp开发并开源,旨在解决实时监控和告警的需求。由于Elasticsearch的日志处理能力强大,许多组织和企业使用它来存储和分析大量的日志数据。然而,仅仅存储和分析数据无法满足实时监控和快速响应的需求(而X-Pack的告警功能是收费的),因此ElastAlert应运而生。

1.2 功能介绍

  • 实时日志监控:ElastAlert可以实时监听Elasticsearch中的日志数据,支持灵活的查询语法来检索和筛选符合特定条件的日志事件。
  • 告警规则定义:用户可以使用YAML配置文件定义告警规则,包括查询条件、时间窗口、触发条件等。
  • 多种告警通知方式:ElastAlert支持多种告警通知方式,如电子邮件、Slack、PagerDuty等。
  • 内置和自定义告警策略:ElastAlert内置了一些常见的告警策略及自定义告警模块,如持续时间、聚合计数、阈值等。
  • 智能告警延迟和降噪:ElastAlert具有智能的延迟和降噪机制(时间窗口、事件频率等),避免因短暂的异常导致大量冗余告警。

二、ElastAlert部署配置

准备工作目录

[root@master01 ~]# mkdir -p /root/9/ElastAlert
[root@master01 ~]# cd /root/9/ElastAlert/

2.1 封装dingtalk告警模块

下载dingtalk组件:

[root@master01 ElastAlert]# wget https://xmars-devops.oss-cn-shanghai.aliyuncs.com/AliCloud/master.zip

增加dingtalk告警逻辑 dingtalk_alert.py

#! /usr/bin/env python
# -*- coding: utf-8 -*-
import json
import requests
from elastalert.alerts import Alerter, DateTimeEncoder
from requests.exceptions import RequestException
from elastalert.util import EAException
import time
import hmac
import hashlib
import base64
import urllib.parse

class DingTalkAlerter(Alerter):
    """ElastAlert alerter that posts match notifications to a DingTalk robot webhook.

    Rule options read by this alerter:
      dingtalk_webhook  (required) webhook URL the message is POSTed to.
      dingtalk_msgtype  (required) value sent as the payload's "msgtype".
      dingtalk_isAtAll  (optional, default False) sent as "at.isAtAll".
      dingtalk_title    (optional, default '') stored on the instance; not
                        used when building the message.
      dingtalk_secret   (optional, default '') when non-empty, each request
                        is signed and "timestamp"/"sign" query parameters
                        are appended to the webhook URL.

    NOTE(review): the message body is always placed under the "text" key,
    whatever ``dingtalk_msgtype`` is set to.
    """
    required_options = frozenset(['dingtalk_webhook', 'dingtalk_msgtype'])

    def __init__(self, rule):
        super(DingTalkAlerter, self).__init__(rule)
        # Configured base URL. It is never modified after construction; the
        # per-request signature is added to a copy in _build_request_url().
        self.dingtalk_webhook_url = self.rule['dingtalk_webhook']
        self.dingtalk_msgtype = self.rule.get('dingtalk_msgtype', 'text')
        self.dingtalk_isAtAll = self.rule.get('dingtalk_isAtAll', False)
        self.dingtalk_title = self.rule.get('dingtalk_title', '')
        self.dingtalk_secret = self.rule.get('dingtalk_secret', '')

    def format_body(self, body):
        """Return ``body`` encoded as UTF-8 bytes."""
        return body.encode('utf8')

    def _build_request_url(self):
        """Return the URL for a single request, signed if a secret is set.

        A fresh millisecond timestamp and HMAC-SHA256 signature are computed
        on every call and appended to a copy of the configured URL, so
        repeated alerts never accumulate stale "timestamp"/"sign" parameters.
        The "&" separator assumes the configured URL already carries a query
        string (e.g. "?access_token=...").
        """
        # Treat a missing/None/empty secret as "signing disabled".
        if not self.dingtalk_secret:
            return self.dingtalk_webhook_url

        timestamp = str(round(time.time() * 1000))
        secret = self.dingtalk_secret
        string_to_sign = '{}\n{}'.format(timestamp, secret)
        hmac_code = hmac.new(secret.encode('utf-8'),
                             string_to_sign.encode('utf-8'),
                             digestmod=hashlib.sha256).digest()
        sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
        return "{}&timestamp={}&sign={}".format(self.dingtalk_webhook_url, timestamp, sign)

    def alert(self, matches):
        """POST the alert body built from ``matches`` to the DingTalk webhook.

        Raises:
            EAException: if the HTTP request fails or returns an error status.
        """
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json;charset=utf-8"
        }
        body = self.create_alert_body(matches)
        payload = {
            "msgtype": self.dingtalk_msgtype,
            "text": {
                "content": body
            },
            "at": {
                # Honour the rule's dingtalk_isAtAll option (defaults to False).
                "isAtAll": self.dingtalk_isAtAll
            }
        }

        try:
            response = requests.post(self._build_request_url(),
                                     data=json.dumps(payload, cls=DateTimeEncoder),
                                     headers=headers)
            response.raise_for_status()
        except RequestException as e:
            raise EAException("Error request to Dingtalk: {0}".format(str(e)))

    def get_info(self):
        """Return a dict with the alerter type and the configured webhook URL."""
        return {
            "type": "dingtalk",
            "dingtalk_webhook": self.dingtalk_webhook_url
        }

重新封装基于dingtalk模块的监控告警:

[root@master01 ElastAlert]# vim Dockerfile
# Custom ElastAlert image that bundles the DingTalk alerter module.
FROM registry.cn-hangzhou.aliyuncs.com/github_images1024/elastalert-docker:0.2.4

# master.zip is the DingTalk plugin archive from the build context.
# COPY is enough here: ADD does not unpack .zip archives, hence the unzip below.
COPY master.zip /opt/elastalert/

# Unpack the plugin, pin the two Python packages, and install the plugin's
# elastalert_modules package under /usr/local/lib/python3.6.
# Steps are chained with && so that any failing step fails the build
# instead of being silently ignored.
# The stock dingtalk_alert.py is removed; the customised one is copied in next.
RUN cd /opt/elastalert \
 && unzip master.zip \
 && cd elastalert-dingtalk-plugin-master \
 && pip3 install -i https://mirrors.aliyun.com/pypi/simple/ pyOpenSSL==16.2.0 \
 && pip3 install -i https://mirrors.aliyun.com/pypi/simple/ setuptools==46.1.3 \
 && cp -r elastalert_modules /usr/local/lib/python3.6/ \
 && rm -f /usr/local/lib/python3.6/elastalert_modules/dingtalk_alert.py

# Customised alerter (adds request signing) from the build context.
COPY dingtalk_alert.py /usr/local/lib/python3.6/elastalert_modules/

# Run the container in the Asia/Shanghai time zone.
ENV TZ=Asia/Shanghai
RUN ln -snf "/usr/share/zoneinfo/$TZ" /etc/localtime && echo "$TZ" > /etc/timezone

镜像编译&推送

# 编译
[root@master01 ElastAlert]# docker build -t registry.cn-hangzhou.aliyuncs.com/abroad_images/elatrt:v1 .

# 推送
[root@master01 ElastAlert]# docker push registry.cn-hangzhou.aliyuncs.com/abroad_images/elatrt:v1

2.2 定制K8S方式部署

获取钉钉机器人Webhook(下方的 access_token 与加签密钥均为敏感信息,文中仅作示例,请替换为自己的值,切勿公开泄露)

  • https://oapi.dingtalk.com/robot/send?access_token=5fddceb7c1a3169016bfcad7ae5e3412fd32a90e0ff919a8b480432c810fe4d3

image-20250415174132448

获取钉钉机器人加签信息

  • SECae2d562aa405ed88b9dcee0b2ba549aac43f186613b42ed64d273eaecb96a3db

image-20250415174221669

  • configmap:配置文件 & Rules文件
  • deployment:elastalert控制器文件
[root@master01 ElastAlert]# vim elastalert.yaml
# ConfigMap carrying ElastAlert's global configuration.
# The value of `elastalert_config` is a literal block scalar: its text,
# including the inline '#' remarks, is the configuration file content itself
# and is therefore left exactly as written.
# Settings visible in that text: rules are read from /opt/rules, Elasticsearch
# is elasticsearch-master:9200, queries run every 30 seconds over a 30-minute
# buffer, and the writeback index is named "elastalert".
apiVersion: v1
kind: ConfigMap
metadata:
  name: elastalert-config
  namespace: logging
  labels:
    app: elastalert
data:
  elastalert_config: |-            # ElastAlert configuration file
    ---
    rules_folder: /opt/rules       # 指定规则的目录
    scan_subdirectories: false
    es_host: elasticsearch-master               # 修改为当前集群的es链接地址
    es_port: 9200
    run_every:                     # 多久从ES中查询一次
      seconds:  30
    buffer_time:              #向上翻30分钟查找
      minutes: 30
    writeback_index: elastalert    #创建索引名字
    use_ssl: False      #ssl不做认证
    verify_certs: True
    alert_time_limit:             # 失败重试限制
      minutes:  2400

---
# ConfigMap carrying the ElastAlert rule file(s).
# The value of `rule_config.yaml` is a literal block scalar: its text,
# including the inline '#' remarks, is the rule file content itself and is
# therefore left exactly as written.
# The rule searches index pattern k8s-* for the query string "ERROR" and sends
# a text message through elastalert_modules.dingtalk_alert.DingTalkAlerter.
# NOTE(review): the DingTalk access token and signing secret are embedded here
# in plain text; consider keeping them out of a ConfigMap / public documents.
# NOTE(review): the key `dingtalk_sercurity_tpye` looks misspelled, and the
# DingTalkAlerter class shown in this document does not read it; signing is
# driven by `dingtalk_secret` only.
# NOTE(review): the rule uses `type: any` together with `num_events` /
# `timeframe`; those two look like threshold settings for a frequency-style
# rule - confirm they have any effect for type `any`.
apiVersion: v1
kind: ConfigMap
metadata:
  name: elastalert-rules
  namespace: logging
  labels:
    app: elastalert
data:
  rule_config.yaml: |-        # ElastAlert rule file
    name: test-alert     #规则名字,唯一值
    es_host: elasticsearch-master     #es地址,k8s的es
    es_port: 9200               #es端口
    type: any                  #所有类型
    index: k8s-*               #要搜索的索引
    num_events: 1
    timeframe:
      minutes: 1
     #1分钟内,统计个数大于等于1个触发报警
    filter:
     - query:
        query_string:
          query: "ERROR"  #key:value格式,匹配错误日志
    alert:
    - "elastalert_modules.dingtalk_alert.DingTalkAlerter"  #钉钉模块
    dingtalk_webhook: "https://oapi.dingtalk.com/robot/send?access_token=5fddceb7c1a3169016bfcad7ae5e3412fd32a90e0ff919a8b480432c810fe4d3"  #钉钉地址
    dingtalk_sercurity_tpye: "sign"      #钉钉加签格式
    dingtalk_msgtype: "text"             #消息类型
    dingtalk_secret: "SECae2d562aa405ed88b9dcee0b2ba549aac43f186613b42ed64d273eaecb96a3db"  #钉钉加签
    alert_subject: "EFK Error!!!"      #报警信息
    alert_text_type: alert_text_only
    alert_text: |  #和下面匹配key:value
     EFK 日志报错, 参照如下信息进行定位!!!
     time: {}
     hostname: {}
     podName: {}
     nameSpaces: {}
     message: {}
     logIndex: {}
    alert_text_args:
    - "@timestamp"
    - kubernetes.host
    - kubernetes.pod_name
    - kubernetes.namespace_name
    - message
    - kubernetes.labels.logIndex
---
# Deployment that runs the custom ElastAlert image in the `logging` namespace
# with the configuration and rule ConfigMaps mounted as files.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: elastalert
  namespace: logging
  labels:
    app: elastalert
spec:
  selector:
    matchLabels:
      app: elastalert
  template:
    metadata:
      labels:
        app: elastalert
    spec:
      containers:
      - name: elastalert
        # Image built and pushed in the Dockerfile step (bundles the DingTalk alerter).
        image: registry.cn-hangzhou.aliyuncs.com/abroad_images/elatrt:v1
        imagePullPolicy: IfNotPresent
        command: ["/opt/elastalert/run.sh"]
        volumeMounts:
        # Global configuration; the `items` mapping under `volumes` exposes it
        # as /opt/config/elastalert_config.yaml.
        - name: config
          mountPath: /opt/config
        # Rule files; /opt/rules is the `rules_folder` named in the global configuration.
        - name: rules
          mountPath: /opt/rules
        resources:
          limits:
            cpu: 512m
            memory: 512Mi
          requests:
            cpu: 50m
            memory: 256Mi
      volumes:
      # Every key of the elastalert-rules ConfigMap becomes a file in the rules mount.
      - name: rules
        configMap:
          name: elastalert-rules
      - name: config
        configMap:
          name: elastalert-config
          items:
          # Rename the `elastalert_config` key to a .yaml file name.
          - key: elastalert_config
            path: elastalert_config.yaml

应用部署:

# 部署
[root@master01 ElastAlert]# kubectl apply -f elastalert.yaml

# 验证
[root@master01 ElastAlert]# kubectl get pod -nlogging |grep elastalert
elastalert-6489777b77-kqf25     1/1     Running   0               54m

浏览器输入http://kibana.zhang-qing.com/,依次点击"Management"-"Stack Management"-"索引管理",确认已自动创建 elastalert 相关索引(即配置中 writeback_index 指定的索引)

image-20250415180429265

三、测试验证

部署测试pod

[root@master01 ElastAlert]# vim testerrpod.yaml
# Test Pod that prints one "ERROR <n>: <date>" line per second so the
# ElastAlert rule has something to match.
apiVersion: v1
kind: Pod
metadata:
  name: counter
  labels:
    # Required: only pods carrying this label have their logs collected.
    logging: "true"
    # Name used for the log index.
    logIndex: "zhdya"
spec:
  containers:
    - name: count
      image: registry.cn-hangzhou.aliyuncs.com/abroad_images/busybox:1.30
      # Endless loop; every line contains the ERROR keyword.
      args:
        - /bin/sh
        - -c
        - 'i=0; while true; do echo "ERROR $i: $(date)"; i=$((i+1)); sleep 1; done'

# Pod 的大部分字段不可原地修改,不能直接用 kaf testerrpod.yaml 更新配置,需要先删除再重新创建(k、kaf、kgp 分别是 kubectl、kubectl apply -f、kubectl get pods 的别名)
[root@master01 ElastAlert]# k delete -f  testerrpod.yaml ; kaf testerrpod.yaml 

# 验证
[root@master01 ElastAlert]# kgp | grep counter
counter                                    1/1     Running   0              34s

Kibana查看日志数据:

点击"Discover",选择k8s-zhdya-*,观察到有ERROR日志产生

Day09-可观察性-ELK&Loki-图20

查看钉钉告警:

image-20250415185938366

停止报错日志输出:

[root@master01 ElastAlert]# k delete -f  testerrpod.yaml 

四、企业场景下的多类型告警方案

4.1 多索引,多匹配

# Rule: alert when at least 5 matching events are seen within 10 minutes
# across several index patterns.
name: MultipleErrorLogs
type: frequency
num_events: 5
timeframe:
  minutes: 10
# `index` must be a single string, so several index patterns are given
# comma-separated rather than as a YAML list.
index: "myapp-logs-*,otherapp-logs-*"
# NOTE(review): the entries of `filter` are combined with AND, so a document
# has to satisfy all three queries below. If "any of these" is intended, merge
# them into one query_string joined with OR - confirm the intent.
filter:
- query:
    query_string:
      query: "error OR exception OR warning"
      default_operator: OR
- query:
    query_string:
      query: "something else"
      default_operator: OR
- query:
    query_string:
      query: "another error"
      default_operator: OR
# NOTE(review): the "email" alerter also needs an `email` recipient list (and
# SMTP settings); they are omitted in this sketch.
alert:
- "email"

4.2 多个单索引,单匹配

# Rule ErrorAlert1: frequency rule on my-index-1 with a threshold of
# 1 event per 5-minute timeframe.
# NOTE(review): this listing shows two rules back to back; each rule
# presumably belongs in its own file under the rules folder - confirm.
name: ErrorAlert1
type: frequency
num_events: 1
timeframe:
  minutes: 5
index: my-index-1
filter:
- query:
    query_string:
      # Matches documents containing "error" or "exception".
      query: "error OR exception"
      default_operator: OR
# NOTE(review): the "email" alerter presumably needs recipient/SMTP options
# that are not shown here - confirm.
alert:
- "email"

# Rule ErrorAlert2: frequency rule on my-index-2 with a threshold of
# 1 event per 5-minute timeframe.
# NOTE(review): this listing shows two rules back to back; each rule
# presumably belongs in its own file under the rules folder - confirm.
name: ErrorAlert2
type: frequency
num_events: 1
timeframe:
  minutes: 5
index: my-index-2
filter:
- query:
    query_string:
      # NOTE(review): the phrases are not quoted, so with default_operator OR
      # each word is likely matched on its own; quote the phrases if exact
      # phrase matches are intended - confirm.
      query: "file not found OR invalid argument"
      default_operator: OR
# NOTE(review): the "email" alerter presumably needs recipient/SMTP options
# that are not shown here - confirm.
alert:
- "email"

4.3 基于复杂场景下的configmap

# ConfigMap carrying ElastAlert's global configuration (multi-rule scenario).
# The value of `elastalert_config` is a literal block scalar: its text,
# including the inline '#' remarks, is the configuration file content itself
# and is therefore left exactly as written.
# Settings visible in that text: rules are read from /opt/rules, Elasticsearch
# is elasticsearch-master:9200, queries run every minute over a 30-minute
# buffer, and the writeback index is named "elastalert".
apiVersion: v1
kind: ConfigMap
metadata:
  name: elastalert-config
  namespace: logging
  labels:
    app: elastalert
data:
  elastalert_config: |-            # ElastAlert configuration file
    ---
    rules_folder: /opt/rules       # 指定规则的目录
    scan_subdirectories: false
    es_host: elasticsearch-master
    es_port: 9200
    run_every:                     # 多久从 ES 中查询一次
      minutes:  1
    buffer_time:              #向上翻30分钟查找
      minutes: 30
    writeback_index: elastalert    #创建索引名字
    use_ssl: False      #ssl不做认证
    verify_certs: True
    alert_time_limit:             # 失败重试限制
      minutes:  2400

---
# ConfigMap carrying two ElastAlert rule files, one per key:
#   rule_config.yaml - frequency rule on index pattern allapps-*
#   rule_kafka.yaml  - frequency rule on index pattern dtk-go-tb-order-*
# Both use a threshold of 5 events per 10-minute timeframe and send a text
# message through elastalert_modules.dingtalk_alert.DingTalkAlerter.
# Each value is a literal block scalar: its text, including the inline '#'
# remarks, is the rule file content itself and is left exactly as written.
# NOTE(review): DingTalk access tokens and the signing secret are embedded
# here in plain text; consider keeping them out of a ConfigMap / public documents.
# NOTE(review): the key `dingtalk_sercurity_tpye` looks misspelled, and the
# DingTalkAlerter class shown in this document does not read it; signing is
# driven by `dingtalk_secret` only.
# NOTE(review): rule_kafka.yaml points at es_host "elasticsearch" while the
# other rule and the global config use "elasticsearch-master" - confirm.
apiVersion: v1
kind: ConfigMap
metadata:
  name: elastalert-rules
  namespace:  logging
  labels:
    app: elastalert
data:
  rule_config.yaml: |-        # ElastAlert rule file
     name: test-alert     # 规则名字,唯一值
     es_host: elasticsearch-master     #es地址,k8s的es
     es_port: 9200               #es端口
     #type: any                  #所有类型
     index: allapps-*           #要搜索的索引
     #策略规则,如果在10分钟内,匹配的个数大于等于5,那么就触发钉钉报警
     type: frequency
     num_events: 5
     timeframe:
       minutes: 10
     filter:
      - query:
         query_string:
           query: "\"获取AccessToken失败\""
     alert:
     - "elastalert_modules.dingtalk_alert.DingTalkAlerter"  #钉钉模块
     dingtalk_webhook: "https://oapi.dingtalk.com/robot/send?access_token=5fddceb7c1a3169016bfcad7ae5e3412fd32a90e0ff919a8b480432c810fe4d3"  #钉钉地址
     dingtalk_sercurity_tpye: "sign"      #钉钉加签格式,感觉可以不要
     dingtalk_msgtype: "text"             #发消息内容
     dingtalk_secret: "SECae2d562aa405ed88b9dcee0b2ba549aac43f186613b42ed64d273eaecb96a3db"  #钉钉加签
     alert_subject: "ERROR!!!"      #报警信息
     alert_text_type: alert_text_only
     alert_text: |  #和下面匹配key:value
      日志监控
      mess: {}
      pod-name: {}
     alert_text_args:
     - message
     - kubernetes.pod.name

  rule_kafka.yaml: |-        # ElastAlert rule file
    name: kafka-alert     # 规则名字,唯一值
    es_host: elasticsearch     #es地址,k8s的es
    es_port: 9200               #es端口
    #type: any                  #所有类型
    index: dtk-go-tb-order-*           #要搜索的索引
    type: frequency
    num_events: 5
    timeframe:
      minutes: 10
    filter:
     - query:
        query_string:
          query: "\"写入Kafka消息彻底失败\""
    alert:
    - "elastalert_modules.dingtalk_alert.DingTalkAlerter"  #钉钉模块
    dingtalk_webhook: "https://oapi.dingtalk.com/robot/send?access_token=cc67ce67cb55XXXXXX113c2d30ea521cf7e704"  #钉钉地址
    dingtalk_sercurity_tpye: "sign"      #钉钉加签格式,感觉可以不要
    dingtalk_msgtype: "text"             #发消息内容
    dingtalk_secret: "SECae2d562aa405ed88b9dcee0b2ba549aac43f186613b42ed64d273eaecb96a3db"  #钉钉加签
    alert_subject: "ERROR!!!"      #报警信息
    alert_text_type: alert_text_only
    alert_text: |  #和下面匹配key:value
     日志监控
     mess: {}
     pod-name: {}
    alert_text_args:
    - message
    - kubernetes.pod.name

五、总结

  • 部署方式:改良传统docker运行方式,适配当前EFLK模式,运行在Kubernetes集群;
  • 规则定义:通过编写规则文件来定义要监控的指标或事件,并设置条件和阈值;
  • 告警方式:将告警渠道优化为即时消息(钉钉),提升告警触达和响应效率;
  • 调优和优化:可以根据企业实际情况进行参数调优,包括调整告警规则、优化查询性能、改进告警策略等。