diff --git a/exec/broker.js b/exec/broker.js index b532c14ff6fd236af50c1dd4d1f595a4f9afbd14..c0e34c950f6bc2f064901a2ef41c9f4a171be754 100644 --- a/exec/broker.js +++ b/exec/broker.js @@ -864,6 +864,17 @@ MQTT.Connection.rxPacket = function() { } }; +MQTT.Connection.nextPacket = function() { + if (this.rx_buf.length > 0) { + this.rx_packet = new MQTT.Packet(); + this.rx_packet.recv(this.sock); + } + else { + if (this.rx_once === null && this.sock !== null) + this.rx_once = this.sock.once('read', MQTT.Connection.rxPacket); + } +}; + MQTT.Connection.txData = function() { this.connection.tx_once = null; var bytes = this.send(this.connection.tx_buf); @@ -930,6 +941,7 @@ MQTT.Connection.prototype.tearDown = function() { if (sock !== null) { sock.close(); this.sock = null; + this.rx_buf = ''; } if (this.broker.connected[this.client_id] !== undefined) delete this.broker.connected[this.client_id]; @@ -1516,6 +1528,31 @@ MQTT.Connection.prototype.getProperties = function(is_will) { return ret; }; +MQTT.Connection.prototype.parseBytes = function() { + while (this.rx_need && this.rx_buf.length >= this.rx_need) { + try { + this.rx_callback(this); + } + catch (e) { + log(LOG_WARNING, e.toSource()); + this.error(e); + return; + } + } + // If we need bytes, and we're not waiting for them, add handler + if (this.rx_need && this.rx_once === null) { + this.rx_once = this.sock.once('read', MQTT.Packet.newBytes); + } + // If we're waiting for bytes, schedule the next packet + else if(this.rx_buf.length > 0) { + js.setImmediate(MQTT.Connection.nextPacket, this); + } + // Otherwise, wait for new bytes from socket and see what happens + else if(this.rx_once === null) { + this.rx_once = this.sock.once('read', MQTT.Connection.rxPacket); + } +}; + // MQTT.Connection.SubscriptionOptions class MQTT.Connection.SubscriptionOptions = function(value) { @@ -1907,35 +1944,15 @@ MQTT.Packet.parseRemaining = function(conn) { MQTT.Packet.newBytes = function() { try { - if (this.connection.rx_buf.length < this.connection.rx_need) { - var data = this.recv(16384); - this.connection.rx_once = null; - if (data == null) { - // Error, tear down connection... - this.connection.tearDown(); - return; - } - this.connection.rx_buf += data; - } - while (this.connection.rx_need && this.connection.rx_buf.length >= this.connection.rx_need) { - try { - this.connection.rx_callback(this.connection); - } - catch (e) { - log(LOG_WARNING, e.toSource()); - this.connection.error(e); - return; - } - } - if (this.connection.rx_need) { - this.connection.rx_once = this.once('read', MQTT.Packet.newBytes); - } - else if(this.connection.rx_buf.length > 0) { - js.setImmediate(MQTT.Connection.rxPacket, this); - } - else if(this.connection.rx_once === null) { - this.connection.rx_once = this.once('read', MQTT.Connection.rxPacket); + var data = this.recv(16384); + this.connection.rx_once = null; + if (data == null) { + // Error, tear down connection... + this.connection.tearDown(); + return; } + this.connection.rx_buf += data; + this.connection.parseBytes(); } catch (e) { this.connection.error(e); @@ -1987,10 +2004,7 @@ MQTT.Packet.prototype.recv = function(sock) { return; sock.connection.rx_need = 1; sock.connection.rx_callback = MQTT.Packet.gotTypeFlags; - if (sock.connection.rx_buf.length > 0) - MQTT.Packet.newBytes.call(sock); - else - sock.connection.rx_once = sock.once('read', MQTT.Packet.newBytes); + sock.connection.parseBytes(); }; MQTT.Packet.prototype.serialize = function() {