Reactor Disposable Management: Composite and Swap
Reactor's Disposable API provides a mechanism for cancelling subscriptions and releasing resources in a reactive application. This guide explores three main types: Disposable, Disposable.Composite, and Disposable.Swap, with practical examples using Spring Boot.
1. Overview
In reactive programming, proper resource management is crucial to prevent memory leaks and ensure clean shutdown of applications. Project Reactor provides the Disposable interface and its variants to manage the lifecycle of reactive streams and associated resources.
This tutorial demonstrates practical usage of these disposables in a Spring Boot application with a user search service that manages asynchronous operations.
2. Types of Disposables
2.1. Disposable
Represents a single resource that can be disposed of when it’s no longer needed, typically used for cleanup purposes.
2.1.1. Real-World Usage
Messaging Application: Manage subscriptions to message queues. When a user subscribes to a topic, create a disposable subscription. When they unsubscribe, call dispose() to release resources.
File Processing System: Manage file streams. When a file is opened for processing, create a disposable stream. When processing completes or an error occurs, dispose the stream to release system resources.
2.1.2. Example
Disposable subscription = flux
    .subscribe(
        value -> System.out.println(value),
        error -> System.err.println(error),
        () -> System.out.println("Complete")
    );
subscription.dispose();
2.2. Disposable.Composite
Represents a collection of disposables that can be managed as a single unit, allowing for the disposal of multiple resources at once.
2.2.1. Real-World Usage
Financial Trading Application: Manage multiple subscriptions to market data feeds, trade execution services, and risk management systems. Dispose of all subscriptions together when the trading system shuts down.
IoT System: Manage multiple sensor readings or device connections. Dispose of all connections together when the IoT device is turned off or removed from the network.
2.2.2. Example
Disposable.Composite composite = Disposables.composite();
composite.add(subscription1);
composite.add(subscription2);
composite.add(subscription3);
composite.dispose();
2.3. Disposable.Swap
Allows for the dynamic replacement of one disposable with another, enabling seamless transitions between different resources or actions.
2.3.1. Real-World Usage
Chat Application: Manage connections to chat servers. When a user switches chat rooms, replace the current connection with a new one using Disposable.Swap, enabling seamless room transitions.
Gaming Server: Manage player sessions. When a player logs in or joins a game, create a disposable session. Replace sessions smoothly as players move between games.
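To make the replacement behaviour concrete, here is a minimal sketch using placeholder disposables. Disposables.single() stands in for a real chat-room connection, and the class name SwapBasics is purely illustrative. The sketch contrasts update(), which disposes the disposable it displaces, with replace(), which does not.

```java
import java.util.Arrays;

import reactor.core.Disposable;
import reactor.core.Disposables;

// Illustrative class name; Disposables.single() stands in for real connections.
class SwapBasics {

    /** Returns the disposed state of {roomA, roomB, roomC, late} after the sequence below. */
    static boolean[] run() {
        Disposable.Swap connection = Disposables.swap();
        Disposable roomA = Disposables.single();
        Disposable roomB = Disposables.single();
        Disposable roomC = Disposables.single();

        connection.update(roomA);   // roomA becomes the current connection
        connection.update(roomB);   // update() installs roomB and disposes roomA
        connection.replace(roomC);  // replace() installs roomC but leaves roomB untouched

        connection.dispose();       // disposes the container and its current content (roomC)

        Disposable late = Disposables.single();
        connection.update(late);    // the container is already disposed, so 'late' is disposed at once

        return new boolean[] {
            roomA.isDisposed(), roomB.isDisposed(), roomC.isDisposed(), late.isDisposed()
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run()));
    }
}
```

Because replace() leaves the old disposable alive, the caller stays responsible for disposing it. update() is usually the better fit when the old resource should simply go away.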
2.3.2. Disposable.Swap vs switchMap
| Aspect | switchMap | Disposable.Swap |
|---|---|---|
| Control | Automatic switching | Manual control |
| Cancellation | Handled automatically | Requires explicit handling |
| Use Case | Stream transformation | Resource management |
| Complexity | Simpler for stream operations | More control for resources |
switchMap: Automatically switches between Publishers, discarding previous ones, without user intervention for cancellation handling.
Disposable.Swap: Allows manual cancellation handling for seamless replacement of disposables, providing more control over resource lifecycle.
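The automatic side of this comparison can be sketched as follows. The lookup method is a hypothetical stand-in for a remote call, and the 100 ms delay is an arbitrary assumption. The point is only that switchMap cancels the pending inner publisher whenever a new query arrives.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Illustrative class and method names.
class SwitchMapDemo {

    // Hypothetical lookup: answers after 100 ms with the upper-cased query.
    static Mono<String> lookup(String query) {
        return Mono.just(query.toUpperCase()).delayElement(Duration.ofMillis(100));
    }

    /** Queries arrive back to back, so each pending lookup is cancelled by the next query. */
    static List<String> latestOnly() {
        return Flux.just("u", "us", "user")
                .switchMap(SwitchMapDemo::lookup)
                .collectList()
                .block(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        // Only the lookup for the last query gets the chance to emit.
        System.out.println(latestOnly());
    }
}
```

With Disposable.Swap the same cancellation has to be triggered by your own code through update(), which is what the UserService in section 5.2 does.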
3. Project Setup
3.1. Requirements
3.2. Maven Configuration
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.2</version>
        <relativePath/>
    </parent>

    <groupId>com.maoudia</groupId>
    <artifactId>app</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>maoudia-app</name>
    <description>MAOUDIA APP</description>

    <properties>
        <maven.compiler.source>21</maven.compiler.source>
        <maven.compiler.target>21</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
4. Domain Model
4.1. User Record
package com.maoudia;

public record User(
    String username,
    String firstName,
    String lastName,
    int age) {}
4.2. UserRepository Interface
package com.maoudia;

import reactor.core.publisher.Flux;

public interface UserRepository {
    Flux<User> searchByUsername(String username);
}
5. Implementation
5.1. Fake Repository Implementation
This implementation simulates a real repository with random latency to demonstrate async behavior:
package com.maoudia;

import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

@Component
public class FakeUserRepository implements UserRepository {

    private static final Random random = new Random();

    @Override
    public Flux<User> searchByUsername(String username) { // (1)
        List<User> users = Arrays.asList(
            new User("user1", "John", "Doe", 30),
            new User("user2", "Jane", "Smith", 25),
            new User("user3", "Alice", "Johnson", 35),
            new User("user4", "Michael", "Brown", 40),
            new User("user5", "Emma", "Wilson", 28)
        );
        return Flux.fromIterable(users)
            .filter(user -> user.username().startsWith(username)) // (2)
            .delayElements(Duration.ofMillis(random.nextInt(1000) + 1), Schedulers.boundedElastic()); // (3)
    }
}
| 1 | Searches for users by username, returning a reactive stream. |
| 2 | Filters users whose usernames start with the provided search term. |
| 3 | Delays each element by a random interval (1 to 1000 ms, drawn once per call) to simulate real-world async operations. |
5.2. UserService with Disposable.Swap
This service demonstrates the usage of Disposable.Swap for managing user search operations:
package com.maoudia;

import jakarta.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.scheduler.Schedulers;

import java.util.UUID;

@Service
public class UserService {

    private static final Logger LOGGER = LoggerFactory.getLogger(UserService.class);

    private final UserRepository userRepository;
    private final Disposable.Swap disposableSwap; // (1)

    public UserService(UserRepository userRepository) {
        this.userRepository = userRepository;
        this.disposableSwap = Disposables.swap(); // (2)
    }

    public Disposable searchUsers(String query) { // (3)
        Disposable nextDisposable = this.userRepository.searchByUsername(query)
            .doOnCancel(() -> LOGGER.warn("User search for '{}' cancelled", query)) // (4)
            .doOnSubscribe(s -> LOGGER.info("User search for '{}' subscribed: {}", query, UUID.randomUUID())) // (5)
            .subscribeOn(Schedulers.boundedElastic()) // (6)
            .subscribe(
                user -> LOGGER.info("User search '{}' found: {}", query, user.username()),
                error -> LOGGER.error("Error during User search for '{}'", query, error),
                () -> LOGGER.info("User search for '{}' completed", query)
            );
        disposableSwap.update(nextDisposable); // (7)
        return disposableSwap;
    }

    @PreDestroy // (8)
    public void cleanUp() {
        if (!this.disposableSwap.isDisposed()) {
            this.disposableSwap.dispose();
            LOGGER.info("Disposed of disposableSwap");
        }
    }
}
| 1 | Declares a Disposable.Swap instance to manage search operations. |
| 2 | Initializes the swap container in the constructor. |
| 3 | Starts an asynchronous user search and returns the shared swap container that holds its subscription. |
| 4 | Logs when a search operation is cancelled. |
| 5 | Logs when a search operation is subscribed, tagged with a unique ID. |
| 6 | Subscribes on the bounded elastic scheduler, which is intended for blocking work. |
| 7 | Atomically replaces the previous disposable with the new one, disposing the old one. |
| 8 | Ensures proper cleanup when the service bean is destroyed. |
5.3. How Disposable.Swap Works
When searchUsers() is called multiple times:
First call creates a disposable for "user1" search
Second call creates a disposable for "user3" search and automatically disposes the "user1" search
Third call creates a disposable for "user" search and automatically disposes the "user3" search
Only the most recent search remains active. Previous searches are cancelled automatically.
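The sequence above can be reproduced outside Spring with a short sketch. Flux.never() is an assumption standing in for a search that is still running when the next one starts, and SwapSequenceDemo is an illustrative name. Each doOnCancel callback records the query whose subscription was cancelled.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;

// Illustrative class name; not part of the tutorial's application code.
class SwapSequenceDemo {

    /** Starts three searches in a row and returns the queries whose subscriptions were cancelled. */
    static List<String> cancelledQueries() {
        List<String> cancelled = new ArrayList<>();
        Disposable.Swap swap = Disposables.swap();

        for (String query : List.of("user1", "user3", "user")) {
            // Flux.never() stands in for a search that has not finished yet.
            Disposable search = Flux.<String>never()
                    .doOnCancel(() -> cancelled.add(query))
                    .subscribe();
            swap.update(search); // disposes the previous search, if there is one
        }
        return cancelled;
    }

    public static void main(String[] args) {
        // The first two searches are cancelled; the third is still active.
        System.out.println(cancelledQueries());
    }
}
```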
6. Testing
6.1. Single Search Test
@Test
@DisplayName("Single search for users by username")
void singleSearchUsersByUsername() {
    Disposable disposable = userService.searchUsers("user1");

    Assertions.assertThat(disposable)
        .isNotNull()
        .isInstanceOf(Disposable.Swap.class);
}
6.2. Multiple Searches Test
This test demonstrates how Disposable.Swap automatically disposes previous searches:
@Test
@DisplayName("Multiple searches for users by username")
void multipleSearchUsersByUsername() {
    Disposable disposable1 = userService.searchUsers("user5"); // (1)
    Disposable.Swap disposableSwap1 = (Disposable.Swap) disposable1;
    disposable1 = disposableSwap1.get(); // (2)

    Disposable disposable2 = userService.searchUsers("user3"); // (3)
    Disposable.Swap disposableSwap2 = (Disposable.Swap) disposable2;
    disposable2 = disposableSwap2.get();

    Disposable disposable3 = userService.searchUsers("user"); // (4)

    Assertions.assertThat(disposable1.isDisposed()).isTrue(); // (5)
    Assertions.assertThat(disposable2.isDisposed()).isTrue();
    Assertions.assertThat(disposable3)
        .isNotNull()
        .isInstanceOf(Disposable.Swap.class);
}
| 1 | First search for "user5" - creates disposable1. |
| 2 | Extract the inner disposable from the swap container. |
| 3 | Second search for "user3" - creates disposable2 and disposes disposable1. |
| 4 | Third search for "user" - creates disposable3 and disposes disposable2. |
| 5 | Verifies that the previous disposables are disposed; only the last one remains active. |
6.3. Cleanup Test
@Test
@DisplayName("Clean up resources")
void cleanUpResources() {
    Disposable disposable = userService.searchUsers("user1");

    userService.cleanUp();

    Assertions.assertThat(disposable.isDisposed()).isTrue();
}
7. Use Cases and Best Practices
7.1. When to Use Each Type
7.1.1. Use Disposable When
Managing a single subscription or resource
Simple cleanup scenarios
One-time operations
Disposable subscription = flux.subscribe();
subscription.dispose();
7.1.2. Use Disposable.Composite When
Managing multiple independent subscriptions
Grouping related resources
Bulk cleanup operations
Disposable.Composite composite = Disposables.composite(
    subscription1,
    subscription2,
    subscription3
);
composite.dispose();
7.1.3. Use Disposable.Swap When
Replacing active subscriptions dynamically
Search-as-you-type scenarios
Real-time data switching
Session management
Disposable.Swap swap = Disposables.swap();
swap.update(newSubscription);
7.2. Best Practices
7.2.1. Always Clean Up Resources
Use @PreDestroy or implement DisposableBean to ensure cleanup:
@PreDestroy
public void cleanUp() {
    if (!disposable.isDisposed()) {
        disposable.dispose();
    }
}
7.2.2. Check Disposal Status
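The Disposable contract requires dispose() to be idempotent, so this check is not needed to avoid errors. It is useful when a side effect, such as a log message, should run only on the first disposal:
if (!disposable.isDisposed()) {
    disposable.dispose();
}
7.2.3. Use Appropriate Schedulers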
Choose schedulers based on operation type:
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.parallel())
7.2.4. Add Logging
Include logging for debugging and monitoring:
.doOnSubscribe(s -> log.info("Subscribed"))
.doOnCancel(() -> log.warn("Cancelled"))
.doOnComplete(() -> log.info("Completed"))
7.2.5. Handle Errors Gracefully
Always provide error handlers:
.subscribe(
    value -> process(value),
    error -> log.error("Error", error),
    () -> log.info("Complete")
)
8. Common Patterns
8.1. Search-as-You-Type
Perfect use case for Disposable.Swap:
public class SearchService {
    private final Disposable.Swap searchSwap = Disposables.swap();

    public void search(String query) {
        Disposable search = repository.search(query)
            .subscribe(result -> display(result));
        searchSwap.update(search);
    }
}
8.2. Multi-Resource Management
Use Disposable.Composite for managing multiple resources:
public class DataService {
    private final Disposable.Composite subscriptions = Disposables.composite();

    public void start() {
        subscriptions.add(dataFeed1.subscribe());
        subscriptions.add(dataFeed2.subscribe());
        subscriptions.add(dataFeed3.subscribe());
    }

    @PreDestroy
    public void stop() {
        subscriptions.dispose();
    }
}
8.3. Session Management
Replace sessions dynamically with Disposable.Swap:
public class SessionManager {
    private final Disposable.Swap sessionSwap = Disposables.swap();

    public void switchSession(String sessionId) {
        Disposable session = sessionService.connect(sessionId)
            .subscribe();
        sessionSwap.update(session);
    }
}
9. Troubleshooting
9.1. Memory Leaks
If you notice memory leaks:
Ensure all disposables are disposed in @PreDestroy methods
Check for forgotten subscriptions
Use memory profilers to identify leaked subscriptions
9.2. Race Conditions
When using Disposable.Swap:
The update() method is thread-safe and atomic
The new disposable is installed atomically, and the previous one is then disposed
No manual synchronization needed
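As a rough mental model of why no locking is needed, the sketch below builds a tiny swap-like container from plain JDK atomics. It is a simplified, hypothetical model (the names SwapRaceModel and Handle are invented here) and not Reactor's actual implementation. It only illustrates that an atomic exchange lets concurrent callers hand over cleanly, so that exactly one handle stays live.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Simplified, hypothetical model of a swap container; not Reactor's implementation.
class SwapRaceModel {

    /** Hypothetical stand-in for a disposable resource. */
    static final class Handle {
        private final AtomicBoolean disposed = new AtomicBoolean();
        void dispose() { disposed.set(true); }
        boolean isDisposed() { return disposed.get(); }
    }

    private final AtomicReference<Handle> current = new AtomicReference<>();

    /** Installs next with one atomic exchange, then disposes the handle it displaced. */
    void update(Handle next) {
        Handle previous = current.getAndSet(next);
        if (previous != null) {
            previous.dispose();
        }
    }

    /** Lets several threads race on update() and returns how many handles are still live. */
    static long liveHandlesAfterConcurrentUpdates(int threads) {
        SwapRaceModel swap = new SwapRaceModel();
        List<Handle> handles = new ArrayList<>();
        List<Thread> workers = new ArrayList<>();
        CountDownLatch start = new CountDownLatch(1);

        for (int i = 0; i < threads; i++) {
            Handle handle = new Handle();
            handles.add(handle);
            Thread worker = new Thread(() -> {
                try {
                    start.await();       // release all workers at the same moment
                    swap.update(handle);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers.add(worker);
            worker.start();
        }

        start.countDown();
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return handles.stream().filter(h -> !h.isDisposed()).count();
    }

    public static void main(String[] args) {
        System.out.println(liveHandlesAfterConcurrentUpdates(8));
    }
}
```

Whichever thread performs the last exchange wins. Every other handle was displaced by some later update and has therefore been disposed.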
9.3. Disposed Too Early
If operations are cancelled unexpectedly:
Check if parent component is being destroyed
Verify disposal isn’t called prematurely
Add logging to track disposal lifecycle
10. Performance Considerations
10.1. Thread Pools
Use Schedulers.boundedElastic() for blocking operations (I/O, database)
Use Schedulers.parallel() for CPU-intensive operations
Avoid creating custom schedulers unnecessarily
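A small sketch can show where work actually runs under each choice. The sleep is an assumption standing in for blocking I/O, and the class and method names are illustrative. With default settings the worker thread names usually reveal which scheduler was used.

```java
import java.time.Duration;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// Illustrative class and method names.
class SchedulerChoiceDemo {

    /** Returns the name of the thread that ran the simulated blocking call. */
    static String blockingCallThread() {
        return Mono.fromCallable(() -> {
                    Thread.sleep(50); // stands in for blocking I/O
                    return Thread.currentThread().getName();
                })
                .subscribeOn(Schedulers.boundedElastic()) // the callable runs on this scheduler
                .block(Duration.ofSeconds(5));
    }

    /** Returns the name of the thread that ran the mapping step after publishOn. */
    static String cpuStepThread() {
        return Mono.just(21)
                .publishOn(Schedulers.parallel())               // downstream operators run on this scheduler
                .map(n -> Thread.currentThread().getName())     // a CPU-bound step would go here
                .block(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        System.out.println(blockingCallThread());
        System.out.println(cpuStepThread());
    }
}
```

subscribeOn affects where the source is subscribed and therefore where the blocking callable executes, while publishOn moves only the operators that follow it.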
10.2. Memory Overhead
Disposable.Swap has minimal memory overhead (single reference)
Disposable.Composite overhead scales with number of disposables
Dispose promptly to free resources
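One way to keep a long-lived composite from growing is to detach entries that are no longer needed. The sketch below uses Disposables.single() as a placeholder for real feed subscriptions, and all names are illustrative. It relies on size() to observe the number of held disposables and on remove(), which detaches an entry without disposing it.

```java
import reactor.core.Disposable;
import reactor.core.Disposables;

// Illustrative names; Disposables.single() stands in for real feed subscriptions.
class CompositeSizeDemo {

    static final class Result {
        final int sizeAfterAdd;
        final int sizeAfterRemove;
        final boolean removedStillLive;
        final boolean restDisposed;

        Result(int sizeAfterAdd, int sizeAfterRemove, boolean removedStillLive, boolean restDisposed) {
            this.sizeAfterAdd = sizeAfterAdd;
            this.sizeAfterRemove = sizeAfterRemove;
            this.removedStillLive = removedStillLive;
            this.restDisposed = restDisposed;
        }
    }

    static Result run() {
        Disposable.Composite feeds = Disposables.composite();
        Disposable feedA = Disposables.single();
        Disposable feedB = Disposables.single();
        Disposable feedC = Disposables.single();
        feeds.add(feedA);
        feeds.add(feedB);
        feeds.add(feedC);
        int sizeAfterAdd = feeds.size();

        // remove() detaches feedA without disposing it; the caller now owns its cleanup.
        feeds.remove(feedA);
        boolean removedStillLive = !feedA.isDisposed();
        feedA.dispose();
        int sizeAfterRemove = feeds.size();

        // Disposing the composite disposes everything it still holds (feedB and feedC).
        feeds.dispose();
        boolean restDisposed = feedB.isDisposed() && feedC.isDisposed();

        return new Result(sizeAfterAdd, sizeAfterRemove, removedStillLive, restDisposed);
    }

    public static void main(String[] args) {
        Result r = run();
        System.out.println(r.sizeAfterAdd + " -> " + r.sizeAfterRemove);
    }
}
```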
10.3. Cancellation Speed
Cancellation is immediate for Disposable.Swap
Disposable.Composite disposes all children in sequence
Consider timeout strategies for long-running operations
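As one possible timeout strategy, the timeout operator can bound how long a subscription waits for its next signal. In this sketch Flux.never() is an assumption standing in for a stalled operation, and the fallback value and names are illustrative.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;

import reactor.core.publisher.Flux;

// Illustrative class and method names.
class TimeoutDemo {

    /** Waits for the first result, falling back to a marker value if none arrives within the limit. */
    static String firstResultOrFallback(Duration limit) {
        return Flux.<String>never()                                   // stands in for a stalled operation
                .timeout(limit)                                       // signals TimeoutException when the limit passes
                .onErrorReturn(TimeoutException.class, "timed out")   // replace the error with a fallback value
                .blockFirst(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        System.out.println(firstResultOrFallback(Duration.ofMillis(200)));
    }
}
```

When the timeout fires, the operator also cancels the upstream subscription, so the stalled source does not linger after the fallback is delivered.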
11. Conclusion
Proper management of reactive resources is crucial for building robust Spring Boot applications with Project Reactor. Understanding when and how to use Disposable, Disposable.Composite, and Disposable.Swap enables you to:
Prevent memory leaks through proper resource cleanup
Dynamically switch between active operations seamlessly
Manage multiple subscriptions efficiently
Build responsive applications with search-as-you-type and real-time features
Key takeaways:
Use Disposable for single subscriptions
Use Disposable.Composite for managing multiple related resources
Use Disposable.Swap for dynamic replacement of active operations
Always clean up resources with @PreDestroy
Add proper logging and error handling
The complete source code is available on GitHub Gist.