When using Spring Cloud Stream Rabbit, if a consumer throws an exception during consumption. You can re-consume the message in several ways.

Option 1 (default)

When the consumer side throws an exception while processing a message, then the default will be Retry for 3 times in the current thread. this option is the default and can be modified by modifying the configuration file and specifying the parameters under channel, e.g.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
spring:
  cloud:
    stream:
      bindings:
        input-test-event:
          destination: test-event
          group: test-group
          binder: rabbit
          consumer:
            max-attempts: 1
  • max-attempts if equal to 1, means no retries.
  • max-attempts if greater than 1, its value is the number of retries.

When a message exceeds the maximum number of retries, the message will be discarded if DLQ is not configured to be enabled. By default, this method cannot set the retry interval.

Option 2

Option 1 is to retry in the current thread, which is equivalent to blocking the message behind. Sometimes we don’t want to block, then we can use Dead Letter Queue (abbreviated DLQ) to retry asynchronously.

Let’s look at the logic of DLQ first.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
spring:
  cloud:
    stream:
      bindings:
        input-test-event:
          destination: test-event
          group: test-group
          binder: rabbit
      rabbit:
        bindings:
          input-test-event:
            consumer:
              autoBindDlq: true

Setting spring.cloud.stream.rabbit.bindings.<channelName>.consumer.autoBindDlq to true will automatically create a DLQ for the corresponding channel, binding the Dead Letter Exchange (abbreviated as DLX). By default, the name of the queue is its corresponding destination.group appended with .dlq, and the routingKey of the message entering the queue is the original destination.

rabbitmq queue

As configured above, after the message enters DLQ, the message will always be stored in DLQ because there is no consumer. You can add the dlqTtl parameter to set how long the message will live in DLQ, and the message will be deleted after the default expiration in the case of no consumer.

If you want to specify the name of the DLQ, you can do so with the deadLetterQueueName parameter.

The logic of retrying is actually to use DLQ, set a default exchange to it, and after the TTL time expires, the message will be redirected to the queue corresponding to the specified exchange.

To implement this logic, three parameters need to be configured.

  • autoBindDql set to true to enable DLQ
  • dlqTtl sets a dead letter message timeout, which disguises the retry interval
  • dlqDeadLetterExchange Add this parameter and leave it blank to set the default value. With the default value, the message in DLQ will be delivered to the quque with the corresponding name according to the value of its routingKey (which can also be specified by the deadLetterRoutingKey parameter), enabling the message to be re-consumed.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
spring:
  cloud:
    stream:
      bindings:
        input-test-event:
          destination: test-event
          group: test-group
          binder: rabbit
      rabbit:
        bindings:
          input-test-event:
            consumer:
              autoBindDlq: true 
              dlqTtl: 5000 
              dlqDeadLetterExchange:

Note: Because the group parameter is set in the configuration, when this parameter is used, the default durable parameter of rabbit is enabled, i.e. the exhange and queue of the channel are persistent, and will not be automatically cleared after the application exits, and the parameters at the time of creation are preserved. So when you change the parameters of the channel, you need to remove the queue and let it rebuild automatically, otherwise the new change will not take effect, and then automatic retry will not be possible.

According to the above configuration, after deleting the old queue and restarting the application, the created queue information is as follows.

rbbitmq queue

After the message is redelivered, some retry information is added to its header, as shown in the following figure.

rabbitmq message header

  • The deliveryAttempt value represents the number of retries in the current thread, i.e. the retry logic for Option 1.
  • The x-death header records some details about the retry loop, in particular the value count records the number of asynchronous retries via DLQ.

However, sometimes we want to know the exact exception of the last error, so we can add the republishToDlq parameter, which, when set to true, will add detailed exception and exception stack information in the message header.

republishToDlq

Note: When republishToDlq is set to a different value, routingKey takes a different logic. When false, the first routing-keys value in the x-death header is taken; when true, the value of the X_ORIGINAL_ROUTING_KEY_HEADER header is taken.

At this point, the message will keep repeating the queue -> DLQ -> queue loop (assuming the consumer side keeps rejecting or throwing exceptions). If we want to set the number of retries to be greater than 3, we can throw the exception ImmediateAcknowledgeAmqpException and the message will be discarded and not enter DLQ.

About message rejection

Usually, the rejection of a message is done by throwing an exception. But this exception cannot be thrown indiscriminately. Different exceptions are handled by the framework in different ways.

  • A normal exception, equivalent to AmqpRejectAndDontRequeueException, will cause the message to retry.
  • ImmediateAcknowledgeAmqpException which causes the message to be discarded without triggering a retry.

Sometimes, we don’t expect a retry ERROR in the production log, consider the following option.

  1. change the acknowledgeMode on the consumer side from the default automatic to manual, i.e. acknowledgeMode: MANUAL.

  2. inject the channel into the consuming side and handle it manually, e.g.

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    
    @Component
    @EnableBinding(TestSink.class)
    public class TestConsumer {
    
        private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class);
        private static final Long MAX_RETRY = Long.valueOf(3L);
        @StreamListener(TestSink.INPUT)
        public void consume(Message message,
                            @Header(name = AmqpHeaders.CHANNEL, required = false) Channel channel,
                            @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag,
                            @Header(name = "x-death", required = false) Map<?,?> death) throws IOException {
            logger.info("Recv:{}", message);
    
            if(death!=null && death.get("count")!=null && Long.valueOf(death.get("count").toString()).compareTo(MAX_RETRY)>=0){
                logger.error("Discard the message");
                channel.basicAck(deliveryTag, false);
                return;
            }
            //c
            channel.basicReject(deliveryTag, false);
        }
    }
    

Reference http://edisonxu.com/2022/01/28/spring-cloud-stream-rabbit.html