巨大なテキストデータを標準入力(pipe等)から受け取り、1行ずつ何か処理をして、結果を標準出力(これもpipe等)に書くプログラムを書こうとしてハマるパターン。
(なお文字コードのことはここでは忘れたことにするので、別途対応が必要かもしれない。)
例えば、ssh remote-host cat huge-file.txt | node process-data.js | xz -c > compressed-data.xz
などと使うような想定。
ダメな例1:
var chunk; while(chunk = process.stdin.read()) process.stdout.write(`data: ${chunk}`); }
{ Error: EAGAIN: resource temporarily unavailable, read errno: -35, code: 'EAGAIN', syscall: 'read' }
process.stdin.fdは非同期openされたfdを返すため、fs.readするとpipeにデータが来なくなった瞬間、例外になる。
ダメな例2:
process.stdin.on('readable', () => { var chunk = process.stdin.read(); if (chunk !== null) { process.stdout.write(`data: ${chunk}`); } });
一見うまく行くように見える。というかAPIドキュメントに載っているソースである。が、書き出し先がストールすると、見る見るうちにnode.jsのメモリ使用量が増加していく(node echo.js < /dev/urandom | sleep 99999 などとして試してみれば良い)。最後には
FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - process out of memory
お亡くなりになる。
ダメな例3:
fdは、実は単に 0 と指定すれば同期readとなる(というかshellがopen(2)したfdがそのまま使われる)。しかし…
const fs = require("fs"); fs.readFile(0, function (err, data) { // ... 処理 & 書き出し }
これも全データを一度に読み込もうとしてしまうため、メモリが枯渇する。少しずつ読み込もう。
ダメな例4:
const fs = require("fs"); var BUFFER_SIZE = 4096; var buffer = new Buffer(BUFFER_SIZE); while (true) { var n = fs.readSync(0, buffer, 0, BUFFER_SIZE); if (n <= 0) return null; // bufferを使って何か処理して結果を出力。ここではbufferをそのまま出力する。 fs.write(process.stdout.fd, buffer); }
またしてもメモリが枯渇する。fs.writeはどんどんメモリにwriteすべきものを積んでしまうのだ…。process.stdout.writeも同じである。かといってfs.writeSyncにすると、
Error: EAGAIN: resource temporarily unavailable, write
pipeがストールした瞬間、例外。process.stdout.fdも非同期openされたfdなのである。
やっと一応動く例
fs.writeSyncにして、かつfdとして 1 を渡そう。
const fs = require("fs"); var BUFFER_SIZE = 4096; var buffer = new Buffer(BUFFER_SIZE); while (true) { var n = fs.readSync(0, buffer, 0, BUFFER_SIZE); if (n <= 0) return null; // buffer.slice(0, n)を使って何か処理して結果を出力。ここでは入力をそのまま出力する。 fs.writeSync(1, buffer.slice(0, n)); }
行で分割したい
1行ずつstdinを同期的に読んで返すreadline関数を作ってみる。改行で入力チャンクが切れるとは限らないので、チャンク同士を繋いでやる処理が必要になる。
const fs = require("fs"); var readline = (function() { var BUFFER_SIZE = 65536; var buffer = new Buffer(BUFFER_SIZE); var lines = []; return function() { if (lines.length > 1) return lines.shift(); while (true) { var n = fs.readSync(0, buffer, 0, BUFFER_SIZE); if (n <= 0) return lines[0] || null; var newlines = buffer.slice(0, n).toString().split("\n"); if (lines.length == 1) lines[0] += newlines.shift(); lines = lines.concat(newlines); if (lines.length > 1) { return lines.shift(); } } }; })(); var line; while ((line = readline()) != null) { fs.writeSync(1, line + "\n"); }
BUFFER_SIZEはデフォルトのパイプバッファサイズである64KBにしてみた。
しかし、これはC言語か何か?というような代物になってしまった。node.jsのプログラミングパラダイムでなんとかしたい。
Stream APIを使う
Streamクラスの pipe を使えばバッファをいい感じに管理してくれて、入力にバックプレッシャーを伝えることができる。処理部分はstream.Transformを使ってTransformStreamとして実装する。
var stream = require('stream'); // 大文字に変換する var transform = new stream.Transform({ transform: function (chunk, encoding, callback) { callback(null, chunk.toString().toUpperCase()); } }); process.stdin .pipe(transform) .pipe(process.stdout);
1行ずつ改行区切りで処理するなら、byline npmが便利。
LineStreamをpipeで挟むだけで、改行区切りで送ってくれるようになる。
空行は消えるので注意。維持したければ LineStreamのコンンストラクタに {keepEmptyLines: true} を指定すれば良い。
var stream = require('stream'); var LineStream = require('byline').LineStream; var lineStream = new LineStream(); var transform = new stream.Transform({ transform: function (chunk, encoding, callback) { callback(null, chunk.toString().toUpperCase() + "\n"); } }); process.stdin .pipe(lineStream) .pipe(transform) .pipe(process.stdout);
問題点:Cっぽいやつより4倍遅い。。バッファの量の設定等で早くなるかは不明。