Пайпинг потоков в Node.js - дело нехитрое. Главное не забывать слушать ошибки в каждом потоке. И на сегодня это единственный кейс, где я нахожу оправданным использование промисов вместо функций обратного вызова. Объясню почему на простом примере. Для начала напишем три кастомных конструктора, которые наследуют от Readable, Transform, Writable и работают в Object Mode:
Создадим несколько потоков:
Пайпинг потоков может выглядеть следующим образом:
Разумеется мы забыли приклеить каждому потоку функцию обработки ошибок, поэтому если в TransformStream выкинуть ошибку, будет печалька:
Если просто приклеить каждому потоку коллбэк, то есть вероятность того, что он будет вызван несколько раз (в нашем примере наверняка), а это как правило не то, что нам нужно:
Поэтому обычно приходится оборачивать коллбэк как-то так:
Но если применить промисы, то можно с этим на париться, т.к. как известно они резолвятся или реджектятся один раз:
Дисклеймер: не думаю, что на текущий момент времени даже "нативные" промисы в JavaScript могут соревноваться в производительности с "vanilla" кодом, в общем, как говорится, думайте сами, решайте сами.
const stream = require('stream'); const Readable = stream.Readable; const Transform = stream.Transform; const Writable = stream.Writable; const InitStream = function () { this._obj = {}; Readable.call(this, { objectMode: true }); }; InitStream.prototype = Object.create(Readable.prototype, { _read: { value: function () { this.push(this._obj); this.push(null); } } }); const TransformStream = function (obj) { this._obj = obj; Transform.call(this, { objectMode: true }); }; TransformStream.prototype = Object.create(Transform.prototype, { _transform: { value: function (chunk, encoding, cb) { this.push(Object.assign(chunk, this._obj)); cb(); } } }); const LogStream = function () { Writable.call(this, { objectMode: true }); }; LogStream.prototype = Object.create(Writable.prototype, { _write: { value: function (chunk, encoding, cb) { console.log(chunk); cb(); } } });
Создадим несколько потоков:
const init = new InitStream(); // "вбрасываем" пустой объект const a = new TransformStream({ a: 1 }); // добавляем свойство "a" const b = new TransformStream({ b: 2 }); // добавляем свойство "b" const c = new TransformStream({ c: 3 }); // добавляем свойство "c" const log = new LogStream(); // выводим в консоль
Пайпинг потоков может выглядеть следующим образом:
const pipe = (init, a, b, c, log, cb) => { log.on('finish', cb); init.pipe(a); a.pipe(b); b.pipe(c); c.pipe(log); }; pipe(init, a, b, c, log, (err) => { if (err) return console.error(err); console.log('finish'); });
Разумеется мы забыли приклеить каждому потоку функцию обработки ошибок, поэтому если в TransformStream выкинуть ошибку, будет печалька:
TransformStream.prototype = Object.create(Transform.prototype, { _transform: { value: function (chunk, encoding, cb) { this.push(Object.assign(chunk, this._obj)); cb(new Error('My Error')); // cb(); } } });
Если просто приклеить каждому потоку коллбэк, то есть вероятность того, что он будет вызван несколько раз (в нашем примере наверняка), а это как правило не то, что нам нужно:
const pipe = (init, a, b, c, log, cb) => { init.on('error', cb); a.on('error', cb); b.on('error', cb); c.on('error', cb); log.on('error', cb); log.on('finish', cb); init.pipe(a); a.pipe(b); b.pipe(c); c.pipe(log); };
Поэтому обычно приходится оборачивать коллбэк как-то так:
const pipe = (init, a, b, c, log, cb) => { let called = false; const callback = (err) => { if (called) return; called = true; cb(err); }; init.on('error', callback); a.on('error', callback); b.on('error', callback); c.on('error', callback); log.on('error', callback); log.on('finish', callback); init.pipe(a); a.pipe(b); b.pipe(c); c.pipe(log); };
Но если применить промисы, то можно с этим на париться, т.к. как известно они резолвятся или реджектятся один раз:
const pipe = (init, a, b, c, log) => new Promise((resolve, reject) => { init.on('error', reject); a.on('error', reject); b.on('error', reject); c.on('error', reject); log.on('error', reject); log.on('finish', resolve); init.pipe(a); a.pipe(b); b.pipe(c); c.pipe(log); }); pipe(init, a, b, c, log) .then(() => console.log('finish')) .catch((err) => console.error(err));
Дисклеймер: не думаю, что на текущий момент времени даже "нативные" промисы в JavaScript могут соревноваться в производительности с "vanilla" кодом, в общем, как говорится, думайте сами, решайте сами.
Комментариев нет:
Отправить комментарий
Комментарий будет опубликован после модерации