Moncef AOUDIA

Développeur logiciel - Passionné d'OSS
en | fr

Mise À Jour En Masse Avec Spring Data MongoDB Reactive

Publié le 2022-06-20 | Mis à jour le 2024-02-09 | 10 mins lecture | Tutorial Reactive Programming

Afin de mettre à jour les documents d’une collection MongoDB, on passe souvent par des requêtes de mise à jour, si le volume des données est conséquent, cela peut conduire à des problèmes de performance et une surconsommation des ressources matérielles.

On va implémenter une solution pour enrichir et mettre à jour efficacement un grand volume de données en utilisant Spring Data MongoDB Reactive.

Avant de continuer la lecture, si vous n’êtes pas familier avec la pile réactive de Spring et MongoDB, je suggère de consulter la section ressources.

1. EIP enrichisseur de contenu

eip enrichisseur de contenu

Le modèle d’intégration d’entreprise enrichisseur de contenu permet d’ajouter des informations à un message existant depuis une source externe. Il utilise les informations contenues dans le message entrant pour effectuer l’opération d’enrichissement.

On va implémenter une version simplifiée de l'EIP :

  1. Message en entrée : représenté par un document MongoDB.

  2. Enrichisseur : notre application.

  3. Ressource : appel à une API RESTful.

  4. Message en sortie : nous ne conserverons que le document enrichi.

1.1. Flux d’intégration

schema du flux d'intégration

L’application va lire les documents adresse, ajouter le produit et sauvegarder les documents enrichis dans la base MongoDB.

2. Configuration du projet

2.2. Génération

On génère le squelette du projet depuis Spring Initializr.

2.3. Structure

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
.
│  .gitignore
│  docker-compose.yml
│  pom.xml
│  README.adoc
├───data
│  ├───mongodb
│  │     address.ndjson
│  └───product
│        db.json
└───src
   ├───main
   │  ├───java
   │  │  └───com
   │  │     └───maoudia
   │  │        └───tutorial
   │  │             Application.java
   │  │             AppProperties.java
   │  │             CollectionService.java
   │  │             NetworkConfig.java
   │  └───resources
   │        application.yml
   └───test
      └───java
        └───com
           └───maoudia
              └───tutorial
                   CollectionServiceTest.java

2.4. Conteneurs

On télécharge le dossier data vers la racine du projet.

On utilise docker-compose pour créer les conteneurs nécessaires pour ce tutoriel.

docker-compose.yml
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
services:
  mongodb: (1)
    container_name: maoudia-mongodb
    image: mongo:7.0.2
    environment:
      - MONGO_INITDB_DATABASE=test
      - MONGO_INITDB_ROOT_USERNAME=admin
      - MONGO_INITDB_ROOT_PASSWORD=password
    networks:
      - mongodb-network
    ports:
      - 15015:27017
    volumes:
      - ./data/mongodb:/data/mongodb

  mongo-express: (2)
    container_name: maoudia-mongo-express
    image: mongo-express:1.0.0
    depends_on:
      - mongodb
    networks:
      - mongodb-network
    environment:
      - ME_CONFIG_MONGODB_SERVER=maoudia-mongodb
      - ME_CONFIG_MONGODB_ADMINUSERNAME=admin
      - ME_CONFIG_MONGODB_ADMINPASSWORD=password
      - ME_CONFIG_BASICAUTH_USERNAME=admin
      - ME_CONFIG_BASICAUTH_PASSWORD=password
    ports:
      - 1515:8081
    volumes:
      - ./data/mongodb:/data/mongodb

  product-api: (3)
    container_name: maoudia-product-api
    image: clue/json-server:latest
    ports:
      - 1519:80
    volumes:
      - ./data/product/db.json:/data/db.json

networks:
  mongodb-network:
    driver: bridge
1MongoDB initialisé avec la base de données test.
2MongoExpress est une interface d’administration MongoDB.
3L’API produit est configurée depuis le fichier db.json.

On démarre les services :

1
docker-compose up -d

2.5. Initialisation des données

On utilise un document JSON issu de la base d’adresses française.

Adresse
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
{
  "id": "59350",
  "type": "municipality",
  "name": "Lille",
  "postcode": [
    "59000",
    "59800",
    "59260",
    "59777",
    "59160"
  ],
  "citycode": "59350",
  "x": 703219.96,
  "y": 7059335.72,
  "lon": 3.045433,
  "lat": 50.630992,
  "population": 234475,
  "city": "Lille",
  "context": "59, Nord, Hauts-de-France",
  "importance": 0.56333
}

On importe la collection d’adresses :

1
mongoimport --uri "mongodb://admin:password@localhost:15015" --authenticationDatabase=admin --db test --collection address ./data/mongodb/address.ndjson

Ou :

On utilise MongoExpress qui est accessible sur http://localhost:1515.

Le produit représente une offre d’internet par satellite.

Produit
1
2
3
4
5
6
7
{
  "id": 1,
  "available": true,
  "company": "SPACEX",
  "provider": "STARLINK",
  "type": "SATELLITE"
}

L’API produit est accessible sur http://localhost:1519.

3. Application

3.1. Configuration

On change l’extension du fichier de application.properties vers application.yml.

application.yml
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
app:
  buffer-max-size: 500
  bulk-size: 100
  collection-name: address
  enriching-key: product
  enriching-uri: http://localhost:1519/products/1
spring:
  main:
    web-application-type: none
  data:
    mongodb:
      database: test
      uri: mongodb://admin:password@localhost:15015
---
spring.config.activate.on-profile: dev
logging:
  level:
    org.mongodb.driver: debug
---
spring.config.activate.on-profile: test
app:
  bulk-size: 2

On déclare une classe qui va contenir les propriétés de configuration de l’application.

AppProperties.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
@ConfigurationProperties(prefix = "app")
@Validated
public record AppProperties(
        @DefaultValue("128")
        @Positive
        int bulkSize,

        @DefaultValue("1024")
        @Positive
        int bufferMaxSize,

        @NotBlank
        String collectionName,

        @NotBlank
        String enrichingKey,

        @NotNull
        URI enrichingUri
) {
}

On crée un @Bean du client HTTP non bloquant de Spring.

NetworkConfig.java
1
2
3
4
5
6
7
8
9
@Configuration
public class NetworkConfig {

    @Bean
    public WebClient client() {
        return WebClient.create();
    }

}

3.2. Implémentation

On crée le @Service qui va contenir la logique métier de l’application.

CollectionService.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Service
public class CollectionService {
    private final AppProperties properties;
    private final ReactiveMongoTemplate template;
    private final WebClient client;

    public CollectionService(AppProperties properties,
                             ReactiveMongoTemplate template,
                             WebClient client) {
        this.properties = properties;
        this.template = template;
        this.client = client;
    }

    public Flux<BulkWriteResult> enrichAll(String collectionName, String enrichingKey, String enrichingUri) {
            return template.findAll(Document.class, collectionName) (1)
                    .onBackpressureBuffer(properties.bufferMaxSize()) (2)
                    .flatMap(document -> enrich(document,  enrichingKey, enrichingUri)) (3)
                    .map(CollectionService::toReplaceOneModel) (4)
                    .window(properties.bulkSize()) (5)
                    .flatMap(replaceOneModelFlux -> bulkWrite(replaceOneModelFlux, collectionName)); (6)
    }
}
1Crée un flux de documents à partir de la collection.
2Limite le nombre maximum de documents chargés dans la RAM en cas de consommation plus lente que la production. Si la taille maximale du tampon est dépassée, une IllegalStateException est levée.
3Enrichie le document avec le document externe d’une façon asynchrone.
4Crée un ReplaceOneModel à partir du document.
5Regroupe les documents en flux de taille fixe. Le dernier flux peut être de taille inférieure.
6Appel la fonction d’écriture en masse.

La propriété de configuration app.bulk-size peut être ajustée en fonction des besoins et ressources matérielles disponibles. Plus la taille du bulk est grande, plus la consommation de mémoire et la taille des requêtes seront élevées.

On crée les fonctions d’enrichissement de document.

CollectionService.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
private Publisher<Document> enrich(Document document, String enrichingKey, String enrichingUri) { (1)
    return getEnrichingDocument(enrichingUri)
            .map(enrichingDocument -> {
                document.put(enrichingKey, enrichingDocument);
                document.put("updatedAt", new Date());
                return document;
            });
}

private Mono<Document> getEnrichingDocument(String enrichingUri) { (2)
    return client.get()
            .uri(URI.create(enrichingUri))
            .retrieve()
            .bodyToMono(Document.class);
}
1Ajoute le document récupéré depuis l’appel HTTP à la racine du document à enrichir avec la clef passée en paramètre.
2Récupère le document depuis l'URI.

MongoDB convertie et stocke les dates en UTC par défaut.

CollectionService.java
1
2
3
4
5
6
7
8
private static final ReplaceOptions REPLACE_OPTIONS = new ReplaceOptions(); (1)
private static ReplaceOneModel<Document> toReplaceOneModel (Document document) {
    return new ReplaceOneModel<>(
            Filters.eq("_id", document.get("_id")), (2)
            document, (3)
            REPLACE_OPTIONS
    );
}
1Instancie la configuration de remplacement par défaut.
2Le filtre permet la correspondance par identifiant document.
3Le contenu à remplacer, représente l’intégralité du document enrichi.
CollectionService.java
1
2
3
4
5
6
private static final BulkWriteOptions BULK_WRITE_OPTIONS = new BulkWriteOptions().ordered(false); (1)
private Flux<BulkWriteResult> bulkWrite(Flux<ReplaceOneModel<Document>> updateOneModelFlux, String collectionName) {
    return updateOneModelFlux.collectList() (2)
            .flatMapMany(unused -> template.getCollection(collectionName) (3)
                    .flatMapMany(collection -> collection.bulkWrite(updateOneModels, BULK_WRITE_OPTIONS))); (4)
}
1Instancie les options d’écritures en désactivant l’ordre des opérations.
2Collecte le flux dans une liste.
3Récupère la collection passée en paramètre.
4Écrit en masse les documents dans la collection MongoDB.

Les transactions sont supportées sur les Replicaset depuis MongoDB 4.2. Si les transactions sont activées, on peut utiliser @Transactional ou TransactionalOperator pour rendre une méthode transactionnelle.

On implémente les interfaces suivantes :

  • CommandLineRunner : exécute la commande d’enrichissement au démarrage de l’application.

  • ExitCodeGenerator : gère le code de sortie système.

Application.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@SpringBootApplication(exclude = MongoReactiveRepositoriesAutoConfiguration.class) (1)
@ConfigurationPropertiesScan("com.maoudia.tutorial") (2)
public class Application implements CommandLineRunner, ExitCodeGenerator {
    private static final Logger LOGGER = LoggerFactory.getLogger(Application.class);
    private final AppProperties properties;
    private final CollectionService service;
    private int exitCode = 255;

    public static void main(String[] args) {
        System.exit(SpringApplication.exit(SpringApplication.run(Application.class, args)));
    }

    public Application(AppProperties properties, CollectionService service) {
        this.properties = properties;
        this.service = service;
    }

    @Override
    public void run(final String... args) {
        service.enrichAll(properties.collectionName(), properties.enrichingKey(), properties.enrichingUri())
                .doOnSubscribe(unused -> LOGGER.info("------------------< Staring Collection Enriching Command >-------------------")) (3)
                .doOnNext(bulkWriteResult -> LOGGER.info("Bulk write result with {} modified document(s)", bulkWriteResult.getModifiedCount()))
                .doOnError(throwable -> {
                    exitCode = 1;
                    LOGGER.error("Collection enriching failed due to : {}", throwable.getMessage(), throwable);
                })
                .doOnComplete(() -> exitCode = 0)
                .doOnTerminate(() -> LOGGER.info("------------------< Collection Enriching Command Finished >------------------"))
                .blockLast(); (4)
    }

    @Override
    public int getExitCode() {
        return exitCode;
    }

}
1Désactive l’auto-configuration des repositories, car on utilise MongoReactiveTemplate seulement.
2Permet de scanner et détecter les beans qui portent l’annotation @ConfigProperties.
3L’inscription au flux déclenche le traitement.
4Sans un serveur web en fonctionnement, nous devons nous abonner indéfiniment au Publisher afin de déclencher et attendre la fin de l’exécution.

3.3. Démo

On lance l’application :

1
mvn spring-boot:run

Sortie :

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
...
2023-11-10T02:02:58.673+01:00  INFO 84802 --- [           main] com.maoudia.tutorial.Application         : Started Application in 0.831 seconds (process running for 0.992)
2023-11-10T02:02:58.725+01:00  INFO 84802 --- [           main] com.maoudia.tutorial.Application         : ------------------< Staring Collection Enriching Command >-------------------
2023-11-10T02:02:59.186+01:00  INFO 84802 --- [ntLoopGroup-2-4] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.244+01:00  INFO 84802 --- [ntLoopGroup-2-5] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.290+01:00  INFO 84802 --- [ntLoopGroup-2-5] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.357+01:00  INFO 84802 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.438+01:00  INFO 84802 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.503+01:00  INFO 84802 --- [ntLoopGroup-2-5] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.578+01:00  INFO 84802 --- [ntLoopGroup-2-5] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.632+01:00  INFO 84802 --- [ntLoopGroup-2-5] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.727+01:00  INFO 84802 --- [ntLoopGroup-2-3] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.776+01:00  INFO 84802 --- [ntLoopGroup-2-5] com.maoudia.tutorial.Application         : Bulk write result with 100 modified document(s)
2023-11-10T02:02:59.776+01:00  INFO 84802 --- [ntLoopGroup-2-5] com.maoudia.tutorial.Application         : ------------------< Collection Enriching Command Finished >------------------
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  7.282 s
[INFO] Finished at: 2023-11-10T02:03:03+01:00
[INFO] ------------------------------------------------------------------------

3.4. Rapport VisuelVM

VisualVM est un outil de profilage léger. On l’utilise pour avoir une vue d’ensemble sur les threads qui sont lancés par l’application.

rapport VisualVM

On observe deux groupes de threads qui exécutent les opérations en parallèle, chaque groupe forme une l'event loop.

  • Les requêtes MongoDB sont exécutées par nioEventLoopGroup.

  • Les requêtes HTTP sont exécutées par reactor-http-nio.

4. Tests d’intégration

On utilise JUnit 5 et le module Testcontainers MongoDB pour les tests d’intégration. Cela permet d’avoir un retour proche du comportement réel de l’application qui fait essentiellement des opérations de lecture/écriture.

Pour que ce tutoriel reste court, on va se contenter d’écrire qu’un seul test.

CollectionServiceTest.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@Profile("test")
@SpringBootTest
@Testcontainers (1)
class CollectionServiceTest {

    @Container
    public static GenericContainer<?> jsonServerContainer = new GenericContainer<>("clue/json-server:latest")
            .withExposedPorts(80)
            .withFileSystemBind("./data/product/db.json", "/data/db.json", BindMode.READ_ONLY)
            .waitingFor(Wait.forHttp("/").forStatusCode(200).forPort(80))
            .withReuse(true); (2)

    @Container
    private static final MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:7.0.2");

    @DynamicPropertySource
    private static void setProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.data.mongodb.uri", mongoDBContainer::getReplicaSetUrl); (3)
        registry.add("app.enriching-uri", () -> "http://" + jsonServerContainer.getHost() + ":" + jsonServerContainer.getMappedPort(80) + "/products/1");
    }

    @Autowired
    private AppProperties properties;
    @Autowired
    private CollectionService command;
    @Autowired
    private ReactiveMongoTemplate template;

    @Test
    void multipleBulkWriteResultsAreReturned() {
        Document givenDocument1 = new Document();
        givenDocument1.put("_id", "628ea3edb5110304e5e814f6");
        givenDocument1.put("type", "municipality");
        Document givenDocument2 = new Document();
        givenDocument2.put("_id", "628ea3edb5110304e5e814f7");
        givenDocument2.put("type", "street");
        Document givenDocument3 = new Document();
        givenDocument3.put("_id", "628ea3edb5110304e5e814f8");
        givenDocument3.put("type", "housenumber");

        template.insert(Arrays.asList(givenDocument1, givenDocument2, givenDocument3), properties.collectionName()).blockLast();

        BulkWriteResult expectedBulkWriteResult1 = BulkWriteResult.acknowledged(WriteRequest.Type.REPLACE, 2, 2, Collections.emptyList(),
                Collections.emptyList());
        BulkWriteResult expectedBulkWriteResult2 = BulkWriteResult.acknowledged(WriteRequest.Type.REPLACE, 1, 1, Collections.emptyList(),
                Collections.emptyList());

        command.enrichAll( properties.collectionName(), properties.enrichingKey() , properties.enrichingUri())
                .as(StepVerifier::create) (4)
                .expectNext(expectedBulkWriteResult1)
                .expectNext(expectedBulkWriteResult2)
                .verifyComplete();
    }
}
1Ajoute l’extension Junit 5 de TestContainers.
2Démarre un conteneur MongoDB.
3Configure l’application avec l’URI du conteneur.
4Utilise StepVerifier de Reactor Test pour faire des assertions sur le flux en sortie.

On lance les tests d’intégration :

1
mvn test -Dspring.profiles.active=test

Résultats des tests :

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
...
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 6.098 s - in com.maoudia.tutorial.CollectionServiceTest
[INFO]
[INFO] Results:
[INFO]
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  11.539 s
[INFO] Finished at: 2023-11-10T02:06:45+01:00
[INFO] ------------------------------------------------------------------------

5. Conclusion

Dans ce tutoriel, on a réussi à implémenter une solution complète pour enrichir et mettre à jour efficacement une collection MongoDB. De plus, on a vu comment écrire des tests d’intégration avec JUnit 5 et Testcontainers.

Le code source complet est disponible sur Github.

Dans le prochain chapitre de la série MongoDB Reactive CLI, on ajoutera de nouvelles fonctionnalités et utilisera Picocli afin de faciliter les interactions avec l’application.