Skip to main content

Command Palette

Search for a command to run...

Comprehensive Notes on Reactive Programming with Spring Boot

Published
14 min read
S

👋 Hey there! I’m Shohanur Rahman!

I’m a backend developer with over 5.5 years of experience in building scalable and efficient web applications. My work focuses on Java, Spring Boot, and microservices architecture, where I love designing robust API solutions and creating secure middleware for complex integrations.

💼 What I Do Backend Development: Expert in Spring Boot, Spring Cloud, and Spring WebFlux, I create high-performance microservices that drive seamless user experiences. Cloud & DevOps: AWS enthusiast, skilled in using EC2, S3, RDS, and Docker to design scalable and reliable cloud infrastructures. Digital Security: Passionate about securing applications with OAuth2, Keycloak, and digital signatures for data integrity and privacy. 🚀 Current Projects I’m currently working on API integrations with Spring Cloud Gateway and designing an e-invoicing middleware. My projects often involve asynchronous processing, digital signature implementations, and ensuring high standards of security.

📝 Why I Write I enjoy sharing what I’ve learned through blog posts, covering everything from backend design to API security and cloud best practices. Check out my posts if you’re into backend dev, cloud tech, or digital security!

Table of Contents

  1. Introduction to Reactive Programming

  2. The Reactive Manifesto

  3. Reactive Streams Specification

  4. Project Reactor

  5. Spring WebFlux

  6. Reactive Data Access

  7. Testing Reactive Applications

  8. Error Handling

  9. Best Practices

  10. Common Pitfalls


1. Introduction to Reactive Programming

What is Reactive Programming?

Reactive Programming is a programming paradigm focused on asynchronous data streams and the propagation of change. It's about building systems that react to events and data as they become available.

Key Characteristics:

  • Non-blocking: Operations don't wait for results

  • Asynchronous: Tasks execute independently

  • Event-driven: Responds to data streams

  • Backpressure-aware: Handles data flow control

Traditional vs Reactive Approach

Traditional (Blocking):

TraditionalApproach.java

public User getUser(Long id) {
    User user = userRepository.findById(id); // Thread blocks here
    return user;
}

Reactive (Non-blocking):

ReactiveApproach.java

public Mono<User> getUser(Long id) {
    return userRepository.findById(id); // Returns immediately
}

When to Use Reactive Programming?

Use When:

  • High concurrency requirements

  • I/O-bound operations (database, network calls)

  • Real-time data streaming

  • Limited resources (threads, memory)

  • Microservices architecture with many service calls

Avoid When:

  • CPU-intensive operations

  • Simple CRUD applications with low traffic

  • Team lacks reactive experience

  • Heavy use of blocking libraries

2. The Reactive Manifesto

The Reactive Manifesto defines four core principles:

2.1 Responsive

  • System responds in a timely manner

  • Provides consistent quality of service

  • Establishes reliable upper bounds on response times

2.2 Resilient

  • System stays responsive in the face of failure

  • Failures are contained and isolated

  • Recovery is handled gracefully

2.3 Elastic

  • System stays responsive under varying workload

  • Scales up or down based on demand

  • No contention points or central bottlenecks

2.4 Message-Driven

  • Relies on asynchronous message passing

  • Provides loose coupling and isolation

  • Enables non-blocking communication


3. Reactive Streams Specification

Reactive Streams is a standard for asynchronous stream processing with non-blocking backpressure.

Core Interfaces:

ReactiveStreamsInterfaces.java

// 1. Publisher - produces data
public interface Publisher<T> {
    void subscribe(Subscriber<? super T> s);
}

// 2. Subscriber - consumes data
public interface Subscriber<T> {
    void onSubscribe(Subscription s);
    void onNext(T t);
    void onError(Throwable t);
    void onComplete();
}

// 3. Subscription - links Publisher and Subscriber
public interface Subscription {
    void request(long n);
    void cancel();
}

// 4. Processor - both Publisher and Subscriber
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

Backpressure

Backpressure is a mechanism to prevent overwhelming a subscriber with data.

How it works:

  1. Subscriber requests N items from Publisher

  2. Publisher sends only N items

  3. Subscriber processes items

  4. Subscriber requests more items

BackpressureExample.java

Flux.range(1, 1000)
    .onBackpressureBuffer(100)  // Buffer up to 100 items
    .subscribe(new BaseSubscriber<Integer>() {
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            request(10); // Request 10 items initially
        }

        @Override
        protected void hookOnNext(Integer value) {
            System.out.println(value);
            request(1); // Request next item after processing
        }
    });

4. Project Reactor

Project Reactor is Spring's reactive library implementing Reactive Streams.

4.1 Core Types: Mono and Flux

Mono<T>

Represents 0 or 1 element.

MonoExamples.java

// Creating Mono
Mono<String> monoJust = Mono.just("Hello");
Mono<String> monoEmpty = Mono.empty();
Mono<String> monoError = Mono.error(new RuntimeException("Error"));

// From callable
Mono<String> monoFromCallable = Mono.fromCallable(() -> {
    return "Computed Value";
});

// Deferred execution
Mono<String> monoDefer = Mono.defer(() -> Mono.just("Lazy"));

Flux<T>

Represents 0 to N elements.

FluxExamples.java

// Creating Flux
Flux<String> fluxJust = Flux.just("A", "B", "C");
Flux<Integer> fluxRange = Flux.range(1, 10);
Flux<Long> fluxInterval = Flux.interval(Duration.ofSeconds(1));

// From iterable
List<String> list = Arrays.asList("X", "Y", "Z");
Flux<String> fluxFromIterable = Flux.fromIterable(list);

// From stream
Flux<String> fluxFromStream = Flux.fromStream(Stream.of("1", "2", "3"));

4.2 Common Operators

Transformation Operators

TransformationOperators.java

// map - transform each element
Flux.range(1, 5)
    .map(i -> i * 2)
    .subscribe(System.out::println); // 2, 4, 6, 8, 10

// flatMap - async transformation, flattens results
Flux.just("user1", "user2")
    .flatMap(username -> getUserDetails(username))
    .subscribe(System.out::println);

// flatMapSequential - maintains order
Flux.just("A", "B", "C")
    .flatMapSequential(this::processAsync)
    .subscribe(System.out::println);

// concatMap - processes one at a time, maintains order
Flux.just(1, 2, 3)
    .concatMap(this::processSequentially)
    .subscribe(System.out::println);

Filtering Operators

FilteringOperators.java

// filter - keep elements matching predicate
Flux.range(1, 10)
    .filter(i -> i % 2 == 0)
    .subscribe(System.out::println); // 2, 4, 6, 8, 10

// take - take first N elements
Flux.range(1, 100)
    .take(5)
    .subscribe(System.out::println); // 1, 2, 3, 4, 5

// skip - skip first N elements
Flux.range(1, 10)
    .skip(5)
    .subscribe(System.out::println); // 6, 7, 8, 9, 10

// distinct - remove duplicates
Flux.just(1, 2, 2, 3, 3, 3)
    .distinct()
    .subscribe(System.out::println); // 1, 2, 3

// distinctUntilChanged - remove consecutive duplicates
Flux.just(1, 1, 2, 2, 3, 1)
    .distinctUntilChanged()
    .subscribe(System.out::println); // 1, 2, 3, 1

Combining Operators

CombiningOperators.java

// concat - concatenate publishers sequentially
Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);
Flux.concat(flux1, flux2)
    .subscribe(System.out::println); // 1, 2, 3, 4, 5, 6

// merge - merge publishers concurrently
Flux.merge(flux1, flux2)
    .subscribe(System.out::println); // Order not guaranteed

// zip - combine elements from multiple sources
Flux<String> names = Flux.just("Alice", "Bob");
Flux<Integer> ages = Flux.just(25, 30);
Flux.zip(names, ages, (name, age) -> name + " is " + age)
    .subscribe(System.out::println); // Alice is 25, Bob is 30

// combineLatest - combine latest values
Flux.combineLatest(flux1, flux2, (a, b) -> a + b)
    .subscribe(System.out::println);

Timing Operators

TimingOperators.java

// delayElements - delay each element
Flux.range(1, 5)
    .delayElements(Duration.ofSeconds(1))
    .subscribe(System.out::println);

// timeout - fail if no signal within duration
Mono.delay(Duration.ofSeconds(10))
    .timeout(Duration.ofSeconds(5))
    .subscribe(
        data -> System.out.println(data),
        error -> System.out.println("Timeout!")
    );

// delaySubscription - delay subscription
Flux.just(1, 2, 3)
    .delaySubscription(Duration.ofSeconds(2))
    .subscribe(System.out::println);

4.3 Error Handling Operators

ErrorHandlingOperators.java

// onErrorReturn - return default value on error
Flux.just(1, 2, 0, 4)
    .map(i -> 10 / i)
    .onErrorReturn(-1)
    .subscribe(System.out::println);

// onErrorResume - switch to fallback publisher
Flux.just(1, 2, 0, 4)
    .map(i -> 10 / i)
    .onErrorResume(error -> Flux.just(-1, -2, -3))
    .subscribe(System.out::println);

// onErrorContinue - skip error and continue
Flux.just(1, 2, 0, 4)
    .map(i -> 10 / i)
    .onErrorContinue((error, value) -> {
        System.out.println("Error with value: " + value);
    })
    .subscribe(System.out::println);

// retry - retry on error
Mono.fromCallable(() -> callExternalService())
    .retry(3)
    .subscribe(System.out::println);

// retryWhen - custom retry logic
Mono.fromCallable(() -> callExternalService())
    .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
    .subscribe(System.out::println);

4.4 Schedulers

Schedulers control where and when work is executed.

SchedulersExample.java

// Schedulers.immediate() - current thread
Flux.range(1, 5)
    .subscribeOn(Schedulers.immediate())
    .subscribe(System.out::println);

// Schedulers.single() - single reusable thread
Flux.range(1, 5)
    .subscribeOn(Schedulers.single())
    .subscribe(System.out::println);

// Schedulers.parallel() - fixed pool for CPU-intensive work
Flux.range(1, 5)
    .subscribeOn(Schedulers.parallel())
    .subscribe(System.out::println);

// Schedulers.boundedElastic() - dynamic pool for I/O work
Flux.range(1, 5)
    .subscribeOn(Schedulers.boundedElastic())
    .subscribe(System.out::println);

// publishOn vs subscribeOn
Flux.range(1, 5)
    .subscribeOn(Schedulers.boundedElastic()) // affects subscription
    .map(i -> i * 2)
    .publishOn(Schedulers.parallel()) // affects downstream
    .subscribe(System.out::println);

Key Difference:

  • subscribeOn(): Controls where the source emits items

  • publishOn(): Controls where downstream operators execute

4.5 Hot vs Cold Publishers

Cold Publisher

Each subscriber gets its own data stream from the beginning.

ColdPublisher.java

Flux<Long> coldFlux = Flux.interval(Duration.ofSeconds(1))
    .take(5);

// Subscriber 1
coldFlux.subscribe(i -> System.out.println("Sub1: " + i));

// Wait 2 seconds
Thread.sleep(2000);

// Subscriber 2 - starts from 0, not 2
coldFlux.subscribe(i -> System.out.println("Sub2: " + i));

Hot Publisher

All subscribers share the same data stream.

HotPublisher.java

Flux<Long> coldFlux = Flux.interval(Duration.ofSeconds(1)).take(5);

// Convert to hot publisher
ConnectableFlux<Long> hotFlux = coldFlux.publish();

// Subscriber 1
hotFlux.subscribe(i -> System.out.println("Sub1: " + i));

// Start emitting
hotFlux.connect();

// Wait 2 seconds
Thread.sleep(2000);

// Subscriber 2 - starts from current value (2 or 3)
hotFlux.subscribe(i -> System.out.println("Sub2: " + i));

5. Spring WebFlux

Spring WebFlux is Spring's reactive web framework.

5.1 Architecture

Traditional Spring MVC:
┌─────────────┐
│ Servlet API │ (Blocking I/O)
└─────────────┘
      ↓
┌─────────────┐
│ Spring MVC  │
└─────────────┘

Spring WebFlux:
┌──────────────┐      ┌─────────────┐
│ Netty/Undertow│  or  │ Servlet 3.1+│ (Non-blocking I/O)
└──────────────┘      └─────────────┘
        ↓                    ↓
┌────────────────────────────────┐
│       Spring WebFlux           │
└────────────────────────────────┘

5.2 Dependencies

pom.xml

<dependencies>
    <!-- Spring WebFlux -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

    <!-- Reactor Test -->
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

5.3 Annotated Controllers

UserController.java

@RestController
@RequestMapping("/api/users")
public class UserController {

    @Autowired
    private UserService userService;

    // Get single user
    @GetMapping("/{id}")
    public Mono<User> getUser(@PathVariable Long id) {
        return userService.findById(id);
    }

    // Get all users
    @GetMapping
    public Flux<User> getAllUsers() {
        return userService.findAll();
    }

    // Create user
    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<User> createUser(@RequestBody User user) {
        return userService.save(user);
    }

    // Update user
    @PutMapping("/{id}")
    public Mono<User> updateUser(@PathVariable Long id, @RequestBody User user) {
        return userService.update(id, user);
    }

    // Delete user
    @DeleteMapping("/{id}")
    @ResponseStatus(HttpStatus.NO_CONTENT)
    public Mono<Void> deleteUser(@PathVariable Long id) {
        return userService.deleteById(id);
    }

    // Stream users (Server-Sent Events)
    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<User> streamUsers() {
        return userService.findAll()
            .delayElements(Duration.ofSeconds(1));
    }
}

5.4 Functional Endpoints

UserRouter.java

@Configuration
public class UserRouter {

    @Bean
    public RouterFunction<ServerResponse> userRoutes(UserHandler handler) {
        return RouterFunctions
            .route(GET("/api/users/{id}"), handler::getUser)
            .andRoute(GET("/api/users"), handler::getAllUsers)
            .andRoute(POST("/api/users"), handler::createUser)
            .andRoute(PUT("/api/users/{id}"), handler::updateUser)
            .andRoute(DELETE("/api/users/{id}"), handler::deleteUser);
    }
}

UserHandler.java

@Component
public class UserHandler {

    @Autowired
    private UserService userService;

    public Mono<ServerResponse> getUser(ServerRequest request) {
        Long id = Long.valueOf(request.pathVariable("id"));
        return userService.findById(id)
            .flatMap(user -> ServerResponse.ok().bodyValue(user))
            .switchIfEmpty(ServerResponse.notFound().build());
    }

    public Mono<ServerResponse> getAllUsers(ServerRequest request) {
        return ServerResponse.ok()
            .body(userService.findAll(), User.class);
    }

    public Mono<ServerResponse> createUser(ServerRequest request) {
        Mono<User> userMono = request.bodyToMono(User.class);
        return userMono
            .flatMap(userService::save)
            .flatMap(user -> ServerResponse
                .status(HttpStatus.CREATED)
                .bodyValue(user));
    }

    public Mono<ServerResponse> updateUser(ServerRequest request) {
        Long id = Long.valueOf(request.pathVariable("id"));
        Mono<User> userMono = request.bodyToMono(User.class);
        return userMono
            .flatMap(user -> userService.update(id, user))
            .flatMap(user -> ServerResponse.ok().bodyValue(user))
            .switchIfEmpty(ServerResponse.notFound().build());
    }

    public Mono<ServerResponse> deleteUser(ServerRequest request) {
        Long id = Long.valueOf(request.pathVariable("id"));
        return userService.deleteById(id)
            .then(ServerResponse.noContent().build());
    }
}

5.5 WebClient (Reactive HTTP Client)

WebClientExample.java

@Service
public class ExternalApiService {

    private final WebClient webClient;

    public ExternalApiService(WebClient.Builder webClientBuilder) {
        this.webClient = webClientBuilder
            .baseUrl("https://api.example.com")
            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .build();
    }

    // GET request
    public Mono<User> getUser(Long id) {
        return webClient.get()
            .uri("/users/{id}", id)
            .retrieve()
            .bodyToMono(User.class);
    }

    // GET with query parameters
    public Flux<User> searchUsers(String name) {
        return webClient.get()
            .uri(uriBuilder -> uriBuilder
                .path("/users")
                .queryParam("name", name)
                .build())
            .retrieve()
            .bodyToFlux(User.class);
    }

    // POST request
    public Mono<User> createUser(User user) {
        return webClient.post()
            .uri("/users")
            .bodyValue(user)
            .retrieve()
            .bodyToMono(User.class);
    }

    // Error handling
    public Mono<User> getUserWithErrorHandling(Long id) {
        return webClient.get()
            .uri("/users/{id}", id)
            .retrieve()
            .onStatus(HttpStatus::is4xxClientError, 
                response -> Mono.error(new NotFoundException("User not found")))
            .onStatus(HttpStatus::is5xxServerError,
                response -> Mono.error(new ServiceException("Service unavailable")))
            .bodyToMono(User.class);
    }
}

6. Reactive Data Access

6.1 Spring Data R2DBC

R2DBC (Reactive Relational Database Connectivity) provides reactive database access.

Dependencies

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-r2dbc</artifactId>
    </dependency>
    <dependency>
        <groupId>io.r2dbc</groupId>
        <artifactId>r2dbc-postgresql</artifactId>
    </dependency>
</dependencies>

Configuration

application.yml


spring:
  r2dbc:
    url: r2dbc:postgresql://localhost:5432/mydb
    username: user
    password: password

Entity

User.java

@Table("users")
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {

    @Id
    private Long id;

    private String username;

    private String email;

    private Integer age;

    @CreatedDate
    private LocalDateTime createdAt;

    @LastModifiedDate
    private LocalDateTime updatedAt;
}

Repository

UserRepository.java

public interface UserRepository extends ReactiveCrudRepository<User, Long> {

    // Derived query methods
    Flux<User> findByUsername(String username);

    Flux<User> findByAgeGreaterThan(Integer age);

    Mono<User> findByEmail(String email);

    // Custom query
    @Query("SELECT * FROM users WHERE age BETWEEN :minAge AND :maxAge")
    Flux<User> findByAgeBetween(Integer minAge, Integer maxAge);

    @Query("SELECT * FROM users WHERE username LIKE :pattern")
    Flux<User> searchByUsername(String pattern);
}

Service

UserService.java

@Service
public class UserService {

    @Autowired
    private UserRepository userRepository;

    public Mono<User> findById(Long id) {
        return userRepository.findById(id);
    }

    public Flux<User> findAll() {
        return userRepository.findAll();
    }

    public Mono<User> save(User user) {
        return userRepository.save(user);
    }

    public Mono<User> update(Long id, User user) {
        return userRepository.findById(id)
            .flatMap(existingUser -> {
                existingUser.setUsername(user.getUsername());
                existingUser.setEmail(user.getEmail());
                existingUser.setAge(user.getAge());
                return userRepository.save(existingUser);
            });
    }

    public Mono<Void> deleteById(Long id) {
        return userRepository.deleteById(id);
    }

    public Flux<User> findAdults() {
        return userRepository.findByAgeGreaterThan(18);
    }
}

6.2 Spring Data MongoDB Reactive

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>

ProductRepository.java

public interface ProductRepository extends ReactiveMongoRepository<Product, String> {

    Flux<Product> findByCategory(String category);

    Flux<Product> findByPriceLessThan(Double price);

    @Query("{ 'name': { $regex: ?0, $options: 'i' } }")
    Flux<Product> searchByName(String name);
}

7. Testing Reactive Applications

7.1 StepVerifier

StepVerifier tests reactive streams step by step.

StepVerifierTests.java

@Test
public void testMono() {
    Mono<String> mono = Mono.just("Hello");

    StepVerifier.create(mono)
        .expectNext("Hello")
        .verifyComplete();
}

@Test
public void testFlux() {
    Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5);

    StepVerifier.create(flux)
        .expectNext(1)
        .expectNext(2)
        .expectNext(3, 4, 5)
        .verifyComplete();
}

@Test
public void testError() {
    Mono<String> mono = Mono.error(new RuntimeException("Error"));

    StepVerifier.create(mono)
        .expectError(RuntimeException.class)
        .verify();
}

@Test
public void testWithDelay() {
    Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(3);

    StepVerifier.create(flux)
        .expectNext(0L, 1L, 2L)
        .verifyComplete();
}

@Test
public void testConditional() {
    Flux<Integer> flux = Flux.range(1, 10);

    StepVerifier.create(flux)
        .expectNextMatches(i -> i == 1)
        .expectNextCount(8)
        .expectNext(10)
        .verifyComplete();
}

7.2 WebTestClient

WebTestClient tests WebFlux endpoints.

UserControllerTest.java

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@AutoConfigureWebTestClient
public class UserControllerTest {

    @Autowired
    private WebTestClient webTestClient;

    @Test
    public void testGetUser() {
        webTestClient.get()
            .uri("/api/users/1")
            .exchange()
            .expectStatus().isOk()
            .expectHeader().contentType(MediaType.APPLICATION_JSON)
            .expectBody(User.class)
            .value(user -> {
                assertNotNull(user);
                assertEquals(1L, user.getId());
            });
    }

    @Test
    public void testGetAllUsers() {
        webTestClient.get()
            .uri("/api/users")
            .exchange()
            .expectStatus().isOk()
            .expectBodyList(User.class)
            .hasSize(5);
    }

    @Test
    public void testCreateUser() {
        User newUser = new User(null, "john", "john@example.com", 25, null, null);

        webTestClient.post()
            .uri("/api/users")
            .contentType(MediaType.APPLICATION_JSON)
            .bodyValue(newUser)
            .exchange()
            .expectStatus().isCreated()
            .expectBody(User.class)
            .value(user -> {
                assertNotNull(user.getId());
                assertEquals("john", user.getUsername());
            });
    }

    @Test
    public void testUserNotFound() {
        webTestClient.get()
            .uri("/api/users/999")
            .exchange()
            .expectStatus().isNotFound();
    }
}

7.3 Mock Testing

UserServiceTest.java

@ExtendWith(MockitoExtension.class)
public class UserServiceTest {

    @Mock
    private UserRepository userRepository;

    @InjectMocks
    private UserService userService;

    @Test
    public void testFindById() {
        User user = new User(1L, "alice", "alice@example.com", 30, null, null);
        when(userRepository.findById(1L)).thenReturn(Mono.just(user));

        Mono<User> result = userService.findById(1L);

        StepVerifier.create(result)
            .expectNext(user)
            .verifyComplete();
    }

    @Test
    public void testFindAll() {
        List<User> users = Arrays.asList(
            new User(1L, "alice", "alice@example.com", 30, null, null),
            new User(2L, "bob", "bob@example.com", 25, null, null)
        );

        when(userRepository.findAll()).thenReturn(Flux.fromIterable(users));

        Flux<User> result = userService.findAll();

        StepVerifier.create(result)
            .expectNextCount(2)
            .verifyComplete();
    }
}

8. Error Handling

8.1 Global Error Handler

GlobalErrorHandler.java

@Component
@Order(-2)
public class GlobalErrorHandler implements ErrorWebExceptionHandler {

    private final ObjectMapper objectMapper;

    public GlobalErrorHandler(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    @Override
    public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
        ServerHttpResponse response = exchange.getResponse();
        response.getHeaders().setContentType(MediaType.APPLICATION_JSON);

        ErrorResponse errorResponse;

        if (ex instanceof NotFoundException) {
            response.setStatusCode(HttpStatus.NOT_FOUND);
            errorResponse = new ErrorResponse("NOT_FOUND", ex.getMessage());
        } else if (ex instanceof ValidationException) {
            response.setStatusCode(HttpStatus.BAD_REQUEST);
            errorResponse = new ErrorResponse("VALIDATION_ERROR", ex.getMessage());
        } else {
            response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
            errorResponse = new ErrorResponse("INTERNAL_ERROR", "An error occurred");
        }

        try {
            byte[] bytes = objectMapper.writeValueAsBytes(errorResponse);
            DataBuffer buffer = response.bufferFactory().wrap(bytes);
            return response.writeWith(Mono.just(buffer));
        } catch (Exception e) {
            return Mono.error(e);
        }
    }
}

@Data
@AllArgsConstructor
class ErrorResponse {
    private String code;
    private String message;
}

8.2 Controller Advice

RestControllerAdvice.java

@RestControllerAdvice
public class ReactiveExceptionHandler {

    @ExceptionHandler(NotFoundException.class)
    @ResponseStatus(HttpStatus.NOT_FOUND)
    public Mono<ErrorResponse> handleNotFound(NotFoundException ex) {
        return Mono.just(new ErrorResponse("NOT_FOUND", ex.getMessage()));
    }

    @ExceptionHandler(ValidationException.class)
    @ResponseStatus(HttpStatus.BAD_REQUEST)
    public Mono<ErrorResponse> handleValidation(ValidationException ex) {
        return Mono.just(new ErrorResponse("VALIDATION_ERROR", ex.getMessage()));
    }

    @ExceptionHandler(Exception.class)
    @ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
    public Mono<ErrorResponse> handleGeneric(Exception ex) {
        return Mono.just(new ErrorResponse("INTERNAL_ERROR", "An error occurred"));
    }
}

8.3 Service-Level Error Handling

ServiceErrorHandling.java

@Service
public class UserService {

    public Mono<User> getUserWithErrorHandling(Long id) {
        return userRepository.findById(id)
            .switchIfEmpty(Mono.error(new NotFoundException("User not found")))
            .onErrorResume(DatabaseException.class, ex -> {
                log.error("Database error: ", ex);
                return Mono.error(new ServiceException("Database unavailable"));
            })
            .retry(3)
            .timeout(Duration.ofSeconds(5))
            .onErrorReturn(new User()); // Fallback
    }
}

9. Best Practices

9.1 Don't Block!

Bad:

BlockingCode.java

public Mono<User> getUserBad(Long id) {
    User user = userRepository.findById(id).block(); // NEVER DO THIS!
    return Mono.just(user);
}

Good:

NonBlockingCode.java

public Mono<User> getUserGood(Long id) {
    return userRepository.findById(id);
}

9.2 Handle Blocking Code Properly

If you must call blocking code:

HandleBlockingCode.java

public Mono<String> callBlockingApi() {
    return Mono.fromCallable(() -> {
        // Blocking call
        return blockingHttpClient.get();
    })
    .subscribeOn(Schedulers.boundedElastic()); // Use elastic scheduler
}

9.3 Use Proper Operators

OperatorChoice.java

// Use flatMap for async operations
public Flux<OrderDetails> getOrderDetails(List<Long> orderIds) {
    return Flux.fromIterable(orderIds)
        .flatMap(id -> orderService.getOrder(id)); // Parallel execution
}

// Use concatMap for sequential processing
public Flux<OrderDetails> getOrderDetailsSequential(List<Long> orderIds) {
    return Flux.fromIterable(orderIds)
        .concatMap(id -> orderService.getOrder(id)); // Sequential execution
}

9.4 Resource Management

ResourceManagement.java

// Use try-with-resources equivalent
public Flux<String> readFile(String path) {
    return Flux.using(
        () -> Files.lines(Paths.get(path)), // Resource supplier
        Flux::fromStream,                    // Resource usage
        Stream::close                        // Resource cleanup
    );
}

9.5 Avoid Nested subscribe()

Bad:

NestedSubscribe.java

public void processBad() {
    userRepository.findById(1L)
        .subscribe(user -> {
            orderRepository.findByUserId(user.getId())
                .subscribe(order -> {
                    // Nested subscribe - BAD!
                });
        });
}

Good:

FlatMapChain.java

public Mono<Order> processGood() {
    return userRepository.findById(1L)
        .flatMap(user -> orderRepository.findByUserId(user.getId()));
}

9.6 Lazy Evaluation

LazyEvaluation.java

// Use defer for lazy evaluation
public Mono<User> getCurrentUser() {
    return Mono.defer(() -> {
        // This is evaluated at subscription time
        String username = SecurityContextHolder.getContext()
            .getAuthentication()
            .getName();
        return userRepository.findByUsername(username);
    });
}

9.7 Memory Management

MemoryManagement.java

// Use pagination for large datasets
public Flux<User> getAllUsersEfficiently() {
    return userRepository.findAll()
        .buffer(100) // Process in batches
        .flatMap(Flux::fromIterable);
}

// Limit flux size
public Flux<Event> streamEvents() {
    return eventRepository.findAll()
        .take(1000); // Limit to 1000 items
}

10. Common Pitfalls

10.1 Forgetting to Subscribe

Nothing happens without subscription:

NoSubscription.java

Mono<User> user = userRepository.findById(1L); // Nothing happens!

Subscribe or return:

WithSubscription.java

// In a controller - framework subscribes
public Mono<User> getUser(Long id) {
    return userRepository.findById(id);
}

// Manual subscription (rare)
userRepository.findById(1L).subscribe(
    user -> System.out.println(user),
    error -> System.err.println(error),
    () -> System.out.println("Complete")
);

10.2 Blocking in Reactive Chain

Bad:

BlockingInChain.java

return userRepository.findById(id)
    .map(user -> {
        String result = blockingHttpClient.call(); // BLOCKS!
        return result;
    });

Good:

NonBlockingChain.java

return userRepository.findById(id)
    .flatMap(user -> 
        Mono.fromCallable(() -> blockingHttpClient.call())
            .subscribeOn(Schedulers.boundedElastic())
    );

10.3 Improper Error Handling

Bad:

BadErrorHandling.java

try {
    return userRepository.findById(id);
} catch (Exception e) { // Won't catch reactive errors!
    return Mono.error(e);
}

Good:

GoodErrorHandling.java

return userRepository.findById(id)
    .onErrorResume(e -> Mono.error(new CustomException(e)));

10.4 Not Releasing Resources

Bad:

ResourceLeak.java

Connection conn = getConnection();
return Mono.just(conn.getData()); // Connection leaked!

Good:

ResourceCleanup.java

return Mono.using(
    this::getConnection,
    conn -> Mono.just(conn.getData()),
    Connection::close
);

10.5 Ignoring Backpressure

BackpressureHandling.java

// Handle fast producer, slow consumer
Flux.range(1, 1_000_000)
    .onBackpressureBuffer(1000)  // Buffer strategy
    // or
    .onBackpressureDrop()         // Drop strategy
    // or
    .onBackpressureLatest()       // Keep latest
    .subscribe();

Summary

Key Takeaways:

  1. Reactive = Non-blocking + Asynchronous + Backpressure

  2. Mono for 0-1 items, Flux for 0-N items

  3. Never block in reactive chains

  4. Use proper operators: flatMap, map, filter, zip, etc.

  5. Handle errors with reactive operators

  6. Test thoroughly with StepVerifier and WebTestClient

  7. Use appropriate schedulers for different workloads

  8. Subscribe to make things happen

  9. Resource management is critical

  10. Think in streams and transformations

When Reactive Shines:

  • High concurrency (thousands of connections)

  • I/O-bound operations

  • Streaming data

  • Microservices communication

  • Real-time applications

When to Stick with Traditional:

  • Simple CRUD apps

  • Low traffic applications

  • CPU-intensive operations

  • Team unfamiliar with reactive


This comprehensive guide covers the essentials of Reactive Programming with Spring Boot. Practice building applications using these concepts to gain mastery!