Pitfalls to Avoid using RxJS

I first encountered ReactiveX while working on NEM2 blockchain, aka Catapult. On why ReactiveX was chosen rather than Promises, etc, there is a good writeup here. (The original article is no longer available).

Writing Reactive apps requires a change in view in how the app works. The following is one of the most common mistake/frustration I encountered while working with RxJS.

Avoid nested subscribe()

Functions referenced here are from https://github.com/nemtech/nem2-sdk-typescript-javascript

<Observable>.subscribe() should only be used at the end. If you are using nested observables, most likely you are doing it wrong.

One of the most common reason for using nested subscribe is that we want to chain observables yet not care for what the first observable returns.

For example:

Having a list of transaction hashes, I want to be able to determine if the transaction that I announce is successful or not.

To do this, I can use transactionHttp.getTransactionsStatuses(transactionHashes) to retrieve the current status of the transactions on chain.

However, not all transactions are included in the same block, so I would run a while loop to keep checking the blockchain until all transactions have status success or failure.

Resulting in

while (transactionHashes.length > 0) {
	transactionHttp.getTransactionsStatuses(transactionHashes).subscribe(
	(transactionStatusResponses) => {
        for each transactionStatusResponse.hash 
        	if transactionStatusResponse is sucessful remove from transactionHashes
    })
}

Now, it is obvious that we can improve this.

Rather than fetching transaction status in a loop, we can do so only if a new block is harvested.

So our resulting function will look like this

Listener.newBlock().subscribe(
	() => {
            transactionHttp.getTransactionsStatuses(transactionHashes).subscribe(
        (transactionStatusResponses) => {
            for each transactionStatusResponse.hash 
                if transactionStatusResponse is sucessful remove from transactionHashes
      	}
    )
})

Unfortunately there is no way for us to retrieve the transactionStatusResponse. Also the newBlock observable will not know when to exit since it cannot read the status of the inner subscription.

We need a more reactive way to do this.


What we need is to be able to listen to new blocks, then fetch the transactionStatuses within a single subscription.

First we start listening to new blocks

const ob1 = Listener.newBlock()

Now, instead of subscribe, we will use pipe() to keep the reactiveness.

Listener.newBlock().pipe(
	// Do stuff each time newBlock gives us a new block
)

Within pipe() we can specify operators that help us do stuff. However, if we subscribe to Listener.newBlock() right now, it will only return us BlockInfo. To be able to return a different value, we use the switchMap operator

// Within pipe()
switchMap(
	() => {
return transactionHttp.getTransactionsStatuses(transactionHashes)
	}
),

Now, if we do ob1.subscribe() we will get transactionStatus objects.

Show Comments