Node.jsでUNIX-likeにパイプ処理

巨大なテキストデータを標準入力(pipe等)から受け取り、1行ずつ何か処理をして、結果を標準出力(これもpipe等)に書くプログラムを書こうとしてハマるパターン。
(なお文字コードのことはここでは忘れたことにするので、別途対応が必要かもしれない。)


例えば、ssh remote-host cat huge-file.txt | node process-data.js | xz -c > compressed-data.xz などと使うような想定。


ダメな例1:
var chunk;
while(chunk = process.stdin.read())
  process.stdout.write(`data: ${chunk}`);
}
{ Error: EAGAIN: resource temporarily unavailable, read errno: -35, code: 'EAGAIN', syscall: 'read' }

process.stdin.fdは非同期openされたfdを返すため、fs.readするとpipeにデータが来なくなった瞬間、例外になる。

ダメな例2:
process.stdin.on('readable', () => {
  var chunk = process.stdin.read();
  if (chunk !== null) {
    process.stdout.write(`data: ${chunk}`);
  }
});

一見うまく行くように見える。というかAPIドキュメントに載っているソースである。が、書き出し先がストールすると、見る見るうちにnode.jsのメモリ使用量が増加していく(node echo.js < /dev/urandom | sleep 99999 などとして試してみれば良い)。最後には

FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - process out of memory

お亡くなりになる。

ダメな例3:

fdは、実は単に 0 と指定すれば同期readとなる(というかshellがopen(2)したfdがそのまま使われる)。しかし…

const fs = require("fs");
fs.readFile(0, function (err, data) {
  // ... 処理 & 書き出し
}

これも全データを一度に読み込もうとしてしまうため、メモリが枯渇する。少しずつ読み込もう。

ダメな例4:
const fs = require("fs");
var BUFFER_SIZE = 4096;
var buffer = new Buffer(BUFFER_SIZE);
while (true) {
  var n = fs.readSync(0, buffer, 0, BUFFER_SIZE);
  if (n <= 0)
    return null;
  // bufferを使って何か処理して結果を出力。ここではbufferをそのまま出力する。
  fs.write(process.stdout.fd, buffer);
}

またしてもメモリが枯渇する。fs.writeはどんどんメモリにwriteすべきものを積んでしまうのだ…。process.stdout.writeも同じである。かといってfs.writeSyncにすると、

Error: EAGAIN: resource temporarily unavailable, write

pipeがストールした瞬間、例外。process.stdout.fdも非同期openされたfdなのである。

やっと一応動く例

fs.writeSyncにして、かつfdとして 1 を渡そう。

const fs = require("fs");
var BUFFER_SIZE = 4096;
var buffer = new Buffer(BUFFER_SIZE);
while (true) {
  var n = fs.readSync(0, buffer, 0, BUFFER_SIZE);
  if (n <= 0)
    return null;
  // buffer.slice(0, n)を使って何か処理して結果を出力。ここでは入力をそのまま出力する。
  fs.writeSync(1, buffer.slice(0, n));
}

行で分割したい

1行ずつstdinを同期的に読んで返すreadline関数を作ってみる。改行で入力チャンクが切れるとは限らないので、チャンク同士を繋いでやる処理が必要になる。

const fs = require("fs");
var readline = (function() {
  var BUFFER_SIZE = 65536;
  var buffer = new Buffer(BUFFER_SIZE);
  var lines = [];
  return function() {
    if (lines.length > 1)
      return lines.shift();
    while (true) {
      var n = fs.readSync(0, buffer, 0, BUFFER_SIZE);
      if (n <= 0)
        return lines[0] || null;
      var newlines = buffer.slice(0, n).toString().split("\n");
      if (lines.length == 1)
        lines[0] += newlines.shift();
      lines = lines.concat(newlines);
      if (lines.length > 1) {
        return lines.shift();
      }
    }
  };
})();

var line;
while ((line = readline()) != null) {
  fs.writeSync(1, line + "\n");
}


BUFFER_SIZEはデフォルトのパイプバッファサイズである64KBにしてみた。


しかし、これはC言語か何か?というような代物になってしまった。node.jsのプログラミングパラダイムでなんとかしたい。

Stream APIを使う

Streamクラスの pipe を使えばバッファをいい感じに管理してくれて、入力にバックプレッシャーを伝えることができる。処理部分はstream.Transformを使ってTransformStreamとして実装する。

var stream = require('stream');

// 大文字に変換する
var transform = new stream.Transform({
    transform: function (chunk, encoding, callback) {
      callback(null, chunk.toString().toUpperCase());
    }
});

process.stdin
  .pipe(transform)
  .pipe(process.stdout);


1行ずつ改行区切りで処理するなら、byline npmが便利。
LineStreamをpipeで挟むだけで、改行区切りで送ってくれるようになる。
空行は消えるので注意。維持したければ LineStreamのコンンストラクタに {keepEmptyLines: true} を指定すれば良い。

var stream = require('stream');
var LineStream = require('byline').LineStream;

var lineStream = new LineStream();

var transform = new stream.Transform({
    transform: function (chunk, encoding, callback) {
      callback(null, chunk.toString().toUpperCase() + "\n");
    }
});

process.stdin
  .pipe(lineStream)
  .pipe(transform)
  .pipe(process.stdout);

問題点:Cっぽいやつより4倍遅い。。バッファの量の設定等で早くなるかは不明。