1
compile 'org.apache.rocketmq:rocketmq-client:4.3.0'
消息发送
发送同步消息
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class SyncProducer {
public static void main ( String [] args ) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer ( "please_rename_unique_group_name" );
// 设置NameServer的地址
producer . setNamesrvAddr ( "localhost:9876" );
// 启动Producer实例
producer . start ();
for ( int i = 0 ; i < 100 ; i ++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message ( "TopicTest" /* Topic */ ,
"TagA" /* Tag */ ,
( "Hello RocketMQ " + i ). getBytes ( RemotingHelper . DEFAULT_CHARSET ) /* Message body */
);
// 发送消息到一个Broker
SendResult sendResult = producer . send ( msg );
// 通过sendResult返回消息是否成功送达
System . out . printf ( "%s%n" , sendResult );
}
// 如果不再发送消息,关闭Producer实例。
producer . shutdown ();
}
}
发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class AsyncProducer {
public static void main ( String [] args ) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer ( "please_rename_unique_group_name" );
// 设置NameServer的地址
producer . setNamesrvAddr ( "localhost:9876" );
// 启动Producer实例
producer . start ();
producer . setRetryTimesWhenSendAsyncFailed ( 0 );
int messageCount = 100 ;
// 根据消息数量实例化倒计时计算器
final CountDownLatch2 countDownLatch = new CountDownLatch2 ( messageCount );
for ( int i = 0 ; i < messageCount ; i ++) {
final int index = i ;
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message ( "TopicTest" ,
"TagA" ,
"OrderID188" ,
"Hello world" . getBytes ( RemotingHelper . DEFAULT_CHARSET ));
// SendCallback接收异步返回结果的回调
producer . send ( msg , new SendCallback () {
@Override
public void onSuccess ( SendResult sendResult ) {
countDownLatch . countDown ();
System . out . printf ( "%-10d OK %s %n" , index ,
sendResult . getMsgId ());
}
@Override
public void onException ( Throwable e ) {
countDownLatch . countDown ();
System . out . printf ( "%-10d Exception %s %n" , index , e );
e . printStackTrace ();
}
});
}
// 等待5s
countDownLatch . await ( 5 , TimeUnit . SECONDS );
// 如果不再发送消息,关闭Producer实例。
producer . shutdown ();
}
}
单向发送消息
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class OnewayProducer {
public static void main ( String [] args ) throws Exception {
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer ( "please_rename_unique_group_name" );
// 设置NameServer的地址
producer . setNamesrvAddr ( "localhost:9876" );
// 启动Producer实例
producer . start ();
for ( int i = 0 ; i < 100 ; i ++) {
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message ( "TopicTest" /* Topic */ ,
"TagA" /* Tag */ ,
( "Hello RocketMQ " + i ). getBytes ( RemotingHelper . DEFAULT_CHARSET ) /* Message body */
);
// 发送单向消息,没有任何返回结果
producer . sendOneway ( msg );
}
// 如果不再发送消息,关闭Producer实例。
producer . shutdown ();
}
}
消费消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Consumer {
public static void main ( String [] args ) throws InterruptedException , MQClientException {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ( "please_rename_unique_group_name" );
// 设置NameServer的地址
consumer . setNamesrvAddr ( "localhost:9876" );
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
consumer . subscribe ( "TopicTest" , "*" );
// 注册回调实现类来处理从broker拉取回来的消息
consumer . registerMessageListener ( new MessageListenerConcurrently () {
@Override
public ConsumeConcurrentlyStatus consumeMessage ( List < MessageExt > msgs , ConsumeConcurrentlyContext context ) {
System . out . printf ( "%s Receive New Messages: %s %n" , Thread . currentThread (). getName (), msgs );
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus . CONSUME_SUCCESS ;
}
});
// 启动消费者实例
consumer . start ();
System . out . printf ( "Consumer Started.%n" );
}
}
顺序消息样例
全局顺序消息:
RocketMQ 的一个 Topic 下默认有八个读队列和八个写队列,如果要保证全局顺序消息的话需要在生产端只保留一个读写队列,然后消费端只有一个消费线程,这样会降低 RocketMQ 的高可用和高吞吐量。
分区顺序消息:
分区顺序消息同样需要生产端和消费端配合,生产端根据同一个订单 ID 把消息路由到同一个 MessageQueue,消费端控制从同一个 MessageQueue 取出的消息不被并发处理。
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
顺序消息生产
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
public class Producer {
public static void main ( String [] args ) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer ( "please_rename_unique_group_name" );
producer . setNamesrvAddr ( "127.0.0.1:9876" );
producer . start ();
String [] tags = new String []{ "TagA" , "TagC" , "TagD" };
// 订单列表
List < OrderStep > orderList = new Producer (). buildOrders ();
Date date = new Date ();
SimpleDateFormat sdf = new SimpleDateFormat ( "yyyy-MM-dd HH:mm:ss" );
String dateStr = sdf . format ( date );
for ( int i = 0 ; i < 10 ; i ++) {
// 加个时间前缀
String body = dateStr + " Hello RocketMQ " + orderList . get ( i );
Message msg = new Message ( "TopicTest" , tags [ i % tags . length ], "KEY" + i , body . getBytes ());
SendResult sendResult = producer . send ( msg , new MessageQueueSelector () {
@Override
public MessageQueue select ( List < MessageQueue > mqs , Message msg , Object arg ) {
Long id = ( Long ) arg ; //根据订单id选择发送queue
long index = id % mqs . size ();
return mqs . get (( int ) index );
}
}, orderList . get ( i ). getOrderId ()); //订单id
System . out . println ( String . format ( "SendResult status:%s, queueId:%d, body:%s" ,
sendResult . getSendStatus (),
sendResult . getMessageQueue (). getQueueId (),
body ));
}
producer . shutdown ();
}
/**
* 订单的步骤
*/
private static class OrderStep {
private long orderId ;
private String desc ;
public long getOrderId () {
return orderId ;
}
public void setOrderId ( long orderId ) {
this . orderId = orderId ;
}
public String getDesc () {
return desc ;
}
public void setDesc ( String desc ) {
this . desc = desc ;
}
@Override
public String toString () {
return "OrderStep{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}' ;
}
}
/**
* 生成模拟订单数据
*/
private List < OrderStep > buildOrders () {
List < OrderStep > orderList = new ArrayList < OrderStep >();
OrderStep orderDemo = new OrderStep ();
orderDemo . setOrderId ( 15103111039L );
orderDemo . setDesc ( "创建" );
orderList . add ( orderDemo );
orderDemo = new OrderStep ();
orderDemo . setOrderId ( 15103111065L );
orderDemo . setDesc ( "创建" );
orderList . add ( orderDemo );
orderDemo = new OrderStep ();
orderDemo . setOrderId ( 15103111039L );
orderDemo . setDesc ( "付款" );
orderList . add ( orderDemo );
orderDemo = new OrderStep ();
orderDemo . setOrderId ( 15103117235L );
orderDemo . setDesc ( "创建" );
orderList . add ( orderDemo );
orderDemo = new OrderStep ();
orderDemo . setOrderId ( 15103111065L );
orderDemo . setDesc ( "付款" );
orderList . add ( orderDemo );
orderDemo = new OrderStep ();
orderDemo . setOrderId ( 15103117235L );
orderDemo . setDesc ( "付款" );
orderList . add ( orderDemo );
orderDemo = new OrderStep ();
orderDemo . setOrderId ( 15103111065L );
orderDemo . setDesc ( "完成" );
orderList . add ( orderDemo );
orderDemo = new OrderStep ();
orderDemo . setOrderId ( 15103111039L );
orderDemo . setDesc ( "推送" );
orderList . add ( orderDemo );
orderDemo = new OrderStep ();
orderDemo . setOrderId ( 15103117235L );
orderDemo . setDesc ( "完成" );
orderList . add ( orderDemo );
orderDemo = new OrderStep ();
orderDemo . setOrderId ( 15103111039L );
orderDemo . setDesc ( "完成" );
orderList . add ( orderDemo );
return orderList ;
}
}
顺序消费消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
*/
public class ConsumerInOrder {
public static void main ( String [] args ) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ( "please_rename_unique_group_name_3" );
consumer . setNamesrvAddr ( "127.0.0.1:9876" );
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer . setConsumeFromWhere ( ConsumeFromWhere . CONSUME_FROM_FIRST_OFFSET );
consumer . subscribe ( "TopicTest" , "TagA || TagC || TagD" );
consumer . registerMessageListener ( new MessageListenerOrderly () {
Random random = new Random ();
@Override
public ConsumeOrderlyStatus consumeMessage ( List < MessageExt > msgs , ConsumeOrderlyContext context ) {
context . setAutoCommit ( true );
for ( MessageExt msg : msgs ) {
// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
System . out . println ( "consumeThread=" + Thread . currentThread (). getName () + "queueId=" + msg . getQueueId () + ", content:" + new String ( msg . getBody ()));
}
try {
//模拟业务逻辑处理中...
TimeUnit . SECONDS . sleep ( random . nextInt ( 10 ));
} catch ( Exception e ) {
e . printStackTrace ();
}
return ConsumeOrderlyStatus . SUCCESS ;
}
});
consumer . start ();
System . out . println ( "Consumer Started." );
}
}
延时消息样例
启动消费者等待传入订阅消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class ScheduledMessageConsumer {
public static void main ( String [] args ) throws Exception {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ( "ExampleConsumer" );
// 订阅Topics
consumer . subscribe ( "TestTopic" , "*" );
// 注册消息监听者
consumer . registerMessageListener ( new MessageListenerConcurrently () {
@Override
public ConsumeConcurrentlyStatus consumeMessage ( List < MessageExt > messages , ConsumeConcurrentlyContext context ) {
for ( MessageExt message : messages ) {
// Print approximate delay time period
System . out . println ( "Receive message[msgId=" + message . getMsgId () + "] " + ( System . currentTimeMillis () - message . getBornTimestamp ()) + "ms later" );
}
return ConsumeConcurrentlyStatus . CONSUME_SUCCESS ;
}
});
// 启动消费者
consumer . start ();
}
}
发送延时消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class ScheduledMessageProducer {
public static void main ( String [] args ) throws Exception {
// 实例化一个生产者来产生延时消息
DefaultMQProducer producer = new DefaultMQProducer ( "ExampleProducerGroup" );
// 启动生产者
producer . start ();
int totalMessagesToSend = 100 ;
for ( int i = 0 ; i < totalMessagesToSend ; i ++) {
Message message = new Message ( "TestTopic" , ( "Hello scheduled message " + i ). getBytes ());
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
message . setDelayTimeLevel ( 3 );
// 发送消息
producer . send ( message );
}
// 关闭生产者
producer . shutdown ();
}
}
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
1
2
3
// 延时消息的使用限制 SendMessageProcessor.java
// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" ;
批量消息样例
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
1
2
3
4
5
6
7
8
9
10
11
String topic = "BatchTest" ;
List < Message > messages = new ArrayList <>();
messages . add ( new Message ( topic , "TagA" , "OrderID001" , "Hello world 0" . getBytes ()));
messages . add ( new Message ( topic , "TagA" , "OrderID002" , "Hello world 1" . getBytes ()));
messages . add ( new Message ( topic , "TagA" , "OrderID003" , "Hello world 2" . getBytes ()));
try {
producer . send ( messages );
} catch ( Exception e ) {
e . printStackTrace ();
//处理error
}
复杂度只有当你发送大批量时才会增长,你可能不确定它是否超过了大小限制(4MB)。这时候你最好把你的消息列表分割一下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class ListSplitter implements Iterator < List < Message >> {
private final int SIZE_LIMIT = 1024 * 1024 * 4 ;
private final List < Message > messages ;
private int currIndex ;
public ListSplitter ( List < Message > messages ) {
this . messages = messages ;
}
@Override public boolean hasNext () {
return currIndex < messages . size ();
}
@Override public List < Message > next () {
int startIndex = getStartIndex ();
int nextIndex = startIndex ;
int totalSize = 0 ;
for (; nextIndex < messages . size (); nextIndex ++) {
Message message = messages . get ( nextIndex );
int tmpSize = calcMessageSize ( message );
if ( tmpSize + totalSize > SIZE_LIMIT ) {
break ;
} else {
totalSize += tmpSize ;
}
}
List < Message > subList = messages . subList ( startIndex , nextIndex );
currIndex = nextIndex ;
return subList ;
}
private int getStartIndex () {
Message currMessage = messages . get ( currIndex );
int tmpSize = calcMessageSize ( currMessage );
while ( tmpSize > SIZE_LIMIT ) {
currIndex += 1 ;
Message message = messages . get ( curIndex );
tmpSize = calcMessageSize ( message );
}
return currIndex ;
}
private int calcMessageSize ( Message message ) {
int tmpSize = message . getTopic (). length () + message . getBody (). length ;
Map < String , String > properties = message . getProperties ();
for ( Map . Entry < String , String > entry : properties . entrySet ()) {
tmpSize += entry . getKey (). length () + entry . getValue (). length ();
}
tmpSize = tmpSize + 20 ; // 增加⽇日志的开销20字节
return tmpSize ;
}
}
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter ( messages );
while ( splitter . hasNext ()) {
try {
List < Message > listItem = splitter . next ();
producer . send ( listItem );
} catch ( Exception e ) {
e . printStackTrace ();
//处理error
}
}
过滤消息样例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
DefaultMQProducer producer = new DefaultMQProducer ( "please_rename_unique_group_name" );
producer . start ();
Message msg = new Message ( "TopicTest" ,
tag ,
( "Hello RocketMQ " + i ). getBytes ( RemotingHelper . DEFAULT_CHARSET )
);
// 设置一些属性
msg . putUserProperty ( "a" , String . valueOf ( i ));
SendResult sendResult = producer . send ( msg );
producer . shutdown ();
// 消费者样例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ( "please_rename_unique_group_name_4" );
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer . subscribe ( "TopicTest" , MessageSelector . bySql ( "a between 0 and 3" );
consumer . registerMessageListener ( new MessageListenerConcurrently () {
@Override
public ConsumeConcurrentlyStatus consumeMessage ( List < MessageExt > msgs , ConsumeConcurrentlyContext context ) {
return ConsumeConcurrentlyStatus . CONSUME_SUCCESS ;
}
});
consumer . start ();
RocketMQ 的特点
亿级消息的堆积能力,单个队列中的百万级消息的累积容量。
高可用性:Broker服务器支持多Master多Slave的同步双写以及Master多Slave的异步复制模式,其中同步双写可保证消息不丢失。
高可靠性:生产者将消息发送到Broker端有三种方式,同步、异步和单向,其中同步和异步都可以保证消息成功的成功发送。Broker在对于消息刷盘有两种策略:同步刷盘和异步刷盘,其中同步刷盘可以保证消息成功的存储到磁盘中。消费者的消费模式也有集群消费和广播消费两种,默认集群消费,如果集群模式中消费者挂了,一个组里的其他消费者会接替其消费。综上所述,是高可靠的。
支持分布式事务消息:这里是采用半消息确认和消息回查机制来保证分布式事务消息的,下面会详细描述。
支持消息过滤:建议采用消费者业务端的tag过滤
支持顺序消息:消息在Broker中是采用队列的FIFO模式存储的,也就是发送是顺序的,只要保证消费的顺序性即可。
支持定时消息和延迟消息:Broker中由定时消息的机制,消息发送到Broker中,不会立即被Consumer消费,会等到一定的时间才被消费。延迟消息也是一样,延迟一定时间之后才会被Consumer消费。
生产者
一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。tags可以由应用自由设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤:message.setTags(“TagA”)。
每个消息在业务层面的唯一标识码要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic、key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。
消息发送成功或者失败要打印消息日志,务必要打印SendResult和key字段。send消息方法只要不抛异常,就代表发送成功。发送成功会有多个状态,在sendResult里定义。
消费者
RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)
msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。
1
2
3
// 订单Id
String orderId = "20034568923546" ;
message . setKeys ( orderId );
事务消息
RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息。
事务消息
RocketMQ 事务消息流程概要
上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
1.事务消息发送及提交:
(1) 发送消息(half消息)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
2.补偿流程:
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态
(3) 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
代码实例
通过一个简单的场景模拟RocketMQ的事务消息:存在2个微服务,分别是订单服务和商品服务。订单服务进行下单处理,并发送消息给商品服务,对于下单成功的商品进行减库存。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// 使用RocketMQ事务消息——订单服务发送事务消息,然后进行本地下单,并通知商品服务减库存
public class OrderService {
public static void main ( String [] args ) throws Exception {
TransactionMQProducer producer = new TransactionMQProducer ();
producer . setNamesrvAddr ( RocketMQConstants . NAMESRV_ADDR );
producer . setProducerGroup ( RocketMQConstants . TRANSACTION_PRODUCER_GROUP );
//自定义线程池,执行事务操作
ThreadPoolExecutor executor = new ThreadPoolExecutor ( 10 , 50 , 10L , TimeUnit . SECONDS , new ArrayBlockingQueue <>( 20 ), ( Runnable r ) -> new Thread ( "Order Transaction Massage Thread" ));
producer . setExecutorService ( executor );
//设置事务消息监听器
producer . setTransactionListener ( new OrderTransactionListener ());
producer . start ();
System . err . println ( "OrderService Start" );
for ( int i = 0 ; i < 10 ; i ++){
String orderId = UUID . randomUUID (). toString ();
String payload = "下单,orderId: " + orderId ;
String tags = "Tag" ;
Message message = new Message ( RocketMQConstants . TRANSACTION_TOPIC_NAME , tags , orderId , payload . getBytes ( RemotingHelper . DEFAULT_CHARSET ));
//发送事务消息
TransactionSendResult result = producer . sendMessageInTransaction ( message , orderId );
System . err . println ( "发送事务消息,发送结果: " + result );
}
}
}
// 订单事务消息监听器。事务消息需要一个TransactionListener,主要进行本地事务的执行和事务回查
public class OrderTransactionListener implements TransactionListener {
private static final Map < String , Boolean > results = new ConcurrentHashMap <>();
@Override
public LocalTransactionState executeLocalTransaction ( Message msg , Object arg ) {
String orderId = ( String ) arg ;
//记录本地事务执行结果
boolean success = persistTransactionResult ( orderId );
System . err . println ( "订单服务执行本地事务下单,orderId: " + orderId + ", result: " + success );
return success ? LocalTransactionState . COMMIT_MESSAGE : LocalTransactionState . ROLLBACK_MESSAGE ;
}
@Override
public LocalTransactionState checkLocalTransaction ( MessageExt msg ) {
String orderId = msg . getKeys ();
System . err . println ( "执行事务消息回查,orderId: " + orderId );
return Boolean . TRUE . equals ( results . get ( orderId )) ? LocalTransactionState . COMMIT_MESSAGE : LocalTransactionState . ROLLBACK_MESSAGE ;
}
private boolean persistTransactionResult ( String orderId ) {
boolean success = Math . abs ( Objects . hash ( orderId )) % 2 == 0 ;
results . put ( orderId , success );
return success ;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//使用RocketMQ事务消息——商品服务接收下单的事务消息,如果消息成功commit则本地减库存
public class ProductService {
public static void main ( String [] args ) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ();
consumer . setNamesrvAddr ( RocketMQConstants . NAMESRV_ADDR );
consumer . setConsumerGroup ( RocketMQConstants . TRANSACTION_CONSUMER_GROUP );
consumer . subscribe ( RocketMQConstants . TRANSACTION_TOPIC_NAME , "*" );
consumer . registerMessageListener ( new ProductListener ());
consumer . start ();
System . err . println ( "ProductService Start" );
}
}
public class ProductListener implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage ( List < MessageExt > msgs , ConsumeConcurrentlyContext context ) {
Optional . ofNullable ( msgs ). orElse ( Collections . emptyList ()). forEach ( m -> {
String orderId = m . getKeys ();
System . err . println ( "监听到下单消息,orderId: " + orderId + ", 商品服务减库存" );
});
return ConsumeConcurrentlyStatus . CONSUME_SUCCESS ;
}
事务消息不支持延时消息和批量消息。
为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionalMessageCheckListener 类来修改这个行为。
事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。
事务性消息可能不止一次被检查或消费。
提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
Logappender 样例
RocketMQ日志提供log4j、log4j2和logback日志框架作为业务应用,下面是配置样例
1
2
3
4
<RocketMQ name= "rocketmqAppender" producerGroup= "yourLogGroup" nameServerAddress= "yourRocketmqNameserverAddress"
topic= "yourLogTopic" tag= "yourTag" >
<PatternLayout pattern= "%d [%p] hahahah %c %m%n" />
</RocketMQ>