Не знаю как в настоящий момент обстоят дела с поддержкой river plugins в Elasticsearch, а Node.js-модуль по имени mongodb-es-river уже готов оказать помощь в индексировании документов MongoDB в Elasticsearch, теперь с возможностью горизонтального масштабирования процесса "перетекания" данных. Далее расскажу как приготовить river-сервис на основе перечисленных выше ингредиентов.
Разворачиваем MongoDB Replica Set:
Запускаем Elasticsearch Cluster:
Задача - идексировать документы коллекции users:
Решение:
- package.json:
Запускаем приложение:
Ищем юзеров:
Бинго.
Масштабируем "в ширину":
- cluster.js:
- app.js:
Каждый процесс зарегистрировался в реестре сервисов:
Обновляем fullName юзера:
В консоли приложения наблюдаем следующую картину:
Проверяем как поживает индекс:
Все в елочку.
Останавливаем кластер:
Реестр сервисов снова пуст:
Количество реестров и сервисов миксуем самостоятельно.
Как это было:
За микшерским пультом DJ Baur:
P.S.: Вариант хранения состояния сервиса на каждом из нодов сервиса, использованный в этом модуле, на мой взгляд не самый лучший. Для failover реализации river-сервиса с помощью этого модуля желательно запустить хотя бы пару кластеров на разных машинах. Разворачиваем MongoDB Replica Set:
Запускаем Elasticsearch Cluster:
Задача - идексировать документы коллекции users:
Решение:
- package.json:
{- app.js:
"name": "river-0.10.0",
"version": "1.0.0",
"description": "",
"main": "app.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "ISC",
"private": true,
"dependencies": {
"elasticsearch": "^8.0.1",
"mongodb": "^2.0.42",
"mongodb-es-river": "^0.10.0"
}
}
var mongodb = require('mongodb');
var local = 'mongodb://localhost:27001,localhost:27002/local?replicaSet=x';
var test = 'mongodb://localhost:27001,localhost:27002/test?replicaSet=x';
mongodb.connect(local, function(err, localDb) {
if (err) throw err;
mongodb.connect(test, function(err, testDb) {
if (err) throw err;
var settings = {
oplog: localDb.collection('oplog.rs'),
registry: testDb.collection('river'), // коллекция для хранения реестра сервисов
client: require('elasticsearch').Client()
};
var options = [
{
index: 'test',
type: 'users',
collection: testDb.collection('users'), // целевая коллекция
project: {name: 1, fullName: 1} // к примеру нас интересуют только поля name и fullName
}
];
require('mongodb-es-river')(settings, options, function(err) {
if (err) throw err;
console.log('All rivers run, pid:', process.pid);
});
});
});
Запускаем приложение:
Ищем юзеров:
Бинго.
Масштабируем "в ширину":
- cluster.js:
var cluster = require('cluster');Для теста выведем в консоль кое-какую информацию о работе приложения:
if (cluster.isMaster) {
start();
} else {
require('./app');
}
function start() {
var i = require('os').cpus().length;
while (i--) {
cluster.fork();
}
process.on('SIGINT', onKill);
process.on('SIGTERM', onKill);
}
function onKill() {
setTimeout(function() {
process.exit(0);
}, 100); // время, необходимое для удаления документа из коллекции river - реестра сервисов
}
- app.js:
var mongodb = require('mongodb');Запускаем кластер:
var local = 'mongodb://localhost:27001,localhost:27002/local?replicaSet=x';
var test = 'mongodb://localhost:27001,localhost:27002/test?replicaSet=x';
mongodb.connect(local, function(err, localDb) {
if (err) throw err;
mongodb.connect(test, function(err, testDb) {
if (err) throw err;
var settings = {
oplog: localDb.collection('oplog.rs'),
registry: testDb.collection('river'), // коллекция для хранения реестра сервисов
client: require('elasticsearch').Client(),
worker: { // для теста
value: {
pid: process.pid,
},
onExit: function(signal) {
console.log('Process %s exit, signal %s', process.pid, signal);
process.exit(0);
}
}
};
var options = [
{
index: 'test',
type: 'users',
collection: testDb.collection('users'), // целевая коллекция
project: {name: 1, fullName: 1}, // к примеру нас интересуют только поля name и fullName
getObject: function(obj) { // убедимся в том, что процессы индексируют документы по-очереди
console.log('pid:', process.pid);
console.log('obj:', obj);
return obj;
},
init: false // пропустим первоначальный экспорт документов
}
];
require('mongodb-es-river')(settings, options, function(err) {
if (err) throw err;
console.log('All rivers run, pid:', process.pid);
});
});
});
Каждый процесс зарегистрировался в реестре сервисов:
Обновляем fullName юзера:
В консоли приложения наблюдаем следующую картину:
Проверяем как поживает индекс:
Все в елочку.
Останавливаем кластер:
Реестр сервисов снова пуст:
Количество реестров и сервисов миксуем самостоятельно.
Как это было:
За микшерским пультом DJ Baur:
Комментариев нет:
Отправить комментарий
Комментарий будет опубликован после модерации