diff --git a/exec/broker.js b/exec/broker.js index 1a1770314c1ba7faed55e37d74be2e6f0eb82395..96059c0cc4bc50c25e760ff5799e9b1783cf418a 100644 --- a/exec/broker.js +++ b/exec/broker.js @@ -915,7 +915,8 @@ MQTT.Connection.timeToDie = function() { MQTT.Connection.prototype.tearDown = function() { var sock = this.sock; - this.sock.connection = null; + if (sock !== null) + this.sock.connection = null; if (this.rx_listener !== null) removeEventListener(this.rx_listener); if (this.rx_once != null) { @@ -928,6 +929,7 @@ MQTT.Connection.prototype.tearDown = function() { } this.sock = null; sock.close(); + this.sock = null; if (this.broker.connected[this.client_id] !== undefined) delete this.broker.connected[this.client_id]; if (this.will !== null) { @@ -1904,14 +1906,16 @@ MQTT.Packet.parseRemaining = function(conn) { MQTT.Packet.newBytes = function() { try { - var data = this.recv(16384); - this.connection.rx_once = null; - if (data == null) { - // Error, tear down connection... - this.connection.tearDown(); - return; + if (this.connection.rx_buf.length < this.connection.rx_need) { + var data = this.recv(16384); + this.connection.rx_once = null; + if (data == null) { + // Error, tear down connection... + this.connection.tearDown(); + return; + } + this.connection.rx_buf += data; } - this.connection.rx_buf += data; while (this.connection.rx_need && this.connection.rx_buf.length >= this.connection.rx_need) { try { this.connection.rx_callback(this.connection); @@ -1925,6 +1929,9 @@ MQTT.Packet.newBytes = function() { if (this.connection.rx_need) { this.connection.rx_once = this.once('read', MQTT.Packet.newBytes); } + else if(this.connection.rx_buf.length > 0) { + js.setImmediate(MQTT.Connection.rxPacket, this); + } else if(this.connection.rx_once === null) { this.connection.rx_once = this.once('read', MQTT.Connection.rxPacket); } @@ -1945,7 +1952,7 @@ MQTT.Packet.gotRemainingLengthByte = function(conn) { } } else { - conn.rx_need = 1; + conn.rx_need++; } }; @@ -1979,7 +1986,10 @@ MQTT.Packet.prototype.recv = function(sock) { return; sock.connection.rx_need = 1; sock.connection.rx_callback = MQTT.Packet.gotTypeFlags; - sock.connection.rx_once = sock.once('read', MQTT.Packet.newBytes); + if (sock.connection.rx_buf.length > 0) + MQTT.Packet.newBytes.call(sock); + else + sock.connection.rx_once = sock.once('read', MQTT.Packet.newBytes); }; MQTT.Packet.prototype.serialize = function() {