Today, I will use RxJS pipeable operators and create a custom and reusable operator. This is based on the demo I made in Episode 27.
Let’s print this in the console:
with those three cards:
New to Reactive Programming? Start with Episode 1. Here is a summary.
A stream is a sequence of events over time (eg. a stream of click events). In this episode, I’ll use ❚ interval
to create a stream that emits incremental numbers, periodically.
A listener reacts to events emitted by a stream (values, error and completion notification). This is the reactivity principle. I’ll use console.log
as a listener to react to the emitted values.
Operators transform, filter and combine streams. An operator never modifies the input stream. Instead, it returns a new stream. This is the immutability principle. To create a “gaussian” stream from interval I need:
âťš map
. It projects each event of the input stream with a project function.âťš take
to set a maximum amount of events to emit. Indeed, interval emits an infinite number of events, but I want to print only 25 lines on the console.This example is based on RxJS v6.0 and pipeable operators. Download other versions with my explorer:
Import the creation function âťš interval
and the pipeable operators âťš map
and âťš take
:
import { interval } from "rxjs";
import { map, take } from "rxjs/operators";
The creation function âťš interval
returns an Observable
. It has a built-in pipe
method to chain pipeable operators.
Emit incremental numbers and complete immediately after the 25th value:
const stream1 = interval(350).pipe(
take(25)
);
Emit values projected with a gaussian function, every 350ms:
const stream2 = interval(350).pipe(
take(25),
map(gaussian)
);
Emit lines of “•” based on decimals:
const stream3 = interval(350).pipe(
take(25),
map(gaussian),
map(num => "•".repeat(Math.floor(num * 65)))
);
Use the subscribe
method to pass a stream listener:
stream.subscribe(console.log)
What if I want both a gaussian stream of •
and a bezier stream of ~
?
const gaussian_stream = interval(350).pipe(
take(25),
map(gaussian),
map(num => "•".repeat(Math.floor(num * 65)))
);
const bezier_stream = interval(350).pipe(
take(25),
map(bezier),
map(num => "~".repeat(Math.floor(num * 65)))
);
Clearly, this code could be refactored. For that, I need to create my own operator, ideally based both on take(25)
and map(num => ...)
.
Previously, those two RxJS operators were chained with the pipe method built into Observable
. But here, I want to chain two operators in a separate function.
To do so, RxJS provides a utility pipe function, that needs to be imported:
import { interval, pipe } from "rxjs";
import { map, take } from "rxjs/operators";
I use this pipe function to chain map and take. Et voilĂ ! My custom draw
operator:
const draw = brush =>
pipe(
map(num => brush.repeat(Math.floor(num * 65))),
take(25)
);
Finally, my home-made draw
operator can be used like any other RxJS pipeable operators:
const gaussian_stream = interval(350).pipe(
map(gaussian),
draw("•")
);
const bezier_stream = interval(350).pipe(
map(bezier),
draw("~")
);
rxjs/operators
Observable
You can download the full source code in several versions on reactive.how/rxjs/explorer.
The Illustrated Book of RxJS ($40 off on Gumroad)
Launchpad for RxJS
Freelance Developer Advocate. Motion graphics with code. JavaScript and Elm. cedricsoulas.com
Receive my latest news, product updates and programming visualizations. You can unsubscribe at any time.