Wednesday, December 9, 2020

Concurrency In RxJava


RxJava achieves concurrency through Schedulers.

The most commonly used Schedulers are IO and Computation:

Schedulers.io : Used for IO-bound tasks (e.g. a network call or a database call).

Schedulers.computation : Used for CPU-bound tasks (e.g. sorting a large array in Java code).

The difference between CPU-bound and IO-bound tasks:

For an IO-bound task, we can have more threads than the number of CPU cores on the running machine, because the CPU is idle while a thread waits for the IO operation to complete.

A CPU-bound task, on the other hand, is purely computational (e.g. sorting a Java ArrayList), so we should avoid creating more threads than the number of CPU cores on the running machine.
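The sizing rule above can be sketched with plain java.util.concurrent, without RxJava. This is only an illustration: the class PoolSizingSketch and its methods are hypothetical names, and the IO formula (cores multiplied by 1 plus the wait-to-compute ratio) is a commonly cited rule of thumb, not something RxJava itself applies.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PoolSizingSketch {

    // CPU-bound work: one thread per core keeps every core busy without oversubscribing.
    static int cpuBoundPoolSize() {
        return Runtime.getRuntime().availableProcessors();
    }

    // IO-bound work (assumed heuristic): threads = cores * (1 + waitTime / computeTime).
    // The more time a task spends waiting on IO, the more threads we can afford.
    static int ioBoundPoolSize(int cores, double waitToComputeRatio) {
        return (int) Math.max(cores, Math.round(cores * (1 + waitToComputeRatio)));
    }

    public static void main(String[] args) {
        int cores = cpuBoundPoolSize();
        // Hypothetical workload: each task waits on IO 9 times longer than it computes.
        int ioThreads = ioBoundPoolSize(cores, 9.0);

        ExecutorService cpuPool = Executors.newFixedThreadPool(cores);
        ExecutorService ioPool = Executors.newFixedThreadPool(ioThreads);

        System.out.println("CPU-bound pool size: " + cores);
        System.out.println("IO-bound pool size: " + ioThreads);

        cpuPool.shutdown();
        ioPool.shutdown();
    }
}
```

With RxJava you normally do not size these pools by hand: by default Schedulers.computation() uses as many threads as there are available processors, while Schedulers.io() creates and reuses threads as needed.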


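subscribeOn : runs the whole chain (from the source to the final subscriber) on a thread of the given Scheduler, unless a later observeOn switches threads.

observeOn  : switches threads only for the downstream tasks (the operations defined after the observeOn call).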
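To make the two hand-off styles concrete, here is a rough plain-Java analogy that does not use RxJava at all. The class ThreadHopSketch, its methods and the thread name "worker-1" are all hypothetical; the sketch only mimics where the thread switch happens and is not how RxJava implements these operators.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ThreadHopSketch {

    private static ExecutorService newWorker() {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, "worker-1"));
    }

    // subscribeOn-like: the whole chain (transform + consume) is submitted to the worker.
    // Returns {thread of transform step, thread of consume step, transformed value}.
    static String[] wholeChainOnWorker(String input) {
        ExecutorService worker = newWorker();
        try {
            Callable<String[]> chain = () -> {
                String transformThread = Thread.currentThread().getName();
                String upper = input.toUpperCase();                        // "map" step
                String consumeThread = Thread.currentThread().getName();   // "subscribe" step
                return new String[] {transformThread, consumeThread, upper};
            };
            return worker.submit(chain).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    // observeOn-like: the transform runs on the calling thread,
    // and only the consume step is handed to the worker.
    static String[] downstreamOnWorker(String input) {
        ExecutorService worker = newWorker();
        try {
            String transformThread = Thread.currentThread().getName();
            String upper = input.toUpperCase();                            // "map" step
            Callable<String> consume = () -> Thread.currentThread().getName();
            Future<String> consumeThread = worker.submit(consume);         // "subscribe" step
            return new String[] {transformThread, consumeThread.get(), upper};
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(String.join(" | ", wholeChainOnWorker("subscribeOn-like")));
        System.out.println(String.join(" | ", downstreamOnWorker("observeOn-like")));
    }
}
```

In the first method both steps report the worker thread; in the second, only the consume step does, while the transform stays on whichever thread called the method.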


Example:

In the example below, we take a String, transform it to uppercase, and then print the value.


File Name: ObsSubsEx.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObsSubsEx {

    public static void main(String[] args) throws InterruptedException {

        // subscribeOn placed before map
        Observable.just("subscribeOn One")
                .subscribeOn(Schedulers.computation())
                .map(ObsSubsEx::toUpper)
                .subscribe(ObsSubsEx::printVal);

        // Keep the main thread alive until the background work has finished
        TimeUnit.SECONDS.sleep(1);

        // subscribeOn placed after map
        Observable.just("subscribeOn Two")
                .map(ObsSubsEx::toUpper)
                .subscribeOn(Schedulers.computation())
                .subscribe(ObsSubsEx::printVal);

        TimeUnit.SECONDS.sleep(1);

        // observeOn placed after map
        Observable.just("observeOn")
                .map(ObsSubsEx::toUpper)
                .observeOn(Schedulers.computation())
                .subscribe(ObsSubsEx::printVal);

        TimeUnit.SECONDS.sleep(1);
    }

    // Transform step: logs the thread it runs on, then uppercases the value
    private static String toUpper(String val) {
        System.out.println("Uppercase done on thread:" + Thread.currentThread().getName());
        return val.toUpperCase();
    }

    // Final consumer: logs the value and the thread it runs on
    private static void printVal(String val) {
        System.out.println("Final value is:" + val + ":Thread:" + Thread.currentThread().getName());
    }
}


Let's see from the output log how the flow works:

Uppercase done on thread:RxComputationThreadPool-1
Final value is:SUBSCRIBEON ONE:Thread:RxComputationThreadPool-1

Uppercase done on thread:RxComputationThreadPool-2
Final value is:SUBSCRIBEON TWO:Thread:RxComputationThreadPool-2

Uppercase done on thread:main
Final value is:OBSERVEON:Thread:RxComputationThreadPool-3

Conclusion:

As we can see, with subscribeOn, irrespective of where it is called in the chain, both methods toUpper and printVal run on a separate thread, i.e. threading applies to all operations (upstream as well as downstream).

With observeOn, on the other hand, toUpper runs on the "main" thread and printVal runs on a separate thread, because we called observeOn after the transform, i.e. threading applies only to the downstream operations.

Now, if we call observeOn before the map:

Observable.just("observeOn Two").observeOn(Schedulers.computation()).map(ObsSubsEx::toUpper).subscribe(ObsSubsEx::printVal);

Then we can see that both methods run on a separate thread:

Uppercase done on thread:RxComputationThreadPool-4
Final value is:OBSERVEON TWO:Thread:RxComputationThreadPool-4

Further Reading:


http://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html

https://www.aanandshekharroy.com/articles/2018-01/rxjava-flowables

https://proandroiddev.com/understanding-rxjava-subscribeon-and-observeon-744b0c6a41ea

https://dzone.com/articles/server-sent-events-with-rxjava-and-sseemitter

Code Link in Github:




