RxJSの実装から読み解くストリーム
仰々しいタイトルですが、大した内容ではありません。Angular
ユーザーであれば頻繁に利用しているであろうRxJS
ですが(最近はsignalのおかげでそうでもない?)、ストリーム
とは具体的に何か説明することはできるでしょうか。
多くの人は値を処理する一連のプロセス
のようにふわっと理解されているのではないでしょうか。
間違いではないのですが、このポストではRxJSのソースを参照し、具体的にどのように実装されているかを確認していきます。
参照するリポジトリは以下👇
※ 参照しているのはv8.0.0-alpha.14です。
※ 本ポストにソースコードを掲載するにあたり不要な部分を除去したり、説明に必要なコメントを追記しています。
結論
まず結論から。
ストリームとは単一または連続したObservable
のことを指します。
以下その根拠の説明をしていきます。
Observableとは
まずRxJSの基本となるObservable
の利用方法から確認していきます。
以下は1秒後にログにHello World!
を出力するObservableです。
const o = new Observable<string>((subscriber: Subscriber<string>) => {
setTimeout(() => {
subscriber.next('Hello World!');
}, 1000);
});
// 1秒後に Hello World! を出力する
const subscription = o.subscribe(console.log);
処理の内容は以下の様になる。
- Observableのコンストラクタでsubscribe時に呼び出す関数を指定する。
- subscribeを実行する。その際に引数の関数を元にSubscriberインスタンスが生成される。
- コンストラクタで指定した関数が実行される。また関数の引数として生成したSubscriberを渡す。
- 関数内でSubscriber.nextを呼び出す。next内部でsubscribeの引数の関数を呼び出す。
以下が、Observableの実装です。
export class Observable<T> implements Subscribable<T> {
// _subscribeに関数をセットする。
constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
if (subscribe) {
this._subscribe = subscribe;
}
}
// Subscriberインスタンスが作成される。
subscribe(observerOrNext?: Partial<Observer<T>> | ((value: T) => void) | null): Subscription {
const subscriber = observerOrNext instanceof Subscriber ? observerOrNext : new Subscriber(observerOrNext);
subscriber.add(this._trySubscribe(subscriber));
return subscriber;
}
// _subscribeを呼び出す。この内部でSubscriber.nextを呼び出す。
// nextが呼ばれたとき、指定したコールバックが実行される。
protected _trySubscribe(sink: Subscriber<T>): TeardownLogic {
try {
return this._subscribe(sink);
} catch (err) {
sink.error(err);
}
}
}
はい、だいたいこんな感じです。
※正確にはSubscriber内で、ConsumerObserverが生成されていたりするのですが、subscribeの引数の関数を呼んでいる感じです。
このことからObservableはsubscribeでき、内部にSubscriberを保持しSubscriber.nextを呼び出す役割があり、処理はsubscribeしたとき初めて実行されることが分かります。
これが単一のObservableのストリームです。
オペレーターを利用したストリーム
Observableの基本的な動作が分かったところで、オペレーターを利用してストリームを作ってみます。
of(1,2,3)
.pipe(map(x => x * 2))
.subscribe(console.log)
一応説明を。of(1,2,3)
は1~3を順番にnextするObservableを生成する関数です。
※内部的にはfromArrayLike
という関数を呼び出しています。pipe
に指定しているのはmap
オペレーターで、値を2倍にする関数を指定しています。
で、subscribeするとログに値が出力される。
この処理は以下の流れで実行される。
- of関数によってObservableが生成される。
- Observable.pipeが呼ばれ、上から順番にオペレーター関数が実行されます。これらの関数は
(Observable) => Observable
な関数を返します。これによって連続したObservable(ひとつ前のObservableの通知を受け取るObservable)が生成される。 - pipeから最後のオペレーター関数が生成したObservableが返却される。
- それをsubscirbeすることで連続したObservableの処理が発火し、コールバック関数が実行される。
以下がpipeとmapの実装です。
export class Observable<T> implements Subscribable<T> {
// 各オペレーター関数には自分のひとつ前のObservableが渡される
pipe(...operations: UnaryFunction<any, any>[]): unknown {
return operations.reduce(pipeReducer, this as any);
}
function pipeReducer(prev: any, fn: UnaryFunction<any, any>) {
return fn(prev);
}
}
export function map<T, R>(project: (value: T, index: number) => R): OperatorFunction<T, R> {
// source: Observable<T>
return (source) =>
// destination: Subscriber<R>
// destinationはconsole.logを内包したSubscriber
new Observable((destination) => {
let index = 0;
// source.subscribeすることでofの値を受け取る
source.subscribe(
operate({
destination,
next: (value: T) => {
// mapに渡した関数を実行して値を2倍にし、subscriberに渡す
destination.next(project(value, index++));
},
})
);
});
}
実装を見ることでストリームの実態が見えてきました。
私が連続したObservableと表現した意味が分かったのではないのでしょうか。
結局やっていることは👇みたいなことです。
// of(1,2,3)に相当するObservable
const o1 = new Observable(s => {
[1,2,3].forEach(value => s.next(value))
})
// map(x => x * 2)に相当するObservable
const o2 = new Observable(s => {
o1.subscribe({
next: value => s.next(value * 1),
complete: () => s.complete()
})
})
o2.subscribe(console.log)
operate関数
operate
関数はSubscriberを生成するための便利なユーティリティです。
実装は以下になります。
export function operate<In, Out>({ destination, ...subscriberOverrides }: OperateConfig<In, Out>) {
return new Subscriber(destination, subscriberOverrides);
}
map関数ではoperate関数にdestinationとSubscriberOverridesを渡すことで、destinationの動作を拡張しています。
destination.nextはconsole.logを呼び出しますが、destination.next(project(value, index++));
とすることでログに2倍の値が出力されるわけですね。
オペレーターを自作するときは便利なので積極的に利用するといいでしょう。
まとめ
実装を確認することで、ストリームの実態が見えたのではないでしょうか。
RxJSは実装がシンプルで分かりやすく、コードリーディングもしやすいです。
興味がある方は見てみてることをお勧めします。