In this article we will discuss the various RxJava create operators in depth, with examples. We will see how to create Observables using operators like Create, From, Just, Defer, etc.
This article is part of the RxJava Intro series. You can check out the entire series here:
- RxAndroid Introduction
- RxAndroid Example – RxJava Tutorial #2
- RxJava Operators In General – RxJava Tutorial #3
- RxJava Operators for Creating Observables – RxJava Tutorial #4 – You are Here
- RxJava Operators for Transforming Observables – RxJava Tutorial #5
- RxJava Operators for Filtering Observables – RxJava Tutorial #6 – Coming Up
- RxJava Operators for Combining Observables – RxJava Tutorial #7 – Coming Up
Let us explore these RxJava create operators one by one.
RxJava Create Operators
Create
The Create operator helps in creating an Observable from scratch by calling the observer methods programmatically. The same Observable instance is shared by all Observers, although the callback passed to create() runs again for each new subscription.
We have already seen an example of how to create an Observable using the Create operator. Let us recall it by creating one more Observable.
package com.coderefer.rxandroidexamples.intro.operators.create

import android.support.v7.app.AppCompatActivity
import android.os.Bundle
import android.util.Log
import com.coderefer.rxandroidexamples.R
import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.disposables.Disposable

private const val TAG = "CreateOperatorActivity"

/**
 * Class to demonstrate create Operator
 */
class CreateOperatorActivity : AppCompatActivity() {

    private val numList = listOf(1, 2, 3, 4, 5)

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_create_operator)
        val observable = initializeObservable()
        val observer = initializeObserver()
        observable.subscribe(observer)
    }

    private fun initializeObserver(): Observer<Int> {
        return object : Observer<Int> {
            override fun onComplete() {
                Log.d(TAG, "onComplete")
            }

            override fun onSubscribe(d: Disposable) {
            }

            override fun onNext(t: Int) {
                Log.d(TAG, t.toString())
            }

            override fun onError(e: Throwable) {
                // log the failure instead of silently ignoring it
                Log.e(TAG, "onError", e)
            }
        }
    }

    private fun initializeObservable(): Observable<Int> {
        // using create operator to create a new Observable
        return Observable.create { emitter ->
            // emit every value of the list, then signal completion
            for (i in numList) {
                emitter.onNext(i)
            }
            emitter.onComplete()
        }
    }
}
In the above example, initializeObservable() creates the Observable using the Create operator. If we look at the RxJava documentation for create(), we see that it does not take the values to emit as arguments; instead it takes a callback that receives an emitter. Hence we declared the values as a list and emitted each of them by calling onNext() inside a loop, followed by onComplete().
Here is the output:
2019-01-16 07:35:17.108 7865-7865/com.coderefer.rxandroidexamples D/CreateOperatorActivity: 1
2019-01-16 07:35:17.108 7865-7865/com.coderefer.rxandroidexamples D/CreateOperatorActivity: 2
2019-01-16 07:35:17.108 7865-7865/com.coderefer.rxandroidexamples D/CreateOperatorActivity: 3
2019-01-16 07:35:17.108 7865-7865/com.coderefer.rxandroidexamples D/CreateOperatorActivity: 4
2019-01-16 07:35:17.108 7865-7865/com.coderefer.rxandroidexamples D/CreateOperatorActivity: 5
2019-01-16 07:35:17.108 7865-7865/com.coderefer.rxandroidexamples D/CreateOperatorActivity: onComplete
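The create callback can also be written a little more defensively. The following is a minimal sketch, assuming RxJava 2 (the io.reactivex package used in this article) and a plain JVM main function instead of an Android activity. The helper name numbersObservable is hypothetical and not part of the sample project. It checks isDisposed before emitting and reports failures through onError.

```kotlin
import io.reactivex.Observable

// Hypothetical helper, not part of the original sample project.
fun numbersObservable(numbers: List<Int>): Observable<Int> =
    Observable.create<Int> { emitter ->
        try {
            for (n in numbers) {
                // Stop emitting once the downstream has disposed.
                if (emitter.isDisposed) return@create
                emitter.onNext(n)
            }
            emitter.onComplete()
        } catch (e: Exception) {
            // Report failures through the stream instead of throwing.
            if (!emitter.isDisposed) emitter.onError(e)
        }
    }

fun main() {
    val observable = numbersObservable(listOf(1, 2, 3, 4, 5))
    // Each subscription runs the create block again from the start.
    observable.subscribe { println("first: $it") }
    // take(2) disposes the upstream after two items, so the loop exits early.
    observable.take(2).subscribe { println("second: $it") }
}
```

Here the first subscriber receives all five values, while the second receives only 1 and 2, because take(2) disposes the subscription and the isDisposed check ends the loop.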
Defer
Defer differs from Create in that it does not create an Observable until an Observer subscribes to it. It waits for the subscription and then generates a fresh Observable with fresh data. So even though each Observer thinks it is subscribing to the same Observable, it actually gets a fresh one.
Advantages
As this operator waits until the last moment to create the new Observable, the Observable reflects the data available at subscription time. So this operator is mostly used when we want to fetch the latest data.
Example
Here, I’ve created an Observable using Defer Operator as shown in the below example.
package com.coderefer.rxandroidexamples.intro.operators.create

import android.support.v7.app.AppCompatActivity
import android.os.Bundle
import android.util.Log
import com.coderefer.rxandroidexamples.R
import io.reactivex.Observable
import io.reactivex.disposables.Disposable

private const val TAG = "DeferOperatorActivity"

class DeferOperatorActivity : AppCompatActivity() {

    private lateinit var disposable1: Disposable
    private lateinit var disposable2: Disposable

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_defer_operator)
        val testClass = TestClass()
        val observable = testClass.strObservable()
        val deferObservable = testClass.deferObservable()

        // change the value after both Observables have been obtained
        testClass.str = "World"

        disposable1 = observable.subscribe {
            Log.d(TAG, it) // prints "Hello" only since it is the value when Observable is created / initialized
        }
        disposable2 = deferObservable.subscribe {
            Log.d(TAG, it) // prints "World" since it creates a fresh Observable when subscribed with fresh data
        }
    }

    override fun onDestroy() {
        disposable1.dispose() // to avoid memory leaks
        disposable2.dispose()
        super.onDestroy()
    }
}

class TestClass {
    // non-null, because Observable.just() does not accept null items
    var str: String = "Hello"

    fun strObservable(): Observable<String> {
        return Observable.just(str)
    }

    // defer observable
    fun deferObservable(): Observable<String> {
        return Observable.defer {
            Observable.just(str)
        }
    }
}
In the above example, we created a TestClass that initializes a variable named str to “Hello”. I’ve taken two Observables: one built with the Just operator and the other with the Defer operator.
After obtaining both Observables, I modified the str value to “World” and then subscribed to each of them. In logcat, the Just Observable prints “Hello”, while the Defer Observable prints “World”.
We can clearly see that the Defer operator fetches the latest data by creating a fresh Observable at subscription time.
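The same behaviour can be reproduced outside Android. Below is a minimal sketch, assuming RxJava 2 on a plain JVM; the class Greeting and its functions are hypothetical stand-ins for TestClass, used only for illustration.

```kotlin
import io.reactivex.Observable

// Hypothetical stand-in for the TestClass used above.
class Greeting(var text: String = "Hello") {
    // just() captures the current value of text when this function is called.
    fun eager(): Observable<String> = Observable.just(text)

    // defer() reads text only when an Observer subscribes.
    fun deferred(): Observable<String> = Observable.defer { Observable.just(text) }
}

fun main() {
    val greeting = Greeting()
    val eager = greeting.eager()
    val deferred = greeting.deferred()

    greeting.text = "World"
    println(eager.blockingFirst())    // prints "Hello"
    println(deferred.blockingFirst()) // prints "World"

    // Every new subscription to the deferred Observable sees the latest value.
    greeting.text = "Again"
    println(deferred.blockingFirst()) // prints "Again"
}
```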
From
The From operator is used to transform objects like lists, arrays, futures, etc. into Observables.
There are different methods for the different source types, such as fromArray(), fromCallable(), fromFuture(), fromIterable() and fromPublisher(). More info is provided at the RxJava Wiki Link.
Example
Here I will consider an example using fromIterable(), where I convert an ArrayList into an Observable.
package com.coderefer.rxandroidexamples.intro.operators.create

import android.support.v7.app.AppCompatActivity
import android.os.Bundle
import android.util.Log
import com.coderefer.rxandroidexamples.R
import io.reactivex.Observable
import io.reactivex.disposables.Disposable

private const val TAG = "FromOperatorActivity"

class FromOperatorActivity : AppCompatActivity() {

    private lateinit var mDisposable: Disposable

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_from_operator)
        val numList = arrayListOf(11, 12, 13, 14, 15)
        val numObservable = Observable.fromIterable(numList)
        mDisposable = numObservable.subscribe {
            Log.d(TAG, it.toString())
        }
    }

    override fun onDestroy() {
        mDisposable.dispose()
        super.onDestroy()
    }
}
Here, the call to Observable.fromIterable(numList) transforms numList into an Observable.
Output:
2019-01-18 08:35:48.919 25434-25434/com.coderefer.rxandroidexamples D/FromOperatorActivity: 11
2019-01-18 08:35:48.919 25434-25434/com.coderefer.rxandroidexamples D/FromOperatorActivity: 12
2019-01-18 08:35:48.919 25434-25434/com.coderefer.rxandroidexamples D/FromOperatorActivity: 13
2019-01-18 08:35:48.919 25434-25434/com.coderefer.rxandroidexamples D/FromOperatorActivity: 14
2019-01-18 08:35:48.919 25434-25434/com.coderefer.rxandroidexamples D/FromOperatorActivity: 15
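Two of the other From variants mentioned earlier, fromArray() and fromCallable(), can be sketched in the same way. This is a minimal illustration assuming RxJava 2 on a plain JVM; the function and class names are hypothetical.

```kotlin
import io.reactivex.Observable

// fromArray emits each of the given elements in order.
fun lettersFromArray(): List<String> =
    Observable.fromArray("a", "b", "c").toList().blockingGet()

// Hypothetical counter used to show that fromCallable runs once per subscription.
class CallCounter {
    var calls = 0
    val observable: Observable<Int> = Observable.fromCallable { ++calls }
}

fun main() {
    println(lettersFromArray())                // [a, b, c]

    val counter = CallCounter()
    println(counter.calls)                     // 0, nothing runs before subscription
    println(counter.observable.blockingFirst()) // 1
    println(counter.observable.blockingFirst()) // 2
}
```

Note that fromCallable() does not run the callable when the Observable is created; it runs it each time an Observer subscribes and emits the returned value.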
Interval
This operator creates an Observable that emits an increasing sequence of Long values, starting from 0, at a given time interval.
Example
Below is an example where I used the Interval operator to create an Observable and print the values starting from 0.
package com.coderefer.rxandroidexamples.intro.operators.create

import android.support.v7.app.AppCompatActivity
import android.os.Bundle
import android.util.Log
import com.coderefer.rxandroidexamples.R
import io.reactivex.Observable
import io.reactivex.disposables.Disposable
import java.util.concurrent.TimeUnit

private const val TAG = "IntervalOperator"

class IntervalOperatorActivity : AppCompatActivity() {

    lateinit var disposable: Disposable

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_interval_operator)
        val clock = Observable.interval(1, TimeUnit.SECONDS)
        disposable = clock.subscribe {
            Log.d(TAG, it.toString())
        }
    }

    override fun onDestroy() {
        disposable.dispose()
        super.onDestroy()
    }
}
Here I created a clock Observable using the Interval operator and gave it an interval of 1 second. The output will be as follows:
2019-01-19 10:20:28.731 1569-1596/com.coderefer.rxandroidexamples D/IntervalOperator: 0
2019-01-19 10:20:29.727 1569-1596/com.coderefer.rxandroidexamples D/IntervalOperator: 1
2019-01-19 10:20:30.728 1569-1596/com.coderefer.rxandroidexamples D/IntervalOperator: 2
2019-01-19 10:20:31.729 1569-1596/com.coderefer.rxandroidexamples D/IntervalOperator: 3
2019-01-19 10:20:32.728 1569-1596/com.coderefer.rxandroidexamples D/IntervalOperator: 4
2019-01-19 10:20:33.728 1569-1596/com.coderefer.rxandroidexamples D/IntervalOperator: 5
2019-01-19 10:20:34.727 1569-1596/com.coderefer.rxandroidexamples D/IntervalOperator: 6
2019-01-19 10:20:35.728 1569-1596/com.coderefer.rxandroidexamples D/IntervalOperator: 7
2019-01-19 10:20:36.727 1569-1596/com.coderefer.rxandroidexamples D/IntervalOperator: 8
The values keep printing until the activity is destroyed, because we dispose the disposable in onDestroy(). Be careful: if we don’t dispose the disposable, the Observable will continue to emit and print values even after we exit the activity.
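Besides disposing manually, an interval can be bounded so that it completes on its own. The sketch below assumes RxJava 2 on a plain JVM and uses take() to stop after a fixed number of ticks; the helper name firstTicks and the 10 ms period are illustrative choices.

```kotlin
import io.reactivex.Observable
import java.util.concurrent.TimeUnit

// Hypothetical helper: collects the first `count` ticks of a short interval.
fun firstTicks(count: Long): List<Long> =
    Observable.interval(10, TimeUnit.MILLISECONDS)
        .take(count)    // completes the stream after `count` items
        .toList()
        .blockingGet()  // blocks the calling thread; avoid this on the Android main thread

fun main() {
    println(firstTicks(3)) // [0, 1, 2]
}
```

By default, interval() emits on RxJava's computation scheduler rather than on the thread that subscribes.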
Just
The Just operator converts an item into an Observable that emits that item. Whatever we pass in as arguments is emitted as is, in the same order. Just accepts a minimum of 1 and a maximum of 10 arguments.
Example
Here is the first example of creation of Observable using Just Operator where I passed the numbers 1, 2, 3, 4 directly as arguments.
package com.coderefer.rxandroidexamples.intro.operators.create

import android.support.v7.app.AppCompatActivity
import android.os.Bundle
import android.util.Log
import com.coderefer.rxandroidexamples.R
import io.reactivex.Observable
import io.reactivex.disposables.Disposable

private const val TAG = "JustOperator"

class JustOperatorActivity : AppCompatActivity() {

    private lateinit var disposable: Disposable

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_just_operator)
        val observable = Observable.just(1, 2, 3, 4)
        disposable = observable.subscribe {
            Log.d(TAG, it.toString())
        }
    }

    override fun onDestroy() {
        disposable.dispose()
        super.onDestroy()
    }
}
The output for the above is:
2019-01-20 19:58:36.661 2747-2747/? D/JustOperator: 1
2019-01-20 19:58:36.661 2747-2747/? D/JustOperator: 2
2019-01-20 19:58:36.661 2747-2747/? D/JustOperator: 3
2019-01-20 19:58:36.661 2747-2747/? D/JustOperator: 4
We can also pass a List as a parameter. In that case, the whole list is emitted as a single item.
package com.coderefer.rxandroidexamples.intro.operators.create

import android.support.v7.app.AppCompatActivity
import android.os.Bundle
import android.util.Log
import com.coderefer.rxandroidexamples.R
import io.reactivex.Observable
import io.reactivex.disposables.Disposable

private const val TAG = "JustOperPassList"

class JustOperPassingListActivity : AppCompatActivity() {

    private lateinit var disposable: Disposable
    val list = listOf("a", "b", "c", "d")

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_just_oper_passing_list)
        val observable = Observable.just(list)
        disposable = observable.subscribe {
            Log.d(TAG, it.toString())
        }
    }

    override fun onDestroy() {
        disposable.dispose()
        super.onDestroy()
    }
}
The output will be as follows:
2019-01-20 20:22:47.340 4160-4160/com.coderefer.rxandroidexamples D/JustOperPassList: [a, b, c, d]
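The single log line above shows that just() treats the list as one item. The following sketch, assuming RxJava 2 on a plain JVM, contrasts this with fromIterable(), which emits each element separately; the helper names are hypothetical.

```kotlin
import io.reactivex.Observable

// just(list) emits the whole list as a single item.
fun emittedByJust(list: List<String>): List<List<String>> =
    Observable.just(list).toList().blockingGet()

// fromIterable(list) emits each element of the list as its own item.
fun emittedByFromIterable(list: List<String>): List<String> =
    Observable.fromIterable(list).toList().blockingGet()

fun main() {
    val list = listOf("a", "b", "c", "d")
    println(emittedByJust(list).size)         // 1
    println(emittedByFromIterable(list).size) // 4
}
```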
Range
The Range operator creates an Observable that emits a particular range of integers. It takes 2 arguments: the starting integer n and the count m. It emits m consecutive integers, from n to n+m-1.
Example
Here is the example where we created an Observable using Range Operator.
package com.coderefer.rxandroidexamples.intro.operators.create

import android.support.v7.app.AppCompatActivity
import android.os.Bundle
import android.util.Log
import com.coderefer.rxandroidexamples.R
import io.reactivex.Observable
import io.reactivex.disposables.Disposable

private const val TAG = "RangeOperator"

class RangeOperatorActivity : AppCompatActivity() {

    private lateinit var disposable: Disposable

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_just_oper_passing_list)
        val observable = Observable.range(7, 5)
        disposable = observable.subscribe {
            Log.d(TAG, it.toString())
        }
    }

    override fun onDestroy() {
        disposable.dispose()
        super.onDestroy()
    }
}
The output of the above example is:
2019-01-20 22:27:36.136 5491-5491/com.coderefer.rxandroidexamples D/RangeOperator: 7
2019-01-20 22:27:36.136 5491-5491/com.coderefer.rxandroidexamples D/RangeOperator: 8
2019-01-20 22:27:36.136 5491-5491/com.coderefer.rxandroidexamples D/RangeOperator: 9
2019-01-20 22:27:36.136 5491-5491/com.coderefer.rxandroidexamples D/RangeOperator: 10
2019-01-20 22:27:36.136 5491-5491/com.coderefer.rxandroidexamples D/RangeOperator: 11
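The second argument of range() is a count, not an end value, which is easy to get wrong. The sketch below, assuming RxJava 2 on a plain JVM, shows the regular case together with two edge cases; rangeValues is a hypothetical helper.

```kotlin
import io.reactivex.Observable

// Hypothetical helper that collects everything a range emits.
fun rangeValues(start: Int, count: Int): List<Int> =
    Observable.range(start, count).toList().blockingGet()

fun main() {
    println(rangeValues(7, 5)) // [7, 8, 9, 10, 11]
    println(rangeValues(7, 0)) // [] (a count of 0 gives an empty Observable)

    // A negative count is rejected as soon as range() is called.
    try {
        Observable.range(7, -1)
    } catch (e: IllegalArgumentException) {
        println("rejected: ${e.message}")
    }
}
```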
Repeat
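The Repeat operator is a simple operator that resubscribes to the source Observable so that its items are emitted multiple times. For example, repeat(2) emits the whole source sequence twice.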
Example
Let us see its implementation with example.
package com.coderefer.rxandroidexamples.intro.operators.create

import android.support.v7.app.AppCompatActivity
import android.os.Bundle
import android.util.Log
import com.coderefer.rxandroidexamples.R
import io.reactivex.Observable
import io.reactivex.disposables.Disposable

private const val TAG = "RepeatOperator"

class RepeatOperatorActivity : AppCompatActivity() {

    private lateinit var disposable: Disposable

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_just_oper_passing_list)
        val observable = Observable.just(1, 2).repeat(2)
        disposable = observable.subscribe {
            Log.d(TAG, it.toString())
        }
    }

    override fun onDestroy() {
        disposable.dispose()
        super.onDestroy()
    }
}
The output for the above is:
2019-01-21 00:01:46.935 6305-6305/com.coderefer.rxandroidexamples D/RepeatOperator: 1
2019-01-21 00:01:46.935 6305-6305/com.coderefer.rxandroidexamples D/RepeatOperator: 2
2019-01-21 00:01:46.935 6305-6305/com.coderefer.rxandroidexamples D/RepeatOperator: 1
2019-01-21 00:01:46.935 6305-6305/com.coderefer.rxandroidexamples D/RepeatOperator: 2
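As the output shows, repeat() replays the whole source sequence rather than a single item. A minimal sketch, assuming RxJava 2 on a plain JVM, with hypothetical helper names; it also shows the no-argument repeat(), which repeats indefinitely and therefore needs a bound such as take().

```kotlin
import io.reactivex.Observable

// repeat(times) replays the source sequence `times` times in total.
fun repeated(times: Long): List<Int> =
    Observable.just(1, 2).repeat(times).toList().blockingGet()

// repeat() without an argument never completes on its own, so take() bounds it.
fun firstOfEndlessRepeat(count: Long): List<String> =
    Observable.just("x").repeat().take(count).toList().blockingGet()

fun main() {
    println(repeated(3))             // [1, 2, 1, 2, 1, 2]
    println(firstOfEndlessRepeat(4)) // [x, x, x, x]
}
```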
Timer
The Timer operator creates an Observable that emits a single item (the Long value 0) after a given delay and then completes.
Example
Here is the example of timer operator:
package com.coderefer.rxandroidexamples.intro.operators.create

import android.support.v7.app.AppCompatActivity
import android.os.Bundle
import android.util.Log
import com.coderefer.rxandroidexamples.R
import io.reactivex.Observable
import io.reactivex.disposables.Disposable
import java.util.concurrent.TimeUnit

private const val TAG = "TimerOperator"

class TimerOperatorActivity : AppCompatActivity() {

    private lateinit var disposable: Disposable

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_just_oper_passing_list)
        val observable = Observable.timer(1, TimeUnit.SECONDS)
        disposable = observable.subscribe {
            Log.d(TAG, it.toString())
        }
    }

    override fun onDestroy() {
        disposable.dispose()
        super.onDestroy()
    }
}
The output for the above is:
2019-01-21 00:06:55.349 6505-6541/com.coderefer.rxandroidexamples D/TimerOperator: 0
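Unlike interval(), timer() emits only once. The sketch below, assuming RxJava 2 on a plain JVM, collects everything a timer emits to make that visible; timerItems and the 100 ms delay are illustrative choices.

```kotlin
import io.reactivex.Observable
import java.util.concurrent.TimeUnit

// Hypothetical helper: waits for the timer to complete and returns all emitted items.
fun timerItems(delayMillis: Long): List<Long> =
    Observable.timer(delayMillis, TimeUnit.MILLISECONDS)
        .toList()       // toList() only returns because timer completes after one item
        .blockingGet()  // blocks the calling thread; avoid this on the Android main thread

fun main() {
    println(timerItems(100)) // [0]
}
```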
This completes our tutorial on the RxJava create operators, which we use to create Observables. In the next tutorial, we will focus on RxJava’s operators for transforming Observables, along with examples.
All the code for the above examples can be found at the following GitHub link. I hope this article is useful to you. Subscribe to the mailing list to be notified about the latest articles first, and let me know about any queries through the comment section.