-
Notifications
You must be signed in to change notification settings - Fork 8
Home
Welcome to the RxGWT wiki!
You first need a project, yep! hehe you need a codebase. Start learning a new tool like RxJava, Rx from now on, that might be even considered a new parading (functional reactive programming) and trying to solve the implementing-an-app problem is probably a really bad starting point, too much complexity. Otherwise, what you MUST do is review you already written app and apply Rx to simplify, clarify and maybe to add some new feature that was too complex to implement before.
Then… where can I apply Rx?
Nice, Rx is like a supercharged-callback (worst explanation ever, but perfect to identify where to apply it). Then, wherever a callback is used you can apply Rx instead. Also, if you are just using the callback to do some direct action then applying Rx might not simplify or clarify anything. You need a second criterion, find callbacks composition. Ok, this second point is a bit more confusing, also callback composition is so complex that maybe you have avoided it without realizing it, so maybe you don't have many cases in your current codebase, at least not explicit written cases. For example, btn.click + input.valueChange
is a common case of event composition, but you usually avoid it accessing the input.value
directly when the btn.click
events are fired, so the unique apparent event involved is btn.click
. Some common examples of event compositions are:
- get the last value change when the user clicks
- request data when the user click, canceling the previous request
- refresh the table periodically using the last available data
- reveal a page when various asynchronous tasks complete
- combine various request into a unique response
The bottom line is identify the composition of callbacks in your code base and migrate them to Rx.
The rest of this document I'll describe how to migrate specific cases to Rx. This might also help to identify those callbacks in your codebase.
I start explaining how to map raw callbacks to Rx to also take advantage and explain the Rx equivalent types.
Callbacks can be categorized in
- called only once and has no value, we name it Completable type
- called one and only one time with a value of type
T
, we name it Single type - called zero or many times with values of type
T
, we name it Observable type
Just one important note, you cannot use nulls
through Rx
, yep! this was an important change between RxJava1
and RxJava2
that should make our code and the lib code better. There is another type called Maybe that you can use for example if you have a Single
that might returns null
. I'll skip it to try to be less verbose.
Also, there is another type called Flowable
, but I think we can avoid it for now as it adds some complex concept (backpressure) that doesn't help right now.
This kind of callbacks that are called always one and only once time without value looks like this…
void fooCompletable(Runnable success) {…}
fooCompletable(() -> console.log("success"));
And can be replaced by the Rx
type io.reactivex.Completable
.
// Wrap it using…
Completable fooCompletable = Completable.create(em -> fooCompletable(em::onComplete));
// if the `fooCompletable` method is in your code, you can add or replace it with…
Completable fooCompletable() { return Completable.create(em -> fooCompletable(em::onComplete)); }
// and consume it with…
fooCompletable.subscribe(() -> console.log("success"));
This kind of callbacks that are called always one and only one time with a value of type T
looks like this…
void fooResponse(Consumer<String> once) {…}
fooResponse(n -> console.log("once " + n));
And can be replaced by the Rx
type io.reactivex.Single<T>
. This is a pretty common wrapper, you'll probably already realized, but this is equivalent to the JDK CompletableFuture
or the JS Promise
. Not exactly the same because Rx
is not opinionated, but we leave that for later. Wrap it using:
// Wrap it using…
Single<String> fooResponse = Single.create(em -> fooResponse(em::onSuccess));
// if the `fooResponse` method is in your code, you can add or replace it with…
Single<String> fooResponse() { return Single.create(em -> fooResponse(em::onSuccess)); }
// and consume it with…
fooResponse.subscribe(n -> console.log("once " + n));
This kind of callbacks that are called zero or many time with values of type T
looks like this…
RegistrationHandler fooListener(Consumer<String> each) {…}
RegistrationHandler subscription = fooStream(n -> console.log("each " + n));
And can be replaced by the Rx
type io.reactivex.Observable<T>
. This is the most important type, this will wrap a stream of events which is where Rx
shine! composing stream of events. Note that now we have added the registration handler, actually, the previous types also support it but are less important because it unsubscribed anyway as soon as it completes. But in this case, it can live forever so the subscription lifecycle is very important.
// Wrap it using…
Observable<String> fooListener = Observable.create(em -> em.setCancellable(fooListener(em::onNext)::remove));
// if the `fooListener` method is in your code, you can add or replace it with…
static Observable<String> fooListener() {
return Observable.create(em -> em.setCancellable(fooListener(em::onNext)::remove));
}
// and consume it with…
Disposable subscription = fooListener.subscribe(n -> console.log("each " + n));
This project adds a lot of helpers to create streams of UI events divided by modules.
com.intendia.rxgwt2.user.RxHandlers
Contains utils to subscribe to all the GWT SDK HasXxxHandlers
types, e.g.:
Button btn = new Button();
Observable<ClickEvent> click$ = RxHandlers.click(btn);
com.intendia.rxgwt2.user.RxEvents
Similar to RxHandlers
but only requires the existence of DomEvent.Type
and can only be applied to Widget
. This is useful because sometimes the widget does not implement the HasXxxHandlers
, e.g.:
Button btn = new Button();
Observable<ClickEvent> click$ = RxEvents.click(btn);
com.intendia.rxgwt2.elemento.RxElemento
This uses the native addEventListener
and uses elemento
typed event to get a user-friendly type-safe API. There are 2 more variants, one for elemental1
and another for elemental2
, the first one should be considered obsolete, but if you are still using elemental1
you can use it. And the second one is useless because the elemento
API is the same but type-safe. A simple example use:
HTMLDivElement div = Elements.div().get();
Observable<MouseEvent> click$ = RxElemento.fromEvent(div, EventType.click);
com.intendia.rxgwt2.user.RxUser
Here there are some more utilities for the gwt-user
lib. Various of this utils are about value changes and follow a naming convention, method starting with bind
means that it always emits a first item on subscription. So for example:
TextBox textBox = new TextBox();
Observable<ValueChangeEvent<String>> change$ = RxHandlers.valueChange(textBox);
Observable<Optional<String>> bindChangeOpt$ = RxUser.bindValueChange(textBox);
Observable<String> bindChange$ = RxUser.bindValueChangeOr(textBox, "");
The first observable change$
will emit the first item when the TextBox
changes, OTOH bindChangeOpt$
will emit the first value inmediately after subscription. It return an Optional
because you cannot emmit null
values. The third variant sets a default value so you can avoid the Optional
wrapper.
Some other UI utils in RxUser
:
- bindSetSelectionChange, bindSingleSelectionChange
- bindValueChange, bindValueChangeOr
- keyDown, keyPress
- windowClose, windowClosing, windowResize, windowScroll
UI is the primary source of events, but almost always those events end up producing requests. So having a Rx
service layer is critical to be able to write nice descriptive Rx
code. This lib includes various helpers and there are various external libs that already creates Rx
friendly APIs. But if you have any doubt on how to adapt your service layer to Rx
do not hesitate to ask for help in the gitter channel!
com.intendia.rxgwt2.user.RxUser
Utility included in this library to wrap gwt-user
request into Rx
.
Single<Response> fromRequest(RequestBuilder requestBuilder)
: Wraps a RequestBuilder
into a Single<Response>
.
Single<Response> get(String url)
: Configures a GET url
RequestBuilder
and wraps it into a Single<Response>
.
There are various libs that generates Rx
friendly services:
- AutoREST creates REST clients using JAX-RS interfaces.
- AutoRPC GWT creates the GWT-RPC Rx interfaces.
This lib does not include a specific utility to wrap the modern native fetch API
, but as this API uses native Promises
you can use com.intendia.rxgwt2.elemental2.RxElemental2.fromPromise
to wrap it as a Single
. Neither contains a native XmlHttpRequest
adapter, it is not included because a nice API might be a bit out of scope for this project, but you can gets inspired by the AutoREST
implementation based on elemental2
XmlHttpRequest
type to create your own adapter.
If non of this is your case, you can always wrap it using a callback based wrapper as explained in the callbacks
section.
GWT exposes:
-
com.google.gwt.core.client.Scheduler: The
scheduleFinally
command has some limitations and it might be considered deprecated, won't be available in J2CL, so just ignore it. For deferred commands useio.reactivex.schedulers.Schedulers#newThread
. This might be weird because there is just one thread, but IMO this is a nice smooth solution to make it the core shareable between client and server side. You should actually use the scheduler that better match your intention (between single, computation, io, trampoline or newThread), currently all implementations are pretty similar, but using the right one makes your code more expressive, it will work in client and server side and it might benefits for performance improvements in future releases of rxjava-gwt. -
com.google.gwt.user.client.Timer: this can be imitated using the
timer
(emit a tick on timeout) orinterval
(emit a tick on each timeout) operators. Those operators are available in each rx type. - animation or idle schedulers: I do not recommend to reference it directly, this will make your code client only compatible, but if you know what you are doing you can also use
GwtSchedulers
a utility class similar toSchedulers
but with some client only specific schedulers including the nativeanimationFrame
andrequestIdle
rx adapted schedulers.
Although you can use the rx schedulers directly, this is also not recommended. Rx schedulers are applied to rx types using the subscribeOn
, observeOn
and disposeOn
operators. Each one will apply the specified scheduler on that phase. This operator composition strategy makes it super flexible, so for example, if you want to process a big list of elements applying some expensive computation, reducing it and finally paint the result in a canvas using the annimationFrame you can do:
Observable.just(1,2,3,4) // this should be is a "big" list of elemnts
.buffer(2) // group by reasonable size so the ui do not get lock
.observeOn(Schedulers.computation()) // each buck in an new eventLoop
.map(buck -> /*some expensive operation that returns a number*/1)
.scan(0, (n, acc) -> n + acc) // progressively accumulate
.throttleLatest(2, SECONDS) // but only emit each 2 second (deferred)
.observeOn(GwtSchedulers.animationFrame()) // then apply the animationFrame
.subscribe(sum -> {/*e.g. update a canvas with the sum*/});
Comparison of common strategies of the Observer
pattern in java with the objective of writing async reactive code. We are going to compare callbacks
, CompletableFuture
, Promise
and Single
based APIs. The API will look like:
- callbacks
public Cancelable onClicks(Callback<ClickEvent> callback)
- CompletableFuture
public CompletableFuture<ClickEvent> onClicks()
- Promise
public Promise<ClickEvent> onClicks()
- Single
public Single<ClickEvent> onClicks()
Exposing the API using each strategy has some advantages and some limitations. callbacks
is the comparison baseline, all other are utils to improve callbacks-based APIs.
Callbacks | CompletableFuture | Promise | Rx types | |
---|---|---|---|---|
sync/async | both¹ | both | async | both¹ |
hot/cold | both¹ | eager(hot) | eager(hot) | both¹ |
cancelable | yes² | yes | no | yes |
cardinality | many | one | one | many |
(1) Depends on the implementation, usually documented in the API. This is a good thing as you can use the same strategy for both cases.
(2) Callbacks-based APIs might implement cancelations returning a subscription handler that can be called to cancel the subscription or exposing a removeCallback
method that you need to pass the callback again to remove it.
sync/async: can be evaluated synchronous or asynchronous. This might have usability or performance implications, it really depends on the situations but being non-opinionated (support both cases) is better.
hot/cold: cold if the producer is created on subscription (aka. lazy), hot is when the observable closes over the producer (aka. eager). This might be better or worst depending on the use case, it might be useful or it might improve performance, but it depends on each use case. So an alternative is better if it supports both options in a non-opinionated API. You want a HOT observable when you don’t want to create your producer over and over again.
cancelable: if the callback can be canceled, and also that combination automatically chain this subscription. I.e. if you combine A with B, if you unsubscribe B then A should be handled accordingly.
The purpose of this callbacks
alternatives is to improve composability solving the callback-hell. This problem is solved using techniques from functional programming
and the result are these composable fluent builders. The number of operators and the ability to extend it's also important to be able to write concise but descriptive code. Operators are directly related to productivity. Rx is by far the strategy that comes with more operators. Promise
is the more limited with only map
, merge
and catch
. CompletableFuture
has various operators, but IMO the API is much worst, both more limited and difficult to understand (analyzed in its own section). And Rx
is full of operators, quite well documented and it is the only one that can be extended and applied using the fluent API.
I personally am unable to understand how people compare CompletableFuture API
with current RxJava2 API
. I think the problem is that JDK life cycle is too slow to be able to polish a modern API like this one. Lasts years, higher-order fluent APIs has been introduced in OOP languages like Java, and during those lasts years those APIs has evolved quickly. Rx
started in .Net
many years ago, probably in that language various iterations happens, later during migration to RxJava
, the API gets reviewed again, and one more time during RxJava1
to RxJava2
and with the introduction of cardinality types (Completable, Single and Maybe) the current RxJava API
is years ahead of this stalled JDK API. IMO one critical missing feature is the lift
and compose
methods that allow to easily extends with new operators maintaining the fluent legible API. But also the uniformity with polished naming conventions, the non-opinionated implementation and the enormous number of operator makes RxJava
far more powerful than CompletableFuture
.
Comparison of basic instantiation and evaluation
// the simplest instantiation, using already available values
CompletableFuture<String> cDone = CompletableFuture.completedFuture("done");
Single<String> sDone = Single.just("done");
Single<String> sDone2 = Single.fromFuture(cDone); // yep, you can wrap it
// using this but note that Future doesn't work in GWT by default, bc requires
// blocking! actually, this is bad even for JVM code bc CompletableFuture always
// forces to block to detect the Future completion bc the Future has no
// callback API to notify the completion! 👎
// weird API exposes an eagerly created operation executed in a thread
CompletableFuture<String> weirdAsyncOp = CompletableFuture.supplyAsync(() -> "done");
CompletableFuture<Void> voidOp = CompletableFuture.runAsync(() -> out.println("done"));
// in Rx you need various steps, but IMO is much much clear and explicit
// first you define the action, and the result is lazy and reusable
Completable lazyAction = Completable.fromAction(() -> out.println("done"));
// then you can configure it to be executed in a thread
Completable lazyAsyncAction = lazyAction.subscribeOn(Schedulers.newThread());
// and finally you need to subscribe to fire it up! this is the difference between
// eager(hot) and lazy(cold), CompletableFuture is just a signal of work-completed
// vs Completable that is a actual description of the work, and the work
// doesn't start until you subscribe
lazyAsyncAction.subscribe();
// AND!! second subscription means, second evaluation of the action!
lazyAsyncAction.subscribe();
// the behaviour in both cases are a bit different, so you must understand it
// to see what is the best for your use case, anyways, you can actually imitate
// the CompletableFuture eager behaviour, IMO with a much clean code
Single<String> eager = Single.fromFuture(commonPool().submit(() -> "done"));
// so it is clear that you are submitting eagerly a task to the commonPool
// but I promise 😉 laziness is your friend, lazy people always choose Rx
Comparison of operators and combination
CompletableFuture<String> str = CompletableFuture.completedFuture("a"), str1 = CompletableFuture.completedFuture("a");
CompletableFuture<Integer> num = CompletableFuture.completedFuture(1);
Single<String> str$ = Single.just("a"), str1$ = Single.just("a"); // the str1 variant is used in some example, nothing special about it
Single<Integer> num$ = Single.just(1);
CompletableFuture<Void> allOf = CompletableFuture.allOf(str, num);
Completable allOf$ = Single.merge(str$, num$).ignoreElements();
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(str, num);
Single<? extends Serializable> anyOf$ = Single.merge(str$, num$).firstOrError();
// str.obtrudeValue(…); WTF WTF resetting the internal state? you really don't want to do that!
// here there are all the method on CompletionStage removing all the `Async`
// variants(24!) bc it can be easily applied in Rx with the subscribeOn operator
//<U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
CompletableFuture<Integer> thenApply = str.thenApply(n -> 1);
Single<Integer> thenApply$ = str$.map(n -> 1);
//CompletionStage<Void> thenAccept(Consumer<? super T> action);
CompletableFuture<Void> thenAccept = str.thenAccept(n -> {/**/});
Completable thenAccept$ = str$.doOnSuccess(n -> {/**/}).ignoreElement();
//CompletionStage<Void> thenRun(Runnable action);
CompletableFuture<Void> theRun = str.thenRun(() -> {/**/});
Completable thenRun$ = str$.doOnSuccess(n -> {/**/}).ignoreElement();
//<U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
CompletableFuture<String> thenCombine = str.thenCombine(num, (a, n) -> a + ":" + n);
Single<String> thenCombine$ = str$.zipWith(num$, (a, n) -> a + ":" + n);
//<U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);
CompletableFuture<Void> thenAcceptBoth = str.thenAcceptBoth(num, (a, n) -> {/**/});
Completable thenAcceptBoth$ = str$.zipWith(num$, (a, n) -> a + ":" + n).ignoreElement();
//CompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action);
CompletableFuture<Void> runAfterBoth = str.runAfterBoth(num, () -> {/**/});
Completable runAfterBoth$ = Single.merge(str$, num$).ignoreElements().doOnComplete(() -> {/**/});
//<U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);
CompletableFuture<String> applyToEither = str.applyToEither(str1, a -> ":" + a);
Single<String> applyToEither$ = str$.ambWith(str1$).map(a -> ":" + a);
//CompletionStage<Void> acceptEither (CompletionStage<? extends T> other, Consumer<? super T> action);
CompletableFuture<Void> acceptEither = str.acceptEither(str1, a -> {/**/});
Completable acceptEither$ = str$.ambWith(str1$).doOnSuccess(a -> {/**/}).ignoreElement();
//CompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action);
CompletableFuture<Void> runAfterEither = str.runAfterEither(str1, () -> {/**/});
Completable runAfterEither$ = str$.ambWith(str1$).ignoreElement().doOnComplete(() -> {/**/});
//<U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
CompletableFuture<Integer> thenCompose = str.thenCompose(n -> num);
Single<Integer> thenCompose$ = str$.flatMap(n -> num$);
//CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
CompletableFuture<String> exceptionally = str.exceptionally(e -> "e");
Single<String> exceptionally$ = str$.onErrorReturn(e -> "e");
//CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
CompletableFuture<String> whenComplete = str.whenComplete((a, e) -> {/**/});
Single<String> whenComplete$ = str$.doOnEvent((a, e) -> {/**/});
//<U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
CompletableFuture<Integer> handle = str.handle((a, e) -> e == null ? 1 : 0);
// this is the unique operator that cannot be applied out of the box, which maybe is
// a symptom of an unnecessary or uncommon operator that can be replaced with
Single<Integer> handle$ = str$.map(a -> 1).onErrorReturnItem(0);
All of this is just to show how to migrate from CompletableFuture
to RxJava
but
you should realize that, if you try the other way around is almost impossible. I mean,
RxJava
start from here, and it has much more operators. For example, I have skipped
all the CompletableFuture
async
variants because RxJava
is much more flexible,
and allow to configure the scheduling more precisely, e.g.:
Scheduler UI = Schedulers.single(); // this imitates the UI event loop
Single.fromCallable(() -> range(0, 100000).count())//executed in computation scheduler
.subscribeOn(Schedulers.computation()).observeOn(UI)
.subscribe(result -> {/**/});//executed in the UI event loop
Also, timing operators are pretty useful and difficult to apply in CompletableFuture
, e.g.:
// this will throws a timeout if it takes longer thant 5sec
Single.fromCallable(() -> range(0, 100000).count()).timeout(5, SECONDS);