Suppose our application organizes asynchronous domain logic inside DomainEventListeners, like the following code snippet.
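The original snippet is not reproduced here, so the sketch below is only a stand-in for what such a listener might look like. The shape of the interface and the names OrderPlaced and SendConfirmationEmail are hypothetical, chosen for illustration; the application's real types may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener contract; the real interface in the application may differ.
interface DomainEventListener<E> {
    // The event type this listener is interested in.
    Class<E> eventType();

    // Business logic to run when an event of that type arrives.
    void handle(E event);
}

// Hypothetical event, used only for illustration.
record OrderPlaced(String orderId) {}

// Example listener that records which orders it has handled.
class SendConfirmationEmail implements DomainEventListener<OrderPlaced> {
    final List<String> notified = new ArrayList<>();

    @Override
    public Class<OrderPlaced> eventType() {
        return OrderPlaced.class;
    }

    @Override
    public void handle(OrderPlaced event) {
        // A real handler would send an email; here we only record the order id.
        notified.add(event.orderId());
    }
}
```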
This post shows how to integrate these listeners with Spring AMQP by taking advantage of Spring’s infrastructure. We also briefly discuss several important topics on using messaging middleware.
Up and running
The web management UI is available at http://localhost:15672. The default username and password are both guest.
Messaging with RabbitMQ is a very basic Hello World example for Spring Boot integration.
For a more production-ready setting, it is essential to consult the Spring AMQP reference documentation.
AMQP (Advanced Message Queuing Protocol) is an application layer protocol for messaging middleware. RabbitMQ primarily supports the AMQP 0-9-1 model. It mainly revolves around the following three AMQP entities:
Exchange, queue, and binding
Messages are published to an exchange and consumed from a queue. A queue can subscribe to exchanges with bindings. When a message arrives at an exchange, it will be routed to different queues based on:
- the routing key in the message, and
- the type of the addressed exchange.
The four basic types of exchange are:
- Direct exchange
- Fanout exchange
- Topic exchange
- Headers exchange
The routing mechanism further decouples the publisher and subscriber, making it easy to adapt to different workflows. It may look overwhelming at first sight, so we are sticking with the simplest Direct Exchange. In this case, the message will be routed to subscribing queues with the same routing key.
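To make the direct exchange rule concrete, here is a toy in-memory model of it. It is not a RabbitMQ client and talks to no broker; the class names, queue names, and routing keys are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of a direct exchange; not a RabbitMQ client.
class DirectExchangeModel {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    // Returns the queues that would receive a message published with this routing key.
    // An empty list means no queue matched the key.
    List<String> route(String routingKey) {
        return List.copyOf(bindings.getOrDefault(routingKey, List.of()));
    }
}

class DirectRoutingDemo {
    public static void main(String[] args) {
        DirectExchangeModel exchange = new DirectExchangeModel();
        exchange.bind("email-queue", "order.placed");
        exchange.bind("audit-queue", "order.placed");
        exchange.bind("refund-queue", "order.refunded");

        System.out.println(exchange.route("order.placed"));
        System.out.println(exchange.route("order.cancelled"));
    }
}
```

Two queues bound with the same key both receive a copy of the message, while a routing key with no matching binding reaches no queue at all.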
To learn more about RabbitMQ and AMQP, I would recommend RabbitMQ in Depth. Its clear explanation and illustration make it easier to grasp these concepts.
Declarative configuration with Spring AMQP
With Spring AMQP, we can configure the required AMQP entities in the message broker by registering them as beans in the application context. At runtime, Spring AMQP will issue requests to the broker and create them.
I like this declarative approach. It feels like Kubernetes without nasty YAML configurations.
In this code snippet, we first declare an application-wide domainEventExchange. Then, we loop over the DomainEventListeners in the application context and create the necessary queues and bindings.
- We can declare multiple AMQP entities in a single Declarables. They don’t need to be the same type.
- durable means the created entity will survive a broker restart.
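A minimal sketch of such a declaration might look like the following. It assumes a RabbitAdmin is present in the application context (Spring Boot configures one automatically) and that each listener exposes a queue name and a routing key; the DomainEventListener contract shown here and the exchange name domain-events are assumptions, not the application's actual code.

```java
import java.util.ArrayList;
import java.util.List;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Declarable;
import org.springframework.amqp.core.Declarables;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
class DomainEventAmqpConfig {

    // Hypothetical listener contract: each listener names its queue and routing key.
    interface DomainEventListener {
        String queueName();
        String routingKey();
    }

    // Application-wide direct exchange: durable, not auto-deleted. The name is an assumption.
    @Bean
    DirectExchange domainEventExchange() {
        return new DirectExchange("domain-events", true, false);
    }

    // One durable queue and one binding per listener, grouped in a single Declarables bean.
    @Bean
    Declarables queues(List<DomainEventListener> listeners) {
        List<Declarable> declarables = new ArrayList<>();
        for (DomainEventListener listener : listeners) {
            Queue queue = new Queue(listener.queueName(), true);
            Binding binding = BindingBuilder.bind(queue)
                    .to(domainEventExchange())
                    .with(listener.routingKey());
            declarables.add(queue);
            declarables.add(binding);
        }
        return new Declarables(declarables);
    }
}
```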
So far, we have declared the desired entities in the broker, but we haven’t yet wired up our DomainEventListeners to consume messages from it and perform business logic. To do so, we need to wrap our event handlers in listener containers.
MessageListenerContainer represents an active or hot component. It handles the connection to the message broker. When the connection is broken, SimpleMessageListenerContainer will try to restart the listener. As a lifecycle component, it provides methods for starting and stopping.
To programmatically register our domain listeners, we can implement RabbitListenerConfigurer and use RabbitListenerEndpointRegistrar like this:
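The following is a sketch of one way to do this, not necessarily the original code. It assumes rabbit listener support is enabled (Spring Boot does this automatically, otherwise @EnableRabbit is needed), that exactly one RabbitListenerContainerFactory bean is available for injection, and that listeners accept the message body as a UTF-8 string. The DomainEventListener contract is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerEndpoint;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.stereotype.Component;

@Component
class DomainEventListenerRegistration implements RabbitListenerConfigurer {

    // Hypothetical listener contract: a queue to consume from and a handler for the payload.
    interface DomainEventListener {
        String queueName();
        void handle(String payload);
    }

    private final List<DomainEventListener> listeners;
    private final RabbitListenerContainerFactory<?> containerFactory;

    DomainEventListenerRegistration(List<DomainEventListener> listeners,
                                    RabbitListenerContainerFactory<?> containerFactory) {
        this.listeners = listeners;
        this.containerFactory = containerFactory;
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        for (DomainEventListener listener : listeners) {
            SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
            // The endpoint id must be unique; the queue name is reused for simplicity.
            endpoint.setId(listener.queueName());
            endpoint.setQueueNames(listener.queueName());

            // Adapt the domain listener to Spring AMQP's MessageListener.
            MessageListener adapter = message ->
                    listener.handle(new String(message.getBody(), StandardCharsets.UTF_8));
            endpoint.setMessageListener(adapter);

            // Spring AMQP builds one listener container per endpoint from this factory.
            registrar.registerEndpoint(endpoint, containerFactory);
        }
    }
}
```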
Spring AMQP will take care of creating the listener containers at runtime. Note that we passed an instance of RabbitListenerContainerFactory to the registrar. We will see that we can configure common properties of the containers through the container factory.
Check out more details at:
Messages are ephemeral
It’s important to realize that in the AMQP model, the message broker just acts as a postman between sender and receiver. It holds on to the messages only temporarily. After successful delivery to all consumers, the message is usually removed from the broker.
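In contrast, Redis Streams and Kafka are more like databases: they keep messages after consumption until the messages are explicitly deleted or removed by a trimming or retention setting. The ephemeral nature of AMQP messages has a few consequences: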
- It’s easy to lose messages if something is misconfigured. For example, a message published to an exchange with no bound queues will be dropped. Conceptually, it seems only queues in AMQP have memory.
- There is no first-class support for doing CRUD on messages. For example, the web UI does not have a list screen to page through the messages. It’s possible to retrieve a few messages on the queue detail page, but a warning there says “Getting messages from a queue is a destructive action.” Although not always “destructive”, the action will probably cause side effects on the message.
- Spring AMQP shares this kind of mindset. For example, it is important to configure a RetryInterceptor. By default, Spring AMQP drops the message once the configured number of retries is exhausted and logs a warning. Retry is discussed in the following section.
Message consumers can fail at any time (due to business exceptions, dropped connections, application crash, etc). To prevent losing messages in this way, message brokers use acknowledgments: a message is removed from the broker only after the client explicitly acknowledges that it has processed it.
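The bookkeeping behind acknowledgments can be pictured with a toy in-memory model. This is not how RabbitMQ is implemented and it uses no client library; the class and method names are invented for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy model of broker-side acknowledgment bookkeeping; not a RabbitMQ client.
class AckQueueModel {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<Long, String> unacked = new HashMap<>();
    private long nextTag = 1;

    void publish(String message) {
        ready.addLast(message);
    }

    // Hands the next ready message to a consumer and remembers it as unacknowledged.
    // Returns the delivery tag, or -1 if nothing is ready.
    long deliver() {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, message);
        return tag;
    }

    // Looks up the payload of a delivered but not yet acknowledged message.
    String payload(long tag) {
        return unacked.get(tag);
    }

    // The consumer finished successfully: the broker may now forget the message.
    void ack(long tag) {
        unacked.remove(tag);
    }

    // The consumer failed or its connection dropped: make the message available again.
    void requeue(long tag) {
        String message = unacked.remove(tag);
        if (message != null) {
            ready.addFirst(message);
        }
    }

    // Number of messages the broker still holds, delivered or not.
    int retained() {
        return ready.size() + unacked.size();
    }
}
```

A delivered message still counts as retained until ack is called, so a consumer that crashes between deliver and ack does not cause the message to disappear.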
In Laravel, when a job is taken from the queue, it’s placed under the myqueue:reserved key at the same time. These two steps form an atomic operation by using Lua scripting. This can also be seen as a form of acknowledgment.
Dead letter exchange
Suppose there is a malformed message. In our Spring application, if our message handlers throw an exception because of this message, by default, the message will be requeued and delivered again, resulting in an infinite loop. A viable solution is to send the bad message somewhere else for inspection after a few retries.
RabbitMQ can handle this situation with dead letter exchanges. When our consumer rejects or negatively acknowledges a message without requeueing it, the queue will “dead-letter” the message (annotating it with some information about the failure) and route it to the configured dead letter exchange. This configuration can be applied globally in the broker with a policy or specified when creating a queue.
You can also apply a policy in the RabbitMQ web UI under the “Admin” tab.
Note that the dead letter exchange is just a regular exchange. If you specify a non-existent exchange, RabbitMQ does not create it automatically. This is a type of misconfiguration that can cause lost messages.
AMQP entity declaration in Spring for configuring dead letter exchanges:
For each listener, we create:
- a corresponding dead letter queue, and
- its binding to the application-wide dead letter exchange.
By properly configuring listening queues and bindings, we ensure dead letters won’t get lost.
Then, update our queues() code to configure the queues’ dead letter exchange.
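Put together, the declarations could look roughly like the sketch below. The exchange names, the .dlq suffix, and the DomainEventListener contract are assumptions made for illustration. The sketch also sets a per-queue dead letter routing key so that a dead letter reaches only the dead letter queue of the listener that rejected it; without it, RabbitMQ keeps the message's original routing key.

```java
import java.util.ArrayList;
import java.util.List;

import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Declarable;
import org.springframework.amqp.core.Declarables;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
class DeadLetterAmqpConfig {

    // Hypothetical listener contract: each listener names its queue and routing key.
    interface DomainEventListener {
        String queueName();
        String routingKey();
    }

    // Exchange names are assumptions for this sketch.
    static final String DOMAIN_EVENT_EXCHANGE = "domain-events";
    static final String DEAD_LETTER_EXCHANGE = "domain-events.dlx";

    @Bean
    DirectExchange domainEventExchange() {
        return new DirectExchange(DOMAIN_EVENT_EXCHANGE, true, false);
    }

    // The dead letter exchange is a regular exchange and must be declared explicitly.
    @Bean
    DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE, true, false);
    }

    // For each listener: a dead letter queue bound to the dead letter exchange.
    @Bean
    Declarables deadLetterQueues(List<DomainEventListener> listeners) {
        List<Declarable> declarables = new ArrayList<>();
        for (DomainEventListener listener : listeners) {
            Queue deadLetterQueue = QueueBuilder.durable(listener.queueName() + ".dlq").build();
            declarables.add(deadLetterQueue);
            declarables.add(BindingBuilder.bind(deadLetterQueue)
                    .to(deadLetterExchange())
                    .with(listener.queueName()));
        }
        return new Declarables(declarables);
    }

    // Working queues now carry the dead letter exchange and routing key as queue arguments.
    @Bean
    Declarables queues(List<DomainEventListener> listeners) {
        List<Declarable> declarables = new ArrayList<>();
        for (DomainEventListener listener : listeners) {
            Queue queue = QueueBuilder.durable(listener.queueName())
                    .deadLetterExchange(DEAD_LETTER_EXCHANGE)
                    .deadLetterRoutingKey(listener.queueName())
                    .build();
            declarables.add(queue);
            declarables.add(BindingBuilder.bind(queue)
                    .to(domainEventExchange())
                    .with(listener.routingKey()));
        }
        return new Declarables(declarables);
    }
}
```

One caveat: RabbitMQ refuses to redeclare an existing queue with different arguments, so queues created before this change have to be deleted first, or the dead letter exchange has to be applied with a policy instead.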
We also need to set up our listener containers to instruct RabbitMQ not to requeue rejected messages. If a dead letter exchange is configured on the queue, the message will be routed to it. Otherwise, the message will be dropped.
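A minimal sketch of that container setting follows, assuming a SimpleRabbitListenerContainerFactory and a ConnectionFactory bean provided by the application context. In a Spring Boot application, the property spring.rabbitmq.listener.simple.default-requeue-rejected=false should achieve the same effect without a custom bean.

```java
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
class ListenerContainerConfig {

    @Bean
    SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // When a listener throws, reject the message without requeueing it,
        // so the broker dead-letters it (or drops it if no dead letter exchange is set).
        factory.setDefaultRequeueRejected(false);
        return factory;
    }
}
```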
Laravel’s failed jobs table is the equivalent concept.
It may be helpful to retry a few times before routing a failed message to the dead letter exchange. Spring provides some retry helpers, and we can configure them in the container factory like this:
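The sketch below shows one possible configuration, assuming Spring AMQP 2.x or 3.x with Spring Retry on the classpath; newer major versions may expose a different retry API. The attempt count and back-off values are arbitrary examples, not recommendations.

```java
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;

@Configuration
class RetryingContainerFactoryConfig {

    @Bean
    RetryOperationsInterceptor retryInterceptor() {
        return RetryInterceptorBuilder.stateless()
                // Three attempts in total: the first delivery plus two retries.
                .maxAttempts(3)
                // Initial interval 1s, multiplier 2.0, capped at 10s.
                .backOffOptions(1000, 2.0, 10000)
                // Once attempts are exhausted, reject without requeue so the
                // broker can route the message to the dead letter exchange.
                .recoverer(new RejectAndDontRequeueRecoverer())
                .build();
    }

    @Bean
    SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // Every container created by this factory wraps its listener with the retry advice.
        factory.setAdviceChain(retryInterceptor());
        return factory;
    }
}
```

The retries here happen inside the consumer, on the listener thread, so the message stays unacknowledged on the broker while the back-off runs.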