In this article, you will learn how to use Kafka Streams with Spring Cloud Stream. We will build a simple Spring Boot application that simulates the stock market. Based on that example, I’ll try to explain what a streaming platform is and how it differs from a traditional message broker. If you are looking for an intro to the Spring Cloud Stream project, you should read my article about it. It describes how to use Spring Cloud Stream with RabbitMQ in order to build event-driven microservices.

In Spring Cloud Stream there are two binders supporting the Kafka platform. We will focus on the second of them – the Apache Kafka Streams Binder. You can read more about it in the Spring Cloud documentation.

Source Code

If you would like to try it out yourself, you may always take a look at my source code. In order to do that, you need to clone my GitHub repository. After that, you should just follow my instructions. Let’s begin.

Introduction

There are three major types in Kafka Streams – KStream, KTable and GlobalKTable. Spring Cloud Stream supports all of them. We can easily convert a stream to a table and vice versa. To clarify, all Kafka topics are stored as streams. The difference is in how we consume a topic: we can consume it either as a table or as a stream. KTable takes a stream of records from a topic and reduces it to unique entries using the key of each message.
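
To illustrate the difference, here is a minimal sketch using the plain Kafka Streams API (it is not part of the sample applications; the topic name and types are only illustrative):

// Consumed as a KStream: every record is an independent event (INSERT semantics).
StreamsBuilder builder = new StreamsBuilder();
KStream<Long, Order> orderEvents = builder.stream("orders");

// Reduced to a KTable: only the latest value per key is kept, like an UPSERT.
KTable<Long, Order> latestOrders = orderEvents.toTable();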

Architecture

KStream represents an immutable stream of data where each new record is treated as an INSERT. In our case, there are two incoming streams of events. Both of them represent incoming orders. These orders are generated by the order-service application. It sends buy orders to the orders.buy topic and sell orders to the orders.sell topic. The stock-service application receives and handles events from those topics. In the first step, it needs to change the key of each message from the orderId to the productId. That’s because it has to join orders from different topics related to the same product in order to execute transactions. The final transaction price is the average of the sell and buy order prices.

(Figure: kafka-streams-spring-cloud-concept)

We are building a very simplified version of a stock market platform. Each buy order contains the maximum price at which a customer is willing to buy a product. Each sell order contains the minimum price at which a customer is ready to sell their product. If the sell order price is not greater than the buy order price for a particular product, we may perform a transaction.

Each order is valid for 10 seconds. After that time the stock-service application will not handle it, since it is considered expired. Each order also contains an amount of product for a transaction. For example, we may sell 100 for 10 or buy 200 for 11. Therefore, an order may be fully or partially realized: if a sell order for 100 items is matched with a buy order for 200 items, the sell order becomes fully realized, while the buy order is only partially realized and still waits for another matching sell order. The stock-service application tries to join partially realized orders with other new or partially realized orders. You can see the visualization of that process in the picture below.

(Figure: kafka-streams-spring-cloud-arch)

Run Apache Kafka locally

Before we jump to the implementation, we need to run a local instance of Apache Kafka. If you don’t want to install it on your laptop, the easiest way to run it is through Redpanda. Redpanda is a Kafka API-compatible streaming platform. In comparison to Kafka, it is relatively easy to run locally. You just need to have Docker installed. Once you have installed Redpanda on your machine, you need to create a cluster. Since you don’t need a large cluster during development, you can create a single-node instance using the following command:

$ rpk container start

After it starts, it prints the address of your node. For me, it is 127.0.0.1:50842. So now I can display a list of the created topics using the following command:

$ rpk topic list --brokers 127.0.0.1:50842

Currently, there are no topics created. We don’t need to create them manually – Spring Cloud Stream automatically creates missing topics on application startup. In case you would like to remove the Redpanda instance after our exercise, you just need to run the following command:

$ rpk container purge

Perfect! Our local instance of Kafka is running, so we may proceed to the development.

Send events to Kafka with Spring Cloud Stream

In order to generate and send events continuously with Spring Cloud Stream Kafka, we need to define a Supplier bean. In our case, the order-service application generates test data. Each message contains a key and a payload that is serialized to JSON. The message key is the order’s id. We have two Supplier beans, since we are sending messages to two topics. Here’s the Order event class:

@Getter
@Setter
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class Order {
   private Long id;
   private Integer customerId;
   private Integer productId;
   private int productCount;
   @JsonDeserialize(using = LocalDateTimeDeserializer.class)
   @JsonSerialize(using = LocalDateTimeSerializer.class)
   private LocalDateTime creationDate;
   private OrderType type;
   private int amount;
}
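
The OrderType used in the listing above is not shown in the article. It is just a simple enum with the two values the generator needs – a minimal sketch, assuming the repository defines no other values:

public enum OrderType {
   BUY,
   SELL
}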

Our application uses Lombok and Jackson for message serialization. Note that the amount field holds the order price (the maximum price for a buy order, the minimum price for a sell order), while productCount is the number of items. Of course, we also need to include the Spring Cloud Stream Kafka Binder. Unlike the consumer side, the producer does not use Kafka Streams, because it is just generating and sending events.

<dependencies>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-jsr310</artifactId>
  </dependency>
</dependencies>

We have a predefined list of orders just to test our solution. We use MessageBuilder to build a message that contains the header kafka_messageKey and the Order payload.

@SpringBootApplication
@Slf4j
public class OrderService {

   private static long orderId = 0;
   private static final Random r = new Random();

   LinkedList<Order> buyOrders = new LinkedList<>(List.of(
      new Order(++orderId, 1, 1, 100, LocalDateTime.now(), OrderType.BUY, 1000),
      new Order(++orderId, 2, 1, 200, LocalDateTime.now(), OrderType.BUY, 1050),
      new Order(++orderId, 3, 1, 100, LocalDateTime.now(), OrderType.BUY, 1030),
      new Order(++orderId, 4, 1, 200, LocalDateTime.now(), OrderType.BUY, 1050),
      new Order(++orderId, 5, 1, 200, LocalDateTime.now(), OrderType.BUY, 1000),
      new Order(++orderId, 11, 1, 100, LocalDateTime.now(), OrderType.BUY, 1050)
   ));

   LinkedList<Order> sellOrders = new LinkedList<>(List.of(
      new Order(++orderId, 6, 1, 200, LocalDateTime.now(), OrderType.SELL, 950),
      new Order(++orderId, 7, 1, 100, LocalDateTime.now(), OrderType.SELL, 1000),
      new Order(++orderId, 8, 1, 100, LocalDateTime.now(), OrderType.SELL, 1050),
      new Order(++orderId, 9, 1, 300, LocalDateTime.now(), OrderType.SELL, 1000),
      new Order(++orderId, 10, 1, 200, LocalDateTime.now(), OrderType.SELL, 1020)
   ));

   public static void main(String[] args) {
      SpringApplication.run(OrderService.class, args);
   }

   @Bean
   public Supplier<Message<Order>> orderBuySupplier() {
      return () -> {
         if (buyOrders.peek() != null) {
            Message<Order> o = MessageBuilder
                  .withPayload(buyOrders.peek())
                  .setHeader(KafkaHeaders.MESSAGE_KEY, Objects.requireNonNull(buyOrders.poll()).getId())
                  .build();
            log.info("Order: {}", o.getPayload());
            return o;
         } else {
            return null;
         }
      };
   }

   @Bean
   public Supplier<Message<Order>> orderSellSupplier() {
      return () -> {
         if (sellOrders.peek() != null) {
            Message<Order> o = MessageBuilder
                  .withPayload(sellOrders.peek())
                  .setHeader(KafkaHeaders.MESSAGE_KEY, Objects.requireNonNull(sellOrders.poll()).getId())
                  .build();
            log.info("Order: {}", o.getPayload());
            return o;
         } else {
            return null;
         }
      };
   }

}

After that, we need to provide some configuration settings inside the application.yml file. Since we use multiple binding beans (in our case Supplier beans), we have to define the property spring.cloud.stream.function.definition, which contains the list of bindable functions. We need to pass the Supplier method names separated by a semicolon. In the next few lines, we set the names of the target topics on Kafka and the message key serializer. Of course, we also need to set the address of the Kafka broker.

spring.kafka.bootstrap-servers: ${KAFKA_URL}

spring.cloud.stream.function.definition: orderBuySupplier;orderSellSupplier

spring.cloud.stream.bindings.orderBuySupplier-out-0.destination: orders.buy
spring.cloud.stream.kafka.bindings.orderBuySupplier-out-0.producer.configuration.key.serializer: org.apache.kafka.common.serialization.LongSerializer

spring.cloud.stream.bindings.orderSellSupplier-out-0.destination: orders.sell
spring.cloud.stream.kafka.bindings.orderSellSupplier-out-0.producer.configuration.key.serializer: org.apache.kafka.common.serialization.LongSerializer

Before running the application, I need to create an environment variable containing the address of the Kafka broker.

$ export KAFKA_URL=127.0.0.1:50842

Then, let’s run our Spring Cloud application using the following Maven command:

$ mvn clean spring-boot:run

Once you do that, the application sends some test orders for the same product (productId=1) and logs each of them on the console.


We can also verify a list of topics on our local Kafka instance. Both of them have been automatically created by the Spring Cloud Stream Kafka binder before sending messages.

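If you prefer the command line, you can reuse the rpk command shown earlier (the broker address will differ on your machine):

$ rpk topic list --brokers 127.0.0.1:50842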

Consume Kafka Streams with Spring Cloud Stream

Now, we are going to switch to the stock-service implementation. In order to process streams of events, we need to include the Spring Cloud Stream Kafka Streams binder. Also, our application will have an ORM layer for storing data, so we have to include the Spring Data JPA starter and the H2 database.

<dependencies>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
  </dependency>
  <dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-jsr310</artifactId>
  </dependency>
</dependencies>

In the first step, we are going to merge both streams of orders (buy and sell), insert the Order into the database, and print the event message. You could ask – why do I use a database and an ORM layer here, since I have a Kafka KTable? Well, I need transactions with lock support in order to coordinate the status of order realization (refer to the description in the introduction – fully and partially realized orders). I will give you more details about it in the next sections.

In order to process streams, we need to declare a functional bean that takes KStream as an input parameter. If there are two input sources, we have to use a BiConsumer (just for consumption) or a BiFunction (to consume and send events to a new target stream) bean. In this case, we are not creating a new stream of events, so we can use BiConsumer.

@Autowired
OrderLogic logic;

@Bean
public BiConsumer<KStream<Long, Order>, KStream<Long, Order>> orders() {
   return (orderBuy, orderSell) -> orderBuy
         .merge(orderSell)
         .peek((k, v) -> {
            log.info("New({}): {}", k, v);
            logic.add(v);
         });
}

After that, we need to add some configuration settings. There are two input topics, so we need to map their names. Also, if we have more than one functional bean, we need to set the applicationId related to the particular function. For now, it is not required, since we have only a single function. But later, we are going to add other functions for some more advanced operations.

spring.cloud.stream.bindings.orders-in-0.destination: orders.buy
spring.cloud.stream.bindings.orders-in-1.destination: orders.sell
spring.cloud.stream.kafka.streams.binder.functions.orders.applicationId: orders

For now, that’s all. You can now run the instance of stock-service using the Maven command mvn spring-boot:run.

Operations on Kafka Streams

Now, we may use some more advanced operations on Kafka Streams than just merging two different streams. In fact, that’s the key logic in our application. We need to join two different order streams into a single one, using the productId as the joining key. Since the producer sets the orderId as the message key, we first need to invoke the selectKey method on both the orders.buy and orders.sell streams. In our case, joining buy and sell orders related to the same product is just the first step. Then we need to verify that the maximum price in the buy order is not lower than the minimum price in the sell order.

The next step is to verify that neither of these orders has already been fully realized, as they may also have been paired with other orders in the stream. If all the conditions are met, we may create a new transaction. Finally, we may change the stream key from the productId to the transactionId and send it to the dedicated transactions topic.


In order to implement the scenario described above, we need to define a BiFunction bean. It takes two input KStreams, from orders.buy and orders.sell, and creates a new KStream of transaction events sent to the output transactions topic. While joining the streams, it uses a 10-second sliding window and invokes the execute method to create a new transaction.

@Bean
public BiFunction<KStream<Long, Order>, KStream<Long, Order>, KStream<Long, Transaction>> transactions() {
   return (orderBuy, orderSell) -> orderBuy
         .selectKey((k, v) -> v.getProductId())
         .join(orderSell.selectKey((k, v) -> v.getProductId()),
               this::execute,
               JoinWindows.of(Duration.ofSeconds(10)),
               StreamJoined.with(Serdes.Integer(), 
                                 new JsonSerde<>(Order.class), 
                                 new JsonSerde<>(Order.class)))
         .filterNot((k, v) -> v == null)
         .map((k, v) -> new KeyValue<>(v.getId(), v))
         .peek((k, v) -> log.info("Done -> {}", v));
}

private Transaction execute(Order orderBuy, Order orderSell) {
   if (orderBuy.getAmount() >= orderSell.getAmount()) {
      int count = Math.min(orderBuy.getProductCount(), orderSell.getProductCount());
      boolean allowed = logic.performUpdate(orderBuy.getId(), orderSell.getId(), count);
      if (!allowed)
         return null;
      else
         return new Transaction(
            ++transactionId,
            orderBuy.getId(),
            orderSell.getId(),
            Math.min(orderBuy.getProductCount(), orderSell.getProductCount()),
            (orderBuy.getAmount() + orderSell.getAmount()) / 2,
            LocalDateTime.now(),
            "NEW");
   } else {
      return null;
   }
}
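
The Transaction event produced above is not listed in the article. Here is a minimal sketch with field names inferred from the constructor call and from the getters used in the later listings (the exact names, especially of the date field, may differ in the repository; transactionId is assumed to be a simple counter field in the same class, analogous to orderId in the order-service):

@Getter
@Setter
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class Transaction {
   private long id;
   private long buyOrderId;
   private long sellOrderId;
   private int amount;        // number of items exchanged in this transaction
   private int price;         // average of the buy and sell order prices
   private LocalDateTime creationTime;
   private String status;
}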

Let’s take a closer look at the performUpdate() method called inside the execute() method. It initiates a transaction and locks both Order entities. Then it verifies the realization status of each order and updates it with the current values if possible. Note that the Order entity on the stock-service side also holds a realizedCount field used to track how many items of the order have already been realized. Only if the performUpdate() method finishes successfully does the stock-service application create a new transaction.

@Service
public class OrderLogic {

   private OrderRepository repository;

   public OrderLogic(OrderRepository repository) {
      this.repository = repository;
   }

   public Order add(Order order) {
      return repository.save(order);
   }

   @Transactional
   public boolean performUpdate(Long buyOrderId, Long sellOrderId, int amount) {
      Order buyOrder = repository.findById(buyOrderId).orElseThrow();
      Order sellOrder = repository.findById(sellOrderId).orElseThrow();
      int buyAvailableCount = buyOrder.getProductCount() - buyOrder.getRealizedCount();
      int sellAvailableCount = sellOrder.getProductCount() - sellOrder.getRealizedCount();
      if (buyAvailableCount >= amount && sellAvailableCount >= amount) {
         buyOrder.setRealizedCount(buyOrder.getRealizedCount() + amount);
         sellOrder.setRealizedCount(sellOrder.getRealizedCount() + amount);
         repository.save(buyOrder);
         repository.save(sellOrder);
         return true;
      } else {
         return false;
      }
   }
}
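
On the stock-service side, the Order class is a JPA entity rather than just an event payload – besides the fields received from the topics, it tracks how many items have already been realized. The article does not show this class, so here is a minimal sketch based on the fields used above (the JPA annotations and the table name are assumptions):

@Entity
@Table(name = "orders") // assumed name; ORDER is a reserved word in SQL
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class Order {
   @Id
   private Long id;
   private Integer customerId;
   private Integer productId;
   private int productCount;
   private LocalDateTime creationDate;
   @Enumerated(EnumType.STRING)
   private OrderType type;
   private int amount;
   private int realizedCount; // items already matched in previous transactions
}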

Here’s our repository class with the findById method. It sets a pessimistic lock on the Order entity during the transaction.

public interface OrderRepository extends CrudRepository<Order, Long> {

  @Lock(LockModeType.PESSIMISTIC_WRITE)
  Optional<Order> findById(Long id);

}

We also need to provide configuration settings for the transactions BiFunction.

spring.cloud.stream.bindings.transactions-in-0.destination: orders.buy
spring.cloud.stream.bindings.transactions-in-1.destination: orders.sell
spring.cloud.stream.bindings.transactions-out-0.destination: transactions
spring.cloud.stream.kafka.streams.binder.functions.transactions.applicationId: transactions

spring.cloud.stream.function.definition: orders;transactions

Use Kafka KTable with Spring Cloud Stream

We have already finished the implementation of the logic responsible for creating transactions from incoming orders. Now, we would like to examine the data generated by our stock-service application. The most important things are how many transactions were generated and what was the volume of transactions, globally and per product. Three key statistics related to our transactions are: the number of transactions, the number of products sold/bought in those transactions, and the total value of the transactions (price * productsCount). Here’s the definition of the object used for counting aggregations.

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class TransactionTotal {
   private int count;
   private int productCount;
   private int amount;
}

In order to call an aggregation method, we first need to group the transactions stream by a selected key. In the method visible below we use the status field as the grouping key. After that, we may invoke the aggregate method, which allows us to perform some more complex calculations. In this particular case, we are calculating the number of all executed transactions, their volume of products, and their total value. The resulting KTable is materialized as a state store. Thanks to that, we will be able to query it by the name all-transactions-store.

@Bean
public Consumer<KStream<Long, Transaction>> total() {
   KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(
                "all-transactions-store");
   return transactions -> transactions
         .groupBy((k, v) -> v.getStatus(), 
                  Grouped.with(Serdes.String(), new JsonSerde<>(Transaction.class)))
         .aggregate(
                 TransactionTotal::new,
                 (k, v, a) -> {
                    a.setCount(a.getCount() + 1);
                    a.setProductCount(a.getProductCount() + v.getAmount());
                    a.setAmount(a.getAmount() + (v.getPrice() * v.getAmount()));
                    return a;
                 },
                 Materialized.<String, TransactionTotal> as(storeSupplier)
                    .withKeySerde(Serdes.String())
                    .withValueSerde(new JsonSerde<>(TransactionTotal.class)))
         .toStream()
         .peek((k, v) -> log.info("Total: {}", v));
}

The next function performs a similar aggregate operation, but this time per product. Because the Transaction object does not contain information about the product, we first need to join it with the corresponding order to access the productId. Then we produce a KTable by grouping and aggregating per productId. The same as before, we materialize the aggregation as a state store.

@Bean
public BiConsumer<KStream<Long, Transaction>, KStream<Long, Order>> totalPerProduct() {
   KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(
                "transactions-per-product-store");
   return (transactions, orders) -> transactions
         .selectKey((k, v) -> v.getSellOrderId())
         .join(orders.selectKey((k, v) -> v.getId()),
               (t, o) -> new TransactionTotalWithProduct(t, o.getProductId()),
               JoinWindows.of(Duration.ofSeconds(10)),
               StreamJoined.with(Serdes.Long(), 
                  new JsonSerde<>(Transaction.class), 
                  new JsonSerde<>(Order.class)))
         .groupBy((k, v) -> v.getProductId(), 
            Grouped.with(Serdes.Integer(), new JsonSerde<>(TransactionTotalWithProduct.class)))
         .aggregate(
               TransactionTotal::new,
               (k, v, a) -> {
                  a.setCount(a.getCount() + 1);
                  a.setProductCount(a.getProductCount() + v.getTransaction().getAmount());
                  a.setAmount(a.getAmount() + (v.getTransaction().getPrice() * v.getTransaction().getAmount()));
                  return a;
               },
               Materialized.<Integer, TransactionTotal> as(storeSupplier)
                  .withKeySerde(Serdes.Integer())
                  .withValueSerde(new JsonSerde<>(TransactionTotal.class)))
         .toStream()
         .peek((k, v) -> log.info("Total per product({}): {}", k, v));
}
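
The TransactionTotalWithProduct class used as the join result above is not shown in the article either. A minimal sketch, assuming it does nothing more than pair the Transaction with the productId taken from the joined Order:

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class TransactionTotalWithProduct {
   private Transaction transaction;
   private Integer productId;
}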

What if we would like to perform aggregations similar to those described above, but only for a particular period of time? We need to invoke the windowedBy method and produce a dedicated state store for such operations.

@Bean
public BiConsumer<KStream<Long, Transaction>, KStream<Long, Order>> latestPerProduct() {
   WindowBytesStoreSupplier storeSupplier = Stores.persistentWindowStore(
      "latest-transactions-per-product-store", Duration.ofSeconds(30), Duration.ofSeconds(30), false);
   return (transactions, orders) -> transactions
      .selectKey((k, v) -> v.getSellOrderId())
      .join(orders.selectKey((k, v) -> v.getId()),
            (t, o) -> new TransactionTotalWithProduct(t, o.getProductId()),
            JoinWindows.of(Duration.ofSeconds(10)),
            StreamJoined.with(Serdes.Long(), new JsonSerde<>(Transaction.class), new JsonSerde<>(Order.class)))
      .groupBy((k, v) -> v.getProductId(), Grouped.with(Serdes.Integer(), new JsonSerde<>(TransactionTotalWithProduct.class)))
      .windowedBy(TimeWindows.of(Duration.ofSeconds(30)))
      .aggregate(
            TransactionTotal::new,
            (k, v, a) -> {
               a.setCount(a.getCount() + 1);
               a.setAmount(a.getAmount() + v.getTransaction().getAmount());
               return a;
            },
            Materialized.<Integer, TransactionTotal> as(storeSupplier)
               .withKeySerde(Serdes.Integer())
               .withValueSerde(new JsonSerde<>(TransactionTotal.class)))
      .toStream()
      .peek((k, v) -> log.info("Total per product last 30s({}): {}", k, v));
}

Interactive queries

We have already created and configured all the required Kafka Streams with Spring Cloud. Finally, we can execute queries on the state stores. This operation is called an interactive query. Let’s create a REST controller that exposes endpoints with the results. In order to query Kafka Streams state stores with Spring Cloud, we need to inject the InteractiveQueryService bean into the controller.

@RestController
@RequestMapping("/transactions")
public class TransactionController {

   private InteractiveQueryService queryService;

   public TransactionController(InteractiveQueryService queryService) {
      this.queryService = queryService;
   }

   @GetMapping("/all")
   public TransactionTotal getAllTransactionsSummary() {
      ReadOnlyKeyValueStore<String, TransactionTotal> keyValueStore =
                queryService.getQueryableStore("all-transactions-store",
                        QueryableStoreTypes.keyValueStore());
      return keyValueStore.get("NEW");
   }

   @GetMapping("/product/{productId}")
   public TransactionTotal getSummaryByProductId(@PathVariable("productId") Integer productId) {
      ReadOnlyKeyValueStore<Integer, TransactionTotal> keyValueStore =
                queryService.getQueryableStore("transactions-per-product-store",
                        QueryableStoreTypes.keyValueStore());
      return keyValueStore.get(productId);
   }

   @GetMapping("/product/latest/{productId}")
   public TransactionTotal getLatestSummaryByProductId(@PathVariable("productId") Integer productId) {
      ReadOnlyKeyValueStore<Integer, TransactionTotal> keyValueStore =
                queryService.getQueryableStore("latest-transactions-per-product-store",
                        QueryableStoreTypes.keyValueStore());
      return keyValueStore.get(productId);
   }

   @GetMapping("/product")
   public Map<Integer, TransactionTotal> getSummaryByAllProducts() {
      Map<Integer, TransactionTotal> m = new HashMap<>();
      ReadOnlyKeyValueStore<Integer, TransactionTotal> keyValueStore =
                queryService.getQueryableStore("transactions-per-product-store",
                        QueryableStoreTypes.keyValueStore());
      KeyValueIterator<Integer, TransactionTotal> it = keyValueStore.all();
      while (it.hasNext()) {
         KeyValue<Integer, TransactionTotal> kv = it.next();
         m.put(kv.key, kv.value);
      }
      return m;
   }

}

Before you run the latest version of the stock-service application, you should generate more differentiated random data. Let’s say we would like to generate orders for 5 different products with floating prices, as shown below. Just uncomment the following fragment of code in the order-service and run the application once again to generate an infinite stream of events.

private static long orderId = 0;
private static final Random r = new Random();

private Map<Integer, Integer> prices = Map.of(
      1, 1000, 
      2, 2000, 
      3, 5000, 
      4, 1500, 
      5, 2500);

@Bean
public Supplier<Message<Order>> orderBuySupplier() {
   return () -> {
      Integer productId = r.nextInt(1, 6);
      int price = prices.get(productId) + r.nextInt(-100, 100);
      Order o = new Order(
         ++orderId,
         r.nextInt(1, 6),
         productId,
         100,
         LocalDateTime.now(),
         OrderType.BUY,
         price);
      log.info("Order: {}", o);
      return MessageBuilder
         .withPayload(o)
         .setHeader(KafkaHeaders.MESSAGE_KEY, orderId)
         .build();
   };
}

You may also want to generate messages more frequently. To do that, you need to decrease the fixed delay of the Spring Cloud Stream poller that triggers the Supplier beans.

spring.cloud.stream.poller.fixedDelay: 100

After running both of our sample applications, you may verify the logs on the stock-service side.


Then you may call our REST endpoints that perform interactive queries on the materialized Kafka KTables.

$ curl http://localhost:8080/transactions/all
$ curl http://localhost:8080/transactions/product/3
$ curl http://localhost:8080/transactions/product/latest/5

Does it look simple? Well, under the hood it is quite a bit more complicated. Here’s the final list of topics automatically created for the needs of our application.

(Figure: kafka-streams-spring-cloud-topics)

Final Thoughts

Spring Cloud Stream simplifies working with Kafka Streams and interactive queries. Kafka Streams by itself is a very powerful mechanism. In this article, I showed you how we can use it to implement non-trivial logic and then analyze the data in various ways.

Reference https://piotrminkowski.com/2021/11/11/kafka-streams-with-spring-cloud-stream/