RxJava in Action
Discover how to implement RxJava by following a step-by-step example that filters, transforms, and limits data streams on background and UI threads. Learn how each operator contributes to building reactive pipelines that handle asynchronous data efficiently and update the user interface.
Sample RxJava code
Let’s take a look at some RxJava in action:
The above code will get the top 5 sub-$90,000 electric cars and then update the UI with that data.
There’s a lot going on here, so let’s walk through it line by line:
- Line 1: It all starts with our Observable, the source of our data. The Observable waits for an observer to subscribe to it, at which point it does some work and pushes data to its Observer. We intentionally abstract away the creation of the carsObservable here, and we’ll cover how to create an Observable in the next chapter. For now, let’s assume the work that the Observable will be doing is querying a REST API to get an ordered list of the top-selling cars.
- Lines 3–10: These lines essentially form a data pipeline. Items emitted by the Observable travel through the pipeline from top to bottom. Each of the functions in the pipeline is what we call an Operator; Operators modify the Observable stream, allowing us to massage the data until we get what we want. In our example, we start with an ordered list of all top-selling car models. By the end of our pipeline, we have just the top 5 selling electric cars that are under $90,000. The massaging of the data happens as follows:
  - Line 3: The .subscribeOn(...) function tells the Observable to do its work on a background thread. We do this so that we don’t block Android’s main UI thread with the network request performed by the Observable.
  - Line 4: The .filter(...) function filters the stream down to only items that represent electric cars. Any Car object that does not meet this criterion is discarded at this point and does not continue down the stream.
  - Line 5: The .filter(...) function further filters the electric cars to those below $90,000.
  - Line 6: The .map(...) function transforms the element from a Car object to a String consisting of the year, make, and model. From here on, this String takes the place of the Car in the stream.
  - Line 7: The .distinct() function removes any elements that we’ve already encountered. Note that this uniqueness requirement applies to our String values and not our Car instances, which no longer exist at this point in our chain because of the previous .map(...) call.
  - Line 8: The .take(5) function ensures that at most five elements will be passed on. Once five elements have been emitted, the stream completes and emits no more items.
  - Line 9: The .observeOn(...) function switches our thread of execution back to the main UI thread. So far, we’ve been working on the background thread specified in Line 3. Now that we need to manipulate our views, however, we need to be back on the UI thread.
  - Line 10: The .subscribe(...) function is both the beginning and end of our data stream. It is the beginning because .subscribe() prompts the Observable to do its work and start emitting items. However, the parameter we pass to it is the Observer, which represents the end of the pipeline and defines some action to perform when it receives an item. In our case, the action will be to update the UI. The item received is the result of all of the transformations performed by the upstream operators.
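The steps above can be sketched as a small standalone program. This is only an illustration under stated assumptions, not the lesson's own code: it assumes RxJava 2 (the io.reactivex packages) is on the classpath, the Car class, its field names, and the hard-coded data are hypothetical stand-ins, and the year-make-model order of the String is a guess. Two deliberate swaps are made so it runs outside Android: Schedulers.io() stands in for the main UI thread scheduler, and blockingSubscribe(...) replaces subscribe(...) so the calling thread waits for the stream to finish.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PipelineSketch {

    // Hypothetical stand-in for the lesson's Car type; field names are assumptions.
    static final class Car {
        final boolean electric;
        final int price;
        final String year;
        final String make;
        final String model;

        Car(boolean electric, int price, String year, String make, String model) {
            this.electric = electric;
            this.price = price;
            this.year = year;
            this.make = make;
            this.model = model;
        }
    }

    // Hypothetical source; the lesson's Observable would query a REST API instead.
    static Observable<Car> carsObservable() {
        return Observable.fromIterable(Arrays.asList(
                new Car(true, 80000, "2021", "Honda", "Civic"),
                new Car(false, 50000, "2019", "Ford", "Fiesta"),  // dropped: not electric
                new Car(true, 95000, "2020", "Toyota", "Camry"),  // dropped: not under 90000
                new Car(true, 80000, "2021", "Honda", "Civic"),   // dropped by distinct()
                new Car(true, 60000, "2020", "Toyota", "Corolla")));
    }

    static List<String> run() {
        List<String> received = new ArrayList<>();
        carsObservable()
                .subscribeOn(Schedulers.io())          // do the source's work off the calling thread
                .filter(car -> car.electric)           // keep electric cars only
                .filter(car -> car.price < 90000)      // keep cars under $90,000
                .map(car -> car.year + " " + car.make + " " + car.model)
                .distinct()                            // uniqueness is checked on the Strings
                .take(5)                               // at most five items
                .observeOn(Schedulers.io())            // stand-in for the main UI thread scheduler
                .blockingSubscribe(received::add);     // waits here until the stream completes
        return received;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With this data only two items survive the filters and distinct(), so take(5) never cuts the stream short; it completes when the source does.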
Let’s execute the code!
We have set up a sample Gradle Java Application for you on our platform to implement RxJava concepts on the spot.
Click the “Run” button and see the above example in action!
Note: Since it’s not an Android application, the code below will simply print the car year, make, and model in the terminal. Also, AndroidSchedulers.mainThread() comes from the RxAndroid library and is only available on Android, so we will be using Schedulers.io() as a substitute. Don’t worry! We’ll explain all these concepts in detail as you proceed in the course.
/*
 * File path: src/main/java/RxJava/NetworkClient.java
 */
package RxJava;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class NetworkClient {
    private final Random random = new Random();

    List<Car> fetchBestSellingCars() {
        // A real client would perform a blocking network call here. For the sake
        // of the example, mimic network latency with a random thread sleep and
        // return a hard-coded list of Car objects.
        randomSleep();
        Car best1 = new Car(Car.Type.ALL_ELECTRIC, 80000, "2021", "Honda", "Civic");
        Car best2 = new Car(Car.Type.ALL_ELECTRIC, 70000, "2020", "Honda", "Civic");
        Car best3 = new Car(Car.Type.ALL_ELECTRIC, 80000, "2020", "Toyota", "Corolla");
        Car best4 = new Car(Car.Type.ALL_ELECTRIC, 70000, "2019", "Ford", "Fiesta");
        Car best5 = new Car(Car.Type.ALL_DIESEL, 50000, "2019", "Ford", "Fiesta");
        Car best6 = new Car(Car.Type.ALL_ELECTRIC, 10000, "2021", "Toyota", "Corolla");
        Car best7 = new Car(Car.Type.ALL_ELECTRIC, 60000, "2020", "Ford", "Ford Edge");
        Car best8 = new Car(Car.Type.ALL_ELECTRIC, 60000, "2020", "Toyota", "Camry");
        List<Car> bestSellingCars = new ArrayList<>();
        bestSellingCars.add(best1);
        bestSellingCars.add(best2);
        bestSellingCars.add(best3);
        bestSellingCars.add(best4);
        bestSellingCars.add(best5);
        bestSellingCars.add(best6);
        bestSellingCars.add(best7);
        bestSellingCars.add(best8);
        return bestSellingCars;
    }

    // Sleeps for 0, 1, or 2 seconds to simulate network latency.
    void randomSleep() {
        try {
            Thread.sleep(random.nextInt(3) * 1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
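Before running the project, it can help to predict what the filter, filter, map, distinct, take(5) sequence should produce for the eight sample cars above. The sketch below is a dependency-free approximation that uses java.util.stream in place of RxJava (limit(5) plays the role of take(5), and there is no threading). The Car class here is a hypothetical stand-in that only mirrors the constructor arguments used in NetworkClient, and the year-make-model String format is an assumption.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ExpectedOutputSketch {

    // Hypothetical stand-in mirroring the constructor used in NetworkClient.
    static final class Car {
        enum Type { ALL_ELECTRIC, ALL_DIESEL }

        final Type type;
        final int price;
        final String year;
        final String make;
        final String model;

        Car(Type type, int price, String year, String make, String model) {
            this.type = type;
            this.price = price;
            this.year = year;
            this.make = make;
            this.model = model;
        }
    }

    // The same eight cars, in the same order, as fetchBestSellingCars().
    static List<Car> sampleCars() {
        return Arrays.asList(
                new Car(Car.Type.ALL_ELECTRIC, 80000, "2021", "Honda", "Civic"),
                new Car(Car.Type.ALL_ELECTRIC, 70000, "2020", "Honda", "Civic"),
                new Car(Car.Type.ALL_ELECTRIC, 80000, "2020", "Toyota", "Corolla"),
                new Car(Car.Type.ALL_ELECTRIC, 70000, "2019", "Ford", "Fiesta"),
                new Car(Car.Type.ALL_DIESEL, 50000, "2019", "Ford", "Fiesta"),
                new Car(Car.Type.ALL_ELECTRIC, 10000, "2021", "Toyota", "Corolla"),
                new Car(Car.Type.ALL_ELECTRIC, 60000, "2020", "Ford", "Ford Edge"),
                new Car(Car.Type.ALL_ELECTRIC, 60000, "2020", "Toyota", "Camry"));
    }

    static List<String> expected() {
        return sampleCars().stream()
                .filter(car -> car.type == Car.Type.ALL_ELECTRIC)  // electric only
                .filter(car -> car.price < 90000)                  // under $90,000
                .map(car -> car.year + " " + car.make + " " + car.model)
                .distinct()                                        // drop repeated Strings
                .limit(5)                                          // stream analogue of take(5)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Prints:
        // 2021 Honda Civic
        // 2020 Honda Civic
        // 2020 Toyota Corolla
        // 2019 Ford Fiesta
        // 2021 Toyota Corolla
        expected().forEach(System.out::println);
    }
}
```

Seven of the eight cars pass both filters and all seven Strings are unique, so the limit of five is what trims the result here; the diesel Fiesta is the only car removed by the filters.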