Для начала предлагаю создать эту самую "ограниченную" коллекцию, я назвал ее "messages" так как планирую хранить в ней сообщения простенького чата, который должен получиться в итоге:
Теперь пишем код приложения, которое будет выводить в консоль свежие документы по мере их поступления в коллекцию, а заодно и добавлять документы с целью тестирования работы "tailable" курсора:
var Db = require('mongodb').Db;
var Server = require('mongodb').Server;
var config = {db: 'test', host: 'localhost', port: 27017, collection: 'messages'};
var db = new Db(config.db, new Server(config.host, config.port, { auto_reconnect: true }), {w: 'majority', safe: true});
db.open(function(err) {
if (err) throw err;
console.log('MongoDB connected to db %s on %s:%d', config.db, config.host, config.port);
db.collection(config.collection, function(err, collection) {
if (err) throw err;
collection.isCapped(function(err, capped) {
if (err) throw err;
if (!capped) throw new Error('Collection ' + config.collection + ' is not capped');
var latest = collection.find({}).sort({ $natural: -1 }).limit(1);
latest.nextObject(function(err, item) {
if (err) throw err;
if (item) return gogogo(collection, item._id);
console.log('Inserting first document...');
collection.insert({a: 0}, function(err, items) {
if (err) throw err;
gogogo(collection, items[0]._id);
});
});
});
});
});
function gogogo(collection, oid) {
console.log('Start tailing on %s collection...', config.collection);
var query = { _id: { $gt: oid }};
var options = { tailable: true, awaitdata: true, numberOfRetries: -1, tailableRetryInterval: 200 };
var cursor = collection.find(query, options).sort({ $natural: 1 });
(function next() {
cursor.nextObject(function(err, item) {
if (err) throw err;
console.log(item);
next();
});
})();
// тест
var i = 1;
(function test() {
setTimeout(function() {
collection.insert({a: i}, function(err) {
if (err) throw err;
i += 1;
test();
});
}, 5000);
})();
}
Запускаем приложение, через 15 секунд наблюдаем следующую картину:
Для того, чтобы курсор "зацепился" за коллекцию необходимо чтобы самый первый, я бы сказал "initial" запрос - latest.nextObject... - вернул документ, что предусмотрено в коде и собственно почему у нас в консоли появилось сообщение Inserting first document..., после чего мы вставили в коллекцию документ {a: 0}.
Итак, код отработал как надо, курсор отловил свежие документы информация о которых благополучно отобразилась в консоли.
Переходим к созданию простенького чата. За основу я взял код чата на TCP-сокетах - проще некуда:
- server.js:
- client.js:
- pubsub.js:
Запускаем сервер, пару клиентов, клиенты приветствуют друг друга, после чего консоль сервера выглядит как-то так:
Клиенты в свою очередь могут созерцать сообщения чата:
На текущий момент сообщения вообще никак не хранятся.
Создадим модуль по имени db на основе нашего приложения app.js:
- db.js:
Теперь вместо вывода в консоль свежего документа код эмитит событие broadcast модуля pubsub. Кроме того созданный модуль экспортирует пару методов для создания и извлечения документов коллекции messages.
Редактируем код сервера:
- server.js:
Отличия от первоначального варианта:
- сразу после присоединения к чату - pubsub.emit('join'...- мы получаем последние три сообщения и отдаем их новому клиенту - db.find(...
- вместо того, чтобы эмитить событие 'broadcast' - pubsub.emit('broadcast'..., теперь мы просто добавляем новое сообщение - db.insert(..., а событие эмитится уже в модуле db - в функции с многозначительным названием gogogo :) - в тот самый момент, когда "tailable" курсор получает свежесозданный документ коллекции messages.
Изменения в модуле pubsub ограничиваются появлением функции обратного вызова в списке аргументов функции, выполняемой в ответ на событие подключения к чату нового клиента:
- pubsub.js:
Запускаем сервер, подключаем первого клиента, пишем в чат несколько сообщений:
По понятным причинам первые три сообщения, которые получил клиент при подключении выглядят странно, так как три последних сообщения на тот момент - это тестовые документы коллекции messages, созданные в процессе тестирования приложения app.js в самом начале повествования и выглядят они следующим образом:
Подключаем второго клиента, пишем сообщение в чат:
Здесь уже все в елочку. На сервере в этот момент тоже все по фэн-шую:
Таким образом мы получили реализацию pub/sub системы используя "tailable" (не знаю как правильно перевести на русский язык это слово, да и не уверен что имеет смысл это делать) курсор.
Теперь можно масштабировать приложение "в ширину" - разворачивать несколько серверов, хотя вряд ли это имеет смысл в случае нашего чата :).
Для того, чтобы курсор "зацепился" за коллекцию необходимо чтобы самый первый, я бы сказал "initial" запрос - latest.nextObject... - вернул документ, что предусмотрено в коде и собственно почему у нас в консоли появилось сообщение Inserting first document..., после чего мы вставили в коллекцию документ {a: 0}.
Итак, код отработал как надо, курсор отловил свежие документы информация о которых благополучно отобразилась в консоли.
Переходим к созданию простенького чата. За основу я взял код чата на TCP-сокетах - проще некуда:
- server.js:
var net = require('net');
var pubsub = require('./pubsub');
var server = net.createServer(function(socket) {
socket.setEncoding('utf8');
console.log('--- socket connected ---\nfrom: %s', socket.remoteAddress + ':' + socket.remotePort);
pubsub.emit('join', socket);
socket.on('data', function(data) {
data = data.replace(/^\s+|\s+$|\r\n/g, '');
if (!data) return this.write('\033[1AEmpty message!');
console.log('--- socket data ---\n%s', data);
pubsub.emit('broadcast', this._id, data);
});
socket.on('close', function() {
console.log('--- socket closed ---');
pubsub.emit('leave', this);
});
socket.on('end', function() {
console.log('--- socket end ---');
pubsub.emit('leave', this);
});
socket.on('error', function(e) {
console.log('--- server error ---\ncode: %s', e.code);
});
});
server.listen(8124, function() {
console.log('Chamber of Secrets is opened on port %d...', this.address()['port']);
});
- client.js:
var net = require('net');
var socket = new net.Socket();
socket.setEncoding('utf8');
socket.connect('8124', 'localhost', function() {
console.log('--- connected to server ---');
});
process.stdin.resume();
process.stdin.on('data', function(data) {
socket.write(data);
});
socket.on('data', function(data) {
console.log(data);
});
socket.on('close', function() {
console.log('--- connection closed ---');
process.exit();
});
socket.on('end', function() {
console.log('--- connection end ---');
});
socket.on('error', function(e) {
console.log('--- socket error ---\ncode: %s', e.code);
});
- pubsub.js:
var events = require('events');
var pubsub = new events.EventEmitter();
pubsub.clients = {};
pubsub.subscriptions = {};
pubsub.on('join', function(socket) {
socket['_id'] = socket.remoteAddress + ':' + socket.remotePort;
this.clients[socket._id] = socket;
this.subscriptions[socket._id] = function(socket_id, data) {
data = new Date().toLocaleTimeString() + ' ' + socket_id + ' >>> ' + data;
if (socket._id === socket_id) data = '\033[1A' + data;
this.clients[socket._id].write(data);
}
this.on('broadcast', this.subscriptions[socket._id]);
console.log('--- socket saved ---\nusers online: %d', this.listeners('broadcast').length);
socket.write('Welcome to Chamber of Secrets!');
});
pubsub.on('leave', function(socket) {
delete pubsub.clients[socket._id];
this.removeListener('broadcast', this.subscriptions[socket._id]);
socket.destroy();
console.log('--- socket destroyed ---\nusers online: %d', this.listeners('broadcast').length);
});
pubsub.on('error', function(e) {
console.log('--- pubsub error ---\n%s', e.message);
});
module.exports = pubsub;
Запускаем сервер, пару клиентов, клиенты приветствуют друг друга, после чего консоль сервера выглядит как-то так:
Клиенты в свою очередь могут созерцать сообщения чата:
На текущий момент сообщения вообще никак не хранятся.
Создадим модуль по имени db на основе нашего приложения app.js:
- db.js:
var pubsub = require('./pubsub');
var Db = require('mongodb').Db;
var Server = require('mongodb').Server;
var config = {db: 'test', host: 'localhost', port: 27017, collection: 'messages'};
var db = new Db(config.db, new Server(config.host, config.port, { auto_reconnect: true }), {w: 'majority', safe: true});
db.open(function(err) {
if (err) throw err;
console.log('MongoDB connected to db %s on %s:%d', config.db, config.host, config.port);
db.collection(config.collection, function(err, collection) {
if (err) throw err;
collection.isCapped(function(err, capped) {
if (err) throw err;
if (!capped) throw new Error('Collection ' + config.collection + ' is not capped');
var latest = collection.find({}).sort({ $natural: -1 }).limit(1);
latest.nextObject(function(err, item) {
if (err) throw err;
if (item) return gogogo(collection, item._id);
console.log('Inserting first document...');
collection.insert({a: 0}, function(err, items) {
if (err) throw err;
gogogo(collection, items[0]._id);
});
});
});
});
});
function gogogo(collection, oid) {
console.log('Start tailing on %s collection...', config.collection);
var query = { _id: { $gt: oid }};
var options = { tailable: true, awaitdata: true, numberOfRetries: -1, tailableRetryInterval: 200 };
var cursor = collection.find(query, options).sort({ $natural: 1 });
(function next() {
cursor.nextObject(function(err, item) {
if (err) throw err;
pubsub.emit('broadcast', item.from, item.message);
next();
});
})();
}
module.exports.insert = function(client_id, message, cb) {
db.collection(config.collection).insert({message: message, from: client_id}, cb);
};
module.exports.find = function(limit, cb) {
db.collection(config.collection).find({}).sort({ $natural: -1 }).limit(limit).toArray(cb);
};
Теперь вместо вывода в консоль свежего документа код эмитит событие broadcast модуля pubsub. Кроме того созданный модуль экспортирует пару методов для создания и извлечения документов коллекции messages.
Редактируем код сервера:
- server.js:
var net = require('net');
var pubsub = require('./pubsub');
var db = require('./db');
var server = net.createServer(function(socket) {
socket.setEncoding('utf8');
console.log('--- socket connected ---\nfrom: %s', socket.remoteAddress + ':' + socket.remotePort);
pubsub.emit('join', socket, function() {
db.find(3, function(err, items) {
if (err) throw err;
var str = '\n' + items.map(function(item) {
return item._id.getTimestamp().toLocaleTimeString() + ' ' + item.from + ' >>> ' + item.message;
}).reverse().join('\n');
socket.write(str);
});
});
socket.on('data', function(data) {
data = data.replace(/^\s+|\s+$|\r\n/g, '');
if (!data) return this.write('\033[1AEmpty message!');
console.log('--- socket data ---\n%s', data);
db.insert(this._id, data, function(err) {
if (err) throw (err);
});
});
socket.on('close', function() {
console.log('--- socket closed ---');
pubsub.emit('leave', this);
});
socket.on('end', function() {
console.log('--- socket end ---');
pubsub.emit('leave', this);
});
socket.on('error', function(e) {
console.log('--- server error ---\ncode: %s', e.code);
});
});
server.listen(8124, function() {
console.log('Chamber of Secrets is opened on port %d...', this.address()['port']);
});
Отличия от первоначального варианта:
- сразу после присоединения к чату - pubsub.emit('join'...- мы получаем последние три сообщения и отдаем их новому клиенту - db.find(...
- вместо того, чтобы эмитить событие 'broadcast' - pubsub.emit('broadcast'..., теперь мы просто добавляем новое сообщение - db.insert(..., а событие эмитится уже в модуле db - в функции с многозначительным названием gogogo :) - в тот самый момент, когда "tailable" курсор получает свежесозданный документ коллекции messages.
Изменения в модуле pubsub ограничиваются появлением функции обратного вызова в списке аргументов функции, выполняемой в ответ на событие подключения к чату нового клиента:
- pubsub.js:
var events = require('events');
var pubsub = new events.EventEmitter();
pubsub.clients = {};
pubsub.subscriptions = {};
pubsub.on('join', function(socket, cb) {
socket['_id'] = socket.remoteAddress + ':' + socket.remotePort;
this.clients[socket._id] = socket;
this.subscriptions[socket._id] = function(socket_id, data) {
data = new Date().toLocaleTimeString() + ' ' + socket_id + ' >>> ' + data;
if (socket._id === socket_id) data = '\033[1A' + data;
this.clients[socket._id].write(data);
}
this.on('broadcast', this.subscriptions[socket._id]);
console.log('--- socket saved ---\nusers online: %d', this.listeners('broadcast').length);
socket.write('Welcome to Chamber of Secrets!');
cb();
});
pubsub.on('leave', function(socket) {
delete pubsub.clients[socket._id];
this.removeListener('broadcast', this.subscriptions[socket._id]);
socket.destroy();
console.log('--- socket destroyed ---\nusers online: %d', this.listeners('broadcast').length);
});
pubsub.on('error', function(e) {
console.log('--- pubsub error ---\n%s', e.message);
});
module.exports = pubsub;
Запускаем сервер, подключаем первого клиента, пишем в чат несколько сообщений:
По понятным причинам первые три сообщения, которые получил клиент при подключении выглядят странно, так как три последних сообщения на тот момент - это тестовые документы коллекции messages, созданные в процессе тестирования приложения app.js в самом начале повествования и выглядят они следующим образом:
Подключаем второго клиента, пишем сообщение в чат:
Здесь уже все в елочку. На сервере в этот момент тоже все по фэн-шую:
Таким образом мы получили реализацию pub/sub системы используя "tailable" (не знаю как правильно перевести на русский язык это слово, да и не уверен что имеет смысл это делать) курсор.
Теперь можно масштабировать приложение "в ширину" - разворачивать несколько серверов, хотя вряд ли это имеет смысл в случае нашего чата :).









Комментариев нет:
Отправить комментарий
Комментарий будет опубликован после модерации