diff --git a/package.json b/package.json index 7348b52..a25c6d3 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "smith.io", - "version": "0.0.38", + "version": "0.0.39", "author": "ajax.org B.V. ", "contributors": [ { diff --git a/server-plugin/plugin.js b/server-plugin/plugin.js index 60799d9..0fe954f 100644 --- a/server-plugin/plugin.js +++ b/server-plugin/plugin.js @@ -8,7 +8,7 @@ const EVENTS = require("events"); // Switch from `away` to `disconnect` after this many milliseconds. const RECONNECT_TIMEOUT = 60 * 1000; - +const THRESHOLD = 300; // Time to detect the connection is gone var engines = []; @@ -49,6 +49,9 @@ module.exports = function startup(options, imports, register) { var gee = new EVENTS.EventEmitter(); + if (options.debug) + SMITH.debug = true; + if (options.messageRoute) { var serverId = "server-id-" + Date.now(); @@ -76,7 +79,6 @@ module.exports = function startup(options, imports, register) { } engine.on("connection", function (socket) { - if (match && !match(socket.transport.request.url)) { return; } @@ -92,23 +94,45 @@ module.exports = function startup(options, imports, register) { delete timeouts[id]; } if (!connections[id]) { + buffers[id] = []; + buffers["_" + id] = []; connections[id] = { ee: new EVENTS.EventEmitter(), - transport: transport + transport: transport, + sequence: 0 }; + gee.emit("connect", { id: id, transport: connections[id].transport, on: connections[id].ee.on.bind(connections[id].ee), once: connections[id].ee.once.bind(connections[id].ee), send: function(message) { - if (timeouts[id]) { - if (!buffers[id]) { - buffers[id] = []; - } + // Sequence number used to catch duplicates + if (message.push) { + message.push(++connections[id].sequence); + if (connections[id].sequence > 30000) + connections[id].sequence = 0; + } + + var transport = connections[id].transport; + if (timeouts[id] || transport.socket.readyState != "open") { buffers[id].push(message); - } else if (connections[id]) { - connections[id].transport.send(message); + } + else if (connections[id]) { + // Clear Existing Buffer > THRESHOLD + var now = Date.now(); + var items = buffers["_" + id]; + for (var i = items.length - 1; i >= 0; i--) { + if (now - items[i][1] > THRESHOLD) + items.splice(i, 1); + } + + // Add item to buffer + items.push([message, Date.now()]); + + // Send message + transport.send(message); } } }); @@ -116,11 +140,11 @@ module.exports = function startup(options, imports, register) { connections[id].transport = transport; if (buffers[id]) { buffers[id].forEach(function(message) { - connections[id].transport.send(message); + transport.send(message); }); - delete buffers[id]; + buffers[id] = []; } - connections[id].ee.emit("back"); + connections[id].ee.emit("back", {transport: transport}); } } else if (connections[id]) { connections[id].ee.emit("message", message); @@ -138,8 +162,19 @@ module.exports = function startup(options, imports, register) { connections[id].ee.emit("disconnect", reason); delete connections[id]; } + delete buffers[id]; + delete buffers["_" + id]; id = false; }, RECONNECT_TIMEOUT); + + var now = Date.now(); + buffers["_" + id].forEach(function(iter){ + if (now - iter[1] < THRESHOLD) { + buffers[id].push(iter[0]); + } + }); + buffers["_" + id] = []; + connections[id].ee.emit("away"); }); diff --git a/server-plugin/www/client.js b/server-plugin/www/client.js index 8b4ddb7..b76e831 100644 --- a/server-plugin/www/client.js +++ b/server-plugin/www/client.js @@ -5,13 +5,14 @@ define(function(require, exports, module) { var ENGINE_IO = eio; // NOTE: `eio` is a global! See `npm info engine.io-client`. var SMITH = require("smith"); var EVENTS = require("smith/events-amd"); - + var transports = []; var debugHandler = null; var connectCounter = 0; + var THRESHOLD = 300; function inherits(Child, Parent) { - Child.prototype = Object.create(Parent.prototype, { constructor: { value: Child }}); + Child.prototype = Object.create(Parent.prototype, { constructor: { value: Child }}); } function getLogTimestamp() { @@ -35,6 +36,9 @@ define(function(require, exports, module) { this.away = false; this.buffer = false; this.connectIndex = -1; + this.sequence = 0; + this.buffer = []; + this.sbuffer = []; } inherits(Transport, EVENTS.EventEmitter); @@ -86,8 +90,8 @@ define(function(require, exports, module) { if (options.reconnectAttempt === 6) { _self.away = false; - _self.connected = false; - try { + _self.connected = false; + try { _self.emit("disconnect", "away re-connect attempts exceeded"); } catch(err) { console.error(err.stack); @@ -142,7 +146,24 @@ define(function(require, exports, module) { }, delay); } + var writeBuffer = _self.socket && _self.socket.writeBuffer; _self.socket = new ENGINE_IO.Socket(_self.options); + if (writeBuffer) { + var onback, ondisconnect; + _self.on("back", onback = function(){ + writeBuffer.forEach(function(packet){ + _self.socket.sendPacket(packet.type, packet.data); + }); + + _self.removeListener("back", onback); + _self.removeListener("disconnect", ondisconnect); + }); + _self.on("disconnect", ondisconnect = function(){ + writeBuffer = null; + _self.removeListener("back", onback); + _self.removeListener("disconnect", ondisconnect); + }); + } _self.socket.on("error", function (err) { if (_self.debug) { @@ -183,8 +204,8 @@ define(function(require, exports, module) { return; } else if (pongPayload && pongPayload.serverId && pongPayload.serverId !== _self.serverId) { - // If `pongPayload.serverId` does not match our cached `_self.serverId` we close - // the connection and re-connect as the server instance has changed and we may need to re-init. + // If `pongPayload.serverId` does not match our cached `_self.serverId` we close + // the connection and re-connect as the server instance has changed and we may need to re-init. if (_self.debug) { log("Detected server reboot on heartbeat. Close connection."); } @@ -235,11 +256,11 @@ define(function(require, exports, module) { _self.transport = new SMITH.EngineIoTransport(_self.socket); _self.transport.on("legacy", function (message) { - if (typeof message === "object" && message.type === "__ASSIGN-ID__") { + if (typeof message === "object" && message.type === "__ASSIGN-ID__") { - if (_self.serverId !== false && _self.serverId !== message.serverId) { - // If `message.serverId` does not match our cached `_self.serverId` we issue - // a connect as the server instance has changed and we may need to re-init. + if (_self.serverId !== false && _self.serverId !== message.serverId) { + // If `message.serverId` does not match our cached `_self.serverId` we issue + // a connect as the server instance has changed and we may need to re-init. if (_self.debug) { log("Detected server reboot on handshake. Issue re-connect."); } @@ -252,17 +273,17 @@ define(function(require, exports, module) { console.error(err.stack); } } - } - _self.serverId = message.serverId; - - if (_self.id === false) { - _self.id = message.id; - } - _self.transport.send({ - type: "__ANNOUNCE-ID__", - id: _self.id - }); - if (_self.away && (Date.now()-_self.away) > 30*1000) { + } + _self.serverId = message.serverId; + + if (_self.id === false) { + _self.id = message.id; + } + _self.transport.send({ + type: "__ANNOUNCE-ID__", + id: _self.id + }); + if (_self.away && (Date.now()-_self.away) > 30*1000) { if (_self.debug) { log("Long away (hibernate) detected. Issue re-connect."); } @@ -275,37 +296,38 @@ define(function(require, exports, module) { console.error(err.stack); } } - } - _self.away = false; - _self.connected = true; - if (options.fireConnect !== false) { - try { + } + _self.away = false; + _self.connected = true; + if (options.fireConnect !== false) { + try { _self.emit("connect", _self); } catch(err) { console.error(err.stack); } } else if (options.reconnectAttempt > 0) { + if (_self.buffer) { + _self.buffer.forEach(function(message) { + _self.transport.send(message); + }); + _self.buffer = []; + } + try { _self.emit("back"); } catch(err) { console.error(err.stack); - } + } } options.reconnectAttempt = 0; - if (_self.buffer) { - _self.buffer.forEach(function(message) { - _self.transport.send(message); - }); - _self.buffer = false; - } - } else { - try { + } else { + try { _self.emit("message", message); } catch(err) { console.error(err.stack); } - } + } }); _self.transport.on("disconnect", ondisconnect); @@ -325,6 +347,15 @@ define(function(require, exports, module) { if (_self.connected) { _self.away = Date.now(); + + var now = Date.now(); + _self.sbuffer.forEach(function(iter){ + if (now - iter[1] < THRESHOLD) { + _self.buffer.push(iter[0]); + } + }); + _self.sbuffer = []; + try { _self.emit("away"); } catch(err) { @@ -348,13 +379,31 @@ define(function(require, exports, module) { console.log(err.stack); throw err; } - else if(this.away) { - if (!this.buffer) { - this.buffer = []; - } + + if (message.push) { + message.push(++this.sequence); + if (this.sequence > 30000) + this.sequence = 0; + } + + if (this.away) { this.buffer.push(message); } - this.transport.send(message); + else { + // Clear Existing Buffer > THRESHOLD + var now = Date.now(); + var items = this.sbuffer; + for (var i = items.length - 1; i >= 0; i--) { + if (now - items[i][1] > THRESHOLD) + items.splice(i, 1); + } + + // Add item to buffer + items.push([message, Date.now()]); + + // Send message + this.transport.send(message); + } } exports.connect = function(options, callback) {