分布式消息队列
分布式消息队列中间件是是大型分布式系统不可缺少的中间件,通过消息队列,应用程序可以在不知道彼此位置的情况下独立处理消息,或者在处理消息前不需要等待接收此消息。所以消息队列主要解决应用耦合、异步消息、流量削锋等问题,实现高性能、高可用、可伸缩和最终一致性架构。消息队列已经逐渐成为企业应用系统内部通信的核心手段,当前使用较多的消息队列有 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ 等,而部分数据库如 Redis、MySQL 以及 PhxSQL 也可实现消息队列的功能。
MQ消息队列的技术应用
解耦 是消息队列要解决的最本质问题。
最终一致性 指的是两个系统的状态保持一致,要么都成功,要么都失败。最终一致性不是消息队列的必备特性,但确实可以依靠消息队列来做最终一致性的事情。
广播 消息队列的基本功能之一是进行广播。有了消息队列,我们只需要关心消息是否送达了队列,至于谁希望订阅,是下游的事情,无疑极大地减少了开发和联调的工作量。
异步 如果需要提高高延时接口的响应速度。那么,就可以考虑使用 MQ 来实现异步执行,只需要将请求信息写入 MQ 就可以立即返回,实现高响应速度,至于剩余的工作交给 MQ 即可。
错峰与流控 如果业务中有高流量的场景,为了避免高流量一下子打到数据库层,造成数据库瘫痪,可以考虑将请求写入 MQ,即 MQ 作为一个缓冲区,然后在系统支持的消费速度下,通过下游服务不断地消费 MQ 中累积的消息。当高峰期过后,MQ 中的积压消息将慢慢被消费掉。
缺点
- 降低了系统的可用性。原先只有上下游,如今中间多了个 MQ。一旦 MQ 故障,整个系统将不可用。
- 提高了系统的复杂性。多了 MQ 之后,开发过程中可能需要考虑 MQ 重复消费,消息丢失,如果保证消息顺序的问题。
- 一致性问题。MQ 作为中间代理,多个下游执行过程中,某个或多个执行失败,那么势必造成数据的不一致。
为什么要用消息系统
-
解耦 在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息队列在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束
-
冗余 有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在被许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理过程明确的指出该消息已经被处理完毕,确保你的数据被安全的保存直到你使用完毕。
-
扩展性 因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的;只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。
-
灵活性 & 峰值处理能力 在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
-
可恢复性 当体系的一部分组件失效,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。而这种允许重试或者延后处理请求的能力通常是造就一个略感不便的用户和一个沮丧透顶的用户之间的区别。
-
送达保证 消息队列提供的冗余机制保证了消息能被实际的处理,只要一个进程读取了该队列即可。在此基础上,部分消息系统提供了一个”只送达一次”保证。无论有多少进程在从队列中领取数据,每一个消息只能被处理一次。这之所以成为可能,是因为获取一个消息只是”预定”了这个消息,暂时把它移出了队列。除非客户端明确的表示已经处理完了这个消息,否则这个消息会被放回队列中去,在一段可配置的时间之后可再次被处理。
-
顺序保证 在大多使用场景下,数据处理的顺序都很重要。消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。部分消息系统保证消息通过FIFO(先进先出)的顺序来处理,因此消息在队列中的位置就是从队列中检索他们的位置。
-
缓冲 在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行–写入队列的处理会尽可能的快速,而不受从队列读的预备处理的约束。该缓冲有助于控制和优化数据流经过系统的速度。
-
理解数据流 在一个分布式系统里,要得到一个关于用户操作会用多长时间及其原因的总体印象,是个巨大的挑战。消息队列通过消息被处理的频率,来方便的辅助确定那些表现不佳的处理过程或领域,这些地方的数据流都不够优化。
-
异步通信 很多时候,你不想也不需要立即处理消息。消息队列提供了异步处理机制,允许你把一个消息放入队列,但并不立即处理它。你想向队列中放入多少消息就放多少,然后在你乐意的时候再去处理它们。
常用 Message Queue 对比
Kafka Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。 RabbitMQ RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
RocketMQ RocketMQ是阿里开源的消息中间件,它是纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。
RabbitMQ遵循AMQP协议,RabbitMQ的broker由Exchange,Binding,queue组成,其中exchange和binding组成了消息的路由键;客户端Producer通过连接channel和server进行通信,Consumer从queue获取消息进行消费(长连接,queue有消息会推送到consumer端,consumer循环从输入流读取数据)。rabbitMQ以broker为中心;有消息的确认机制。
kafka遵从一般的MQ结构,producer,broker,consumer,以consumer为中心,消息的消费信息保存的客户端consumer上,consumer根据消费的点,从broker上批量pull数据;无消息确认机制。
在吞吐量, kafka kafka具有高的吞吐量,内部采用消息的批量处理,zero-copy机制,数据的存储和获取是本地磁盘顺序批量操作,具有O(1)的复杂度,消息处理的效率很高。
rabbitMQ rabbitMQ在吞吐量方面稍逊于kafka,他们的出发点不一样,rabbitMQ支持对消息的可靠的传递,支持事务,不支持批量的操作;基于存储的可靠性的要求存储可以采用内存或者硬盘。
在可用性方面,
rabbitMQ
rabbitMQ支持miror的queue,主queue失效,miror queue接管。
kafka
kafka的broker支持主备模式。
在集群负载均衡方面, kafka kafka采用zookeeper对集群中的broker、consumer进行管理,可以注册topic到zookeeper上;通过zookeeper的协调机制,producer保存对应topic的broker信息,可以随机或者轮询发送到broker上;并且producer可以基于语义指定分片,消息发送到broker的某分片上。
rabbitMQ rabbitMQ的负载均衡需要单独的loadbalancer进行支持。
特性 | Kafka | RabbitMQ | RocketMQ |
---|---|---|---|
开发语言 | Scala | Erlang | Java |
单机吞吐量 | 10万 | 万 | 10万 |
可用性 | 非常高(分布式架构) | 高(主从架构) | 非常高(分布式架构) |
功能 | 只支持主要MQ功能,例如消息回溯功能没有提供,毕竟是为大数据准备的 | 基于 erlang开发,所以并发能力强,性能及其好,延时很低;管理界面较丰富 | MQ功能比较完备,扩展性佳 |
消息重复 | at least once、at most once | at least once、at most once | at least once |
消息回溯 | 不支持 | 指定分区offset 位置回溯 | 指定时间点的回溯 |
消息重试 | 不支持,支持指定分区offset位置的回溯,可以实现消息重试。 | 不支持,但是可以利用消息确认机制实现。rabbitmq接收方确认机制,设置autoAck为false。 | 支持 |
如何保证消息不重复消费(幂等)
- 如果将数据写入到 Mysql,需要根据表主键判断行是否存在进行新增或更新即可;
- 如果将数据写入到 redis,直接写入即可。不过,需要一个业务唯一ID作为 redis 的 key;
如何保证消息不丢失
保证生产者不丢数据
一是可以考虑开启 RabbitMQ 提供的事务操作。当发生消息丢失异常时,可以回滚事务,然后重新发送。这种方式下会极大地降低 MQ 吞吐量;
二是开启 channel 的 confirm 模式。生产者发送消息给 MQ 后,MQ 会回传一个 ack。如果生产者没有收到,就可以重试。该监听方法是异步的,不会阻塞,可以直接进行下一条的发送。
保证 MQ 不丢数据
这里需要开启 MQ 的持久化。RabbitMQ 开启持久化分两步,一是开启 queue 的持久化,这样 MQ 会持久化 queue 的元数据;二是设置发送消息时的消息选项,是否持久化。
当 MQ 自己宕机,重启之后,也可以从磁盘中恢复 queue 和 queue 中的消息。
保证消费者不丢数据
消费者默认有 AutoAck 的设置,即消费者会自动通知 RabbitMQ,这条消息已经消费完成。如果这条消息正在消费过程中,且消费者已经通知 MQ 消息消费成功,而此时消费者又发生故障,那么这条消息将丢失。
解决办法也很明显,就是关闭 AutoAck,消息消费完了才返回 ack。
如何保证消息有序消费
RabbitMQ 就是一个 queue 对应一个 consumer。由于写入到一个 queue 的消息是有序的,一个 consumer 消费起来自然也是有序的。Kafka 则是一个 partition 对应一个 consumer。
如果一个 queue 对应一个消费者的消费速度达不到业务需求,可以根据业务 key,将消息通过 hash 算法,映射到多个内存队列,然后开启多个线程进行消费。对于同一个业务 key 的消息,它们的消费是顺序的。
延时消息
RocketMQ 仅支持若干个级别的延时消息,通过在启动时,创建多个主题 schdule_topic_xx 对应不同延时级别。
延时消费流程:
- 首先将消息存到commitLog(MQ的本地文件系统)。当判断出为延时消息后,将消息投递到 schdule_topic_xx 主题队列
- delay service 读取 schdule_topic_xx 队列偏移量,然后根据主题队列的延时级别,不断创建延时迟任务(通过 timer 实现)
- 当延时任务到期后,读取消息投递到目标队列,供消费者消费
MQ 积压大量消息
为什么出现消息堆积,可能造成的原因有
- 生产者发送速率远大于消费者消费速率(高并发场景,比如促销,秒杀)
- 消费者出现问题,导致消费速率异常或无法消费
处理办法
- 先处理 consumer,保证其能正常消费
- 再建立原先 10 倍或 20 倍的队列,保证数据分发速率
- 再启动 10 倍或 20 倍的机器来部署 consumer,将堆积的大量消息消费掉