Reactive Jersey Client, Part 3 – Customization

The Reactive Jersey Client API is a generic API that lets end users apply the popular reactive programming model when using Jersey Client. This part of the series describes the SPI and how to implement support for custom reactive libraries.

If you want to bring support for another library that provides a reactive programming model into your application, you can extend Reactive Jersey Client by implementing the SPI available in the jersey-rx-client module.

Extend RxInvoker interface

Even though it is not entirely intuitive, this step is required when support for a custom reactive library is needed. As mentioned earlier, a few JAX-RS Client interfaces had to be modified to make it possible to invoke HTTP calls in a reactive way. All of them except RxInvoker extend the original JAX-RS interfaces (e.g. Client). RxInvoker is a brand new interface (very similar to SyncInvoker and AsyncInvoker) that actually lets you invoke HTTP methods in a reactive way.

@Beta
public interface RxInvoker<T> {
 
    public T get();

    public <R> T get(Class<R> responseType);
 
    // ...
 
}

As you can see, the interface is very generic, because it is designed to support various reactive libraries without introducing additional abstractions or restrictions. The type parameter T is the asynchronous/event-based, completion-aware type (e.g. Observable). That type should itself be parametrized with the actual response type, but since Java does not allow a type parameter to be parametrized, it is the responsibility of each RxInvoker extension to do so. This applies to the simpler methods, such as get(), as well as to the more advanced ones, for example get(Class).

In the first case it is enough to parametrize the needed type with Response, e.g. Observable<Response> get(). The second case uses the type parameter taken from the method's argument. To extend the get(Class) method accordingly, parametrize the needed type with the method's own type parameter (R in RxInvoker), e.g. <T> Observable<T> get(Class<T> responseType).

The following snippet summarizes the requirements above and illustrates them.

public interface RxObservableInvoker extends RxInvoker<Observable> {
 
    @Override
    public Observable<Response> get();
 
    @Override
    public <T> Observable<T> get(Class<T> responseType);
 
    // ...
 
}

This is an excerpt from RxObservableInvoker that works with RxJava’s Observable.
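
The same narrowing of return types works for any completion-aware type. The following is a minimal sketch and not part of Jersey: it assumes the JDK's CompletableFuture as the custom reactive type, uses a cut-down stand-in interface (RxInvokerSketch) in place of the real RxInvoker, and lets String stand in for Response so that it compiles without Jersey on the classpath. CompletableFutureInvoker and CannedInvoker are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

// Cut-down stand-in for RxInvoker (assumption: only two methods are shown).
interface RxInvokerSketch<T> {
    T get();

    <R> T get(Class<R> responseType);
}

// Hypothetical extension: the raw CompletableFuture bound to T is narrowed
// to a parametrized type in every overriding method.
@SuppressWarnings("rawtypes")
interface CompletableFutureInvoker extends RxInvokerSketch<CompletableFuture> {

    @Override
    CompletableFuture<String> get(); // String stands in for Response

    @Override
    <R> CompletableFuture<R> get(Class<R> responseType);
}

// Toy implementation that returns a canned body instead of doing HTTP.
final class CannedInvoker implements CompletableFutureInvoker {
    private final String body;

    CannedInvoker(String body) {
        this.body = body;
    }

    @Override
    public CompletableFuture<String> get() {
        return CompletableFuture.completedFuture(body);
    }

    @Override
    public <R> CompletableFuture<R> get(Class<R> responseType) {
        // Class.cast throws ClassCastException if the body is not an R.
        return CompletableFuture.completedFuture(responseType.cast(body));
    }
}
```

Because the overriding methods declare parametrized return types, a caller can write String body = new CannedInvoker("hello").get().join(); without any cast.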

Implement the extended interface

You can either implement the extension of RxInvoker from scratch or extend the AbstractRxInvoker abstract class, which serves as a default implementation of the interface. In the latter case only the #method(…) methods need to be implemented, because the default implementations of the other methods (the HTTP calls) delegate to them.

final class JerseyRxObservableInvoker
                extends AbstractRxInvoker<Observable>
                implements RxObservableInvoker {

    JerseyRxObservableInvoker(final Invocation.Builder builder,
                              final ExecutorService executor) {
        super(builder, executor);
    }

    @Override
    public <T> Observable<T> method(final String name,
                                    final Entity<?> entity,
                                    final Class<T> responseType) {
        // Invoke as sync JAX-RS client request and subscribe/observe
        // on a scheduler initialized with executor service.

        final Scheduler scheduler = Schedulers.from(getExecutorService());

        return Observable.create(new Observable.OnSubscribe<T>() {
            @Override
            public void call(final Subscriber<? super T> subscriber) {
                if (!subscriber.isUnsubscribed()) {
                    try {
                        final T response = getBuilder()
                            .method(name, entity, responseType);

                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onNext(response);
                        }
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onCompleted();
                        }
                    } catch (final Throwable throwable) {
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onError(throwable);
                        }
                    }
                }
            }
        }).subscribeOn(scheduler).observeOn(scheduler);
    }

    // ...

}

For a complete implementation take a look at JerseyRxObservableInvoker.
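
The delegation idea can be illustrated with JDK types only. This is a rough sketch under stated assumptions, not the actual AbstractRxInvoker: AbstractInvokerSketch and FutureInvokerSketch are hypothetical names, CompletableFuture plays the role of the reactive type, and a plain BiFunction stands in for the blocking JAX-RS call made through Invocation.Builder.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.BiFunction;

// Stand-in for an AbstractRxInvoker-style base class (assumption: the real
// class has more methods and wraps an Invocation.Builder).
abstract class AbstractInvokerSketch<T> {
    private final ExecutorService executor;

    AbstractInvokerSketch(ExecutorService executor) {
        this.executor = executor;
    }

    ExecutorService getExecutorService() {
        return executor;
    }

    // Each HTTP convenience method only picks the method name and
    // forwards to method(...).
    public <R> T get(Class<R> responseType) {
        return method("GET", null, responseType);
    }

    public <R> T post(Object entity, Class<R> responseType) {
        return method("POST", entity, responseType);
    }

    // The single extension point a subclass has to provide.
    public abstract <R> T method(String name, Object entity, Class<R> responseType);
}

@SuppressWarnings("rawtypes")
final class FutureInvokerSketch extends AbstractInvokerSketch<CompletableFuture> {
    // Hypothetical blocking transport: (HTTP method, entity) -> response body.
    private final BiFunction<String, Object, Object> transport;

    FutureInvokerSketch(ExecutorService executor,
                        BiFunction<String, Object, Object> transport) {
        super(executor);
        this.transport = transport;
    }

    @Override
    public <R> CompletableFuture<R> method(String name, Object entity,
                                           Class<R> responseType) {
        // Run the blocking call on the supplied executor and expose the
        // result (or the failure) through the future.
        return CompletableFuture.supplyAsync(
                () -> responseType.cast(transport.apply(name, entity)),
                getExecutorService());
    }
}
```

In this sketch the inherited get(Class) and post(Object, Class) still return the raw type. Declaring them with parametrized return types is the job of the RxInvoker extension interface described in the previous section.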

Implement and register RxInvokerProvider

To create an instance of a particular RxInvoker, an implementation of the RxInvokerProvider SPI interface is needed. When a concrete RxInvoker is requested, the runtime goes through all available providers and picks one that supports the given invoker type. Each provider is expected to support a distinct set of types and subtypes so that different providers do not conflict with each other.

public final class RxObservableInvokerProvider
                       implements RxInvokerProvider {
 
    @Override
    public <T> T getInvoker(final Class<T> invokerType,
                            final Invocation.Builder builder,
                            final ExecutorService executor) {

        if (RxObservableInvoker.class.isAssignableFrom(invokerType)) {
            return invokerType.cast(
                new JerseyRxObservableInvoker(builder, executor));
        }
        return null;
    }
}

Reactive Jersey Client looks for all available RxInvokerProviders via the standard META-INF/services mechanism. It is enough to bundle a META-INF/services/org.glassfish.jersey.client.rx.spi.RxInvokerProvider file with your library and reference your implementation (by its fully qualified class name) from it.

org.glassfish.jersey.client.rx.rxjava.RxObservableInvokerProvider
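
The lookup across providers can be pictured roughly as follows. This is a sketch with JDK types only and makes no claim about how Jersey implements it internally: InvokerProviderSketch, TypedProvider and ProviderLookupSketch are hypothetical names, the builder and executor arguments of the real getInvoker are left out, and throwing when no provider matches is an assumption.

```java
import java.util.function.Supplier;

// Stand-in for a provider SPI (assumption: simplified signature).
interface InvokerProviderSketch {
    <T> T getInvoker(Class<T> invokerType);
}

// Hypothetical provider that serves one base type and its subtypes.
final class TypedProvider<S> implements InvokerProviderSketch {
    private final Class<S> supported;
    private final Supplier<? extends S> factory;

    TypedProvider(Class<S> supported, Supplier<? extends S> factory) {
        this.supported = supported;
        this.factory = factory;
    }

    @Override
    public <T> T getInvoker(Class<T> invokerType) {
        // Same shape of check as in the provider shown above.
        if (supported.isAssignableFrom(invokerType)) {
            return invokerType.cast(factory.get());
        }
        return null; // not ours, let another provider answer
    }
}

final class ProviderLookupSketch {
    private ProviderLookupSketch() {
    }

    // Returns the first non-null invoker. Any Iterable works here, including
    // a java.util.ServiceLoader, which reads META-INF/services files.
    static <T> T findInvoker(Iterable<? extends InvokerProviderSketch> providers,
                             Class<T> invokerType) {
        for (InvokerProviderSketch provider : providers) {
            T invoker = provider.getInvoker(invokerType);
            if (invoker != null) {
                return invoker;
            }
        }
        // Assumption: fail loudly when nothing matches.
        throw new IllegalStateException(
                "No provider supports " + invokerType.getName());
    }
}
```

The first matching provider wins in this sketch, which is why each provider should cover a distinct set of types.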
