巨大なテキストデータを標準入力(pipe等)から受け取り、1行ずつ何か処理をして、結果を標準出力(これもpipe等)に書くプログラムを書こうとしてハマるパターン。
(なお文字コードのことはここでは忘れたことにするので、別途対応が必要かもしれない。)
例えば、ssh remote-host cat huge-file.txt | node process-data.js | xz -c > compressed-data.xz
などと使うような想定。
ダメな例1:
var chunk;
while(chunk = process.stdin.read())
process.stdout.write(`data: ${chunk}`);
}
{ Error: EAGAIN: resource temporarily unavailable, read errno: -35, code: 'EAGAIN', syscall: 'read' }
process.stdin.fdは非同期openされたfdを返すため、fs.readするとpipeにデータが来なくなった瞬間、例外になる。
ダメな例2:
process.stdin.on('readable', () => {
var chunk = process.stdin.read();
if (chunk !== null) {
process.stdout.write(`data: ${chunk}`);
}
});
一見うまく行くように見える。というかAPIドキュメントに載っているソースである。が、書き出し先がストールすると、見る見るうちにnode.jsのメモリ使用量が増加していく(node echo.js < /dev/urandom | sleep 99999 などとして試してみれば良い)。最後には
FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - process out of memory
お亡くなりになる。
ダメな例3:
fdは、実は単に 0 と指定すれば同期readとなる(というかshellがopen(2)したfdがそのまま使われる)。しかし…
const fs = require("fs");
fs.readFile(0, function (err, data) {
}
これも全データを一度に読み込もうとしてしまうため、メモリが枯渇する。少しずつ読み込もう。
ダメな例4:
const fs = require("fs");
var BUFFER_SIZE = 4096;
var buffer = new Buffer(BUFFER_SIZE);
while (true) {
var n = fs.readSync(0, buffer, 0, BUFFER_SIZE);
if (n <= 0)
return null;
fs.write(process.stdout.fd, buffer);
}
またしてもメモリが枯渇する。fs.writeはどんどんメモリにwriteすべきものを積んでしまうのだ…。process.stdout.writeも同じである。かといってfs.writeSyncにすると、
Error: EAGAIN: resource temporarily unavailable, write
pipeがストールした瞬間、例外。process.stdout.fdも非同期openされたfdなのである。
やっと一応動く例
fs.writeSyncにして、かつfdとして 1 を渡そう。
const fs = require("fs");
var BUFFER_SIZE = 4096;
var buffer = new Buffer(BUFFER_SIZE);
while (true) {
var n = fs.readSync(0, buffer, 0, BUFFER_SIZE);
if (n <= 0)
return null;
fs.writeSync(1, buffer.slice(0, n));
}
行で分割したい
1行ずつstdinを同期的に読んで返すreadline関数を作ってみる。改行で入力チャンクが切れるとは限らないので、チャンク同士を繋いでやる処理が必要になる。
const fs = require("fs");
var readline = (function() {
var BUFFER_SIZE = 65536;
var buffer = new Buffer(BUFFER_SIZE);
var lines = [];
return function() {
if (lines.length > 1)
return lines.shift();
while (true) {
var n = fs.readSync(0, buffer, 0, BUFFER_SIZE);
if (n <= 0)
return lines[0] || null;
var newlines = buffer.slice(0, n).toString().split("\n");
if (lines.length == 1)
lines[0] += newlines.shift();
lines = lines.concat(newlines);
if (lines.length > 1) {
return lines.shift();
}
}
};
})();
var line;
while ((line = readline()) != null) {
fs.writeSync(1, line + "\n");
}
BUFFER_SIZEはデフォルトのパイプバッファサイズである64KBにしてみた。
しかし、これはC言語か何か?というような代物になってしまった。node.jsのプログラミングパラダイムでなんとかしたい。
Stream APIを使う
Streamクラスの pipe を使えばバッファをいい感じに管理してくれて、入力にバックプレッシャーを伝えることができる。処理部分はstream.Transformを使ってTransformStreamとして実装する。
var stream = require('stream');
var transform = new stream.Transform({
transform: function (chunk, encoding, callback) {
callback(null, chunk.toString().toUpperCase());
}
});
process.stdin
.pipe(transform)
.pipe(process.stdout);
1行ずつ改行区切りで処理するなら、byline npmが便利。
LineStreamをpipeで挟むだけで、改行区切りで送ってくれるようになる。
空行は消えるので注意。維持したければ LineStreamのコンンストラクタに {keepEmptyLines: true} を指定すれば良い。
var stream = require('stream');
var LineStream = require('byline').LineStream;
var lineStream = new LineStream();
var transform = new stream.Transform({
transform: function (chunk, encoding, callback) {
callback(null, chunk.toString().toUpperCase() + "\n");
}
});
process.stdin
.pipe(lineStream)
.pipe(transform)
.pipe(process.stdout);
問題点:Cっぽいやつより4倍遅い。。バッファの量の設定等で早くなるかは不明。