Kafka
Apache Kafka 是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和任务关键型应用程序。
Apache Kafka 是一个分布式的发布-订阅的消息系统。官方称为 distributed streaming platform
核心能力
- 高吞吐量 使用延迟低至 2 毫秒的机器集群以网络有限的吞吐量传递消息。
- 可扩展 将生产集群扩展到一千个代理、每天数万亿条消息、PB 级数据、数十万个分区。弹性扩展和收缩存储和处理。
- 永久的存储 将数据流安全地存储在分布式、持久、容错的集群中。
- 高可用性 在可用区域上有效地扩展集群或跨地理区域连接单独的集群。
生态系统
- 内置流处理 使用事件时间和精确一次处理,通过连接、聚合、过滤器、转换等处理事件流。
- 连接到几乎任何东西 Kafka 开箱即用的 Connect 接口与数百个事件源和事件接收器集成,包括 Postgres、JMS、Elasticsearch、AWS S3 等。
- 客户端库 使用大量编程语言读取、写入和处理事件流。
- 大型生态系统开源工具 大型开源工具生态系统:利用大量社区驱动的工具。
平台具有三个关键功能:
- 发布和订阅记录流,类似于消息队列或企业消息传递系统。
- 以高容错的持久方式存储记录流。
- 处理记录流,保证数据流的顺序。
Kafka 通常用于两大类应用程序:
- 建立实时流数据管道,以可靠地在系统或应用程序之间获取数据
- 构建实时流应用程序以转换或响应数据流
要了解Kafka如何执行这些操作,让我们从头开始深入研究Kafka的功能。
首先几个概念:
- Kafka 在一个或多个可以跨越多个数据中心的服务器上作为集群运行。
- Kafka 集群 将 records 流存储在分类中称为 topics 。
- 每个记录由一个键,一个值和一个时间戳组成。
Kafka具有四个核心API:
- Producer API 允许应用程序发布的记录流至一个或多个 Kafka topics。
- Consumer API 允许应用程序订阅一个或多个主题,并处理所产生的记录的数据流。
- Streams API 允许应用程序充当流处理器,从一个或多个主题消耗的输入流,并产生一个输出流至一个或多个输出的主题,有效地将所述输入数据流,以输出流。
- Connector API 允许构建和运行可重复使用的生产者或消费者连接卡夫卡主题,以现有的应用程序或数据系统。例如,关系数据库的连接器可能会捕获对表的所有更改。
角色模型及概念
Producer:消息生产者。
Broker:kafka集群中的服务器。
Topic:消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。
Partition:Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。
Replication:每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
Consumer:消息消费者。
Consumer Group 我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据,这也是为了提高kafka的吞吐量!
Zookeeper:kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。
Partition的副本机制
为了保证消息的可靠性,kafka为分区引入了多副本Replica机制,通过增加副本数量提高容灾能力。一个分区中的所有副本统称为AR(Assigned Replicas),每个分区中有leader与follower两种角色副本,二者之间是主从关系,一个分区只有一个leader,通常多个follower。
分区的ISR集机制
所有与leader保持同步副本组成了ISR(In-Sync-Replicas),ISR集合是AR集合的一个子集。消息读写请求先发往leader,再同步到follower,在同步期间,follower副本较之leader往往具有一定的滞后,具体滞后标准参数可配置。与leader滞后过多的follower副本组成OSR(Out-of-Sync Replicas), 因此AR由ISR和OSR两部分组成。正常情况下,所有的follower副本都应该与leader副本保持一致,即没有滞后副本,OSR集合为空。
leader对ISR的维护
leader负责维护和跟踪ISR集中所有follower副本的滞后状态,当follower滞后太多时,leader会把follower从ISR集中剔除掉,将其移入OSR集中,当滞后状态又重新追上leader时,再从OSR集中恢复到ISR集合中。当leader发生故障时,只有在ISR中集合的副本才有资格参与新leader的选举。
高效,吞吐量大
- Topic分区,不同分区分布在不同Broker上,解决大量写下的单机IO瓶颈问题,利用多机进行分布式存储来提高吞吐量。
- 文件分段
- 顺序读写
- 缓冲并批量发送
- 零拷贝
ISR实现可用性与数据一致性的动态平衡
高效使用磁盘
顺序写磁盘
Kafka的整个设计中,Partition相当于一个非常长的数组,而Broker接收到的所有消息顺序写入这个大数组中。同时Consumer通过Offset顺序消费这些数据,并且不删除已经消费的数据,从而避免了随机写磁盘的过程。
由于磁盘有限,不可能保存所有数据,实际上作为消息系统Kafka也没必要保存所有数据,需要删除旧的数据。而这个删除过程,并非通过使用“读-写”模式去修改文件,而是将Partition分为多个Segment,每个Segment对应一个物理文件,通过删除整个文件的方式去删除Partition内的数据。这种方式清除旧数据的方式,也避免了对文件的随机写操作。
充分利用Page Cache
使用Page Cache的好处如下
- I/O Scheduler会将连续的小块写组装成大块的物理写从而提高性能
- I/O Scheduler会尝试将一些写操作重新按顺序排好,从而减少磁盘头的移动时间
- 充分利用所有空闲内存(非JVM内存)。如果使用应用层Cache(即JVM堆内存),会增加GC负担
- 读操作可直接在Page Cache内进行。如果消费和生产速度相当,甚至不需要通过物理磁盘(直接通过Page Cache)交换数据
- 如果进程重启,JVM内的Cache会失效,但Page Cache仍然可用
支持多Disk Drive
Broker的log.dirs
配置项,允许配置多个文件夹。如果机器上有多个Disk Drive,可将不同的Disk挂载到不同的目录,然后将这些目录都配置到log.dirs
里。Kafka会尽可能将不同的Partition分配到不同的目录,也即不同的Disk上,从而充分利用了多Disk的优势。
零拷贝
Kafka中存在大量的网络数据持久化到磁盘(Producer到Broker)和磁盘文件通过网络发送(Broker到Consumer)的过程。这一过程的性能直接影响Kafka的整体吞吐量。
sendfile和transferTo实现零拷贝
Linux 2.4+内核通过sendfile
系统调用,提供了零拷贝。数据通过DMA拷贝到内核态Buffer后,直接通过DMA拷贝到NIC Buffer,无需CPU拷贝。这也是零拷贝这一说法的来源。除了减少数据拷贝外,因为整个读文件-网络发送由一个sendfile
调用完成,整个过程只有两次上下文切换,因此大大提高了性能。
减少网络开销
批处理
批处理是一种常用的用于提高I/O性能的方式。对Kafka而言,批处理既减少了网络传输的Overhead,又提高了写磁盘的效率。
数据压缩降低网络负载
Kafka从0.7开始,即支持将数据压缩后再传输给Broker。除了可以将每条消息单独压缩然后传输外,Kafka还支持在批量发送时,将整个Batch的消息一起压缩后传输。数据压缩的一个基本原理是,重复数据越多压缩效果越好。因此将整个Batch的数据一起压缩能更大幅度减小数据量,从而更大程度提高网络传输效率。
Broker接收消息后,并不直接解压缩,而是直接将消息以压缩后的形式持久化到磁盘。Consumer Fetch到数据后再解压缩。因此Kafka的压缩不仅减少了Producer到Broker的网络传输负载,同时也降低了Broker磁盘操作的负载,也降低了Consumer与Broker间的网络传输量,从而极大得提高了传输效率,提高了吞吐量。
高效的序列化方式
Kafka消息的Key和Payload(或者说Value)的类型可自定义,只需同时提供相应的序列化器和反序列化器即可。因此用户可以通过使用快速且紧凑的序列化-反序列化方式(如Avro,Protocal Buffer)来减少实际网络传输和磁盘存储的数据规模,从而提高吞吐率。这里要注意,如果使用的序列化方法太慢,即使压缩比非常高,最终的效率也不一定高。
数据可靠性和一致性
Leader 的选举
主要从维护的ISR(in-sync replicas)列表中选出。ISR官方的解释是:“isr” is the set of “in-sync” replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.
也就是说ISR是满足指定同步条件的replica的集合。它是所有replica的子集。为什么我这里说是满足了指定条件的replica呢,因为要进入ISR列表,一般需要满足两个条件,follower即落后的消息数不超过replica.lag.max.messages所配置的数值并且follower能够在指定的时间(replica.lag.time.max.ms)内向leader发送fetch请求,则不会将它从ISR列表中删除。这样在unclean.leader.election.enable=false,的情况下,如果leader挂了,则kafka会从ISR列表中选择第一个follower作为leader,这就保证了已经提交的数据的可靠性。
Broker 的分布式部署
Broker的分布式部署(一般三个以上实例),保证了数据在传输过程中,不会因为单Broker挂掉而导致数据丢失。
Partition 的 replications
Kafka 从 0.8.0 版本开始引入了分区副本KAFKA-50的概念,我们在创建Kafka Topic的时候,通过指定replication-factor可以设置Partition的副本数,也可以在配置文件中通过参数:default.replication.factor,指定默认的分区副本数。
Producer 的 acks
Producer通过配置的acks来判断消息是否发送成功。
- 如果设置acks=0,表示不等待ack确认,则立刻认为发送成功,并进行下一次发送。在这种模式下,Kafka的吞吐量非常大,但是发送丢失数据的概率也随之变大,是很有可能丢失的。
- 如果设置acks=1,表示等待leader反馈接收成功后,则认为发送成功,并进行下一次发送。如果Leader在接收到消息后,仍未同步到Follower,此时发生了崩溃,是有可能导致数据丢失的。
- acks=all或者acks=-1这种情况是最安全的,当然效率也是最低的。Producer会等待Leader返回确认信息,而Leader会等待所有的副本均同步完成。
一致性
为了保障不同的消费者在数据消费过程中的一致性,Kafka引入了High Water Mark 机制,当我们将隔离级别isolation.level设置为:read_committed时,Kafka会保证所有消费者所消费的消息都是在High Water Mark之下。所谓High Water Mark类似于木桶原理所示,水位线的最高点为已经同步到所有Follower的消息所对应的的offset。也就是ISR列表中偏移量最小的副本。
Q&A
Kafka 是如何实现高吞吐率的?
- 顺序读写
- 零拷贝
- 文件分段
- 批量发送
- 数据压缩。
Kafka 缺点?
- 由于是批量发送,数据并非真正的实时;
- 对于mqtt协议不支持;
- 不支持物联网传感数据直接接入;
- 仅支持统一分区内消息有序,无法实现全局消息有序;
- 监控不完善,需要安装插件;
- 依赖zookeeper进行元数据管理;
在 Kafka 中,ZooKeeper 的作用是什么?
目前,Kafka 使用 ZooKeeper 存放集群元数据、成员管理、Controller 选举,以及其他一些管理类任务。之后,等 KIP-500 提案完成后,Kafka 将完全不再依赖 于 ZooKeeper。 记住,一定要突出“目前”,以彰显你非常了解社区的演进计划。“存放元数据”是指主题 分区的所有数据都保存在 ZooKeeper 中,且以它保存的数据为权威,其他“人”都要与它 保持对齐。“成员管理”是指 Broker 节点的注册、注销以及属性变更,等 等。“Controller 选举”是指选举集群 Controller,而其他管理类任务包括但不限于主题 删除、参数配置等。 不过,抛出 KIP-500 也可能是个双刃剑。碰到非常资深的面试官,他可能会进一步追问你 KIP-500 是做的。一言以蔽之:KIP-500 思想,是使用社区自研的基于 Raft 的共识算法, 替代 ZooKeeper,实现 Controller 自选举。
解释下 Kafka 中位移(offset)的作用
在 Kafka 中,每个 主题分区下的每条消息都被赋予了一个唯一的 ID 数值,用于标识它在分区中的位置。这个 ID 数值,就被称为位移,或者叫偏移量。一旦消息被写入到分区日志,它的位移值将不能 被修改。
Kafka 中的领导者副本(Leader Replica)和追随者副本 (Follower Replica)的区别
Kafka 副本当前分为领导者副本和追随者副本。只有 Leader 副本才能 对外提供读写服务,响应 Clients 端的请求。Follower 副本只是采用拉(PULL)的方 式,被动地同步 Leader 副本中的数据,并且在 Leader 副本所在的 Broker 宕机后,随时 准备应聘 Leader 副本。强调 Follower 副本也能对外提供读服务。自 Kafka 2.4 版本开始,社区通过引入新的 Broker 端参数,允许 Follower 副本有限度地提供读服务。
强调 Leader 和 Follower 的消息序列在实际场景中不一致。很多原因都可能造成 Leader 和 Follower 保存的消息序列不一致,比如程序 Bug、网络问题等。这是很严重 的错误,必须要完全规避。你可以补充下,之前确保一致性的主要手段是高水位机制, 但高水位值无法保证 Leader 连续变更场景下的数据一致性,因此,社区引入了 Leader Epoch 机制,来修复高水位值的弊端。关于“Leader Epoch 机制”
高水位的作用
- 定义消息可见性,即用来标识分区下的哪些消息是可以被消费者消费的。
- 帮助 Kafka 完成副本同步。
日志末端位移的概念,即 Log End Offset,简写是 LEO。它表示副本写入下一条消息的位移值。显然,介于高水位和 LEO 之间的消息就属于未提交消息。同一个副本对象,其高水位值不会大于 LEO 值。
Kafka 所有副本都有对应的高水位和 LEO 值,而不仅仅是 Leader 副本。只不过 Leader 副本比较特殊,Kafka 使用 Leader 副本的高水位来定义所在分区的高水位。换句话说,分区的高水位就是其 Leader 副本的高水位。
什么叫与 Leader 副本保持同步。
- 该远程 Follower 副本在 ISR 中。
- 该远程 Follower 副本 LEO 值落后于 Leader 副本 LEO 值的时间,不超过 Broker 端参数 replica.lag.time.max.ms 的值。如果使用默认值的话,就是不超过 10 秒。
社区在 0.11 版本正式引入了 Leader Epoch 概念,来规避因高水位更新错配导致的各种不一致问题。 所谓 Leader Epoch,大致可以认为是 Leader 版本。它由两部分数据组成。
- Epoch。一个单调增加的版本号。每当副本领导权发生变更时,都会增加该版本号。小版本号的 Leader 被认为是过期 Leader,不能再行使 Leader 权力。
- 起始位移(Start Offset)。Leader 副本在该 Epoch 值上写入的首条消息的位移。
删除 ZooKeeper 节点 /controller,触发 Controller 重选举。 Controller 重选举能够为所有主题分区重刷分区状态,可以有效解决因不一致导致的 Leader 不可用问题。
在 Kafka 中,体现 Zero Copy 使用场景的地方有两处:基于 mmap 的索引和日志文件读写所用的 TransportLayer。
索引都是基于 MappedByteBuffer 的,也就是让用户态和内核态共享内核态 的数据缓冲区,此时,数据不需要复制到用户态空间。不过,mmap 虽然避免了不必要的 拷贝,但不一定就能保证很高的性能。在不同的操作系统下,mmap 的创建和销毁成本可 能是不一样的。很高的创建和销毁开销会抵消 Zero Copy 带来的性能优势。由于这种不确 定性,在 Kafka 中,只有索引应用了 mmap,最核心的日志并未使用 mmap 机制。
TransportLayer 是 Kafka 传输层的接口。它的某个实现类使用了 FileChannel 的 transferTo 方法。该方法底层使用 sendfile 实现了 Zero Copy。对 Kafka 而言,如果 I/O 通道使用普通的 PLAINTEXT,那么,Kafka 就可以利用 Zero Copy 特 性,直接将页缓存中的数据发送到网卡的 Buffer 中,避免中间的多次拷贝。相反,如果 I/O 通道启用了 SSL,那么,Kafka 便无法利用 Zero Copy 特性了。