Обработка ошибок в объектах, реализующих интерфейс Stream в Node.js - задача нетривиальная. Прежде чем мне случилось "по-взрослому" поработать с потоками я полагал, что событие error останавливает процесс передачи данных. Оказалось что это не так. Приведу несколько примеров, демонстрирующих это, в моем случае неожиданное, поведение. Заодно рассмотрим кастомную реализацию некоторых типов интерфейса Stream.
Process.
Process.
Используем process.stdin и process.stdout: попробуем передать данные "по конвейеру", для чего напишем простенькое приложение, которое будет принимать в качестве аргумента строку, содержащую число и на выходе добавлять к полученному аргументу декремент числа через запятую с пробелом.
stream.js:
Отправим в приложение строку "result: 3" и пропустим ее через наш "конвейер" несколько раз:
Теперь попробуем выкинуть ошибку в каждой итерации "конвейера", как в stdin, так и в stdout.
stream.js:
Для тех кто в танке :) обозначу еще раз: я то думал (уверен что не только я один), что в результате первой же ошибки процесс передачи данных будет остановлен.
В этом случае решает process.exit().
stream.js:
Writable.
Реализуем writable интерфейс.
writable.js:
Попробуем выкинуть ошибку.
writable.js:
Завершим writable-поток "вручную" - this.end(). При этом перед тем как отправлять в поток очередную порцию данных желательно проверить его состояние writable._writableState.ended.
writable.js:
Readable.
Реализуем readable интерфейс.
readable.js:
Попробуем выкинуть ошибку.
readable.js:
Завершим readable-поток "вручную" - this.push(null). При этом перед тем как отправлять в поток очередную порцию данных желательно проверить его состояние readable._readableState.ended.
readable.js:
Transform.
Реализуем transform интерфейс.
transform.js:
Попробуем выкинуть ошибку.
transform.js:
Завершим transform-поток "вручную" - this.end(). Кроме того у transform-потока есть как _readableState, так и _writableState. Я думаю что в нашем случае будет "правильнее" проверить его состояние transform._writableState.ended перед тем тем как отправлять в него очередную порцию данных.
transform.js:
Pipe.
Соберем readable, transform и process.stdout в один конвейер.
pipe.js:
Выкидываем ошибку в readable-потоке.
pipe.js:
Обращаю внимание на то, что transform-поток ничего не знает об ошибке.
Выкидываем ошибку в transform-потоке.
pipe.js:
Теперь readable-поток не в курсе что его "отцепили", даже его событие end не сыграло, но тем не менее приложение в целом отработало валидно.
Заключение.
Несмотря на некоторые "вольности", которые допустимы при работе с объектами, реализующими интерфейс Stream, не забываем обрабатывать ошибки в каждом из них, хотя бы тупо выводом в консоль, иначе будет... печалька :).
stream.js:
process.stdin.on('readable', function() { var buffer = process.stdin.read(); if (buffer === null) return; var s = buffer.toString().trim(), arr = s.split(''), d = arr[arr.length - 1]; process.stdout.write(s + ', ' + (d - 1)); });
Отправим в приложение строку "result: 3" и пропустим ее через наш "конвейер" несколько раз:
Теперь попробуем выкинуть ошибку в каждой итерации "конвейера", как в stdin, так и в stdout.
stream.js:
process.stdin.on('readable', function() { var buffer = process.stdin.read(); if (buffer === null) return; var s = buffer.toString().trim(), arr = s.split(''), d = arr[arr.length - 1]; setTimeout(function() { process.stdout.write(s + ', ' + (d - 1)); }, 500); setTimeout(function() { process.stdin.emit('error', new Error('stdin error')); }, 200); setTimeout(function() { process.stdout.emit('error', new Error('stdout error')); }, 300); }); process.stdin.on('error', function(err) { console.error(process.pid + ' stdin error: ' + err); }); process.stdout.on('error', function(err) { console.error(process.pid + ' stdout error: ' + err); });И ничего не случилось - данные все равно "доползли" до заключительной серии:
Для тех кто в танке :) обозначу еще раз: я то думал (уверен что не только я один), что в результате первой же ошибки процесс передачи данных будет остановлен.
В этом случае решает process.exit().
stream.js:
process.stdin.on('readable', function() { var buffer = process.stdin.read(); if (buffer === null) return; var s = buffer.toString().trim(), arr = s.split(''), d = arr[arr.length - 1]; setTimeout(function() { process.stdout.write(s + ', ' + (d - 1)); }, 500); setTimeout(function() { process.stdin.emit('error', new Error('stdin error')); }, 200); setTimeout(function() { process.stdout.emit('error', new Error('stdout error')); process.exit(1); }, 300); }); process.stdin.on('error', function(err) { console.error(process.pid + ' stdin error: ' + err); }); process.stdout.on('error', function(err) { console.error(process.pid + ' stdout error: ' + err); });
Writable.
Реализуем writable интерфейс.
writable.js:
var writable = require('stream').Writable(); var i = 4, arr = []; writable._write = function(chunk, enc, next) { console.log('chunk: %s', chunk); arr.push(chunk); next(); }; writable.on('finish', function() { console.log('result: %s', arr.join(', ')); }); (function go() { if (!i--) return writable.end(); setTimeout(function() { writable.write(i.toString()); go(); }, 500); })();
Попробуем выкинуть ошибку.
writable.js:
var writable = require('stream').Writable(); var i = 4, arr = []; writable._write = function(chunk, enc, next) { console.log('chunk: %s', chunk); arr.push(chunk); next(); }; writable.on('finish', function() { console.log('result: %s', arr.join(', ')); }); writable.on('error', function(err) { console.log('err:', err); }); (function go() { if (!i--) return writable.end(); setTimeout(function() { writable.write(i.toString()); go(); }, 500); })(); setTimeout(function() { writable.emit('error', new Error('Bla-bla-bla')); }, 1200);Результат не изменился.
Завершим writable-поток "вручную" - this.end(). При этом перед тем как отправлять в поток очередную порцию данных желательно проверить его состояние writable._writableState.ended.
writable.js:
var writable = require('stream').Writable(); var i = 4, arr = []; writable._write = function(chunk, enc, next) { console.log('chunk: %s', chunk); arr.push(chunk); next(); }; writable.on('finish', function() { console.log('result: %s', arr.join(', ')); }); writable.on('error', function(err) { console.log('err:', err); this.end(); }); (function go() { if (!i--) return writable.end(); setTimeout(function() { if (writable._writableState.ended) return; writable.write(i.toString()); go(); }, 500); })(); setTimeout(function() { writable.emit('error', new Error('Bla-bla-bla')); }, 1200);
Readable.
Реализуем readable интерфейс.
readable.js:
var readable = require('stream').Readable(); var i = 4, arr = []; readable._read = function() { if (!i--) return this.push(null); setTimeout(function() { arr.push(i); readable.push('chunk: ' + i + '\n'); }, 500); }; readable.on('end', function() { console.log('result: %s', arr.join(', ')); }); readable.pipe(process.stdout);
Попробуем выкинуть ошибку.
readable.js:
var readable = require('stream').Readable(); var i = 4, arr = []; readable._read = function() { if (!i--) return this.push(null); setTimeout(function() { arr.push(i); readable.push('chunk: ' + i + '\n'); }, 500); }; readable.on('end', function() { console.log('result: %s', arr.join(', ')); }); readable.on('error', function(err) { console.log('error:', err); }); readable.pipe(process.stdout); setTimeout(function() { readable.emit('error', new Error('Bla-bla-bla')); }, 1200);Результат не изменился.
Завершим readable-поток "вручную" - this.push(null). При этом перед тем как отправлять в поток очередную порцию данных желательно проверить его состояние readable._readableState.ended.
readable.js:
var readable = require('stream').Readable(); var i = 4, arr = []; readable._read = function() { if (!i--) return this.push(null); setTimeout(function() { if (readable._readableState.ended) return; arr.push(i); readable.push('chunk: ' + i + '\n'); }, 500); }; readable.on('end', function() { console.log('result: %s', arr.join(', ')); }); readable.on('error', function(err) { console.log('error:', err); this.push(null); }); readable.pipe(process.stdout); setTimeout(function() { readable.emit('error', new Error('Bla-bla-bla')); }, 1200);
Transform.
Реализуем transform интерфейс.
transform.js:
var transform = require('stream').Transform(); var i = 4, arr = []; transform._transform = function(chunk, enc, next) { this.push('chunk: ' + chunk + '\n'); arr.push(chunk + '0'); next(); }; transform.on('finish', function() { console.log('result: %s', arr.join(', ')); }); transform.pipe(process.stdout); (function go() { if (!i--) return transform.end(); setTimeout(function() { transform.write(i.toString()); go(); }, 500); })();
Попробуем выкинуть ошибку.
transform.js:
var transform = require('stream').Transform(); var i = 4, arr = []; transform._transform = function(chunk, enc, next) { this.push('chunk: ' + chunk + '\n'); arr.push(chunk + '0'); next(); }; transform.on('finish', function() { console.log('result: %s', arr.join(', ')); }); transform.on('error', function(err) { console.log('error:', err); }); transform.pipe(process.stdout); (function go() { if (!i--) return transform.end(); setTimeout(function() { transform.write(i.toString()); go(); }, 500); })(); setTimeout(function() { transform.emit('error', new Error('Bla-bla-bla')); }, 1200);
И снова результат не меняется.
Завершим transform-поток "вручную" - this.end(). Кроме того у transform-потока есть как _readableState, так и _writableState. Я думаю что в нашем случае будет "правильнее" проверить его состояние transform._writableState.ended перед тем тем как отправлять в него очередную порцию данных.
transform.js:
var transform = require('stream').Transform(); var i = 4, arr = []; transform._transform = function(chunk, enc, next) { this.push('chunk: ' + chunk + '\n'); arr.push(chunk.toString() + '0'); next(); }; transform.on('finish', function() { console.log('result: %s', arr.join(', ')); }); transform.on('error', function(err) { console.log('error:', err); this.end(); }); transform.pipe(process.stdout); (function go() { if (!i--) return transform.end(); setTimeout(function() { if (transform._writableState.ended) return; transform.write(i.toString()); go(); }, 500); })(); setTimeout(function() { transform.emit('error', new Error('Bla-bla-bla')); }, 1200);
Pipe.
Соберем readable, transform и process.stdout в один конвейер.
pipe.js:
var i = 4, arr = []; // readable var readable = require('stream').Readable(); readable._read = function() { if (!i--) return this.push(null); setTimeout(function() { if (readable._readableState.ended) return; readable.push(i.toString()); }, 500); }; readable.on('end', function() { console.log('readable end'); }); // transform var transform = require('stream').Transform(); transform._transform = function(chunk, enc, next) { this.push('chunk: ' + chunk + '\n'); arr.push(chunk + '0'); next(); }; transform.on('end', function() { console.log('transform end'); console.log('result: %s', arr.join(', ')); }); transform.on('finish', function() { console.log('transform finish'); }); // pipe readable.pipe(transform).pipe(process.stdout);
Выкидываем ошибку в readable-потоке.
pipe.js:
var i = 4, arr = []; // readable var readable = require('stream').Readable(); readable._read = function() { if (!i--) return this.push(null); setTimeout(function() { if (readable._readableState.ended) return; readable.push(i.toString()); }, 500); }; readable.on('end', function() { console.log('readable end'); }); // transform var transform = require('stream').Transform(); transform._transform = function(chunk, enc, next) { this.push('chunk: ' + chunk + '\n'); arr.push(chunk + '0'); next(); }; transform.on('end', function() { console.log('transform end'); console.log('result: %s', arr.join(', ')); }); transform.on('finish', function() { console.log('transform finish'); }); // pipe readable.pipe(transform).pipe(process.stdout); readable.on('error', function(err) { console.log('readable error:', err); this.push(null); }); transform.on('error', function(err) { console.log('transform error:', err); this.end(); }); setTimeout(function() { readable.emit('error', new Error('Bla-bla-bla')); }, 1200);
Обращаю внимание на то, что transform-поток ничего не знает об ошибке.
Выкидываем ошибку в transform-потоке.
pipe.js:
var i = 4, arr = []; // readable var readable = require('stream').Readable(); readable._read = function() { if (!i--) return this.push(null); setTimeout(function() { if (readable._readableState.ended) return; readable.push(i.toString()); }, 500); }; readable.on('end', function() { console.log('readable end'); }); // transform var transform = require('stream').Transform(); transform._transform = function(chunk, enc, next) { this.push('chunk: ' + chunk + '\n'); arr.push(chunk + '0'); next(); }; transform.on('end', function() { console.log('transform end'); console.log('result: %s', arr.join(', ')); }); transform.on('finish', function() { console.log('transform finish'); }); // pipe readable.pipe(transform).pipe(process.stdout); readable.on('error', function(err) { console.log('readable error:', err); this.push(null); }); transform.on('error', function(err) { console.log('transform error:', err); this.end(); }); setTimeout(function() { transform.emit('error', new Error('Bla-bla-bla')); }, 1200);
Теперь readable-поток не в курсе что его "отцепили", даже его событие end не сыграло, но тем не менее приложение в целом отработало валидно.
Заключение.
Несмотря на некоторые "вольности", которые допустимы при работе с объектами, реализующими интерфейс Stream, не забываем обрабатывать ошибки в каждом из них, хотя бы тупо выводом в консоль, иначе будет... печалька :).
Комментариев нет:
Отправить комментарий
Комментарий будет опубликован после модерации