When using Spring Cloud Stream Rabbit, if a consumer throws an exception while processing a message, the message can be re-consumed in several ways.
Option 1 (default)
When the consumer throws an exception while processing a message, the default behavior is to retry in the current thread, for 3 attempts in total. This can be changed in the configuration file through the max-attempts parameter under the channel:
- max-attempts equal to 1 means no retries.
- max-attempts greater than 1 is the total number of attempts, including the first one.
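When a message exhausts its attempts, it is discarded unless a DLQ is enabled. Out of the box, this method has no single fixed retry interval to set; the delay between attempts comes from the consumer back-off properties (backOffInitialInterval, backOffMaxInterval, backOffMultiplier).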
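The attempt counting described above can be illustrated with a small, framework-free sketch. It is not the binder's implementation; the class InThreadRetrySketch and its process method are hypothetical names, used only to show that max-attempts bounds the total number of handler calls made in the consuming thread.

```java
/** Hypothetical, framework-free illustration of in-thread retry counting. */
final class InThreadRetrySketch {

    /** Result of handling one message: how many attempts ran and whether one succeeded. */
    static final class Outcome {
        final int attempts;
        final boolean succeeded;

        Outcome(int attempts, boolean succeeded) {
            this.attempts = attempts;
            this.succeeded = succeeded;
        }
    }

    /**
     * Calls the handler in the current thread until it succeeds or
     * maxAttempts calls have been made (the first call counts as attempt 1).
     */
    static Outcome process(int maxAttempts, Runnable handler) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.run();
                return new Outcome(attempt, true);
            } catch (RuntimeException e) {
                // Swallow and retry; messages queued behind this one wait meanwhile.
            }
        }
        // Attempts exhausted: without a DLQ the message would be dropped here.
        return new Outcome(maxAttempts, false);
    }
}
```

With a handler that always fails, process(3, handler) makes three calls and process(1, handler) makes exactly one, which is why a value of 1 amounts to "no retries".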
Option 2
Option 1 retries in the current thread, which blocks the messages queued behind the failing one. When blocking is undesirable, a Dead Letter Queue (DLQ) can be used to retry asynchronously.
Let's look at how the DLQ works first.
Setting spring.cloud.stream.rabbit.bindings.<channelName>.consumer.autoBindDlq to true automatically creates a DLQ for the corresponding channel and binds it to the Dead Letter Exchange (DLX). By default, the name of the queue is the corresponding destination.group with .dlq appended, and the routingKey of a message entering the queue is the original destination.
With only the configuration above, a message that enters the DLQ stays there indefinitely because nothing consumes it. The dlqTtl parameter sets how long a message lives in the DLQ; when that time expires and there is no consumer, the message is deleted by default.
If you want to specify the name of the DLQ, you can do so with the deadLetterQueueName parameter.
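The naming rule can be written down as a tiny helper. This is only a sketch of the rule as described in this article, not code taken from the binder; DlqNameSketch and resolve are hypothetical names, and the sample values in the usage note are made up.

```java
/** Hypothetical helper mirroring the DLQ naming rule described in the text. */
final class DlqNameSketch {

    /**
     * Returns the explicit deadLetterQueueName when one is given,
     * otherwise "<destination>.<group>.dlq".
     */
    static String resolve(String destination, String group, String deadLetterQueueName) {
        if (deadLetterQueueName != null && !deadLetterQueueName.isEmpty()) {
            return deadLetterQueueName;
        }
        return destination + "." + group + ".dlq";
    }
}
```

For example, resolve("orders", "billing", null) yields orders.billing.dlq, while passing "orders-failed" as the third argument yields orders-failed.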
Retrying works by giving the DLQ itself a dead letter exchange, here the default exchange: once the TTL expires, the message is dead-lettered again and routed to the queue that the specified exchange resolves for it.
To implement this logic, three parameters need to be configured:
- autoBindDlq: set to true to enable the DLQ.
- dlqTtl: sets how long a dead-lettered message lives in the DLQ, which effectively acts as the retry interval.
- dlqDeadLetterExchange: add this parameter and leave it blank to use the default exchange. With the default exchange, a message in the DLQ is delivered to the queue whose name matches the message's routingKey (which can also be specified with the deadLetterRoutingKey parameter), so the message is consumed again.
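As an illustration, the three settings can be assembled as plain properties. The property prefix is the one quoted earlier in this article; the class name DlqRetryPropertiesSketch, the channel name input, and the 5000 ms interval are assumptions made for the example.

```java
import java.util.Properties;

/** Hypothetical builder for the three DLQ retry settings of one channel. */
final class DlqRetryPropertiesSketch {

    /**
     * Builds the consumer properties that enable DLQ-based retry for the
     * given channel, using retryIntervalMillis as the DLQ message TTL.
     */
    static Properties forChannel(String channelName, long retryIntervalMillis) {
        String prefix = "spring.cloud.stream.rabbit.bindings." + channelName + ".consumer.";
        Properties props = new Properties();
        // Create the DLQ and bind it to the DLX.
        props.setProperty(prefix + "autoBindDlq", "true");
        // Time a message waits in the DLQ before being dead-lettered again.
        props.setProperty(prefix + "dlqTtl", Long.toString(retryIntervalMillis));
        // Blank value: route expired messages through the default exchange.
        props.setProperty(prefix + "dlqDeadLetterExchange", "");
        return props;
    }
}
```

Calling forChannel("input", 5000L) produces the same three keys one would otherwise write by hand in the application's configuration file.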
Note: Because the group parameter is set in the configuration, the binder's durable parameter is enabled by default, i.e. the exchange and queue of the channel are persistent. They are not removed when the application exits, and they keep the arguments they were created with. So after changing the channel's parameters, you need to delete the queue and let it be recreated automatically; otherwise the change does not take effect and automatic retry will not work.
With the above configuration, after deleting the old queue and restarting the application, the created queue information is as follows.
After the message is redelivered, some retry information is added to its header, as shown in the following figure.
- The deliveryAttempt value represents the number of attempts in the current thread, i.e. the retry logic of Option 1.
- The x-death header records details about the retry loop; in particular, its count value records the number of asynchronous retries via the DLQ.
Sometimes we also want to know the exact exception behind the last failure. For that, add the republishToDlq parameter; when set to true, it adds the exception details and stack trace to the message headers.
Note: The routingKey is determined differently depending on the value of republishToDlq. When it is false, the first routing-keys value in the x-death header is used; when it is true, the value of the X_ORIGINAL_ROUTING_KEY_HEADER header is used.
At this point, the message keeps repeating the queue -> DLQ -> queue loop for as long as the consumer keeps rejecting it or throwing exceptions. To stop once the number of retries reaches a limit, for example 3, throw ImmediateAcknowledgeAmqpException; the message is then discarded and does not enter the DLQ.
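The stop condition can be sketched without any framework types. The class DlqRetryLimitSketch and its Action enum are hypothetical; in a real listener, DISCARD would correspond to throwing ImmediateAcknowledgeAmqpException and REJECT_TO_DLQ to throwing an ordinary exception, and the map stands in for the x-death header with its count entry.

```java
import java.util.Map;

/** Hypothetical decision helper for capping DLQ-based retries. */
final class DlqRetryLimitSketch {

    enum Action { REJECT_TO_DLQ, DISCARD }

    /**
     * Decides what to do with a failed message, given its x-death header
     * (null on the first failure) and the maximum number of DLQ retries.
     */
    static Action decide(Map<?, ?> death, long maxRetries) {
        if (death == null || death.get("count") == null) {
            // No x-death information yet: the message has not been through the DLQ.
            return Action.REJECT_TO_DLQ;
        }
        long count = Long.parseLong(death.get("count").toString());
        // Once the DLQ round trips reach the limit, give up instead of looping again.
        return count >= maxRetries ? Action.DISCARD : Action.REJECT_TO_DLQ;
    }
}
```

Keeping the comparison in one small method makes the limit easy to unit test apart from the messaging code.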
About message rejection
Usually, the rejection of a message is done by throwing an exception. But this exception cannot be thrown indiscriminately. Different exceptions are handled by the framework in different ways.
- A normal exception is treated like AmqpRejectAndDontRequeueException and causes the message to be retried.
- ImmediateAcknowledgeAmqpException causes the message to be discarded without triggering a retry.
Sometimes we do not want retry ERROR entries in the production log. In that case, consider the following approach.
- Change the acknowledgeMode on the consumer side from the default automatic mode to manual, i.e. acknowledgeMode: MANUAL.
- Inject the channel into the consumer and acknowledge or reject manually, e.g.
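```java
import java.io.IOException;
import java.util.Map;

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(TestSink.class)
public class TestConsumer {
    private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class);
    private static final long MAX_RETRY = 3L;

    @StreamListener(TestSink.INPUT)
    public void consume(Message<?> message,
                        @Header(name = AmqpHeaders.CHANNEL, required = false) Channel channel,
                        @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag,
                        @Header(name = "x-death", required = false) Map<?, ?> death) throws IOException {
        logger.info("Recv:{}", message);
        // The x-death "count" is how many times the message has already gone through the DLQ.
        if (death != null && death.get("count") != null
                && Long.parseLong(death.get("count").toString()) >= MAX_RETRY) {
            logger.error("Discard the message");
            // Acknowledge so the broker drops the message instead of dead-lettering it again.
            channel.basicAck(deliveryTag, false);
            return;
        }
        // Reject without requeue so the broker dead-letters the message to the DLQ.
        channel.basicReject(deliveryTag, false);
    }
}
```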
Reference http://edisonxu.com/2022/01/28/spring-cloud-stream-rabbit.html