Several ways to implement MQ sequential messages

This article was last updated on: February 7, 2024 pm

The necessity of sequential messages

If several messages in the message queue are operated on the same data, these operations have a relationship before and after, and must be executed in the order of before and after, otherwise data exceptions will be caused.
For example, if the data synchronization of two databases through mysql binlog is sequential, if the order of operations is reversed, it will cause immeasurable errors. For example, the database has inserted > update-> delete operations on a piece of data in turn, this order must be like this, if during the synchronization process, the order of messages becomes "delete->insert-> update, then the data that should have been deleted has not been deleted, resulting in data inconsistency.
Another example: in the order creation, payment, refund and other processes in the transaction scenario, the order can be paid before the order is created, and the completed order can be refunded.

Kafka

Basic concepts

Topic

Topic\

Partition

The physical concept is that each topic contains one or more partitions, a partition corresponds to a folder, and the partition’s data and index files are stored under this folder, and each partition is ordered internally.

Topic & Partition

kafka_groups

A topic is a type of message, and each message must specify a topic. Physically, a topic is divided into one or more partitions, and each partition has multiple copies distributed in different brokers, as shown in the following figure.

kafka-broker-partition-replication

Each partition is an append log file at the storage level, and messages posted to this partition are appended to the end of the log file and written to disk sequentially. The position of each message in the log file is called offset, and offset is a long number that uniquely marks a message. As shown in the following figure:

kafka partition offset

The only metadata saved by each consumer is the offset value, which is completely controlled by the consumer, so the consumer can consume the records in any order, as shown above.

Sequential message practices

Kafka’s guarantee of message order

Kafka guarantees message order within the same partition, and cannot ensure global order if a topic has multiple partitions. If you need to guarantee the global order, you need to control the number of partitions to one.

Limits for Kafka sequential messages
  • From the producer’s point of view, writes to different partitions are completely parallel; From a consumer perspective, the number of concurrency depends entirely on the number of partitions (if the number of consumers is greater than the number of partitions, there must be idle consumers). Therefore, configuring the number of partitions is important to take advantage of Kafka concurrency performance. Because sequential messages can only be configured with a single partition, it is difficult to improve their concurrent performance.
  • A single partition can only be used by the same consumer groupIndividual consumersProcess consumption.
  • A single consumer process can consume multiple partitions at the same time, that is, partitions limit the concurrency of the consumer side.

RabbitMQ

Basic concepts

Queue

Queue <sup id=“fnref:2” class=“footnote-ref”>[2] has two main operationsSequential data structures: An item can be queued (added) at the tail and queued (consumed) from the head. Queues play an important role in messaging technology: many messaging protocols and tools assume that publishers and consumers communicate using a queue-like storage mechanism.

Queues in RabbitMQ are FIFOs (First In, First Out). Some queue characteristics, namely consumer prioritization and requeuing, affect the ordering observed by consumers.

Sequential message practices

A queue in RabbitMQ is an ordered collection of messages. Messages are queued and dequeued (delivered to consumers) in FIFO fashion.

FIFO ordering does not guarantee priority queues and sharded queues. Therefore, as long as you configure a normal queue, do not configure priority queues and sharded queues, then the messages in the queue are sequential messages.

Multiple competing consumers, consumer priorites, and message redeliveries also affect ordering. Therefore, if you want to consume messages sequentially, there can only be one Consumer.

To summarize, to implement RabbitMQ sequential messages, configure a Queue corresponding to a Consumer, send all messages that need to ensure order to this Queue, and close autoackprefetchCount=1, only one message is consumed at a time, processed and manually acked, and then receives the next message, which is processed by only one consumer.

RocketMQ

Basic concepts

Normal Ordered Message

In the normal sequential consumption mode, consumers passThe sameMessage queues (topic partitions, called Message queues) receive messages sequentially, while messages received by different message queues may be out of order<sup id=“fnref:3” class=“footnote-ref”>[3].

Strictly Ordered Message

In strictly sequential message mode, all messages received by consumers are sequential.

Message order

Message ordering refers to the order in which a type of message can be consumed in the order in which it is sent. For example, an order generates three messages: order creation, order payment, and order completion. When consuming, it is necessary to consume in this order to be meaningful, but at the same time, orders can be consumed in parallel. RocketMQ can strictly ensure that messages are ordered.

Sequential messages are divided into global sequential messages and partitioned sequential messages, and the global order means that all messages under a topic must ensure order; Partial sequential messages only need to ensure that each set of messages is consumed sequentially.

  • Global Order For a specified topic, all messages are published and consumed in a strictly first-in, first-out (FIFO) order. Applicable scenarios: Scenarios where performance requirements are not high, and all messages are published and consumed in strict accordance with FIFO principles
  • Partition order For a specified topic, all messages are partitioned according to the sharding key. Messages within the same partition are published and consumed in strict FIFO order. Sharding key is a key field used in sequential messages to distinguish between different partitions, and the key is a completely different concept from ordinary messages. Applicable scenarios: scenarios with high performance requirements, sharding key as the partition field, and message release and consumption in strict accordance with FIFO principles in the same block.

Sequential message practices

Message ordering means that messages can be consumed in the order in which they are sent (FIFO). RocketMQ can strictly ensure the order of messages, which can be divided into partition order or global order.

The principle of sequential consumption is analyzed, and by default, message sending will take Round Robin polling mode to send messages to different queues (partitioned queues); When consuming messages, messages are pulled from multiple queues, and the order of sending and consuming is not guaranteed. However, if you control the order in which messages are sentSend only to the same queue one after the other, when consumingOnly pull sequentially from this queue, the order is guaranteed. When there is only one queue participating in sending and consuming, it is global order; If multiple queues participate, it is partitioned ordered, that is, messages are ordered relative to each queue.

The following is an example of ordering the order with the order. The sequential process of an order is: create, pay, push, complete. Messages with the same order number will be sent to the same queue one after another, and when consumed, the same OrderId must be obtained in the same queue.

Sequential message production

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package org.apache.rocketmq.example.order2;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

/**
* Producer,发送顺序消息
*/
public class Producer {

public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

producer.setNamesrvAddr("127.0.0.1:9876");

producer.start();

String[] tags = new String[]{"TagA", "TagC", "TagD"};

// 订单列表
List<OrderStep> orderList = new Producer().buildOrders();

Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++) {
// 加个时间前缀
String body = dateStr + " Hello RocketMQ " + orderList.get(i);
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg; // 根据订单 id 选择发送 queue
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());// 订单 id

System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}

producer.shutdown();
}

/**
* 订单的步骤
*/
private static class OrderStep {
private long orderId;
private String desc;

public long getOrderId() {
return orderId;
}

public void setOrderId(long orderId) {
this.orderId = orderId;
}

public String getDesc() {
return desc;
}

public void setDesc(String desc) {
this.desc = desc;
}

@Override
public String toString() {
return "OrderStep{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}';
}
}

/**
* 生成模拟订单数据
*/
private List<OrderStep> buildOrders() {
List<OrderStep> orderList = new ArrayList<OrderStep>();

OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc(" 创建 ");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc(" 创建 ");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc(" 付款 ");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc(" 创建 ");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc(" 付款 ");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc(" 付款 ");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc(" 完成 ");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc(" 推送 ");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc(" 完成 ");
orderList.add(orderDemo);

orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc(" 完成 ");
orderList.add(orderDemo);

return orderList;
}
}

Sequential message consumption

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package org.apache.rocketmq.example.order2;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
* 顺序消息消费,带事务方式(应用可控制 Offset 什么时候提交)
*/
public class ConsumerInOrder {

public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("127.0.0.1:9876");
/**
* 设置 Consumer 第一次启动是从队列头部开始消费还是队列尾部开始消费 <br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTest", "TagA || TagC || TagD");

consumer.registerMessageListener(new MessageListenerOrderly() {

Random random = new Random();

@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每个 queue 有唯一的 consume 线程来消费, 订单对每个 queue(分区) 有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}

try {
// 模拟业务逻辑处理中...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});

consumer.start();

System.out.println("Consumer Started.");
}
}

Sequential message consumption recommendations

Consumers will lock down each message queue to ensure that they are consumed one by one, which will result in performance degradation but be useful when you are concerned about message order. We do not recommend throwing exceptions that you can return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT As an alternative.

Apache Pulsar

conception

Topics and partitions

A topic <sup id=“fnref:4” class=“footnote-ref”>[4] is the name of a category in which messages can be stored and published. The producer writes the message to the topic, and the consumer reads the message from the topic.

Pulsar’s topics are divided into two categories: Partitioned Topics and Non-Partitioned Topics, and Non-Partitioned Topics can be understood as a topic with a partition number of 1. In fact, in Plar, Topic is a virtual concept, creating a 3-partition topic, in fact, creating 3 “partition topics”, and the message sent to this topic will be sent to multiple “partition topics” corresponding to this topic.
For example: A producer sends a message to a partition named with a number of 3 my-topic The topic is sent evenly up the data stream or according to certain rules (if a key is specified). my-topic-partition-0my-topic-partition-1 and my-topic-partition-2 Three “partition topics”.

When partitioning Topic does data persistence, partition is a logical concept, and the actual storage unit is segment.

As shown in the following figure, partitioning Topic1-Part2 The data consists of N segments, each evenly distributed and stored in multiple Bookie nodes in the Apache BookKeeper cluster, each with 3 replicas.

Pulsar topic partition 和 segment

Message type

In Message Queuing, according to the characteristics and usage scenarios of messages, messages can be classified as follows:

Message type Order of consumption Performance Applicable scenarios
Normal messages Out of order Best The throughput is huge and there is no requirement for the order of production and consumption
Local order messages All messages under the same partition follow the first-in, first-out (FIFO) rule Better The throughput is large, ordered within the same partition, and disordered in different partitions
Global Order Message All messages under the same topic follow the first-in, first-out (FIFO) rule General Throughput is average, globally ordered, single partition
Dead letter messages - - Messages that cannot be consumed normally

Normal messages

Ordinary message is a basic message type, which is delivered by the production to a specified topic and consumed by consumers who subscribe to the topic. The concept of unordered in the topic of ordinary messages allows you to use multiple partitions to improve the production and consumption efficiency of messages, and its performance is best when the throughput is huge.

Local order messages

Compared with ordinary message types, local sequential messages have an additional local sequential feature. That is, under the same partition, when consumers consume news, they consume in strict accordance with the order in which the producers deliver to the partition. While ensuring a certain degree of orderliness, local sequential messages retain the partitioning mechanism to improve performance. However, local order messages do not guarantee order between different partitions.

Global order messages

The biggest feature of global sequential messages is that they are strictly guaranteed to be consumed in the order in which they are delivered by producers. Therefore, it uses a single partition to process messages, and the user cannot customize the number of partitions, which has lower performance than the first two message types.

Dead letter messages

Dead-letter messages are messages that cannot be consumed normally. TDMQ Pulsar automatically creates a dead-letter queue to process such messages when a new subscription is created (a consumer has determined a subscription relationship with a topic).

Sequential message practices

Producer:

It is relatively simple for the sender to ensure the sequence of messages:

One option: exploitSingle queueSend:

  • One transaction corresponds to one queue
  • A queue can only be monitored by one consumer

Another option: take advantage of Pulsar’s partitioned topics

  • When the producer sends a message, you need to specify the key attribute (such as key is the order number), and Pulsar will automatically assign the message to the specified partition according to the key value
  • Multiple consumers can listen to the same partition, but the same key will only be assigned to the same consumer

Consumer:

There are two ways of thinking about the sequence of consumer assurance messages:

One is: single-threaded execution

  • Single-threaded execution guarantees sequential consumption
  • Low consumption efficiency

The other is: concurrent consumption

  • In the message body sent by the producer, you need to specify the key, and the consumer locates the corresponding thread according to the partition and key.

<section class=“footnotes”>

References

  1. Kafka categorizes messages, specifying a topic for each message sent to the cluster. ↩
  2. Apache Kafka

  3. Queues — RabbitMQ
  4. rocketmq/docs/cn at master · apache/rocketmq (github.com)
  5. Messaging · Apache Pulsar


Several ways to implement MQ sequential messages
https://e-whisper.com/posts/60020/
Author
east4ming
Posted on
October 1, 2021
Licensed under