RxJava Transform Operators, as the name indicates, are used for transforming the items which are emitted by reactive sources such as Observables.
In this article we will discuss about various RxJava Transform operators in depth with examples. Here we will discuss various operators like Buffer, Map, FlatMap, SwitchMap, ConcatMap, etc.
This article is part of RxJava Intro series. You can checkout the entire series here:
- RxAndroid Introduction
- RxAndroid Example – RxJava Tutorial #2
- RxJava Operators In General – RxJava Tutorial #3
- RxJava Operators for Creating Observables – RxJava Tutorial #4
- RxJava Operators for Transforming Observables – RxJava Tutorial #5 – You are here
- RxJava Operators for Filtering Observables – RxJava Tutorial #6 – Coming Up
- RxJava Operators for Combining Observables – RxJava Tutorial #7 – Coming Up
Now let us explore RxJava Transform Operators one by one.
[adinserter block=”8″]
RxJava Transform Operators
Buffer
This operator periodically gathers items emitted by an Observable into bundle and emits these bundles rather than emitting the items one at a time.
If the source Observable issues onError in between, then the Buffer immediately pass on this error notification on priority even if it contains some data that was emitted by the Observable before it issued error.
Example
package com.coderefer.rxandroidexamples.intro.operators.transform | |
import android.os.Bundle | |
import android.support.v7.app.AppCompatActivity | |
import android.util.Log | |
import com.coderefer.rxandroidexamples.R | |
import io.reactivex.Observable | |
import io.reactivex.disposables.Disposable | |
private const val TAG = "BufferOperator" | |
class BufferOperatorActivity : AppCompatActivity() { | |
private lateinit var disposable: Disposable | |
override fun onCreate(savedInstanceState: Bundle?) { | |
super.onCreate(savedInstanceState) | |
setContentView(R.layout.activity_buffer) | |
val observable = | |
Observable.just(10, 20, 30, 40, 50, 60) | |
.buffer(3) | |
disposable = observable.subscribe { | |
Log.d(TAG, "onNext: ") | |
for(s in it) { | |
Log.d(TAG, s.toString()) | |
} | |
} | |
} | |
override fun onDestroy() { | |
disposable.dispose() | |
super.onDestroy() | |
} | |
} |
In the above Example, in line 20, we can see that we used Observable.buffer() method to transform the way items are emitted from it.
Output
2019-03-04 09:00:33.580 6316-6316/com.coderefer.rxandroidexamples D/BufferOperator: onNext: | |
2019-03-04 09:00:33.580 6316-6316/com.coderefer.rxandroidexamples D/BufferOperator: 10 | |
2019-03-04 09:00:33.580 6316-6316/com.coderefer.rxandroidexamples D/BufferOperator: 20 | |
2019-03-04 09:00:33.580 6316-6316/com.coderefer.rxandroidexamples D/BufferOperator: 30 | |
2019-03-04 09:00:33.580 6316-6316/com.coderefer.rxandroidexamples D/BufferOperator: onNext: | |
2019-03-04 09:00:33.580 6316-6316/com.coderefer.rxandroidexamples D/BufferOperator: 40 | |
2019-03-04 09:00:33.580 6316-6316/com.coderefer.rxandroidexamples D/BufferOperator: 50 | |
2019-03-04 09:00:33.580 6316-6316/com.coderefer.rxandroidexamples D/BufferOperator: 60 |
[adinserter block=”8″]
Map
This operator is used to transform items emitted by Observable by applying function to each item.
Example
package com.coderefer.rxandroidexamples.intro.operators.transform | |
import android.support.v7.app.AppCompatActivity | |
import android.os.Bundle | |
import android.util.Log | |
import com.coderefer.rxandroidexamples.R | |
import io.reactivex.Observable | |
import io.reactivex.disposables.Disposable | |
private const val TAG = "MapOperator" | |
class MapOperatorActivity : AppCompatActivity() { | |
private lateinit var disposable: Disposable | |
override fun onCreate(savedInstanceState: Bundle?) { | |
super.onCreate(savedInstanceState) | |
setContentView(R.layout.activity_map_operator) | |
val observable = | |
Observable.just(1,2,3,4,5) | |
.map { | |
it*10 | |
} | |
disposable = observable.subscribe { | |
Log.d(TAG, it.toString()) | |
} | |
} | |
override fun onDestroy() { | |
disposable.dispose() | |
super.onDestroy() | |
} | |
} | |
In the above example, in line 21, we multiplied each item emitted by Observable with 10 using Map Operator thus transforming the data emitted by Observables.
Output:
2019-03-04 08:44:41.583 29042-29042/com.coderefer.rxandroidexamples D/MapOperator: 10 | |
2019-03-04 08:44:41.583 29042-29042/com.coderefer.rxandroidexamples D/MapOperator: 20 | |
2019-03-04 08:44:41.583 29042-29042/com.coderefer.rxandroidexamples D/MapOperator: 30 | |
2019-03-04 08:44:41.583 29042-29042/com.coderefer.rxandroidexamples D/MapOperator: 40 | |
2019-03-04 08:44:41.583 29042-29042/com.coderefer.rxandroidexamples D/MapOperator: 50 |
Important Note: Notice that the order of insertion is same as that of the order of emission in case of Map Operator
FlatMap
FlatMap operator transforms the items emitted by Observable into Observables, by applying function to the items and then later, it flattens these items emitted by these Observables into a Single Observable. In other words, FlatMap merges Due to this flattening of the emissions, the order of emission is not maintained since the items emitted by these Observables are mixed (interleave).
Example:
package com.coderefer.rxandroidexamples.intro.operators.transform | |
import android.support.v7.app.AppCompatActivity | |
import android.os.Bundle | |
import android.util.Log | |
import com.coderefer.rxandroidexamples.R | |
import io.reactivex.Observable | |
import io.reactivex.android.schedulers.AndroidSchedulers | |
import io.reactivex.schedulers.Schedulers | |
import java.util.concurrent.TimeUnit | |
private const val TAG = "FlatMapOperator" | |
class FlatMapOperatorActivity : AppCompatActivity() { | |
private val items: List<Int> = listOf(1,2,3,4,5) | |
override fun onCreate(savedInstanceState: Bundle?) { | |
super.onCreate(savedInstanceState) | |
setContentView(R.layout.activity_flat_map_operator) | |
Observable.fromIterable(items) | |
.flatMap { | |
Observable.just(it * 2) | |
.delay(it/10L, TimeUnit.SECONDS) | |
} | |
.doOnNext{ | |
Log.d(TAG, it.toString()) | |
} | |
.observeOn(Schedulers.io()) | |
.subscribeOn(AndroidSchedulers.mainThread()) | |
.subscribe() | |
} | |
} |
Output:
2019-04-21 15:33:29.305 17098-17146/com.coderefer.rxandroidexamples D/FlatMapOperator: 4 | |
2019-04-21 15:33:29.306 17098-17146/com.coderefer.rxandroidexamples D/FlatMapOperator: 2 | |
2019-04-21 15:33:29.307 17098-17146/com.coderefer.rxandroidexamples D/FlatMapOperator: 6 | |
2019-04-21 15:33:29.307 17098-17146/com.coderefer.rxandroidexamples D/FlatMapOperator: 10 | |
2019-04-21 15:33:29.308 17098-17146/com.coderefer.rxandroidexamples D/FlatMapOperator: 8 |
In the above output, we can see that the order is not the same. Flatmap Operator does not preserve the order of items.
[adinserter block=”8″]
SwitchMap
The main difference between other operators and SwitchMap is the cancelling effect. SwitchMap operates in such a way that whenever a new item is emitted by Observable, it will unsubscribe to that old observable and begins mirroring the new one.
Example
package com.coderefer.rxandroidexamples.intro.operators.transform | |
import android.support.v7.app.AppCompatActivity | |
import android.os.Bundle | |
import android.util.Log | |
import com.coderefer.rxandroidexamples.R | |
import io.reactivex.Observable | |
import io.reactivex.android.schedulers.AndroidSchedulers | |
import io.reactivex.disposables.Disposable | |
import io.reactivex.schedulers.Schedulers | |
import java.util.concurrent.TimeUnit | |
private const val TAG = "SwitchMapOperator" | |
class SwitchMapOperatorActivity : AppCompatActivity() { | |
lateinit var disposable: Disposable | |
override fun onCreate(savedInstanceState: Bundle?) { | |
super.onCreate(savedInstanceState) | |
setContentView(R.layout.activity_switch_map_operator) | |
val numList = arrayListOf(1, 2, 3, 4, 5) | |
disposable = Observable.interval(0, 2, TimeUnit.SECONDS) | |
.switchMap { | |
Observable.interval(0,750, TimeUnit.MILLISECONDS) | |
} | |
.doOnNext { | |
Log.d(TAG, it.toString()) | |
} | |
.observeOn(Schedulers.io()) | |
.subscribeOn(AndroidSchedulers.mainThread()) | |
.subscribe() | |
} | |
override fun onDestroy() { | |
disposable.dispose() | |
super.onDestroy() | |
} | |
} |
In the above example, in line 28, we can see the observable starts emitting the values in the time interval 750 milliseconds and again starts observing from 0 whenever the Observable from line 26 starts emitting new item.
Output
2019-04-20 22:30:50.501 12230-12258/com.coderefer.rxandroidexamples D/SwitchMapOperator: 0 | |
2019-04-20 22:30:51.244 12230-12258/com.coderefer.rxandroidexamples D/SwitchMapOperator: 1 | |
2019-04-20 22:30:51.993 12230-12258/com.coderefer.rxandroidexamples D/SwitchMapOperator: 2 | |
2019-04-20 22:30:52.483 12230-12257/com.coderefer.rxandroidexamples D/SwitchMapOperator: 0 | |
2019-04-20 22:30:53.235 12230-12257/com.coderefer.rxandroidexamples D/SwitchMapOperator: 1 | |
2019-04-20 22:30:53.984 12230-12257/com.coderefer.rxandroidexamples D/SwitchMapOperator: 2 | |
2019-04-20 22:30:54.486 12230-12258/com.coderefer.rxandroidexamples D/SwitchMapOperator: 0 | |
2019-04-20 22:30:55.239 12230-12258/com.coderefer.rxandroidexamples D/SwitchMapOperator: 1 | |
2019-04-20 22:30:55.989 12230-12258/com.coderefer.rxandroidexamples D/SwitchMapOperator: 2 | |
2019-04-20 22:30:56.486 12230-12257/com.coderefer.rxandroidexamples D/SwitchMapOperator: 0 | |
2019-04-20 22:30:57.237 12230-12257/com.coderefer.rxandroidexamples D/SwitchMapOperator: 1 | |
2019-04-20 22:30:57.989 12230-12257/com.coderefer.rxandroidexamples D/SwitchMapOperator: 2 | |
2019-04-20 22:30:58.483 12230-12258/com.coderefer.rxandroidexamples D/SwitchMapOperator: 0 | |
2019-04-20 22:30:59.234 12230-12258/com.coderefer.rxandroidexamples D/SwitchMapOperator: 1 | |
2019-04-20 22:30:59.985 12230-12258/com.coderefer.rxandroidexamples D/SwitchMapOperator: 2 | |
… |
ConcatMap
ConcatMap operator works almost same as FlatMap, the only difference is – ConcatMap preserves the order of emission of items. Let us take the same example which we used for FlatMap and use ConcatMap instead.
Example
package com.coderefer.rxandroidexamples.intro.operators.transform | |
import android.support.v7.app.AppCompatActivity | |
import android.os.Bundle | |
import android.util.Log | |
import com.coderefer.rxandroidexamples.R | |
import io.reactivex.Observable | |
import io.reactivex.android.schedulers.AndroidSchedulers | |
import io.reactivex.schedulers.Schedulers | |
import java.util.concurrent.TimeUnit | |
private const val TAG = "ConcatMapOperator" | |
class ConcatMapOperatorActivity : AppCompatActivity() { | |
private val items: List<Int> = listOf(1,2,3,4,5) | |
override fun onCreate(savedInstanceState: Bundle?) { | |
super.onCreate(savedInstanceState) | |
setContentView(R.layout.activity_concat_map_operator) | |
Observable.fromIterable(items) | |
.concatMap { | |
Observable.just(it * 2) | |
.delay(it/10L, TimeUnit.SECONDS) | |
} | |
.doOnNext{ | |
Log.d(TAG, it.toString()) | |
} | |
.observeOn(Schedulers.io()) | |
.subscribeOn(AndroidSchedulers.mainThread()) | |
.subscribe() | |
} | |
} |
Output
From the above output, we can see that the order of emission of items is preserved.
In the upcoming articles, we will discuss about RxJava filtering operators.
If you like what you’ve read today you can check our my other articles on Android and Kotlin development, or if want to get in touch, please send me a tweet or follow me on Twitter or on Facebook, it really makes my day.