The 5 basic topics you should know about Project Reactor

  • Erwin Manders
  • 01-02-2022
  • 6 min

Several years ago, I started my journey in reactive programming by building my first Spring WebFlux application in Java. My team wanted to build our first cloud application without the hassle of recursive tasks or futures, but with the benefits of asynchronous computing: for example, wrapping every external service call in an asynchronous task so that we would not have to wait for each data call to complete in turn. We decided to give reactive programming a try by building a WebFlux application.

We made rapid progress in the beginning, since this application didn’t have very specific business logic. But quite soon we started running into simple issues with how we had coded the application, mostly related to not accounting for blocking code, not mapping errors, or even constructing publishers from null values. Only when we started fully exploring the Project Reactor API did we learn how to deal with these everyday coding issues.

But let’s pause here for a second to reflect on what reactive programming actually is. If you’ve never used client-side languages such as JavaScript, or used frameworks like RxJava, it might not be easy to get started. In imperative programming, the result of an equation is only derived once. In reactive programming, equations are defined as data streams that react upon changes within their dependencies.

Project Reactor uses the four basic building blocks of the publish/subscribe pattern, as defined by the Reactive Streams specification:

  • Publisher: The component that has a piece of data to publish to any subscribers
  • Subscriber: The component that subscribes for some kind of data from one or more publishers
  • Subscription: A contract between a publisher and a subscriber to exchange data
  • Processor: The component that processes the data, acting as both a subscriber and a publisher

Figure: In this diagram there are 3 publishers. P2 is also a subscriber to P1 and P3 is a subscriber to P2. There is one subscription active on P1 and P2. In this case, P1 calls an operator and publishes a value to P2. P2 will process the value and call its own operator. If that operator does not return a value or throws an error, P2 could publish an error or remain empty, and so will P3.
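
The four roles listed above can be sketched with the JDK's own java.util.concurrent.Flow interfaces, which mirror the Reactive Streams contract. This is only an illustration and not Reactor code: the class name PubSubSketch and the helper publishAndCollect are made up for this example, and the JDK's SubmissionPublisher stands in for a real data source.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical class, for illustration only.
final class PubSubSketch {

    /** Subscriber that records every value it receives. */
    static final class CollectingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            // The subscription is the contract: the subscriber signals demand through it.
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes the given values and returns what the subscriber received. */
    static List<Integer> publishAndCollect(List<Integer> values) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        // SubmissionPublisher is a ready-made Publisher implementation in the JDK.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            values.forEach(publisher::submit);
        } // close() signals onComplete after the submitted items
        try {
            // Delivery is asynchronous, so wait for the completion signal.
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.copyOf(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(List.of(1, 2, 3)));
    }
}
```

A Processor would implement both Flow.Subscriber and Flow.Publisher; it is left out here to keep the sketch short.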

Before we continue, we must understand that a publisher can be defined regardless of whether it has any subscribers, but without an active subscription nothing will happen. This does not mean that the publisher is aware of its subscribers, and a subscriber that has requested data does not wait for a reply. We call this non-blocking: the publisher publishes the data as soon as it’s ready, and the subscriber acts upon it when it arrives.
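
A minimal sketch of this "nothing happens until you subscribe" behavior, assuming reactor-core is on the classpath. The class and method names are hypothetical; Mono.fromCallable is used only because it makes the moment of execution easy to observe with a counter.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

// Hypothetical class, for illustration only.
final class LazySubscriptionSketch {

    /** Returns how often the callable ran before and after subscribing. */
    static int[] callsBeforeAndAfterSubscribe() {
        AtomicInteger calls = new AtomicInteger();

        // Declaring the pipeline does not run the callable.
        Mono<Integer> mono = Mono.fromCallable(calls::incrementAndGet);
        int before = calls.get();

        // Subscribing is what triggers the work.
        mono.subscribe(value -> System.out.println("received " + value));
        int after = calls.get();

        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] calls = callsBeforeAndAfterSubscribe();
        System.out.println("before=" + calls[0] + ", after=" + calls[1]);
    }
}
```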

  1. Creating publishers

In Project Reactor, two types of publishers exist: the Mono, which publishes either a single value or none, and the Flux, which can publish more than one value until the subscription ends. To work with these classes, we have to understand how to initialize publishers. Since a publisher cannot emit null values, we should use justOrEmpty for any object that is nullable or wrapped in an Optional. The following test code demonstrates the creation of a publisher and verifies it using StepVerifier from the reactor-test library, which subscribes to the data stream for us and validates what has been published.

        final var nonNullValue = Mono.justOrEmpty(VALUE);
        StepVerifier.create(nonNullValue)
                .expectNext(VALUE)
                .verifyComplete();

        final var optional = Optional.of(VALUE);
        final var optionalWithValue = Mono.justOrEmpty(optional);
        StepVerifier.create(optionalWithValue)
                .expectNext(VALUE)
                .verifyComplete();

This will safeguard us from null pointer exceptions in our publisher. If the optional is empty, the publisher will be empty:

        final var emptyOptional = Mono.justOrEmpty(Optional.empty());
        StepVerifier.create(emptyOptional)
                .verifyComplete();

The quickest way to create Flux publishers is to convert a collection into a Flux:

        final var list = List.of(VALUE, VALUE, VALUE);
        final var fluxFromList = Flux.fromIterable(list);
        StepVerifier.create(fluxFromList)
                .expectNext(VALUE)
                .expectNext(VALUE)
                .expectNext(VALUE)
                .verifyComplete();

        final var stream = Stream.of(VALUE, VALUE, VALUE);
        final var fluxFromStream = Flux.fromStream(stream);
        StepVerifier.create(fluxFromStream)
                .expectNext(VALUE)
                .expectNext(VALUE)
                .expectNext(VALUE)
                .verifyComplete();

Since real objects are never this simple, we can combine these techniques with map, flatMap and flatMapMany to create a single Mono or Flux:

        Mono.justOrEmpty(flatMapIterableObject.getIterableObject())
                .flatMapMany(iterableObject -> Flux.fromIterable(iterableObject.getValueObjects()))
                .map(FlatMapIterableObject.ValueObject::getValue);

The justOrEmpty call converts the nullable result of getIterableObject() into a Mono, which flatMapMany turns into a flat Flux built from the list returned by getValueObjects(). The map operation then returns a new Flux by applying the method reference getValue to every object in that Flux.
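
The same chain can be tried out without the domain classes. The sketch below assumes reactor-core is available and uses a plain, possibly null, list of strings in place of the nested object; the class and method names are hypothetical.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical class, for illustration only.
final class NullableListSketch {

    /** Maps every word to its length; a null list results in an empty Flux. */
    static List<Integer> lengths(List<String> nullableWords) {
        return Mono.justOrEmpty(nullableWords)                  // Mono with a list, or empty
                .flatMapMany(words -> Flux.fromIterable(words)) // Flux of the list's elements
                .map(String::length)                            // Flux of lengths
                .collectList()
                .block();                                       // blocking only for the demo
    }

    public static void main(String[] args) {
        System.out.println(lengths(List.of("a", "bb", "ccc")));
        System.out.println(lengths(null));
    }
}
```

Calling block() is acceptable in a small demo like this one, but it should be avoided inside a real reactive pipeline.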

  2. Converting publishers

Once we have created a publisher, we need to know how to convert between publishers of different types and how to combine multiple publishers into a single data stream. Starting off, a Flux can simply be converted to a Mono, as long as we know which data value we are interested in, by using next (for the first value) or last. To convert a Mono into a Flux that repeats its value, we can use repeat. Be aware that without specifying the number of times to repeat, the Flux will repeat indefinitely:

        final var flux = Flux.range(1, 3);
        final var next = flux.next();
        // last(2) falls back to the default value 2 only if the Flux is empty
        final var last = flux.last(2);
        StepVerifier.create(next)
                .expectNext(1)
                .verifyComplete();
        StepVerifier.create(last)
                .expectNext(3)
                .verifyComplete();

        // repeat(3) re-subscribes 3 times after the original subscription: 4 values in total
        final var flux = Mono.just(1).repeat(3);

        StepVerifier.create(flux)
                .expectNextCount(4)
                .verifyComplete();

The next step is to combine different instances of Mono and Flux. We can zip two publishers with zipWith, which pairs their values one by one. To zip a Mono with a Flux, we either repeat the Mono or reduce the Flux to a single value with next or last. Be aware that if the Mono is not repeated, the zipped Flux will publish only once:

        final var flux = Flux.just(1, 2, 3, 4);
        final var mono = Mono.just(1);

        final var zippedFlux = flux.zipWith(mono);

        StepVerifier.create(zippedFlux)
                .expectNextCount(1)
                .verifyComplete();

        final var zippedFluxRepeatMono = flux.zipWith(mono.repeat());

        StepVerifier.create(zippedFluxRepeatMono)
                .expectNextCount(4)
                .verifyComplete();

Zipping two publishers into one without a combinator function generates a new publisher of tuple objects that hold the left and right values. Specifying a combinator calls that function for each pair and creates a new publisher that holds only the results of the combinator:

        final var flux = Flux.just(1, 2);
        final var flux2 = Flux.just(3, 4, 5);

        final var zippedFlux = flux.zipWith(flux2, (a,b) -> a + b);

        StepVerifier.create(zippedFlux)
                .expectNext(4)
                .expectNext(6)
                .verifyComplete();
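
For the variant without a combinator, the tuples can be read with getT1() and getT2(). A minimal sketch, assuming reactor-core is available; the class and method names are hypothetical.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;

// Hypothetical class, for illustration only.
final class ZipTupleSketch {

    /** Zips numbers with letters and renders each tuple as a string. */
    static List<String> zipToStrings() {
        // Without a combinator, zipWith produces Tuple2 objects.
        Flux<Tuple2<Integer, String>> zipped = Flux.just(1, 2).zipWith(Flux.just("a", "b"));

        return zipped
                .map(tuple -> tuple.getT1() + tuple.getT2()) // left value + right value
                .collectList()
                .block();                                    // blocking only for the demo
    }

    public static void main(String[] args) {
        System.out.println(zipToStrings());
    }
}
```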

Zipping more than two publishers into one can be achieved with the static zip method. The same rules apply: if we zip a Mono with a Flux, only one value will be published unless we repeat the Mono. If we zip multiple Flux instances, the resulting Flux ends as soon as the shortest of them ends:

        final var flux1 = Flux.just(1, 2, 3);
        final var flux2 = Flux.just(4, 5);
        final var mono = Mono.just("1");

        StepVerifier.create(Flux.zip(flux1, flux2, mono))
                .assertNext(tuple -> {
                    assertThat(tuple.getT1()).isEqualTo(1);
                    assertThat(tuple.getT2()).isEqualTo(4);
                    assertThat(tuple.getT3()).isEqualTo("1");
                })
                .verifyComplete();

        StepVerifier.create(Flux.zip(flux1, flux2, mono.repeat()))
                .expectNextCount(2)
                .verifyComplete();

The second zipped publisher ends after two values instead of one because the Mono is repeated until the subscription ends.

  3. Switching from or to different publishers

Let’s face it, your application does not always work as expected. Since we are working with the publish/subscribe pattern, one possible behavior is that the subscription ends before any value is published at all. This happens frequently, so it is important to think about what should happen if your publisher returns empty and to handle it appropriately.

In its most basic form, we can give any publisher a switchIfEmpty to describe what should happen when that publisher is empty:

        final var fluxWithSwitch = Flux.empty().switchIfEmpty(Mono.just(1));

        StepVerifier.create(fluxWithSwitch)
                .expectNext(1)
                .verifyComplete();

As you can see, we can switch to a Mono if a Flux is empty, but from a Mono we can only switch to another Mono. We can also return a default value instead of another publisher with defaultIfEmpty. Besides switching on empty, we can also switch on the first signal, which lets us return a completely different Flux depending on the first value that is published:

        final var switchOnFirstFlux = Flux.just(1, 2, 3, 4, 5)
                .switchOnFirst((signal, flux) -> signal.get() == 2 ? flux : switchFlux(flux));

        StepVerifier.create(switchOnFirstFlux)
                .expectNext(2)
                .expectNext(4)
                .expectNext(6)
                .expectNext(8)
                .verifyComplete();

The Flux has changed because the first signal has a value of 1 rather than 2, so the switchFlux method is called, which applies a map and a filter operation to the data stream:

    private Flux<Integer> switchFlux(final Flux<Integer> flux)
    {
        return flux.map(i -> i * 2).filter(i -> i < 10);
    }

Java evaluates method arguments eagerly, which means that the alternative publisher passed to the switch operators above is constructed when the pipeline is assembled, even if it is never subscribed to. Sometimes that behavior is unwanted, and we have to specify that the alternative publisher should only be constructed when it is actually needed, by wrapping it in defer:

        final var mockService = mock(MockService.class);
        final var flux = Flux.just(1, 2, 3, 4);
        final var switchWithDefer = flux.switchIfEmpty(defer(() -> mockService.getValues()));
        StepVerifier.create(switchWithDefer)
                .expectNextCount(4)
                .verifyComplete();
        verifyNoInteractions(mockService);

When we run this test, the Flux is not empty, so the supplier passed to defer is never called and no interaction occurs on the mockService object.
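
To see what defer changes, the sketch below compares a fallback that is passed directly with one that is wrapped in Mono.defer. It assumes reactor-core is available; the class, the fallback method and the counters are hypothetical and exist only to make the difference measurable.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

// Hypothetical class, for illustration only.
final class EagerVersusDeferSketch {

    /** Hypothetical fallback that records every time it is constructed. */
    static Mono<Integer> fallback(AtomicInteger constructions) {
        constructions.incrementAndGet();
        return Mono.just(-1);
    }

    /** Returns how often the fallback was constructed: {eager, deferred}. */
    static int[] fallbackConstructions() {
        AtomicInteger eager = new AtomicInteger();
        AtomicInteger deferred = new AtomicInteger();

        // The argument is evaluated immediately, although the source is not empty.
        Mono.just(1).switchIfEmpty(fallback(eager)).block();

        // The supplier only runs if the source turns out to be empty.
        Mono.just(1).switchIfEmpty(Mono.defer(() -> fallback(deferred))).block();

        return new int[] {eager.get(), deferred.get()};
    }

    public static void main(String[] args) {
        int[] counts = fallbackConstructions();
        System.out.println("eager=" + counts[0] + ", deferred=" + counts[1]);
    }
}
```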

If the behavior of returning another publisher or default value is unwanted because the empty publisher was supposed to return critical data, we can consider returning a Mono.error or Flux.error with an exception and then act accordingly on the error publisher. This works exactly the same as switching to a Mono or Flux:

        final var errorOnEmptyMono = Mono.empty().switchIfEmpty(Mono.error(new IllegalStateException("Publisher returned empty")));
        StepVerifier.create(errorOnEmptyMono)
                .verifyError(IllegalStateException.class);

  4. Handling errors in publishers

Exceptions will occur in your reactive application just like in any other application. You can throw your own exceptions or create error publishers with Mono.error or Flux.error, and publishers can also propagate exceptions thrown by external code that you do not control. A checked exception wrapped in an error publisher no longer has to be declared or caught, so be aware of which exceptions your code can produce when handling them. As usual, handle exceptions as specifically as possible, and let as many of your publishers as possible handle their own exceptions, to avoid losing track of where an exception occurred.

There are a few basic practices we can apply to handle or map exceptions depending on what your publisher needs.

To return a default value when an error is published, use onErrorReturn:

        final var onErrorReturnFlux = operationWithError().onErrorReturn(1);
        StepVerifier.create(onErrorReturnFlux)
                .expectNext(1)
                .verifyComplete();

The operationWithError() method returns a publisher that publishes an error instead of a value:

    private Flux<Integer> operationWithError()
    {
        return Flux.error(new PublisherException("Error"));
    }

If the return value should be another publisher, use onErrorResume instead. If you do not want to catch the exception but only map it, and perhaps perform some logging, use onErrorMap. This is especially useful for mapping exceptions from external code to exceptions that your application understands:

        final var onErrorMapFlux = operationWithError().onErrorMap(throwable -> new IllegalStateException());
        StepVerifier.create(onErrorMapFlux)
                .verifyError(IllegalStateException.class);
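
The onErrorResume case mentioned above could look like the following sketch, which assumes reactor-core is available. The class name, the failing() helper and the fallback values are hypothetical.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical class, for illustration only.
final class OnErrorResumeSketch {

    /** Hypothetical publisher that always fails. */
    static Flux<Integer> failing() {
        return Flux.error(new IllegalStateException("Error"));
    }

    /** Switches to a fallback publisher when the source signals an error. */
    static Flux<Integer> withFallback() {
        return failing().onErrorResume(throwable -> Flux.just(1, 2));
    }

    public static void main(String[] args) {
        List<Integer> values = withFallback().collectList().block(); // blocking only for the demo
        System.out.println(values);
    }
}
```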

Finally, if we just want to perform an action when an error occurs, without mapping it, and let that same exception propagate, we can use doOnError. Note that onErrorContinue is not suited for this: it asks supporting upstream operators to drop the failing element and carry on, rather than running a side effect on a terminal error. For example, we can use doOnError to add exception logging to any publisher:

        final var doOnErrorFlux = operationWithError().doOnError(throwable -> log.error(format("Exception: %s", throwable)));
        StepVerifier.create(doOnErrorFlux)
                .verifyError(PublisherException.class);

All of these methods also accept an exception class or a predicate function, so that the exception is handled only for certain classes or specific cases. For example, we can map only NullPointerExceptions, or only exceptions with a specific message, to something else. In the first test below the mapping does not apply, because a PublisherException is not a NullPointerException:

        final var onErrorMapForClass = operationWithError().onErrorMap(NullPointerException.class, throwable -> new IllegalStateException());
        StepVerifier.create(onErrorMapForClass)
                .verifyError(PublisherException.class);

        final var onErrorMapPredicate = operationWithError().onErrorMap(throwable -> throwable.getMessage().contains("Error"), throwable -> new IllegalStateException());
        StepVerifier.create(onErrorMapPredicate)
                .verifyError(IllegalStateException.class);

We can of course also do this with onErrorReturn and onErrorResume if a certain exception can be recovered from, or with doOnError if a side effect such as logging should only run for a specific exception.

  5. Caching in publishers

This topic might seem irrelevant to some, but caching has become increasingly relevant in cloud computing. Caching is used to reduce the number of threads in use and the number of external service calls needed to fetch data from another service or database. When two subscribers subscribe to the same data stream, each subscription triggers the stream separately, which means the underlying work is done twice. To prevent this, we can cache the publisher.
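
The duplicated work can be made visible with a small sketch, assuming reactor-core is available. The class and method names are hypothetical, and the counter stands in for an expensive call to another service.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

// Hypothetical class, for illustration only.
final class ColdPublisherSketch {

    /** Subscribes twice to an uncached Mono and returns how often the work ran. */
    static int invocationsWithoutCache() {
        AtomicInteger count = new AtomicInteger();

        // Stand-in for an expensive call such as a remote lookup.
        Mono<Integer> mono = Mono.fromCallable(count::incrementAndGet);

        mono.block(); // first subscription runs the callable
        mono.block(); // second subscription runs it again

        return count.get();
    }

    public static void main(String[] args) {
        System.out.println("invocations: " + invocationsWithoutCache());
    }
}
```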

To cache a publisher for re-use amongst other subscribers, just use cache at the end of the data stream:

        final var count = new AtomicInteger();

        final var cachedMono = Mono.just(1).doOnNext(i -> count.incrementAndGet()).cache();

        StepVerifier.create(cachedMono)
                .expectNextCount(1)
                .verifyComplete();

        StepVerifier.create(cachedMono)
                .expectNextCount(1)
                .verifyComplete();

        StepVerifier.create(cachedMono)
                .expectNextCount(1)
                .verifyComplete();

        assertThat(count.get()).isEqualTo(1);

This will re-use the published value for all subscribers. The incrementAndGet is only called once because the entire data stream is cached. If instead we want to cache the data in an external cache, we can use CacheMono from the Reactor Addons (reactor-extra) library to work with Spring Cache within our reactive pipeline:

        final var lookUpAndCacheMono = CacheMono
                .lookup(key -> Mono.justOrEmpty(cache.get(key, CacheValue.class)).map(Signal::next), cacheKey)
                .onCacheMissResume(() -> Mono.just(new CacheValue("value")).doOnNext(value -> count.incrementAndGet()))
                .andWriteWith((key, signal) -> fromRunnable(
                        () -> ofNullable(signal.get())
                                .ifPresent(value -> cache.put(key, value))));

        StepVerifier.create(lookUpAndCacheMono)
                .expectNextCount(1)
                .verifyComplete();

        StepVerifier.create(lookUpAndCacheMono)
                .expectNextCount(1)
                .verifyComplete();

        StepVerifier.create(lookUpAndCacheMono)
                .expectNextCount(1)
                .verifyComplete();

        assertThat(count.get()).isEqualTo(1);

This will look up the data in a cache, fetch the data using the given function if the cache is empty, and write the data to the cache after the function has completed. The incrementAndGet is also called only once in this case.

Both practices have advantages. If a publisher has various subscribers within one class, we can easily prevent it from being constructed more than once by adding Mono.cache or Flux.cache. If a publisher has one subscriber but this API is often called by other components or other instances of the service, we can use CacheMono to simply cache it externally.

Additional resources

To view the code base or to try out the Reactor code for yourself, please visit my GitHub: https://github.com/ErwinManders/ProjectReactorBlog

Interested in trying out Reactor and Spring WebFlux, or in learning more about them? Check the Project Reactor and Spring websites for guides and extensive API references.

About the author

Erwin Manders
Senior Java Software Engineer

Erwin has been working for Rabobank for 3 years. For years Erwin has worked on various cloud technologies and likes to dedicate his time to improving himself and the people around him. Erwin is a real 'tech guy' with knowledge of various backend and frontend technologies and is enthusiastic about software architecture as well. In his spare time, he likes to go for a run and participates in various semi marathons and train runs.
