To update documents in a MongoDB collection, we often issue individual update requests. When the volume of data is large, this can lead to performance issues and excessive consumption of hardware resources.

In this tutorial, we will implement a solution to efficiently enrich and update a large amount of data using Spring Data MongoDB Reactive.

Before reading on, if you are not familiar with the Spring reactive stack and MongoDB, I suggest you check the resources section.

1. EIP content enricher


The Content Enricher Enterprise Integration Pattern appends information from an external source to an existing message. It uses information inside the incoming message to perform the enrichment operation.

We will implement a simplified version of the EIP:

  1. Input message: represented by a MongoDB document.
  2. Enricher: our application.
  3. Resource: call to a RESTful API.
  4. Output message: we will keep only the enriched document.

1.1. Integration flow


The application will read the address documents, add the product to each of them, and save the enriched documents back to the MongoDB database.

2. Project setup

2.1. Requirements

  • Java 1.8+
  • Maven 3+
  • Docker Compose
  • MongoDB Database Tools

2.2. Generation

We generate the project skeleton from Spring Initializr.

2.3. Structure

.
│  .gitignore
│  docker-compose.yml
│  pom.xml
│  README.adoc
├───data
│  ├───mongodb
│  │     address.ndjson
│  └───product
│        db.json
└───src
   ├───main
   │  ├───java
   │  │  └───com
   │  │     └───maoudia
   │  │        └───tutorial
   │  │             Application.java
   │  │             AppProperties.java
   │  │             CollectionService.java
   │  │             NetworkConfig.java
   │  └───resources
   │        application.yml
   └───test
      └───java
        └───com
           └───maoudia
              └───tutorial
                   CollectionServiceTest.java

2.4. Containers

Download the data directory to the root of the project.

We use Docker Compose to create the containers needed for this tutorial.

docker-compose.yml

services:
  mongodb:  #1
    container_name: maoudia-mongodb
    image: mongo:5.0.8
    environment:
      - MONGO_INITDB_DATABASE=test
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=password
    networks:
      - mongodb-network
    ports:
      - 15015:27017
    volumes:
      - ./data/mongodb:/data/mongodb

  mongo-express:  #2
    container_name: maoudia-mongo-express
    image: mongo-express:0.54.0
    depends_on:
      - mongodb
    networks:
      - mongodb-network
    environment:
      - ME_CONFIG_MONGODB_SERVER=maoudia-mongodb
      - ME_CONFIG_MONGODB_ADMINUSERNAME=admin
      - ME_CONFIG_MONGODB_ADMINPASSWORD=password
    ports:
      - 1515:8081
    volumes:
      - ./data/mongodb:/data/mongodb

  product-api: #3
    container_name: maoudia-product-api
    image: clue/json-server:latest
    ports:
      - 1519:80
    volumes:
      - ./data/product/db.json:/data/db.json

networks:
  mongodb-network:
    driver: bridge
  1. #1: MongoDB initialized with the test database.
  2. #2: Mongo Express, a web-based MongoDB administration interface.
  3. #3: Product API, configured from the db.json file.

We start up the services:

docker-compose up -d

2.5. Data initialization

We use a JSON document from the French address database.

Address

{
  "id": "59350",
  "type": "municipality",
  "name": "Lille",
  "postcode": [
    "59000",
    "59800",
    "59260",
    "59777",
    "59160"
  ],
  "citycode": "59350",
  "x": 703219.96,
  "y": 7059335.72,
  "lon": 3.045433,
  "lat": 50.630992,
  "population": 234475,
  "city": "Lille",
  "context": "59, Nord, Hauts-de-France",
  "importance": 0.56333
}

Import the address collection:

mongoimport --uri "mongodb://admin:password@localhost:15015" --authenticationDatabase=admin --db test --collection address ./data/mongodb/address.ndjson

Alternatively, we can use Mongo Express, which is available at http://localhost:1515.

Product represents a satellite internet offer.

Product

{
  "id": 1,
  "available": true,
  "company": "SPACEX",
  "provider": "STARLINK",
  "type": "SATELLITE"
}

Product API is available at http://localhost:1519.

3. Application

3.1. Configuration

We change the file extension of application.properties to application.yml.

application.yml

app:
  buffer-max-size: 500
  bulk-size: 100
  collection-name: address
  enriching-key: product
  enriching-uri: http://localhost:1519/products/1
spring:
  main:
    web-application-type: none
  data:
    mongodb:
      database: test
      uri: mongodb://admin:password@localhost:15015
---
spring.config.activate.on-profile: dev
logging:
  level:
    org.mongodb.driver: debug
---
spring.config.activate.on-profile: test
app:
  bulk-size: 2

We declare a class which contains application configuration properties.

AppProperties.java

@ConfigurationProperties(prefix = "app")
public class AppProperties {
    private int bulkSize;
    private int bufferMaxSize;
    private String collectionName;
    private String enrichingKey;
    private String enrichingUri;
    // Getter and Setter are omitted
}
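
As a side note, if spring-boot-starter-validation were added to the project (it is not part of the generated skeleton), bean validation could reject an invalid configuration at startup, for example a zero bulk size. A minimal sketch, with illustrative constraints:

@Validated
@ConfigurationProperties(prefix = "app")
public class AppProperties {
    @Min(1)
    private int bulkSize;
    @Min(1)
    private int bufferMaxSize;
    @NotBlank
    private String collectionName;
    @NotBlank
    private String enrichingKey;
    @NotBlank
    private String enrichingUri;
    // Getter and Setter are omitted
}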

We create a @Bean for Spring's non-blocking HTTP client, WebClient.

NetworkConfig.java

@Configuration
public class NetworkConfig {

    @Bean
    public WebClient client() {
        return WebClient.create();
    }

}
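
WebClient.create() relies on default settings, which are enough for this tutorial. If the enrichment had to issue many concurrent HTTP calls, the underlying Reactor Netty client could be tuned. The following bean is only a sketch with illustrative values, using Reactor Netty's ConnectionProvider and HttpClient together with spring-web's ReactorClientHttpConnector:

@Bean
public WebClient client() {
    // Illustrative pool sizing: at most 50 open connections, up to 500 pending acquisitions.
    ConnectionProvider provider = ConnectionProvider.builder("enriching")
            .maxConnections(50)
            .pendingAcquireMaxCount(500)
            .build();
    // Fail a call if the Product API does not answer within 5 seconds.
    HttpClient httpClient = HttpClient.create(provider)
            .responseTimeout(Duration.ofSeconds(5));
    return WebClient.builder()
            .clientConnector(new ReactorClientHttpConnector(httpClient))
            .build();
}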

3.2. Implementation

We create a @Service which contains application business logic.

CollectionService.java

@Service
public class CollectionService {
    private final AppProperties properties;
    private final ReactiveMongoTemplate template;
    private final WebClient client;

    public CollectionService(AppProperties properties,
                             ReactiveMongoTemplate template,
                             WebClient client) {
        this.properties = properties;
        this.template = template;
        this.client = client;
    }

    public Flux<BulkWriteResult> enrichAll(String collectionName, String enrichingKey, String enrichingUri) {
            return template.findAll(Document.class, collectionName) // #1
                    .onBackpressureBuffer(properties.getBufferMaxSize())  //#2
                    .flatMap(document -> enrich(document,  enrichingKey, enrichingUri))  //#3
                    .map(CollectionService::toReplaceOneModel)  //#4
                    .window(properties.getBulkSize())  //#5
                    .flatMap(replaceOneModelFlux -> bulkWrite(replaceOneModelFlux, collectionName));  // #6
    }
}
  1. #1: Creates a stream of documents from the collection.
  2. #2: Limits the number of documents kept in memory when the consumer is slower than the producer. If the maximum buffer size is exceeded, an IllegalStateException is thrown.
  3. #3: Enriches each document asynchronously with the external resource.
  4. #4: Creates a ReplaceOneModel from the document.
  5. #5: Groups documents into windows of fixed size. The last window can be smaller.
  6. #6: Calls the bulk write function for each window.

The configuration property app.bulk-size can be adjusted according to project needs and available hardware resources: the larger the value, the higher the memory consumption and the bigger the bulk requests.
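
The concurrency of the enrichment calls can be bounded as well. As a sketch (not the tutorial's final code), flatMap accepts an explicit concurrency argument that caps the number of in-flight HTTP requests; Reactor's default is 256, and the value 16 below is purely illustrative:

public Flux<BulkWriteResult> enrichAll(String collectionName, String enrichingKey, String enrichingUri) {
    return template.findAll(Document.class, collectionName)
            .onBackpressureBuffer(properties.getBufferMaxSize())
            // At most 16 enrichment calls are in flight at the same time.
            .flatMap(document -> enrich(document, enrichingKey, enrichingUri), 16)
            .map(CollectionService::toReplaceOneModel)
            .window(properties.getBulkSize())
            .flatMap(replaceOneModelFlux -> bulkWrite(replaceOneModelFlux, collectionName));
}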

We create document enrichment functions.

CollectionService.java

private Publisher<Document> enrich(Document document, String enrichingKey, String enrichingUri) { // #1
    return getEnrichingDocument(enrichingUri)
            .map(enrichingDocument -> {
                document.put(enrichingKey, enrichingDocument);
                document.put("updatedAt", new Date());
                return document;
            });
}

private Mono<Document> getEnrichingDocument(String enrichingUri) {  //#2
    return client.get()
            .uri(URI.create(enrichingUri))
            .retrieve()
            .bodyToMono(Document.class);
}
  1. #1: Adds the document retrieved from the HTTP call to the root of the document being enriched, under the key passed as a parameter.
  2. #2: Retrieves a document from a URI.
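
For a long-running batch, a transient Product API failure would fail the whole stream. A hedged variant of getEnrichingDocument, assuming reactor.util.retry.Retry and java.time.Duration are imported, adds a timeout and a bounded exponential-backoff retry (the values are illustrative):

private Mono<Document> getEnrichingDocument(String enrichingUri) {
    return client.get()
            .uri(URI.create(enrichingUri))
            .retrieve()
            .bodyToMono(Document.class)
            // Give up on a single call after 5 seconds.
            .timeout(Duration.ofSeconds(5))
            // Retry up to 3 times with exponential backoff starting at 200 ms.
            .retryWhen(Retry.backoff(3, Duration.ofMillis(200)));
}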

MongoDB converts and stores dates in UTC by default.

CollectionService.java

private static final ReplaceOptions REPLACE_OPTIONS = new ReplaceOptions();   //#1
private static ReplaceOneModel<Document> toReplaceOneModel(Document document) {
    return new ReplaceOneModel<>(
            Filters.eq("_id", document.get("_id")),  // #2
            document,  // #3
            REPLACE_OPTIONS
    );
}
  1. #1: Instantiates default replacement configuration.
  2. #2: Filter matching the document by its identifier.
  3. #3: The replacement content, i.e. the complete enriched document.
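
ReplaceOptions can be adjusted if needed. For instance, a sketch (an assumption, not the tutorial's behaviour) that inserts the enriched document when no existing document matches the filter anymore, instead of silently skipping it:

// Assumption: enable upsert so unmatched replacements are inserted rather than skipped.
private static final ReplaceOptions REPLACE_OPTIONS = new ReplaceOptions().upsert(true);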

CollectionService.java

private static final BulkWriteOptions BULK_WRITE_OPTIONS = new BulkWriteOptions().ordered(false);  // #1
private Flux<BulkWriteResult> bulkWrite(Flux<ReplaceOneModel<Document>> updateOneModelFlux, String collectionName) {
    return updateOneModelFlux.collectList()  // #2
            .flatMapMany(updateOneModels -> template.getCollection(collectionName)  // #3
                    .flatMapMany(collection -> collection.bulkWrite(updateOneModels, BULK_WRITE_OPTIONS)));  // #4
}
  1. #1: Instantiates write options with ordered execution disabled.
  2. #2: Collects the stream into a list.
  3. #3: Retrieves the collection passed as a parameter.
  4. #4: Bulk writes documents into MongoDB collection.

Transactions have been supported on replica sets since MongoDB 4.0 (and on sharded clusters since 4.2). If transactions are enabled, we can use @Transactional or a TransactionalOperator to make a method transactional.
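
A minimal sketch of the TransactionalOperator approach, assuming MongoDB runs as a replica set with transactions enabled; the wiring below is illustrative, and whether the low-level bulkWrite joins the Spring-managed session depends on how the collection is obtained:

@Bean
public ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory factory) {
    return new ReactiveMongoTransactionManager(factory);
}

@Bean
public TransactionalOperator transactionalOperator(ReactiveMongoTransactionManager transactionManager) {
    return TransactionalOperator.create(transactionManager);
}

// Usage sketch: wrap a whole enrichment run in a single transaction.
Flux<BulkWriteResult> enrichAllTransactionally(TransactionalOperator operator) {
    return service.enrichAll(properties.getCollectionName(), properties.getEnrichingKey(), properties.getEnrichingUri())
            .as(operator::transactional);
}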

We implement the following interfaces:

  • CommandLineRunner: runs the enrichment command at application startup.
  • ExitCodeGenerator: provides the application's system exit code.

Application.java

@SpringBootApplication(exclude = MongoReactiveRepositoriesAutoConfiguration.class)  // #1
@ConfigurationPropertiesScan("com.maoudia.tutorial") // #2
public class Application implements CommandLineRunner, ExitCodeGenerator {
    private static final Logger LOGGER = LoggerFactory.getLogger(Application.class);
    private final AppProperties properties;
    private final CollectionService service;
    private int exitCode = 255;

    public static void main(String[] args) {
        System.exit(SpringApplication.exit(SpringApplication.run(Application.class, args)));
    }

    public Application(AppProperties properties, CollectionService service) {
        this.properties = properties;
        this.service = service;
    }

    @Override
    public void run(final String... args) {
        service.enrichAll(properties.getCollectionName(), properties.getEnrichingKey(), properties.getEnrichingUri())
                .doOnSubscribe(unused -> LOGGER.info("------------------< Starting Collection Enriching Command >-------------------"))  //#3
                .doOnNext(bulkWriteResult -> LOGGER.info("Bulk write result with {} modified document(s)", bulkWriteResult.getModifiedCount()))
                .doOnError(throwable -> {
                    exitCode = 1;
                    LOGGER.error("Collection enriching failed due to : {}", throwable.getMessage(), throwable);
                })
                .doOnComplete(() -> exitCode = 0)
                .doOnTerminate(() -> LOGGER.info("------------------< Collection Enriching Command Finished >------------------"))
                .blockLast();  // #4
    }

    @Override
    public int getExitCode() {
        return exitCode;
    }

}
  1. #1: Disables auto-configuration of repositories, as we only use ReactiveMongoTemplate.
  2. #2: Allows scanning and detecting beans annotated with @ConfigurationProperties.
  3. #3: Subscribing to the stream triggers the processing.
  4. #4: Without a running web server, we have to block on the Publisher in order to trigger the processing and wait until execution completes.

3.3. Demo

We launch the application:

mvn spring-boot:run

Output:

...
2022-06-10 00:36:45.152  INFO 7036 --- [           main] com.maoudia.tutorial.Application         : Started Application in 2.755 seconds (JVM running for 3.251)
2022-06-10 00:36:45.227  INFO 7036 --- [           main] com.maoudia.tutorial.Application         : ------------------< Starting Collection Enriching Command >-------------------
2022-06-10 00:36:45.297  INFO 7036 --- [           main] org.mongodb.driver.cluster               : No server chosen by com.mongodb.reactivestreams.client.internal.ClientSessionHelper$$Lambda$543/543409470@4647881c from cluster description ClusterDescription{type=UNKNOWN, connectionMode=SINGLE, serverDescriptions=[ServerDescription{address=localhost:15015, type=UNKNOWN, state=CONNECTING}]}. Waiting for 30000 ms before timing out
2022-06-10 00:36:46.527  INFO 7036 --- [localhost:15015] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:1, serverValue:39}] to localhost:15015
2022-06-10 00:36:46.527  INFO 7036 --- [localhost:15015] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:2, serverValue:40}] to localhost:15015
2022-06-10 00:36:46.527  INFO 7036 --- [localhost:15015] org.mongodb.driver.cluster               : Monitor thread successfully connected to server with description ServerDescription{address=localhost:15015, type=STANDALONE, state=CONNECTED, ok=true, minWireVersion=0, maxWireVersion=13, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=61576400}
2022-06-10 00:36:46.692  INFO 7036 --- [ntLoopGroup-2-3] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:3, serverValue:41}] to localhost:15015
2022-06-10 00:36:48.355  INFO 7036 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2022-06-10 00:36:48.482  INFO 7036 --- [ntLoopGroup-2-4] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:4, serverValue:42}] to localhost:15015
2022-06-10 00:36:48.562  INFO 7036 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2022-06-10 00:36:48.742  INFO 7036 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2022-06-10 00:36:48.982  INFO 7036 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2022-06-10 00:36:49.222  INFO 7036 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2022-06-10 00:36:49.488  INFO 7036 --- [ntLoopGroup-2-4] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2022-06-10 00:36:49.701  INFO 7036 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2022-06-10 00:36:49.852  INFO 7036 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2022-06-10 00:36:50.031  INFO 7036 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2022-06-10 00:36:50.105  INFO 7036 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2022-06-10 00:36:50.106  INFO 7036 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application         : ------------------< Collection Enriching Command Finished >------------------
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  17.315 s
[INFO] Finished at: 2022-06-10T00:36:54+02:00
[INFO] ------------------------------------------------------------------------

Process finished with exit code 0

3.4. VisualVM report

VisualVM is a lightweight profiling tool. We use it to get an overview of the threads launched by the application.


There are two groups of threads that execute operations in parallel; each group forms an event loop.

  • MongoDB requests are executed by nioEventLoopGroup.
  • HTTP requests are executed by reactor-http-nio.

4. Integration tests

We use JUnit 5 and the Testcontainers MongoDB module for the integration tests. This gives feedback close to the real behaviour of the application, which essentially does read/write operations.

To keep this tutorial short, we will only write one test.

CollectionServiceTest.java

@ActiveProfiles("test")
@SpringBootTest
@Testcontainers  // #1
class CollectionServiceTest {

    @Container
    private static final MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:5.0.8")  // #2
            .withReuse(true);

    @DynamicPropertySource
    private static void setProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.data.mongodb.uri", mongoDBContainer::getReplicaSetUrl); // #3
    }

    @Autowired
    private AppProperties properties;
    @Autowired
    private CollectionService command;
    @Autowired
    private ReactiveMongoTemplate template;

    @Test
    void multipleBulkWriteResultsAreReturned() {
        Document givenDocument1 = new Document();
        givenDocument1.put("_id", "628ea3edb5110304e5e814f6");
        givenDocument1.put("type", "municipality");
        Document givenDocument2 = new Document();
        givenDocument2.put("_id", "628ea3edb5110304e5e814f7");
        givenDocument2.put("type", "street");
        Document givenDocument3 = new Document();
        givenDocument3.put("_id", "628ea3edb5110304e5e814f8");
        givenDocument3.put("type", "housenumber");

        template.insert(Arrays.asList(givenDocument1, givenDocument2, givenDocument3), properties.getCollectionName()).blockLast();

        BulkWriteResult expectedBulkWriteResult1 = BulkWriteResult.acknowledged(WriteRequest.Type.REPLACE, 2, 2, Collections.emptyList(),
                Collections.emptyList());
        BulkWriteResult expectedBulkWriteResult2 = BulkWriteResult.acknowledged(WriteRequest.Type.REPLACE, 1, 1, Collections.emptyList(),
                Collections.emptyList());

        command.enrichAll( properties.getCollectionName(), properties.getEnrichingKey() , properties.getEnrichingUri())
                .as(StepVerifier::create) // #4
                .expectNext(expectedBulkWriteResult1)
                .expectNext(expectedBulkWriteResult2)
                .verifyComplete();
    }
}
  1. #1: Enables the Testcontainers JUnit 5 extension.
  2. #2: Starts a MongoDB container.
  3. #3: Points the application at the container's connection URI.
  4. #4: Uses StepVerifier from Reactor Test to assert the output stream.
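
In this setup the enrichment call still targets the Product API started by docker-compose (app.enriching-uri from the configuration). As a sketch, assuming OkHttp's MockWebServer as an extra test dependency (it is not used in the tutorial), the Product API could be stubbed so the test runs without Docker Compose:

private static final MockWebServer PRODUCT_API = new MockWebServer();

@BeforeAll
static void startProductApi() throws IOException {
    // Every request receives the same fake product document.
    PRODUCT_API.setDispatcher(new Dispatcher() {
        @Override
        public MockResponse dispatch(RecordedRequest request) {
            return new MockResponse()
                    .setHeader("Content-Type", "application/json")
                    .setBody("{\"id\":1,\"provider\":\"STARLINK\"}");
        }
    });
    PRODUCT_API.start();
}

@AfterAll
static void stopProductApi() throws IOException {
    PRODUCT_API.shutdown();
}

@DynamicPropertySource
private static void productApiProperties(DynamicPropertyRegistry registry) {
    // Resolved lazily, after the server has been started in @BeforeAll.
    registry.add("app.enriching-uri", () -> PRODUCT_API.url("/products/1").toString());
}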

We launch the integration tests:

mvn test -Dspring.profiles.active=test

Test results:

...
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 20.563 s - in com.maoudia.tutorial.CollectionServiceTest
[INFO]
[INFO] Results:
[INFO]
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  32.100 s
[INFO] Finished at: 2022-06-10T01:02:17+02:00
[INFO] ------------------------------------------------------------------------

5. Conclusion

In this tutorial, we implemented a complete solution to efficiently enrich and update a MongoDB collection. We also saw how to write integration tests with JUnit 5 and Testcontainers.

The complete source code is available on GitHub.

In the next chapter of MongoDB Reactive CLI series, we will add new features and use Picocli to facilitate interactions with the application.

6. Resources

  • Reference: https://www.maoudia.com/blog/bulk-update-with-spring-data-mongodb-reactive/