Comprehensive Notes on Reactive Programming with Spring Boot
👋 Hey there! I’m Shohanur Rahman!
I’m a backend developer with over 5.5 years of experience in building scalable and efficient web applications. My work focuses on Java, Spring Boot, and microservices architecture, where I love designing robust API solutions and creating secure middleware for complex integrations.
💼 What I Do Backend Development: Expert in Spring Boot, Spring Cloud, and Spring WebFlux, I create high-performance microservices that drive seamless user experiences. Cloud & DevOps: AWS enthusiast, skilled in using EC2, S3, RDS, and Docker to design scalable and reliable cloud infrastructures. Digital Security: Passionate about securing applications with OAuth2, Keycloak, and digital signatures for data integrity and privacy. 🚀 Current Projects I’m currently working on API integrations with Spring Cloud Gateway and designing an e-invoicing middleware. My projects often involve asynchronous processing, digital signature implementations, and ensuring high standards of security.
📝 Why I Write I enjoy sharing what I’ve learned through blog posts, covering everything from backend design to API security and cloud best practices. Check out my posts if you’re into backend dev, cloud tech, or digital security!
Table of Contents
Introduction to Reactive Programming
The Reactive Manifesto
Reactive Streams Specification
Project Reactor
Spring WebFlux
Reactive Data Access
Testing Reactive Applications
Error Handling
Best Practices
Common Pitfalls
1. Introduction to Reactive Programming
What is Reactive Programming?
Reactive Programming is a programming paradigm focused on asynchronous data streams and the propagation of change. It's about building systems that react to events and data as they become available.
Key Characteristics:
Non-blocking: Operations don't wait for results
Asynchronous: Tasks execute independently
Event-driven: Responds to data streams
Backpressure-aware: Handles data flow control
Traditional vs Reactive Approach
Traditional (Blocking):
TraditionalApproach.java
public User getUser(Long id) {
User user = userRepository.findById(id); // Thread blocks here
return user;
}
Reactive (Non-blocking):
ReactiveApproach.java
public Mono<User> getUser(Long id) {
return userRepository.findById(id); // Returns immediately
}
When to Use Reactive Programming?
✅ Use When:
High concurrency requirements
I/O-bound operations (database, network calls)
Real-time data streaming
Limited resources (threads, memory)
Microservices architecture with many service calls
❌ Avoid When:
CPU-intensive operations
Simple CRUD applications with low traffic
Team lacks reactive experience
Heavy use of blocking libraries
2. The Reactive Manifesto
The Reactive Manifesto defines four core principles:
2.1 Responsive
System responds in a timely manner
Provides consistent quality of service
Establishes reliable upper bounds on response times
2.2 Resilient
System stays responsive in the face of failure
Failures are contained and isolated
Recovery is handled gracefully
2.3 Elastic
System stays responsive under varying workload
Scales up or down based on demand
No contention points or central bottlenecks
2.4 Message-Driven
Relies on asynchronous message passing
Provides loose coupling and isolation
Enables non-blocking communication
3. Reactive Streams Specification
Reactive Streams is a standard for asynchronous stream processing with non-blocking backpressure.
Core Interfaces:
ReactiveStreamsInterfaces.java
// 1. Publisher - produces data
public interface Publisher<T> {
void subscribe(Subscriber<? super T> s);
}
// 2. Subscriber - consumes data
public interface Subscriber<T> {
void onSubscribe(Subscription s);
void onNext(T t);
void onError(Throwable t);
void onComplete();
}
// 3. Subscription - links Publisher and Subscriber
public interface Subscription {
void request(long n);
void cancel();
}
// 4. Processor - both Publisher and Subscriber
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
Backpressure
Backpressure is a mechanism to prevent overwhelming a subscriber with data.
How it works:
Subscriber requests N items from Publisher
Publisher sends only N items
Subscriber processes items
Subscriber requests more items
BackpressureExample.java
Flux.range(1, 1000)
.onBackpressureBuffer(100) // Buffer up to 100 items
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(10); // Request 10 items initially
}
@Override
protected void hookOnNext(Integer value) {
System.out.println(value);
request(1); // Request next item after processing
}
});
4. Project Reactor
Project Reactor is Spring's reactive library implementing Reactive Streams.
4.1 Core Types: Mono and Flux
Mono<T>
Represents 0 or 1 element.
MonoExamples.java
// Creating Mono
Mono<String> monoJust = Mono.just("Hello");
Mono<String> monoEmpty = Mono.empty();
Mono<String> monoError = Mono.error(new RuntimeException("Error"));
// From callable
Mono<String> monoFromCallable = Mono.fromCallable(() -> {
return "Computed Value";
});
// Deferred execution
Mono<String> monoDefer = Mono.defer(() -> Mono.just("Lazy"));
Flux<T>
Represents 0 to N elements.
FluxExamples.java
// Creating Flux
Flux<String> fluxJust = Flux.just("A", "B", "C");
Flux<Integer> fluxRange = Flux.range(1, 10);
Flux<Long> fluxInterval = Flux.interval(Duration.ofSeconds(1));
// From iterable
List<String> list = Arrays.asList("X", "Y", "Z");
Flux<String> fluxFromIterable = Flux.fromIterable(list);
// From stream
Flux<String> fluxFromStream = Flux.fromStream(Stream.of("1", "2", "3"));
4.2 Common Operators
Transformation Operators
TransformationOperators.java
// map - transform each element
Flux.range(1, 5)
.map(i -> i * 2)
.subscribe(System.out::println); // 2, 4, 6, 8, 10
// flatMap - async transformation, flattens results
Flux.just("user1", "user2")
.flatMap(username -> getUserDetails(username))
.subscribe(System.out::println);
// flatMapSequential - maintains order
Flux.just("A", "B", "C")
.flatMapSequential(this::processAsync)
.subscribe(System.out::println);
// concatMap - processes one at a time, maintains order
Flux.just(1, 2, 3)
.concatMap(this::processSequentially)
.subscribe(System.out::println);
Filtering Operators
FilteringOperators.java
// filter - keep elements matching predicate
Flux.range(1, 10)
.filter(i -> i % 2 == 0)
.subscribe(System.out::println); // 2, 4, 6, 8, 10
// take - take first N elements
Flux.range(1, 100)
.take(5)
.subscribe(System.out::println); // 1, 2, 3, 4, 5
// skip - skip first N elements
Flux.range(1, 10)
.skip(5)
.subscribe(System.out::println); // 6, 7, 8, 9, 10
// distinct - remove duplicates
Flux.just(1, 2, 2, 3, 3, 3)
.distinct()
.subscribe(System.out::println); // 1, 2, 3
// distinctUntilChanged - remove consecutive duplicates
Flux.just(1, 1, 2, 2, 3, 1)
.distinctUntilChanged()
.subscribe(System.out::println); // 1, 2, 3, 1
Combining Operators
CombiningOperators.java
// concat - concatenate publishers sequentially
Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);
Flux.concat(flux1, flux2)
.subscribe(System.out::println); // 1, 2, 3, 4, 5, 6
// merge - merge publishers concurrently
Flux.merge(flux1, flux2)
.subscribe(System.out::println); // Order not guaranteed
// zip - combine elements from multiple sources
Flux<String> names = Flux.just("Alice", "Bob");
Flux<Integer> ages = Flux.just(25, 30);
Flux.zip(names, ages, (name, age) -> name + " is " + age)
.subscribe(System.out::println); // Alice is 25, Bob is 30
// combineLatest - combine latest values
Flux.combineLatest(flux1, flux2, (a, b) -> a + b)
.subscribe(System.out::println);
Timing Operators
TimingOperators.java
// delayElements - delay each element
Flux.range(1, 5)
.delayElements(Duration.ofSeconds(1))
.subscribe(System.out::println);
// timeout - fail if no signal within duration
Mono.delay(Duration.ofSeconds(10))
.timeout(Duration.ofSeconds(5))
.subscribe(
data -> System.out.println(data),
error -> System.out.println("Timeout!")
);
// delaySubscription - delay subscription
Flux.just(1, 2, 3)
.delaySubscription(Duration.ofSeconds(2))
.subscribe(System.out::println);
4.3 Error Handling Operators
ErrorHandlingOperators.java
// onErrorReturn - return default value on error
Flux.just(1, 2, 0, 4)
.map(i -> 10 / i)
.onErrorReturn(-1)
.subscribe(System.out::println);
// onErrorResume - switch to fallback publisher
Flux.just(1, 2, 0, 4)
.map(i -> 10 / i)
.onErrorResume(error -> Flux.just(-1, -2, -3))
.subscribe(System.out::println);
// onErrorContinue - skip error and continue
Flux.just(1, 2, 0, 4)
.map(i -> 10 / i)
.onErrorContinue((error, value) -> {
System.out.println("Error with value: " + value);
})
.subscribe(System.out::println);
// retry - retry on error
Mono.fromCallable(() -> callExternalService())
.retry(3)
.subscribe(System.out::println);
// retryWhen - custom retry logic
Mono.fromCallable(() -> callExternalService())
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
.subscribe(System.out::println);
4.4 Schedulers
Schedulers control where and when work is executed.
SchedulersExample.java
// Schedulers.immediate() - current thread
Flux.range(1, 5)
.subscribeOn(Schedulers.immediate())
.subscribe(System.out::println);
// Schedulers.single() - single reusable thread
Flux.range(1, 5)
.subscribeOn(Schedulers.single())
.subscribe(System.out::println);
// Schedulers.parallel() - fixed pool for CPU-intensive work
Flux.range(1, 5)
.subscribeOn(Schedulers.parallel())
.subscribe(System.out::println);
// Schedulers.boundedElastic() - dynamic pool for I/O work
Flux.range(1, 5)
.subscribeOn(Schedulers.boundedElastic())
.subscribe(System.out::println);
// publishOn vs subscribeOn
Flux.range(1, 5)
.subscribeOn(Schedulers.boundedElastic()) // affects subscription
.map(i -> i * 2)
.publishOn(Schedulers.parallel()) // affects downstream
.subscribe(System.out::println);
Key Difference:
subscribeOn(): Controls where the source emits itemspublishOn(): Controls where downstream operators execute
4.5 Hot vs Cold Publishers
Cold Publisher
Each subscriber gets its own data stream from the beginning.
ColdPublisher.java
Flux<Long> coldFlux = Flux.interval(Duration.ofSeconds(1))
.take(5);
// Subscriber 1
coldFlux.subscribe(i -> System.out.println("Sub1: " + i));
// Wait 2 seconds
Thread.sleep(2000);
// Subscriber 2 - starts from 0, not 2
coldFlux.subscribe(i -> System.out.println("Sub2: " + i));
Hot Publisher
All subscribers share the same data stream.
HotPublisher.java
Flux<Long> coldFlux = Flux.interval(Duration.ofSeconds(1)).take(5);
// Convert to hot publisher
ConnectableFlux<Long> hotFlux = coldFlux.publish();
// Subscriber 1
hotFlux.subscribe(i -> System.out.println("Sub1: " + i));
// Start emitting
hotFlux.connect();
// Wait 2 seconds
Thread.sleep(2000);
// Subscriber 2 - starts from current value (2 or 3)
hotFlux.subscribe(i -> System.out.println("Sub2: " + i));
5. Spring WebFlux
Spring WebFlux is Spring's reactive web framework.
5.1 Architecture
Traditional Spring MVC:
┌─────────────┐
│ Servlet API │ (Blocking I/O)
└─────────────┘
↓
┌─────────────┐
│ Spring MVC │
└─────────────┘
Spring WebFlux:
┌──────────────┐ ┌─────────────┐
│ Netty/Undertow│ or │ Servlet 3.1+│ (Non-blocking I/O)
└──────────────┘ └─────────────┘
↓ ↓
┌────────────────────────────────┐
│ Spring WebFlux │
└────────────────────────────────┘
5.2 Dependencies
pom.xml
<dependencies>
<!-- Spring WebFlux -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- Reactor Test -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
5.3 Annotated Controllers
UserController.java
@RestController
@RequestMapping("/api/users")
public class UserController {
@Autowired
private UserService userService;
// Get single user
@GetMapping("/{id}")
public Mono<User> getUser(@PathVariable Long id) {
return userService.findById(id);
}
// Get all users
@GetMapping
public Flux<User> getAllUsers() {
return userService.findAll();
}
// Create user
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
public Mono<User> createUser(@RequestBody User user) {
return userService.save(user);
}
// Update user
@PutMapping("/{id}")
public Mono<User> updateUser(@PathVariable Long id, @RequestBody User user) {
return userService.update(id, user);
}
// Delete user
@DeleteMapping("/{id}")
@ResponseStatus(HttpStatus.NO_CONTENT)
public Mono<Void> deleteUser(@PathVariable Long id) {
return userService.deleteById(id);
}
// Stream users (Server-Sent Events)
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> streamUsers() {
return userService.findAll()
.delayElements(Duration.ofSeconds(1));
}
}
5.4 Functional Endpoints
UserRouter.java
@Configuration
public class UserRouter {
@Bean
public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
return RouterFunctions
.route(GET("/api/users/{id}"), handler::getUser)
.andRoute(GET("/api/users"), handler::getAllUsers)
.andRoute(POST("/api/users"), handler::createUser)
.andRoute(PUT("/api/users/{id}"), handler::updateUser)
.andRoute(DELETE("/api/users/{id}"), handler::deleteUser);
}
}
UserHandler.java
@Component
public class UserHandler {
@Autowired
private UserService userService;
public Mono<ServerResponse> getUser(ServerRequest request) {
Long id = Long.valueOf(request.pathVariable("id"));
return userService.findById(id)
.flatMap(user -> ServerResponse.ok().bodyValue(user))
.switchIfEmpty(ServerResponse.notFound().build());
}
public Mono<ServerResponse> getAllUsers(ServerRequest request) {
return ServerResponse.ok()
.body(userService.findAll(), User.class);
}
public Mono<ServerResponse> createUser(ServerRequest request) {
Mono<User> userMono = request.bodyToMono(User.class);
return userMono
.flatMap(userService::save)
.flatMap(user -> ServerResponse
.status(HttpStatus.CREATED)
.bodyValue(user));
}
public Mono<ServerResponse> updateUser(ServerRequest request) {
Long id = Long.valueOf(request.pathVariable("id"));
Mono<User> userMono = request.bodyToMono(User.class);
return userMono
.flatMap(user -> userService.update(id, user))
.flatMap(user -> ServerResponse.ok().bodyValue(user))
.switchIfEmpty(ServerResponse.notFound().build());
}
public Mono<ServerResponse> deleteUser(ServerRequest request) {
Long id = Long.valueOf(request.pathVariable("id"));
return userService.deleteById(id)
.then(ServerResponse.noContent().build());
}
}
5.5 WebClient (Reactive HTTP Client)
WebClientExample.java
@Service
public class ExternalApiService {
private final WebClient webClient;
public ExternalApiService(WebClient.Builder webClientBuilder) {
this.webClient = webClientBuilder
.baseUrl("https://api.example.com")
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.build();
}
// GET request
public Mono<User> getUser(Long id) {
return webClient.get()
.uri("/users/{id}", id)
.retrieve()
.bodyToMono(User.class);
}
// GET with query parameters
public Flux<User> searchUsers(String name) {
return webClient.get()
.uri(uriBuilder -> uriBuilder
.path("/users")
.queryParam("name", name)
.build())
.retrieve()
.bodyToFlux(User.class);
}
// POST request
public Mono<User> createUser(User user) {
return webClient.post()
.uri("/users")
.bodyValue(user)
.retrieve()
.bodyToMono(User.class);
}
// Error handling
public Mono<User> getUserWithErrorHandling(Long id) {
return webClient.get()
.uri("/users/{id}", id)
.retrieve()
.onStatus(HttpStatus::is4xxClientError,
response -> Mono.error(new NotFoundException("User not found")))
.onStatus(HttpStatus::is5xxServerError,
response -> Mono.error(new ServiceException("Service unavailable")))
.bodyToMono(User.class);
}
}
6. Reactive Data Access
6.1 Spring Data R2DBC
R2DBC (Reactive Relational Database Connectivity) provides reactive database access.
Dependencies
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-postgresql</artifactId>
</dependency>
</dependencies>
Configuration
application.yml
spring:
r2dbc:
url: r2dbc:postgresql://localhost:5432/mydb
username: user
password: password
Entity
User.java
@Table("users")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {
@Id
private Long id;
private String username;
private String email;
private Integer age;
@CreatedDate
private LocalDateTime createdAt;
@LastModifiedDate
private LocalDateTime updatedAt;
}
Repository
UserRepository.java
public interface UserRepository extends ReactiveCrudRepository<User, Long> {
// Derived query methods
Flux<User> findByUsername(String username);
Flux<User> findByAgeGreaterThan(Integer age);
Mono<User> findByEmail(String email);
// Custom query
@Query("SELECT * FROM users WHERE age BETWEEN :minAge AND :maxAge")
Flux<User> findByAgeBetween(Integer minAge, Integer maxAge);
@Query("SELECT * FROM users WHERE username LIKE :pattern")
Flux<User> searchByUsername(String pattern);
}
Service
UserService.java
@Service
public class UserService {
@Autowired
private UserRepository userRepository;
public Mono<User> findById(Long id) {
return userRepository.findById(id);
}
public Flux<User> findAll() {
return userRepository.findAll();
}
public Mono<User> save(User user) {
return userRepository.save(user);
}
public Mono<User> update(Long id, User user) {
return userRepository.findById(id)
.flatMap(existingUser -> {
existingUser.setUsername(user.getUsername());
existingUser.setEmail(user.getEmail());
existingUser.setAge(user.getAge());
return userRepository.save(existingUser);
});
}
public Mono<Void> deleteById(Long id) {
return userRepository.deleteById(id);
}
public Flux<User> findAdults() {
return userRepository.findByAgeGreaterThan(18);
}
}
6.2 Spring Data MongoDB Reactive
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
ProductRepository.java
public interface ProductRepository extends ReactiveMongoRepository<Product, String> {
Flux<Product> findByCategory(String category);
Flux<Product> findByPriceLessThan(Double price);
@Query("{ 'name': { $regex: ?0, $options: 'i' } }")
Flux<Product> searchByName(String name);
}
7. Testing Reactive Applications
7.1 StepVerifier
StepVerifier tests reactive streams step by step.
StepVerifierTests.java
@Test
public void testMono() {
Mono<String> mono = Mono.just("Hello");
StepVerifier.create(mono)
.expectNext("Hello")
.verifyComplete();
}
@Test
public void testFlux() {
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);
StepVerifier.create(flux)
.expectNext(1)
.expectNext(2)
.expectNext(3, 4, 5)
.verifyComplete();
}
@Test
public void testError() {
Mono<String> mono = Mono.error(new RuntimeException("Error"));
StepVerifier.create(mono)
.expectError(RuntimeException.class)
.verify();
}
@Test
public void testWithDelay() {
Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(3);
StepVerifier.create(flux)
.expectNext(0L, 1L, 2L)
.verifyComplete();
}
@Test
public void testConditional() {
Flux<Integer> flux = Flux.range(1, 10);
StepVerifier.create(flux)
.expectNextMatches(i -> i == 1)
.expectNextCount(8)
.expectNext(10)
.verifyComplete();
}
7.2 WebTestClient
WebTestClient tests WebFlux endpoints.
UserControllerTest.java
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@AutoConfigureWebTestClient
public class UserControllerTest {
@Autowired
private WebTestClient webTestClient;
@Test
public void testGetUser() {
webTestClient.get()
.uri("/api/users/1")
.exchange()
.expectStatus().isOk()
.expectHeader().contentType(MediaType.APPLICATION_JSON)
.expectBody(User.class)
.value(user -> {
assertNotNull(user);
assertEquals(1L, user.getId());
});
}
@Test
public void testGetAllUsers() {
webTestClient.get()
.uri("/api/users")
.exchange()
.expectStatus().isOk()
.expectBodyList(User.class)
.hasSize(5);
}
@Test
public void testCreateUser() {
User newUser = new User(null, "john", "john@example.com", 25, null, null);
webTestClient.post()
.uri("/api/users")
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(newUser)
.exchange()
.expectStatus().isCreated()
.expectBody(User.class)
.value(user -> {
assertNotNull(user.getId());
assertEquals("john", user.getUsername());
});
}
@Test
public void testUserNotFound() {
webTestClient.get()
.uri("/api/users/999")
.exchange()
.expectStatus().isNotFound();
}
}
7.3 Mock Testing
UserServiceTest.java
@ExtendWith(MockitoExtension.class)
public class UserServiceTest {
@Mock
private UserRepository userRepository;
@InjectMocks
private UserService userService;
@Test
public void testFindById() {
User user = new User(1L, "alice", "alice@example.com", 30, null, null);
when(userRepository.findById(1L)).thenReturn(Mono.just(user));
Mono<User> result = userService.findById(1L);
StepVerifier.create(result)
.expectNext(user)
.verifyComplete();
}
@Test
public void testFindAll() {
List<User> users = Arrays.asList(
new User(1L, "alice", "alice@example.com", 30, null, null),
new User(2L, "bob", "bob@example.com", 25, null, null)
);
when(userRepository.findAll()).thenReturn(Flux.fromIterable(users));
Flux<User> result = userService.findAll();
StepVerifier.create(result)
.expectNextCount(2)
.verifyComplete();
}
}
8. Error Handling
8.1 Global Error Handler
GlobalErrorHandler.java
@Component
@Order(-2)
public class GlobalErrorHandler implements ErrorWebExceptionHandler {
private final ObjectMapper objectMapper;
public GlobalErrorHandler(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
@Override
public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
ServerHttpResponse response = exchange.getResponse();
response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
ErrorResponse errorResponse;
if (ex instanceof NotFoundException) {
response.setStatusCode(HttpStatus.NOT_FOUND);
errorResponse = new ErrorResponse("NOT_FOUND", ex.getMessage());
} else if (ex instanceof ValidationException) {
response.setStatusCode(HttpStatus.BAD_REQUEST);
errorResponse = new ErrorResponse("VALIDATION_ERROR", ex.getMessage());
} else {
response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
errorResponse = new ErrorResponse("INTERNAL_ERROR", "An error occurred");
}
try {
byte[] bytes = objectMapper.writeValueAsBytes(errorResponse);
DataBuffer buffer = response.bufferFactory().wrap(bytes);
return response.writeWith(Mono.just(buffer));
} catch (Exception e) {
return Mono.error(e);
}
}
}
@Data
@AllArgsConstructor
class ErrorResponse {
private String code;
private String message;
}
8.2 Controller Advice
RestControllerAdvice.java
@RestControllerAdvice
public class ReactiveExceptionHandler {
@ExceptionHandler(NotFoundException.class)
@ResponseStatus(HttpStatus.NOT_FOUND)
public Mono<ErrorResponse> handleNotFound(NotFoundException ex) {
return Mono.just(new ErrorResponse("NOT_FOUND", ex.getMessage()));
}
@ExceptionHandler(ValidationException.class)
@ResponseStatus(HttpStatus.BAD_REQUEST)
public Mono<ErrorResponse> handleValidation(ValidationException ex) {
return Mono.just(new ErrorResponse("VALIDATION_ERROR", ex.getMessage()));
}
@ExceptionHandler(Exception.class)
@ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
public Mono<ErrorResponse> handleGeneric(Exception ex) {
return Mono.just(new ErrorResponse("INTERNAL_ERROR", "An error occurred"));
}
}
8.3 Service-Level Error Handling
ServiceErrorHandling.java
@Service
public class UserService {
public Mono<User> getUserWithErrorHandling(Long id) {
return userRepository.findById(id)
.switchIfEmpty(Mono.error(new NotFoundException("User not found")))
.onErrorResume(DatabaseException.class, ex -> {
log.error("Database error: ", ex);
return Mono.error(new ServiceException("Database unavailable"));
})
.retry(3)
.timeout(Duration.ofSeconds(5))
.onErrorReturn(new User()); // Fallback
}
}
9. Best Practices
9.1 Don't Block!
❌ Bad:
BlockingCode.java
public Mono<User> getUserBad(Long id) {
User user = userRepository.findById(id).block(); // NEVER DO THIS!
return Mono.just(user);
}
✅ Good:
NonBlockingCode.java
public Mono<User> getUserGood(Long id) {
return userRepository.findById(id);
}
9.2 Handle Blocking Code Properly
If you must call blocking code:
HandleBlockingCode.java
public Mono<String> callBlockingApi() {
return Mono.fromCallable(() -> {
// Blocking call
return blockingHttpClient.get();
})
.subscribeOn(Schedulers.boundedElastic()); // Use elastic scheduler
}
9.3 Use Proper Operators
OperatorChoice.java
// Use flatMap for async operations
public Flux<OrderDetails> getOrderDetails(List<Long> orderIds) {
return Flux.fromIterable(orderIds)
.flatMap(id -> orderService.getOrder(id)); // Parallel execution
}
// Use concatMap for sequential processing
public Flux<OrderDetails> getOrderDetailsSequential(List<Long> orderIds) {
return Flux.fromIterable(orderIds)
.concatMap(id -> orderService.getOrder(id)); // Sequential execution
}
9.4 Resource Management
ResourceManagement.java
// Use try-with-resources equivalent
public Flux<String> readFile(String path) {
return Flux.using(
() -> Files.lines(Paths.get(path)), // Resource supplier
Flux::fromStream, // Resource usage
Stream::close // Resource cleanup
);
}
9.5 Avoid Nested subscribe()
❌ Bad:
NestedSubscribe.java
public void processBad() {
userRepository.findById(1L)
.subscribe(user -> {
orderRepository.findByUserId(user.getId())
.subscribe(order -> {
// Nested subscribe - BAD!
});
});
}
✅ Good:
FlatMapChain.java
public Mono<Order> processGood() {
return userRepository.findById(1L)
.flatMap(user -> orderRepository.findByUserId(user.getId()));
}
9.6 Lazy Evaluation
LazyEvaluation.java
// Use defer for lazy evaluation
public Mono<User> getCurrentUser() {
return Mono.defer(() -> {
// This is evaluated at subscription time
String username = SecurityContextHolder.getContext()
.getAuthentication()
.getName();
return userRepository.findByUsername(username);
});
}
9.7 Memory Management
MemoryManagement.java
// Use pagination for large datasets
public Flux<User> getAllUsersEfficiently() {
return userRepository.findAll()
.buffer(100) // Process in batches
.flatMap(Flux::fromIterable);
}
// Limit flux size
public Flux<Event> streamEvents() {
return eventRepository.findAll()
.take(1000); // Limit to 1000 items
}
10. Common Pitfalls
10.1 Forgetting to Subscribe
❌ Nothing happens without subscription:
NoSubscription.java
Mono<User> user = userRepository.findById(1L); // Nothing happens!
✅ Subscribe or return:
WithSubscription.java
// In a controller - framework subscribes
public Mono<User> getUser(Long id) {
return userRepository.findById(id);
}
// Manual subscription (rare)
userRepository.findById(1L).subscribe(
user -> System.out.println(user),
error -> System.err.println(error),
() -> System.out.println("Complete")
);
10.2 Blocking in Reactive Chain
❌ Bad:
BlockingInChain.java
return userRepository.findById(id)
.map(user -> {
String result = blockingHttpClient.call(); // BLOCKS!
return result;
});
✅ Good:
NonBlockingChain.java
return userRepository.findById(id)
.flatMap(user ->
Mono.fromCallable(() -> blockingHttpClient.call())
.subscribeOn(Schedulers.boundedElastic())
);
10.3 Improper Error Handling
❌ Bad:
BadErrorHandling.java
try {
return userRepository.findById(id);
} catch (Exception e) { // Won't catch reactive errors!
return Mono.error(e);
}
✅ Good:
GoodErrorHandling.java
return userRepository.findById(id)
.onErrorResume(e -> Mono.error(new CustomException(e)));
10.4 Not Releasing Resources
❌ Bad:
ResourceLeak.java
Connection conn = getConnection();
return Mono.just(conn.getData()); // Connection leaked!
✅ Good:
ResourceCleanup.java
return Mono.using(
this::getConnection,
conn -> Mono.just(conn.getData()),
Connection::close
);
10.5 Ignoring Backpressure
BackpressureHandling.java
// Handle fast producer, slow consumer
Flux.range(1, 1_000_000)
.onBackpressureBuffer(1000) // Buffer strategy
// or
.onBackpressureDrop() // Drop strategy
// or
.onBackpressureLatest() // Keep latest
.subscribe();
Summary
Key Takeaways:
Reactive = Non-blocking + Asynchronous + Backpressure
Mono for 0-1 items, Flux for 0-N items
Never block in reactive chains
Use proper operators:
flatMap,map,filter,zip, etc.Handle errors with reactive operators
Test thoroughly with StepVerifier and WebTestClient
Use appropriate schedulers for different workloads
Subscribe to make things happen
Resource management is critical
Think in streams and transformations
When Reactive Shines:
High concurrency (thousands of connections)
I/O-bound operations
Streaming data
Microservices communication
Real-time applications
When to Stick with Traditional:
Simple CRUD apps
Low traffic applications
CPU-intensive operations
Team unfamiliar with reactive
This comprehensive guide covers the essentials of Reactive Programming with Spring Boot. Practice building applications using these concepts to gain mastery!