The Reactive Jersey Client API is a generic API that lets end users apply the popular reactive programming model when using Jersey Client. This part of the series describes the SPI and how to implement support for custom reactive libraries.
- Reactive Jersey Client – Motivation
- Reactive Jersey Client – Usage and Supported Reactive Libraries
- Reactive Jersey Client – Customization (SPI) (this article)
If you want to bring support for another library that provides a reactive programming model into your application, you can extend the functionality of Reactive Jersey Client by implementing the SPI available in the jersey-rx-client module.
Extend RxInvoker interface
Even though it is not entirely intuitive, this step is required when support for a custom reactive library is needed. As mentioned earlier in this series, a few JAX-RS Client interfaces had to be modified to make it possible to invoke HTTP calls in a reactive way. All of them except RxInvoker extend the original interfaces from JAX-RS (e.g. Client). RxInvoker is a brand new interface (very similar to SyncInvoker and AsyncInvoker) that actually lets you invoke HTTP methods in a reactive way.
@Beta
public interface RxInvoker<T> {

    public T get();

    public <R> T get(Class<R> responseType);

    // ...
}
As you can see, the interface is very generic, because it is designed to support various reactive libraries without introducing additional abstractions or restrictions. The interface type parameter, T, is the asynchronous/event-based completion-aware type (e.g. Observable). That type should be parametrized with the actual response type. Since Java does not allow a type parameter itself to be parametrized, it is the obligation of each extension of RxInvoker to do that. This applies to simpler methods, such as get(), as well as to more advanced methods, for example get(Class).
In the first case it is enough to parametrize the needed type with Response, e.g. Observable<Response>. The second case uses the type parameter taken from the method's parameter. To extend the get(Class<R>) method accordingly, you need to parametrize the needed type with the R type parameter, e.g. Observable<R>.
The following snippet summarizes the requirements above and illustrates them.
public interface RxObservableInvoker extends RxInvoker<Observable> {

    @Override
    public Observable<Response> get();

    @Override
    public <T> Observable<T> get(Class<T> responseType);

    // ...
}
This is an excerpt from RxObservableInvoker, which works with RxJava's Observable.
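The same narrowing trick can be tried without Jersey or RxJava on the classpath. The sketch below is only an illustration under stated assumptions: MiniRxInvoker is a hypothetical, cut-down copy of the RxInvoker shape, MiniFutureInvoker and EchoFutureInvoker are made-up names, the JDK's CompletableFuture stands in for Observable, and String stands in for Response. It shows that binding T to the raw type and then overriding each method with a parametrized return type compiles thanks to covariant return types.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, cut-down stand-in for RxInvoker<T>; not the Jersey interface.
interface MiniRxInvoker<T> {
    T get();

    <R> T get(Class<R> responseType);
}

// T is bound to the raw CompletableFuture type; each override then narrows
// the return type to a parametrized form (covariant return types).
interface MiniFutureInvoker extends MiniRxInvoker<CompletableFuture> {
    @Override
    CompletableFuture<String> get(); // String plays the role of Response here

    @Override
    <R> CompletableFuture<R> get(Class<R> responseType);
}

// Toy implementation that returns canned values instead of doing HTTP.
final class EchoFutureInvoker implements MiniFutureInvoker {
    @Override
    public CompletableFuture<String> get() {
        return CompletableFuture.completedFuture("raw response");
    }

    @Override
    public <R> CompletableFuture<R> get(Class<R> responseType) {
        // A real invoker would read the entity; this sketch only casts a fixed
        // String, so it works for String.class (or a supertype) only.
        return CompletableFuture.completedFuture(responseType.cast("typed response"));
    }
}

final class NarrowingDemo {
    public static void main(String[] args) {
        MiniFutureInvoker invoker = new EchoFutureInvoker();
        // No casts needed: callers see the parametrized types directly.
        CompletableFuture<String> raw = invoker.get();
        CompletableFuture<String> typed = invoker.get(String.class);
        System.out.println(raw.join());
        System.out.println(typed.join());
    }
}
```

The compiler reports a raw-type warning for the extends clause, which is the price of not being able to write something like T<R> in Java.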
Implement the extended interface
You can either implement the extension of RxInvoker from scratch or extend the AbstractRxInvoker abstract class, which serves as a default implementation of the interface. In the latter case only the #method(…) methods need to be implemented, because the default implementations of the other methods (HTTP calls) delegate to them.
final class JerseyRxObservableInvoker
        extends AbstractRxInvoker<Observable>
        implements RxObservableInvoker {

    JerseyRxObservableInvoker(final Invocation.Builder builder,
                              final ExecutorService executor) {
        super(builder, executor);
    }

    @Override
    public <T> Observable<T> method(final String name,
                                    final Entity<?> entity,
                                    final Class<T> responseType) {
        // Invoke as sync JAX-RS client request and subscribe/observe
        // on a scheduler initialized with executor service.
        final Scheduler scheduler = Schedulers.from(getExecutorService());

        return Observable.create(new Observable.OnSubscribe<T>() {
            @Override
            public void call(final Subscriber<? super T> subscriber) {
                if (!subscriber.isUnsubscribed()) {
                    try {
                        final T response = getBuilder()
                                .method(name, entity, responseType);

                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onNext(response);
                        }
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onCompleted();
                        }
                    } catch (final Throwable throwable) {
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onError(throwable);
                        }
                    }
                }
            }
        }).subscribeOn(scheduler).observeOn(scheduler);
    }

    // ...
}
For a complete implementation take a look at JerseyRxObservableInvoker.
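The delegation idea (every HTTP shortcut funnels into a single method(…) call that wraps a blocking request) can be sketched with JDK types only. Everything named here is hypothetical and exists just for illustration: BlockingCall stands in for the synchronous Invocation.Builder, DelegatingInvoker for an AbstractRxInvoker-like base class, and CompletableFuture for Observable. To keep the sketch short, the base class is tied to CompletableFuture instead of carrying a type parameter.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a blocking HTTP call; not part of Jersey.
interface BlockingCall {
    <R> R invoke(String httpMethod, Object entity, Class<R> responseType);
}

// Hypothetical base class: the HTTP shortcuts all delegate to method(...).
abstract class DelegatingInvoker {
    public final <R> CompletableFuture<R> get(Class<R> responseType) {
        return method("GET", null, responseType);
    }

    public final <R> CompletableFuture<R> post(Object entity, Class<R> responseType) {
        return method("POST", entity, responseType);
    }

    // The only method a subclass has to supply.
    public abstract <R> CompletableFuture<R> method(String name,
                                                    Object entity,
                                                    Class<R> responseType);
}

final class ExecutorBackedInvoker extends DelegatingInvoker {
    private final BlockingCall call;
    private final ExecutorService executor;

    ExecutorBackedInvoker(BlockingCall call, ExecutorService executor) {
        this.call = call;
        this.executor = executor;
    }

    @Override
    public <R> CompletableFuture<R> method(String name, Object entity, Class<R> responseType) {
        // Run the blocking call on the supplied executor. An exception thrown
        // by the call completes the future exceptionally.
        return CompletableFuture.supplyAsync(
                () -> call.invoke(name, entity, responseType), executor);
    }
}

// Toy call that echoes its arguments instead of talking to a server.
final class EchoCall implements BlockingCall {
    @Override
    public <R> R invoke(String httpMethod, Object entity, Class<R> responseType) {
        return responseType.cast(httpMethod + ":" + entity);
    }
}

final class DelegationDemo {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            DelegatingInvoker invoker = new ExecutorBackedInvoker(new EchoCall(), executor);
            System.out.println(invoker.get(String.class).join());
            System.out.println(invoker.post("body", String.class).join());
        } finally {
            executor.shutdown();
        }
    }
}
```

Because get and post are written once in the base class, supporting another reactive type in this style would only mean supplying a different method(…) implementation.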
Implement and register RxInvokerProvider
To create an instance of a particular RxInvoker, an implementation of the RxInvokerProvider SPI interface is needed. When a concrete RxInvoker is requested, the runtime goes through all available providers and finds one that supports the given invoker type. Each provider is expected to support a distinct set of types and subtypes so that different providers do not conflict with each other.
public final class RxObservableInvokerProvider
        implements RxInvokerProvider {

    @Override
    public <T> T getInvoker(final Class<T> invokerType,
                            final Invocation.Builder builder,
                            final ExecutorService executor) {
        if (RxObservableInvoker.class.isAssignableFrom(invokerType)) {
            return invokerType.cast(
                    new JerseyRxObservableInvoker(builder, executor));
        }
        return null;
    }
}
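How the runtime might walk the providers can be sketched as follows. This is not Jersey's lookup code: MiniInvokerProvider is a hypothetical provider interface reduced to a single argument, the two invoker interfaces and InvokerLookup are made up, and throwing when nobody supports the type is this sketch's own choice. It illustrates the contract described above, where a provider returns null for types it does not support and the first non-null answer wins.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, reduced provider contract; not the Jersey SPI.
interface MiniInvokerProvider {
    // Returns an invoker of the requested type, or null if unsupported.
    <T> T getInvoker(Class<T> invokerType);
}

// Two made-up invoker flavours, each owned by exactly one provider.
interface FutureStyleInvoker {
    String describe();
}

interface StreamStyleInvoker {
    String describe();
}

final class FutureStyleProvider implements MiniInvokerProvider {
    @Override
    public <T> T getInvoker(Class<T> invokerType) {
        if (FutureStyleInvoker.class.isAssignableFrom(invokerType)) {
            FutureStyleInvoker invoker = () -> "future";
            return invokerType.cast(invoker);
        }
        return null;
    }
}

final class StreamStyleProvider implements MiniInvokerProvider {
    @Override
    public <T> T getInvoker(Class<T> invokerType) {
        if (StreamStyleInvoker.class.isAssignableFrom(invokerType)) {
            StreamStyleInvoker invoker = () -> "stream";
            return invokerType.cast(invoker);
        }
        return null;
    }
}

final class InvokerLookup {
    // Asks each provider in turn; the first non-null invoker is returned.
    static <T> T find(List<MiniInvokerProvider> providers, Class<T> invokerType) {
        for (MiniInvokerProvider provider : providers) {
            T invoker = provider.getInvoker(invokerType);
            if (invoker != null) {
                return invoker;
            }
        }
        // Sketch-only behaviour for the "nobody supports it" case.
        throw new IllegalStateException("No provider supports " + invokerType.getName());
    }

    public static void main(String[] args) {
        List<MiniInvokerProvider> providers =
                Arrays.asList(new FutureStyleProvider(), new StreamStyleProvider());
        System.out.println(find(providers, FutureStyleInvoker.class).describe());
        System.out.println(find(providers, StreamStyleInvoker.class).describe());
    }
}
```

If two providers claimed the same invoker type, the result would depend on iteration order, which is why each provider should cover a distinct set of types.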
Reactive Jersey Client looks for all available RxInvokerProvider implementations via the standard META-INF/services mechanism. It is enough to bundle a file named org.glassfish.jersey.client.rx.spi.RxInvokerProvider in META-INF/services with your library and to reference your implementation (by its fully qualified class name) from it.
org.glassfish.jersey.client.rx.rxjava.RxObservableInvokerProvider
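For readers unfamiliar with the META-INF/services file format, the JDK's own entry point for it is java.util.ServiceLoader. Whether Jersey calls ServiceLoader directly or uses its own finder is not stated here, so the following is only a generic sketch of the mechanism. MiniProviderSpi and ProviderDiscovery are hypothetical names; a real registration would be a file META-INF/services/<fully qualified interface name> listing implementation class names, one per line.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical service interface, used only to demonstrate discovery.
interface MiniProviderSpi {
    String name();
}

final class ProviderDiscovery {
    // Collects every implementation registered for the given service type
    // in a META-INF/services file visible to the context class loader.
    static <S> List<S> discover(Class<S> serviceType) {
        List<S> found = new ArrayList<>();
        for (S provider : ServiceLoader.load(serviceType)) {
            found.add(provider);
        }
        return found;
    }

    public static void main(String[] args) {
        // With no registration file on the classpath the list is empty.
        List<MiniProviderSpi> providers = discover(MiniProviderSpi.class);
        System.out.println("providers found: " + providers.size());
    }
}
```

Each listed class needs a public no-argument constructor so that the loader can instantiate it.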