Shahzad Bhatti

April 23, 2014

Implementing Reactive Extensions (RX) using Java 8

Filed under: Java — admin @ 11:52 pm

In my last blog, I described new lambda support in Java 8. In order to try new Java features in more depth, I implemented Reactive extensions in Java 8. In short, reactive extensions allows processing synchronous and asynchronous in data uniform manner. It provides unified interfaces that can be used as an iterator or callback method for asynchronous processing. Though, Microsoft RX library is huge but I only implemented core features and focused on Observable API. Here is brief overview of implementation:

Creating Observable from Collection

Here is how you can create Observable from a collection:

    List<String> names = Arrays.asList("One", "Two", "Three", "Four", "Five"); 
    Observable.from(names).subscribe(System.out::println, 
       Throwable::printStackTrace, () -> System.out.println("done"));
 

In Microsoft’s version of RX, Observable takes an Observer for subscription, which defines three methods: onNext, onError and onCompleted. onNext is invoked to push next element of data, onError is used to notify errors and onCompleted is called when data is all processed. In my implementation, the Observable interface defines two overloaded subscribe method, first takes callback functions for onNext and onError and second method takes three callback functions including onCompleted. I chose to use separate function parameters instead of a single interface so that caller can pass inline lambda functions instead of passing implementation of Observer interface.

Creating Observable from Array of objects

Here is how you can create Observable from stream:

    Observable.from("Erica", "Matt", "John", "Mike").subscribe(System.out::println, 
         Throwable::printStackTrace, () -> System.out.println("done"));
 

Creating Observable from Stream

Here is how you can create Observable from stream:

    Stream<String> names = Stream.of("One", "Two", "Three", "Four", "Five"); 
    // note third argument for onComplete is optional
    Observable.from(names).subscribe(name -> System.out.println(name), 
       error -> error.printStackTrace());
 

Creating Observable from Iterator

Here is how you can create Observable from iterator:

    Stream<String> names = Stream.of("One", "Two", "Three", "Four", "Five"); 
    Observable.from(names.iterator()).subscribe(name -> System.out.println(name), 
       error -> error.printStackTrace());
 

Creating Observable from Spliterator

Here is how you can create Observable from spliterator:

    List<String> names = Arrays.asList("One", "Two", "Three", "Four", "Five"); 
    Observable.from(names.spliterator()).subscribe(System.out::println, 
       Throwable::printStackTrace);
 

Creating Observable from a single object

Here is how you can create Observable from a single object:

    Observable.just("value").subscribe(v -> System.out.println(v), 
       error -> error.printStackTrace());
    // if a single object is collection, it would be treated as a single entity, e.g.
    Observable.just(Arrays.asList(1, 2, 3)).subscribe( num -> System.out.println(num), 
       error -> error.printStackTrace());
 

Creating Observable for an error

Here is how you can create Observable that would return an error:

    Observable.throwing(new Error("test error")).subscribe(System.out::println, 
       error -> System.err.println(error));
    // this will print error 
 

Creating Observable from a consumer function

Here is how you can create Observable that takes user function for invoking onNext, onError and onCompleted function:

    Observable.create(observer -> {
       for (String name : names) {
          observer.onNext(name);
       }
       observer.onCompleted();
    }).subscribe(System.out::println, Throwable::printStackTrace);
 

Creating Observable from range

Here is how you can create Observable from stream that would create numbers from start to end range exclusively.

    // Creates range of numbers starting at from until it reaches to exclusively
    Observable.range(4, 8).subscribe(num -> System.out.println(num), 
       error -> error.printStackTrace());
    // will print 4, 5, 6, 7
 

Creating empty Observable

It would call onCompleted right away:

    Observable.empty().subscribe(System.out::println, 
       Throwable::printStackTrace, () -> System.out.println("Completed"));
 

Creating never Observable

It would not call any of call back methods:

    Observable.never().subscribe(System.out::println, Throwable::printStackTrace);
 
 

Changing Scheduler

By default Observable notifies observer asynchronously using thread-pool scheduler but you can change default scheduler as follows:

Using thread-pool scheduler

    Stream<String> names = Stream.of("One", "Two", "Three", "Four", "Five"); 
    Observable.from(names).subscribeOn(Scheduler.getThreadPoolScheduler()).
       subscribe(System.out::println, Throwable::printStackTrace);
 

Using new-thread scheduler

It will create new thread

    Stream<String> names = Stream.of("One", "Two", "Three", "Four", "Five"); 
    Observable.from(names).subscribeOn(Scheduler.getNewThreadScheduler()).
       subscribe(System.out::println, Throwable::printStackTrace);
 

Using timer thread with interval

It will notify at each interval

    Stream<String> names = Stream.of("One", "Two", "Three", "Four", "Five"); 
    Observable.from(names).subscribeOn(Scheduler.getTimerSchedulerWithMilliInterval(1000)).
       subscribe(System.out::println, Throwable::printStackTrace);
    // this will print each name every second
 

Using immediate scheduler

This scheduler calls callback functions right away on the same thread. You can use this if you synchronous data and don’t want to create another thread. On the downside, you cannot unsubscribe with this scheduler.

    Stream<String> names = Stream.of("One", "Two", "Three", "Four", "Five"); 
    Observable.from(names).subscribeOn(Scheduler.getImmediateScheduler()).
       subscribe(System.out::println, Throwable::printStackTrace);
 

Transforming

Observables keep sequence of items as streams and they support map/flatMap operation as supported by standard Stream class, e.g.

Map

    Stream<String> names = Stream.of("One", "Two", "Three", "Four", "Five"); 
    Observable.from(names).map(name -> name.hashCode()).
       subscribe(System.out::println, Throwable::printStackTrace);
 

FlatMap

    Stream integerListStream = Stream.of( Arrays.asList(1, 2), 
       Arrays.asList(3, 4), Arrays.asList(5));
    Observable.from(integerListStream).flatMap(integerList -> integerList.stream()).
       subscribe(System.out::println, Throwable::printStackTrace);
 

Filtering

Observables supports basic filtering support as provided by Java Streams, e.g.

Filter

    Stream<String> names = Stream.of("One", "Two", "Three", "Four", 
       "Five"); 
    Observable.from(names).filter(name -> name.startsWith("T")).
       subscribe(System.out::println, Throwable::printStackTrace);
    // This will only print Two and Three
 

Skip

skips given number of elements

    Stream<String> names = Stream.of("One", "Two", "Three", "Four", "Five"); 
    Observable.from(names).skip(2).subscribe(System.out::println, 
       Throwable::printStackTrace);
    // This will skip One and Two
 

Limit

    Stream<String> names = Stream.of("One", "Two", "Three", "Four", "Five"); 
    Observable.from(names).limit(2).subscribe(System.out::println, 
       Throwable::printStackTrace);
    // This will only print first two strings
 

Distinct

    Stream<String> names = Stream.of("One", "Two", "Three", "One");
    Observable.from(names).distinct.subscribe(System.out::println, 
       Throwable::printStackTrace);
    // This will print One only once
 

Merge

This concates two observable data:

    Observable<Integer> observable2 = Observable.from(Stream.of(4, 5, 6));
    observable1.merge(observable2).subscribe(System.out::println, 
       Throwable::printStackTrace);
    // This will print 1, 2, 3, 4, 5, 6
 

Summary

In summary, as Java 8 already supported a lot of functional primitives, adding support for reactive extensions was quite straight forward. For example, Nextflix’s implementation of reactive extensions in Java consists of over 80K lines of code but it took few hundred lines to implement core features with Java 8. You can download or fork the code from https://github.com/bhatti/RxJava8.


No Comments »

No comments yet.

RSS feed for comments on this post. TrackBack URL

Leave a comment

You must be logged in to post a comment.

Powered by WordPress