Skip to content
Ignacio Baca Moreno-Torres edited this page Oct 15, 2018 · 20 revisions

Welcome to the RxGWT wiki!

How to start using RxJava in GWT?

You first need a project, yep! hehe you need a codebase. Start learning a new tool like RxJava, Rx from now on, that might be even considered a new parading (functional reactive programming) and trying to solve the implementing-an-app problem is probably a really bad starting point, too much complexity. Otherwise, what you MUST do is review you already written app and apply Rx to simplify, clarify and maybe to add some new feature that was too complex to implement before.

Then… where can I apply Rx?

Nice, Rx is like a supercharged-callback (worst explanation ever, but perfect to identify where to apply it). Then, wherever a callback is used you can apply Rx instead. Also, if you are just using the callback to do some direct action then applying Rx might not simplify or clarify anything. You need a second criterion, find callbacks composition. Ok, this second point is a bit more confusing, also callback composition is so complex that maybe you have avoided it without realizing it, so maybe you don't have many cases in your current codebase, at least not explicit written cases. For example, btn.click + input.valueChange is a common case of event composition, but you usually avoid it accessing the input.value directly when the btn.click events are fired, so the unique apparent event involved is btn.click. Some common examples of event compositions are:

  • get the last value change when the user clicks
  • request data when the user click, canceling the previous request
  • refresh the table periodically using the last available data
  • reveal a page when various asynchronous tasks complete
  • combine various request into a unique response

The bottom line is identify the composition of callbacks in your code base and migrate them to Rx.

The rest of this document I'll describe how to migrate specific cases to Rx. This might also help to identify those callbacks in your codebase.

Callbacks

I start explaining how to map raw callbacks to Rx to also take advantage and explain the Rx equivalent types.

Callbacks can be categorized in

  • called only once and has no value, we name it Completable type
  • called one and only one time with a value of type T, we name it Single type
  • called zero or many times with values of type T, we name it Observable type

Just one important note, you cannot use nulls through Rx, yep! this was an important change between RxJava1 and RxJava2 that should make our code and the lib code better. There is another type called Maybe that you can use for example if you have a Single that might returns null. I'll skip it to try to be less verbose.

Also, there is another type called Flowable, but I think we can avoid it for now as it adds some complex concept (backpressure) that doesn't help right now.

Completable

This kind of callbacks that are called always one and only once time without value looks like this…

void fooCompletable(Runnable success) {…}
fooCompletable(() -> console.log("success"));

And can be replaced by the Rxtype io.reactivex.Completable.

// Wrap it using…
Completable fooCompletable = Completable.create(em -> fooCompletable(em::onComplete));

// if the `fooCompletable` method is in your code, you can add or replace it with…
Completable fooCompletable() { return Completable.create(em -> fooCompletable(em::onComplete)); }

// and consume it with…
fooCompletable.subscribe(() -> console.log("success"));

Single

This kind of callbacks that are called always one and only one time with a value of type T looks like this…

void fooResponse(Consumer<String> once) {…}
fooResponse(n -> console.log("once " + n));

And can be replaced by the Rx type io.reactivex.Single<T>. This is a pretty common wrapper, you'll probably already realized, but this is equivalent to the JDK CompletableFuture or the JS Promise. Not exactly the same because Rx is not opinionated, but we leave that for later. Wrap it using:

// Wrap it using…
Single<String> fooResponse = Single.create(em -> fooResponse(em::onSuccess));

// if the `fooResponse` method is in your code, you can add or replace it with…
Single<String> fooResponse() { return Single.create(em -> fooResponse(em::onSuccess)); }

// and consume it with…
fooResponse.subscribe(n -> console.log("once " + n));

Observable

This kind of callbacks that are called zero or many time with values of type T looks like this…

RegistrationHandler fooListener(Consumer<String> each) {…}
RegistrationHandler subscription = fooStream(n -> console.log("each " + n));

And can be replaced by the Rx type io.reactivex.Observable<T>. This is the most important type, this will wrap a stream of events which is where Rx shine! composing stream of events. Note that now we have added the registration handler, actually, the previous types also support it but are less important because it unsubscribed anyway as soon as it completes. But in this case, it can live forever so the subscription lifecycle is very important.

// Wrap it using…
Observable<String> fooListener = Observable.create(em -> em.setCancellable(fooListener(em::onNext)::remove));

// if the `fooListener` method is in your code, you can add or replace it with…
static Observable<String> fooListener() {
    return Observable.create(em -> em.setCancellable(fooListener(em::onNext)::remove));
}

// and consume it with…
Disposable subscription = fooListener.subscribe(n -> console.log("each " + n));

UI Events

This project adds a lot of helpers to create streams of UI events divided by modules.

com.intendia.rxgwt2.user.RxHandlers Contains utils to subscribe to all the GWT SDK HasXxxHandlers types, e.g.:

Button btn = new Button();
Observable<ClickEvent> click$ = RxHandlers.click(btn);

com.intendia.rxgwt2.user.RxEvents Similar to RxHandlers but only requires the existence of DomEvent.Type and can only be applied to Widget. This is useful because sometimes the widget does not implement the HasXxxHandlers, e.g.:

Button btn = new Button();
Observable<ClickEvent> click$ = RxEvents.click(btn);

com.intendia.rxgwt2.elemento.RxElemento This uses the native addEventListener and uses elemento typed event to get a user-friendly type-safe API. There are 2 more variants, one for elemental1 and another for elemental2, the first one should be considered obsolete, but if you are still using elemental1 you can use it. And the second one is useless because the elemento API is the same but type-safe. A simple example use:

HTMLDivElement div = Elements.div().get();
Observable<MouseEvent> click$ = RxElemento.fromEvent(div, EventType.click);

com.intendia.rxgwt2.user.RxUser Here there are some more utilities for the gwt-user lib. Various of this utils are about value changes and follow a naming convention, method starting with bind means that it always emits a first item on subscription. So for example:

TextBox textBox = new TextBox();
Observable<ValueChangeEvent<String>> change$ = RxHandlers.valueChange(textBox);
Observable<Optional<String>> bindChangeOpt$ = RxUser.bindValueChange(textBox);
Observable<String> bindChange$ = RxUser.bindValueChangeOr(textBox, "");

The first observable change$ will emit the first item when the TextBox changes, OTOH bindChangeOpt$ will emit the first value inmediately after subscription. It return an Optional because you cannot emmit null values. The third variant sets a default value so you can avoid the Optional wrapper.

Some other UI utils in RxUser:

  • bindSetSelectionChange, bindSingleSelectionChange
  • bindValueChange, bindValueChangeOr
  • keyDown, keyPress
  • windowClose, windowClosing, windowResize, windowScroll

Requests

UI is the primary source of events, but almost always those events end up producing requests. So having a Rx service layer is critical to be able to write nice descriptive Rx code. This lib includes various helpers and there are various external libs that already creates Rx friendly APIs. But if you have any doubt on how to adapt your service layer to Rx do not hesitate to ask for help in the gitter channel!

com.intendia.rxgwt2.user.RxUser Utility included in this library to wrap gwt-user request into Rx.

Single<Response> fromRequest(RequestBuilder requestBuilder): Wraps a RequestBuilder into a Single<Response>.

Single<Response> get(String url): Configures a GET url RequestBuilder and wraps it into a Single<Response>.

There are various libs that generates Rx friendly services:

  • AutoREST creates REST clients using JAX-RS interfaces.
  • AutoRPC GWT creates the GWT-RPC Rx interfaces.

This lib does not include a specific utility to wrap the modern native fetch API, but as this API uses native Promises you can use com.intendia.rxgwt2.elemental2.RxElemental2.fromPromise to wrap it as a Single. Neither contains a native XmlHttpRequest adapter, it is not included because a nice API might be a bit out of scope for this project, but you can gets inspired by the AutoREST implementation based on elemental2 XmlHttpRequest type to create your own adapter.

If non of this is your case, you can always wrap it using a callback based wrapper as explained in the callbacks section.

Schedulers

GWT exposes:

  • com.google.gwt.core.client.Scheduler: The scheduleFinally command has some limitations and it might be considered deprecated, won't be available in J2CL, so just ignore it. For deferred commands use io.reactivex.schedulers.Schedulers#newThread. This might be weird because there is just one thread, but IMO this is a nice smooth solution to make it the core shareable between client and server side. You should actually use the scheduler that better match your intention (between single, computation, io, trampoline or newThread), currently all implementations are pretty similar, but using the right one makes your code more expressive, it will work in client and server side and it might benefits for performance improvements in future releases of rxjava-gwt.
  • com.google.gwt.user.client.Timer: this can be imitated using the timer (emit a tick on timeout) or interval (emit a tick on each timeout) operators. Those operators are available in each rx type.
  • animation or idle schedulers: I do not recommend to reference it directly, this will make your code client only compatible, but if you know what you are doing you can also use GwtSchedulers a utility class similar to Schedulers but with some client only specific schedulers including the native animationFrame and requestIdle rx adapted schedulers.

Although you can use the rx schedulers directly, this is also not recommended. Rx schedulers are applied to rx types using the subscribeOn, observeOn and disposeOn operators. Each one will apply the specified scheduler on that phase. This operator composition strategy makes it super flexible, so for example, if you want to process a big list of elements applying some expensive computation, reducing it and finally paint the result in a canvas using the annimationFrame you can do:

Observable.just(1,2,3,4) // this should be is a "big" list of elemnts
        .buffer(2) // group by reasonable size so the ui do not get lock
        .observeOn(Schedulers.computation()) // each buck in an new eventLoop
        .map(buck -> /*some expensive operation that returns a number*/1)
        .scan(0, (n, acc) -> n + acc) // progressively accumulate
        .throttleLatest(2, SECONDS) // but only emit each 2 second (deferred)
        .observeOn(GwtSchedulers.animationFrame()) // then apply the animationFrame
        .subscribe(sum -> {/*e.g. update a canvas with the sum*/});

Other

Comparison

Comparison of common strategies of the Observer pattern in java with the objective of writing async reactive code. We are going to compare callbacks, CompletableFuture, Promise and Single based APIs. The API will look like:

  • callbacks public Cancelable onClicks(Callback<ClickEvent> callback)
  • CompletableFuture public CompletableFuture<ClickEvent> onClicks()
  • Promise public Promise<ClickEvent> onClicks()
  • Single public Single<ClickEvent> onClicks()

Exposing the API using each strategy has some advantages and some limitations. callbacks is the comparison baseline, all other are utils to improve callbacks-based APIs.

Callbacks CompletableFuture Promise Rx types
sync/async both¹ both async both¹
hot/cold both¹ eager(hot) eager(hot) both¹
cancelable yes² yes no yes
cardinality many one one many

(1) Depends on the implementation, usually documented in the API. This is a good thing as you can use the same strategy for both cases.

(2) Callbacks-based APIs might implement cancelations returning a subscription handler that can be called to cancel the subscription or exposing a removeCallback method that you need to pass the callback again to remove it.

sync/async: can be evaluated synchronous or asynchronous. This might have usability or performance implications, it really depends on the situations but being non-opinionated (support both cases) is better.

hot/cold: cold if the producer is created on subscription (aka. lazy), hot is when the observable closes over the producer (aka. eager). This might be better or worst depending on the use case, it might be useful or it might improve performance, but it depends on each use case. So an alternative is better if it supports both options in a non-opinionated API. You want a HOT observable when you don’t want to create your producer over and over again.

cancelable: if the callback can be canceled, and also that combination automatically chain this subscription. I.e. if you combine A with B, if you unsubscribe B then A should be handled accordingly.

The purpose of this callbacks alternatives is to improve composability solving the callback-hell. This problem is solved using techniques from functional programming and the result are these composable fluent builders. The number of operators and the ability to extend it's also important to be able to write concise but descriptive code. Operators are directly related to productivity. Rx is by far the strategy that comes with more operators. Promise is the more limited with only map, merge and catch. CompletableFuture has various operators, but IMO the API is much worst, both more limited and difficult to understand (analyzed in its own section). And Rx is full of operators, quite well documented and it is the only one that can be extended and applied using the fluent API.

CompletableFuture

I personally am unable to understand how people compare CompletableFuture API with current RxJava2 API. I think the problem is that JDK life cycle is too slow to be able to polish a modern API like this one. Lasts years, higher-order fluent APIs has been introduced in OOP languages like Java, and during those lasts years those APIs has evolved quickly. Rx started in .Net many years ago, probably in that language various iterations happens, later during migration to RxJava, the API gets reviewed again, and one more time during RxJava1 to RxJava2 and with the introduction of cardinality types (Completable, Single and Maybe) the current RxJava API is years ahead of this stalled JDK API. IMO one critical missing feature is the lift and compose methods that allow to easily extends with new operators maintaining the fluent legible API. But also the uniformity with polished naming conventions, the non-opinionated implementation and the enormous number of operator makes RxJava far more powerful than CompletableFuture.

Comparison of basic instantiation and evaluation

// the simplest instantiation, using already available values
CompletableFuture<String> cDone = CompletableFuture.completedFuture("done");
Single<String> sDone = Single.just("done");
Single<String> sDone2 = Single.fromFuture(cDone); // yep, you can wrap it
// using this but note that Future doesn't work in GWT by default, bc requires
// blocking! actually, this is bad even for JVM code bc CompletableFuture always
// forces to block to detect the Future completion bc the Future has no
// callback API to notify the completion! 👎

// weird API exposes an eagerly created operation executed in a thread
CompletableFuture<String> weirdAsyncOp = CompletableFuture.supplyAsync(() -> "done");
CompletableFuture<Void> voidOp = CompletableFuture.runAsync(() -> out.println("done"));
// in Rx you need various steps, but IMO is much much clear and explicit
// first you define the action, and the result is lazy and reusable
Completable lazyAction = Completable.fromAction(() -> out.println("done"));
// then you can configure it to be executed in a thread
Completable lazyAsyncAction = lazyAction.subscribeOn(Schedulers.newThread());
// and finally you need to subscribe to fire it up! this is the difference between
// eager(hot) and lazy(cold), CompletableFuture is just a signal of work-completed
// vs Completable that is a actual description of the work, and the work
// doesn't start until you subscribe
lazyAsyncAction.subscribe();
// AND!! second subscription means, second evaluation of the action!
lazyAsyncAction.subscribe();

// the behaviour in both cases are a bit different, so you must understand it
// to see what is the best for your use case, anyways, you can actually imitate
// the CompletableFuture eager behaviour, IMO with a much clean code
Single<String> eager = Single.fromFuture(commonPool().submit(() -> "done"));
// so it is clear that you are submitting eagerly a task to the commonPool
// but I promise 😉 laziness is your friend, lazy people always choose Rx 

Comparison of operators and combination

CompletableFuture<String> str = CompletableFuture.completedFuture("a"), str1 = CompletableFuture.completedFuture("a");
CompletableFuture<Integer> num = CompletableFuture.completedFuture(1);
Single<String> str$ = Single.just("a"), str1$ = Single.just("a"); // the str1 variant is used in some example, nothing special about it
Single<Integer> num$ = Single.just(1);

CompletableFuture<Void> allOf = CompletableFuture.allOf(str, num);
Completable allOf$ = Single.merge(str$, num$).ignoreElements();

CompletableFuture<Object> anyOf = CompletableFuture.anyOf(str, num);
Single<? extends Serializable> anyOf$ = Single.merge(str$, num$).firstOrError();

// str.obtrudeValue(…); WTF WTF resetting the internal state? you really don't want to do that!

// here there are all the method on CompletionStage removing all the `Async`
// variants(24!) bc it can be easily applied in Rx with the subscribeOn operator

//<U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
CompletableFuture<Integer> thenApply = str.thenApply(n -> 1);
Single<Integer> thenApply$ = str$.map(n -> 1);

//CompletionStage<Void> thenAccept(Consumer<? super T> action);
CompletableFuture<Void> thenAccept = str.thenAccept(n -> {/**/});
Completable thenAccept$ = str$.doOnSuccess(n -> {/**/}).ignoreElement();

//CompletionStage<Void> thenRun(Runnable action);
CompletableFuture<Void> theRun = str.thenRun(() -> {/**/});
Completable thenRun$ = str$.doOnSuccess(n -> {/**/}).ignoreElement();

//<U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
CompletableFuture<String> thenCombine = str.thenCombine(num, (a, n) -> a + ":" + n);
Single<String> thenCombine$ = str$.zipWith(num$, (a, n) -> a + ":" + n);

//<U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
CompletableFuture<Void> thenAcceptBoth = str.thenAcceptBoth(num, (a, n) -> {/**/});
Completable thenAcceptBoth$ = str$.zipWith(num$, (a, n) -> a + ":" + n).ignoreElement();

//CompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
CompletableFuture<Void> runAfterBoth = str.runAfterBoth(num, () -> {/**/});
Completable runAfterBoth$ = Single.merge(str$, num$).ignoreElements().doOnComplete(() -> {/**/});

//<U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);
CompletableFuture<String> applyToEither = str.applyToEither(str1, a -> ":" + a);
Single<String> applyToEither$ = str$.ambWith(str1$).map(a -> ":" + a);

//CompletionStage<Void> acceptEither (CompletionStage<? extends T> other, Consumer<? super T> action);
CompletableFuture<Void> acceptEither = str.acceptEither(str1, a -> {/**/});
Completable acceptEither$ = str$.ambWith(str1$).doOnSuccess(a -> {/**/}).ignoreElement();

//CompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action);
CompletableFuture<Void> runAfterEither = str.runAfterEither(str1, () -> {/**/});
Completable runAfterEither$ = str$.ambWith(str1$).ignoreElement().doOnComplete(() -> {/**/});

//<U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
CompletableFuture<Integer> thenCompose = str.thenCompose(n -> num);
Single<Integer> thenCompose$ = str$.flatMap(n -> num$);

//CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
CompletableFuture<String> exceptionally = str.exceptionally(e -> "e");
Single<String> exceptionally$ = str$.onErrorReturn(e -> "e");

//CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
CompletableFuture<String> whenComplete = str.whenComplete((a, e) -> {/**/});
Single<String> whenComplete$ = str$.doOnEvent((a, e) -> {/**/});

//<U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
CompletableFuture<Integer> handle = str.handle((a, e) -> e == null ? 1 : 0);
// this is the unique operator that cannot be applied out of the box, which maybe is
// a symptom of an unnecessary or uncommon operator that can be replaced with
Single<Integer> handle$ = str$.map(a -> 1).onErrorReturnItem(0);

All of this is just to show how to migrate from CompletableFuture to RxJava but you should realize that, if you try the other way around is almost impossible. I mean, RxJava start from here, and it has much more operators. For example, I have skipped all the CompletableFuture async variants because RxJava is much more flexible, and allow to configure the scheduling more precisely, e.g.:

Scheduler UI = Schedulers.single(); // this imitates the UI event loop
Single.fromCallable(() -> range(0, 100000).count())//executed in computation scheduler
        .subscribeOn(Schedulers.computation()).observeOn(UI)
        .subscribe(result -> {/**/});//executed in the UI event loop

Also, timing operators are pretty useful and difficult to apply in CompletableFuture, e.g.:

// this will throws a timeout if it takes longer thant 5sec
Single.fromCallable(() -> range(0, 100000).count()).timeout(5, SECONDS);