Subscribe to Flux in Java Reactor

To get the data from the Publisher (Flux), we need to subscribe to it. In this post, you will learn to subscribe to a Flux in Java Reactor.

How to Subscribe to Flux in Java?

When we subscribe to a Publisher (Flux), it starts emitting signals like:

  • onNext 
    When the Publisher receives a request from the Subscriber, it starts emitting data as a stream of events by invoking the onNext() method of the Subscriber interface once per element.
  • onError
    In case of an error, the exception is sent to the Subscriber as an event via the onError() method.
  • onComplete
    When the Publisher has finished sending data, it notifies the Subscriber via the onComplete() method.

Take a look at Introduction to Reactive Streams for more on Stream events.
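
The three signals described above can be observed directly with a custom Subscriber. The following is a minimal sketch, assuming reactor-core is on the classpath; the class SignalLogDemo and the helper recordSignals() are hypothetical names used only for illustration. It extends Reactor's BaseSubscriber, requests one element at a time, and records each signal in the order it arrives.

```java
import java.util.ArrayList;
import java.util.List;

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

class SignalLogDemo {

  // Hypothetical helper: subscribes to a Flux and records every signal in arrival order
  static List<String> recordSignals() {
    List<String> log = new ArrayList<>();

    Flux.just("data1", "data2", "data3").subscribe(new BaseSubscriber<String>() {

      @Override
      protected void hookOnSubscribe(Subscription subscription) {
        request(1); // ask the Publisher for the first element
      }

      @Override
      protected void hookOnNext(String value) {
        log.add("onNext(" + value + ")");
        request(1); // ask for the next element
      }

      @Override
      protected void hookOnError(Throwable throwable) {
        log.add("onError(" + throwable.getMessage() + ")");
      }

      @Override
      protected void hookOnComplete() {
        log.add("onComplete()");
      }
    });

    return log;
  }

  public static void main(String[] args) {
    recordSignals().forEach(System.out::println);
  }
}
```

Output:
onNext(data1)
onNext(data2)
onNext(data3)
onComplete()

Because Flux.just() emits synchronously on the calling thread, the log is already complete when subscribe() returns. With an asynchronous source that would not be guaranteed.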

To make the data flow, you have to subscribe to the Flux using one of the subscribe() methods.

Example

import reactor.core.publisher.Flux;

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    // Create a Flux
    Flux<String> flux = Flux.just("data1", "data2", "data3");

    // Subscribe to the Flux and print each element
    flux.subscribe(System.out::println);

  }
}
Output:
data1
data2
data3
 
When we call subscribe(), we tell the Publisher to start sending data.
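
To illustrate that nothing is emitted until subscribe() is called, here is a small sketch, assuming reactor-core is on the classpath. The class LazySubscribeDemo and the helper emittedBeforeAndAfterSubscribe() are hypothetical names. It uses doOnNext() to count the elements that have passed through the pipeline before and after subscribing.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

class LazySubscribeDemo {

  // Hypothetical helper: returns the number of emitted elements
  // before and after subscribe() is called
  static int[] emittedBeforeAndAfterSubscribe() {
    List<String> emitted = new ArrayList<>();

    // Assembling the pipeline does not emit anything yet
    Flux<String> flux = Flux.just("data1", "data2", "data3")
        .doOnNext(emitted::add);
    int before = emitted.size();

    // subscribe() triggers the emission
    flux.subscribe();
    int after = emitted.size();

    return new int[] {before, after};
  }

  public static void main(String[] args) {
    int[] counts = emittedBeforeAndAfterSubscribe();
    System.out.println("Before subscribe: " + counts[0]);
    System.out.println("After subscribe: " + counts[1]);
  }
}
```

Output:
Before subscribe: 0
After subscribe: 3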

We can also pass functions to subscribe() that will be executed for each of the signals the Publisher sends.

Example

import reactor.core.publisher.Flux;

class ReactiveJavaTutorial {

  public static void main(String[] args) {

    Flux<String> flux = Flux.just("data1", "data2", "data3");

    // Pass one callback per signal
    flux.subscribe(
            data -> onNext(data), // onNext
            err -> onError(err),  // onError
            () -> onComplete() // onComplete
    );

  }

  // Called once for each emitted element
  private static <T> void onNext(T data) {
    System.out.println("onNext: Data received: " + data);
  }

  // Called if the Publisher terminates with an error
  private static void onError(Throwable err) {
    System.out.println("onError: Exception occurred: " + err.getMessage());
  }

  // Called when the Publisher has emitted all elements
  private static void onComplete() {
    System.out.println("onComplete: Completed!");
  }

}
Output:
onNext: Data received: data1
onNext: Data received: data2
onNext: Data received: data3
onComplete: Completed!
 
If an error occurs that prevents the Publisher from publishing the data, the onError signal will be emitted containing the exception object.
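
The error case can be sketched as follows, assuming reactor-core is on the classpath. The class ErrorSignalDemo, the helper recordSignals() and the simulated IllegalStateException are made up for illustration. The Flux emits two elements and then fails, so the onError callback runs and the onComplete callback is never invoked.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

class ErrorSignalDemo {

  // Hypothetical helper: subscribes to a failing Flux and records the signals
  static List<String> recordSignals() {
    List<String> log = new ArrayList<>();

    // Two elements followed by a simulated failure
    Flux<String> flux = Flux.just("data1", "data2")
        .concatWith(Flux.<String>error(new IllegalStateException("simulated failure")));

    flux.subscribe(
        data -> log.add("onNext: " + data),             // onNext
        err -> log.add("onError: " + err.getMessage()), // onError
        () -> log.add("onComplete")                     // onComplete, not reached here
    );

    return log;
  }

  public static void main(String[] args) {
    recordSignals().forEach(System.out::println);
  }
}
```

Output:
onNext: data1
onNext: data2
onError: simulated failure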
 
That’s it!
