RxJava, RxJS, RxSwift, RxKotlin, RxScala, Rx.NET… - do you see technologies like these listed in job requirements and feel a little bit scared? I don’t blame you. Reactive programming may seem intimidating at first, but I believe this explanation will give you a solid understanding of the basics and get you through the initial learning curve more quickly, regardless of the programming language you decide to use.
Before We Start
To better grasp the idea behind Rx, let’s examine its three building blocks: the Observer pattern, the Iterator pattern and functional programming. Later in the article I’ll show you how each of these is used in actual implementations. I purposefully defer the definition of reactive programming, because I want you to build a good understanding of other important concepts first. So bear with me!
Observer Pattern
Can you imagine your favorite mobile messaging application without push notifications, like the one below?
How would we operate without them? Well, let’s imagine that for a second. We would probably go crazy looking at our phones every few minutes, constantly launching the messaging app, which in turn would call a web service to ask about the current status of unread messages. That is optimal neither for users nor for the web service itself. Push notifications flip this behavior around. When I implemented them in mobile projects I’ve worked on, I didn’t have to run any periodic checks in the background - I simply had to invoke a function that observes. The connection with the web service is constantly open, but nothing happens until some data is pushed to the listening app, where it is turned into a fancy notification at the top of your screen. My app doesn’t have to call anything; it patiently waits for a certain event (like a new message) to happen, saving resources such as battery. This is a fine example of the Observer pattern, used by many people in everyday life. As the old Hollywood Principle states: “Don’t Call Us. We’ll Call You.”
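To make the “Don’t Call Us. We’ll Call You.” idea concrete, here is a minimal Kotlin sketch of the Observer pattern. The class “MessageSource” and its methods are hypothetical names made up for illustration; a real push notification service is far more involved.

```kotlin
// Hypothetical message source: listeners register once and are called back later.
class MessageSource {
    private val listeners = mutableListOf<(String) -> Unit>()

    // Register a listener instead of polling for new messages.
    fun observe(listener: (String) -> Unit) {
        listeners.add(listener)
    }

    // Called when a new message arrives: the source calls every listener.
    fun push(message: String) {
        listeners.forEach { listener -> listener(message) }
    }
}
```

A listener registered with “observe” stays idle until “push” is called; the listener never has to ask whether anything new has arrived.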
Iterator Pattern
If you’ve decided to dive into reactive programming, you’ve probably already worked with collections like lists, sets or maps. The most common thing you can do with them is traverse them, and the Iterator pattern simplifies that. Think for a minute: what two methods would a class responsible for going through each element of a list have? What would you call them? Hint: remember that you can’t go beyond the size of the collection.
An Iterator usually consists of the method “next()”, which returns the next element, and “hasNext()”, which returns a boolean indicating whether there are any elements left.
All in all, if you’ve written a “for” loop a few times in your life, you should intuitively get the idea of iteration, and that’s enough.
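As a rough illustration, the two methods could look like this in Kotlin. The names “SimpleListIterator” and “collectAll” are invented for this sketch; Kotlin’s own “Iterator” interface already declares “hasNext()” and “next()”.

```kotlin
// A hand-written iterator over a list, exposing the two methods discussed above.
class SimpleListIterator<T>(private val items: List<T>) : Iterator<T> {
    private var index = 0

    // True while there is still an element left to return.
    override fun hasNext(): Boolean = index < items.size

    // Returns the current element and moves forward; fails past the end.
    override fun next(): T {
        if (!hasNext()) throw NoSuchElementException()
        return items[index++]
    }
}

// Walks the whole collection using only hasNext() and next().
fun <T> collectAll(iterator: Iterator<T>): List<T> {
    val result = mutableListOf<T>()
    while (iterator.hasNext()) {
        result.add(iterator.next())
    }
    return result
}
```

This “while” loop is roughly what a “for” loop over a collection does for you behind the scenes.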
Functional Programming
Functional programming is a style of programming that to many (including me) is more elegant, easier to debug and generally cleaner. I want you to take a look at the examples below, written in Kotlin, and try to name the two main differences.
The “sumIfEven” function computes the sum of the even numbers in a given list. I created a variable “sumOfEven”, to which I add consecutive even numbers while iterating over the list with a “for” loop.
The “sumIf” function computes the sum of those numbers in a given list that satisfy a given predicate (a function that returns true or false based on some argument, which in our case is an integer). I used the “filter” function, which passes each element of the list to the predicate and keeps it in the output only if the predicate returns true; with an “is even” predicate, the output is the list without odd numbers. The output is then summed using “sum”.
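The two functions described above can be sketched in Kotlin as follows. This is a reconstruction from the description, so details such as the parameter names are assumptions rather than the original listing.

```kotlin
// Imperative version: the running total lives in a mutable variable.
fun sumIfEven(numbers: List<Int>): Int {
    var sumOfEven = 0
    for (number in numbers) {
        if (number % 2 == 0) {
            sumOfEven += number
        }
    }
    return sumOfEven
}

// Functional version: no mutable state; the condition is passed in as a function.
fun sumIf(numbers: List<Int>, predicate: (Int) -> Boolean): Int =
    numbers.filter(predicate).sum()
```

Calling “sumIfEven(listOf(1, 2, 3, 4))” and “sumIf(listOf(1, 2, 3, 4)) { it % 2 == 0 }” both give 6.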
The first major difference is getting rid of state, which in the first example was accumulated in the variable “sumOfEven”. The second one is a bit more complicated and involves something called a higher-order function - a function that takes another function as an argument or returns one. In our case “sumIf” takes another function (“predicate”) as an argument. This function is a simple one that checks whether the number is even. Please study the part about passing a function closely, as we’ll refer to it multiple times in this article.
These differences are the two main pillars of functional programming - lack of state (no side effects) and higher-order functions (passing a function as a parameter or returning it). Which example do you like more? Let me defer the definition of reactive programming a little longer, until you’ve read about two additional concepts - Observable and Observer.
Observable
An Observable is a stream of data. You can think of it as an array, but instead of the items being available immediately, they appear one by one as time passes. A good example of data that can be emitted is mouse coordinates. Let’s say we want a stream that emits the coordinates of each user click over a period of 5 minutes.
- The observation of the cursor starts.
- 1 minute has passed and the user still hasn’t clicked anywhere.
- 2 minutes 35 seconds - the user clicked! The coordinates are emitted.
- 3 minutes 59 seconds - the user clicked again! The coordinates are emitted.
- 4 minutes 15 seconds - click! The coordinates are emitted.
- 5 minutes. End of observation.
Let’s plot the observed coordinates using a version of something called a marble diagram. I want you to really feel how time is an integral component of the Observable.
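Since a marble diagram is just items placed on a timeline, it can be approximated in plain text. The Kotlin sketch below is only an illustration: the function “marbleDiagram” and the 15-second slot size are assumptions made for this sketch, while the click times (2:35, 3:59 and 4:15 within a 5-minute window) come from the example above.

```kotlin
// Draws a text timeline: '-' is a quiet slot, 'o' a slot with a click, '|' the end.
fun marbleDiagram(clickTimesSeconds: List<Int>, durationSeconds: Int, slotSeconds: Int): String {
    val slots = CharArray(durationSeconds / slotSeconds) { '-' }
    for (time in clickTimesSeconds) {
        val slot = time / slotSeconds
        // Clicks outside the observed period are ignored.
        if (slot in slots.indices) slots[slot] = 'o'
    }
    return slots.joinToString("") + "|"
}
```

With clicks at 155, 239 and 255 seconds, a duration of 300 seconds and 15-second slots, the result is “----------o----o-o--|”: a long quiet stretch, three clicks, and then completion.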
Observer
But how can these mouse clicks be consumed? What can we do with them? Was someone observing?
Three kinds of events happened, or could have happened, during the emission of mouse click coordinates. The first one is the most common - the Observable simply pushes another item, e.g. at 4 minutes 15 seconds. I’ll call this event “onNext”. The second happens at the end (at exactly 5 minutes). The Observable stops emitting no matter what, as we wanted to listen for new data only during a specific period of time. Nothing happens after that. Let’s call this event “onComplete”. The last type of event is also terminal. Imagine that the user did something we didn’t anticipate, like clicking outside of the permitted area. We did a terrible job of handling this case in our code, so it ends in a runtime error. We can’t emit anymore at this point, but we want to know what happened and why. Let’s call this “exception” event “onError”.
This is the perfect time to finally define reactive programming. Reactive programming is a programming paradigm (a fancy name for a style or way of programming) that combines the Observer pattern (listening for “onNext”, “onComplete” and “onError” events), the Iterator pattern (it helps in dealing with the items emitted through an Observable, which in some respects is similar to a collection) and functional programming (check the “Operators” section of this article for more familiar examples) to make certain problems (in my opinion, most problems) easier to solve using data streams and the propagation of change over time (basically Observables).
The function “mouseClicksObservable” returns an Observable that over time emits a sequence of coordinates (class Coordinate). Note the syntax for listening for new events. We need to call “subscribe” and pass an Observer with implementations of three methods corresponding to the three types of events that can happen. Can you guess what will be printed in the console? This is Java. See the code below for a simplified functional version written in Kotlin.
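To show the mechanics without pulling in a library, here is a toy Kotlin sketch of an Observable and an Observer. It is not the RxJava API: the classes below are simplified stand-ins written for this illustration, timing is left out entirely, and the coordinates and the helper “observeClicks” are made up.

```kotlin
data class Coordinate(val x: Int, val y: Int)

// The three kinds of events an Observer can receive.
interface Observer<T> {
    fun onNext(item: T)
    fun onComplete()
    fun onError(error: Throwable)
}

// Toy Observable: runs the producer on subscription and forwards what it emits.
class Observable<T>(private val producer: (emit: (T) -> Unit) -> Unit) {

    fun subscribe(observer: Observer<T>) {
        try {
            producer { item -> observer.onNext(item) }
        } catch (exception: Exception) {
            observer.onError(exception) // terminal event: something went wrong
            return
        }
        observer.onComplete() // terminal event: nothing more to emit
    }

    // Simplified functional variant: pass plain functions instead of an Observer.
    fun subscribe(
        handleNext: (T) -> Unit,
        handleError: (Throwable) -> Unit = {},
        handleComplete: () -> Unit = {}
    ) {
        subscribe(object : Observer<T> {
            override fun onNext(item: T) = handleNext(item)
            override fun onComplete() = handleComplete()
            override fun onError(error: Throwable) = handleError(error)
        })
    }
}

// Hypothetical source of clicks; the coordinates are made up for illustration.
fun mouseClicksObservable(): Observable<Coordinate> = Observable<Coordinate> { emit ->
    emit(Coordinate(120, 80))
    emit(Coordinate(300, 45))
    emit(Coordinate(15, 210))
}

// Collects the lines an Observer might otherwise print to the console.
fun observeClicks(): List<String> {
    val log = mutableListOf<String>()
    mouseClicksObservable().subscribe(
        handleNext = { log.add("onNext: $it") },
        handleComplete = { log.add("onComplete") }
    )
    return log
}
```

In this sketch “observeClicks()” yields three “onNext” lines followed by a single “onComplete”. If the producer threw an exception instead, “onError” would be the last event and “onComplete” would never be called.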
Operators
In my opinion, the sexiest ingredient of reactive programming is the functional one. You will stumble upon its two pillars (lack of state and higher-order functions) frequently when using operators. An operator is a function that manipulates the items coming out of an Observable, so that the Observer, typically in its “onNext” callback, receives properly processed data. For example, imagine a stream emitting dates in number format. We want our Observer to act on them (e.g. print them), but in String format and only if they are not older than two days. Operators do the job of filtering and transforming the date numbers before they reach the Observer.
In the official docs for ReactiveX (a library implemented in different languages to make reactive programming easier) we read: “Each language-specific implementation of ReactiveX implements a set of operators. Although there is much overlap between implementations, there are also some operators that are only implemented in certain implementations. Also, each implementation tends to name its operators to resemble those of similar methods that are already familiar from other contexts in that language.” This should reassure you about the statement I made in the section “Before We Start”. Now let’s see some examples.
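The date example can be sketched in Kotlin with a plain list standing in for the stream, so timing is ignored. The function name “recentDatesAsText” is hypothetical, and the sketch assumes the “number format” means milliseconds since the epoch; “now” is passed in so the result does not depend on the clock.

```kotlin
import java.time.Duration
import java.time.Instant

// Keeps only dates from the last two days and turns them into Strings.
fun recentDatesAsText(epochMillis: List<Long>, now: Instant): List<String> {
    val cutoff = now.minus(Duration.ofDays(2))
    return epochMillis
        .map { Instant.ofEpochMilli(it) }   // number -> date
        .filter { !it.isBefore(cutoff) }    // drop anything older than two days
        .map { it.toString() }              // date -> ISO-8601 String
}
```

Each step plays the role of one operator: the Observer would only ever see the Strings that come out at the end of the chain.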
Filter
In this example there are two Observables. The one above the “filter” operator is the input (you can see that numbers are emitted as time passes). The one below is the output after applying “filter”. Can you see what happened? Each element of the input is passed to the function inside “filter” (here the function is “x > 10”, where “x” is the next element), and if it returns true, the item appears in the output Observable.
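Ignoring time, the effect of “filter” on the values can be modeled with a plain Kotlin list. The function name “filterGreaterThanTen” and the sample numbers in the note below are chosen only for illustration.

```kotlin
// Keeps only the items for which the function "x > 10" returns true.
fun filterGreaterThanTen(input: List<Int>): List<Int> =
    input.filter { x -> x > 10 }
```

For the input 2, 30, 22, 5, 60, 1 the output is 30, 22, 60, in the same order as they appeared in the input.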
Distinct
A very similar situation - the input is above, the output below. This operator keeps all elements in the output Observable unique by not letting an item through from the input if an equal one has already appeared.
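ReactiveX calls this operator “distinct”. A minimal Kotlin sketch of the idea on a plain list (again ignoring time) could look like this; “distinctItems” is a made-up name, and Kotlin’s built-in “distinct()” on lists gives the same result.

```kotlin
// Lets an item through only the first time it appears.
fun <T> distinctItems(input: List<T>): List<T> {
    val seen = mutableSetOf<T>()
    // add() returns false when the item was already seen, so repeats are dropped.
    return input.filter { seen.add(it) }
}
```

For the input 1, 2, 2, 1, 3 the output is 1, 2, 3. Note the price of this operator: it has to remember every item it has seen so far.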
Debounce
When an item is emitted in the input, the operator waits a certain amount of time (say, one second), and if nothing else appears within that timespan, the item is emitted in the output. Hence the little shift between corresponding items in the diagram. Now, look what happened in the middle:
- “2” was emitted in the input.
- “debounce” is waiting for a given amount of time - say, one second.
- Within that one second “3” is emitted, so “2” can’t be emitted and now the one-second wait starts again.
- Within that one second “4” is emitted, so “3” can’t be emitted either. The wait starts again.
- Within this last wait “5” appears, so “4” can’t be in the output.
- One second passes and no more items appear within that time, so “debounce” pushes only “5” to the output.
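The steps above can be simulated in Kotlin by modeling each item as a value with a timestamp. This is a sketch, not how a real library implements “debounce”: the “Emission” class is invented here, the input is assumed to be sorted by time, an item arriving exactly when the wait ends is treated as arriving too late to cancel, and the last item is simply emitted after the full wait.

```kotlin
// An item together with the moment (in milliseconds) at which it was emitted.
data class Emission<T>(val timeMs: Long, val value: T)

// An item survives only if no other item arrives within `windowMs` after it.
// Surviving items appear in the output shifted by `windowMs`.
fun <T> debounce(input: List<Emission<T>>, windowMs: Long): List<Emission<T>> {
    val output = mutableListOf<Emission<T>>()
    for ((index, current) in input.withIndex()) {
        val next = input.getOrNull(index + 1)
        if (next == null || next.timeMs - current.timeMs >= windowMs) {
            output.add(Emission(current.timeMs + windowMs, current.value))
        }
    }
    return output
}
```

With a one-second window and items “1” at 0 ms, “2” at 2000 ms, “3” at 2400 ms, “4” at 2800 ms, “5” at 3200 ms and “6” at 6000 ms, only “1”, “5” and “6” reach the output, at 1000 ms, 4200 ms and 7000 ms respectively.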
Delay
Simple. It makes the emissions of the input Observable start later by a given amount of time.
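ReactiveX calls this operator “delay”. In a timestamp-based Kotlin model (the “Emission” class is an assumption of this sketch, not a library type), it boils down to shifting every item by the same amount:

```kotlin
// An item together with the moment (in milliseconds) at which it was emitted.
data class Emission<T>(val timeMs: Long, val value: T)

// Every item keeps its value and order but appears `delayMs` later.
fun <T> delay(input: List<Emission<T>>, delayMs: Long): List<Emission<T>> =
    input.map { it.copy(timeMs = it.timeMs + delayMs) }
```

Items at 0 ms and 300 ms delayed by 1000 ms end up at 1000 ms and 1300 ms; the gaps between them stay the same.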
Merge
This one is a little bit different. There are two Observables as the input and only one as the output. What “merge” does is take each item emitted by the first and second input Observables and put them, one by one, into a single output Observable. Notice that in the diagram the output has the same items at the same points on the timeline.
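In a timestamp-based Kotlin model, “merge” amounts to interleaving two timelines without changing any item or its time. The “Emission” class is again an assumption of this sketch, and both inputs are assumed to be sorted by time.

```kotlin
// An item together with the moment (in milliseconds) at which it was emitted.
data class Emission<T>(val timeMs: Long, val value: T)

// Puts the items of both inputs on one timeline, ordered by emission time.
// sortedBy is stable, so items with equal times keep first-before-second order.
fun <T> merge(first: List<Emission<T>>, second: List<Emission<T>>): List<Emission<T>> =
    (first + second).sortedBy { it.timeMs }
```

Merging “a” at 0 ms and “c” at 300 ms with “b” at 100 ms gives “a”, “b”, “c” at 0 ms, 100 ms and 300 ms.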
Map
The “map” operator transforms each item from the input into a new item in the output. The transformation is given to “map” as a function. In this example the function takes each element (“x”) and multiplies it by 10.
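On a plain Kotlin list (ignoring time), the same transformation looks like this; the function name “timesTen” is chosen only for this sketch.

```kotlin
// Replaces every item x with x * 10; the number and order of items stay the same.
fun timesTen(input: List<Int>): List<Int> =
    input.map { x -> x * 10 }
```

For the input 1, 2, 3 the output is 10, 20, 30.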
FlatMap
To this operator you must pass a function that returns an Observable for each item in the input. The output, however, won’t consist of Observables, because they are all merged together into one stream. Check the next paragraph for more clarification.
Common Interview Question - What's the Difference Between Flatmap and Map
You’ve probably noticed that “flatMap” is very similar to “map”. Maybe you think that “flatMap” is not needed, just like I did at the beginning of my Rx journey? Let’s imagine what the world would be like without “flatMap”. Say we have our stream of mouse coordinates from the method “mouseClicksObservable”. For each click, however, we want the Observer to obtain two values - the x and y coordinates - one by one (not together). Well, if we were to use “map”, we would need to think about what we should map a coordinate to. The output type should be able to emit two values one by one, because that’s what the Observer wants in our example. The only one I know that is capable of doing this is, yes, you guessed it, Observable. So we would map each coordinate in the input to an Observable. What would be the output? One Observable in which each item is an Observable itself, each emitting x and y. Can you see the problem? We would need to observe each one individually. What can we do about that? We can, for example, “flatten” all the Observables in the output, i.e. merge them all together into one. After doing that, all items are in the same stream and we subscribe only once. This is why we need “flatMap”, and this is exactly why it is called that.
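The same difference shows up on ordinary Kotlin lists, which makes for a handy analogy. In the sketch below lists stand in for Observables and the function names are made up; keep in mind that with real Observables “flatMap” merges the inner streams, so items from different inner Observables may interleave, whereas a list always keeps its order.

```kotlin
data class Coordinate(val x: Int, val y: Int)

// With "map", every click becomes its own container: a list of lists.
fun mapToPairs(clicks: List<Coordinate>): List<List<Int>> =
    clicks.map { listOf(it.x, it.y) }

// With "flatMap", the inner containers are flattened into a single sequence.
fun flatMapToValues(clicks: List<Coordinate>): List<Int> =
    clicks.flatMap { listOf(it.x, it.y) }
```

For the clicks (1, 2) and (3, 4), “mapToPairs” gives [[1, 2], [3, 4]], while “flatMapToValues” gives [1, 2, 3, 4], which is the shape our Observer asked for.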
Fancy a Challenge?
Congratulations! You now understand the basics of the reactive programming paradigm. It’s not that tough, right? I want to present you with a practical exercise, which requires a little extra digging into the ReactiveX docs (http://reactivex.io/). Post your answer in the comments. No code is required, just the proper order of operators with a description (what’s on the input and output of each of them, e.g. “Mouse coordinates on the input of “flatMap” - x and y values one by one on the output.”). Ready?
A function “searchQuery” returns an Observable whose items are Strings. Each String is emitted after the user types another letter; for example, when the user inputs the word “cola”, the Observable emits the following items one by one: “c”, “co”, “col”, “cola”. Requirements:
- I want the Observer to only receive Strings that contain more than three characters.
- I do not want the Observer to obtain every item when the user types rapidly.
- I do not want the Observer to obtain items when the user makes changes to the query, but within 1000 milliseconds the resulting query is the same. For example: say the current query is “cola”. Time counting starts. The user changes “cola” to “colar”, then to “colars”, then to “colar”, then to “cola”. 1000 milliseconds pass. So within that timespan the user started with “cola” and ended with “cola”, too! I don’t want any of these items (“cola”, “colar”, “colars”) to be emitted to the Observer.