一、消息队列简介¶
1.1 消息队列历史¶
1983 年在 MIT 工作的26岁的印度小伙 Vivek Ranadive突发奇想,以前我们的软件相互通信,都是点对点的,而且要实现相同的协议,能不能有一种专门用来通信的中间件,就像主板(BUS)一样,把不同的软件集成起来呢?于是他搞了一家公司(Teknekron),开发了世界上第一个消息队列软件The Information Bus(TIB)。最开始的时候,它被高盛这些公司用在金融交易里面。因为TIB 实现了发布订阅(Publish/Subscribe)模型,信息的生产者和消费者可以完全解耦,这个特性引起了电信行业特别是新闻机构的注意。1994年路透社收购了
Teknekron。
TIB 的成功马上引起了业界大佬IBM 的注意,他们研发了自己的IBM MQ(IBMWesphere)。后面微软也加入了这场竟争,研发了MSMQ。这个时候,每个厂商的产品是孤立的,大家都有自己的技术壁垒。比如一个应用订阅了IBM MQ 的消息,如果有要订阅MSMQ 的消息,因为协议、API 不同,又要重复去实现。。
J2EE 制定了JDBC 的规范,那么那么各个数据库厂商自己去实现协议,提供jar 包,在Java 里面就可以使用相同的API 做操作不同的数据库了。MQ 产品的问题也是一样的
2001 年的时候,SUN 公司发布了 JMS 规范,JMS 即 Java 消息服务(Java Message Service)应用程序接口,是一个 Java 平台中关于面向消息中间件(MOM,message oriented middleware)的API
JMS 想要在各大厂商的MQ 上面统一包装一层Java 的规范,大家都只需要针对API 编程就可以了,不需要关注使用了什么样的消息中间件,只要选择合适的MQ 驱动。可以用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
但是JMS 只适用于Java 语言,它是跟语言绑定的,没有从根本上解决这个问题(只是一个API)。
所以在2006 年的时候,AMQP (Advanced Message Queuing Protocol)规范发布了。它是跨语言和跨平台的,真正地促进了消息队列的繁荣发展。
2007 年的时候,Rabbit 技术公司基于Erlang语言开发了符合AMQP 规范 RabbitMQ 1.0。从最开始用在金融行业里面,现在RabbitMQ 已经在世界各地的公司中遍地开花。国内的绝大部分大厂都在用RabbitMQ,包括头条,美团,滴滴(TMD),去哪儿,艺龙,淘宝也有用。
1.2 MQ 定义¶
阿里云消息队列
https://www.aliyun.com/product/ons?spm=5176.234368.h2v3icoap.427.2620db25lcHi1Q&aly_as=Tz_Lue_o
在分布式场景中,相对于大量的用户请求来说,内部的功能主机之间、功能模块之间等,数据传递的数据量是无法想象的,因为一个用户请求,会涉及到各种内部的业务逻辑跳转等操作。那么,在大量用户的业务场景中,如何保证所有的内部业务逻辑请求都处于稳定而且快捷的数据传递呢? 消息队列(Message Queue)技术可以满足此需求
消息队列(Message Queue,简称 MQ)是构建分布式互联网应用的基础设施,通过 MQ 实现的松耦合架构设计可以提高系统可用性以及可扩展性,是适用于现代应用的最佳设计方案。
消息队列是一种异步的服务间通信方式,适用于无服务器和微服务架构。消息在被处理和删除之前一直存储在队列上。每条消息仅可被一位用户处理一次。消息队列可被用于分离重量级处理、缓冲或批处理工作以及缓解高峰期工作负载。
1.3 MQ 使用场合¶

消息队列作为高并发系统的核心组件之一,能够帮助业务系统结构提升开发效率和系统稳定性
消息队列主要有以下应用场景
削峰填谷
诸如电商业务中的秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,消息队列可提供削峰填谷的服务来解决该问题。
异步解耦
交易系统作为淘宝等电商的最核心的系统,每笔交易订单数据的产生会引起几百个下游业务系统的关注,包括物流、购物车、积分、流计算分析等等,整体业务系统庞大而且复杂,消息队列可实现异步通信和应用解耦,确保主站业务的连续性。
顺序收发
细数日常中需要保证顺序的应用场景非常多,例如证券交易过程时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等等。与先进先出FIFO(First In First Out)原理类似,消息队列提供的顺序消息即保证消息FIFO。
分布式事务一致性
交易系统、支付红包等场景需要确保数据的最终一致性,大量引入消息队列的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。
大数据分析
数据在“流动”中产生价值,传统数据分析大多是基于批量计算模型,而无法做到实时的数据分析,利用消息队列与流式计算引擎相结合,可以很方便的实现业务数据的实时分析。
分布式缓存同步
电商的大促,各个分会场琳琅满目的商品需要实时感知价格变化,大量并发访问数据库导致会场页面响应时间长,集中式缓存因带宽瓶颈,限制了商品变更的访问流量,通过消息队列构建分布式缓存,实时通知商品数据的变化
蓄流压测
线上有些链路不方便做压力测试,可以通过堆积一定量消息再放开来压测
1.4 主流 MQ¶
目前主流的消息队列软件有 Kafka、RabbitMQ、ActiveMQ、RocketMQ等,还有相对小众的消息队列软件如ZeroMQ、Apache Qpid 等。
热度随时间变化的趋势


二、Kafka 介绍¶

kafka
A distributed streaming platform
阿里云消息队列
https://www.aliyun.com/product/ons?spm=5176.234368.h2v3icoap.427.2620db251chi1Q&aly_as=Tz_Lue_o
Kafka 被称为下一代分布式消息系统,由 Scala 和 Java编写,是非营利性组织ASF(Apache Software Foundation)基金会中的一个开源项目,比如:HTTP Server、Tomcat、Hadoop、ActiveMQ等开源软件都属于 Apache基金会的开源软件,类似的消息系统还有RabbitMQ、ActiveMQ、ZeroMQ。
Kafka用于构建实时数据管道和流应用程序。 它具有水平可伸缩性,容错性,快速性,可在数千家组织中同时投入生产协同工作。
官网: http://kafka.apache.org/
三、常用消息队列对比¶
kafka 最主要的优势是其具备分布式功能、并可以结合 zookeeper 可以实现动态扩容,Kafka 是一种高吞吐量的分布式发布订阅消息系统。
| ActiveMQ | RabbitMQ | Kafka | |
|---|---|---|---|
| 所属社区/公司 | Apache | Mozilla Public License | Apache/LinkedIn |
| 开发语言 | Java | Erlang | Java |
| 支持的协议 | OpenWire、STOMP、REST、XMPP、AMQP | AMQP | 仿AMQP |
| 事务 | 支持 | 不支持 | 不支持 |
| 集群 | 支持 | 支持 | 支持 |
| 负载均衡 | 支持 | 支持 | 支持 |
| 动态扩容 | 不支持 | 不支持 | 支持(zk) |
四、Kafka 特点和优势¶

特点
- 分布式: 支持分布式多主机部署实现
- 分区: 一个消息可以拆分出多个,分别存储在多个位置
- 多副本: 防止信息丢失,可以多来几个备份
- 多订阅者: 可以有很多应用连接kafka
Zookeeper: 早期版本的Kafka依赖于zookeeper, 2021年4月19日Kafka 2.8.0正式发布,此版本包括了很多重要改动,最主要的是kafka通过自我管理的仲裁来替代ZooKeeper,即Kafka将不再需要ZooKeeper!
优势
- Kafka 通过 O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以 TB 级别以上的消息存储也能够保持长时间的稳定性能
- 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。支持通过Kafka 服务器分区消息。
- 分布式: Kafka 基于分布式集群实现高可用的容错机制,可以实现自动的故障转移
- 顺序保证:在大多数使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。 Kafka保证一个Partiton内的消息的有序性(分区间数据是无序的,如果对数据的顺序有要求,应将在创建主题时将分区数partitions设置为1)
- 支持 Hadoop 并行数据加载
- 通常用于大数据场合,传递单条消息比较大,而Rabbitmq 消息主要是传输业务的指令数据,单条数据较小
Kafka 的性能出色原因
是多种设计选择协同作用的结果,主要原因可以归结为以下几个核心方面:
| 特性 | 解决的问题 | 带来的好处 |
|---|---|---|
| 顺序I/O | 磁盘随机写速度慢 | 极高的磁盘吞吐量,实现持久化与高性能兼得 |
| 零拷贝 | 网络数据传输中CPU拷贝开销大 | 极大降低CPU消耗,提升网络吞吐量 |
| 页缓存 | JVM堆内GC开销大,缓存预热慢 | 避免GC停顿,利用OS高效缓存,重启后快速恢复 |
| 批处理与压缩 | 小消息网络和I/O效率低 | 减少网络请求,提升I/O效率,压缩比率高节省带宽 |
| 分区机制 | 单点性能瓶颈 | 实现生产和消费的水平扩展和并行处理 |
| 二进制格式 | XML/JSON等文本协议解析慢、冗余高 | 处理效率高,网络/存储格式统一,节省CPU |
正是上面设计原则的有机结合,使得 Kafka 能够以极低的延迟和极高的吞吐量处理海量数据
随机和顺序IO比较

O(1)就是最低的时空复杂度,也就是耗时/耗空间与输入数据大小无关,无论输入数据增大多少倍,耗时/耗空间都不变,哈希算法就是典型的O(1)时间复杂度,无论数据规模多大,都可以在一次计算后找到目标
五、Kafka 角色和流程¶
5.1 Kafka 概念¶


Producer:Producer即生产者,消息的产生者,是消息的入口。负责发布消息到Kafka broker。
Consumer:消费者,用于消费消息,即处理消息
Broker:Broker是kafka实例,每个服务器上可以有一个或多个kafka的实例,假设每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如: broker-0、broker-1等……
Controller:是整个 Kafka 集群的管理者角色,任何集群范围内的状态变更都需要通过 Controller 进行,在整个集群中是个单点的服务,可以通过选举协议进行故障转移,负责集群范围内的一些关键操作:主题的新建和删除,主题分区的新建、重新分配,Broker 的加入、退出,触发分区 Leader 选举等,每个 Broker 里都有一个 Controller 实例,多个 Broker 的集群同时最多只有一个 Controller 可以对外提供集群管理服务,Controller 可以在 Broker 之间进行故障转移,Kafka 集群管理的工作主要是由 Controller 来完成的,而 Controller 又通过监听Zookeeper 节点的变动来进行监听集群变化事件,Controller 进行集群管理需要保存集群元数据,监听集群状态变化情况并进行处理,以及处理集群中修改集群元数据的请求,这些主要都是利用 Zookeeper 来实现
Topic :消息的主题,可以理解为消息的分类,一个Topic相当于数据库中的一张表,一条消息相当于关系数据库的一条记录,或者一个Topic相当于Redis中列表数据类型的一个Key,一条消息即为列表中的一个元素。kafka的数据就保存在topic。在每个broker上都可以创建多个topic。
虽然一个 topic的消息保存于一个或多个broker 上同一个目录内, 物理上不同 topic 的消息分开存储在不同的文件夹,但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处,topic 在逻辑上对record(记录、日志)进行分组保存,消费者需要订阅相应的topic 才能消费topic中的消息。
Consumer group: 每个consumer 属于一个特定的consumer group(可为每个consumer 指定 group name,若不指定 group name 则属于默认的group),同一topic的一条消息只能被同一个consumer group 内的一个consumer 消费,类似于一对一的单播机制,但多个consumer group 可同时消费这一消息,类似于一对多的多播机制,默认消费组的多个消费者是共享消息

Partition :是物理上的概念,每个 topic 分割为一个或多个partition,即一个topic切分为多份, 当创建 topic 时可指定 partition 数量,partition的表现形式就是一个一个的文件夹,该文件夹下存储该partition的数据和索引文件,分区的作用还可以实现负载均衡,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,一般Partition数不要超过节点数,注意同一个partition数据是有顺序的,但不同的partition则是无序的。
Replication: 同样数据的副本,包括leader和follower的副本数,基本于数据安全,建议至少2个,是Kafka的高可靠性的保障,和ES的副本有所不同,Kafka中的副本(leader+follower)数包括主分片数,而Elasticsearch中的副本数(follower)不包括主分片数
为了实现数据的高可用,比如将分区 0 的数据分散到不同的kafka 节点,每一个分区都有一个 broker 作为 Leader 和一个 broker 作为Follower,类似于ES中的主分片和副本分片,
假设分区为 3, 即分三个分区0-2,副本为3,即每个分区都有一个 leader,再加两个follower,分区 0 的leader为服务器A,则服务器 B 和服务器 C 为 A 的follower,而分区 1 的leader为服务器B,则服务器 A 和C 为服务器B 的follower,而分区 2 的leader 为C,则服务器A 和 B 为C 的follower。
副本数不能超过集群节点的数量
AR: Assigned Replicas,分区中的所有副本的统称,包括leader和 follower,AR= lSR+ OSR
lSR:ln Sync Replicas,所有与leader副本保持同步的副本 follower和leader本身组成的集合,包括leader和 follower,是AR的子集
OSR:out-of-Sync Replied,所有与leader副本同步不能同步的 follower的集合,是AR的子集
分区和副本的优势:
- 实现存储空间的横向扩容,即将多个kafka服务器的空间组合利用
- 提升性能,多服务器并行读
- 实区即 leader 分布在不同的kafka 服务器,并且有对应follower 分布在和leader不同的服务器上

5.2 Kafka 工作机制¶
Apache Kafka的选举机制涉及到其使用的分区复制模式(partition replication)。
每个Kafka Topic 的分区都有一个领导者(leader)和零个或多个追随者(follower)。
领导者负责所有客户端请求的读写操作,而追随者则复制领导者的数据(kafka2.4新版本后,也支持读)
Kafka的领导者选举过程发生在以下情况中:
-
当新的分区创建时,Kafka选择ISR(In-Sync Replica)列表中的第一个副本作为领导者。
-
当领导者失败或无法与ZooKeeper通信时,会重新选举新的领导者。这个过程被称为领导者故障转移(leader failover)。
Kafka领导者选举的详细步骤:
-
当领导者发生故障,ZooKeeper将会检测到它的会话过期。
-
ZooKeeper接着将通知所有的副本进行领导者选举。
-
副本们会查看它们在ZooKeeper中存储的元数据并确定新的领导者,选择规则是选取副本集合(ISR)中最新的副本。
-
一旦新的领导者被选出,ZooKeeper将通知所有的副本更新它们的元数据。
注意:Kafka 2.8.0版本开始,增加了KRaft(KRaft是Kafka Raft协议模式的简称)模式,这个模式下Kafka不再需要ZooKeeper,而是使用内置的Raft协议来进行元数据的管理和领导者选举。Raft协议是一种为分布式系统提供一致性的算法,它更易于理解和实施,同时也保证了系统的可用性和数据的一致性。

Apache Kafka可以配置为提供强一致性的语义,但这是以牺牲一定的性能为代价的。
在Kafka中,数据被分布在各个分区上,每个分区都有一个领导者(leader)和若干个追随者(follower)。领导者负责处理所有读写请求,而追随者则复制领导者的数据。这种设计确保了数据的高可用性和容错性。
然而,为了保证高性能和低延迟,Kafka允许领导者在数据还未被所有的追随者复制之前就将写操作确认为成功。这就带来了一种可能的情况,即在领导者确认写操作成功后、新数据还未被所有追随者复制完成之前,领导者发生故障,此时新数据可能会丢失,从而造成一致性问题。
要实现强一致性,你需要将Kafka的 min.insync.replicas 参数设置为大于1,这样可以确保至少有N个副本确认数据写入才认为写操作成功。并且需要设置acks为all或者-1,表示领导者需要等待所有的ISR(In-Sync Replicas)确认接收到消息后,才向生产者返回ack。这样,即使领导者发生故障,由于至少有一个追随者已经复制了数据,新的领导者仍然可以提供所有的数据,从而保证了强一致性。
总的来说,Kafka可以配置为提供强一致性,但这可能会影响其性能和吞吐量。在实际使用中,你需要根据自己的业务需求在一致性和性能之间进行权衡。
Kafka的复制配置
Kafka的复制方式可以通过acks(acknowledgments)配置来实现:
- acks=0:生产者发送消息后不等确认即认为成功。这种方式相当于没有复制,只有单个副本存储在Leader上。此方式性能比较好
- acks=1:消息发送到Leader即认为成功,相当于异步复制。在这种情况下,生产者只需要等待Leader的确认,而不需要等待ISR(InSync Replicas,同步副本集)中其他副本的确认。
- acks=all:需要等待ISR列表中所有同步的Slave都确认才认为成功,相当于同步复制。这种方式下,生产者需要等待ISR中所有副本都写入成功后才认为消息发送成功。此方式性能比较差