Moncef AOUDIA

Software developer - OSS enthusiast

Bulk Update With Spring Data MongoDB Reactive

Published on 2022-06-20 | Updated on 2024-02-09 | 10 mins read | Tutorial Reactive Programming

To update documents in a MongoDB collection, we often send update requests one by one. When the volume of data is large, this can lead to performance issues and excessive consumption of hardware resources.

We will implement a solution to efficiently enrich and update a large amount of data using Spring Data MongoDB Reactive.

If you are not familiar with the Spring reactive stack and MongoDB, I suggest checking the resources section before reading further.

1. EIP content enricher

[Figure: EIP Content Enricher]

The Content Enricher Enterprise Integration Pattern (EIP) appends information from an external source to an existing message. It uses information inside the incoming message to perform the enrichment.

We will implement a simplified version of the EIP:

  1. Input message: a MongoDB document.

  2. Enricher: our application.

  3. Resource: a call to a RESTful API.

  4. Output message: we will keep only the enriched document.
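The four roles above can be sketched in plain Java, independently of Spring and MongoDB. This is an illustrative sketch under stated assumptions. Messages are modelled as `Map<String, Object>`, and the hypothetical `productApi` supplier stands in for the RESTful API call. Unlike the tutorial's `enrich` method, which modifies the document in place, the sketch returns a copy.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Illustrative Content Enricher (hypothetical names, no Spring/MongoDB involved).
final class ContentEnricherSketch {

    // Returns a copy of the input message with the resource's answer appended
    // under enrichingKey; the input message itself is left untouched.
    static Map<String, Object> enrich(Map<String, Object> inputMessage,
                                      String enrichingKey,
                                      Supplier<Map<String, Object>> resource) {
        Map<String, Object> outputMessage = new LinkedHashMap<>(inputMessage);
        outputMessage.put(enrichingKey, resource.get());
        return outputMessage;
    }

    public static void main(String[] args) {
        // Input message: a trimmed-down address document.
        Map<String, Object> address = Map.of("_id", "59350", "name", "Lille");

        // Resource: a hypothetical stand-in for the product API.
        Supplier<Map<String, Object>> productApi =
                () -> Map.of("id", 1, "provider", "STARLINK", "type", "SATELLITE");

        // Output message: the address with an additional "product" field.
        Map<String, Object> enriched = enrich(address, "product", productApi);
        System.out.println(enriched);
    }
}
```

Returning a copy keeps the sketch free of side effects. The tutorial mutates the `Document` it received, which avoids an extra allocation per document.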

1.1. Integration flow

[Figure: integration flow schema]

The application will read the address documents, add the product and save the enriched documents to the MongoDB database.

2. Project setup

2.2. Generation

We generate the project skeleton from Spring Initializr.

2.3. Structure

.
│  .gitignore
│  docker-compose.yml
│  pom.xml
│  README.adoc
├───data
│  ├───mongodb
│  │     address.ndjson
│  └───product
│        db.json
└───src
   ├───main
   │  ├───java
   │  │  └───com
   │  │     └───maoudia
   │  │        └───tutorial
   │  │             Application.java
   │  │             AppProperties.java
   │  │             CollectionService.java
   │  │             NetworkConfig.java
   │  └───resources
   │        application.yml
   └───test
      └───java
        └───com
           └───maoudia
              └───tutorial
                   CollectionServiceTest.java

2.4. Containers

Download the data directory to the root of the project.

We use Docker Compose to create the containers needed for this tutorial.

docker-compose.yml
services:
  mongodb: (1)
    container_name: maoudia-mongodb
    image: mongo:7.0.2
    environment:
      - MONGO_INITDB_DATABASE=test
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=password
    networks:
      - mongodb-network
    ports:
      - 15015:27017
    volumes:
      - ./data/mongodb:/data/mongodb

  mongo-express: (2)
    container_name: maoudia-mongo-express
    image: mongo-express:1.0.0
    depends_on:
      - mongodb
    networks:
      - mongodb-network
    environment:
      - ME_CONFIG_MONGODB_SERVER=maoudia-mongodb
      - ME_CONFIG_MONGODB_ADMINUSERNAME=admin
      - ME_CONFIG_MONGODB_ADMINPASSWORD=password
      - ME_CONFIG_BASICAUTH_USERNAME=admin
      - ME_CONFIG_BASICAUTH_PASSWORD=password
    ports:
      - 1515:8081
    volumes:
      - ./data/mongodb:/data/mongodb

  product-api: (3)
    container_name: maoudia-product-api
    image: clue/json-server:latest
    ports:
      - 1519:80
    volumes:
      - ./data/product/db.json:/data/db.json

networks:
  mongodb-network:
    driver: bridge
(1) MongoDB, initialized with the test database.
(2) MongoExpress, a MongoDB administration interface.
(3) Product API, served by JSON Server from the db.json file.

We start the services:

docker-compose up -d

2.5. Data initialization

The address documents come from the French address database. Here is an example:

Address
{
  "id": "59350",
  "type": "municipality",
  "name": "Lille",
  "postcode": [
    "59000",
    "59800",
    "59260",
    "59777",
    "59160"
  ],
  "citycode": "59350",
  "x": 703219.96,
  "y": 7059335.72,
  "lon": 3.045433,
  "lat": 50.630992,
  "population": 234475,
  "city": "Lille",
  "context": "59, Nord, Hauts-de-France",
  "importance": 0.56333
}

We import the address collection:

mongoimport --uri "mongodb://admin:password@localhost:15015" --authenticationDatabase=admin --db test --collection address ./data/mongodb/address.ndjson

Alternatively, we can import it with MongoExpress, which is available at http://localhost:1515.

A product represents a satellite internet offer.

Product
{
  "id": 1,
  "available": true,
  "company": "SPACEX",
  "provider": "STARLINK",
  "type": "SATELLITE"
}

The product API is available at http://localhost:1519.

3. Application

3.1. Configuration

We rename application.properties to application.yml.

application.yml
app:
  buffer-max-size: 500
  bulk-size: 100
  collection-name: address
  enriching-key: product
  enriching-uri: http://localhost:1519/products/1
spring:
  main:
    web-application-type: none
  data:
    mongodb:
      database: test
      uri: mongodb://admin:password@localhost:15015
---
spring.config.activate.on-profile: dev
logging:
  level:
    org.mongodb.driver: debug
---
spring.config.activate.on-profile: test
app:
  bulk-size: 2

We declare a record that holds the application configuration properties. They are bound from the app prefix and validated at startup.

AppProperties.java
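package com.maoudia.tutorial;

import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Positive;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.bind.DefaultValue;
import org.springframework.validation.annotation.Validated;

import java.net.URI;

@ConfigurationProperties(prefix = "app")
@Validated
public record AppProperties(
        @DefaultValue("128")
        @Positive
        int bulkSize,

        @DefaultValue("1024")
        @Positive
        int bufferMaxSize,

        @NotBlank
        String collectionName,

        @NotBlank
        String enrichingKey,

        @NotNull
        URI enrichingUri
) {
}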
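To make the constraints concrete, here is a hypothetical plain-Java counterpart of AppProperties. The name `PlainAppProperties` and the `withDefaultSizes` factory are illustrative only. Its compact constructor checks by hand what `@Positive`, `@NotBlank` and `@NotNull` declare. In the real application, Spring Boot performs these checks while binding, provided a Bean Validation implementation is on the classpath, and startup fails if a constraint is violated.

```java
import java.net.URI;

// Hypothetical plain-Java counterpart of AppProperties (illustration only).
record PlainAppProperties(int bulkSize,
                          int bufferMaxSize,
                          String collectionName,
                          String enrichingKey,
                          URI enrichingUri) {

    // Compact constructor: hand-written equivalents of the validation annotations.
    PlainAppProperties {
        if (bulkSize <= 0) {
            throw new IllegalArgumentException("bulkSize must be positive");
        }
        if (bufferMaxSize <= 0) {
            throw new IllegalArgumentException("bufferMaxSize must be positive");
        }
        if (collectionName == null || collectionName.isBlank()) {
            throw new IllegalArgumentException("collectionName must not be blank");
        }
        if (enrichingKey == null || enrichingKey.isBlank()) {
            throw new IllegalArgumentException("enrichingKey must not be blank");
        }
        if (enrichingUri == null) {
            throw new IllegalArgumentException("enrichingUri must not be null");
        }
    }

    // Mirrors the @DefaultValue annotations: 128 and 1024 when sizes are not set.
    static PlainAppProperties withDefaultSizes(String collectionName, String enrichingKey, URI enrichingUri) {
        return new PlainAppProperties(128, 1024, collectionName, enrichingKey, enrichingUri);
    }
}
```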

We declare a @Bean for WebClient, Spring's non-blocking HTTP client.

NetworkConfig.java
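package com.maoudia.tutorial;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;

@Configuration
public class NetworkConfig {

    @Bean
    public WebClient client() {
        return WebClient.create();
    }

}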

3.2. Implementation

We create a @Service which contains the application's business logic.

CollectionService.java
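package com.maoudia.tutorial;

import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOneModel;
import com.mongodb.client.model.ReplaceOptions;
import org.bson.Document;
import org.reactivestreams.Publisher;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.net.URI;
import java.util.Date;

@Service
public class CollectionService {
    private final AppProperties properties;
    private final ReactiveMongoTemplate template;
    private final WebClient client;

    public CollectionService(AppProperties properties,
                             ReactiveMongoTemplate template,
                             WebClient client) {
        this.properties = properties;
        this.template = template;
        this.client = client;
    }

    public Flux<BulkWriteResult> enrichAll(String collectionName, String enrichingKey, String enrichingUri) {
        return template.findAll(Document.class, collectionName) (1)
                .onBackpressureBuffer(properties.bufferMaxSize()) (2)
                .flatMap(document -> enrich(document, enrichingKey, enrichingUri)) (3)
                .map(CollectionService::toReplaceOneModel) (4)
                .window(properties.bulkSize()) (5)
                .flatMap(replaceOneModelFlux -> bulkWrite(replaceOneModelFlux, collectionName)); (6)
    }

    // enrich, getEnrichingDocument, toReplaceOneModel and bulkWrite are shown in the next snippets.
}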
(1) Creates a stream of documents from the collection.
(2) Limits the number of documents held in memory when consumption is slower than production. If more than app.buffer-max-size documents are waiting, the stream terminates with an overflow error (an IllegalStateException).
(3) Enriches each document asynchronously with the document returned by the product API; flatMap runs several enrichments concurrently.
(4) Creates a ReplaceOneModel from the enriched document.
(5) Groups the models into windows (sub-streams) of app.bulk-size elements. The last window can be smaller.
(6) Writes each window to MongoDB with a single bulk write request.
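The overflow behaviour of step (2) can be modelled with a small, single-threaded buffer. This is a simplified illustration, not Reactor's implementation. The hypothetical `OverflowBuffer` class rejects an element once `maxSize` elements are already waiting for the consumer, instead of letting memory grow without limit. Reactor reports the overflow as an error signal to the subscriber and cancels the source, whereas this sketch throws directly to the producer.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model of a bounded backpressure buffer (not Reactor's code):
// elements wait in a queue until the consumer takes them, and the buffer
// fails as soon as a new element arrives while maxSize elements are pending.
final class OverflowBuffer<T> {
    private final int maxSize;
    private final Deque<T> pending = new ArrayDeque<>();

    OverflowBuffer(int maxSize) {
        this.maxSize = maxSize;
    }

    // Called by the fast producer (e.g. documents read from MongoDB).
    void offer(T element) {
        if (pending.size() == maxSize) {
            throw new IllegalStateException("Buffer overflow: " + maxSize + " elements already pending");
        }
        pending.addLast(element);
    }

    // Called by the slower consumer (e.g. the enrichment step); null when empty.
    T poll() {
        return pending.pollFirst();
    }

    int pendingCount() {
        return pending.size();
    }
}
```

In the application, the equivalent of `maxSize` is app.buffer-max-size, set to 500 in application.yml.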

The app.bulk-size property can be tuned to the project's needs and to the available hardware resources. A larger value means fewer round trips to MongoDB, but larger requests and higher memory consumption, since each window is collected into a list before being written.
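The effect of app.bulk-size can be checked with a plain-Java equivalent of the windowing step. The hypothetical `BulkWindows` helper splits a finite list into consecutive chunks, comparable to what `window(bulkSize)` emits, and computes the number of bulk write requests as a ceiling division. With the test profile's bulk-size of 2, three documents give a window of 2 followed by a window of 1. This matches the two BulkWriteResult values that the integration test expects.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of fixed-size windows, comparable to what
// Flux.window(bulkSize) produces for a finite stream (not Reactor's code).
final class BulkWindows {

    static <T> List<List<T>> windows(List<T> elements, int bulkSize) {
        if (bulkSize <= 0) {
            throw new IllegalArgumentException("bulkSize must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start < elements.size(); start += bulkSize) {
            int end = Math.min(start + bulkSize, elements.size());
            // Copy each slice so the windows do not depend on the source list.
            result.add(new ArrayList<>(elements.subList(start, end)));
        }
        return result;
    }

    // Number of bulk write requests needed for documentCount documents (ceiling division).
    static int bulkCount(int documentCount, int bulkSize) {
        return (documentCount + bulkSize - 1) / bulkSize;
    }
}
```

For example, a hypothetical collection of 1,000 documents with bulk-size 100 needs `bulkCount(1000, 100)`, that is 10 bulk writes.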

We create the document enrichment methods.

CollectionService.java
private Publisher<Document> enrich(Document document, String enrichingKey, String enrichingUri) { (1)
    return getEnrichingDocument(enrichingUri)
            .map(enrichingDocument -> {
                document.put(enrichingKey, enrichingDocument);
                document.put("updatedAt", new Date());
                return document;
            });
}

private Mono<Document> getEnrichingDocument(String enrichingUri) { (2)
    return client.get()
            .uri(URI.create(enrichingUri))
            .retrieve()
            .bodyToMono(Document.class);
}
(1) Adds the document returned by the HTTP call at the root of the document to enrich, under the key passed as a parameter, and sets an updatedAt date.
(2) Retrieves a document from a URI with a non-blocking GET request.

The updatedAt value is stored as a BSON date, i.e. a number of milliseconds since the Unix epoch in UTC, whatever the time zone of the application.
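Because BSON stores a date as a 64-bit number of milliseconds since the Unix epoch, the value written for updatedAt does not depend on the JVM's time zone. The following sketch converts a Paris local time, similar to the UTC+1 timestamps of the demo output, to the `java.util.Date` that would be stored. The `UtcDates` class and its helper are illustrative.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Date;

// Illustration: a java.util.Date only holds milliseconds since the Unix epoch (UTC).
final class UtcDates {

    // Converts a zoned date-time to the Date value the application would store.
    static Date toStoredDate(ZonedDateTime localDateTime) {
        return Date.from(localDateTime.toInstant());
    }

    public static void main(String[] args) {
        // 02:02:58 in Paris on 2023-11-10, when Paris is at UTC+1.
        ZonedDateTime paris = ZonedDateTime.of(2023, 11, 10, 2, 2, 58, 0, ZoneId.of("Europe/Paris"));
        Date updatedAt = toStoredDate(paris);

        System.out.println(updatedAt.toInstant()); // prints 2023-11-10T01:02:58Z
        System.out.println(updatedAt.getTime() == Instant.parse("2023-11-10T01:02:58Z").toEpochMilli()); // prints true
    }
}
```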

CollectionService.java
private static final ReplaceOptions REPLACE_OPTIONS = new ReplaceOptions(); (1)
private static ReplaceOneModel<Document> toReplaceOneModel(Document document) {
    return new ReplaceOneModel<>(
            Filters.eq("_id", document.get("_id")), (2)
            document, (3)
            REPLACE_OPTIONS
    );
}
(1) Instantiates the default replacement options (no upsert).
(2) Matches the document by its identifier (_id).
(3) Replacement content: the complete enriched document.

CollectionService.java
private static final BulkWriteOptions BULK_WRITE_OPTIONS = new BulkWriteOptions().ordered(false); (1)
private Flux<BulkWriteResult> bulkWrite(Flux<ReplaceOneModel<Document>> replaceOneModelFlux, String collectionName) {
    return replaceOneModelFlux.collectList() (2)
            .flatMapMany(replaceOneModels -> template.getCollection(collectionName) (3)
                    .flatMapMany(collection -> collection.bulkWrite(replaceOneModels, BULK_WRITE_OPTIONS))); (4)
}
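(1) Creates bulk write options with ordered(false). Operations may be executed in any order, and a failing operation does not prevent the remaining ones from being attempted.
(2) Collects the window into a list.
(3) Retrieves the collection passed as a parameter.
(4) Sends all the replacements of the window to the collection in a single bulk write.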
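To illustrate what ordered(false) changes, here is a simplified sequential simulation in plain Java. It does not call the MongoDB driver, and `BulkOrderingSketch` and its operation names are hypothetical. An ordered bulk write stops at the first failing operation, while an unordered one attempts all of them. With the real driver, failures are reported as an error (typically a MongoBulkWriteException), and an unordered batch may also be executed in any order, which this loop does not model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Simplified simulation of ordered vs unordered bulk writes (not the MongoDB driver).
final class BulkOrderingSketch {

    record Outcome(List<String> applied, List<String> failed) {}

    static Outcome bulkWrite(List<String> operations, Predicate<String> fails, boolean ordered) {
        List<String> applied = new ArrayList<>();
        List<String> failed = new ArrayList<>();
        for (String operation : operations) {
            if (fails.test(operation)) {
                failed.add(operation);
                if (ordered) {
                    break; // ordered: the remaining operations are never attempted
                }
            } else {
                applied.add(operation);
            }
        }
        return new Outcome(applied, failed);
    }
}
```

With three replacements where the second one fails, the ordered run applies only the first, while the unordered run applies the first and the third.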

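Multi-document transactions are supported on replica sets since MongoDB 4.0 and on sharded clusters since MongoDB 4.2. When transactions are available and a ReactiveMongoTransactionManager is configured, we can use @Transactional or TransactionalOperator to make a method transactional.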

We implement the following interfaces:

  • CommandLineRunner: runs the enrichment command at application startup.

  • ExitCodeGenerator: provides the application's exit code.

Application.java
package com.maoudia.tutorial;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.ExitCodeGenerator;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.mongo.MongoReactiveRepositoriesAutoConfiguration;
import org.springframework.boot.context.properties.ConfigurationPropertiesScan;

@SpringBootApplication(exclude = MongoReactiveRepositoriesAutoConfiguration.class) (1)
@ConfigurationPropertiesScan("com.maoudia.tutorial") (2)
public class Application implements CommandLineRunner, ExitCodeGenerator {
    private static final Logger LOGGER = LoggerFactory.getLogger(Application.class);
    private final AppProperties properties;
    private final CollectionService service;
    private int exitCode = 255;

    public static void main(String[] args) {
        System.exit(SpringApplication.exit(SpringApplication.run(Application.class, args)));
    }

    public Application(AppProperties properties, CollectionService service) {
        this.properties = properties;
        this.service = service;
    }

    @Override
    public void run(final String... args) {
        // enrichingUri is bound as a URI, while enrichAll expects a String.
        service.enrichAll(properties.collectionName(), properties.enrichingKey(), properties.enrichingUri().toString())
                .doOnSubscribe(unused -> LOGGER.info("------------------< Staring Collection Enriching Command >-------------------")) (3)
                .doOnNext(bulkWriteResult -> LOGGER.info("Bulk write result with {} modified document(s)", bulkWriteResult.getModifiedCount()))
                .doOnError(throwable -> {
                    exitCode = 1;
                    LOGGER.error("Collection enriching failed due to : {}", throwable.getMessage(), throwable);
                })
                .doOnComplete(() -> exitCode = 0)
                .doOnTerminate(() -> LOGGER.info("------------------< Collection Enriching Command Finished >------------------"))
                .blockLast(); (4)
    }

    @Override
    public int getExitCode() {
        return exitCode;
    }

}
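(1) Disables auto-configuration of reactive repositories, as we only use ReactiveMongoTemplate.
(2) Scans the com.maoudia.tutorial package for classes annotated with @ConfigurationProperties and registers them as beans.
(3) Logs the start of the command when the stream is subscribed to; nothing is processed before that.
(4) Without a running web server, run() would return immediately and the application would exit before the processing ends. blockLast() subscribes to the stream and blocks the main thread until it completes or fails.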
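The interplay between blockLast() and the exit code can be illustrated without Spring or Reactor. In this plain-Java analogy, the `ExitCodeSketch` class is hypothetical. The work runs on another thread, and the calling thread blocks on `join()`, as `run()` blocks on `blockLast()`. The exit code starts at 255 and becomes 0 on success or 1 on failure, as in `Application`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java analogy for run() and getExitCode() (not Spring or Reactor code).
final class ExitCodeSketch {
    private int exitCode = 255; // unchanged if the work neither completes nor fails

    int run(Runnable enrichment) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture.runAsync(enrichment, executor)
                    .whenComplete((unused, throwable) -> exitCode = (throwable == null) ? 0 : 1)
                    .join(); // blocks like blockLast() and rethrows failures as CompletionException
        } catch (CompletionException e) {
            System.err.println("Enrichment failed: " + e.getCause().getMessage());
        } finally {
            executor.shutdown();
        }
        return exitCode;
    }
}
```

Without the blocking `join()`, `run` would return 255 before the work had a chance to finish, which mirrors why the application needs blockLast().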

3.3. Demo

We launch the application:

mvn spring-boot:run

Output:

...
2023-11-10T02:02:58.673+01:00  INFO 84802 --- [           main] com.maoudia.tutorial.Application         : Started Application in 0.831 seconds (process running for 0.992)
2023-11-10T02:02:58.725+01:00  INFO 84802 --- [           main] com.maoudia.tutorial.Application         : ------------------< Staring Collection Enriching Command >-------------------
2023-11-10T02:02:59.186+01:00  INFO 84802 --- [ntLoopGroup-2-4] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.244+01:00  INFO 84802 --- [ntLoopGroup-2-5] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.290+01:00  INFO 84802 --- [ntLoopGroup-2-5] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.357+01:00  INFO 84802 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.438+01:00  INFO 84802 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.503+01:00  INFO 84802 --- [ntLoopGroup-2-5] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.578+01:00  INFO 84802 --- [ntLoopGroup-2-5] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.632+01:00  INFO 84802 --- [ntLoopGroup-2-5] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.727+01:00  INFO 84802 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.776+01:00  INFO 84802 --- [ntLoopGroup-2-5] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.776+01:00  INFO 84802 --- [ntLoopGroup-2-5] com.maoudia.tutorial.Application         : ------------------< Collection Enriching Command Finished >------------------
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  7.282 s
[INFO] Finished at: 2023-11-10T02:03:03+01:00
[INFO] ------------------------------------------------------------------------

3.4. VisualVM report

VisualVM is a lightweight profiling tool. We use it to get an overview of the threads launched by the application.

[Figure: VisualVM report]

Two groups of threads execute operations in parallel, and each group forms an event loop:

  • MongoDB requests are executed by the nioEventLoopGroup threads.

  • HTTP requests are executed by the reactor-http-nio threads.
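When VisualVM is not available, a similar overview can be printed from inside the JVM with `Thread.getAllStackTraces()`. In this sketch, `ThreadGroupsSketch` is a hypothetical helper. The prefixes are the thread names listed above, which are naming conventions of the libraries in use rather than a stable API.

```java
import java.util.List;
import java.util.Set;

// Lists live thread names starting with one of the given prefixes.
final class ThreadGroupsSketch {

    static List<String> threadNamesStartingWith(Set<String> prefixes) {
        return Thread.getAllStackTraces().keySet().stream()
                .map(Thread::getName)
                .filter(name -> prefixes.stream().anyMatch(name::startsWith))
                .sorted()
                .toList();
    }

    public static void main(String[] args) {
        // Prefixes of the two event loops observed in the VisualVM report.
        System.out.println(threadNamesStartingWith(Set.of("nioEventLoopGroup", "reactor-http-nio")));
    }
}
```

Called at the end of the enrichment, for example from the doOnTerminate callback, it would show which event loop threads are alive at that point.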

4. Integration tests

We use JUnit 5 and Testcontainers, including its MongoDB module, for the integration tests. Running the tests against real containers gives feedback close to the actual behaviour of the application, which essentially performs read and write operations.

To keep this tutorial short, we will only write one test.

CollectionServiceTest.java
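package com.maoudia.tutorial;

import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.bulk.WriteRequest;
import org.bson.Document;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.MongoDBContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import reactor.test.StepVerifier;

import java.util.Arrays;
import java.util.Collections;

@ActiveProfiles("test") // activates the test profile (app.bulk-size: 2)
@SpringBootTest
@Testcontainers (1)
class CollectionServiceTest {

    @Container
    public static GenericContainer<?> jsonServerContainer = new GenericContainer<>("clue/json-server:latest")
            .withExposedPorts(80)
            .withFileSystemBind("./data/product/db.json", "/data/db.json", BindMode.READ_ONLY)
            .waitingFor(Wait.forHttp("/").forStatusCode(200).forPort(80))
            .withReuse(true); (2)

    @Container
    private static final MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:7.0.2");

    @DynamicPropertySource
    private static void setProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.data.mongodb.uri", mongoDBContainer::getReplicaSetUrl); (3)
        registry.add("app.enriching-uri", () -> "http://" + jsonServerContainer.getHost() + ":" + jsonServerContainer.getMappedPort(80) + "/products/1");
    }

    @Autowired
    private AppProperties properties;
    @Autowired
    private CollectionService command;
    @Autowired
    private ReactiveMongoTemplate template;

    @Test
    void multipleBulkWriteResultsAreReturned() {
        Document givenDocument1 = new Document();
        givenDocument1.put("_id", "628ea3edb5110304e5e814f6");
        givenDocument1.put("type", "municipality");
        Document givenDocument2 = new Document();
        givenDocument2.put("_id", "628ea3edb5110304e5e814f7");
        givenDocument2.put("type", "street");
        Document givenDocument3 = new Document();
        givenDocument3.put("_id", "628ea3edb5110304e5e814f8");
        givenDocument3.put("type", "housenumber");

        template.insert(Arrays.asList(givenDocument1, givenDocument2, givenDocument3), properties.collectionName()).blockLast();

        BulkWriteResult expectedBulkWriteResult1 = BulkWriteResult.acknowledged(WriteRequest.Type.REPLACE, 2, 2, Collections.emptyList(), Collections.emptyList());
        BulkWriteResult expectedBulkWriteResult2 = BulkWriteResult.acknowledged(WriteRequest.Type.REPLACE, 1, 1, Collections.emptyList(), Collections.emptyList());

        command.enrichAll(properties.collectionName(), properties.enrichingKey(), properties.enrichingUri().toString())
                .as(StepVerifier::create) (4)
                .expectNext(expectedBulkWriteResult1)
                .expectNext(expectedBulkWriteResult2)
                .verifyComplete();
    }
}
(1) Adds the Testcontainers JUnit 5 extension, which starts and stops the containers declared with @Container.
(2) Starts a JSON Server container that serves the product API from db.json. withReuse(true) only takes effect when container reuse is enabled in the Testcontainers configuration.
(3) Points the application to the MongoDB container's replica set URI and to the mapped port of the JSON Server container.
(4) Uses StepVerifier from Reactor Test to assert the output stream.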

We launch the integration tests:

mvn test -Dspring.profiles.active=test

Test results:

...
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 6.098 s - in com.maoudia.tutorial.CollectionServiceTest
[INFO]
[INFO] Results:
[INFO]
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  11.539 s
[INFO] Finished at: 2023-11-10T02:06:45+01:00
[INFO] ------------------------------------------------------------------------

5. Conclusion

In this tutorial, we implemented a complete solution to efficiently enrich and update a MongoDB collection. We have also seen how to write integration tests with JUnit 5 and Testcontainers.

The complete source code is available on GitHub.

In the next chapter of the MongoDB Reactive CLI series, we will add new features and use Picocli to make interacting with the application easier.