Секрет создания больших приложений - не создавать большие приложения. В этом посте я хочу раскрыть рецепт приготовления масштабируемых приложений на архитектуре микросервисов на платформе Node.js с использованием MongoDB. Для примера приготовим какое-нибудь бесполезное веб-приложение. Особенность этого рецепта в том, что ничего кроме Node.js и MongoDB не потребуется.
Не знаю что получится в итоге, думаю здесь важен сам процесс, а не результат, поехали...
1. Cluster.
Для начала напишем простой http-сервер.
- app.js:
Запустим по одному процессу на каждое ядро процессора с помощью модуля cluster.
- cl.js:
Через 10 секунд рестартуем каждый из процессов по очереди, таким образом обеспечив доступность 100% по ходу рестарта.
В Linux для этого выполняем kill -SIGUSR2 <PID>, где PID - ID мастер-процесса, узнать который можно выполнив следующую команду: ps ax | grep node.
В Windows этот фокус не пройдет, для теста раскомментируем строку setTimeout(onRestart, 10000);
Через 20 секунд каждый из процессов упадет с ошибкой, после чего успешно поднимется обратно, что и требовалось.
Еще одно замечание на предмет работы модуля cluster: если затестить в Windows, то мы всегда будем попадать на один и тот же процесс, и с этим ничего не поделаешь. В любом случае будем считать, что "боевой" сервер будет не под окнами.
2. Service Registry.
Развернем реплику MongoDB со следующим конфигом:
Напишем модуль регистрации сервисов.
- reg.js:
При запуске каждый сервис создает документ в коллекции services, перед завершением работы удаляет созданный документ.
Научим http-сервер регистрироваться в реестре сервисов.
- app.js:
Обновим модуль запуска кластера. Добавим мастер-процессу слушателя событий SIGINT и SIGTERM - функцию по имени onKill() - для того, чтобы сервисы успевали удалить запись о своем существовании из коллекции services перед тем как уйти в небытие. Кроме того в процессе рестарта рабочих процессов - onRestart() - модуль будет отправлять каждому рабочему процессу сообщение - worker.send('shutdown') - с той же целью.
- cl.js:
Запускаем http-сервер, наблюдаем такую же картину, как и в прошлый раз - на скриншотах выше.
Сразу после запуска коллекция services выглядит примерно так:
После падений, рестартов и т.п. мы имеем все те же сервисы, только с иными _id и pid:
После завершения работы коллекция снова пустая.
3. Reverse Proxy.
Растащим приложение еще больше "в ширину" с помощью балансировщика.
- proxy.js:
В качестве веб-сервера балансировщик использует фреймворк express и отправляет все запросы по маршруту '/' существующим сервисам по алгоритму round-robin. Прочие маршруты и алгоритмы в случае необходимости реализуем самостоятельно.
Перед запуском балансировщик получает все документы коллекции services и создает для каждого сервиса прокси с помощью модуля http-proxy, а также начинает пасти появление новых сервисов и удаление существующих, используя модуль tail.
- tail.js:
Для запуска балансировщика с помощью модуля cluster подмолодим код файла cl.js - добавим возможность запуска не только './app.js', но и любого другого приложения, отправленного вторым аргументом команды запуска кластера (первый аргумент уже занят - это порт).
- cl.js:
Запускаем кластер балансировщиков на порту 5000:
Открываем браузер, идем по адресу http://localhost:5000, на что балансировщик недвусмысленно дает нам понять что в настоящий момент у нас нет серверов способных обработать этот запрос:
Запускаем пару сервисов по имени test (они же http-серверы) на разных портах:
И наш балансировщик начинает раскидывать запросы по только что запущенным сервисам в порядке живой очереди:
Коллекция services в этот момент выглядит следующим образом:
That's all folks! В результате мы приготовили вполне себе масштабируемое приложение. Толку от него никакого, т.к. сервис по имени test не делает ничего кроме как отдает в ответ на запрос свой PID, что впрочем не мешает приложению в случае необходимости стать сколько угодно большим.
Не знаю что получится в итоге, думаю здесь важен сам процесс, а не результат, поехали...
1. Cluster.
Для начала напишем простой http-сервер.
- app.js:
'use strict'; var host = process.env.HOST || '127.0.0.1'; var port = process.env.PORT || process.argv[2] || 3000; var server = require('http').createServer(function(req, res) { console.log('Process <%s> is handling request', process.pid); res.end(process.pid.toString()); }); server.listen(port, host, function() { console.log('Server started at', server.address()); }); setTimeout(function() { throw new Error('Ooops'); }, 20000);
Запустим по одному процессу на каждое ядро процессора с помощью модуля cluster.
- cl.js:
'use strict'; var cluster = require('cluster'); if (cluster.isMaster) { start(); } else { require('./app'); } cluster.on('online', onOnline); cluster.on('exit', onExit); function start() { var i = require('os').cpus().length; while (i--) { spawn(); } process.on('SIGUSR2', onRestart); // setTimeout(onRestart, 10000); } function spawn(env) { return cluster.fork(env); } function onOnline(worker) { console.log('Worker %s online, pid: %s', worker.id, worker.process && worker.process.pid); } function onExit(worker, code, signal) { if (!worker.suicide && code !== 0) { console.log('Worker %s crashed, code: %s, signal: %s', worker.id, code, signal); spawn(); } } function onRestart() { console.log('Restarting workers'); var workers = Object.keys(cluster.workers); var i = workers.length; var timeout = process.env.WORKER_TIMEOUT || 1000; (function restart() { var worker = cluster.workers[workers[--i]]; if (!worker) return; console.log('Restarting worker %s, pid: %s', worker.id, worker.process && worker.process.pid); setTimeout(function() { worker.kill(); worker.once('exit', function() { spawn().once('online', restart); }); }, timeout); })(); }
Через 10 секунд рестартуем каждый из процессов по очереди, таким образом обеспечив доступность 100% по ходу рестарта.
В Linux для этого выполняем kill -SIGUSR2 <PID>, где PID - ID мастер-процесса, узнать который можно выполнив следующую команду: ps ax | grep node.
В Windows этот фокус не пройдет, для теста раскомментируем строку setTimeout(onRestart, 10000);
Через 20 секунд каждый из процессов упадет с ошибкой, после чего успешно поднимется обратно, что и требовалось.
Еще одно замечание на предмет работы модуля cluster: если затестить в Windows, то мы всегда будем попадать на один и тот же процесс, и с этим ничего не поделаешь. В любом случае будем считать, что "боевой" сервер будет не под окнами.
2. Service Registry.
Развернем реплику MongoDB со следующим конфигом:
{ _id: "x", members: [ {_id: 0, host: "localhost:27001"}, {_id: 1, host: "localhost:27002"}, {_id: 2, host: "localhost:27003", arbiterOnly: true} ] }
Напишем модуль регистрации сервисов.
- reg.js:
'use strict'; var mongodb = require('mongodb'); var ObjectID = mongodb.ObjectID; var str = 'mongodb://localhost:27001,localhost:27002/test?replicaSet=x'; module.exports = function(opts, cb) { mongodb.connect(str, function(err, db) { if (err) return cb(err); process._id = new ObjectID(); db.collection('services').insertOne({ _id: process._id, service: opts.service || process.env.SERVICE || 'test', host: opts.host, port: opts.port, pid: process.pid }, function(err) { if (err) return cb(err); db.close(); process.on('SIGINT', onExit.bind(null, 'SIGINT')); process.on('SIGTERM', onExit.bind(null, 'SIGTERM')); process.on('message', onMessage); process.on('uncaughtException', onError); cb(); }); }); }; function onExit(signal) { unregister(function(err) { if (err) console.error(err); console.log('Process %s exit, signal: %s', process.pid, signal); process.exit(0); }); } function onMessage(message) { if (message === 'shutdown') { unregister(function(err) { if (err) console.error(err); }); } } function onError(err) { console.error(err.stack); unregister(function(err) { if (err) console.error(err); process.exit(1); }); } function unregister(cb) { mongodb.connect(str, function(err, db) { if (err) return cb(err); db.collection('services').deleteOne({_id: process._id}, function(err) { db.close(); cb(err); }); }); }
При запуске каждый сервис создает документ в коллекции services, перед завершением работы удаляет созданный документ.
Научим http-сервер регистрироваться в реестре сервисов.
- app.js:
'use strict'; var host = process.env.HOST || '127.0.0.1'; var port = process.env.PORT || process.argv[2] || 3000; var server = require('http').createServer(function(req, res) { console.log('Process <%s> is handling request', process.pid); res.end(process.pid.toString()); }); require('./reg')({host: host, port: port}, function(err) { if (err) throw err; server.listen(port, host, function() { console.log('Server started at', server.address()); }); }); setTimeout(function() { throw new Error('Ooops'); }, 20000);
Обновим модуль запуска кластера. Добавим мастер-процессу слушателя событий SIGINT и SIGTERM - функцию по имени onKill() - для того, чтобы сервисы успевали удалить запись о своем существовании из коллекции services перед тем как уйти в небытие. Кроме того в процессе рестарта рабочих процессов - onRestart() - модуль будет отправлять каждому рабочему процессу сообщение - worker.send('shutdown') - с той же целью.
- cl.js:
'use strict'; var cluster = require('cluster'); if (cluster.isMaster) { start(); } else { require('./app'); } cluster.on('online', onOnline); cluster.on('exit', onExit); function start() { var i = require('os').cpus().length; while (i--) { spawn(); } process.on('SIGINT', onKill); process.on('SIGTERM', onKill); process.on('SIGUSR2', onRestart); // setTimeout(onRestart, 10000); } function spawn(env) { return cluster.fork(env); } function onOnline(worker) { console.log('Worker %s online, pid: %s', worker.id, worker.process && worker.process.pid); } function onExit(worker, code, signal) { if (!worker.suicide && code !== 0) { console.log('Worker %s crashed, code: %s, signal: %s', worker.id, code, signal); spawn(); } } function onRestart() { console.log('Restarting workers'); var workers = Object.keys(cluster.workers); var i = workers.length; var timeout = process.env.WORKER_TIMEOUT || 1000; (function restart() { var worker = cluster.workers[workers[--i]]; if (!worker) return; console.log('Restarting worker %s, pid: %s', worker.id, worker.process && worker.process.pid); worker.send('shutdown'); setTimeout(function() { worker.kill(); worker.once('exit', function() { spawn().once('online', restart); }); }, timeout); })(); } function onKill() { setTimeout(function() { console.log('Bye'); process.exit(0); }, process.env.WORKER_TIMEOUT || 1000); }
Запускаем http-сервер, наблюдаем такую же картину, как и в прошлый раз - на скриншотах выше.
Сразу после запуска коллекция services выглядит примерно так:
После падений, рестартов и т.п. мы имеем все те же сервисы, только с иными _id и pid:
После завершения работы коллекция снова пустая.
3. Reverse Proxy.
Растащим приложение еще больше "в ширину" с помощью балансировщика.
- proxy.js:
'use strict'; var host = process.env.HOST || '127.0.0.1'; var port = process.env.PORT || process.argv[2] || 5000; var express = require('express'); var app = express(); var logger = require('morgan'); var httpProxy = require('http-proxy'); var cache = { test: [] }; var services = { test: roundr(cache.test) }; var onNextService = {i: insertService, d: deleteService}; require('./tail')(onNextService, function(err) { if (err) throw err; app.use(logger('dev')); app.get('/', testHandler); app.use(err404Handler); app.use(err500Handler); var server = require('http').createServer(app); server.listen(port, host, function() { console.log('Proxy server running at', server.address()); }); }); function roundr(arr) { var i = Number.MAX_SAFE_INTEGER; return { next: function() { i = i - 1 || Number.MAX_SAFE_INTEGER; return arr[i%arr.length]; }, hasNext: function() { return !!arr.length; } }; } function insertService(item) { if (item && item.service && cache[item.service] && item.host && item.port && !cache[item.service].some(serviceExists) && item._id) { console.log('Process <%s> is creating <%s> service proxy on <%s:%s>', process.pid, item.service, item.host, item.port); cache[item.service].push(createProxy({ host: item.host, port: item.port }, item._id.toString())); } function serviceExists(obj) { return obj.options && obj.options.target && obj.options.target.host === item.host && obj.options.target.port === item.port; } } function deleteService(item) { var _id = item._id && item._id.toString(); if (_id) { Object.keys(cache).every(function(service) { return cache[service].every(removeService); }); } function removeService(obj, i, arr) { if (obj._id === _id) { arr.splice(i, 1); return false; } return true; } } function createProxy(target, _id) { var proxy = httpProxy.createProxyServer({ target: target }); proxy._id = _id; proxy.on('error', onError); return proxy; } function onError(err, req, res) { if (err.code === 'ECONNRESET') return req.connection.destroy(); console.error(err); if (!res.headersSent) { res.writeHead(500); res.end(); } } function testHandler(req, res, next) { if (services.test.hasNext()) return services.test.next().web(req, res); next(new Error('Service unavailable')); } function err404Handler(req, res, next) { var err = new Error('Not Found'); err.status = 404; next(err); } function err500Handler(err, req, res, next) { res.status(err.status || 500).end(err.message); }
В качестве веб-сервера балансировщик использует фреймворк express и отправляет все запросы по маршруту '/' существующим сервисам по алгоритму round-robin. Прочие маршруты и алгоритмы в случае необходимости реализуем самостоятельно.
Перед запуском балансировщик получает все документы коллекции services и создает для каждого сервиса прокси с помощью модуля http-proxy, а также начинает пасти появление новых сервисов и удаление существующих, используя модуль tail.
- tail.js:
'use strict'; var mongodb = require('mongodb'); var tail = require('mongodb-tail'); var local = 'mongodb://localhost:27001,localhost:27002/local?replicaSet=x'; var test = 'mongodb://localhost:27001,localhost:27002/test?replicaSet=x'; module.exports = function(onNextService, cb) { mongodb.connect(local, function(err, local) { if (err) return cb(err); tail(local.collection('oplog.rs'), { getCursor: function getCursor(latest) { return { ts: {$gt: latest.ts}, fromMigrate: {$exists: false}, ns: 'test.services', op: {$in: ['i', 'd']} }; }, onNextObject: function onNextObject(item, cb) { if (item.o) onNextService[item.op].call(null, item.o); cb(); } }, function() { mongodb.connect(test, function(err, db) { if (err) return cb(err); db.collection('services').find({}, function(err, cursor) { if (err) return cb(err); cursor.each(function(err, item) { if (err) return cb(err); if (item) return onNextService.i(item); db.close(); return cb(); }); }); }); }); process.on('SIGINT', onExit.bind(null, 'SIGINT')); process.on('SIGTERM', onExit.bind(null, 'SIGTERM')); process.on('uncaughtException', onError); function onExit(signal) { setTimeout(function() { local.close(); console.log('Process %s exit, signal: %s', process.pid, signal); process.exit(0); }, 0); } function onError(err) { console.error(err.stack); setTimeout(function() { local.close(); process.exit(1); }, 0); } }); };
Для запуска балансировщика с помощью модуля cluster подмолодим код файла cl.js - добавим возможность запуска не только './app.js', но и любого другого приложения, отправленного вторым аргументом команды запуска кластера (первый аргумент уже занят - это порт).
- cl.js:
... if (cluster.isMaster) { start(); } else { require(process.argv[3] || './app'); } ...
Запускаем кластер балансировщиков на порту 5000:
Открываем браузер, идем по адресу http://localhost:5000, на что балансировщик недвусмысленно дает нам понять что в настоящий момент у нас нет серверов способных обработать этот запрос:
Запускаем пару сервисов по имени test (они же http-серверы) на разных портах:
И наш балансировщик начинает раскидывать запросы по только что запущенным сервисам в порядке живой очереди:
Коллекция services в этот момент выглядит следующим образом:
That's all folks! В результате мы приготовили вполне себе масштабируемое приложение. Толку от него никакого, т.к. сервис по имени test не делает ничего кроме как отдает в ответ на запрос свой PID, что впрочем не мешает приложению в случае необходимости стать сколько угодно большим.
Комментариев нет:
Отправить комментарий
Комментарий будет опубликован после модерации