Escaping pipeline hell

2019 January 9

Previously, I stepped through the evolution of asynchronous programming in JavaScript, from callback hell, through promises, and leaving off where most coverage stops, at async functions (built on top of the underlying technology of generators). Given that history, I want to talk about the state of asynchronous programming with observables in JavaScript.

Observables

For my novice readers, you can think of observables as promises that can resolve more than once, effectively yielding multiple values. In the same way you attach success and failure callbacks to promises, you subscribe to an observable with callbacks for three types of events: zero, one, or more value events; zero or one error events; and zero or one completion events. The error and completion events are terminal events: there will be at most one of them, never both, and once that event arrives, no more events of any kind will be seen. These callbacks are generally called the next, error, and complete callbacks, respectively. Taken together, they form an observer.
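Here is a minimal sketch of that contract in plain JavaScript, without RxJS. The hypothetical `createObservable` wraps the observer so that once an error or completion event has been delivered, any later events from a misbehaving source are dropped.

```javascript
// A minimal, hypothetical observable (not RxJS) illustrating the observer
// contract: any number of next events, then at most one terminal event.
function createObservable(subscribe) {
  return {
    subscribe(observer) {
      let closed = false
      // Wrap the observer so nothing is delivered after a terminal event.
      const safe = {
        next(value) {
          if (!closed && observer.next) observer.next(value)
        },
        error(err) {
          if (closed) return
          closed = true
          if (observer.error) observer.error(err)
        },
        complete() {
          if (closed) return
          closed = true
          if (observer.complete) observer.complete()
        },
      }
      subscribe(safe)
    },
  }
}

// A source that misbehaves by emitting after it has completed.
const numbers = createObservable(observer => {
  observer.next(1)
  observer.next(2)
  observer.complete()
  observer.next(3)       // dropped: the stream already completed
  observer.error('late') // dropped as well: only one terminal event
})

const events = []
numbers.subscribe({
  next: value => events.push(`next ${value}`),
  error: err => events.push(`error ${err}`),
  complete: () => events.push('complete'),
})
console.log(events) // [ 'next 1', 'next 2', 'complete' ]
```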

Observables can be constructed from many kinds of sources, including arrays, generators, and promises (which behave much like observables that yield at most one value). In a famous example from Erik Meijer’s excellent introduction to observables, entitled Your Mouse Is a Database, he arrives at a succinct implementation of an autocomplete (or “type-ahead”) text box using observables. It has been replicated many times over; with RxJS 6 and modern JavaScript, it might look like this:

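import { fromEvent } from 'rxjs'
import { ajax } from 'rxjs/ajax'
import { debounceTime, distinctUntilChanged, filter, map, switchMap } from 'rxjs/operators'

// searchBox is the text input element.
const typeahead = fromEvent(searchBox, 'input').pipe(
  map(event => event.target.value), // current text of the search box
  filter(text => text.length > 2),
  debounceTime(10),
  distinctUntilChanged(),
  // Query the server with the text; switchMap cancels any request still in flight.
  // The query parameter name `q` is a placeholder.
  switchMap(text => ajax.getJSON(`/api/endpoint?q=${encodeURIComponent(text)}`))
)

typeahead.subscribe(data => {
  // Handle the data from the API
})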

Another advantage observables have over promises is that a subscription can be canceled, which tells the observable that it can stop doing work. This is very useful for canceling AJAX requests whose responses are no longer needed, as in the autocomplete example when the user edits the text box before the previous autocomplete results have returned. There was a proposal for cancelable promises, but it was withdrawn. Without investigating too deeply, I got the impression that it was shouted down in favor of the existing, more general abstraction of observables.
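A sketch of what cancellation looks like from the inside, again using a hypothetical hand-rolled source rather than RxJS: `subscribe()` returns a subscription whose `unsubscribe()` clears a pending timer (standing in for an AJAX request), so the observer never hears from it. A standard promise offers no equivalent hook.

```javascript
// Hypothetical timer-backed source (not RxJS): subscribe() returns a
// subscription whose unsubscribe() runs the teardown and stops the work.
function delayedValue(value, ms) {
  return {
    subscribe(observer) {
      const timer = setTimeout(() => {
        observer.next(value)
        observer.complete()
      }, ms)
      return {
        unsubscribe() {
          // Teardown: the pending work is abandoned, so no events arrive.
          clearTimeout(timer)
        },
      }
    },
  }
}

const received = []
const subscription = delayedValue('results', 100).subscribe({
  next: value => received.push(value),
  complete: () => received.push('complete'),
})
// The user typed again before the "response" arrived, so cancel it.
subscription.unsubscribe()
```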

As you can see in the example above, however, observables suffer from the same “chain of callbacks” style that promises had before generators arrived. I’m calling this pipeline hell in the spirit of callback hell. We’re missing the observable equivalent of async functions.

RxJS

RxJS is, to my knowledge, the most popular implementation of observables for JavaScript. Interestingly, version 4 of RxJS had a trampoline for observable-yielding generators called spawn. It appears to have been quietly dropped from later versions; I suspect not enough people were using it. Some people actually prefer chaining operators on observables, but when my observables behave like cancelable promises (i.e. they yield exactly one value or throw an error), I prefer syntax like async functions, for the same reason it made promises easier to use: it feels more like familiar synchronous programming, which makes the code easier to read and write.

The good news is that generators give us an opportunity. Much like async functions are derived from a combination of promise-yielding generators and a special trampoline, we can define our own trampoline for observable-yielding generators.
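For reference, the promise trampoline that async functions correspond to can be sketched in a few lines. `runAsync` is a hypothetical name, and this is a simplified sketch rather than how engines implement async functions, but it shows the mechanism: each yielded promise is awaited, and its result is fed back into the generator with `next()`, or its error with `throw()`.

```javascript
// Hypothetical, simplified trampoline for promise-yielding generators.
function runAsync(makeGenerator) {
  return function (...args) {
    // Like an async function, the generator starts as soon as this is called.
    const generator = makeGenerator.apply(this, args)
    return new Promise((resolve, reject) => {
      function step(method, arg) {
        let result
        try {
          result = generator[method](arg)
        } catch (err) {
          reject(err) // the generator threw: reject the returned promise
          return
        }
        if (result.done) {
          resolve(result.value) // the generator returned: resolve with its value
          return
        }
        // Wait for the yielded value, then feed the outcome back in.
        Promise.resolve(result.value).then(
          value => step('next', value),
          err => step('throw', err),
        )
      }
      step('next', undefined)
    })
  }
}

const addLater = runAsync(function* (a, b) {
  const x = yield Promise.resolve(a)
  const y = yield Promise.resolve(b)
  return x + y
})

addLater(1, 2).then(sum => console.log(sum)) // logs 3
```

Note that `runAsync` starts the generator as soon as the wrapper is called; an observable-based version should instead wait until the observable is subscribed.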

Take a step back and think about the generator function. It takes some arguments and returns a generator object. It has a name, so I can call it whenever I’m ready to start the generator, even from places far from where it is defined. I want my observable-yielding generator functions to return an observable instead of a generator object. For that, I need a function that takes the generator function and returns a new function with the same parameters that returns an observable. Further, the generator should not start when this new function is called, but only when the observable is subscribed.

We’ll need our own machinery to implement the trampoline. The easiest approach is to put all the interesting work into a subscriber (not shown here) and construct the observable around it:

import { Observable } from 'rxjs'

// GeneratorSubscriber (linked, not shown) starts the trampoline in its
// constructor and stops it in its unsubscribe method.
function async(makeGenerator) {
  // Return a function that constructs an Observable.
  return function(...args) {
    return Observable.create(observer => {
      // This function won't be called until the observable is subscribed.
      const generator = makeGenerator.apply(this, args)
      return new GeneratorSubscriber(observer, generator)
    })
  }
}

The subscriber needs to start the trampoline in its constructor, and it needs an unsubscribe method that can stop the trampoline. I’m not going to step through the whole subscriber here, but I’ll link it for the advanced audience interested in seeing how it works. With async, we can write functions that handle cancelable requests in the framework of observables:

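import { ajax } from 'rxjs/ajax'

const logIn = async(function*({ username, password }) {
  try {
    // POST the credentials; the trampoline resumes with the AjaxResponse.
    const { response: token } = yield ajax.post(
      'https://example.com/login', { username, password })
    return { value: token }
  } catch (cause) {
    // Throw (rather than return) so the failure reaches the error callback.
    throw { error: 'Wrong username or password.', cause }
  }
})

// Hypothetical credentials for illustration.
const observable = logIn({ username: 'alice', password: 'hunter2' }) // Request not yet sent.
const subscription = observable.subscribe( // Request sent now.
  ({ value }) => console.log('token', value),
  ({ error }) => console.error('error', error),
)
subscription.unsubscribe() // Request canceled.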
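Since the subscriber itself is only linked, here is a self-contained sketch of one way such a trampoline could work. It is not the author’s `GeneratorSubscriber` and does not use RxJS: it assumes any source with `subscribe(observer)` returning an object with `unsubscribe()`, assumes the observer supplies all three callbacks, and resumes the generator with the last value a source emits before completing. `GeneratorTrampoline`, `observableAsync`, and `after` are hypothetical names.

```javascript
// Hypothetical trampoline (not the author's GeneratorSubscriber, not RxJS).
// Assumes every yielded source has subscribe(observer) -> { unsubscribe() }.
class GeneratorTrampoline {
  constructor(observer, generator) {
    this.observer = observer
    this.generator = generator
    this.inner = null // subscription to the source currently awaited
    this.closed = false
    this.step('next', undefined) // start the generator right away
  }

  // Resume the generator with next() or throw() and handle the outcome.
  step(method, arg) {
    if (this.closed) return
    let result
    try {
      result = this.generator[method](arg)
    } catch (err) {
      this.closed = true
      this.observer.error(err) // uncaught error inside the generator
      return
    }
    if (result.done) {
      this.closed = true
      this.observer.next(result.value) // the generator's return value
      this.observer.complete()
      return
    }
    this.subscribeTo(result.value)
  }

  // Wait for a yielded source to terminate, remembering its last value.
  subscribeTo(source) {
    let last
    let settled = false
    const subscription = source.subscribe({
      next: value => { last = value },
      error: err => { settled = true; this.step('throw', err) },
      complete: () => { settled = true; this.step('next', last) },
    })
    // A synchronous source may already have resumed the generator.
    if (!settled) this.inner = subscription
  }

  // Cancel the awaited source and let the generator run its finally blocks.
  unsubscribe() {
    if (this.closed) return
    this.closed = true
    if (this.inner) this.inner.unsubscribe()
    this.generator.return()
  }
}

// Wrap a generator function so that calling it returns a lazy observable.
function observableAsync(makeGenerator) {
  return function (...args) {
    return {
      // The generator is created only when someone subscribes.
      subscribe: observer =>
        new GeneratorTrampoline(observer, makeGenerator.apply(this, args)),
    }
  }
}

// Hypothetical source: emits one value after `ms` milliseconds, then completes.
function after(value, ms) {
  return {
    subscribe(observer) {
      const timer = setTimeout(() => {
        observer.next(value)
        observer.complete()
      }, ms)
      return { unsubscribe: () => clearTimeout(timer) }
    },
  }
}

const addSlowly = observableAsync(function* (a, b) {
  const x = yield after(a, 10)
  const y = yield after(b, 10)
  return x + y
})

addSlowly(1, 2).subscribe({
  next: sum => console.log('sum', sum), // logs "sum 3" after about 20 ms
  error: err => console.error(err),
  complete: () => console.log('complete'),
})
```

Unsubscribing cancels the source currently being waited on and calls `return()` on the generator, so any `finally` blocks in it still run.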

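In fact, implementing an equivalent of spawn from RxJS 4 becomes trivial. It just wraps a zero-argument generator function with async and immediately calls the result; the generator still doesn’t start until the returned observable is subscribed: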

function spawn(f) {
  return async(f)()
}

MobX

Now I want to talk about a different kind of observable: those of MobX. (If you’re unfamiliar with MobX, I’ll have to defer to its documentation.) Mutations of MobX observables should be batched within actions. Async functions do not play well with this practice, because the code after each await runs outside the action that started it. MobX offers an alternative built on top of generators: flow. The problem is that flow only works with promise-yielding generators, and the promises it waits on cannot be canceled. If we’re using cancelable observables, then we need a flow alternative that works with them.

Fortunately, it can be easily built on top of the same subscriber that we used for async. All we have to do is wrap each generator method (next, throw, and return) in an action:

import { action } from 'mobx'
import { Observable } from 'rxjs'

function flow(makeGenerator) {
  // Return a function that constructs an Observable.
  return function(...args) {
    return Observable.create(observer => {
      // This function won't be called until the observable is subscribed.
      const generator = makeGenerator.apply(this, args)
      // Run every resumption of the generator inside a MobX action, so the
      // state mutations between two yields are batched together.
      generator.next = action(generator.next)
      generator.throw = action(generator.throw)
      generator.return = action(generator.return)
      return new GeneratorSubscriber(observer, generator)
    })
  }
}
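The effect of wrapping the three methods can be seen without MobX. In this sketch, the hypothetical `fakeAction` only records when each wrapped call starts and ends (real MobX actions also batch reactions, which this stub does not); the log shows that each stretch of generator code between two yields runs inside exactly one wrapped call.

```javascript
// Hypothetical stand-in for MobX's action(): only records start and end.
const log = []
function fakeAction(fn) {
  return function (...args) {
    log.push('start')
    try {
      return fn.apply(this, args) // keep the generator as the receiver
    } finally {
      log.push('end')
    }
  }
}

function* steps() {
  log.push('segment 1')
  const x = yield 'first'
  log.push(`segment 2 got ${x}`)
  return 'done'
}

const generator = steps()
// Own properties on this one generator shadow Generator.prototype methods.
generator.next = fakeAction(generator.next)
generator.throw = fakeAction(generator.throw)
generator.return = fakeAction(generator.return)

generator.next()
generator.next(42)
console.log(log)
// [ 'start', 'segment 1', 'end', 'start', 'segment 2 got 42', 'end' ]
```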

Caveats

Async functions are not a panacea for observables, however. Some of the more advanced operators can be difficult or ugly to implement as async functions, especially the ones that carry state, e.g. debounceTime or distinctUntilChanged. If you have a chain of functional staples, though, like map, filter, and catchError, then it is much better written as an async function, in my experience.