
RxJava in Action

Discover how to implement RxJava by following a step-by-step example that filters, transforms, and limits data streams on background and UI threads. Learn how each operator contributes to building reactive pipelines that handle asynchronous data efficiently and update the user interface.

Sample RxJava code

Let’s take a look at some RxJava in action:

Java
Observable<Car> carsObservable = getBestSellingCarsObservable();

carsObservable.subscribeOn(Schedulers.io())
    .filter(car -> car.type == Car.Type.ALL_ELECTRIC)
    .filter(car -> car.price < 90000)
    .map(car -> car.year + " " + car.make + " " + car.model)
    .distinct()
    .take(5)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(this::updateUi);

The above code will get the top 5 sub-$90,000 electric cars and then update the UI with that data.

There’s a lot going on here, so let’s see how it does this line-by-line:

  • Line 1: It all starts with our Observable, the source of our data. The Observable waits for an observer to subscribe to it, at which point it does some work and pushes data to its Observer. We intentionally abstract away the creation of the carsObservable here, and we’ll cover how to create an Observable in the next chapter. For now, let’s assume the work that the Observable will be doing is querying a REST API to get an ordered list of the top-selling cars.

  • Lines 3–10: These lines essentially form a data pipeline. Items emitted by the Observable travel through the pipeline from top to bottom. Each function in the pipeline is what we call an Operator; Operators modify the Observable stream, allowing us to massage the data until we get what we want. In our example, we start with an ordered list of all top-selling car models. By the end of the pipeline, we have just the five top-selling electric cars that cost less than $90,000. The massaging of the data happens as follows:

    • Line 3: The .subscribeOn(...) function tells the Observable to do its work on a background thread. We do this so that we don’t block Android’s main UI thread with the network request performed by the Observable.

    • Line 4: The .filter(...) function filters the stream down to only items that represent electric cars. Any Car object that does not meet this criterion is discarded at this point and does not continue down the stream.

    • Line 5: The .filter(...) function further filters the electric cars to those below $90,000.

    • Line 6: The .map(...) function transforms the element from a Car object to a String consisting of the year, make, and model. From here on, this String takes the place of the Car in the stream.

    • Line 7: The .distinct() function removes any elements that we’ve already encountered. Note that this uniqueness requirement applies to our String values and not to our Car instances, which no longer exist at this point in the chain because of the previous .map(...) call.

    • Line 8: The .take(5) function ensures that at most five elements will be passed on. Once five elements have been emitted, the stream completes and emits no more items.

    • Line 9: The .observeOn(...) function switches our thread of execution back to the main UI thread. So far, we’ve been working on the background thread specified in Line 3. Now that we need to manipulate our views, however, we need to be back on the UI thread.

    • Line 10: The .subscribe(...) function is both the beginning and the end of our data stream. It is the beginning because .subscribe() prompts the Observable to do its work and start emitting items. However, the parameter we pass to it is the Observer, which represents the end of the pipeline and defines the action to perform when it receives an item. In our case, the action is to update the UI. The item received is the result of all the transformations performed by the upstream operators.
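The data-shaping steps in the walkthrough above (filter, filter, map, distinct, take) can be tried without RxJava. The following is a minimal sketch that mirrors them with java.util.stream, which is a synchronous collection API and not a reactive one, so it only illustrates what each step does to the data; limit(5) plays the role of take(5). The class name StreamPipelineSketch, the nested Car stand-in, and the sample data are assumptions made for illustration and are not part of the lesson's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

class StreamPipelineSketch {

    enum Type { ALL_ELECTRIC, ALL_DIESEL }

    // Hypothetical stand-in for the lesson's Car class, which is not shown.
    static final class Car {
        final Type type;
        final int price;
        final String year;
        final String make;
        final String model;

        Car(Type type, int price, String year, String make, String model) {
            this.type = type;
            this.price = price;
            this.year = year;
            this.make = make;
            this.model = model;
        }
    }

    // Applies the same data-shaping steps as the RxJava chain, in the same order.
    static List<String> describeTopElectric(List<Car> cars) {
        return cars.stream()
                .filter(car -> car.type == Type.ALL_ELECTRIC)            // electric cars only
                .filter(car -> car.price < 90000)                        // under $90,000
                .map(car -> car.year + " " + car.make + " " + car.model) // Car -> String
                .distinct()                                              // drop repeated Strings
                .limit(5)                                                // at most five items
                .collect(Collectors.toList());
    }

    // Made-up sample data chosen so that every step has something to do.
    static List<Car> sampleCars() {
        List<Car> cars = new ArrayList<>();
        cars.add(new Car(Type.ALL_ELECTRIC, 80000, "2021", "Honda", "Civic"));
        cars.add(new Car(Type.ALL_DIESEL, 50000, "2019", "Ford", "Fiesta"));     // removed: not electric
        cars.add(new Car(Type.ALL_ELECTRIC, 95000, "2021", "Toyota", "Camry"));  // removed: too expensive
        cars.add(new Car(Type.ALL_ELECTRIC, 75000, "2021", "Honda", "Civic"));   // removed: duplicate String
        cars.add(new Car(Type.ALL_ELECTRIC, 70000, "2020", "Toyota", "Corolla"));
        return cars;
    }

    public static void main(String[] args) {
        // Prints "2021 Honda Civic" and then "2020 Toyota Corolla".
        describeTopElectric(sampleCars()).forEach(System.out::println);
    }
}
```

Note that the two Honda Civic entries are different Car objects with different prices, yet only one survives: by the time distinct() runs, the elements are Strings, and the two Strings are equal.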

Let’s execute the code!

We have set up a sample Gradle Java Application for you on our platform to implement RxJava concepts on the spot.

Click the “Run” button and see the above example in action!

Note: Since this is not an Android application, the code below simply prints the car year, make, and model in the terminal. Also, AndroidSchedulers.mainThread() is specific to Android (it comes from the RxAndroid library), so we use Schedulers.io() as a substitute. Don’t worry! We’ll explain all these concepts in detail as you proceed in the course.

/*
 * File path: src/main/java/RxJava/NetworkClient.java
 */
package RxJava;

import java.util.List;
import java.util.Random;
import java.util.ArrayList;
import io.reactivex.Observable;


public class NetworkClient {
    Random random = new Random();

    List<Car> fetchBestSellingCars() {
        // A real client would perform a blocking network call here. For the sake
        // of the example, mimic network latency with a random thread sleep and
        // return a hard-coded list of Car objects.
        randomSleep();
        Car best1 = new Car(Car.Type.ALL_ELECTRIC, 80000, "2021", "Honda", "Civic");
        Car best2 = new Car(Car.Type.ALL_ELECTRIC, 70000, "2020", "Honda", "Civic");
        Car best3 = new Car(Car.Type.ALL_ELECTRIC, 80000, "2020", "Toyota", "Corolla");
        Car best4 = new Car(Car.Type.ALL_ELECTRIC, 70000, "2019", "Ford", "Fiesta");
        Car best5 = new Car(Car.Type.ALL_DIESEL, 50000, "2019", "Ford", "Fiesta");
        Car best6 = new Car(Car.Type.ALL_ELECTRIC, 10000, "2021", "Toyota", "Corolla");
        Car best7 = new Car(Car.Type.ALL_ELECTRIC, 60000, "2020", "Ford", "Ford Edge");
        Car best8 = new Car(Car.Type.ALL_ELECTRIC, 60000, "2020", "Toyota", "Camry");
        List<Car> bestSellingCars = new ArrayList<>();
        bestSellingCars.add(best1);
        bestSellingCars.add(best2);
        bestSellingCars.add(best3);
        bestSellingCars.add(best4);
        bestSellingCars.add(best5);
        bestSellingCars.add(best6);
        bestSellingCars.add(best7);
        bestSellingCars.add(best8);
        return bestSellingCars;
    }

    void randomSleep() {
        try {
            // Sleep for 0, 1, or 2 seconds.
            Thread.sleep(random.nextInt(3) * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so the caller can see the interruption.
            Thread.currentThread().interrupt();
        }
    }

}
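fetchBestSellingCars() blocks its calling thread while it sleeps, which is why the chain in this lesson moves that work off the main thread and switches threads again before delivering the result. The following is a rough plain-Java sketch of that hand-off using CompletableFuture and two single-thread executors instead of RxJava schedulers. It is only an analogy, not how RxJava is implemented. The class name ThreadHopSketch, the thread names, and blockingFetch() are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadHopSketch {

    // Hypothetical stand-in for a blocking call such as fetchBestSellingCars().
    static List<String> blockingFetch() {
        try {
            Thread.sleep(200); // mimic network latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<String> cars = new ArrayList<>();
        cars.add("2021 Honda Civic");
        cars.add("2020 Toyota Corolla");
        return cars;
    }

    // Fetches on one thread, delivers on another, and returns a log of what ran where.
    static List<String> fetchAndDeliver() {
        ExecutorService background =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "background"));
        ExecutorService delivery =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "delivery"));
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        try {
            CompletableFuture
                    // Rough analogue of subscribeOn: do the slow work on the background thread.
                    .supplyAsync(() -> {
                        log.add("fetched on " + Thread.currentThread().getName());
                        return blockingFetch();
                    }, background)
                    // Rough analogue of observeOn: hand the result to the delivery thread.
                    .thenAcceptAsync(cars -> {
                        log.add("delivered " + cars.size() + " cars on "
                                + Thread.currentThread().getName());
                    }, delivery)
                    .join(); // wait here only so the sketch can return its log
        } finally {
            background.shutdown();
            delivery.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        // Prints "fetched on background" and then "delivered 2 cars on delivery".
        fetchAndDeliver().forEach(System.out::println);
    }
}
```

The delivery executor stands in for whichever thread should receive the results: on Android that would be the main UI thread, while in a console application any designated thread will do.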