Handling Exceptions in Project Reactor

When working with Reactive Streams, if an error occurs while the Publisher is emitting data, the entire flow is interrupted and the onError signal is sent to the Subscriber. No other signals are sent after onError.

Example

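import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    Flux.just(2, 7, 10)
            // the error signal terminates the sequence
            .concatWith(Flux.error(new RuntimeException("Exception occurred")))
            // never subscribed to, so 12 is never emitted
            .concatWith(Mono.just(12))
            .log()
            .subscribe();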

  }
}
Output:
reactor.Flux.ConcatArray.1 : onSubscribe(FluxConcatArray.ConcatArraySubscriber)
reactor.Flux.ConcatArray.1 : request(unbounded)
reactor.Flux.ConcatArray.1 : onNext(2)
reactor.Flux.ConcatArray.1 : onNext(7)
reactor.Flux.ConcatArray.1 : onNext(10)
reactor.Flux.ConcatArray.1 : onError(java.lang.RuntimeException: Exception occurred)
reactor.Flux.ConcatArray.1 : java.lang.RuntimeException: Exception occurred
    at com.example.demo.DemoApplication.main(DemoApplication.java:14) ~[classes/:na]
 
You can see that element 12 is not emitted. An error occurred, and the whole flow stopped. 
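To make the terminal nature of onError visible without reading the log, here is a minimal sketch that records every signal the subscriber receives. The class name ErrorSignalDemo and the helper collectSignals() are made up for this illustration; the three-argument subscribe() overload is part of Reactor's Flux API.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ErrorSignalDemo {

  // Hypothetical helper: records each signal the subscriber receives, in order.
  static List<String> collectSignals() {
    List<String> signals = new ArrayList<>();

    Flux.just(2, 7, 10)
        .concatWith(Flux.error(new RuntimeException("Exception occurred")))
        .concatWith(Mono.just(12))
        .subscribe(
            value -> signals.add("onNext(" + value + ")"),               // data signal
            error -> signals.add("onError(" + error.getMessage() + ")"), // terminal error signal
            () -> signals.add("onComplete()"));                          // terminal completion signal

    return signals;
  }

  public static void main(String[] args) {
    // prints onNext(2), onNext(7), onNext(10), onError(Exception occurred)
    collectSignals().forEach(System.out::println);
  }
}
```

Neither onNext(12) nor onComplete() shows up in the list, because onError is a terminal signal.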

How to handle exceptions in Project Reactor?

In Project Reactor, we can handle exceptions using the following operators:

  • onErrorReturn()
  • onErrorResume()
  • onErrorContinue()
  • onErrorMap()
  • doOnError()

The exception-handling operators in Project Reactor are defined in both the Mono and Flux classes.

Handling exceptions in Project Reactor with the onErrorReturn() operator

The onErrorReturn() operator emits the specified fallback value when an error occurs. In this way, the code recovers from the exception.

Example

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    Flux.just(2, 7, 10)
            .concatWith(Flux.error(new RuntimeException("Exception occurred")))
            .concatWith(Mono.just(12))
            .onErrorReturn(72)
            .log()
            .subscribe();

  }
}
Output:
reactor.Flux.OnErrorResume.1 : onSubscribe(FluxOnErrorResume.ResumeSubscriber)
reactor.Flux.OnErrorResume.1 : request(unbounded)
reactor.Flux.OnErrorResume.1 : onNext(2)
reactor.Flux.OnErrorResume.1 : onNext(7)
reactor.Flux.OnErrorResume.1 : onNext(10)
reactor.Flux.OnErrorResume.1 : onNext(72)
reactor.Flux.OnErrorResume.1 : onComplete()
 
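The above example shows that the code recovered from the error: the fallback value 72 was emitted in place of the error, followed by onComplete. There is no onError signal, as you can see. Note that element 12 is still not emitted, because the original sequence ended at the point of failure.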
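The fallback does not have to apply to every error. As a small sketch, the overload onErrorReturn(Class, T) only recovers from a given exception type, and a second catch-all onErrorReturn(T) can handle the rest. The class OnErrorReturnDemo and the helper methods failingWith() and recover() are hypothetical names used only here.

```java
import java.util.List;
import reactor.core.publisher.Flux;

class OnErrorReturnDemo {

  // Hypothetical helper: emits 2 and 7, then fails with the given exception.
  static Flux<Integer> failingWith(RuntimeException ex) {
    return Flux.just(2, 7).concatWith(Flux.error(ex));
  }

  // Recovers with -1 for IllegalStateException and with 0 for any other error.
  static List<Integer> recover(RuntimeException ex) {
    return failingWith(ex)
        .onErrorReturn(IllegalStateException.class, -1) // type-specific fallback
        .onErrorReturn(0)                               // catch-all fallback
        .collectList()
        .block();
  }

  public static void main(String[] args) {
    System.out.println(recover(new IllegalStateException("boom")));    // [2, 7, -1]
    System.out.println(recover(new IllegalArgumentException("boom"))); // [2, 7, 0]
  }
}
```

In the second call the first operator does not match, so the error travels on to the catch-all fallback.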

Handling exceptions in Project Reactor with the onErrorResume() operator

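The onErrorResume() operator accepts a Function that receives the error and returns a fallback Publisher, which in this case is a Mono. When an error occurs, the subscriber is switched to that fallback Publisher.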

Example

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    Flux.just(2, 7, 10)
            .concatWith(Flux.error(new RuntimeException("Exception occurred")))
            .concatWith(Mono.just(12))
            .onErrorResume(err -> {
              System.out.println("Error caught: " + err);
              return Mono.just(12);
            })
            .log()
            .subscribe();

  }
}
Output:
reactor.Flux.OnErrorResume.1 : onSubscribe(FluxOnErrorResume.ResumeSubscriber)
reactor.Flux.OnErrorResume.1 : request(unbounded)
reactor.Flux.OnErrorResume.1 : onNext(2)
reactor.Flux.OnErrorResume.1 : onNext(7)
reactor.Flux.OnErrorResume.1 : onNext(10)
Error caught: java.lang.RuntimeException: Exception occurred
reactor.Flux.OnErrorResume.1 : onNext(12)
reactor.Flux.OnErrorResume.1 : onComplete()
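Because the fallback is a Publisher, it can emit more than one element, and the overload onErrorResume(Class, Function) limits the recovery to one exception type. The following is only a sketch; the class OnErrorResumeDemo, the method withFallbackSequence() and the values 100 and 200 are made up for illustration.

```java
import java.util.List;
import reactor.core.publisher.Flux;

class OnErrorResumeDemo {

  // Switches to a two-element fallback Flux when the source fails
  // with an IllegalStateException.
  static List<Integer> withFallbackSequence() {
    return Flux.just(2, 7, 10)
        .concatWith(Flux.error(new IllegalStateException("Primary source failed")))
        .onErrorResume(IllegalStateException.class, err -> Flux.just(100, 200))
        .collectList()
        .block();
  }

  public static void main(String[] args) {
    System.out.println(withFallbackSequence()); // [2, 7, 10, 100, 200]
  }
}
```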

Handle exceptions with the onErrorContinue() operator

The onErrorContinue() operator catches the exception, drops the element that caused it, and lets the Publisher continue emitting the remaining elements. It only takes effect for upstream operators that support it, such as map() in the example below.

Example

import reactor.core.publisher.Flux;

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    Flux.just(2, 7, 10, 8, 12, 22, 24)
            .map(element -> {
                if (element == 8) {
                  throw new RuntimeException("Exception occurred!");
                }
                return element;
            }).onErrorContinue((ex, element) -> {
                System.out.println("Exception caught: " + ex);
                System.out.println("The element that caused the exception is: " + element);
            }).log()
            .subscribe();

  }
}
Output:
reactor.Flux.ContextWrite.1 : | onSubscribe([Fuseable] FluxContextWrite.ContextWriteSubscriber)
reactor.Flux.ContextWrite.1 : | request(unbounded)
reactor.Flux.ContextWrite.1 : | onNext(2)
reactor.Flux.ContextWrite.1 : | onNext(7)
reactor.Flux.ContextWrite.1 : | onNext(10)
Exception caught: java.lang.RuntimeException: Exception occurred!
The element that caused the exception is: 8
reactor.Flux.ContextWrite.1 : | onNext(12)
reactor.Flux.ContextWrite.1 : | onNext(22)
reactor.Flux.ContextWrite.1 : | onNext(24)
reactor.Flux.ContextWrite.1 : | onComplete()
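A typical use of this behaviour is skipping bad input while keeping track of what was skipped. The sketch below assumes a made-up class OnErrorContinueDemo with a helper parseAll() that collects the dropped elements into a caller-supplied list; it uses the overload onErrorContinue(Class, BiConsumer) to react only to NumberFormatException.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

class OnErrorContinueDemo {

  // Parses each string to an int. Elements that fail to parse are added
  // to 'dropped' and skipped, and the sequence continues with the next one.
  static List<Integer> parseAll(List<Object> dropped, String... raw) {
    return Flux.fromArray(raw)
        .map(Integer::parseInt)
        .onErrorContinue(NumberFormatException.class,
            (ex, element) -> dropped.add(element))
        .collectList()
        .block();
  }

  public static void main(String[] args) {
    List<Object> dropped = new ArrayList<>();
    System.out.println(parseAll(dropped, "2", "seven", "10")); // [2, 10]
    System.out.println(dropped);                               // [seven]
  }
}
```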

Handle exceptions with the onErrorMap() operator

With onErrorMap(), the code does not recover from the exception. This operator only catches the exception and transforms it from one type to another; the new exception is then propagated downstream as the onError signal.

Example

import reactor.core.publisher.Flux;

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    Flux.just(2, 7, 10, 8, 12, 22, 24)
            .map(element -> {
                if (element == 8) {
                  throw new RuntimeException("Exception occurred!");
                }
                return element;
            }).onErrorMap(ex -> {
                System.out.println("Exception caught: " + ex);
                return new CustomException(ex.getMessage(), ex);
            }).log()
            .subscribe();

  }
}

class CustomException extends Exception {

  public CustomException(String message, Throwable exception) {
    super(message, exception);
  }
}
Output:
reactor.Flux.OnErrorResume.1 : onSubscribe(FluxOnErrorResume.ResumeSubscriber)
reactor.Flux.OnErrorResume.1 : request(unbounded)
reactor.Flux.OnErrorResume.1 : onNext(2)
reactor.Flux.OnErrorResume.1 : onNext(7)
reactor.Flux.OnErrorResume.1 : onNext(10)
Exception caught: java.lang.RuntimeException: Exception occurred!
reactor.Flux.OnErrorResume.1 : onError(com.example.demo.CustomException: Exception occurred!)
reactor.Flux.OnErrorResume.1 :
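The mapping can also be limited to one exception type with the overload onErrorMap(Class, Function). The sketch below captures the terminal error in the subscriber so the translated exception can be inspected. The class OnErrorMapDemo, the nested InvalidElementException and the method terminalError() are hypothetical names for this illustration.

```java
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;

class OnErrorMapDemo {

  // Hypothetical domain exception, used only in this sketch.
  static class InvalidElementException extends RuntimeException {
    InvalidElementException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  // Returns the error that finally reaches the subscriber.
  static Throwable terminalError() {
    AtomicReference<Throwable> received = new AtomicReference<>();

    Flux.just(2, 7, 8)
        .map(element -> {
          if (element == 8) {
            throw new IllegalArgumentException("8 is not allowed");
          }
          return element;
        })
        // translate only IllegalArgumentException, keeping the original as the cause
        .onErrorMap(IllegalArgumentException.class,
            ex -> new InvalidElementException("Invalid element: " + ex.getMessage(), ex))
        .subscribe(value -> { }, received::set);

    return received.get();
  }

  public static void main(String[] args) {
    Throwable error = terminalError();
    System.out.println(error.getMessage());          // Invalid element: 8 is not allowed
    System.out.println(error.getCause().getClass()); // class java.lang.IllegalArgumentException
  }
}
```

Passing the original exception as the cause keeps the full stack trace available to whoever handles the translated error.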

Handle exceptions with the doOnError() operator

This operator is one of the doOn callbacks in Project Reactor. It doesn’t change the original sequence. With this operator, we can observe the exception and perform some action when the onError signal is sent by the Publisher, but the code does not recover, and the error is still propagated downstream to the subscriber.

Example

import reactor.core.publisher.Flux;

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    Flux.just(1, 2, 3)
            .concatWith(Flux.error(new RuntimeException("Exception occurred.")))
            .doOnError(ex -> System.out.println("Exception caught: " + ex)) // side effect only: print the exception, the error still propagates
            .log()
            .subscribe();

  }
}
Output:
reactor.Flux.Peek.1 : onSubscribe(FluxPeek.PeekSubscriber)
reactor.Flux.Peek.1 : request(unbounded)
reactor.Flux.Peek.1 : onNext(1)
reactor.Flux.Peek.1 : onNext(2)
reactor.Flux.Peek.1 : onNext(3)
Exception caught: java.lang.RuntimeException: Exception occurred.
reactor.Flux.Peek.1 : onError(java.lang.RuntimeException: Exception occurred.)
reactor.Flux.Peek.1 : java.lang.RuntimeException: Exception occurred.
    at com.example.demo.ReactiveJavaTutorial.main(ReactiveJavaTutorial.java:13)
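Since doOnError() only observes the error, it is often combined with one of the recovery operators. The following sketch records the error message and then recovers with onErrorReturn(). The class DoOnErrorDemo, the method logThenRecover() and the fallback value -1 are assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

class DoOnErrorDemo {

  // Records the error message as a side effect, then recovers with -1.
  static List<Integer> logThenRecover(List<String> errorLog) {
    return Flux.just(1, 2, 3)
        .concatWith(Flux.error(new RuntimeException("Exception occurred.")))
        .doOnError(ex -> errorLog.add(ex.getMessage())) // side effect, error continues downstream
        .onErrorReturn(-1)                              // the actual recovery happens here
        .collectList()
        .block();
  }

  public static void main(String[] args) {
    List<String> errorLog = new ArrayList<>();
    System.out.println(logThenRecover(errorLog)); // [1, 2, 3, -1]
    System.out.println(errorLog);                 // [Exception occurred.]
  }
}
```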
 
That was all about exception handling in Project Reactor.
 
Happy coding!
