This tutorial contains notes about RxJava 2.0.

1. Using reactive programming with RxJava 2.0

1.1. What is RxJava and reactive programming

In reactive programming the consumer reacts to the data as it comes in. This is why reactive programming is closely tied to asynchronous programming. Reactive programming propagates event changes to registered observers.

RxJava is a port of the Reactive Extensions (Rx) to Java, created by Netflix. RxJava was open sourced in 2014 and is hosted at http://reactivex.io/.

The Observer pattern done right. ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming.

— reactivex.io

The Java version of this concept is called RxJava and is hosted under https://github.com/ReactiveX/RxJava. RxJava is published under the Apache 2.0 license.

RxJava describes itself as an API for asynchronous programming with observable streams.

The building blocks for RxJava code are the following:

  • observables representing sources of data

  • subscribers (or observers) listening to the observables

  • a set of methods for modifying and composing the data

An observable emits items; a subscriber consumes those items. An observable may emit any number of items (including zero items). It can terminate either successfully or with an error.

An observable can have any number of subscribers. If a new item is emitted from the observable, the onNext() method is called on each subscriber. If the observable finishes its data flow, either successfully or with an error, the onComplete() or the onError() method is called on each subscriber.
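To make this notification contract concrete, the following plain-Java sketch mimics it without using RxJava. The names TinyObserver and TinySource are hypothetical and exist only for this illustration; RxJava's real Observable does considerably more (for example, disposal and threading).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical observer type: one item callback plus two terminal callbacks.
interface TinyObserver<T> {
    void onNext(T item);
    void onComplete();
    void onError(Throwable error);
}

// Hypothetical source that forwards every signal to all registered observers.
final class TinySource<T> {
    private final List<TinyObserver<T>> observers = new ArrayList<>();

    void subscribe(TinyObserver<T> observer) {
        observers.add(observer);
    }

    // A new item is delivered to each observer via onNext().
    void emit(T item) {
        for (TinyObserver<T> observer : observers) {
            observer.onNext(item);
        }
    }

    // Successful termination: each observer receives onComplete().
    void complete() {
        for (TinyObserver<T> observer : observers) {
            observer.onComplete();
        }
    }

    // Termination with an error: each observer receives onError().
    void fail(Throwable error) {
        for (TinyObserver<T> observer : observers) {
            observer.onError(error);
        }
    }
}

final class ObserverContractDemo {

    // Subscribes two observers and returns a log of everything they received.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        TinySource<String> source = new TinySource<>();
        for (String name : Arrays.asList("A", "B")) {
            source.subscribe(new TinyObserver<String>() {
                @Override public void onNext(String item) { log.add(name + " got " + item); }
                @Override public void onComplete() { log.add(name + " completed"); }
                @Override public void onError(Throwable error) { log.add(name + " failed"); }
            });
        }
        source.emit("Hello");
        source.emit("World");
        source.complete();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Each emitted item reaches both observers before the single terminal signal is sent, which is the ordering the paragraph above describes.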

A very simple RxJava example, written as a JUnit 4 test, is the following:

package com.vogella.android.rxjava.simple;

import org.junit.Test;

import io.reactivex.Observable;

import static org.junit.Assert.assertEquals;


public class RxJavaUnitTest {
    String result = "";

    // Simple subscription to a fixed value
    @Test
    public void returnAValue() {
        result = "";
        Observable<String> observable = Observable.just("Hello"); // provides the data
        observable.subscribe(s -> result = s); // a Consumer lambda acts as the subscriber
        assertEquals("Hello", result);
    }
}

1.2. Define a dependency to RxJava 2.0

As of this writing, version 2.0.4 is the current release. Replace the version with your desired version.

For a Gradle build you can add RxJava via the following dependency statement.

compile group: 'io.reactivex.rxjava2', name: 'rxjava', version: '2.0.4'

For Maven you can add a dependency with the following snippet.

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.0.4</version>
</dependency>

For OSGi environments, e.g., Eclipse RCP development, https://dl.bintray.com/simon-scholz/RxJava-OSGi/ can be used as a p2 update site.


1.3. Why do asynchronous programming

Programming in an imperative single threaded way can lead to blocking user interfaces. For example, waiting for a webservice call can cause an application freeze, if the network is not responsive. Such behavior results in a bad experience for the user.

This can be avoided by handling potential long running operations asynchronously.

An example would be:

public List<Todo> getTodos() {
        List<Todo> todosFromWeb = // query a webservice (with bad network latency)
        return todosFromWeb;
}

Calling the getTodos() method from the main thread causes the application logic to wait until the todosFromWeb arrive.

To improve this, potentially long-running code should run in a different thread. It should post updates to the main thread when a result comes in.

public void getTodos(Consumer<List<Todo>> todosCallback) {

        Thread thread = new Thread(()-> {
                List<Todo> todosFromWeb = // query a webservice

                todosCallback.accept(todosFromWeb);
        });
        thread.start();
}

After calling getTodos(Consumer<List<Todo>> todosCallback), the main thread is not blocked and can continue working; it can react once the accept method of the given Consumer is called. The code now runs asynchronously.

But what if errors occur during the web service query?

public void getTodos(FailableCallback<List<Todo>> todosCallback) {

        Thread thread = new Thread(()-> {
                try {
                        List<Todo> todosFromWeb = // query a web service

                        todosCallback.accept(todosFromWeb);
                } catch(Exception ex) {
                        todosCallback.error(ex);
                }
        });
        thread.start();
}

With the custom FailableCallback interface it works, but it also adds complexity.
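The text does not show the FailableCallback interface itself. A minimal sketch, assuming it has just the two methods used above (accept and error), could look like the following. The queryWebService method is a hypothetical stand-in for the real web service call, and the todos are modelled as plain strings to keep the example self-contained.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Assumed shape of the custom callback: one method per outcome.
interface FailableCallback<T> {
    void accept(T value);
    void error(Exception ex);
}

final class FailableCallbackDemo {

    // Hypothetical stand-in for the web service query; fails on request.
    static List<String> queryWebService(boolean fail) throws Exception {
        if (fail) {
            throw new Exception("network down");
        }
        return Arrays.asList("Buy milk", "Learn RxJava");
    }

    // Runs the query on a background thread and reports either outcome to the callback.
    static void getTodos(boolean fail, FailableCallback<List<String>> todosCallback) {
        Thread thread = new Thread(() -> {
            try {
                todosCallback.accept(queryWebService(fail));
            } catch (Exception ex) {
                todosCallback.error(ex);
            }
        });
        thread.start();
    }

    // Demo helper: blocks until the callback has fired and describes the outcome.
    static String describeOutcome(boolean fail) {
        AtomicReference<String> outcome = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        getTodos(fail, new FailableCallback<List<String>>() {
            @Override public void accept(List<String> todos) {
                outcome.set("todos: " + todos);
                done.countDown();
            }
            @Override public void error(Exception ex) {
                outcome.set("error: " + ex.getMessage());
                done.countDown();
            }
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(describeOutcome(false));
        System.out.println(describeOutcome(true));
    }
}
```

Even in this small sketch the caller needs a latch and a shared reference just to wait for one result, which hints at the extra complexity mentioned above.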

And there are even more issues that can occur:

  • Synchronization with the main thread may be required (For example, widgets in SWT and Android have to be updated from the UI thread)

  • What if the consumer of the FailableCallback is not present any more?

  • What if such a FailableCallback depends on another FailableCallback?

public void getUserPermission(FailableCallback<UserPermission> permissionCallback) {
        Thread thread = new Thread(()-> {
                try {
                        UserPermission permission = // query a web service

                        permissionCallback.accept(permission);
                } catch(Exception ex) {
                        permissionCallback.error(ex);
                }
        });
        thread.start();
}

public void getTodos(FailableCallback<List<Todo>> todosCallback) {

        Thread thread = new Thread(()-> {
                getUserPermission(new FailableCallback<UserPermission>() {

                        public void accept(UserPermission permission) {
                                if(permission.isValid()) {
                                        try {
                                                List<Todo> todosFromWeb = // query a web service

                                                if(!todosCallbackInstance.isDisposed()) {
                                                        if(syncWithUIThread()) {
                                                                todosCallback.accept(todosFromWeb);
                                                        }
                                                }
                                        } catch(Exception ex) {
                                                if(!todosCallbackInstance.isDisposed()) {
                                                        if(syncWithUIThread()) {
                                                                todosCallback.error(ex);
                                                        }
                                                }
                                        }
                                }
                        }

                        public void error(Exception ex) {
                                // Oh no!
                        }
                });
        });
        thread.start();
}

This code is hard to read and hard to maintain. Having lots of nested callbacks leads to what is known as callback hell. RxJava addresses this problem in a better way.

1.4. Advantages of RxJava

Some advantages of RxJava are the following:

  • You can chain async operations, e.g., if an API call depends on the call of another API

  • A defined way to handle errors

  • It reduces the need for state variables, which can be the source of errors

2. Creating sources to observe

Observables are the sources for the data. Usually they start providing data once a subscriber starts listening. They can emit no items, a single item or multiple items. They can terminate with an error or success. Sources may never terminate, e.g., an observable for a button click can potentially produce an infinite stream of events.

Table 1. Observable types
Type Description

Flowable<T>

Emits 0 or n items and terminates with complete or an error. Supports backpressure, which allows controlling how fast a source emits items.

Observable<T>

Emits 0 or n items and terminates with complete or an error.

Single<T>

Emits either a single item or an error. The reactive version of a method call. You subscribe to a Single and you get either a return value or an error.

Maybe<T>

Succeeds with an item, or no item, or errors. The reactive version of an Optional.

Completable

Either completes or signals an error. It never emits items. The reactive version of a Runnable.

An example of the usage of backpressure is the processing of touch events. You cannot control the user who produces these touch events, but you can tell the source to emit the events at a slower rate in case you cannot process them at the rate the user produces them. Flowable has been designed to support the Reactive Streams specification. Only Flowable supports backpressure.
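The request-based idea behind backpressure can be sketched without RxJava by using the java.util.concurrent.Flow interfaces (available since Java 9), which mirror the Reactive Streams interfaces. The classes RangePublisher and SlowSubscriber are hypothetical names for this illustration, and the publisher is deliberately simplified (single-threaded, no validation of the requested amount); it is not how Flowable is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Simplified, single-threaded publisher of the numbers 1..count.
// It emits only as many items as the subscriber has requested.
final class RangePublisher implements Flow.Publisher<Integer> {
    private final int count;

    RangePublisher(int count) {
        this.count = count;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 1;
            private boolean done;

            @Override
            public void request(long n) {
                // Emit at most n items, then wait for the next request.
                for (long i = 0; i < n && !done && next <= count; i++) {
                    subscriber.onNext(next++);
                }
                if (!done && next > count) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}

// Subscriber that decides itself how many items it is ready to receive.
final class SlowSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new ArrayList<>();
    boolean completed;
    private Flow.Subscription subscription;

    @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
    @Override public void onNext(Integer item) { received.add(item); }
    @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
    @Override public void onComplete() { completed = true; }

    // Signals that the subscriber can handle n more items.
    void pull(long n) {
        subscription.request(n);
    }
}

final class BackpressureDemo {
    public static void main(String[] args) {
        SlowSubscriber subscriber = new SlowSubscriber();
        new RangePublisher(5).subscribe(subscriber);

        subscriber.pull(2);
        System.out.println(subscriber.received); // [1, 2]

        subscriber.pull(10);
        System.out.println(subscriber.received + " completed=" + subscriber.completed);
    }
}
```

The source never gets ahead of the consumer: after the first request only two items have been delivered, and the remaining ones follow once the subscriber asks for more.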

The following shows an example for the creation of an observable.

 Observable<Todo> todoObservable = Observable.create(new ObservableOnSubscribe<Todo>() {
            @Override
            public void subscribe(ObservableEmitter<Todo> emitter) throws Exception {
                try {
                    List<Todo> todos = RxJavaUnitTest.this.getTodos();
                    for (Todo todo : todos) {
                        emitter.onNext(todo);
                    }
                    emitter.onComplete();
                } catch (Exception e) {
                    emitter.onError(e);
                }
            }
        });

Using lambdas the same statement can be expressed as:

Observable<Todo> todoObservable = Observable.create(emitter -> {
        try {
                List<Todo> todos = getTodos();
                for (Todo todo : todos) {
                        emitter.onNext(todo);
                }
                emitter.onComplete();
        } catch (Exception e) {
                emitter.onError(e);
        }
});

Typical observables that can emit an undefined number of items are UI listeners, such as click listeners. For these events, the number of items depends on how often the user interacts with the UI.

Types that usually terminate either successfully or on failure are Maybe<T>, Single<T> and Completable.

Maybe<T> objects are kind of an async java.util.Optional from Java 8.

Maybe<List<Todo>> todoMaybe = Maybe.create(emitter -> {
        try {
                List<Todo> todos = getTodos();
                if(todos != null && !todos.isEmpty()) {
                        emitter.onSuccess(todos); // (1)
                }else {
                        emitter.onComplete(); // (2)
                }
        } catch (Exception e) {
                emitter.onError(e); // (3)
        }
});
1 java.util.Optional has a value
2 java.util.Optional contains no value → null
3 An error occurred

Single<T> objects can also be considered promises, a concept that is popular in async frameworks. They are similar to Maybe<T> objects, but without the onComplete() method.

Completable objects are similar to Single<T> objects, but have no return value and therefore no generic type <T>, unlike the other types. Completable objects can also be seen as reactive java.lang.Runnable objects.

2.1. Convenience methods to create observables

RxJava provides several convenience methods to create observables:

  • Observable.just("Hello") - creates an observable as a wrapper around other data types

  • Observable.fromIterable() - takes a java.lang.Iterable<T> and emits its values in their order in the data structure

  • Observable.fromArray() - takes an array and emits its values in their order in the data structure

  • Observable.fromCallable() - creates an observable for a java.util.concurrent.Callable<V>

  • Observable.fromFuture() - creates an observable for a java.util.concurrent.Future

  • Observable.interval() - creates an observable that emits Long objects in a given interval

Similar methods exist for the other data types, e.g., Flowable.just(), Maybe.just() and Single.just().

2.2. Subscribing in RxJava

Once an observable instance is available, subscribers can be attached.

All observable types offer a large variety of subscribe methods.

Observable<Todo> todoObservable = Observable.create(emitter -> { ... });

// Simply subscribe with a io.reactivex.functions.Consumer<T>, which will be informed onNext()
Disposable disposable = todoObservable.subscribe(t -> System.out.print(t));

// Dispose the subscription when not interested in the emitted data any more
disposable.dispose();

// Also handle the error case with a second io.reactivex.functions.Consumer<T>
Disposable subscribe = todoObservable.subscribe(t -> System.out.print(t), e -> e.printStackTrace());

// ...

Further subscribe overloads accept additional callbacks, e.g., for onNext, onSuccess, onError and onComplete, depending on the observable type.

An io.reactivex.functions.Consumer<T> is almost equal to the java.util.function.Consumer from Java 8, except that its accept method can throw an Exception. Besides that, RxJava does not depend on Java 8, but is compatible with Java 6.
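The practical difference shows up when a lambda calls code that declares a checked exception. The sketch below uses a hypothetical ThrowingConsumer interface that mirrors the shape described above (an accept method declared with throws Exception); it is a stand-in for illustration, not the RxJava type itself, and save is an invented example method.

```java
import java.io.IOException;
import java.util.function.Consumer;

// Hypothetical stand-in: accept may throw, unlike java.util.function.Consumer.
@FunctionalInterface
interface ThrowingConsumer<T> {
    void accept(T value) throws Exception;
}

final class ConsumerComparison {

    // Invented example of a method that declares a checked exception.
    static void save(String value) throws IOException {
        if (value.isEmpty()) {
            throw new IOException("nothing to save");
        }
    }

    // Passes the value to the consumer and reports how the call ended.
    static String deliver(String value, ThrowingConsumer<String> consumer) {
        try {
            consumer.accept(value);
            return "ok";
        } catch (Exception e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Compiles: the checked IOException may escape the method reference.
        ThrowingConsumer<String> throwing = ConsumerComparison::save;

        // java.util.function.Consumer forces a try/catch inside the lambda.
        Consumer<String> plain = value -> {
            try {
                save(value);
            } catch (IOException e) {
                throw new IllegalStateException(e);
            }
        };

        System.out.println(deliver("todo", throwing));
        System.out.println(deliver("", throwing));
        plain.accept("todo");
    }
}
```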

There is also a subscribeWith method on observable instances, which can be used like this:

DisposableObserver<Todo> disposableObserver = todoObservable.subscribeWith(new DisposableObserver<Todo>() {

        @Override
        public void onNext(Todo t) {
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
});



2.3. Disposing subscriptions and using CompositeDisposable

When listeners or subscribers are attached, they are usually not supposed to listen forever.

A state change can mean that the events emitted by an observable are no longer of interest.

import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;

Single<List<Todo>> todosSingle = getTodos();

Disposable disposable = todosSingle.subscribeWith(new DisposableSingleObserver<List<Todo>>() {

        @Override
        public void onSuccess(List<Todo> todos) {
                // work with the resulting todos
        }

        @Override
        public void onError(Throwable e) {
                // handle the error case
        }
});

// continue working and dispose when value of the Single is not interesting any more
disposable.dispose();

The Single class and other observable classes offer different subscribe methods, which return a Disposable object.

When working with multiple subscriptions that may become obsolete due to the same state change, a CompositeDisposable is handy for disposing a collection of subscriptions at once.

import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.disposables.CompositeDisposable;

CompositeDisposable compositeDisposable = new CompositeDisposable();

Single<List<Todo>> todosSingle = getTodos();

Single<Happiness> happiness = getHappiness();

compositeDisposable.add(todosSingle.subscribeWith(new DisposableSingleObserver<List<Todo>>() {

        @Override
        public void onSuccess(List<Todo> todos) {
                // work with the resulting todos
        }

        @Override
        public void onError(Throwable e) {
                // handle the error case
        }
}));

compositeDisposable.add(happiness.subscribeWith(new DisposableSingleObserver<Happiness>() {

        @Override
        public void onSuccess(Happiness happiness) {
                // celebrate the happiness :-D
        }

        @Override
        public void onError(Throwable e) {
                System.err.println("Don't worry, be happy! :-P");
        }
}));

// continue working and dispose all subscriptions when the values from the Single objects are not interesting any more
compositeDisposable.dispose();

3. Caching values of completed observables

When working with observables, doing an async call on every subscription to an observable is often not necessary.

Observables are often passed around in the application, and there is no need to repeat such an expensive call every time a subscription is added.

The following code does the expensive web query 4 times, even though doing this once would be fine, since the same Todo objects should be shown, but only in different ways.

Single<List<Todo>> todosSingle = Single.create(emitter -> {
        Thread thread = new Thread(() -> {
                try {
                        List<Todo> todosFromWeb = // query a webservice

                        System.out.println("Called 4 times!");

                        emitter.onSuccess(todosFromWeb);
                } catch (Exception e) {
                        emitter.onError(e);
                }
        });
        thread.start();
});

todosSingle.subscribe(... " Show todos times in a bar chart " ...);

showTodosInATable(todosSingle);

todosSingle.subscribe(... " Show todos in gant diagram " ...);

anotherMethodThatsSupposedToSubscribeTheSameSingle(todosSingle);

The next code snippet makes use of the cache method, so that the Single instance keeps its result, once it was successful for the first time.

Single<List<Todo>> todosSingle = Single.create(emitter -> {
        Thread thread = new Thread(() -> {
                try {
                        List<Todo> todosFromWeb = // query a webservice

                        System.out.println("I am only called once!");

                        emitter.onSuccess(todosFromWeb);
                } catch (Exception e) {
                        emitter.onError(e);
                }
        });
        thread.start();
});

// cache the result of the single, so that the web query is only done once
Single<List<Todo>> cachedSingle = todosSingle.cache();

cachedSingle.subscribe(... " Show todos times in a bar chart " ...);

showTodosInATable(cachedSingle);

cachedSingle.subscribe(... " Show todos in gant diagram " ...);

anotherMethodThatsSupposedToSubscribeTheSameSingle(cachedSingle);

4. Conversion between types

It is easy to convert between different RxJava types.

Table 2. Conversion between types
From          To            Methods
Flowable      Observable    toObservable()
Flowable      Maybe         reduce(), elementAt(), firstElement(), lastElement(), singleElement()
Flowable      Single        scan(), elementAt(), first()/firstOrError(), last()/lastOrError(), single()/singleOrError(), all()/any()/count() (and more…)
Flowable      Completable   ignoreElements()
Observable    Flowable      toFlowable()
Observable    Maybe         reduce(), elementAt(), firstElement(), lastElement(), singleElement()
Observable    Single        scan(), elementAt(), first()/firstOrError(), last()/lastOrError(), single()/singleOrError(), all()/any()/count() (and more…)
Observable    Completable   ignoreElements()
Maybe         Flowable      toFlowable()
Maybe         Observable    toObservable()
Maybe         Single        toSingle(), sequenceEqual()
Maybe         Completable   toCompletable()
Single        Flowable      toFlowable()
Single        Observable    toObservable()
Single        Maybe         toMaybe()
Single        Completable   toCompletable()
Completable   Flowable      toFlowable()
Completable   Observable    toObservable()
Completable   Maybe         toMaybe()
Completable   Single        toSingle(), toSingleDefault()

5. RxAndroid

5.1. Using RxAndroid

RxAndroid is an extension to RxJava. It provides a scheduler to run code in the main thread of Android. It also provides the ability to create a scheduler that runs on an Android handler class. With these schedulers, you can define an observable which does its work in a background thread and posts its results to the main thread. This allows you, for example, to replace AsyncTask implementations with RxJava.

To use RxJava in Android add the following dependencies to your build.gradle file.

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.8'

For example you can define a long running operation via the following observable.

final Observable<Integer> serverDownloadObservable = Observable.create(emitter -> {
        SystemClock.sleep(1000); // simulate delay
        emitter.onNext(5);
        emitter.onComplete();
    });

You can now subscribe to this observable. This triggers its execution and provides the subscriber with the required information.

For example, let's assume you assign this to a button.

serverDownloadObservable
        .observeOn(AndroidSchedulers.mainThread()) // (1)
        .subscribeOn(Schedulers.io()) // (2)
        .subscribe(integer -> {
            updateTheUserInterface(integer); // this method updates the UI
            view.setEnabled(true); // enables the button again
        });
1 The subscriber observes in the main thread
2 Observable is called outside the main thread

As we are only interested in the final result, we could also use a Single.

Disposable disposable = Single.<String>create(emitter -> {
            String result = doSomeLongRunningStuff();
            emitter.onSuccess(result);
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(value -> {
            // onSuccess
            updateTheUserInterface(); // this method updates the UI
        }, throwable -> {
            // handle onError
        });

5.2. Unsubscribe to avoid memory leaks

In RxJava 2 the subscribe() methods that take lambdas return a Disposable object. To prevent a possible (temporary) memory leak, dispose your subscriptions in the onStop() or onDestroy() method of the activity or fragment. For example, for a Disposable object you could do the following:

@Override
    protected void onDestroy() {
        super.onDestroy();
        if (bookSubscription != null && !bookSubscription.isDisposed()) {
            bookSubscription.dispose();
        }
    }

6. Exercise: First steps with RxJava and RxAndroid

Create a new project with the com.vogella.android.rxjava.simple top level package name.

6.1. Gradle dependencies

Add the following dependencies to your app/build.gradle file.

compile 'com.android.support:recyclerview-v7:23.1.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.8'
compile 'com.squareup.okhttp:okhttp:2.5.0'
testCompile 'junit:junit:4.12'

Also enable the usage of Java 8 in your app/build.gradle file.

android {
   // more stuff
    compileOptions {
        sourceCompatibility JavaVersion.VERSION_1_8
        targetCompatibility JavaVersion.VERSION_1_8
    }
}

6.2. Create activities

Change your main layout file to the following.

<?xml version="1.0" encoding="utf-8"?>
<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
              android:layout_width="match_parent"
              android:layout_height="match_parent"
              android:orientation="vertical"
    >

    <Button
        android:id="@+id/first"
        android:layout_width="match_parent"
        android:layout_height="wrap_content"
        android:onClick="onClick"
        android:text="First"
        />

    <Button
        android:id="@+id/second"
        android:layout_width="match_parent"
        android:layout_height="wrap_content"
        android:onClick="onClick"
        android:text="Second"

        />

    <Button
        android:id="@+id/third"
        android:layout_width="match_parent"
        android:layout_height="wrap_content"
        android:onClick="onClick"
        android:text="Third"

        />
</LinearLayout>

Create three activities:

  • RxJavaSimpleActivity

  • BooksActivity

  • ColorsActivity

Create the activity_rxjavasimple.xml layout file.

<?xml version="1.0" encoding="utf-8"?>
<LinearLayout
    xmlns:android="http://schemas.android.com/apk/res/android"
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    >

    <Button
        android:id="@+id/button"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:text="Server"
        />
    <Button
        android:id="@+id/toastbutton"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:text="Toast"
        android:onClick="onClick"
        />

    <TextView
        android:id="@+id/resultView"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:text="Result"
        />
</LinearLayout>


In RxJavaSimpleActivity create an observable which simulates a long-running operation (10 secs) and afterwards returns the number 5. Subscribe to it via a button click and disable the button until the operation has finished.

package com.vogella.android.rxjava.simple;

import android.os.Bundle;
import android.os.SystemClock;
import android.support.v7.app.AppCompatActivity;
import android.support.v7.widget.RecyclerView;
import android.view.View;
import android.widget.TextView;
import android.widget.Toast;

import io.reactivex.Observable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;


public class RxJavaSimpleActivity extends AppCompatActivity {

    RecyclerView colorListView;
    SimpleStringAdapter simpleStringAdapter;
    CompositeDisposable disposable = new CompositeDisposable();
    public int value =0;

    final Observable<Integer> serverDownloadObservable = Observable.create(emitter -> {
        SystemClock.sleep(10000); // simulate delay
        emitter.onNext(5);
        emitter.onComplete();
    });

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_rxjavasimple);
        View view = findViewById(R.id.button);
        view.setOnClickListener(v -> {
            v.setEnabled(false); // disables the button until execution has finished
            Disposable subscribe = serverDownloadObservable.
                    observeOn(AndroidSchedulers.mainThread()).
                    subscribeOn(Schedulers.io()).
                    subscribe(integer -> {
                        updateTheUserInterface(integer); // this methods updates the ui
                        v.setEnabled(true); // enables it again
                    });
            disposable.add(subscribe);
        });
    }

    private void updateTheUserInterface(int integer) {
        TextView view = (TextView) findViewById(R.id.resultView);
        view.setText(String.valueOf(integer));
    }

    @Override
    protected void onStop() {
        super.onStop();
        if (disposable!=null && !disposable.isDisposed()) {
            disposable.dispose();
        }
    }

    public void onClick(View view) {
        Toast.makeText(this, "Still active " + value++, Toast.LENGTH_SHORT).show();
    }
}

Create an adapter for a recycler view.

package com.vogella.android.rxjava.simple;

import android.content.Context;
import android.support.v7.widget.RecyclerView;
import android.view.LayoutInflater;
import android.view.View;
import android.view.ViewGroup;
import android.widget.TextView;
import android.widget.Toast;

import java.util.ArrayList;
import java.util.List;

/**
 * Adapter used to map a String to a text view.
 */
public class SimpleStringAdapter extends RecyclerView.Adapter<SimpleStringAdapter.ViewHolder> {

    private final Context mContext;
    private final List<String> mStrings = new ArrayList<>();

    public SimpleStringAdapter(Context context) {
        mContext = context;
    }

    public void setStrings(List<String> newStrings) {
        mStrings.clear();
        mStrings.addAll(newStrings);
        notifyDataSetChanged();
    }

    @Override
    public ViewHolder onCreateViewHolder(ViewGroup parent, int viewType) {
        View view = LayoutInflater.from(parent.getContext()).inflate(R.layout.string_list_item, parent, false);
        return new ViewHolder(view);
    }

    @Override
    public void onBindViewHolder(ViewHolder holder, final int position) {
        holder.colorTextView.setText(mStrings.get(position));
        holder.itemView.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                Toast.makeText(mContext, mStrings.get(position), Toast.LENGTH_SHORT).show();
            }
        });
    }

    @Override
    public int getItemCount() {
        return mStrings.size();
    }

    public static class ViewHolder extends RecyclerView.ViewHolder {

        public final TextView colorTextView;

        public ViewHolder(View view) {
            super(view);
            colorTextView = (TextView) view.findViewById(R.id.color_display);
        }
    }
}

Implement ColorsActivity, which uses an observable to receive a list of colors.

Create the activity_colors.xml layout file.

<?xml version="1.0" encoding="utf-8"?>
<FrameLayout
    xmlns:android="http://schemas.android.com/apk/res/android"
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    >
    <android.support.v7.widget.RecyclerView
        android:id="@+id/color_list"
        android:layout_width="match_parent"
        android:layout_height="match_parent"
        />
</FrameLayout>

package com.vogella.android.rxjava.simple;

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.support.v7.widget.LinearLayoutManager;
import android.support.v7.widget.RecyclerView;

import java.util.ArrayList;
import java.util.List;

import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;


public class ColorsActivity extends AppCompatActivity {

    RecyclerView colorListView;
    SimpleStringAdapter simpleStringAdapter;
    private Disposable disposable;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        configureLayout();
        createObservable();
    }

    private void createObservable() {
        Observable<List<String>> listObservable = Observable.just(getColorList());
        disposable = listObservable.subscribe(colors -> simpleStringAdapter.setStrings(colors));

    }

    private void configureLayout() {
        setContentView(R.layout.activity_colors);
        colorListView = (RecyclerView) findViewById(R.id.color_list);
        colorListView.setLayoutManager(new LinearLayoutManager(this));
        simpleStringAdapter = new SimpleStringAdapter(this);
        colorListView.setAdapter(simpleStringAdapter);
    }

    private static List<String> getColorList() {
        ArrayList<String> colors = new ArrayList<>();
        colors.add("red");
        colors.add("green");
        colors.add("blue");
        colors.add("pink");
        colors.add("brown");
        return colors;
    }

    @Override
    protected void onStop() {
        super.onStop();
        if (disposable!=null && !disposable.isDisposed()) {
            disposable.dispose();
        }
    }
}

Create the following (fake) REST client, which simulates slow calls to a server.

package com.vogella.android.rxjava.simple;

import android.content.Context;
import android.os.SystemClock;

import java.util.ArrayList;
import java.util.List;

/**
 * This is a fake REST client.
 *
 * It simulates making blocking calls to a REST endpoint.
 */
public class RestClient {
    private Context mContext;

    public RestClient(Context context) {
        mContext = context;
    }

    public List<String> getFavoriteBooks() {
        SystemClock.sleep(8000);// "Simulate" the delay of network.
        return createBooks();
    }

    public List<String> getFavoriteBooksWithException() {
        SystemClock.sleep(8000);// "Simulate" the delay of network.
        throw new RuntimeException("Failed to load");
    }

    private List<String> createBooks() {
        List<String> books = new ArrayList<>();
        books.add("Lord of the Rings");
        books.add("The dark elf");
        books.add("Eclipse Introduction");
        books.add("History book");
        books.add("Der kleine Prinz");
        books.add("7 habits of highly effective people");
        books.add("Other book 1");
        books.add("Other book 2");
        books.add("Other book 3");
        books.add("Other book 4");
        books.add("Other book 5");
        books.add("Other book 6");
        return books;
    }
}

Create the activity_books.xml layout file.

<?xml version="1.0" encoding="utf-8"?>
<FrameLayout
    xmlns:android="http://schemas.android.com/apk/res/android"
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    >

    <ProgressBar
        android:id="@+id/loader"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:layout_gravity="center"
        />

    <android.support.v7.widget.RecyclerView
        android:id="@+id/books_list"
        android:layout_width="match_parent"
        android:layout_height="match_parent"
        android:visibility="gone"
        />

</FrameLayout>

Also implement the BooksActivity activity.

package com.vogella.android.rxjava.simple;

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.support.v7.widget.LinearLayoutManager;
import android.support.v7.widget.RecyclerView;
import android.view.View;
import android.widget.ProgressBar;

import java.util.List;

import io.reactivex.Observable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;


public class BooksActivity extends AppCompatActivity {

    private Disposable bookSubscription;
    private RecyclerView booksRecyclerView;
    private ProgressBar progressBar;
    private SimpleStringAdapter stringAdapter;
    private RestClient restClient;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        restClient = new RestClient(this);
        configureLayout();
        createObservable();
    }

    private void createObservable() {
        Observable<List<String>> booksObservable =
                Observable.fromCallable(() -> restClient.getFavoriteBooks());
        bookSubscription = booksObservable.
                subscribeOn(Schedulers.io()).
                observeOn(AndroidSchedulers.mainThread()).
                subscribe(strings -> displayBooks(strings));
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (bookSubscription != null && !bookSubscription.isDisposed()) {
            bookSubscription.dispose();
        }
    }

    private void displayBooks(List<String> books) {
        stringAdapter.setStrings(books);
        progressBar.setVisibility(View.GONE);
        booksRecyclerView.setVisibility(View.VISIBLE);
    }

    private void configureLayout() {
        setContentView(R.layout.activity_books);
        progressBar = (ProgressBar) findViewById(R.id.loader);
        booksRecyclerView = (RecyclerView) findViewById(R.id.books_list);
        booksRecyclerView.setLayoutManager(new LinearLayoutManager(this));
        stringAdapter = new SimpleStringAdapter(this);
        booksRecyclerView.setAdapter(stringAdapter);
    }

    @Override
    protected void onStop() {
        super.onStop();
        if (bookSubscription!=null && !bookSubscription.isDisposed()) {
            bookSubscription.dispose();
        }
    }
}

6.3. Implement a long running operation via a Callable

A java.util.concurrent.Callable is like a Runnable, but it can return a value and throw a checked exception.
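The difference can be seen without RxJava. The following is a minimal sketch that uses only the JDK; the class name CallableSketch and the method runAndDescribe are hypothetical names chosen for illustration. It submits a Callable to an executor and shows that a returned value and a thrown exception both reach the caller.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example class, not part of the tutorial project.
class CallableSketch {

    // Runs the task on a background thread and describes the outcome.
    static String runAndDescribe(Callable<String> task) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executor.submit(task);
            // get() blocks until call() has returned a value
            return "value: " + future.get();
        } catch (ExecutionException e) {
            // An exception thrown by call() arrives wrapped in an ExecutionException
            return "error: " + e.getCause().getMessage();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Callable<String> succeeds = () -> "Hello";
        Callable<String> fails = () -> {
            throw new Exception("Failed to load");
        };
        System.out.println(runAndDescribe(succeeds)); // prints "value: Hello"
        System.out.println(runAndDescribe(fails));    // prints "error: Failed to load"
    }
}
```

A Runnable could do neither: its run() method returns void and cannot declare a checked exception.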

The following activity creates an observable from a Callable. When the subscription starts, a progress bar is made visible. Once the operation finishes, the progress bar is hidden again and a text view is updated.

The long running operation runs in the background; the UI is updated on the main thread.
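The same division of work can be illustrated without Android or RxJava. The sketch below is only an analogy built on the JDK's CompletableFuture: the two single-threaded executors named "background" and "ui" are hypothetical stand-ins for Schedulers.io() and the Android main thread, and the class name ThreadHandOffSketch is an assumption made for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical example class, not part of the tutorial project.
class ThreadHandOffSketch {

    // Produces a value on one thread and consumes it on another.
    static String loadAndDisplay() {
        // Stand-in for the thread that performs the long running work
        ExecutorService background =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "background"));
        // Stand-in for the thread that is allowed to touch the UI
        ExecutorService ui =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "ui"));
        try {
            return CompletableFuture
                    // the work itself runs on the background thread
                    .supplyAsync(() -> "loaded on " + Thread.currentThread().getName(), background)
                    // the result is handed over to the ui thread
                    .thenApplyAsync(result -> result + ", shown on "
                            + Thread.currentThread().getName(), ui)
                    .join();
        } finally {
            background.shutdown();
            ui.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(loadAndDisplay()); // prints "loaded on background, shown on ui"
    }
}
```

In the activity, subscribeOn() plays the role of the first executor and observeOn() the role of the second.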

Here is the activity_scheduler.xml layout file:

<?xml version="1.0" encoding="utf-8"?>
<RelativeLayout
    xmlns:android="http://schemas.android.com/apk/res/android"
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    >

    <Button
        android:id="@+id/scheduleLongRunningOperation"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:lines="3"
        android:text="Start something long"
        android:layout_marginStart="12dp"
        android:layout_alignParentStart="true"
        />

    <TextView
        android:id="@+id/messagearea"
        android:layout_width="match_parent"
        android:layout_height="match_parent"
        android:layout_alignParentStart="true"
        android:text=""
        android:layout_below="@+id/scheduleLongRunningOperation"
        />

    <ProgressBar
        android:id="@+id/progressBar"
        style="?android:attr/progressBarStyle"
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:visibility="gone"
        android:layout_alignBottom="@+id/scheduleLongRunningOperation"
        android:layout_toEndOf="@+id/scheduleLongRunningOperation"
        />

</RelativeLayout>

The SchedulerActivity class is implemented as follows:

package com.vogella.android.rxjava.simple;

import android.os.Bundle;
import android.os.SystemClock;
import android.support.v7.app.AppCompatActivity;
import android.view.View;
import android.widget.ProgressBar;
import android.widget.TextView;

import java.util.concurrent.Callable;

import io.reactivex.Observable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableObserver;
import io.reactivex.schedulers.Schedulers;


/** Demonstrates a long running operation that runs off the main thread
 * and during which a progress bar is shown.
 *
 */
public class SchedulerActivity extends AppCompatActivity {

    private Disposable subscription;
    private ProgressBar progressBar;
    private TextView messagearea;
    private View button;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        configureLayout();
        createObservable();
    }

    private void createObservable() {
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (subscription != null && !subscription.isDisposed()) {
            subscription.dispose();
        }
    }

    private void configureLayout() {
        setContentView(R.layout.activity_scheduler);
        progressBar = (ProgressBar) findViewById(R.id.progressBar);
        messagearea = (TextView) findViewById(R.id.messagearea);
        button  = findViewById(R.id.scheduleLongRunningOperation);
        button.setOnClickListener(new View.OnClickListener(){
            @Override
            public void onClick(View v) {
                // Keep the returned Disposable so that onDestroy() can cancel the work
                subscription = Observable.fromCallable(callable).
                        subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).
                        doOnSubscribe(disposable ->
                                {
                                    progressBar.setVisibility(View.VISIBLE);
                                    button.setEnabled(false);
                                    messagearea.setText(messagearea.getText().toString() +"\n" +"Progressbar set visible" );
                                }
                        ).
                        subscribeWith(getDisposableObserver());
            }
        });
    }

    Callable<String> callable = new Callable<String>() {
        @Override
        public String call() throws Exception {
            return doSomethingLong();
        }
    };

    public String doSomethingLong(){
        SystemClock.sleep(1000);
        return "Hello";
    }

    /**
     * Observer
     * Handles the stream of data:
     */
    private DisposableObserver<String> getDisposableObserver() {
        return new DisposableObserver<String>() {

            @Override
            public void onComplete() {
                messagearea.setText(messagearea.getText().toString() +"\n" +"OnComplete" );
                progressBar.setVisibility(View.INVISIBLE);
                button.setEnabled(true);
                messagearea.setText(messagearea.getText().toString() +"\n" +"Hiding Progressbar" );
            }

            @Override
            public void onError(Throwable e) {
                messagearea.setText(messagearea.getText().toString() +"\n" +"OnError" );
                progressBar.setVisibility(View.INVISIBLE);
                button.setEnabled(true);
                messagearea.setText(messagearea.getText().toString() +"\n" +"Hiding Progressbar" );
            }

            @Override
            public void onNext(String message) {
                messagearea.setText(messagearea.getText().toString() +"\n" +"onNext " + message );
            }
        };
    }
}

7. Testing RxJava Observables and Subscriptions

7.1. Testing the observables

Flowable can be tested with io.reactivex.subscribers.TestSubscriber. Non-backpressured Observable, Single, Maybe and Completable can be tested with io.reactivex.observers.TestObserver.

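    @Test
    public void anObservableStreamOfEventsAndDataShouldEmitEachItemInOrder() {

        Observable<String> pipelineOfData = Observable.just("Foo", "Bar");
        // The observer records every item it receives
        TestObserver<String> testObserver = new TestObserver<>();

        pipelineOfData.subscribe(testObserver);

        // assertThat() is provided by the AssertJ library
        List<String> dataEmitted = testObserver.values();
        assertThat(dataEmitted).hasSize(2);
        assertThat(dataEmitted).containsOnlyOnce("Foo");
        assertThat(dataEmitted).containsOnlyOnce("Bar");
        // containsExactly() also verifies the order of the items
        assertThat(dataEmitted).containsExactly("Foo", "Bar");
    }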
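The idea behind such a test observer is to record every signal so that assertions can be made afterwards. The following plain-Java sketch illustrates only that idea; RecordingObserver and emit are hypothetical names invented for this example and are not RxJava's TestObserver API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical example class, not part of RxJava or the tutorial project.
class RecordingObserverSketch {

    // Stand-in for a test observer: it stores every signal it receives.
    static final class RecordingObserver<T> {
        final List<T> values = new ArrayList<>();
        Throwable error;
        boolean completed;

        void onNext(T item) { values.add(item); }
        void onError(Throwable e) { error = e; }
        void onComplete() { completed = true; }
    }

    // Stand-in for a source: pushes the items in order, then signals completion.
    // Any runtime failure is reported through onError() instead.
    static <T> RecordingObserver<T> emit(List<T> items) {
        RecordingObserver<T> observer = new RecordingObserver<>();
        try {
            for (T item : items) {
                observer.onNext(item);
            }
            observer.onComplete();
        } catch (RuntimeException e) {
            observer.onError(e);
        }
        return observer;
    }

    public static void main(String[] args) {
        RecordingObserver<String> observer = emit(Arrays.asList("Foo", "Bar"));
        // prints "[Foo, Bar] completed=true"
        System.out.println(observer.values + " completed=" + observer.completed);
    }
}
```

A test then inspects the recorded values, the completion flag and the error, which is what the values() call and the assert methods of the real TestObserver provide.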

All base reactive types have a test() method. It subscribes to the source and returns a TestSubscriber or TestObserver directly, so no observer has to be created by hand.

TestSubscriber<Integer> ts = Flowable.range(1, 5).test();

TestObserver<Integer> to = Observable.range(1, 5).test();

TestObserver<Integer> tso = Single.just(1).test();

TestObserver<Integer> tmo = Maybe.just(1).test();

TestObserver<Void> tco = Completable.complete().test();

8. Exercise: Writing unit tests for RxJava

8.1. Write a small unit test

Write a small unit test that uses RxJava.

package com.vogella.android.rxjava.simple;

import org.junit.Test;

import java.util.List;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.observers.TestObserver;

import static junit.framework.Assert.assertTrue;


public class RxJavaUnitTest {
    String result="";

    // Simple subscription to a fixed value
    @Test
    public void returnAValue(){
        result = "";
        Observable<String> observable = Observable.just("Hello"); // provides data
        observable.subscribe(s -> result=s); // lambda as subscriber
        assertTrue(result.equals("Hello"));
    }

    @Test public void test(){
        Observable<Todo> todoObservable = Observable.create(new ObservableOnSubscribe<Todo>() {
            @Override
            public void subscribe(ObservableEmitter<Todo> emitter) throws Exception {
                try {
                    List<Todo> todos = RxJavaUnitTest.this.getTodos();
                    if (todos==null){
                        throw new NullPointerException("todos was null");
                    }
                    for (Todo todo : todos) {
                        emitter.onNext(todo);
                    }
                    emitter.onComplete();
                } catch (Exception e) {
                    emitter.onError(e);
                }
            }
        });
        TestObserver<Object> testObserver = new TestObserver<>();
        todoObservable.subscribeWith(testObserver);
        testObserver.assertError(NullPointerException.class);

    }

    private List<Todo> getTodos() {
        return null;
    }

    public class Todo {
    }
}

Copyright © 2012-2016 vogella GmbH. Free use of the software examples is granted under the terms of the EPL License. This tutorial is published under the Creative Commons Attribution-NonCommercial-ShareAlike 3.0 Germany license.

See Licence.