en | fr

Reactor Disposable Management: Composite and Swap

Published on 2024-05-20 | Updated on 2025-12-02 | 10 mins read | Tutorial Reactive Programming

Reactor Disposable provides a mechanism for managing resources, subscriptions, or actions in a reactive application. This guide explores three main types: Disposable, Disposable.Composite, and Disposable.Swap, with practical examples using Spring Boot.

1. Overview

In reactive programming, proper resource management is crucial to prevent memory leaks and ensure clean shutdown of applications. Project Reactor provides the Disposable interface and its variants to manage the lifecycle of reactive streams and associated resources.

This tutorial demonstrates practical usage of these disposables in a Spring Boot application with a user search service that manages asynchronous operations.

2. Types of Disposables

2.1. Disposable

Represents a single resource that can be disposed of when it’s no longer needed, typically used for cleanup purposes.

Diagram

2.1.1. Real-World Usage

  • Messaging Application: Manage subscriptions to message queues. When a user subscribes to a topic, create a disposable subscription. When they unsubscribe, call dispose() to release resources.

  • File Processing System: Manage file streams. When a file is opened for processing, create a disposable stream. When processing completes or an error occurs, dispose the stream to release system resources.

2.1.2. Example

Disposable subscription = flux
    .subscribe(
        value -> System.out.println(value),
        error -> System.err.println(error),
        () -> System.out.println("Complete")
    );

subscription.dispose();

2.2. Disposable.Composite

Represents a collection of disposables that can be managed as a single unit, allowing for the disposal of multiple resources at once.

Diagram

2.2.1. Real-World Usage

  • Financial Trading Application: Manage multiple subscriptions to market data feeds, trade execution services, and risk management systems. Dispose of all subscriptions together when the trading system shuts down.

  • IoT System: Manage multiple sensor readings or device connections. Dispose of all connections together when the IoT device is turned off or removed from the network.

2.2.2. Example

Disposable.Composite composite = Disposables.composite();

composite.add(subscription1);
composite.add(subscription2);
composite.add(subscription3);

composite.dispose();

2.3. Disposable.Swap

Allows for the dynamic replacement of one disposable with another, enabling seamless transitions between different resources or actions.

Diagram

2.3.1. Real-World Usage

  • Chat Application: Manage connections to chat servers. When a user switches chat rooms, replace the current connection with a new one using Disposable.Swap, enabling seamless room transitions.

  • Gaming Server: Manage player sessions. When a player logs in or joins a game, create a disposable session. Replace sessions smoothly as players move between games.

2.3.2. Disposable.Swap vs switchMap

AspectswitchMapDisposable.Swap

Control

Automatic switching

Manual control

Cancellation

Handled automatically

Requires explicit handling

Use Case

Stream transformation

Resource management

Complexity

Simpler for stream operations

More control for resources

  • switchMap: Automatically switches between Publishers, discarding previous ones, without user intervention for cancellation handling.

  • Disposable.Swap: Allows manual cancellation handling for seamless replacement of disposables, providing more control over resource lifecycle.

3. Project Setup

3.2. Maven Configuration

pom.xml
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.2</version>
        <relativePath/>
    </parent>
    <groupId>com.maoudia</groupId>
    <artifactId>app</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>maoudia-app</name>
    <description>MAOUDIA APP</description>

    <properties>
        <maven.compiler.source>21</maven.compiler.source>
        <maven.compiler.target>21</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

4. Domain Model

4.1. User Record

User.java
1
2
3
4
5
6
7
package com.maoudia;

public record User(
        String username,
        String firstName,
        String lastName,
        int age) {}

4.2. UserRepository Interface

UserRepository.java
1
2
3
4
5
6
7
8
package com.maoudia;

import reactor.core.publisher.Flux;

public interface UserRepository {

    Flux<User> searchByUsername(String username);
}

5. Implementation

5.1. Fake Repository Implementation

This implementation simulates a real repository with random latency to demonstrate async behavior:

FakeUserRepository.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.maoudia;

import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

@Component
public class FakeUserRepository implements UserRepository {
    private static final Random random = new Random();

    @Override
    public Flux<User> searchByUsername(String username) { (1)
        List<User> users = Arrays.asList(
                new User("user1", "John", "Doe", 30),
                new User("user2", "Jane", "Smith", 25),
                new User("user3", "Alice", "Johnson", 35),
                new User("user4", "Michael", "Brown", 40),
                new User("user5", "Emma", "Wilson", 28)
        );

        return Flux.fromIterable(users)
                .filter(user -> user.username().startsWith(username)) (2)
                .delayElements(Duration.ofMillis(random.nextInt(1000) + 1), Schedulers.boundedElastic()); (3)
    }
}
1Searches for users by username, returning a reactive stream.
2Filters users whose usernames start with the provided search term.
3Introduces random latency (up to 1 second) to simulate real-world async operations.

5.2. UserService with Disposable.Swap

This service demonstrates the usage of Disposable.Swap for managing user search operations:

UserService.java
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.maoudia;

import jakarta.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.scheduler.Schedulers;

import java.util.UUID;

@Service
public class UserService {
    private static final Logger LOGGER = LoggerFactory.getLogger(UserService.class);

    private final UserRepository userRepository;
    private final Disposable.Swap disposableSwap; (1)

    public UserService(UserRepository userRepository) {
        this.userRepository = userRepository;
        this.disposableSwap = Disposables.swap(); (2)
    }

    public Disposable searchUsers(String query) { (3)
        Disposable nextDisposable = this.userRepository.searchByUsername(query)
                .doOnCancel(() -> LOGGER.warn("User search for '{}' cancelled", query)) (4)
                .doOnSubscribe(s -> LOGGER.info("User search for '{}' subscribed: {}", query, UUID.randomUUID())) (5)
                .subscribeOn(Schedulers.boundedElastic()) (6)
                .subscribe(
                        user -> LOGGER.info("User search '{}' found: {}", query, user.username()),
                        error -> LOGGER.error("Error during User search for '{}'", query, error),
                        () -> LOGGER.info("User search for '{}' completed", query)
                );

        disposableSwap.update(nextDisposable); (7)

        return disposableSwap;
    }

    @PreDestroy (8)
    public void cleanUp() {
        if (!this.disposableSwap.isDisposed()) {
            this.disposableSwap.dispose();
            LOGGER.info("Disposed of disposableSwap");
        }
    }
}
1Declares a Disposable.Swap instance to manage search operations.
2Initializes the swap container in the constructor.
3Performs asynchronous user search and returns the disposable.
4Logs when a search operation is cancelled.
5Logs when a search operation is subscribed with a unique ID.
6Executes the search on the bounded elastic scheduler for blocking operations.
7Atomically replaces the previous disposable with the new one, disposing the old one.
8Ensures proper cleanup when the service bean is destroyed.

5.3. How Disposable.Swap Works

When searchUsers() is called multiple times:

  1. First call creates a disposable for "user1" search

  2. Second call creates a disposable for "user3" search and automatically disposes the "user1" search

  3. Third call creates a disposable for "user" search and automatically disposes the "user3" search

Only the most recent search remains active. Previous searches are cancelled automatically.

6. Testing

6.1. Single Search Test

UserServiceTest.java (excerpt)
1
2
3
4
5
6
7
8
9
@Test
@DisplayName("Single search for users by username")
void singleSearchUsersByUsername() {
    Disposable disposable = userService.searchUsers("user1");

    Assertions.assertThat(disposable)
            .isNotNull()
            .isInstanceOf(Disposable.Swap.class);
}

6.2. Multiple Searches Test

This test demonstrates how Disposable.Swap automatically disposes previous searches:

UserServiceTest.java (excerpt)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
@Test
@DisplayName("Multiple searches for users by username")
void multipleSearchUsersByUsername() {
    Disposable disposable1 = userService.searchUsers("user5"); (1)
    Disposable.Swap disposableSwap1 = (Disposable.Swap) disposable1;
    disposable1 = disposableSwap1.get(); (2)

    Disposable disposable2 = userService.searchUsers("user3"); (3)
    Disposable.Swap disposableSwap2 = (Disposable.Swap) disposable2;
    disposable2 = disposableSwap2.get();

    Disposable disposable3 = userService.searchUsers("user"); (4)

    Assertions.assertThat(disposable1.isDisposed()).isTrue(); (5)
    Assertions.assertThat(disposable2.isDisposed()).isTrue();

    Assertions.assertThat(disposable3)
            .isNotNull()
            .isInstanceOf(Disposable.Swap.class);
}
1First search for "user5" - creates disposable1.
2Extract the inner disposable from the swap container.
3Second search for "user3" - creates disposable2 and disposes disposable1.
4Third search for "user" - creates disposable3 and disposes disposable2.
5Verify that previous disposables are disposed, only the last one remains active.

6.3. Cleanup Test

UserServiceTest.java (excerpt)
1
2
3
4
5
6
7
8
9
@Test
@DisplayName("Clean up resources")
void cleanUpResources() {
    Disposable disposable = userService.searchUsers("user1");

    userService.cleanUp();

    Assertions.assertThat(disposable.isDisposed()).isTrue();
}

7. Use Cases and Best Practices

7.1. When to Use Each Type

7.1.1. Use Disposable When

  • Managing a single subscription or resource

  • Simple cleanup scenarios

  • One-time operations

Disposable subscription = flux.subscribe();
subscription.dispose();

7.1.2. Use Disposable.Composite When

  • Managing multiple independent subscriptions

  • Grouping related resources

  • Bulk cleanup operations

Disposable.Composite composite = Disposables.composite(
    subscription1,
    subscription2,
    subscription3
);
composite.dispose();

7.1.3. Use Disposable.Swap When

  • Replacing active subscriptions dynamically

  • Search-as-you-type scenarios

  • Real-time data switching

  • Session management

Disposable.Swap swap = Disposables.swap();
swap.update(newSubscription);

7.2. Best Practices

7.2.1. Always Clean Up Resources

Use @PreDestroy or implement DisposableBean to ensure cleanup:

@PreDestroy
public void cleanUp() {
    if (!disposable.isDisposed()) {
        disposable.dispose();
    }
}

7.2.2. Check Disposal Status

Before disposing, check if already disposed to avoid errors:

if (!disposable.isDisposed()) {
    disposable.dispose();
}

7.2.3. Use Appropriate Schedulers

Choose schedulers based on operation type:

.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.parallel())

7.2.4. Add Logging

Include logging for debugging and monitoring:

.doOnSubscribe(s -> log.info("Subscribed"))
.doOnCancel(() -> log.warn("Cancelled"))
.doOnComplete(() -> log.info("Completed"))

7.2.5. Handle Errors Gracefully

Always provide error handlers:

.subscribe(
    value -> process(value),
    error -> log.error("Error", error),
    () -> log.info("Complete")
)

8. Common Patterns

8.1. Search-as-You-Type

Perfect use case for Disposable.Swap:

public class SearchService {
    private final Disposable.Swap searchSwap = Disposables.swap();

    public void search(String query) {
        Disposable search = repository.search(query)
            .subscribe(result -> display(result));

        searchSwap.update(search);
    }
}

8.2. Multi-Resource Management

Use Disposable.Composite for managing multiple resources:

public class DataService {
    private final Disposable.Composite subscriptions = Disposables.composite();

    public void start() {
        subscriptions.add(dataFeed1.subscribe());
        subscriptions.add(dataFeed2.subscribe());
        subscriptions.add(dataFeed3.subscribe());
    }

    @PreDestroy
    public void stop() {
        subscriptions.dispose();
    }
}

8.3. Session Management

Replace sessions dynamically with Disposable.Swap:

public class SessionManager {
    private final Disposable.Swap sessionSwap = Disposables.swap();

    public void switchSession(String sessionId) {
        Disposable session = sessionService.connect(sessionId)
            .subscribe();

        sessionSwap.update(session);
    }
}

9. Troubleshooting

9.1. Memory Leaks

If you notice memory leaks:

  • Ensure all disposables are disposed in @PreDestroy methods

  • Check for forgotten subscriptions

  • Use memory profilers to identify leaked subscriptions

9.2. Race Conditions

When using Disposable.Swap:

  • The update() method is thread-safe and atomic

  • Previous disposable is disposed before the new one is set

  • No manual synchronization needed

9.3. Disposed Too Early

If operations are cancelled unexpectedly:

  • Check if parent component is being destroyed

  • Verify disposal isn’t called prematurely

  • Add logging to track disposal lifecycle

10. Performance Considerations

10.1. Thread Pools

  • Use Schedulers.boundedElastic() for blocking operations (I/O, database)

  • Use Schedulers.parallel() for CPU-intensive operations

  • Avoid creating custom schedulers unnecessarily

10.2. Memory Overhead

  • Disposable.Swap has minimal memory overhead (single reference)

  • Disposable.Composite overhead scales with number of disposables

  • Dispose promptly to free resources

10.3. Cancellation Speed

  • Cancellation is immediate for Disposable.Swap

  • Disposable.Composite disposes all children in sequence

  • Consider timeout strategies for long-running operations

11. Conclusion

Proper management of reactive resources is crucial for building robust Spring Boot applications with Project Reactor. Understanding when and how to use Disposable, Disposable.Composite, and Disposable.Swap enables you to:

  • Prevent memory leaks through proper resource cleanup

  • Dynamically switch between active operations seamlessly

  • Manage multiple subscriptions efficiently

  • Build responsive applications with search-as-you-type and real-time features

Key takeaways:

  • Use Disposable for single subscriptions

  • Use Disposable.Composite for managing multiple related resources

  • Use Disposable.Swap for dynamic replacement of active operations

  • Always clean up resources with @PreDestroy

  • Add proper logging and error handling

The complete source code is available on GitHub Gist.