はじめに
こんにちは。iimonでエンジニアをしている保田です。 本日は、Web標準技術であるStreams APIについて、その基本と実装サンプルまでを解説します。TypeScriptを用いた具体的なコード例を交えながら、Streams APIがなぜ必要で、どのように活用すべきかを理解していきたいと思います。
Streams API とは
まず、Streams APIとは何かを説明します。 Streams APIは、データの流れ(ストリーム)を効率的に処理するためのWeb標準技術です。 従来のアプローチでは、大容量のデータの場合、 全データが揃うまで待ってから処理を開始していました。 一方、Streams APIでは、データをチャンクという小さな単位に分けて、 到着したものから順次処理を開始することができます。 https://developer.mozilla.org/ja/docs/Web/API/Streams_API
Streams APIの3つの構成要素
Streams APIは3つのストリームから構成されています。
1. ReadableStream(読み取り) - Fetch APIのレスポンスボディはReadableStream - データのソースとなるストリーム
const response = await fetch('https://example.com/data.json'); if (!response.body) { throw new Error('レスポンスボディが存在しません'); } const stream: ReadableStream<Uint8Array> = response.body;
2. TransformStream(変換) - データを変換・加工するストリーム - ReadableStreamから出てくるのはUint8Array(バイナリデータ)なので、変換するためにTextDecoderStream()を通す必要がある
// テキストデコーダー(バイト列 → 文字列) const decoder = new TextDecoderStream(); const textStream: ReadableStream<string> = stream.pipeThrough(decoder);
3. WritableStream(書き込み) - データの出力先となるストリーム - ファイル保存、DOM要素への出力などを行う
// ファイルへの書き込み const fileHandle = await window.showSaveFilePicker(); const writable: WritableStream = await fileHandle.createWritable();
主要なメソッド
実装で頻繁に使うメソッドを簡単に紹介します。
ストリームの接続メソッド
pipeThrough(transformStream)
- データを変換しながら次のストリームに渡すメソッド
- TransformStreamを引数に取り、変換後のReadableStreamを返す
- 複数回連結して、データ変換のパイプラインを構築することができる
stream .pipeThrough(transform1) // 1つ目の変換 .pipeThrough(transform2) // 2つ目の変換 // さらに連結可能
pipeTo(writableStream)
- データを最終的な出力先に送るメソッド
- WritableStreamを引数に取り、処理完了を示すPromiseを返す
- パイプラインの最後に必ず1回だけ使用する
await stream .pipeThrough(transform) .pipeTo(destination); // 最後はpipeTo()
ReadableStreamのメソッド
getReader()
ストリームから直接データを読み取るための リーダー(Reader) を取得するメソッドです。
使い分け
- pipeThrough / pipeTo → 自動パイプ処理向け(推奨)
- getReader() → 手動でチャンク単位の処理 をしたい場合
使用時の注意点
- 読み取り中は ストリームがロックされる ため、他の操作ができない
- 処理終了後は releaseLock() でロックを解放する必要がある
- 中断する場合は cancel(reason) でリソースを解放
基本的な使い方
const reader = stream.getReader(); try { while (true) { // チャンクを1つ読み取る const { done, value } = await reader.read(); // done: true → 全データ読み取り完了 // done: false → まだデータが残っている if (done) { console.log('読み取り完了'); break; } // value: 読み取ったチャンクのデータ console.log('受信:', value); } } finally { // 必ずロックを解放(エラー時も実行される) reader.releaseLock(); }
cancel(reason)
- ストリームの読み取りを 途中で中止 するメソッドです。
- 不要になったストリームのリソースを解放できます。
- reason は省略可能ですが、エラー原因などを渡せます。
- pipeThrough や pipeTo で接続している場合、キャンセルすると 下流の WritableStream も影響を受けます。
await stream.cancel("不要になったため中止");
TransformStreamDefaultControllerのプロパティとメソッド
enqueue(chunk)
- 変換したデータを次のストリームに送出するメソッド
- transform()メソッド内で使用
transform(chunk, controller) { const processed = processData(chunk); controller.enqueue(processed); }
desiredSize(プロパティ)
- キューの残り容量を示す数値
- 正の値 → まだデータを受け入れ可能
- 0以下 → キューが満杯(後で説明する背圧が発生している状態)
console.log(controller.desiredSize); // 結果が3 → あと3つ入る controller.enqueue(data); console.log(controller.desiredSize); // 結果が2 → ひとつ減った
Transformerインターフェースのメソッド
transform(chunk, controller)
- 各チャンクに対して実行される変換処理
- 上流から送られてきたデータ(chunk)を受け取る
- 必要に応じて加工・変換する
- 加工したデータを controller.enqueue() で下流に送る
transform(chunk: string, controller: TransformStreamDefaultController<number>) { const number = parseInt(chunk, 10); if (!isNaN(number)) { controller.enqueue(number); // 有効な数値だけ下流に送る } }
flush(controller)
- flush(controller) は、TransformStream が終了するときに呼ばれる関数
- 変換中に残ったバッファのデータを最後に下流に送る
- ストリームの終端処理を行う
flush(controller: TransformStreamDefaultController<string>) { if (this.buffer.length > 0) { controller.enqueue(this.buffer); // 残りのデータを送出 } }
start(controller)(オプション)
- Transformerの初期化処理(バッファの初期化やログ出力など)
- 必要に応じて実装
start(controller: TransformStreamDefaultController<string>) { console.log('変換処理を開始します'); }
パイプラインの構築
これらのストリームをpipeThrough()とpipeTo()で連結することで、データ処理のパイプラインを構築できます。
await stream .pipeThrough(decoder) // データを文字列やバイナリに変換 .pipeThrough(jsonParser) // JSON オブジェクトに変換 .pipeTo(writer); // 最終的な出力先に書き込む
なぜ Streams API が必要なのか
従来の処理方法が抱える課題
ここで、従来の方法が抱える2つの問題についてみてみます。
メモリ負荷の増大 - 従来の方法では データ全体を一度にメモリに読み込む - データサイズが大きいと、メモリを大量に消費してしまう - 最悪の場合、アプリがクラッシュすることもある
const response = await fetch('large-file.json'); const data = await response.json(); // ここで全データを一気に読み込む
初回応答の遅延 - データのダウンロードや読み込みが完了するまで、画面に何も表示されない - ユーザーは数秒~数十秒間、待たされることになる - 特に大量データの表示やリアルタイム処理には不向き
const users = await fetchAllUsers(); // 全ユーザーを取得するまで待機 displayUsers(users); // データ取得完了後に表示開始
Streams APIによる解決
Streams APIは、チャンク単位でデータを処理することで、これらの課題を解決します。
メモリ効率の最適化
- データを チャンクごとに処理
- メモリに残るのは、チャンクサイズ × キューサイズだけ
- 大容量ファイルでも安定して動作
if (!response.body) { throw new Error('レスポンスボディが存在しません'); } await response.body // ReadableStream: サーバーからのデータ .pipeThrough(new TextDecoderStream()) // バイナリを文字列に変換(チャンク単位) .pipeThrough(new TransformStream({ transform(chunk, controller) { const processed = processChunk(chunk); // チャンクを処理 controller.enqueue(processed); // 下流に送る // 処理済みチャンクはメモリからすぐ解放される } })) .pipeTo(writer); // WritableStream: 最終出力先に書き込む
低遅延の実現
if (!response.body) { throw new Error('レスポンスボディが存在しません'); } await response.body // ReadableStream: サーバーからのデータ .pipeThrough(decoder) // バイナリを文字列に変換(チャンク単位) .pipeTo(new WritableStream({ write(chunk) { displayChunk(chunk); // 受信したチャンクを即座に表示 } }));
内部キューと背圧制御
ここから、Streams APIの重要な機能である背圧制御について説明します。
背圧制御とは
背圧制御とは、データの生産速度と消費速度の差を自動的に調整する仕組みです。
例えば、データの生成が速すぎてメモリが溢れないように、また、処理が遅すぎて待機時間が発生しないように、自動的にバランスを取ります。
内部キューの仕組み
各ストリームは、内部にキュー(待ち行列)を持っています。
このキューには、highWaterMarkという最大容量が設定されています。
例えば、highWaterMark: 5の場合、最大5個のチャンクを保持できます。
データを生成するReadableStreamと、データを消費するWritableStreamの速度が異なる場合、 このキューが満杯になったり、空になったりします。
背圧制御は、この状態を自動的に検知して、データフローを調整しています。
new ReadableStream({ // ... }, { highWaterMark: 5 // キューの最大サイズ(ReadableStream/WritableStreamのデフォルト: 1、TransformStreamのデフォルト: 0) });
背圧制御の動作パターン
具体的な動作を3つのケースで見てみます。
通常の状態
- 処理がスムーズ
- キューに余裕がある
- 背圧なし
ReadableStream → [キュー: □□□__] → WritableStream
(3/5個使用) ↓
高速処理
処理が遅い
- キューが満杯になる
- 背圧発生 → ReadableStreamに停止信号
- ReadableStreamは一時停止
ReadableStream → [キュー: ■■■■■] → WritableStream
↑ (5/5個満杯) ↓
停止信号 低速処理
処理が追いつく
- キューに空きができる
- 背圧解除 → ReadableStreamに再開信号
- ReadableStreamが再開
ReadableStream → [キュー: ■■___] → WritableStream
↑ (2/5個使用) ↓
再開信号 処理完了
このように、自動的にペースを調整することで、メモリに溜まるデータ量が制限されます。
desiredSizeプロパティの役割
desiredSizeプロパティは、キューの残り容量を示す重要な指標です。
desiredSizeの計算式
// 基本的な考え方 desiredSize = highWaterMark - queueTotalSize // queueTotalSizeは、キュー内の全チャンクのサイズの合計 // チャンクのサイズが指定されていない場合は、各チャンクのサイズは1としてカウント // 例:highWaterMark = 5 の場合 desiredSize = 5 // キューは空(5個入る) desiredSize = 2 // 3個使用中(あと2個入る) desiredSize = 0 // キューが満杯に近く、背圧がかかり始めている desiredSize = -1 // 容量オーバー(背圧発生中)
この値によって、次のセクションで説明するpull()メソッドの呼び出しが制御されます。
pull()とenqueue()による自動調整
背圧制御の核心は、pull()メソッドとdesiredSizeの連携にあります。
仕組みの詳細
ReadableStreamのpull()は 下流のキューに余裕があるとき (desiredSize > 0) に自動的に呼ばれるcontroller.enqueue(chunk)でチャンクを追加すると、desiredSize が減少desiredSize <= 0になると、pull() は自動的に停止- 上流はこれ以上チャンクを作らないので、メモリ使用量が抑えられる
- 下流の処理が進み、キューに余裕が出るとpull() が再び呼ばれる
つまり、手動でペース調整する必要がなく、 システムが自動的にメモリ使用量を制限してくれます。
実装例
const stream = new ReadableStream({ // start(): ストリーム開始時に1度だけ呼ばれる start(controller) { console.log('ストリーム開始'); }, // pull(): desiredSize > 0 のときに自動的に呼ばれる pull(controller) { const chunk = generateData(); // チャンクを送出 controller.enqueue(chunk); // この時点でdesiredSizeが減少 // desiredSize <= 0 になると、pull()は呼ばれなくなる(背圧発生) // 下流の処理が進んでdesiredSize > 0 になると、再びpull()が呼ばれる } }, { highWaterMark: 3 // キューの最大サイズを3に設定 });
試してみる
Metropolitan Museum of Art APIから約50万件のオブジェクトID(約5MB)を取得する例で、 実際に試してみました。
従来の方法の場合
type ApiResponse = { total: number; objectIDs: number[]; }; const fetchAllIds = async (): Promise<number[]> => { const response = await fetch('https://collectionapi.metmuseum.org/public/collection/v1/objects'); // 全データ(約5MB)をメモリに読み込む const data: ApiResponse = await response.json(); return data.objectIDs; // 約50万件を返す }; // 使用例 const ids = await fetchAllIds(); // 待機が発生 console.log(`取得完了: ${ids.length}件`);
問題点: - ダウンロード完了まで待機時間が約20秒間程度 - 全データ(約5MB)がメモリに保持される - 初回表示までの時間が長い
Streams APIを使用した場合
Streams APIでは、データをチャンクごとに処理します。
CustomReadableStream
Fetch APIのレスポンスをReadableStreamとして扱うカスタムクラス。
class CustomReadableStream extends ReadableStream<Uint8Array> { constructor(url: string) { let reader: ReadableStreamDefaultReader<Uint8Array> | null = null; super({ // ストリーム開始時に1度だけ実行 async start(controller) { try { // URLからデータを取得 const response = await fetch(url); if (!response.ok) { throw new Error(`HTTP Error: ${response.status}`); } if (!response.body) { throw new Error("No response body"); } // レスポンスボディからリーダーを取得 reader = response.body.getReader(); } catch (error) { controller.error(error); } }, // 下流にデータを送る準備ができたときに自動的に呼ばれる async pull(controller) { if (!reader) { controller.error(new Error("Reader not initialized")); return; } try { // チャンクを読み取る const { done, value } = await reader.read(); if (done) { // 全データ読み取り完了 controller.close(); } else { // チャンクを下流に送出 controller.enqueue(value); } } catch (error) { controller.error(error); } }, // ストリームがキャンセルされたときの処理 async cancel() { if (reader) { await reader.cancel(); reader = null; } }, }); } }
CustomTransformStream
受け取った文字列データを加工して、必要なIDだけを取り出す変換用ストリームのクラス。
class CustomTransformStream extends TransformStream<string, number> { constructor() { let buffer = ""; let isInsideArray = false; super({ /** * 初期処理(オプション) */ start() { console.log('Transform開始'); }, /** * チャンクを変換 * 各チャンクに対して実行される */ transform(chunk, controller) { // 受信したチャンクをバッファに追加 buffer += chunk; // JSON配列の開始位置を探す if (!isInsideArray) { const arrayStartIndex = findArrayStart(buffer); if (arrayStartIndex !== null) { isInsideArray = true; // 配列開始位置以降のデータのみを保持 buffer = buffer.substring(arrayStartIndex); } } // 配列内のデータを処理 if (isInsideArray) { // バッファから数値IDを抽出 const ids = extractIds(buffer); // 抽出したIDを1つずつ下流に送出 for (const id of ids) { controller.enqueue(id); } // 処理済みデータをバッファから削除 buffer = cleanBuffer(buffer); } }, /** * 終了時の処理 * バッファに残ったデータを処理 */ flush(controller) { const ids = extractIds(buffer); for (const id of ids) { controller.enqueue(id); } console.log('Transform完了'); }, }); } }
CustomWritableStream
変換されたIDを受け取り、UIにテキストとして表示する書き込み用ストリームのクラス。
class CustomWritableStream extends WritableStream<number> { constructor( onWrite?: (id: number) => void, onComplete?: () => void, onError?: (error: any) => void, ) { super({ /** * 各チャンクの書き込み処理 */ async write(id) { // カスタム処理を実行(オプション) if (onWrite) { onWrite(id); } }, /** * ストリーム正常終了時の処理 */ close() { console.log("処理完了"); if (onComplete) { onComplete(); } }, /** * エラー発生時の処理 */ abort(reason) { console.error("エラー:", reason); if (onError) { onError(reason); } }, }); } }
パイプラインでの使用例
// カスタムクラスを組み合わせてパイプラインを構築 const readable = new CustomReadableStream(MET_API_URL); // データソース const decoder = new TextDecoderStream(); // バイナリ→文字列変換 const transformer = new CustomTransformStream(); // JSON→ID抽出 const writable = new CustomWritableStream( (id) => console.log(`ID受信: ${id}`), // IDを1つ受信するたびに実行 () => console.log('処理完了'), // 全ID処理完了時に実行 (error) => console.error('エラー:', error) // エラー時に実行 ); // パイプラインの実行(エラーハンドリング付き) try { await readable .pipeThrough(decoder) // バイト列を文字列に変換 .pipeThrough(transformer) // JSONからIDを抽出 .pipeTo(writable); // IDを処理(UI更新など) } catch (error) { console.error('ストリーム処理エラー:', error); // 必要に応じてリソースのクリーンアップやユーザーへの通知を行う }
測定結果
※ 環境によって結果は異なります
| 項目 | 従来の方法 | Streams API |
|---|---|---|
| 初回表示 | 約20秒待機 | 即座(数十ms) |
| 表示方法 | 一括表示 | 逐次表示 |
| メモリ使用量 | 約5MB増加 | 一定(チャンクサイズ程度) |
Streams APIが適している場面
大容量データの処理 と リアルタイム性が求められる処理 の2つで利用されることが多いようです。 例:リアルタイムデータ処理、大容量ファイルのダウンロード・アップロードなど
まとめ
本日は、Streams API の基本について解説しました。 今後は、実装の選択肢のひとつとして適した場面があれば、積極的に活用していきたいと思います。
特に、今回紹介した背圧制御の仕組みを理解することで、なぜメモリ効率が良いのか、なぜ安定して動作するのかが理解できました。
ご清聴ありがとうございました!
この技術に興味を持っていただけた方、一緒に開発してみたい方は、ぜひカジュアルにお話しさせてください。
iimon採用サイト / Wantedly / Green