Home Tutorials Training Consulting Products Books Company Donate Contact us









NOW Hiring

Quick links

Share

This tutorial contains notes about RxJava 2.0.

1. RxJava 2.0

1.1. What is RxJava and reactive programming

In reactive programming the consumer reacts to the data as it comes in. This is the reason why asynchronous programming is also called reactive programming. Reactive programming allows to propagates event changes to registered observers.

RxJava is a port from Netflix of the Reactive Extensions (Rx) to Java. RxJava was open sourced 2014 and is hosted at http://reactivex.io/.

The Observer pattern done right. ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming.

— reactivex.io

The Java version of this concept is called RxJava and is hosted under https://github.com/ReactiveX/RxJava. RxJava is published under the Apache 2.0 license.

RxJava describes itself as an API for asynchronous programming with observable streams.

1.2. Define a dependency to RxJava 2.0

As of this writing the version 2.0.4 is currently the released one. Replace g.a.v with 2.0.6 or a later version.

For a Gradle build you can add RxJava via the following dependency statement.

compile group: 'io.reactivex.rxjava2', name: 'rxjava', version: 'g.a.v'

For Maven you can add a dependency with the following snippet

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>g.a.v</version>
</dependency>

For OSGi environments, e.g., Eclipse RCP development, https://dl.bintray.com/simon-scholz/RxJava-OSGi/ can be used as p2 update site.

rxjava update site

1.3. Async programming

Nowadays programming in an imperative single threaded way usually leads to strange behaviors, a blocking non responsive UIs and therefore a bad user experience.

For example actively waiting for a database query or a webservice call can cause an application freeze, if the network is not responsive.

This can be avoided by handling unpredicted things asynchronously.

An example would be:

public List<Todo> getTodos() {

        List<Todo> todosFromWeb = // query a webservice (with bad network latency)

        return todosFromDb;
}

Calling the getTodos() method from the main thread or an UI thread will cause a non responsive application until the todosFromWeb arrive.

To improve this the query, which takes an unpredictable amount of time, this code should run in a different thread and inform the main thread when a result comes in.

public void getTodos(Consumer<List<Todo>> todosCallback) {

        Thread thread = new Thread(()-> {
                List<Todo> todosFromWeb = // query a webservice

                todosCallback.accept(todosFromWeb);
        });
        thread.start();
}

Now after calling the getTodos(Consumer<List<Todo>> todosConsumer) the main thread can continue working, is not blocked and can react once the accept method of the given Consumer is called.

Now the code is really asynchronous.

But what if errors during the web service query do occur?

public void getTodos(FailableCallback<List<Todo>> todosCallback) {

        Thread thread = new Thread(()-> {
                try {
                        List<Todo> todosFromWeb = // query a web service

                        todosCallback.accept(todosFromWeb);
                } catch(Exception ex) {
                        todosCallback.error(ex);
                }
        });
        thread.start();
}

With the custom FailableCallback interface it works, but it also adds complexity.

And there are even more issue that can occur:

  • Syncronization with the UI (Widgets in SWT and Android have to be updated from the UI thread)

  • What if the consumer of the FailableCallback is not present any more?

  • What if such a FailableCallback depends on another FailableCallback?

public void getUserPermission(FailableCallback<UserPermission> permissionCallback) {
        Thread thread = new Thread(()-> {
                try {
                        UserPermission permission = // query a web service

                        permissionCallback.accept(permission);
                } catch(Exception ex) {
                        permission.error(ex);
                }
        });
        thread.start();
}

public void getTodos(FailableCallback<List<Todo>> todosCallback) {

        Thread thread = new Thread(()-> {
                getUserPermission(new FailableCallback() {

                        public void accept(UserPermission permission) {
                                if(permission.isValid()) {
                                        try {
                                                List<Todo> todosFromWeb = // query a web service

                                                if(!todosCallbackInstance.isDisposed()) {
                                                        if(syncWithUIThread()) {
                                                                todosCallback.accept(todosFromWeb);
                                                        }
                                                }
                                        } catch(Exception ex) {
                                                if(!todosCallbackInstance.isDisposed()) {
                                                        if(syncWithUIThread()) {
                                                                todosCallback.error(ex);
                                                        }
                                                }
                                        }
                                }
                        }

                        public void error(Exception ex) {
                                // Oh no!
                        }
                });
        });
        thread.start();
}

This is really bad coding and it could get worse and should only show one example what can be addressed with ReactiveX. These problems are often considered as callback hell.

2. RxJava Observable Types

The Observer pattern done right. ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming.

— reactivex.io

In order to archive this RxJava comes with observable types acting as the sources of data, classes to subscribe to these observable types and a lot of methods for modifying, composing and transforming the data being exchanged between the observable and the subscriber.

Some of the methods are pretty similar to the ones Java 8 provides with the Stream API, e.g., filter(), map() and so on.

Table 1. Table Obervable types
Type

Flowable<T>

Observable<T>

Single<T>

Maybe<T>

Completable

Types that can emit data repeatedly or even infinite are Flowable<T> and Obervable<T>.

Observable<Todo> todoObservable = Observable.create(emitter -> {
        try {
                List<Todo> todos = getTodos();
                for (Todo todo : todos) {
                        emitter.onNext(todo);
                }
                emitter.onComplete();
        } catch (Exception e) {
                emitter.onError(e);
        }
});

Typical Observable that could emit likely infinite data are UI listener like a click listener, where it is unpredictable that often a user might click on a button or other UI widget.

Types that usually terminate either successfully or on failure are Maybe<T>, Single<T> and Completable.

Maybe<T> objects are kind of an async java.util.Optional from Java 8.

Maybe<List<Todo>> todoMaybe = Maybe.create(emitter -> {
        try {
                List<Todo> todos = getTodos();
                if(todos != null && !todos.isEmpty()) {
                        emitter.onSuccess(todos); (1)
                }else {
                        emitter.onComplete(); (2)
                }
        } catch (Exception e) {
                emitter.onError(e); (3)
        }
});
1 java.util.Optional has a value
2 java.util.Optional contains no value → null
3 An error occurred

Single<T> objects can also be considered as promises, which are also pretty popular in async frameworks and are similar to Maybe<T> objects, but only without the onComplete() method.

Completable objects are pretty similar to Single<T> objects, but without return value and therefore also do not have a generic type <T> like the other types. Completable objects can also be seen as reactive java.lang.Runnable objects.

Besides the most popular create() method of these observable type there are more convenience methods to create one of these types.

  • Observable.just() - Allows to create an observable as wrapper around other data types

  • Observable.fromIterable() - takes an java.lang.Iterable<T> and emits their values in their order in the data structure

  • Observable.fromArray() - takes an array and emits their values in their order in the data structure

  • Observable.fromCallable() - Allows to create an observable for a java.util.concurrent.Callable<V>

  • Observable.fromFuture() - Allows to create an observable for a java.util.concurrent.Future

  • Observable.interval() - An observable that emits Long objects in a given interval

  • …​

3. Subscribing in RxJava

One an observable instance is available listener/subscriber can be attached.

All observable types offer a large variety of subscribe methods.

Observable<Todo> todoObservable = Observable.create(emitter -> { ... });

// Simply subscribe with a io.reactivex.functions.Consumer<T>, which will be informed onNext()
Disposable disposable = todoObservable.subscribe(t -> System.out.print(t));

// Dispose the subscription when not interested in the emitted data any more
disposable.dispose();

// Also handle the error case with a second io.reactivex.functions.Consumer<T>
Disposable subscribe = todoObservable.subscribe(t -> System.out.print(t), e -> e.printStackTrace());

// ...

There can be even more io.reactivex.functions.Consumer<T> for onNext, onSuccess, onFailure, onComplete and so on according to the observable type.

A io.reactivex.functions.Consumer<T> is almost equal to the java.util.function.Consumer from java 8, except of that its accept method can throw an Exception. Besides that RxJava also does not depend on Java 8, but is compatible with Java 6.

There is also a subscribeWith method on observable instances, which can be used like this:

DisposableObserver<Todo> disposableObserver = todoObservable.subscribeWith(new         DisposableObserver<Todo>() {

        @Override
        public void onNext(Todo t) {
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
});

4. Disposing subscriptions and using CompositeDisposable

When listers or subscribers are attached they usually are not supposed to listen eternally.

So it could happen that due to some state change the event being emitted by an observable might be not interesting any more.

import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;

Single<List<Todo>> todosSingle = getTodos();

Disposable disposable = todosSingle.subscribeWith(new DisposableSingleObserver<List<Todo>>() {

        @Override
        public void onSuccess(List<Todo> todos) {
                // work with the resulting todos
        }

        @Override
        public void onError(Throwable e) {
                // handle the error case
        }
});

// continue working and dispose when value of the Single is not interesting any more
disposable.dispose();

The Single class and other observable classes offer different subscribe methods, which return a Disposable object.

When working with multiple subscriptions, which may become obsolete due to the same state change using a CompositeDisposable is pretty handy to dispose a collection of subscriptions.

import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.disposables.CompositeDisposable;

CompositeDisposable compositeDisposable = new CompositeDisposable();

Single<List<Todo>> todosSingle = getTodos();

Single<Happiness> happiness = getHappiness();

compositeDisposable.add(todosSingle.subscribeWith(new DisposableSingleObserver<List<Todo>>() {

        @Override
        public void onSuccess(List<Todo> todos) {
                // work with the resulting todos
        }

        @Override
        public void onError(Throwable e) {
                // handle the error case
        }
}));

compositeDisposable.add(happiness.subscribeWith(new DisposableSingleObserver<Happiness>() {

        @Override
        public void onSuccess(Happiness happiness) {
                // celebrate the happiness :-D
        }

        @Override
        public void onError(Throwable e) {
                System.err.println("Don't worry, be happy! :-P");
        }
}));

// continue working and dispose all subscriptions when the values from the Single objects are not interesting any more
compositeDisposable.dispose();

5. Caching values of completed observables

When working with observables doing async calls on every subscription on an observable is often not necessary.

It likely happens that observables are passed around in the application, without the need to do an such an expensive call all the time a subscription is added.

The following code does the expensive web query 4 times, even though doing this once would be fine, since the same Todo objects should be shown, but only in different ways.

Single<List<Todo>> todosSingle = Single.create(emitter -> {
        Thread thread = new Thread(() -> {
                try {
                        List<Todo> todosFromWeb = // query a webservice

                        System.out.println("Called 4 times!");

                        emitter.onSuccess(todosFromWeb);
                } catch (Exception e) {
                        emitter.onError(e);
                }
        });
        thread.start();
});

todosSingle.subscribe(... " Show todos times in a bar chart " ...);

showTodosInATable(todosSingle);

todosSingle.subscribe(... " Show todos in gant diagram " ...);

anotherMethodThatsSupposedToSubscribeTheSameSingle(todosSingle);

The next code snippet makes use of the cache method, so that the Single instance keeps its result, once it was successful for the first time.

Single<List<Todo>> todosSingle = Single.create(emitter -> {
        Thread thread = new Thread(() -> {
                try {
                        List<Todo> todosFromWeb = // query a webservice

                        System.out.println("I am only called once!");

                        emitter.onSuccess(todosFromWeb);
                } catch (Exception e) {
                        emitter.onError(e);
                }
        });
        thread.start();
});

// cache the result of the single, so that the web query is only done once
Single<List<Todo>> cachedSingle = todosSingle.cache();

cachedSingle.subscribe(... " Show todos times in a bar chart " ...);

showTodosInATable(cachedSingle);

cachedSingle.subscribe(... " Show todos in gant diagram " ...);

anotherMethodThatsSupposedToSubscribeTheSameSingle(cachedSingle);

6. Flowable<T> and Backpressure

RxJava 2.0 introduced a new type Flowable<T>, which is pretty much the same as an Observable<T> in regards of the API, but Flowable<T> supports backpressure and Observable<T> does not.

Back in RxJava 1.0 the concept of backpressure came too late and was added to Observable<T> types, but some did throw a MissingBackpressureException, so distinguishing between Flowable<T> and Observable<T> is a good thing.

Besides Observable<T> also Maybe<T>, Single<T> and Completable have no backpressure.

7. Conversion between types

It is easy to convert between different RxJava types.

Table 2. Conversion between types
From / To Flowable Observable Maybe Single Completable

Flowable

toObservable()

reduce()
elementAt()
firstElement()
lastElement()
singleElement()

scan()
elementAt()
first()/firstOrError()
last()/lastOrError()
single()/singleOrError()
all()/any()/count()
(and more…​)

ignoreElements()

Observable

toFlowable()

reduce()
elementAt()
firstElement()
lastElement()
singleElement()

scan()
elementAt()
first()/firstOrError()
last()/lastOrError()
single()/singleOrError()
all()/any()/count()
(and more…​)

ignoreElements()

Maybe

toFlowable()

toObservable()

toSingle()
sequenceEqual()

toCompletable()

Single

toFlowable()

toObservable()

toMaybe()

toCompletable()

Completable

toFlowable()

toObservable()

toMaybe()

toSingle()
toSingleDefault()

8. Testing RxJava Observables and Subscriptions

8.1. Testing the observables

You can test an observable via the TestSubscriber class provided by the RxJava library.

Observable<String> obs = ...// assume creation code here
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
obs.subscribe(testSubscriber);

testSubscriber.assertNoErrors();
List<String> chickens = testSubscriber.getOnNextEvents();

// TODO assert your string integrity...

8.2. Testing the observables

RxJava provides a way to override the schedulers exposed, so that observables are called synchronously. See http://fedepaol.github.io/blog/2015/09/13/testing-rxjava-observables-subscriptions/ for an example.

9. About this website

10. RxJava resources

10.1. vogella GmbH training and consulting support

TRAINING SERVICE & SUPPORT

The vogella company provides comprehensive training and education services from experts in the areas of Eclipse RCP, Android, Git, Java, Gradle and Spring. We offer both public and inhouse training. Whichever course you decide to take, you are guaranteed to experience what many before you refer to as “The best IT class I have ever attended”.

The vogella company offers expert consulting services, development support and coaching. Our customers range from Fortune 100 corporations to individual developers.

Copyright © 2012-2016 vogella GmbH. Free use of the software examples is granted under the terms of the EPL License. This tutorial is published under the Creative Commons Attribution-NonCommercial-ShareAlike 3.0 Germany license.

See Licence.