Here is how the documentation describes it succinctly:
A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero due to invocations of the countDown() method, after which all waiting threads are released and any subsequent invocations of await return immediately.
To illustrate this concept, the following code shows two threads simulating work (by sleeping) and only proceeding once both have reached the barrier:
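A minimal sketch of such a program, using only the JDK, might look like the following. The class name `LatchExample`, the `worker` helper, and the sleep durations of 1 and 5 seconds are assumptions made for illustration.

```java
import java.time.LocalDateTime;
import java.util.concurrent.CountDownLatch;

public class LatchExample {

    // Builds a thread that simulates work by sleeping, counts down the latch,
    // and then blocks on the latch until every party has counted down.
    static Thread worker(String name, long sleepMillis, CountDownLatch latch) {
        return new Thread(() -> {
            try {
                System.out.println(LocalDateTime.now() + ": " + name + " starting work...");
                Thread.sleep(sleepMillis);
                latch.countDown();
                latch.await(); // blocks this thread until the count reaches zero
                System.out.println(LocalDateTime.now() + ": Both finished and moving on.");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, name);
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);
        Thread first = worker("Thread 1", 1000, latch);
        Thread second = worker("Thread 2", 5000, latch);
        first.start();
        second.start();
        first.join();
        second.join();
    }
}
```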
The code spawns two threads and passes the CountDownLatch to each of them. Both threads start at (approximately) the same time, but one of them sleeps much longer than the other. Once its sleep is over, each thread counts down the latch and waits on it; once the count reaches zero, both threads can make progress again.
If you execute this snippet, you’ll get an output like this (it shows the time as well to make clear that indeed 5 seconds have passed):
2019-03-28T15:19:02.080658: Thread 2 starting work...
2019-03-28T15:19:02.080665: Thread 1 starting work...
2019-03-28T15:19:07.092761: Both finished and moving on.
2019-03-28T15:19:07.092827: Both finished and moving on.
This is pretty neat, but it has one big problem when we look at it from a reactive angle: it blocks the threads until they can make progress - and blocking is the one thing you want to avoid at all costs when going reactive.
So, how can we coordinate two or more execution pipelines so that they wait on a barrier without blocking the underlying threading infrastructure?
One possibility is to create our own barrier around the MonoProcessor that ships with Project Reactor. In Reactive Streams terminology, a Processor acts as both a Publisher and a Subscriber - so we can store state in it while automatically following the Reactive Streams contract to notify our subscribers if needed.
So, here is the Barrier. It works similarly to the CountDownLatch in one aspect: it gets created with the number of parties that need to be ready and waiting before all of them can proceed. Internally, we track all the ready parties with a thread-safe counter. When await() is called, we increment our ready counter, and if we have reached the required threshold we complete the MonoProcessor.
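A minimal sketch of a barrier along these lines is shown below. It assumes a reactor-core 3.x version that still ships `MonoProcessor` (it was later deprecated in favor of `Sinks`); the field names and the choice of `AtomicInteger` as the thread-safe counter are assumptions for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

public class Barrier {

    // Every waiting party subscribes to this processor; it completes once
    // the required number of parties has arrived.
    private final MonoProcessor<Void> processor = MonoProcessor.create();

    // Thread-safe count of the parties that have arrived so far.
    private final AtomicInteger ready = new AtomicInteger();

    private final int parties;

    public Barrier(int parties) {
        this.parties = parties;
    }

    public Mono<Void> await() {
        // Deferred so that the counter is incremented on subscription,
        // not when await() is called while the pipeline is assembled.
        return Mono.defer(() -> {
            if (ready.incrementAndGet() >= parties) {
                processor.onComplete();
            }
            return processor;
        });
    }
}
```

Because the processor replays its completion to late subscribers, a party that arrives after the barrier has been lifted proceeds immediately.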
If you are confused about the nested Mono.defer(): in the next code snippet you’ll see how the await() method is used. If the code is not turned into a “cold” Mono by deferring it, the counter is incremented as soon as await() is called, while the pipeline is still being assembled, and the barrier opens almost immediately without waiting for the actual delay event.
The following code snippet is modeled after our very first example, but using two reactive sequences instead of spawning two separate threads.
Instead of sleeping, our Mono instances use the delay operator to simulate work. Once the delay is over and the event is emitted, they will subscribe on the barrier and increment the internal counter. Once both reach this point the Mono will complete and they can both make progress.
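The two sequences could be sketched as follows, again assuming a reactor-core 3.x version that still ships `MonoProcessor`. The class name `BarrierExample` and the `work` helper are hypothetical, the barrier is repeated as a nested class only so that the example compiles on its own, and the final `block()` is there purely to keep the demo JVM alive.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

public class BarrierExample {

    // Same barrier as described above, repeated to keep the example self-contained.
    static class Barrier {
        private final MonoProcessor<Void> processor = MonoProcessor.create();
        private final AtomicInteger ready = new AtomicInteger();
        private final int parties;

        Barrier(int parties) {
            this.parties = parties;
        }

        Mono<Void> await() {
            return Mono.defer(() -> {
                if (ready.incrementAndGet() >= parties) {
                    processor.onComplete();
                }
                return processor;
            });
        }
    }

    // Simulates work with a delay, then waits on the barrier without blocking a thread.
    static Mono<Void> work(Duration delay, Barrier barrier) {
        return Mono.delay(delay)
                .then(barrier.await())
                .doOnSuccess(v ->
                        System.out.println(LocalDateTime.now() + ": All finished and moving on."));
    }

    public static void main(String[] args) {
        Barrier barrier = new Barrier(2);
        System.out.println(LocalDateTime.now() + ": Starting both sequences...");
        // Subscribes to both sequences; block() only prevents main from exiting early.
        Mono.when(
                work(Duration.ofSeconds(1), barrier),
                work(Duration.ofSeconds(5), barrier)
        ).block();
    }
}
```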
In the console you’ll again see that both are only proceeding once the barrier is lifted:
2019-03-28T15:29:02.64789: Starting both sequences...
2019-03-28T15:29:08.158664: All finished and moving on.
2019-03-28T15:29:08.15904: All finished and moving on.
One limitation of the approach outlined above is that we are effectively turning our chain into a Mono<Void>, losing any value we might have carried around. If you only need a signal this might be okay, but in practice we often need to do something with those values after the barrier. To fix this, we can use the delayUntil operator in Reactor and modify our barrier slightly. The delayUntil operator takes a function that returns a Publisher and holds back the current value until that Publisher signals completion.
We only need to make minor adjustments and implement the function interface the operator expects:
We only had to change the return type and the name of the method to implement the interface.
The following snippet shows the operator in action - note how similar it is to our previous approaches.
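One way the adjusted barrier and its use with delayUntil might look is sketched below, under the same assumption about the reactor-core version. The class name `DelayUntilExample`, the `work` helper, and the string values are hypothetical; the barrier ignores the value it is handed and uses it only as a trigger.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

public class DelayUntilExample {

    // The barrier as a Function, so it can be passed straight to delayUntil.
    static class Barrier implements Function<Object, Publisher<Void>> {
        private final MonoProcessor<Void> processor = MonoProcessor.create();
        private final AtomicInteger ready = new AtomicInteger();
        private final int parties;

        Barrier(int parties) {
            this.parties = parties;
        }

        @Override
        public Publisher<Void> apply(Object ignored) {
            return Mono.defer(() -> {
                if (ready.incrementAndGet() >= parties) {
                    processor.onComplete();
                }
                return processor;
            });
        }
    }

    // Emits the value after the delay, then holds it back until the barrier lifts.
    static Mono<String> work(String value, Duration delay, Barrier barrier) {
        return Mono.delay(delay)
                .map(tick -> value)
                .delayUntil(barrier);
    }

    public static void main(String[] args) {
        Barrier barrier = new Barrier(2);
        // block() only keeps the demo JVM alive until both values have passed the barrier.
        Mono.zip(
                work("first", Duration.ofSeconds(1), barrier),
                work("second", Duration.ofSeconds(5), barrier)
        ).doOnNext(values ->
                System.out.println("Got " + values.getT1() + " and " + values.getT2())
        ).block();
    }
}
```

Unlike the Mono<Void> variant, each sequence still carries its value once the barrier has been lifted.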
So there you have it! With a couple of lines of code you can implement a reactive Barrier which works similarly to its blocking counterpart.
If you are using similar or other approaches let me know in the comments!