Обработка ошибок в объектах, реализующих интерфейс Stream в Node.js - задача нетривиальная. Прежде чем мне случилось "по-взрослому" поработать с потоками я полагал, что событие error останавливает процесс передачи данных. Оказалось что это не так. Приведу несколько примеров, демонстрирующих это, в моем случае неожиданное, поведение. Заодно рассмотрим кастомную реализацию некоторых типов интерфейса Stream.
Process.
Process.
Используем process.stdin и process.stdout: попробуем передать данные "по конвейеру", для чего напишем простенькое приложение, которое будет принимать в качестве аргумента строку, содержащую число и на выходе добавлять к полученному аргументу декремент числа через запятую с пробелом.
stream.js:
Отправим в приложение строку "result: 3" и пропустим ее через наш "конвейер" несколько раз:
Теперь попробуем выкинуть ошибку в каждой итерации "конвейера", как в stdin, так и в stdout.
stream.js:
Для тех кто в танке :) обозначу еще раз: я то думал (уверен что не только я один), что в результате первой же ошибки процесс передачи данных будет остановлен.
В этом случае решает process.exit().
stream.js:
Writable.
Реализуем writable интерфейс.
writable.js:
Попробуем выкинуть ошибку.
writable.js:
Завершим writable-поток "вручную" - this.end(). При этом перед тем как отправлять в поток очередную порцию данных желательно проверить его состояние writable._writableState.ended.
writable.js:
Readable.
Реализуем readable интерфейс.
readable.js:
Попробуем выкинуть ошибку.
readable.js:
Завершим readable-поток "вручную" - this.push(null). При этом перед тем как отправлять в поток очередную порцию данных желательно проверить его состояние readable._readableState.ended.
readable.js:
Transform.
Реализуем transform интерфейс.
transform.js:
Попробуем выкинуть ошибку.
transform.js:
Завершим transform-поток "вручную" - this.end(). Кроме того у transform-потока есть как _readableState, так и _writableState. Я думаю что в нашем случае будет "правильнее" проверить его состояние transform._writableState.ended перед тем тем как отправлять в него очередную порцию данных.
transform.js:
Pipe.
Соберем readable, transform и process.stdout в один конвейер.
pipe.js:
Выкидываем ошибку в readable-потоке.
pipe.js:
Обращаю внимание на то, что transform-поток ничего не знает об ошибке.
Выкидываем ошибку в transform-потоке.
pipe.js:
Теперь readable-поток не в курсе что его "отцепили", даже его событие end не сыграло, но тем не менее приложение в целом отработало валидно.
Заключение.
Несмотря на некоторые "вольности", которые допустимы при работе с объектами, реализующими интерфейс Stream, не забываем обрабатывать ошибки в каждом из них, хотя бы тупо выводом в консоль, иначе будет... печалька :).
stream.js:
process.stdin.on('readable', function() {
var buffer = process.stdin.read();
if (buffer === null) return;
var s = buffer.toString().trim(), arr = s.split(''), d = arr[arr.length - 1];
process.stdout.write(s + ', ' + (d - 1));
});
Отправим в приложение строку "result: 3" и пропустим ее через наш "конвейер" несколько раз:
Теперь попробуем выкинуть ошибку в каждой итерации "конвейера", как в stdin, так и в stdout.
stream.js:
process.stdin.on('readable', function() {
var buffer = process.stdin.read();
if (buffer === null) return;
var s = buffer.toString().trim(), arr = s.split(''), d = arr[arr.length - 1];
setTimeout(function() {
process.stdout.write(s + ', ' + (d - 1));
}, 500);
setTimeout(function() {
process.stdin.emit('error', new Error('stdin error'));
}, 200);
setTimeout(function() {
process.stdout.emit('error', new Error('stdout error'));
}, 300);
});
process.stdin.on('error', function(err) {
console.error(process.pid + ' stdin error: ' + err);
});
process.stdout.on('error', function(err) {
console.error(process.pid + ' stdout error: ' + err);
});
И ничего не случилось - данные все равно "доползли" до заключительной серии:Для тех кто в танке :) обозначу еще раз: я то думал (уверен что не только я один), что в результате первой же ошибки процесс передачи данных будет остановлен.
В этом случае решает process.exit().
stream.js:
process.stdin.on('readable', function() {
var buffer = process.stdin.read();
if (buffer === null) return;
var s = buffer.toString().trim(), arr = s.split(''), d = arr[arr.length - 1];
setTimeout(function() {
process.stdout.write(s + ', ' + (d - 1));
}, 500);
setTimeout(function() {
process.stdin.emit('error', new Error('stdin error'));
}, 200);
setTimeout(function() {
process.stdout.emit('error', new Error('stdout error'));
process.exit(1);
}, 300);
});
process.stdin.on('error', function(err) {
console.error(process.pid + ' stdin error: ' + err);
});
process.stdout.on('error', function(err) {
console.error(process.pid + ' stdout error: ' + err);
});
Writable.
Реализуем writable интерфейс.
writable.js:
var writable = require('stream').Writable();
var i = 4, arr = [];
writable._write = function(chunk, enc, next) {
console.log('chunk: %s', chunk);
arr.push(chunk);
next();
};
writable.on('finish', function() {
console.log('result: %s', arr.join(', '));
});
(function go() {
if (!i--) return writable.end();
setTimeout(function() {
writable.write(i.toString());
go();
}, 500);
})();
Попробуем выкинуть ошибку.
writable.js:
var writable = require('stream').Writable();
var i = 4, arr = [];
writable._write = function(chunk, enc, next) {
console.log('chunk: %s', chunk);
arr.push(chunk);
next();
};
writable.on('finish', function() {
console.log('result: %s', arr.join(', '));
});
writable.on('error', function(err) {
console.log('err:', err);
});
(function go() {
if (!i--) return writable.end();
setTimeout(function() {
writable.write(i.toString());
go();
}, 500);
})();
setTimeout(function() {
writable.emit('error', new Error('Bla-bla-bla'));
}, 1200);
Результат не изменился.Завершим writable-поток "вручную" - this.end(). При этом перед тем как отправлять в поток очередную порцию данных желательно проверить его состояние writable._writableState.ended.
writable.js:
var writable = require('stream').Writable();
var i = 4, arr = [];
writable._write = function(chunk, enc, next) {
console.log('chunk: %s', chunk);
arr.push(chunk);
next();
};
writable.on('finish', function() {
console.log('result: %s', arr.join(', '));
});
writable.on('error', function(err) {
console.log('err:', err);
this.end();
});
(function go() {
if (!i--) return writable.end();
setTimeout(function() {
if (writable._writableState.ended) return;
writable.write(i.toString());
go();
}, 500);
})();
setTimeout(function() {
writable.emit('error', new Error('Bla-bla-bla'));
}, 1200);
Readable.
Реализуем readable интерфейс.
readable.js:
var readable = require('stream').Readable();
var i = 4, arr = [];
readable._read = function() {
if (!i--) return this.push(null);
setTimeout(function() {
arr.push(i);
readable.push('chunk: ' + i + '\n');
}, 500);
};
readable.on('end', function() {
console.log('result: %s', arr.join(', '));
});
readable.pipe(process.stdout);
Попробуем выкинуть ошибку.
readable.js:
var readable = require('stream').Readable();
var i = 4, arr = [];
readable._read = function() {
if (!i--) return this.push(null);
setTimeout(function() {
arr.push(i);
readable.push('chunk: ' + i + '\n');
}, 500);
};
readable.on('end', function() {
console.log('result: %s', arr.join(', '));
});
readable.on('error', function(err) {
console.log('error:', err);
});
readable.pipe(process.stdout);
setTimeout(function() {
readable.emit('error', new Error('Bla-bla-bla'));
}, 1200);
Результат не изменился.Завершим readable-поток "вручную" - this.push(null). При этом перед тем как отправлять в поток очередную порцию данных желательно проверить его состояние readable._readableState.ended.
readable.js:
var readable = require('stream').Readable();
var i = 4, arr = [];
readable._read = function() {
if (!i--) return this.push(null);
setTimeout(function() {
if (readable._readableState.ended) return;
arr.push(i);
readable.push('chunk: ' + i + '\n');
}, 500);
};
readable.on('end', function() {
console.log('result: %s', arr.join(', '));
});
readable.on('error', function(err) {
console.log('error:', err);
this.push(null);
});
readable.pipe(process.stdout);
setTimeout(function() {
readable.emit('error', new Error('Bla-bla-bla'));
}, 1200);
Transform.
Реализуем transform интерфейс.
transform.js:
var transform = require('stream').Transform();
var i = 4, arr = [];
transform._transform = function(chunk, enc, next) {
this.push('chunk: ' + chunk + '\n');
arr.push(chunk + '0');
next();
};
transform.on('finish', function() {
console.log('result: %s', arr.join(', '));
});
transform.pipe(process.stdout);
(function go() {
if (!i--) return transform.end();
setTimeout(function() {
transform.write(i.toString());
go();
}, 500);
})();
Попробуем выкинуть ошибку.
transform.js:
var transform = require('stream').Transform();
var i = 4, arr = [];
transform._transform = function(chunk, enc, next) {
this.push('chunk: ' + chunk + '\n');
arr.push(chunk + '0');
next();
};
transform.on('finish', function() {
console.log('result: %s', arr.join(', '));
});
transform.on('error', function(err) {
console.log('error:', err);
});
transform.pipe(process.stdout);
(function go() {
if (!i--) return transform.end();
setTimeout(function() {
transform.write(i.toString());
go();
}, 500);
})();
setTimeout(function() {
transform.emit('error', new Error('Bla-bla-bla'));
}, 1200);
И снова результат не меняется.
Завершим transform-поток "вручную" - this.end(). Кроме того у transform-потока есть как _readableState, так и _writableState. Я думаю что в нашем случае будет "правильнее" проверить его состояние transform._writableState.ended перед тем тем как отправлять в него очередную порцию данных.
transform.js:
var transform = require('stream').Transform();
var i = 4, arr = [];
transform._transform = function(chunk, enc, next) {
this.push('chunk: ' + chunk + '\n');
arr.push(chunk.toString() + '0');
next();
};
transform.on('finish', function() {
console.log('result: %s', arr.join(', '));
});
transform.on('error', function(err) {
console.log('error:', err);
this.end();
});
transform.pipe(process.stdout);
(function go() {
if (!i--) return transform.end();
setTimeout(function() {
if (transform._writableState.ended) return;
transform.write(i.toString());
go();
}, 500);
})();
setTimeout(function() {
transform.emit('error', new Error('Bla-bla-bla'));
}, 1200);
Pipe.
Соберем readable, transform и process.stdout в один конвейер.
pipe.js:
var i = 4, arr = [];
// readable
var readable = require('stream').Readable();
readable._read = function() {
if (!i--) return this.push(null);
setTimeout(function() {
if (readable._readableState.ended) return;
readable.push(i.toString());
}, 500);
};
readable.on('end', function() {
console.log('readable end');
});
// transform
var transform = require('stream').Transform();
transform._transform = function(chunk, enc, next) {
this.push('chunk: ' + chunk + '\n');
arr.push(chunk + '0');
next();
};
transform.on('end', function() {
console.log('transform end');
console.log('result: %s', arr.join(', '));
});
transform.on('finish', function() {
console.log('transform finish');
});
// pipe
readable.pipe(transform).pipe(process.stdout);
Выкидываем ошибку в readable-потоке.
pipe.js:
var i = 4, arr = [];
// readable
var readable = require('stream').Readable();
readable._read = function() {
if (!i--) return this.push(null);
setTimeout(function() {
if (readable._readableState.ended) return;
readable.push(i.toString());
}, 500);
};
readable.on('end', function() {
console.log('readable end');
});
// transform
var transform = require('stream').Transform();
transform._transform = function(chunk, enc, next) {
this.push('chunk: ' + chunk + '\n');
arr.push(chunk + '0');
next();
};
transform.on('end', function() {
console.log('transform end');
console.log('result: %s', arr.join(', '));
});
transform.on('finish', function() {
console.log('transform finish');
});
// pipe
readable.pipe(transform).pipe(process.stdout);
readable.on('error', function(err) {
console.log('readable error:', err);
this.push(null);
});
transform.on('error', function(err) {
console.log('transform error:', err);
this.end();
});
setTimeout(function() {
readable.emit('error', new Error('Bla-bla-bla'));
}, 1200);
Обращаю внимание на то, что transform-поток ничего не знает об ошибке.
Выкидываем ошибку в transform-потоке.
pipe.js:
var i = 4, arr = [];
// readable
var readable = require('stream').Readable();
readable._read = function() {
if (!i--) return this.push(null);
setTimeout(function() {
if (readable._readableState.ended) return;
readable.push(i.toString());
}, 500);
};
readable.on('end', function() {
console.log('readable end');
});
// transform
var transform = require('stream').Transform();
transform._transform = function(chunk, enc, next) {
this.push('chunk: ' + chunk + '\n');
arr.push(chunk + '0');
next();
};
transform.on('end', function() {
console.log('transform end');
console.log('result: %s', arr.join(', '));
});
transform.on('finish', function() {
console.log('transform finish');
});
// pipe
readable.pipe(transform).pipe(process.stdout);
readable.on('error', function(err) {
console.log('readable error:', err);
this.push(null);
});
transform.on('error', function(err) {
console.log('transform error:', err);
this.end();
});
setTimeout(function() {
transform.emit('error', new Error('Bla-bla-bla'));
}, 1200);
Теперь readable-поток не в курсе что его "отцепили", даже его событие end не сыграло, но тем не менее приложение в целом отработало валидно.
Заключение.
Несмотря на некоторые "вольности", которые допустимы при работе с объектами, реализующими интерфейс Stream, не забываем обрабатывать ошибки в каждом из них, хотя бы тупо выводом в консоль, иначе будет... печалька :).
















Комментариев нет:
Отправить комментарий
Комментарий будет опубликован после модерации