Protocol Buffers - протокол сериализации структурированных данных. Нельзя просто взять и скомпилировать: rpc Send (stream bytes) returns (void);, нужно обязательно использовать структуры: rpc Send (stream File) returns (Empty);. В Node.js самый простой способ отправить файл, или любой другой Readable Stream - спайпить его с Writable Stream. Для того чтобы отправить файл через gRPC можно пребразовать поток бинарных данных в поток структурированных данных и наоборот с помощью Transform Stream, как-то так: на клиенте Readable Stream -> Transform Stream -> ClientReadableStream, на сервере ServerReadableStream -> Transform Stream -> Writable Stream.
- stream.js:
- server.js:
- client.js:
Для идентификации файла я решил использовать HashStream, который так же как UpStream и DownStream наследует от Transform Stream.
На сервере содержимое файла хранится в DataStream, который наследует от Writable Stream.
- stream.js:
const { Transform, Writable } = require('stream') const { createHash } = require('crypto') class UpStream extends Transform { constructor () { super({ readableObjectMode: true }) } _transform (chunk, _, cb) { this.push({ chunk: chunk }) cb() } } class DownStream extends Transform { constructor () { super({ writableObjectMode: true }) } _transform ({ chunk }, _, cb) { this.push(Buffer.from(chunk)) cb() } } class HashStream extends Transform { constructor () { super() this.hash = createHash('sha1') this.digest = '' } _transform (chunk, _, cb) { this.hash.update(chunk) this.push(chunk) cb() } _flush (cb) { this.digest = this.hash.digest('hex') cb() } } class DataStream extends Writable { constructor () { super() this._data = [] } _write (chunk, _, cb) { this._data.push(chunk) cb() } get data () { return Buffer.concat(this._data).toString() } } module.exports = { UpStream, DownStream, HashStream, DataStream }
- server.js:
const grpc = require('grpc') const protoLoader = require('@grpc/proto-loader') const path = require('path') const protoFileName = path.join(__dirname, 'protos/test.proto') const packageDefinition = protoLoader.loadSync(protoFileName) const grpcObject = grpc.loadPackageDefinition(packageDefinition) const server = new grpc.Server() const { DownStream, HashStream, DataStream } = require('./stream') const send = (upStream, cb) => { const downStream = new DownStream() const hashStream = new HashStream() const dataStream = new DataStream() new Promise((resolve, reject) => { upStream.on('error', reject) downStream.on('error', reject) hashStream.on('error', reject) dataStream.on('error', reject) dataStream.on('finish', () => { console.log('digest:', hashStream.digest) resolve() }) upStream.pipe(downStream).pipe(hashStream).pipe(dataStream) }) .then(cb) .catch((err) => { console.error('err:', err) cb(err) }) } server.addService(grpcObject.test.Sender.service, { send: send }) server.bind('0.0.0.0:50051', grpc.ServerCredentials.createInsecure()) server.start()
- client.js:
const grpc = require('grpc') const protoLoader = require('@grpc/proto-loader') const path = require('path') const protoFileName = path.join(__dirname, 'protos/test.proto') const packageDefinition = protoLoader.loadSync(protoFileName) const grpcObject = grpc.loadPackageDefinition(packageDefinition) const client = new grpcObject.test.Sender('localhost:50051', grpc.credentials.createInsecure()) const { UpStream, HashStream } = require('./stream') const { createReadStream } = require('fs') const send = (filename) => { return new Promise((resolve, reject) => { const fileStream = createReadStream(filename) const hashStream = new HashStream() const upStream = new UpStream() const callStream = client.send({}, (err) => { if (err) return reject(err) console.log('digest:', hashStream.digest) resolve() }) fileStream.on('error', reject) hashStream.on('error', reject) upStream.on('error', reject) callStream.on('error', reject) fileStream.pipe(hashStream).pipe(upStream).pipe(callStream) }) } send(process.argv[2] || __filename) .catch((err) => { console.error('err:', err) })
Для идентификации файла я решил использовать HashStream, который так же как UpStream и DownStream наследует от Transform Stream.
На сервере содержимое файла хранится в DataStream, который наследует от Writable Stream.
Комментариев нет:
Отправить комментарий
Комментарий будет опубликован после модерации