Для начала предлагаю создать эту самую "ограниченную" коллекцию, я назвал ее "messages" так как планирую хранить в ней сообщения простенького чата, который должен получиться в итоге:
Теперь пишем код приложения, которое будет выводить в консоль свежие документы по мере их поступления в коллекцию, а заодно и добавлять документы с целью тестирования работы "tailable" курсора:
var Db = require('mongodb').Db; var Server = require('mongodb').Server; var config = {db: 'test', host: 'localhost', port: 27017, collection: 'messages'}; var db = new Db(config.db, new Server(config.host, config.port, { auto_reconnect: true }), {w: 'majority', safe: true}); db.open(function(err) { if (err) throw err; console.log('MongoDB connected to db %s on %s:%d', config.db, config.host, config.port); db.collection(config.collection, function(err, collection) { if (err) throw err; collection.isCapped(function(err, capped) { if (err) throw err; if (!capped) throw new Error('Collection ' + config.collection + ' is not capped'); var latest = collection.find({}).sort({ $natural: -1 }).limit(1); latest.nextObject(function(err, item) { if (err) throw err; if (item) return gogogo(collection, item._id); console.log('Inserting first document...'); collection.insert({a: 0}, function(err, items) { if (err) throw err; gogogo(collection, items[0]._id); }); }); }); }); }); function gogogo(collection, oid) { console.log('Start tailing on %s collection...', config.collection); var query = { _id: { $gt: oid }}; var options = { tailable: true, awaitdata: true, numberOfRetries: -1, tailableRetryInterval: 200 }; var cursor = collection.find(query, options).sort({ $natural: 1 }); (function next() { cursor.nextObject(function(err, item) { if (err) throw err; console.log(item); next(); }); })(); // тест var i = 1; (function test() { setTimeout(function() { collection.insert({a: i}, function(err) { if (err) throw err; i += 1; test(); }); }, 5000); })(); }
Запускаем приложение, через 15 секунд наблюдаем следующую картину:
Для того, чтобы курсор "зацепился" за коллекцию необходимо чтобы самый первый, я бы сказал "initial" запрос - latest.nextObject... - вернул документ, что предусмотрено в коде и собственно почему у нас в консоли появилось сообщение Inserting first document..., после чего мы вставили в коллекцию документ {a: 0}.
Итак, код отработал как надо, курсор отловил свежие документы информация о которых благополучно отобразилась в консоли.
Переходим к созданию простенького чата. За основу я взял код чата на TCP-сокетах - проще некуда:
- server.js:
- client.js:
- pubsub.js:
Запускаем сервер, пару клиентов, клиенты приветствуют друг друга, после чего консоль сервера выглядит как-то так:
Клиенты в свою очередь могут созерцать сообщения чата:
На текущий момент сообщения вообще никак не хранятся.
Создадим модуль по имени db на основе нашего приложения app.js:
- db.js:
Теперь вместо вывода в консоль свежего документа код эмитит событие broadcast модуля pubsub. Кроме того созданный модуль экспортирует пару методов для создания и извлечения документов коллекции messages.
Редактируем код сервера:
- server.js:
Отличия от первоначального варианта:
- сразу после присоединения к чату - pubsub.emit('join'...- мы получаем последние три сообщения и отдаем их новому клиенту - db.find(...
- вместо того, чтобы эмитить событие 'broadcast' - pubsub.emit('broadcast'..., теперь мы просто добавляем новое сообщение - db.insert(..., а событие эмитится уже в модуле db - в функции с многозначительным названием gogogo :) - в тот самый момент, когда "tailable" курсор получает свежесозданный документ коллекции messages.
Изменения в модуле pubsub ограничиваются появлением функции обратного вызова в списке аргументов функции, выполняемой в ответ на событие подключения к чату нового клиента:
- pubsub.js:
Запускаем сервер, подключаем первого клиента, пишем в чат несколько сообщений:
По понятным причинам первые три сообщения, которые получил клиент при подключении выглядят странно, так как три последних сообщения на тот момент - это тестовые документы коллекции messages, созданные в процессе тестирования приложения app.js в самом начале повествования и выглядят они следующим образом:
Подключаем второго клиента, пишем сообщение в чат:
Здесь уже все в елочку. На сервере в этот момент тоже все по фэн-шую:
Таким образом мы получили реализацию pub/sub системы используя "tailable" (не знаю как правильно перевести на русский язык это слово, да и не уверен что имеет смысл это делать) курсор.
Теперь можно масштабировать приложение "в ширину" - разворачивать несколько серверов, хотя вряд ли это имеет смысл в случае нашего чата :).
Для того, чтобы курсор "зацепился" за коллекцию необходимо чтобы самый первый, я бы сказал "initial" запрос - latest.nextObject... - вернул документ, что предусмотрено в коде и собственно почему у нас в консоли появилось сообщение Inserting first document..., после чего мы вставили в коллекцию документ {a: 0}.
Итак, код отработал как надо, курсор отловил свежие документы информация о которых благополучно отобразилась в консоли.
Переходим к созданию простенького чата. За основу я взял код чата на TCP-сокетах - проще некуда:
- server.js:
var net = require('net'); var pubsub = require('./pubsub'); var server = net.createServer(function(socket) { socket.setEncoding('utf8'); console.log('--- socket connected ---\nfrom: %s', socket.remoteAddress + ':' + socket.remotePort); pubsub.emit('join', socket); socket.on('data', function(data) { data = data.replace(/^\s+|\s+$|\r\n/g, ''); if (!data) return this.write('\033[1AEmpty message!'); console.log('--- socket data ---\n%s', data); pubsub.emit('broadcast', this._id, data); }); socket.on('close', function() { console.log('--- socket closed ---'); pubsub.emit('leave', this); }); socket.on('end', function() { console.log('--- socket end ---'); pubsub.emit('leave', this); }); socket.on('error', function(e) { console.log('--- server error ---\ncode: %s', e.code); }); }); server.listen(8124, function() { console.log('Chamber of Secrets is opened on port %d...', this.address()['port']); });
- client.js:
var net = require('net'); var socket = new net.Socket(); socket.setEncoding('utf8'); socket.connect('8124', 'localhost', function() { console.log('--- connected to server ---'); }); process.stdin.resume(); process.stdin.on('data', function(data) { socket.write(data); }); socket.on('data', function(data) { console.log(data); }); socket.on('close', function() { console.log('--- connection closed ---'); process.exit(); }); socket.on('end', function() { console.log('--- connection end ---'); }); socket.on('error', function(e) { console.log('--- socket error ---\ncode: %s', e.code); });
- pubsub.js:
var events = require('events'); var pubsub = new events.EventEmitter(); pubsub.clients = {}; pubsub.subscriptions = {}; pubsub.on('join', function(socket) { socket['_id'] = socket.remoteAddress + ':' + socket.remotePort; this.clients[socket._id] = socket; this.subscriptions[socket._id] = function(socket_id, data) { data = new Date().toLocaleTimeString() + ' ' + socket_id + ' >>> ' + data; if (socket._id === socket_id) data = '\033[1A' + data; this.clients[socket._id].write(data); } this.on('broadcast', this.subscriptions[socket._id]); console.log('--- socket saved ---\nusers online: %d', this.listeners('broadcast').length); socket.write('Welcome to Chamber of Secrets!'); }); pubsub.on('leave', function(socket) { delete pubsub.clients[socket._id]; this.removeListener('broadcast', this.subscriptions[socket._id]); socket.destroy(); console.log('--- socket destroyed ---\nusers online: %d', this.listeners('broadcast').length); }); pubsub.on('error', function(e) { console.log('--- pubsub error ---\n%s', e.message); }); module.exports = pubsub;
Запускаем сервер, пару клиентов, клиенты приветствуют друг друга, после чего консоль сервера выглядит как-то так:
Клиенты в свою очередь могут созерцать сообщения чата:
На текущий момент сообщения вообще никак не хранятся.
Создадим модуль по имени db на основе нашего приложения app.js:
- db.js:
var pubsub = require('./pubsub'); var Db = require('mongodb').Db; var Server = require('mongodb').Server; var config = {db: 'test', host: 'localhost', port: 27017, collection: 'messages'}; var db = new Db(config.db, new Server(config.host, config.port, { auto_reconnect: true }), {w: 'majority', safe: true}); db.open(function(err) { if (err) throw err; console.log('MongoDB connected to db %s on %s:%d', config.db, config.host, config.port); db.collection(config.collection, function(err, collection) { if (err) throw err; collection.isCapped(function(err, capped) { if (err) throw err; if (!capped) throw new Error('Collection ' + config.collection + ' is not capped'); var latest = collection.find({}).sort({ $natural: -1 }).limit(1); latest.nextObject(function(err, item) { if (err) throw err; if (item) return gogogo(collection, item._id); console.log('Inserting first document...'); collection.insert({a: 0}, function(err, items) { if (err) throw err; gogogo(collection, items[0]._id); }); }); }); }); }); function gogogo(collection, oid) { console.log('Start tailing on %s collection...', config.collection); var query = { _id: { $gt: oid }}; var options = { tailable: true, awaitdata: true, numberOfRetries: -1, tailableRetryInterval: 200 }; var cursor = collection.find(query, options).sort({ $natural: 1 }); (function next() { cursor.nextObject(function(err, item) { if (err) throw err; pubsub.emit('broadcast', item.from, item.message); next(); }); })(); } module.exports.insert = function(client_id, message, cb) { db.collection(config.collection).insert({message: message, from: client_id}, cb); }; module.exports.find = function(limit, cb) { db.collection(config.collection).find({}).sort({ $natural: -1 }).limit(limit).toArray(cb); };
Теперь вместо вывода в консоль свежего документа код эмитит событие broadcast модуля pubsub. Кроме того созданный модуль экспортирует пару методов для создания и извлечения документов коллекции messages.
Редактируем код сервера:
- server.js:
var net = require('net'); var pubsub = require('./pubsub'); var db = require('./db'); var server = net.createServer(function(socket) { socket.setEncoding('utf8'); console.log('--- socket connected ---\nfrom: %s', socket.remoteAddress + ':' + socket.remotePort); pubsub.emit('join', socket, function() { db.find(3, function(err, items) { if (err) throw err; var str = '\n' + items.map(function(item) { return item._id.getTimestamp().toLocaleTimeString() + ' ' + item.from + ' >>> ' + item.message; }).reverse().join('\n'); socket.write(str); }); }); socket.on('data', function(data) { data = data.replace(/^\s+|\s+$|\r\n/g, ''); if (!data) return this.write('\033[1AEmpty message!'); console.log('--- socket data ---\n%s', data); db.insert(this._id, data, function(err) { if (err) throw (err); }); }); socket.on('close', function() { console.log('--- socket closed ---'); pubsub.emit('leave', this); }); socket.on('end', function() { console.log('--- socket end ---'); pubsub.emit('leave', this); }); socket.on('error', function(e) { console.log('--- server error ---\ncode: %s', e.code); }); }); server.listen(8124, function() { console.log('Chamber of Secrets is opened on port %d...', this.address()['port']); });
Отличия от первоначального варианта:
- сразу после присоединения к чату - pubsub.emit('join'...- мы получаем последние три сообщения и отдаем их новому клиенту - db.find(...
- вместо того, чтобы эмитить событие 'broadcast' - pubsub.emit('broadcast'..., теперь мы просто добавляем новое сообщение - db.insert(..., а событие эмитится уже в модуле db - в функции с многозначительным названием gogogo :) - в тот самый момент, когда "tailable" курсор получает свежесозданный документ коллекции messages.
Изменения в модуле pubsub ограничиваются появлением функции обратного вызова в списке аргументов функции, выполняемой в ответ на событие подключения к чату нового клиента:
- pubsub.js:
var events = require('events'); var pubsub = new events.EventEmitter(); pubsub.clients = {}; pubsub.subscriptions = {}; pubsub.on('join', function(socket, cb) { socket['_id'] = socket.remoteAddress + ':' + socket.remotePort; this.clients[socket._id] = socket; this.subscriptions[socket._id] = function(socket_id, data) { data = new Date().toLocaleTimeString() + ' ' + socket_id + ' >>> ' + data; if (socket._id === socket_id) data = '\033[1A' + data; this.clients[socket._id].write(data); } this.on('broadcast', this.subscriptions[socket._id]); console.log('--- socket saved ---\nusers online: %d', this.listeners('broadcast').length); socket.write('Welcome to Chamber of Secrets!'); cb(); }); pubsub.on('leave', function(socket) { delete pubsub.clients[socket._id]; this.removeListener('broadcast', this.subscriptions[socket._id]); socket.destroy(); console.log('--- socket destroyed ---\nusers online: %d', this.listeners('broadcast').length); }); pubsub.on('error', function(e) { console.log('--- pubsub error ---\n%s', e.message); }); module.exports = pubsub;
Запускаем сервер, подключаем первого клиента, пишем в чат несколько сообщений:
По понятным причинам первые три сообщения, которые получил клиент при подключении выглядят странно, так как три последних сообщения на тот момент - это тестовые документы коллекции messages, созданные в процессе тестирования приложения app.js в самом начале повествования и выглядят они следующим образом:
Подключаем второго клиента, пишем сообщение в чат:
Здесь уже все в елочку. На сервере в этот момент тоже все по фэн-шую:
Таким образом мы получили реализацию pub/sub системы используя "tailable" (не знаю как правильно перевести на русский язык это слово, да и не уверен что имеет смысл это делать) курсор.
Теперь можно масштабировать приложение "в ширину" - разворачивать несколько серверов, хотя вряд ли это имеет смысл в случае нашего чата :).
Комментариев нет:
Отправить комментарий
Комментарий будет опубликован после модерации