1
  
compile  'org.apache.rocketmq:rocketmq-client:4.3.0' 
 
 
消息发送 
发送同步消息 
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
  
public  class  SyncProducer  { 
	public  static  void  main ( String []  args )  throws  Exception  { 
     	// 实例化消息生产者Producer
  DefaultMQProducer  producer  =  new  DefaultMQProducer ( "please_rename_unique_group_name" ); 
    	// 设置NameServer的地址
  producer . setNamesrvAddr ( "localhost:9876" ); 
    	// 启动Producer实例
  producer . start (); 
    	for  ( int  i  =  0 ;  i  <  100 ;  i ++)  { 
     	    // 创建消息,并指定Topic,Tag和消息体
  Message  msg  =  new  Message ( "TopicTest"  /* Topic */ , 
        	"TagA"  /* Tag */ , 
         	( "Hello RocketMQ "  +  i ). getBytes ( RemotingHelper . DEFAULT_CHARSET )  /* Message body */ 
         	); 
         	// 发送消息到一个Broker
  SendResult  sendResult  =  producer . send ( msg ); 
            // 通过sendResult返回消息是否成功送达
  System . out . printf ( "%s%n" ,  sendResult ); 
    	} 
     	// 如果不再发送消息,关闭Producer实例。
  producer . shutdown (); 
    } 
 } 
 
 
发送异步消息 
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
  
public  class  AsyncProducer  { 
	public  static  void  main ( String []  args )  throws  Exception  { 
     	// 实例化消息生产者Producer
  DefaultMQProducer  producer  =  new  DefaultMQProducer ( "please_rename_unique_group_name" ); 
    	// 设置NameServer的地址
  producer . setNamesrvAddr ( "localhost:9876" ); 
    	// 启动Producer实例
  producer . start (); 
        producer . setRetryTimesWhenSendAsyncFailed ( 0 ); 
 	
 	int  messageCount  =  100 ; 
         // 根据消息数量实例化倒计时计算器
  final  CountDownLatch2  countDownLatch  =  new  CountDownLatch2 ( messageCount ); 
    	for  ( int  i  =  0 ;  i  <  messageCount ;  i ++)  { 
                 final  int  index  =  i ; 
             	// 创建消息,并指定Topic,Tag和消息体
  Message  msg  =  new  Message ( "TopicTest" , 
                    "TagA" , 
                     "OrderID188" , 
                     "Hello world" . getBytes ( RemotingHelper . DEFAULT_CHARSET )); 
                 // SendCallback接收异步返回结果的回调
  producer . send ( msg ,  new  SendCallback ()  { 
                    @Override 
                     public  void  onSuccess ( SendResult  sendResult )  { 
                         countDownLatch . countDown (); 
                         System . out . printf ( "%-10d OK %s %n" ,  index , 
                             sendResult . getMsgId ()); 
                     } 
                     @Override 
                     public  void  onException ( Throwable  e )  { 
                         countDownLatch . countDown (); 
       	                System . out . printf ( "%-10d Exception %s %n" ,  index ,  e ); 
       	                e . printStackTrace (); 
                     } 
             	}); 
     	} 
 	// 等待5s
  countDownLatch . await ( 5 ,  TimeUnit . SECONDS ); 
    	// 如果不再发送消息,关闭Producer实例。
  producer . shutdown (); 
    } 
 } 
 
 
单向发送消息 
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
  
public  class  OnewayProducer  { 
	public  static  void  main ( String []  args )  throws  Exception { 
     	// 实例化消息生产者Producer
  DefaultMQProducer  producer  =  new  DefaultMQProducer ( "please_rename_unique_group_name" ); 
    	// 设置NameServer的地址
  producer . setNamesrvAddr ( "localhost:9876" ); 
    	// 启动Producer实例
  producer . start (); 
    	for  ( int  i  =  0 ;  i  <  100 ;  i ++)  { 
         	// 创建消息,并指定Topic,Tag和消息体
  Message  msg  =  new  Message ( "TopicTest"  /* Topic */ , 
                "TagA"  /* Tag */ , 
                 ( "Hello RocketMQ "  +  i ). getBytes ( RemotingHelper . DEFAULT_CHARSET )  /* Message body */ 
         	); 
         	// 发送单向消息,没有任何返回结果
  producer . sendOneway ( msg ); 
     	} 
     	// 如果不再发送消息,关闭Producer实例。
  producer . shutdown (); 
    } 
 } 
 
 
消费消息 
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
  
public  class  Consumer  { 
 	public  static  void  main ( String []  args )  throws  InterruptedException ,  MQClientException  { 
 
     	// 实例化消费者
  DefaultMQPushConsumer  consumer  =  new  DefaultMQPushConsumer ( "please_rename_unique_group_name" ); 
     	// 设置NameServer的地址
  consumer . setNamesrvAddr ( "localhost:9876" ); 
     	// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
  consumer . subscribe ( "TopicTest" ,  "*" ); 
    	// 注册回调实现类来处理从broker拉取回来的消息
  consumer . registerMessageListener ( new  MessageListenerConcurrently ()  { 
            @Override 
             public  ConsumeConcurrentlyStatus  consumeMessage ( List < MessageExt >  msgs ,  ConsumeConcurrentlyContext  context )  { 
                 System . out . printf ( "%s Receive New Messages: %s %n" ,  Thread . currentThread (). getName (),  msgs ); 
                 // 标记该消息已经被成功消费
  return  ConsumeConcurrentlyStatus . CONSUME_SUCCESS ; 
            } 
         }); 
         // 启动消费者实例
  consumer . start (); 
        System . out . printf ( "Consumer Started.%n" ); 
 	} 
 } 
 
 
顺序消息样例 
全局顺序消息:
RocketMQ 的一个 Topic 下默认有八个读队列和八个写队列,如果要保证全局顺序消息的话需要在生产端只保留一个读写队列,然后消费端只有一个消费线程,这样会降低 RocketMQ 的高可用和高吞吐量。
分区顺序消息:
分区顺序消息同样需要生产端和消费端配合,生产端根据同一个订单 ID 把消息路由到同一个 MessageQueue,消费端控制从同一个 MessageQueue 取出的消息不被并发处理。
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
顺序消息生产 
  1
   2
   3
   4
   5
   6
   7
   8
   9
  10
  11
  12
  13
  14
  15
  16
  17
  18
  19
  20
  21
  22
  23
  24
  25
  26
  27
  28
  29
  30
  31
  32
  33
  34
  35
  36
  37
  38
  39
  40
  41
  42
  43
  44
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
  
public  class  Producer  { 
    public  static  void  main ( String []  args )  throws  Exception  { 
        DefaultMQProducer  producer  =  new  DefaultMQProducer ( "please_rename_unique_group_name" ); 
 
        producer . setNamesrvAddr ( "127.0.0.1:9876" ); 
 
        producer . start (); 
 
        String []  tags  =  new  String []{ "TagA" ,  "TagC" ,  "TagD" }; 
 
        // 订单列表
  List < OrderStep >  orderList  =  new  Producer (). buildOrders (); 
        Date  date  =  new  Date (); 
        SimpleDateFormat  sdf  =  new  SimpleDateFormat ( "yyyy-MM-dd HH:mm:ss" ); 
        String  dateStr  =  sdf . format ( date ); 
        for  ( int  i  =  0 ;  i  <  10 ;  i ++)  { 
            // 加个时间前缀
  String  body  =  dateStr  +  " Hello RocketMQ "  +  orderList . get ( i ); 
           Message  msg  =  new  Message ( "TopicTest" ,  tags [ i  %  tags . length ],  "KEY"  +  i ,  body . getBytes ()); 
 
            SendResult  sendResult  =  producer . send ( msg ,  new  MessageQueueSelector ()  { 
                @Override 
                public  MessageQueue  select ( List < MessageQueue >  mqs ,  Message  msg ,  Object  arg )  { 
                    Long  id  =  ( Long )  arg ;   //根据订单id选择发送queue
  long  index  =  id  %  mqs . size (); 
                   return  mqs . get (( int )  index ); 
                } 
            },  orderList . get ( i ). getOrderId ()); //订单id
             System . out . println ( String . format ( "SendResult status:%s, queueId:%d, body:%s" , 
                sendResult . getSendStatus (), 
                sendResult . getMessageQueue (). getQueueId (), 
                body )); 
        } 
 
        producer . shutdown (); 
    } 
 
    /**
      * 订单的步骤
     */ 
   private  static  class  OrderStep  { 
        private  long  orderId ; 
        private  String  desc ; 
 
        public  long  getOrderId ()  { 
            return  orderId ; 
        } 
 
        public  void  setOrderId ( long  orderId )  { 
            this . orderId  =  orderId ; 
        } 
 
        public  String  getDesc ()  { 
            return  desc ; 
        } 
 
        public  void  setDesc ( String  desc )  { 
            this . desc  =  desc ; 
        } 
 
        @Override 
        public  String  toString ()  { 
            return  "OrderStep{"  + 
                "orderId="  +  orderId  + 
                ", desc='"  +  desc  +  '\''  + 
                '}' ; 
        } 
    } 
 
    /**
      * 生成模拟订单数据
     */ 
   private  List < OrderStep >  buildOrders ()  { 
        List < OrderStep >  orderList  =  new  ArrayList < OrderStep >(); 
 
        OrderStep  orderDemo  =  new  OrderStep (); 
        orderDemo . setOrderId ( 15103111039L ); 
        orderDemo . setDesc ( "创建" ); 
        orderList . add ( orderDemo ); 
 
        orderDemo  =  new  OrderStep (); 
        orderDemo . setOrderId ( 15103111065L ); 
        orderDemo . setDesc ( "创建" ); 
        orderList . add ( orderDemo ); 
 
        orderDemo  =  new  OrderStep (); 
        orderDemo . setOrderId ( 15103111039L ); 
        orderDemo . setDesc ( "付款" ); 
        orderList . add ( orderDemo ); 
 
        orderDemo  =  new  OrderStep (); 
        orderDemo . setOrderId ( 15103117235L ); 
        orderDemo . setDesc ( "创建" ); 
        orderList . add ( orderDemo ); 
 
        orderDemo  =  new  OrderStep (); 
        orderDemo . setOrderId ( 15103111065L ); 
        orderDemo . setDesc ( "付款" ); 
        orderList . add ( orderDemo ); 
 
        orderDemo  =  new  OrderStep (); 
        orderDemo . setOrderId ( 15103117235L ); 
        orderDemo . setDesc ( "付款" ); 
        orderList . add ( orderDemo ); 
 
        orderDemo  =  new  OrderStep (); 
        orderDemo . setOrderId ( 15103111065L ); 
        orderDemo . setDesc ( "完成" ); 
        orderList . add ( orderDemo ); 
 
        orderDemo  =  new  OrderStep (); 
        orderDemo . setOrderId ( 15103111039L ); 
        orderDemo . setDesc ( "推送" ); 
        orderList . add ( orderDemo ); 
 
        orderDemo  =  new  OrderStep (); 
        orderDemo . setOrderId ( 15103117235L ); 
        orderDemo . setDesc ( "完成" ); 
        orderList . add ( orderDemo ); 
 
        orderDemo  =  new  OrderStep (); 
        orderDemo . setOrderId ( 15103111039L ); 
        orderDemo . setDesc ( "完成" ); 
        orderList . add ( orderDemo ); 
 
        return  orderList ; 
    } 
 } 
 
 
顺序消费消息
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
  
/**
 * 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
 */ 
public  class  ConsumerInOrder  { 
    public  static  void  main ( String []  args )  throws  Exception  { 
        DefaultMQPushConsumer  consumer  =  new  DefaultMQPushConsumer ( "please_rename_unique_group_name_3" ); 
        consumer . setNamesrvAddr ( "127.0.0.1:9876" ); 
        /**
          * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */ 
       consumer . setConsumeFromWhere ( ConsumeFromWhere . CONSUME_FROM_FIRST_OFFSET ); 
 
        consumer . subscribe ( "TopicTest" ,  "TagA || TagC || TagD" ); 
 
        consumer . registerMessageListener ( new  MessageListenerOrderly ()  { 
 
            Random  random  =  new  Random (); 
 
            @Override 
            public  ConsumeOrderlyStatus  consumeMessage ( List < MessageExt >  msgs ,  ConsumeOrderlyContext  context )  { 
                context . setAutoCommit ( true ); 
                for  ( MessageExt  msg  :  msgs )  { 
                    // 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
  System . out . println ( "consumeThread="  +  Thread . currentThread (). getName ()  +  "queueId="  +  msg . getQueueId ()  +  ", content:"  +  new  String ( msg . getBody ())); 
               } 
 
                try  { 
                    //模拟业务逻辑处理中...
  TimeUnit . SECONDS . sleep ( random . nextInt ( 10 )); 
               }  catch  ( Exception  e )  { 
                    e . printStackTrace (); 
                } 
                return  ConsumeOrderlyStatus . SUCCESS ; 
            } 
        }); 
 
        consumer . start (); 
 
        System . out . println ( "Consumer Started." ); 
    } 
 } 
 
 
延时消息样例 
启动消费者等待传入订阅消息 
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
  
public  class  ScheduledMessageConsumer  { 
   public  static  void  main ( String []  args )  throws  Exception  { 
       // 实例化消费者
  DefaultMQPushConsumer  consumer  =  new  DefaultMQPushConsumer ( "ExampleConsumer" ); 
      // 订阅Topics
  consumer . subscribe ( "TestTopic" ,  "*" ); 
      // 注册消息监听者
  consumer . registerMessageListener ( new  MessageListenerConcurrently ()  { 
          @Override 
           public  ConsumeConcurrentlyStatus  consumeMessage ( List < MessageExt >  messages ,  ConsumeConcurrentlyContext  context )  { 
               for  ( MessageExt  message  :  messages )  { 
                   // Print approximate delay time period
  System . out . println ( "Receive message[msgId="  +  message . getMsgId ()  +  "] "  +  ( System . currentTimeMillis ()  -  message . getBornTimestamp ())  +  "ms later" ); 
              } 
               return  ConsumeConcurrentlyStatus . CONSUME_SUCCESS ; 
           } 
       }); 
       // 启动消费者
  consumer . start (); 
  } 
 } 
 
 
发送延时消息 
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
  
public  class  ScheduledMessageProducer  { 
   public  static  void  main ( String []  args )  throws  Exception  { 
       // 实例化一个生产者来产生延时消息
  DefaultMQProducer  producer  =  new  DefaultMQProducer ( "ExampleProducerGroup" ); 
      // 启动生产者
  producer . start (); 
      int  totalMessagesToSend  =  100 ; 
       for  ( int  i  =  0 ;  i  <  totalMessagesToSend ;  i ++)  { 
           Message  message  =  new  Message ( "TestTopic" ,  ( "Hello scheduled message "  +  i ). getBytes ()); 
           // 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
  message . setDelayTimeLevel ( 3 ); 
          // 发送消息
  producer . send ( message ); 
      } 
        // 关闭生产者
  producer . shutdown (); 
  } 
 } 
 
 
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
1
 2
 3
  
// 延时消息的使用限制 SendMessageProcessor.java
 // org/apache/rocketmq/store/config/MessageStoreConfig.java
 private  String  messageDelayLevel  =  "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" ; 
 
 
批量消息样例 
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
  
String  topic  =  "BatchTest" ; 
List < Message >  messages  =  new  ArrayList <>(); 
messages . add ( new  Message ( topic ,  "TagA" ,  "OrderID001" ,  "Hello world 0" . getBytes ())); 
messages . add ( new  Message ( topic ,  "TagA" ,  "OrderID002" ,  "Hello world 1" . getBytes ())); 
messages . add ( new  Message ( topic ,  "TagA" ,  "OrderID003" ,  "Hello world 2" . getBytes ())); 
try  { 
   producer . send ( messages ); 
 }  catch  ( Exception  e )  { 
   e . printStackTrace (); 
    //处理error
  } 
 
 
复杂度只有当你发送大批量时才会增长,你可能不确定它是否超过了大小限制(4MB)。这时候你最好把你的消息列表分割一下:
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
  
public  class  ListSplitter  implements  Iterator < List < Message >>  {  
    private  final  int  SIZE_LIMIT  =  1024  *  1024  *  4 ; 
     private  final  List < Message >  messages ; 
     private  int  currIndex ; 
     public  ListSplitter ( List < Message >  messages )  {  
         this . messages  =  messages ; 
     } 
     @Override  public  boolean  hasNext ()  { 
         return  currIndex  <  messages . size ();  
     } 
     @Override  public  List < Message >  next ()  {  
         int  startIndex  =  getStartIndex (); 
         int  nextIndex  =  startIndex ; 
         int  totalSize  =  0 ; 
         for  (;  nextIndex  <  messages . size ();  nextIndex ++)  { 
             Message  message  =  messages . get ( nextIndex );  
             int  tmpSize  =  calcMessageSize ( message ); 
             if  ( tmpSize  +  totalSize  >  SIZE_LIMIT )  { 
                 break ;  
             }  else  { 
                 totalSize  +=  tmpSize ;  
             } 
         } 
         List < Message >  subList  =  messages . subList ( startIndex ,  nextIndex );  
         currIndex  =  nextIndex ; 
         return  subList ; 
     } 
     private  int  getStartIndex ()  { 
         Message  currMessage  =  messages . get ( currIndex );  
         int  tmpSize  =  calcMessageSize ( currMessage );  
         while ( tmpSize  >  SIZE_LIMIT )  { 
             currIndex  +=  1 ; 
             Message  message  =  messages . get ( curIndex );  
             tmpSize  =  calcMessageSize ( message ); 
         } 
         return  currIndex ;  
     } 
     private  int  calcMessageSize ( Message  message )  { 
         int  tmpSize  =  message . getTopic (). length ()  +  message . getBody (). length ;  
         Map < String ,  String >  properties  =  message . getProperties (); 
         for  ( Map . Entry < String ,  String >  entry  :  properties . entrySet ())  { 
             tmpSize  +=  entry . getKey (). length ()  +  entry . getValue (). length ();  
         } 
         tmpSize  =  tmpSize  +  20 ;  // 增加⽇日志的开销20字节
  return  tmpSize ;  
    } 
 } 
//把大的消息分裂成若干个小的消息
 ListSplitter  splitter  =  new  ListSplitter ( messages ); 
while  ( splitter . hasNext ())  { 
  try  { 
       List < Message >   listItem  =  splitter . next (); 
       producer . send ( listItem ); 
   }  catch  ( Exception  e )  { 
       e . printStackTrace (); 
       //处理error
  } 
} 
 
 
过滤消息样例 
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
  
DefaultMQProducer  producer  =  new  DefaultMQProducer ( "please_rename_unique_group_name" ); 
producer . start (); 
Message  msg  =  new  Message ( "TopicTest" , 
   tag , 
    ( "Hello RocketMQ "  +  i ). getBytes ( RemotingHelper . DEFAULT_CHARSET ) 
 ); 
// 设置一些属性
 msg . putUserProperty ( "a" ,  String . valueOf ( i )); 
SendResult  sendResult  =  producer . send ( msg ); 
 producer . shutdown (); 
 // 消费者样例
 DefaultMQPushConsumer  consumer  =  new  DefaultMQPushConsumer ( "please_rename_unique_group_name_4" ); 
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
 consumer . subscribe ( "TopicTest" ,  MessageSelector . bySql ( "a between 0 and 3" ); 
consumer . registerMessageListener ( new  MessageListenerConcurrently ()  { 
   @Override 
    public  ConsumeConcurrentlyStatus  consumeMessage ( List < MessageExt >  msgs ,  ConsumeConcurrentlyContext  context )  { 
        return  ConsumeConcurrentlyStatus . CONSUME_SUCCESS ; 
    } 
 }); 
consumer . start (); 
 
 
RocketMQ 的特点 
亿级消息的堆积能力,单个队列中的百万级消息的累积容量。 
高可用性:Broker服务器支持多Master多Slave的同步双写以及Master多Slave的异步复制模式,其中同步双写可保证消息不丢失。 
高可靠性:生产者将消息发送到Broker端有三种方式,同步、异步和单向,其中同步和异步都可以保证消息成功的成功发送。Broker在对于消息刷盘有两种策略:同步刷盘和异步刷盘,其中同步刷盘可以保证消息成功的存储到磁盘中。消费者的消费模式也有集群消费和广播消费两种,默认集群消费,如果集群模式中消费者挂了,一个组里的其他消费者会接替其消费。综上所述,是高可靠的。 
支持分布式事务消息:这里是采用半消息确认和消息回查机制来保证分布式事务消息的,下面会详细描述。 
支持消息过滤:建议采用消费者业务端的tag过滤 
支持顺序消息:消息在Broker中是采用队列的FIFO模式存储的,也就是发送是顺序的,只要保证消费的顺序性即可。 
支持定时消息和延迟消息:Broker中由定时消息的机制,消息发送到Broker中,不会立即被Consumer消费,会等到一定的时间才被消费。延迟消息也是一样,延迟一定时间之后才会被Consumer消费。 
 
生产者 
一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。tags可以由应用自由设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤:message.setTags(“TagA”)。
每个消息在业务层面的唯一标识码要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic、key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。
消息发送成功或者失败要打印消息日志,务必要打印SendResult和key字段。send消息方法只要不抛异常,就代表发送成功。发送成功会有多个状态,在sendResult里定义。
消费者 
RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否在关系数据库中存在。如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)
msgId一定是全局唯一标识符,但是实际使用中,可能会存在相同的消息有两个不同msgId的情况(消费者主动重发、因客户端重投机制导致的重复等),这种情况就需要使业务字段进行重复消费。
1
 2
 3
  
// 订单Id   
 String  orderId  =  "20034568923546" ;    
message . setKeys ( orderId );    
 
 
事务消息 
RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息。
         事务消息 
    
RocketMQ 事务消息流程概要 
上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
1.事务消息发送及提交:
(1) 发送消息(half消息)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
2.补偿流程:
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态
(3) 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
代码实例 
通过一个简单的场景模拟RocketMQ的事务消息:存在2个微服务,分别是订单服务和商品服务。订单服务进行下单处理,并发送消息给商品服务,对于下单成功的商品进行减库存。
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
  
// 使用RocketMQ事务消息——订单服务发送事务消息,然后进行本地下单,并通知商品服务减库存
 public  class  OrderService  { 
  public  static  void  main ( String []  args )  throws  Exception  { 
     TransactionMQProducer  producer  =  new  TransactionMQProducer (); 
     producer . setNamesrvAddr ( RocketMQConstants . NAMESRV_ADDR ); 
     producer . setProducerGroup ( RocketMQConstants . TRANSACTION_PRODUCER_GROUP ); 
 
     //自定义线程池,执行事务操作
  ThreadPoolExecutor  executor  =  new  ThreadPoolExecutor ( 10 ,  50 ,  10L ,  TimeUnit . SECONDS ,  new  ArrayBlockingQueue <>( 20 ),  ( Runnable  r )  ->  new  Thread ( "Order Transaction Massage Thread" )); 
    producer . setExecutorService ( executor ); 
 
     //设置事务消息监听器
  producer . setTransactionListener ( new  OrderTransactionListener ()); 
     producer . start (); 
 
     System . err . println ( "OrderService Start" ); 
 
     for  ( int  i  =  0 ; i  <  10 ; i ++){ 
       String  orderId  =  UUID . randomUUID (). toString (); 
       String  payload  =  "下单,orderId: "  +  orderId ; 
       String  tags  =  "Tag" ; 
       Message  message  =  new  Message ( RocketMQConstants . TRANSACTION_TOPIC_NAME ,  tags ,  orderId ,  payload . getBytes ( RemotingHelper . DEFAULT_CHARSET )); 
 
       //发送事务消息
  TransactionSendResult  result  =  producer . sendMessageInTransaction ( message ,  orderId ); 
      System . err . println ( "发送事务消息,发送结果: "  +  result ); 
     } 
   } 
 } 
 // 订单事务消息监听器。事务消息需要一个TransactionListener,主要进行本地事务的执行和事务回查
 public  class  OrderTransactionListener  implements  TransactionListener  { 
  private  static  final  Map < String ,  Boolean >  results  =  new  ConcurrentHashMap <>(); 
 
   @Override 
   public  LocalTransactionState  executeLocalTransaction ( Message  msg ,  Object  arg )  { 
     String  orderId  =  ( String )  arg ; 
 
     //记录本地事务执行结果
  boolean  success  =  persistTransactionResult ( orderId ); 
    System . err . println ( "订单服务执行本地事务下单,orderId: "  +  orderId  +  ", result: "  +  success ); 
     return  success  ?  LocalTransactionState . COMMIT_MESSAGE  :  LocalTransactionState . ROLLBACK_MESSAGE ; 
   } 
 
   @Override 
   public  LocalTransactionState  checkLocalTransaction ( MessageExt  msg )  { 
     String  orderId  =  msg . getKeys (); 
     System . err . println ( "执行事务消息回查,orderId: "  +  orderId ); 
     return  Boolean . TRUE . equals ( results . get ( orderId ))  ?  LocalTransactionState . COMMIT_MESSAGE  :  LocalTransactionState . ROLLBACK_MESSAGE ; 
   } 
 
   private  boolean  persistTransactionResult ( String  orderId )  { 
     boolean  success  =  Math . abs ( Objects . hash ( orderId ))  %  2  ==  0 ; 
     results . put ( orderId ,  success ); 
     return  success ; 
   } 
 } 
 
 
 1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
  
//使用RocketMQ事务消息——商品服务接收下单的事务消息,如果消息成功commit则本地减库存
 public  class  ProductService  { 
  public  static  void  main ( String []  args )  throws  Exception  { 
     DefaultMQPushConsumer  consumer  =  new  DefaultMQPushConsumer (); 
     consumer . setNamesrvAddr ( RocketMQConstants . NAMESRV_ADDR ); 
     consumer . setConsumerGroup ( RocketMQConstants . TRANSACTION_CONSUMER_GROUP ); 
     consumer . subscribe ( RocketMQConstants . TRANSACTION_TOPIC_NAME ,  "*" ); 
     consumer . registerMessageListener ( new  ProductListener ()); 
     consumer . start (); 
     System . err . println ( "ProductService Start" ); 
   } 
 } 
 public  class  ProductListener  implements  MessageListenerConcurrently  { 
  @Override 
   public  ConsumeConcurrentlyStatus  consumeMessage ( List < MessageExt >  msgs ,  ConsumeConcurrentlyContext  context )  { 
     Optional . ofNullable ( msgs ). orElse ( Collections . emptyList ()). forEach ( m  ->  { 
       String  orderId  =  m . getKeys (); 
       System . err . println ( "监听到下单消息,orderId: "  +  orderId  +  ", 商品服务减库存" ); 
     }); 
     return  ConsumeConcurrentlyStatus . CONSUME_SUCCESS ; 
   } 
  
 
事务消息不支持延时消息和批量消息。 
为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionalMessageCheckListener 类来修改这个行为。 
事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。 
事务性消息可能不止一次被检查或消费。 
提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。 
事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。 
 
Logappender 样例 
RocketMQ日志提供log4j、log4j2和logback日志框架作为业务应用,下面是配置样例
1
 2
 3
 4
  
<RocketMQ  name= "rocketmqAppender"  producerGroup= "yourLogGroup"  nameServerAddress= "yourRocketmqNameserverAddress" 
   topic= "yourLogTopic"  tag= "yourTag" > 
   <PatternLayout  pattern= "%d [%p] hahahah %c %m%n" /> 
 </RocketMQ>