Reactive Jersey Client, Part 1 – Motivation

The Reactive Jersey Client API is a generic API that allows end users to utilize the popular reactive programming model when working with Jersey Client. This part of the series describes the motivation behind creating Reactive Jersey Client.

The whole series consists of three articles; this is the first one.

The Problem

Imagine a travel agency whose information system consists of multiple basic services. These services might be built using different technologies (JMS, EJB, WS, …). For simplicity, we presume that the services can be consumed through a REST interface via HTTP method calls (e.g. using a JAX-RS Client; a minimal client setup is sketched below the list). We also presume that the basic services we need to work with are:

  • Customers service – provides information about customers of the travel agency.
  • Destinations service – provides a list of visited and recommended destinations for an authenticated customer.
  • Weather service – provides weather forecast for a given destination.
  • Quoting service – provides price calculation for a customer to travel to a recommended destination.
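
For illustration, a minimal JAX-RS Client setup for these services might look as follows. This is only a sketch: the base URIs are made up for this article, and imports (javax.ws.rs.client.Client, ClientBuilder, WebTarget) are omitted as in the other snippets.

final Client client = ClientBuilder.newClient();
 
// WebTargets pointing at the internal services (the URIs are illustrative only).
final WebTarget customers   = client.target("http://localhost:8080/internal/customers");
final WebTarget destination = client.target("http://localhost:8080/internal/destinations");
final WebTarget forecast    = client.target("http://localhost:8080/internal/weather/{destination}");
final WebTarget quote       = client.target("http://localhost:8080/internal/quoting");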

The task is to create a publicly available feature that, for an authenticated user, displays a list of the 10 last visited places as well as a list of 10 newly recommended destinations, including a weather forecast and a price calculation for the user. Notice that some of the requests (to retrieve data) depend on the results of previous requests; e.g. getting the recommended destinations depends on first obtaining information about the authenticated user, obtaining a weather forecast depends on the destination information, and so on. This relationship between some of the requests is an important part of the problem and the area where you can take real advantage of the reactive programming model.

One way to obtain the data is to make multiple HTTP method calls from the client (e.g. a mobile device) to all the services involved and combine the retrieved data on the client. However, since the basic services are available only in the internal network, we’d rather create a public orchestration layer than expose all the internal services to the outside world. The orchestration layer exposes only the desired operations of the basic services to the public. To limit traffic and achieve lower latency, we’d also like to return all the necessary information to the client in a single response.

The orchestration layer is illustrated here:

Travel Agency Orchestration Service

The layer accepts requests from the outside and is responsible for invoking multiple requests to the internal services. When the responses from the internal services are available in the orchestration layer, they are combined into a single response that is sent back to the client.
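
In JAX-RS terms, the orchestration layer boils down to a single public resource that fans out requests to the internal services. The following skeleton is only a sketch of that idea; the class name, path and response handling are illustrative and not taken from the Jersey example:

@Path("agent")
@Produces(MediaType.APPLICATION_JSON)
public class OrchestrationResource {

    @GET
    public Response recommended(@HeaderParam("Rx-User") final String user) {
        // 1. Identify the customer.
        // 2. Obtain the last visited and the recommended destinations.
        // 3. Obtain forecasts and price calculations for the recommended destinations.
        // 4. Combine everything into a single entity and return it to the client.
        return Response.ok(/* combined entity */).build();
    }
}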

Now, let’s take a look at the various ways the orchestration layer can be implemented.

A Naïve Approach

The simplest way to implement the orchestration layer is to take a synchronous approach, for which we can use the JAX-RS Client Sync API. The implementation is simple to write, easy to read and straightforward to debug.

final WebTarget destination = ...;
final WebTarget forecast = ...;
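// Note: the 'errors' queue used in this and the following snippets is assumed
// to be a thread-safe collection of error messages, e.g.:
// final Queue<String> errors = new ConcurrentLinkedQueue<>();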
 
// Obtain recommended destinations.
List<Destination> recommended = Collections.emptyList();
try {
    recommended = destination.path("recommended").request()
            // Identify the user.
            .header("Rx-User", "Sync")
            // Return a list of destinations.
            .get(new GenericType<List<Destination>>() {});
} catch (final Throwable throwable) {
    errors.offer("Recommended: " + throwable.getMessage());
}
 
// Forecasts. (depend on recommended destinations)
final Map<String, Forecast> forecasts = new HashMap<>();
for (final Destination dest : recommended) {
    try {
        forecasts.put(dest.getDestination(),
                forecast.resolveTemplate("destination", dest.getDestination())
                        .request()
                        .get(Forecast.class));
    } catch (final Throwable throwable) {
        errors.offer("Forecast: " + throwable.getMessage());
    }
}

The whole implementation can be found in the Jersey workspace, e.g. on GitHub – the SyncAgentResource class in the rx-client-java8-webapp example.

The downside of this approach is its slowness. All the requests, even the mutually independent ones, are processed sequentially, which wastes resources: threads are needlessly blocked when they could otherwise be used for real work.

If you take a closer look at the example, you can see that once all the recommended destinations are available for further processing, we try to obtain forecasts for these destinations. A weather forecast can be obtained only for a single destination per request, so we need to make 10 requests to the Forecast service to cover all the destinations. In the synchronous approach this means getting the forecasts one by one: only after one forecast response arrives can we send the request for the next one. This takes time. The whole process of constructing a response for the client is shown in the following picture.

Time consumed to create a response for the client – synchronous way

Let’s try to quantify this by assigning an approximate time to every request we make to the internal services. This way, we can easily compute the time needed to complete a response for the client. For example, obtaining

  • Customer details takes 150 ms
  • Recommended destinations takes 250 ms
  • Price calculation for a customer and destination takes 170 ms (each)
  • Weather forecast for a destination takes 330 ms (each)

When summed up, approximately 5400 ms is needed to construct a response for the client.
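
With the 10 recommended destinations from our task, the total adds up roughly as follows: 150 ms (customer details) + 250 ms (recommended destinations) + 10 × 170 ms (price calculations) + 10 × 330 ms (forecasts) = 150 + 250 + 1700 + 3300 = 5400 ms.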

The synchronous approach is better suited to a small number of requests (where the accumulated time doesn’t matter that much), or to a single request that depends on the results of previous operations.

Optimized Approach

The amount of time needed by the synchronous approach can be lowered by invoking the independent requests in parallel. We’re going to use the JAX-RS Client Async API to illustrate this approach. The implementation is more difficult to get right in this case because of the nested callbacks and the need to wait, at certain points, until all partial responses are ready to be processed. It is also a little harder to debug and maintain; the nested calls cause most of the complexity here.

final WebTarget destination = ...;
final WebTarget forecast = ...;
 
// Obtain recommended destinations. (does not depend on visited ones)
destination.path("recommended").request()
    // Identify the user.
    .header("Rx-User", "Async")
    // Async invoker.
    .async()
    // Return a list of destinations.
    .get(new InvocationCallback<List<Destination>>() {
        @Override
        public void completed(final List<Destination> recommended) {
            final CountDownLatch innerLatch = new CountDownLatch(recommended.size());

            // Forecasts. (depend on recommended destinations)
            final Map<String, Forecast> forecasts =
                Collections.synchronizedMap(new HashMap<>());

            for (final Destination dest : recommended) {
                forecast.resolveTemplate("destination", dest.getDestination())
                    .request()
                    .async()
                    .get(new InvocationCallback<Forecast>() {
                        @Override
                        public void completed(final Forecast forecast) {
                            forecasts.put(dest.getDestination(), forecast);
                            innerLatch.countDown();
                        }

                        @Override
                        public void failed(final Throwable throwable) {
                            errors.offer("Forecast: " + throwable.getMessage());
                            innerLatch.countDown();
                        }
                    });
            }

            // Have to wait here for dependent requests ...
            try {
                if (!innerLatch.await(10, TimeUnit.SECONDS)) {
                    errors.offer("Inner: Waiting for requests to complete has timed out.");
                }
            } catch (final InterruptedException e) {
                errors.offer("Inner: Waiting for requests to complete has been interrupted.");
            }

            // Continue with processing.
        }

        @Override
        public void failed(final Throwable throwable) {
            errors.offer("Recommended: " + throwable.getMessage());
        }
    });

The whole implementation can be found in the Jersey workspace, e.g. on GitHub – the AsyncAgentResource class in the rx-client-java8-webapp example.

The example is a bit more complicated at first glance. We provide an InvocationCallback to the asynchronous get method; one of its methods (completed or failed) is called when the request finishes. This is a pretty convenient way to handle asynchronous invocations when no nested calls are present. Since we do have nested calls (obtaining the weather forecasts), and we obtain the forecasts asynchronously as well, we needed to introduce a CountDownLatch synchronization primitive. The latch is counted down every time a request to the Forecast service completes, whether successfully or with a failure. When the latch reaches zero, all the dependent requests have finished and we can continue with processing (otherwise we wouldn’t have all the data required to construct the response for the client). This additional synchronization was not needed in the synchronous approach, but it is necessary here.

The error processing also cannot be written as cleanly as we would like. The error handling is scattered across so many places in the code that it becomes quite difficult to create a comprehensive error response for the client.

On the other hand, taking the asynchronous approach leads to code that is about as fast as it gets. Resources are used optimally (no idly waiting threads) to achieve a quick response time. The whole process of constructing the response for the client can be seen in the image below. It now takes only about 730 ms instead of the 5400 ms we encountered with the previous approach: the total is driven by the longest chain of dependent calls (customer details, then recommended destinations, then a forecast – roughly 150 + 250 + 330 ms), while the independent requests run in parallel.

Time consumed to create a response for the client – asynchronous way

As you can guess, this approach, even with all its benefits, is the one that is really hard to implement, debug and maintain. It’s a safe bet when you have many independent calls to make, but it gets uglier with an increasing number of nested calls.

Reactive Approach

The reactive approach is a way out of the so-called Callback Hell that you can encounter when dealing with Java’s Futures or invocation callbacks. It is based on a data-flow concept, and the execution model propagates changes through the flow. An example of a single item in the data-flow chain is a JAX-RS Client HTTP method call. When the JAX-RS request finishes, the next item (or the user code) in the data-flow chain is notified about the continuation, completion or error in the chain. You describe what should be done next rather than how the next action in the chain should be triggered. The other important point is that the data-flows are composable: you can compose and transform multiple flows into a resulting one and apply further operations on the result.

final WebTarget destination = ...;
final WebTarget forecast = ...;
 
// Recommended places.
final Observable<Destination> recommended = RxObservable.from(destination)
        .path("recommended")
        .request()
        // Identify the user.
        .header("Rx-User", "RxJava")
        // Reactive invoker.
        .rx()
        // Return a list of destinations.
        .get(new GenericType<List<Destination>>() {})
        // Handle Errors.
        .onErrorReturn(throwable -> {
            errors.offer("Recommended: " + throwable.getMessage());
            return Collections.emptyList();
        })
        // Emit destinations one-by-one.
        .flatMap(Observable::from)
        // Remember emitted items for dependent requests.
        .cache();
 
// Forecasts. (depend on recommended destinations)
final RxWebTarget<RxObservableInvoker> rxForecast = RxObservable.from(forecast);
final Observable<Forecast> forecasts = recommended.flatMap(dest ->
        // The lambda parameter is named 'dest' so that it does not shadow the
        // local 'destination' WebTarget declared above.
        rxForecast
                .resolveTemplate("destination", dest.getDestination())
                .request()
                .rx()
                .get(Forecast.class)
                .onErrorReturn(throwable -> {
                    errors.offer("Forecast: " + throwable.getMessage());
                    return new Forecast(dest.getDestination(), "N/A");
                }));
 
final Observable<Recommendation> recommendations = Observable
        .zip(recommended, forecasts, Recommendation::new);
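
To turn the composed data-flow into an actual response for the client, the resulting Observable still has to be subscribed to. Below is a minimal sketch using plain RxJava 1.x operators; in the full Jersey example the result is used to complete an asynchronous server-side response instead of being printed:

recommendations
        .toList()
        .subscribe(
                // All recommendations (with their forecasts) are available here;
                // combine them, together with any collected errors, into the
                // final response for the client.
                recommendationList -> System.out.println("Recommendations: " + recommendationList),
                throwable -> errors.offer("Recommendations: " + throwable.getMessage()));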

The whole implementation can be found in the Jersey workspace, e.g. on GitHub – the ObservableAgentResource class in the rx-client-java8-webapp example.

The APIs are described in more detail in the next part, Reactive Jersey Client, Part 2 – Usage and Supported Libraries.

As you can see, the code does the same work as the previous two examples. It’s more readable than the purely asynchronous approach even though it’s equally fast, and it’s about as easy to read and implement as the synchronous approach. The error processing is also handled better here than in the asynchronous approach.

When dealing with a large number of requests that depend on each other, and when you need to compose or combine the results of these requests, the reactive programming model is the right technique to use.
