In my last blog post, I described the new lambda support in Java 8. To try the new Java features in more depth, I implemented Reactive Extensions (RX) in Java 8. In short, reactive extensions allow you to process synchronous and asynchronous data in a uniform manner. They provide unified interfaces that can be used as an iterator or as callback methods for asynchronous processing. Microsoft's RX library is huge, so I implemented only the core features and focused on the Observable API. Here is a brief overview of the implementation:
Creating Observable from Collection
Here is how you can create an Observable from a collection:
List<String> names = Arrays.asList("One", "Two", "Three", "Four", "Five");
Observable.from(names).subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("done"));
In Microsoft’s version of RX, an Observable takes an Observer for subscription, which defines three methods: onNext, onError and onCompleted. onNext is invoked to push the next element of data, onError is used to report errors, and onCompleted is called when all of the data has been processed. In my implementation, the Observable interface defines two overloaded subscribe methods: the first takes callback functions for onNext and onError, and the second takes three callback functions, including onCompleted. I chose separate function parameters instead of a single interface so that the caller can pass inline lambda functions instead of an implementation of the Observer interface.
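To make this design choice concrete, here is a minimal, self-contained sketch of how two overloaded subscribe methods with separate callback parameters could be declared. The class name `SimpleObservable` and its synchronous delivery are assumptions made for illustration; this is not the library's actual code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for the Observable described above.
final class SimpleObservable<T> {
    private final Iterable<T> source;

    private SimpleObservable(Iterable<T> source) {
        this.source = source;
    }

    static <T> SimpleObservable<T> from(Iterable<T> source) {
        return new SimpleObservable<>(source);
    }

    // Two-callback overload: onCompleted defaults to a no-op.
    void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
        subscribe(onNext, onError, () -> { });
    }

    // Three-callback overload: delivers items synchronously to keep the sketch short.
    void subscribe(Consumer<? super T> onNext,
                   Consumer<? super Throwable> onError,
                   Runnable onCompleted) {
        try {
            for (T item : source) {
                onNext.accept(item);
            }
        } catch (RuntimeException e) {
            onError.accept(e);   // report the failure and skip onCompleted
            return;
        }
        onCompleted.run();
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        SimpleObservable.from(Arrays.asList("One", "Two", "Three"))
            .subscribe(received::add, Throwable::printStackTrace, () -> received.add("done"));
        System.out.println(received); // prints [One, Two, Three, done]
    }
}
```

Because each callback is a functional interface (`Consumer` or `Runnable`), callers can pass lambdas or method references directly, which is the convenience the separate parameters are meant to provide.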
Creating Observable from Array of objects
Here is how you can create an Observable from an array of objects:
Observable.from("Erica", "Matt", "John", "Mike").subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("done"));
Creating Observable from Stream
Here is how you can create an Observable from a stream:
Stream<String> names = Stream.of("One", "Two", "Three", "Four", "Five");
// note that the third argument for onCompleted is optional
Observable.from(names).subscribe(name -> System.out.println(name), error -> error.printStackTrace());
Creating Observable from Iterator
Here is how you can create an Observable from an iterator:
Stream<String> names = Stream.of("One", "Two", "Three", "Four", "Five");
Observable.from(names.iterator()).subscribe(name -> System.out.println(name), error -> error.printStackTrace());
Creating Observable from Spliterator
Here is how you can create an Observable from a spliterator:
List<String> names = Arrays.asList("One", "Two", "Three", "Four", "Five");
Observable.from(names.spliterator()).subscribe(System.out::println, Throwable::printStackTrace);
Creating Observable from a single object
Here is how you can create an Observable from a single object:
Observable.just("value").subscribe(v -> System.out.println(v), error -> error.printStackTrace());
// if the single object is a collection, it is treated as a single entity, e.g.
Observable.just(Arrays.asList(1, 2, 3)).subscribe(num -> System.out.println(num), error -> error.printStackTrace());
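The standard `Stream` API draws the same line between one value and many, which makes the difference easy to see. The sketch below uses only JDK classes (the class name `SingleVersusMany` is made up for this example) and assumes that `Observable.just` and `Observable.from` behave analogously to `Stream.of(list)` and `list.stream()`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

final class SingleVersusMany {
    // Wraps the whole list as one element, analogous to just(list).
    static long countAsSingleEntity(List<Integer> list) {
        return Stream.of(list).count();
    }

    // Emits each element of the list separately, analogous to from(list).
    static long countAsItems(List<Integer> list) {
        return list.stream().count();
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3);
        System.out.println(countAsSingleEntity(numbers)); // prints 1
        System.out.println(countAsItems(numbers));        // prints 3
    }
}
```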
Creating Observable for an error
Here is how you can create an Observable that reports an error:
Observable.throwing(new Error("test error")).subscribe(System.out::println, error -> System.err.println(error)); // this will print error
Creating Observable from a consumer function
Here is how you can create an Observable from a user function that invokes onNext, onError and onCompleted itself:
List<String> names = Arrays.asList("One", "Two", "Three", "Four", "Five");
Observable.create(observer -> {
    for (String name : names) {
        observer.onNext(name);
    }
    observer.onCompleted();
}).subscribe(System.out::println, Throwable::printStackTrace);
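As a rough illustration of what a `create`-style factory has to do, the following sketch wires a user-supplied producer function to the subscriber's callbacks. The names `SketchObserver` and `CreateSketch` are hypothetical, and the producer runs synchronously here, which may differ from the real implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical observer with the three notifications described in the text.
interface SketchObserver<T> {
    void onNext(T item);
    void onError(Throwable error);
    void onCompleted();
}

final class CreateSketch<T> {
    private final Consumer<SketchObserver<T>> producer;

    private CreateSketch(Consumer<SketchObserver<T>> producer) {
        this.producer = producer;
    }

    // The producer decides when to call onNext, onError and onCompleted.
    static <T> CreateSketch<T> create(Consumer<SketchObserver<T>> producer) {
        return new CreateSketch<>(producer);
    }

    void subscribe(Consumer<? super T> nextHandler,
                   Consumer<? super Throwable> errorHandler,
                   Runnable completedHandler) {
        // Adapt the three callbacks to the observer the producer expects.
        SketchObserver<T> observer = new SketchObserver<T>() {
            @Override public void onNext(T item) { nextHandler.accept(item); }
            @Override public void onError(Throwable error) { errorHandler.accept(error); }
            @Override public void onCompleted() { completedHandler.run(); }
        };
        try {
            producer.accept(observer);
        } catch (RuntimeException e) {
            observer.onError(e); // a failing producer is reported through onError
        }
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("One", "Two");
        List<String> received = new ArrayList<>();
        CreateSketch.<String>create(observer -> {
            for (String name : names) {
                observer.onNext(name);
            }
            observer.onCompleted();
        }).subscribe(received::add, Throwable::printStackTrace, () -> received.add("done"));
        System.out.println(received); // prints [One, Two, done]
    }
}
```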
Creating Observable from range
Here is how you can create an Observable that emits numbers from a start value up to, but not including, an end value:
// Creates a range of numbers starting at 'from' and stopping before 'to' (exclusive)
Observable.range(4, 8).subscribe(num -> System.out.println(num), error -> error.printStackTrace());
// will print 4, 5, 6, 7
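The half-open range used here follows the same convention as the JDK's `IntStream.range`. The short sketch below (the class `RangeSketch` is made up for this example) shows that convention with standard classes only.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class RangeSketch {
    // Half-open range [from, to): includes from, excludes to.
    static List<Integer> range(int from, int to) {
        return IntStream.range(from, to).boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(range(4, 8)); // prints [4, 5, 6, 7]
    }
}
```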
Creating empty Observable
It would call onCompleted right away:
Observable.empty().subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("Completed"));
Creating never Observable
It would not call any of the callback methods:
Observable.never().subscribe(System.out::println, Throwable::printStackTrace);
Changing Scheduler
By default, an Observable notifies its observer asynchronously using the thread-pool scheduler, but you can change the default scheduler as follows:
Using thread-pool scheduler
Stream<String> names = Stream.of("One", "Two", "Three", "Four", "Five");
Observable.from(names).subscribeOn(Scheduler.getThreadPoolScheduler())
    .subscribe(System.out::println, Throwable::printStackTrace);
Using new-thread scheduler
It will create a new thread:
Stream<String> names = Stream.of("One", "Two", "Three", "Four", "Five");
Observable.from(names).subscribeOn(Scheduler.getNewThreadScheduler())
    .subscribe(System.out::println, Throwable::printStackTrace);
Using timer thread with interval
It will notify the observer at each interval:
Stream<String> names = Stream.of("One", "Two", "Three", "Four", "Five");
Observable.from(names).subscribeOn(Scheduler.getTimerSchedulerWithMilliInterval(1000))
    .subscribe(System.out::println, Throwable::printStackTrace);
// this will print one name every second
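One plausible way to deliver one item per interval is a `ScheduledExecutorService`. The sketch below is an assumption about how such a timer scheduler might work, not the library's code; `IntervalSketch` and `emitAtInterval` are hypothetical names, and the method blocks until every item has been delivered so that the example is easy to run.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class IntervalSketch {
    // Delivers one item per interval on a timer thread and waits for the last one.
    static <T> void emitAtInterval(Iterable<T> items, long intervalMillis,
                                   Consumer<? super T> onNext) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        Iterator<T> iterator = items.iterator();
        CountDownLatch done = new CountDownLatch(1);
        timer.scheduleAtFixedRate(() -> {
            try {
                if (iterator.hasNext()) {
                    onNext.accept(iterator.next());
                }
                if (!iterator.hasNext()) {
                    done.countDown(); // nothing left to deliver
                }
            } catch (RuntimeException e) {
                done.countDown();     // stop waiting if a callback fails
            }
        }, 0, intervalMillis, TimeUnit.MILLISECONDS);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        emitAtInterval(Arrays.asList("One", "Two", "Three"), 100, received::add);
        System.out.println(received); // prints [One, Two, Three]
    }
}
```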
Using immediate scheduler
This scheduler calls the callback functions right away on the same thread. You can use it if you have synchronous data and don’t want to create another thread. On the downside, you cannot unsubscribe with this scheduler.
Stream<String> names = Stream.of("One", "Two", "Three", "Four", "Five");
Observable.from(names).subscribeOn(Scheduler.getImmediateScheduler())
    .subscribe(System.out::println, Throwable::printStackTrace);
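The difference between an immediate scheduler and a thread-pool scheduler can be illustrated with the JDK's `Executor` interface. This is only a sketch under the assumption that a scheduler boils down to "where does the callback run"; `SchedulerSketch`, `IMMEDIATE` and `deliveryThread` are names invented for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SchedulerSketch {
    // An "immediate" scheduler modeled as an Executor that runs the task on the caller's thread.
    static final Executor IMMEDIATE = Runnable::run;

    // Runs one notification on the given executor and reports which thread delivered it.
    static Thread deliveryThread(Executor executor) {
        CompletableFuture<Thread> result = new CompletableFuture<>();
        executor.execute(() -> result.complete(Thread.currentThread()));
        return result.join();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Immediate: the callback runs on the subscribing thread.
            System.out.println(deliveryThread(IMMEDIATE) == Thread.currentThread()); // prints true
            // Thread pool: the callback runs on a worker thread.
            System.out.println(deliveryThread(pool) == Thread.currentThread());      // prints false
        } finally {
            pool.shutdown();
        }
    }
}
```

With the immediate variant, the call does not return until the callback has finished, which is presumably why unsubscribing is not possible in that mode.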
Transforming
Observables keep a sequence of items as streams, and they support the map and flatMap operations in the same way as the standard Stream class, e.g.
Map
Stream<String> names = Stream.of("One", "Two", "Three", "Four", "Five");
Observable.from(names).map(name -> name.hashCode())
    .subscribe(System.out::println, Throwable::printStackTrace);
FlatMap
Stream<List<Integer>> integerListStream = Stream.of(Arrays.asList(1, 2), Arrays.asList(3, 4), Arrays.asList(5));
Observable.from(integerListStream).flatMap(integerList -> integerList.stream())
    .subscribe(System.out::println, Throwable::printStackTrace);
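For comparison, here is what the equivalent map and flatMap calls look like on the standard `Stream` class. The sketch uses only the JDK; the class `TransformSketch` and its helper methods are invented for illustration, and `String::length` is used instead of `hashCode` so that the output is easy to read.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class TransformSketch {
    // map: one output item per input item.
    static List<Integer> lengths(Stream<String> names) {
        return names.map(String::length).collect(Collectors.toList());
    }

    // flatMap: each input item expands into zero or more output items.
    static List<Integer> flatten(Stream<List<Integer>> lists) {
        return lists.flatMap(List::stream).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(lengths(Stream.of("One", "Two", "Three", "Four", "Five")));
        // prints [3, 3, 5, 4, 4]
        System.out.println(flatten(Stream.of(
            Arrays.asList(1, 2), Arrays.asList(3, 4), Arrays.asList(5))));
        // prints [1, 2, 3, 4, 5]
    }
}
```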
Filtering
Observables support the basic filtering operations provided by Java Streams, e.g.
Filter
Stream<String> names = Stream.of("One", "Two", "Three", "Four", "Five");
Observable.from(names).filter(name -> name.startsWith("T"))
    .subscribe(System.out::println, Throwable::printStackTrace);
// This will only print Two and Three
Skip
Skips the given number of elements:
Stream<String> names = Stream.of("One", "Two", "Three", "Four", "Five");
Observable.from(names).skip(2).subscribe(System.out::println, Throwable::printStackTrace);
// This will skip One and Two
Limit
Stream<String> names = Stream.of("One", "Two", "Three", "Four", "Five");
Observable.from(names).limit(2).subscribe(System.out::println, Throwable::printStackTrace);
// This will only print first two strings
Distinct
Stream<String> names = Stream.of("One", "Two", "Three", "One");
Observable.from(names).distinct().subscribe(System.out::println, Throwable::printStackTrace);
// This will print One only once
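The filter, skip, limit and distinct operators above mirror methods of the same names on the standard `Stream` class. The following sketch runs the same four operations with the JDK only, so the expected results can be checked without the library; `FilterSketch` and its method names are made up for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class FilterSketch {
    static List<String> startingWith(List<String> names, String prefix) {
        return names.stream().filter(name -> name.startsWith(prefix)).collect(Collectors.toList());
    }

    static List<String> skipFirst(List<String> names, long count) {
        return names.stream().skip(count).collect(Collectors.toList());
    }

    static List<String> firstOnly(List<String> names, long count) {
        return names.stream().limit(count).collect(Collectors.toList());
    }

    static List<String> unique(List<String> names) {
        return names.stream().distinct().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("One", "Two", "Three", "Four", "Five");
        System.out.println(startingWith(names, "T")); // prints [Two, Three]
        System.out.println(skipFirst(names, 2));      // prints [Three, Four, Five]
        System.out.println(firstOnly(names, 2));      // prints [One, Two]
        System.out.println(unique(Arrays.asList("One", "Two", "Three", "One")));
        // prints [One, Two, Three]
    }
}
```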
Merge
This concatenates the data of two observables:
Observable<Integer> observable1 = Observable.from(Stream.of(1, 2, 3));
Observable<Integer> observable2 = Observable.from(Stream.of(4, 5, 6));
observable1.merge(observable2).subscribe(System.out::println, Throwable::printStackTrace);
// This will print 1, 2, 3, 4, 5, 6
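Assuming merge appends the second sequence after the first, as described above, the closest JDK counterpart is `Stream.concat`. The sketch below shows that behaviour with standard classes only; `MergeSketch` is a name invented for the example.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class MergeSketch {
    // Emits every item of the first stream, then every item of the second.
    static <T> List<T> concat(Stream<T> first, Stream<T> second) {
        return Stream.concat(first, second).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(concat(Stream.of(1, 2, 3), Stream.of(4, 5, 6)));
        // prints [1, 2, 3, 4, 5, 6]
    }
}
```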
Summary
In summary, since Java 8 already supports a lot of functional primitives, adding support for reactive extensions was quite straightforward. For example, Netflix's implementation of reactive extensions in Java consists of over 80K lines of code, but it took only a few hundred lines to implement the core features with Java 8. You can download or fork the code from https://github.com/bhatti/RxJava8.