In this article, you will learn how to use Confluent Schema Registry with Spring Cloud Stream and Kafka in a microservices architecture. We will use Apache Avro to serialize and deserialize events exchanged between our applications. Spring Cloud Stream provides a handy mechanism for integration with Kafka and schema registry.
Ok, but before we start, let's say a few words about the schema registry. What is it? And why might we use it in our event-driven architecture? Let's imagine we change the message on the producer side by adding or removing some fields. We then send that message to a Kafka topic, where many subscribers may be receiving such events. In a typical microservices architecture, we may have many producers and many subscribers. It is often necessary for all those microservices to agree on a contract that is based on a schema. If a schema is evolving, the existing microservices are still required to work. Here comes a schema registry server. It provides a RESTful interface for storing and retrieving schemas in different formats like JSON, Protobuf, or Avro. It also stores a versioned history of all schemas and provides schema compatibility checks.
We may choose between several available products. Spring Cloud has its own implementation of a schema registry server. Although it can be easily integrated with Spring Cloud Stream, we won't use it, because it currently doesn't allow verifying compatibility between different versions. There is also the Apicurio registry. However, it cannot be easily integrated with Spring Cloud Stream. Therefore our choice fell on the Confluent Schema Registry.
Event-driven architecture with Spring Cloud and schema registry
We are going to run three applications. One of them is sending events to a Kafka topic, while the two others are receiving them. The integration with Kafka is built on top of Spring Cloud Stream. The consumer Consumer-A is expecting events compatible with the v1 of the schema, while the second subscriber is expecting events compatible with the v2. Before sending a message to Kafka, the producer application tries to load the schema definition from a remote server. If there is no result, it submits the data to the server, which replies with versioning information. The following diagram illustrates our architecture.
If a new schema is not compatible with the previous version, the schema registry rejects it. As a result, Spring Cloud Stream does not allow the message to be sent to the Kafka topic. Otherwise, it serializes the message using Apache Avro. When a subscriber receives a message, it first fetches the schema from the remote registry. It gets the version of the schema from the header of the message. Finally, it deserializes the payload using the Avro format.
If you would like to try it yourself, you may always take a look at my source code. In order to do that, you need to clone my GitHub repository and switch to the schema-registry branch. Then go to the event-driven directory. After that, you should just follow my instructions. Let's begin.
Running Confluent Schema Registry on Kubernetes
It seems that the simplest way to run the Confluent Schema Registry locally is on Kubernetes. Since we need to run at least Zookeeper and Kafka to be able to run the schema registry, we will use Helm for it. First, let's add the Confluent Helm repository.
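The commands below are a sketch; the repository URL is the one published by Confluent, so verify it against their documentation if it has moved:

```shell
# Add the Confluent Helm chart repository and refresh the local chart index
helm repo add confluentinc https://packages.confluent.io/helm
helm repo update
```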
Then we just need to install the Confluent Platform using the operator.
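A minimal sketch of the installation, assuming the confluent-for-kubernetes chart (the release name confluent-operator is arbitrary):

```shell
# Install the Confluent for Kubernetes operator into the current namespace;
# it manages Zookeeper, Kafka, and Schema Registry as custom resources
helm upgrade --install confluent-operator confluentinc/confluent-for-kubernetes
```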
Finally, let's display a list of running pods in the installation namespace. After that, we may display a list of Kubernetes services. Our applications will connect to the Kafka cluster through one of these services.
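For example (the confluent namespace below is an assumption; use whatever namespace you installed the platform into):

```shell
# List the pods started by the operator
kubectl get pods -n confluent
# List the services exposed by the Kafka cluster and the schema registry
kubectl get svc -n confluent
```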
Integrate Spring Cloud Stream with Confluent Schema Registry
In order to enable integration with the Confluent Schema Registry, we first need to add the spring-cloud-schema-registry-client dependency to the Maven pom.xml.
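A sketch of the pom.xml fragment; the version is typically managed by the Spring Cloud BOM, so it is omitted here:

```xml
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-schema-registry-client</artifactId>
</dependency>
```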
After that, we should enable the schema registry client with the @EnableSchemaRegistryClient annotation. By default, the client uses the schema registry server provided by Spring Cloud. Therefore, we register the ConfluentSchemaRegistryClient bean as the default client implementation.
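A sketch of such a configuration class; the exact package names differ between the older spring-cloud-stream-schema module and the newer spring-cloud-schema-registry-client, so treat the imports as indicative:

```java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.schema.registry.client.ConfluentSchemaRegistryClient;
import org.springframework.cloud.schema.registry.client.EnableSchemaRegistryClient;
import org.springframework.cloud.schema.registry.client.SchemaRegistryClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableSchemaRegistryClient
public class SchemaRegistryConfig {

    // Registering this bean replaces the default Spring Cloud Schema Registry
    // client with the Confluent-compatible implementation
    @Bean
    public SchemaRegistryClient schemaRegistryClient(
            @Value("${spring.cloud.schemaRegistryClient.endpoint}") String endpoint) {
        ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
        client.setEndpoint(endpoint);
        return client;
    }
}
```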
Since we run our schema registry on Kubernetes, its address is different from the default one. Let's override it in the application configuration.
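For example, in application.yml (the service hostname below is hypothetical; point it at the schema registry Service created in your cluster):

```yaml
spring:
  cloud:
    schemaRegistryClient:
      endpoint: http://confluent-schemaregistry.confluent:8081
```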
Because we are going to serialize messages using the Apache Avro format, we need to change the default content type for all topics to application/*+avro. The message is sent with a contentType header built using the scheme application/[prefix].[subject].v[version]+avro, where the prefix is configurable and the subject is deduced from the payload type. The default prefix is vnd, and since the name of the message class is CallmeEvent, the value of the header would be application/vnd.callmeevent.v1+avro for the v1 version of the schema or application/vnd.callmeevent.v2+avro for the v2.
Alternatively, we may set a content type just for a single destination. But more about it in the next sections.
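To make the scheme concrete, here is a small standalone helper (not part of Spring Cloud Stream) that reproduces how such a header value is assembled:

```java
public class ContentTypeScheme {

    // Builds a header value following application/[prefix].[subject].v[version]+avro,
    // where the subject is the lower-cased simple name of the payload class
    static String contentType(String prefix, Class<?> payloadType, int version) {
        String subject = payloadType.getSimpleName().toLowerCase();
        return "application/" + prefix + "." + subject + ".v" + version + "+avro";
    }

    static class CallmeEvent { }

    public static void main(String[] args) {
        // prints application/vnd.callmeevent.v1+avro
        System.out.println(contentType("vnd", CallmeEvent.class, 1));
        // prints application/vnd.callmeevent.v2+avro
        System.out.println(contentType("vnd", CallmeEvent.class, 2));
    }
}
```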
Event class and Apache Avro serialization
We may choose between two approaches to event class creation when working with Apache Avro. It is possible to generate an Avro schema from a model class, or to generate a class from an Avro schema using the avro-maven-plugin. Assuming we use the second approach, we first need to create an Avro schema and place it in the source code as an .avsc file. Let's say this is our Avro schema. It contains three fields, including eventType. The name of the generated class will be CallmeEvent, and its package name will be the same as the namespace declared in the schema.
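A sketch of such a schema; only the eventType field and the CallmeEvent name come from this article, while the remaining field names, their types, and the namespace are illustrative assumptions:

```json
{
  "type": "record",
  "name": "CallmeEvent",
  "namespace": "com.example.message.avro",
  "fields": [
    { "name": "id", "type": "int" },
    { "name": "message", "type": "string" },
    { "name": "eventType", "type": "string" }
  ]
}
```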
After that, we need to add the following plugin to the Maven pom.xml. We just need to configure the input directory with the Avro schema files, and the output directory for the generated classes. Once you run a build, for example with the mvn clean package command, it will generate the required class.
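A sketch of the plugin configuration; the version and directory layout are assumptions, so adjust them to your project:

```xml
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.10.2</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <!-- where the .avsc files live -->
        <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
        <!-- where the generated Java classes are written -->
        <outputDirectory>${project.basedir}/target/generated-sources/avro</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>
```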
Just to simplify working with the generated classes, let's include target/generated-sources/avro as a source directory.
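One way to do that is with the build-helper-maven-plugin, sketched below:

```xml
<plugin>
  <groupId>org.codehaus.mojo</groupId>
  <artifactId>build-helper-maven-plugin</artifactId>
  <executions>
    <execution>
      <id>add-avro-sources</id>
      <phase>generate-sources</phase>
      <goals>
        <goal>add-source</goal>
      </goals>
      <configuration>
        <sources>
          <!-- register the Avro output directory as a compile source root -->
          <source>target/generated-sources/avro</source>
        </sources>
      </configuration>
    </execution>
  </executions>
</plugin>
```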
However, the simplest approach, especially in development, is to generate the Avro schema automatically from the source code. With this approach, we first need to create a plain message class.
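A sketch of such a class; as before, only the CallmeEvent name and the eventType field come from this article, and the other fields are assumptions:

```java
public class CallmeEvent {

    private Integer id;
    private String message;
    private String eventType;

    // a no-arg constructor is needed for deserialization
    public CallmeEvent() { }

    public CallmeEvent(Integer id, String message, String eventType) {
        this.id = id;
        this.message = message;
        this.eventType = eventType;
    }

    public Integer getId() { return id; }
    public String getMessage() { return message; }
    public String getEventType() { return eventType; }
}
```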
Then, we just need to enable dynamic Avro schema generation. Once you do it, Spring Cloud Stream automatically generates the schema and sends it to the schema registry before sending a message to a Kafka topic.
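The switch itself is a single property (shown here in application.yml form):

```yaml
spring:
  cloud:
    schema:
      avro:
        dynamicSchemaGenerationEnabled: true
```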
Integrate Spring Cloud Stream with Kafka
Spring Cloud Stream offers a broker-agnostic programming model for sending and receiving messages. If you are looking for a quick introduction to that model and event-driven microservices, read my article Introduction to event-driven microservices with Spring Cloud Stream. We use the same scenario as described in that article. However, we will add schema registry support and replace RabbitMQ with Kafka. In order to change the broker, we just need to replace the binder implementation as shown below.
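Concretely, that means swapping the RabbitMQ binder starter for the Kafka one in the pom.xml:

```xml
<!-- use this instead of spring-cloud-starter-stream-rabbit -->
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
```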
Here's the main class of the producer-service application. It uses a Supplier bean to generate events continuously after startup.
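A sketch of such a main class; the class name, bean name, and event payload values are assumptions:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class ProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

    private final AtomicInteger id = new AtomicInteger();

    // Spring Cloud Stream polls this Supplier (every second by default)
    // and publishes each returned event to the bound Kafka destination
    @Bean
    public Supplier<CallmeEvent> callmeEventSupplier() {
        return () -> new CallmeEvent(id.incrementAndGet(), "Hello" + id.get(), "PING");
    }
}
```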
Here's the Spring Cloud Stream configuration for the Supplier bean. It configures partitioning based on the value of one of the event fields.
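A sketch of that configuration, assuming the binding name is derived from a callmeEventSupplier bean and partitioning is keyed on an id field:

```yaml
spring:
  cloud:
    stream:
      bindings:
        callmeEventSupplier-out-0:
          destination: callme-events
          contentType: application/*+avro
          producer:
            partitionKeyExpression: payload.id
            partitionCount: 2
```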
Both consumers are receiving messages from the callme-events topic. The same as for producer-service, we need to enable the schema registry client.
We also need to configure deserialization with Avro and partitioning on the consumer side.
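A sketch of a consumer configuration, assuming a callmeEventConsumer function bean and two application instances:

```yaml
spring:
  cloud:
    stream:
      instanceCount: 2
      instanceIndex: ${INSTANCE_INDEX:0}
      bindings:
        callmeEventConsumer-in-0:
          destination: callme-events
          contentType: application/*+avro
          group: callme
          consumer:
            partitioned: true
```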
Deploy applications on Kubernetes
Firstly, let's deploy our Spring Cloud Stream applications on Kubernetes. Here's a Deployment manifest for the producer application.
We also have similar manifests for the consumer applications. There we need to set the INSTANCE_INDEX environment variable, which is then responsible for the partitioning configuration.
The Deployment manifest for the consumer-b application is visible below.
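A sketch of that manifest; the image name is an assumption, and the essential detail is the INSTANCE_INDEX variable set to 1 for the second consumer:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer-b
spec:
  replicas: 1
  selector:
    matchLabels:
      app: consumer-b
  template:
    metadata:
      labels:
        app: consumer-b
    spec:
      containers:
        - name: consumer-b
          image: consumer-service:latest
          env:
            - name: INSTANCE_INDEX
              value: "1"
```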
All those applications may be deployed on Kubernetes with Skaffold. Each application directory contains a Skaffold configuration file skaffold.yaml, so you just need to execute the following command to run them on Kubernetes.
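Run it from each application directory:

```shell
# Build the image and apply the Kubernetes manifests defined in skaffold.yaml
skaffold run
```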
Testing integration between Spring Cloud Stream and schema registry
In order to register the v1 version of the schema, we should run the producer-service application with the following event class. Then, we should restart it with the new version of the CallmeEvent class as shown below.
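A sketch of what a compatible v2 might look like, assuming the change is a single added field (the source field is hypothetical); in Avro terms, a new field must carry a default value to remain compatible with v1:

```java
public class CallmeEvent {

    private Integer id;
    private String message;
    private String eventType;
    // hypothetical field added in v2; it defaults to a value so that
    // messages written with the v1 schema can still be read
    private String source = "unknown";

    public CallmeEvent() { }

    public CallmeEvent(Integer id, String message, String eventType, String source) {
        this.id = id;
        this.message = message;
        this.eventType = eventType;
        this.source = source;
    }

    public Integer getId() { return id; }
    public String getMessage() { return message; }
    public String getEventType() { return eventType; }
    public String getSource() { return source; }
}
```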
Now, we can verify a list of schemas registered on the server. First, let’s enable port forwarding for the Confluent Schema Registry service.
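For example (the service name and namespace are assumptions from this installation):

```shell
# Forward local port 8081 to the schema registry service in the cluster
kubectl port-forward svc/confluent-schemaregistry -n confluent 8081:8081
```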
Thanks to that, we may access the schema registry REST API on the local port. Let's display a list of registered subjects. As you see, there is a single subject, called callmeevent.
In the next step, we may get a list of versions registered under the callmeevent subject. As we expect, there are two versions available in the schema registry.
We can display a full schema definition by calling the following endpoint with the schema id.
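The three lookups map onto the standard Confluent REST API endpoints; with port forwarding active, they can be queried with curl (the schema id 1 below is just an example):

```shell
# List all registered subjects
curl http://localhost:8081/subjects
# List the versions registered under the callmeevent subject
curl http://localhost:8081/subjects/callmeevent/versions
# Display a full schema definition by its global id
curl http://localhost:8081/schemas/ids/1
```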
Finally, we are going to change our schema once again. Until now, each new version of the schema was compatible with the previous one. This time, we create a schema which is incompatible with the previous version. In particular, we change the eventType field into eventTp. That change is made on the producer side.
After this change, producer-service tries to register the new version of the schema through Spring Cloud Stream. Let's just take a look at the application logs. As you see, the new schema has been rejected by the Confluent Schema Registry. Here's a fragment of the producer-service logs after the schema change.