Create a Flux in Java Reactor

A Flux is a Publisher that can emit 0…n items. Let’s explore the different ways to create a Flux in Java Reactor.

How to create a Flux?

We can create a Flux using the following methods:

  • Flux.just()
  • Flux.fromIterable()
  • Flux.fromStream()
  • Flux.range()


Create a Flux using the Flux.just() method

The simplest way to create a Flux is with the just() method. We can pass 0…n arguments, like in the following example:

import reactor.core.publisher.Flux;

class ReactiveJavaTutorial {

  public static void main(String[] args) throws InterruptedException {

    // create an empty Flux (the type parameter avoids a raw type)
    Flux<String> flux1 = Flux.just();

    // create a Flux that will hold only one value
    Flux<String> flux2 = Flux.just("data");

    // create a Flux that will hold multiple values
    Flux<String> flux3 = Flux.just("data1", "data2", "data3");

  }
}


If we want to create an empty Flux, we can also use the Flux.empty() method, which states the intent more explicitly.
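The listing above only declares the Flux instances; nothing is emitted until something subscribes. The following is a minimal sketch of one way to inspect what a Flux emits, assuming reactor-core is on the classpath. The class name FluxJustExample and the helper drain() are hypothetical names used for illustration, and block() is used only because this is a demo.

```java
import java.util.List;

import reactor.core.publisher.Flux;

class FluxJustExample {

  // Hypothetical helper: collects everything the Flux emits into a List.
  // block() waits for completion, which is acceptable in a demo only.
  static <T> List<T> drain(Flux<T> flux) {
    return flux.collectList().block();
  }

  public static void main(String[] args) {
    Flux<String> empty = Flux.empty();
    Flux<String> three = Flux.just("data1", "data2", "data3");

    System.out.println(drain(empty)); // prints []
    System.out.println(drain(three)); // prints [data1, data2, data3]
  }
}
```

In non-demo code we would usually stay non-blocking and call subscribe() instead of block().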


Using the Flux.fromIterable() method

The fromIterable() method creates a Flux that emits the items contained in the provided Iterable.
That means we can pass an ArrayList, or any other collection that implements Iterable, as the argument to the method.

Example

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import reactor.core.publisher.Flux;

class ReactiveJavaTutorial {

  public static void main(String[] args) throws InterruptedException {

    List<String> cities = new ArrayList<>(Arrays.asList("London", "Paris", "Rome", "Amsterdam"));

    Flux<String> flux = Flux.fromIterable(cities);

  }
}
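To see the items actually flow, we can subscribe to the Flux. This is a small sketch, assuming reactor-core is on the classpath; the class name FluxFromIterableExample and the helper received() are hypothetical. It relies on the fact that a Flux built with fromIterable() and no scheduler emits on the subscribing thread, so the list is filled by the time subscribe() returns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import reactor.core.publisher.Flux;

class FluxFromIterableExample {

  // Hypothetical helper: subscribes to a Flux built from the Iterable
  // and records every item in the order it is received.
  static List<String> received(Iterable<String> source) {
    List<String> out = new ArrayList<>();
    Flux.fromIterable(source).subscribe(out::add);
    return out;
  }

  public static void main(String[] args) {
    List<String> cities = Arrays.asList("London", "Paris", "Rome", "Amsterdam");
    System.out.println(received(cities)); // prints [London, Paris, Rome, Amsterdam]
  }
}
```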


Create a Flux from a Java 8 Stream

To create a Flux from a Java 8 Stream, we can use the fromStream() method.

Example

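import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

import reactor.core.publisher.Flux;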

class ReactiveJavaTutorial {

  public static void main(String[] args) throws InterruptedException {

    List<Integer> ints = new ArrayList<>(Arrays.asList(17, 10, 19, 22, 4));

    // Create a Stream
    Stream<Integer> intStream = ints.stream();

    // Create a Flux from Stream
    Flux<Integer> fluxFromStream = Flux.fromStream(intStream);

  }
}
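One caveat worth keeping in mind: a Java Stream can be consumed only once, so a Flux created from a single Stream instance is not suitable for more than one subscriber. Reactor also provides a fromStream() overload that takes a Supplier, which is invoked for each subscriber. The sketch below assumes reactor-core is on the classpath; the class name FluxFromStreamSupplierExample and the method reusableFlux() are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

import reactor.core.publisher.Flux;

class FluxFromStreamSupplierExample {

  static final List<Integer> INTS = Arrays.asList(17, 10, 19, 22, 4);

  // The Supplier runs once per subscriber, so each one gets a fresh Stream.
  static Flux<Integer> reusableFlux() {
    return Flux.fromStream(() -> INTS.stream());
  }

  public static void main(String[] args) {
    Flux<Integer> flux = reusableFlux();

    // Two independent subscriptions; block() is for demonstration only.
    System.out.println(flux.collectList().block()); // prints [17, 10, 19, 22, 4]
    System.out.println(flux.collectList().block()); // prints [17, 10, 19, 22, 4]
  }
}
```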


Using the Flux.range() method

The range(int start, int count) method builds a Flux that emits count consecutive integers, starting from start. The last value emitted is therefore start + count - 1.

Example

import reactor.core.publisher.Flux;

class ReactiveJavaTutorial {

  public static void main(String[] args) throws InterruptedException {

    // create a Flux that will contain the values 1...15
    Flux<Integer> flux1 = Flux.range(1, 15);

    // create a Flux that will contain the values 5...29
    Flux<Integer> flux2 = Flux.range(5, 25);
  }
}
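Because the second argument is a count and not an end value, it is easy to misjudge where a range stops. The following sketch checks the bounds of Flux.range(5, 25), assuming reactor-core is on the classpath; the class name FluxRangeExample and the helper rangeValues() are hypothetical.

```java
import java.util.List;

import reactor.core.publisher.Flux;

class FluxRangeExample {

  // Hypothetical helper: returns every value emitted by Flux.range(start, count).
  static List<Integer> rangeValues(int start, int count) {
    return Flux.range(start, count).collectList().block(); // demo only
  }

  public static void main(String[] args) {
    List<Integer> values = rangeValues(5, 25);

    System.out.println(values.size());                 // prints 25
    System.out.println(values.get(0));                 // prints 5
    System.out.println(values.get(values.size() - 1)); // prints 29
  }
}
```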


That’s it!
