RxJava create Operators – Operators for Creating Observables – RxJava Tutorial #4

0
206

In this article we will discuss about various RxJava create operators in depth with examples. Here we will discuss how to create Observables using various operators like Create, From, Just, Differ, etc.

This article is part of RxJava Intro series. You can checkout the entire series here:

Let us explore these RxJava create operators one by one.

RxJava Create Operators

Create

Create operator helps creating an Observable from scratch by calling observer methods programmatically. It creates only one Observable for all Observers.

rxjava create operator
rxjava create operator block diagram

We have already seen an example on how to create an Observable using Create Operator. Let us re-collect the same by creating one more Observable.

package com.coderefer.rxandroidexamples.intro.operators.create
import android.support.v7.app.AppCompatActivity
import android.os.Bundle
import android.util.Log
import com.coderefer.rxandroidexamples.R
import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.disposables.Disposable
private const val TAG = "CreateOperatorActivity"
/**
* Class to demonstrate create Operator
* */
class CreateOperatorActivity : AppCompatActivity() {
private val numList = listOf(1, 2, 3, 4, 5)
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_create_operator)
val observable = initializeObservable()
val observer = initializeObserver()
observable.subscribe(observer)
}
private fun initializeObserver(): Observer<Int> {
return object : Observer<Int> {
override fun onComplete() {
Log.d(TAG, "onComplete")
}
override fun onSubscribe(d: Disposable) {
}
override fun onNext(t: Int) {
Log.d(TAG, t.toString())
}
override fun onError(e: Throwable) {
}
}
}
private fun initializeObservable(): Observable<Int> {
// using create operator to create a new Observable
return Observable.create {
for (i in numList)
it.onNext(i)
it.onComplete()
}
}
}

In the above example, you can identify that in Line 48, we created an Observable using Create Operator. If we dive into RxJava documentation link for create() we observe that it does not take any values. Hence we declared the values as list and iterated through them using onNext() function in line 49 – 50.

Here is the output:

2019-01-16 07:35:17.108 7865-7865/com.coderefer.rxandroidexamples D/CreateOperatorActivity: 1
2019-01-16 07:35:17.108 7865-7865/com.coderefer.rxandroidexamples D/CreateOperatorActivity: 2
2019-01-16 07:35:17.108 7865-7865/com.coderefer.rxandroidexamples D/CreateOperatorActivity: 3
2019-01-16 07:35:17.108 7865-7865/com.coderefer.rxandroidexamples D/CreateOperatorActivity: 4
2019-01-16 07:35:17.108 7865-7865/com.coderefer.rxandroidexamples D/CreateOperatorActivity: 5
2019-01-16 07:35:17.108 7865-7865/com.coderefer.rxandroidexamples D/CreateOperatorActivity: onComplete

Defer

Defer differs from create in a way that Defer does not create an Observable unless and until an Observer is subscribed to it. It waits until an Observer subscribes and generate a Fresh Observable with fresh data. So even though the Observer thinks it is subscribing to Same Observable, it gets a Fresh Observable.

Advantages

As this Operator waits till last minute to create new Observable, this ensures that the Observable always contains fresh Data. So this Operator is mostly used when intended to fetch freshest Data

Example

Here, I’ve created an Observable using Defer Operator as shown in the below example.

package com.coderefer.rxandroidexamples.intro.operators.create
import android.support.v7.app.AppCompatActivity
import android.os.Bundle
import android.util.Log
import com.coderefer.rxandroidexamples.R
import io.reactivex.Observable
import io.reactivex.disposables.Disposable
private const val TAG = "DeferOperatorActivity"
class DeferOperatorActivity : AppCompatActivity() {
private lateinit var disposable1 :Disposable
private lateinit var disposable2 :Disposable
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_defer_operator)
val testClass = TestClass()
val observable = testClass.strObservable()
val deferObservable = testClass.deferObservable()
testClass.str = "World"
disposable1 = observable.subscribe {
Log.d(TAG, it) // prints "Hello" only since it is the value when Observable is created / initialized
}
observable.subscribe()
disposable2 =deferObservable.subscribe {
Log.d(TAG, it) // prints "World" since it creates a fresh Observable when subscribed with fresh data.. hence prints "World"
}
}
override fun onDestroy() {
disposable1.dispose() // to avoid memory leaks
disposable2.dispose()
super.onDestroy()
}
}
class TestClass {
var str: String? = "Hello"
fun strObservable() : Observable<String> {
return Observable.just(str)
}
// defer observable
fun deferObservable() : Observable<String> {
return Observable.defer {
Observable.just(str)
}
}
}

In the above example, we created a TestClass by initializing a variable named str to “Hello”. I’ve taken two types of observable – one is with Just operator and other is with Defer operator.

In line 24, I’ve modified the str value to “World”. In lines 25, 30 we subscribed to both observables. Now If we see the logcat, the output is printed as follows:

defer output

We can clearly see that Defer operator is fetching the latest data by creating a fresh Observable.

From

From operator is used to transform objects like lists, arrays, futures etc into Observables.

rxjava create operators
from operator illustration

There are different methods for transforming different objects like fromArray(), fromCallable(), fromFuture(), fromIterable(), fromPublisher(). More info is provided at the RxJava Wiki Link.

Example

Here I will consider an example using fromIterable where I will be converting arraylist into Observables.

package com.coderefer.rxandroidexamples.intro.operators.create
import android.support.v7.app.AppCompatActivity
import android.os.Bundle
import android.util.Log
import com.coderefer.rxandroidexamples.R
import io.reactivex.Observable
import io.reactivex.disposables.Disposable
private const val TAG = "FromOperatorActivity"
class FromOperatorActivity : AppCompatActivity() {
private lateinit var mDisposable : Disposable
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_from_operator)
val numList = arrayListOf(11,12,13,14,15)
val numObservable = Observable.fromIterable(numList)
mDisposable = numObservable.subscribe {
Log.d(TAG, it.toString())
}
}
override fun onDestroy() {
mDisposable.dispose()
super.onDestroy()
}
}

In Line 22, I’ve transformed the numList to Observable.

Output:

2019-01-18 08:35:48.919 25434-25434/com.coderefer.rxandroidexamples D/FromOperatorActivity: 11
2019-01-18 08:35:48.919 25434-25434/com.coderefer.rxandroidexamples D/FromOperatorActivity: 12
2019-01-18 08:35:48.919 25434-25434/com.coderefer.rxandroidexamples D/FromOperatorActivity: 13
2019-01-18 08:35:48.919 25434-25434/com.coderefer.rxandroidexamples D/FromOperatorActivity: 14
2019-01-18 08:35:48.919 25434-25434/com.coderefer.rxandroidexamples D/FromOperatorActivity: 15

Interval

This operator creates an Observable that emits a sequence of Integers with a given particular interval of time.

rxjava create operators
Interal operator illustration
Example

Below is the example where I used Interval Operator to create Observable and print Integer starting from 0.

package com.coderefer.rxandroidexamples.intro.operators.create
import android.support.v7.app.AppCompatActivity
import android.os.Bundle
import android.util.Log
import com.coderefer.rxandroidexamples.R
import io.reactivex.Observable
import io.reactivex.disposables.Disposable
import java.util.concurrent.TimeUnit
private const val TAG = "IntervalOperator"
class IntervalOperatorActivity : AppCompatActivity() {
lateinit var disposable :Disposable
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_interval_operator)
val clock = Observable.interval(1, TimeUnit.SECONDS)
disposable = clock.subscribe {
Log.d(TAG, it.toString())
}
}
override fun onDestroy() {
disposable.dispose()
super.onDestroy()
}
}

In line 20, I created a clock Observable using Interval Operator and given it an interval of 1 second. Now the output for this above will be as follows:

2019-01-19 10:20:28.731 1569-1596/com.coderefer.rxandroidexamples D/IntervalOperator: 0
2019-01-19 10:20:29.727 1569-1596/com.coderefer.rxandroidexamples D/IntervalOperator: 1
2019-01-19 10:20:30.728 1569-1596/com.coderefer.rxandroidexamples D/IntervalOperator: 2
2019-01-19 10:20:31.729 1569-1596/com.coderefer.rxandroidexamples D/IntervalOperator: 3
2019-01-19 10:20:32.728 1569-1596/com.coderefer.rxandroidexamples D/IntervalOperator: 4
2019-01-19 10:20:33.728 1569-1596/com.coderefer.rxandroidexamples D/IntervalOperator: 5
2019-01-19 10:20:34.727 1569-1596/com.coderefer.rxandroidexamples D/IntervalOperator: 6
2019-01-19 10:20:35.728 1569-1596/com.coderefer.rxandroidexamples D/IntervalOperator: 7
2019-01-19 10:20:36.727 1569-1596/com.coderefer.rxandroidexamples D/IntervalOperator: 8

This will print until the activity is destroyed as we are disposing disposable. Be careful that if we don’t dispose the disposable, it will continue to printing the values even if we exit the activity.

Just

Just Operator converts an item into an Observable that emits that item. Here whatever we pass in as arguments, it is converted to an Observable and will be emitted. The max arguments that can be passed in Just Operator are 1 to 10 (so min of 1 argument and max of 10).

Example

Here is the first example of creation of Observable using Just Operator where I passed the numbers 1, 2, 3, 4 directly as arguments.

package com.coderefer.rxandroidexamples.intro.operators.create
import android.support.v7.app.AppCompatActivity
import android.os.Bundle
import android.util.Log
import com.coderefer.rxandroidexamples.R
import io.reactivex.Observable
import io.reactivex.disposables.Disposable
private const val TAG = "JustOperator"
class JustOperatorActivity : AppCompatActivity() {
private lateinit var disposable: Disposable
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_just_operator)
val observable = Observable.just(1,2,3,4)
disposable = observable.subscribe {
Log.d(TAG, it.toString())
}
}
override fun onDestroy() {
disposable.dispose()
super.onDestroy()
}
}

The Output for the above is :

2019-01-20 19:58:36.661 2747-2747/? D/JustOperator: 1
2019-01-20 19:58:36.661 2747-2747/? D/JustOperator: 2
2019-01-20 19:58:36.661 2747-2747/? D/JustOperator: 3
2019-01-20 19:58:36.661 2747-2747/? D/JustOperator: 4

We can also pass List as a parameter.

package com.coderefer.rxandroidexamples.intro.operators.create
import android.support.v7.app.AppCompatActivity
import android.os.Bundle
import android.util.Log
import com.coderefer.rxandroidexamples.R
import io.reactivex.Observable
import io.reactivex.disposables.Disposable
private const val TAG = "JustOperPassList"
class JustOperPassingListActivity : AppCompatActivity() {
private lateinit var disposable: Disposable
val list = listOf("a", "b", "c", "d")
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_just_oper_passing_list)
val observable = Observable.just(list)
disposable = observable.subscribe {
Log.d(TAG, it.toString())
}
}
override fun onDestroy() {
disposable.dispose()
super.onDestroy()
}}

The output will be as follows:

2019-01-20 20:22:47.340 4160-4160/com.coderefer.rxandroidexamples D/JustOperPassList: [a, b, c, d]

Range

Range Operator creates an Observable that emits a particular range of Integers. It takes 2 arguments – starting integer n and length m, so that it starts emitting integers from the starting integer n to n+m-1.

rxjava operators range

Example

Here is the example where we created an Observable using Range Operator.

package com.coderefer.rxandroidexamples.intro.operators.create
import android.support.v7.app.AppCompatActivity
import android.os.Bundle
import android.util.Log
import com.coderefer.rxandroidexamples.R
import io.reactivex.Observable
import io.reactivex.disposables.Disposable
private const val TAG = "RangeOperator"
class RangeOperatorActivity : AppCompatActivity() {
private lateinit var disposable: Disposable
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_just_oper_passing_list)
val observable = Observable.range(7,5)
disposable = observable.subscribe {
Log.d(TAG, it.toString())
}
}
override fun onDestroy() {
disposable.dispose()
super.onDestroy()
}
}

The output of the above example is:

2019-01-20 22:27:36.136 5491-5491/com.coderefer.rxandroidexamples D/RangeOperator: 7
2019-01-20 22:27:36.136 5491-5491/com.coderefer.rxandroidexamples D/RangeOperator: 8
2019-01-20 22:27:36.136 5491-5491/com.coderefer.rxandroidexamples D/RangeOperator: 9
2019-01-20 22:27:36.136 5491-5491/com.coderefer.rxandroidexamples D/RangeOperator: 10
2019-01-20 22:27:36.136 5491-5491/com.coderefer.rxandroidexamples D/RangeOperator: 11

Repeat

Repeat Operator is a simple operator which emits the same item multiple times

Example

Let us see its implementation with example.

package com.coderefer.rxandroidexamples.intro.operators.create
import android.support.v7.app.AppCompatActivity
import android.os.Bundle
import android.util.Log
import com.coderefer.rxandroidexamples.R
import io.reactivex.Observable
import io.reactivex.disposables.Disposable
private const val TAG = "RepeatOperator"
class RepeatOperatorActivity : AppCompatActivity() {
private lateinit var disposable: Disposable
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_just_oper_passing_list)
val observable = Observable.just(1,2).repeat(2)
disposable = observable.subscribe {
Log.d(TAG, it.toString())
}
}
override fun onDestroy() {
disposable.dispose()
super.onDestroy()
}
}

The output for the above is:

2019-01-21 00:01:46.935 6305-6305/com.coderefer.rxandroidexamples D/RepeatOperator: 1
2019-01-21 00:01:46.935 6305-6305/com.coderefer.rxandroidexamples D/RepeatOperator: 2
2019-01-21 00:01:46.935 6305-6305/com.coderefer.rxandroidexamples D/RepeatOperator: 1
2019-01-21 00:01:46.935 6305-6305/com.coderefer.rxandroidexamples D/RepeatOperator: 2

Timer

Timer operator creates an Observable that emits a particular item after a given delay.

timer operator

Example

Here is the example of timer operator:

package com.coderefer.rxandroidexamples.intro.operators.create
import android.support.v7.app.AppCompatActivity
import android.os.Bundle
import android.util.Log
import com.coderefer.rxandroidexamples.R
import io.reactivex.Observable
import io.reactivex.disposables.Disposable
import java.util.concurrent.TimeUnit
private const val TAG = "TimerOperator"
class TimerOperatorActivity : AppCompatActivity() {
private lateinit var disposable: Disposable
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_just_oper_passing_list)
val observable = Observable.timer(1, TimeUnit.SECONDS)
disposable = observable.subscribe {
Log.d(TAG, it.toString())
}
}
override fun onDestroy() {
disposable.dispose()
super.onDestroy()
}
}

The output for the above is :

2019-01-21 00:06:55.349 6505-6541/com.coderefer.rxandroidexamples D/TimerOperator: 0

This completes our tutorial on RxJava create Operators which we use to create Observables. In the next tutorial, we will focus on RxJava’s Operators for Transforming Observables along with examples.

All the code for the above examples can be found in the following  Github link. Hope this article is useful to you. Make sure you subscribe for the mail to notify about the latest articles first. Let me know if any queries through comment section.

0 0 vote
Article Rating
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Inline Feedbacks
View all comments