diff --git a/exec/broker.js b/exec/broker.js index 6083e92114baee7767cccaf4067867b322d68745..1a1770314c1ba7faed55e37d74be2e6f0eb82395 100644 --- a/exec/broker.js +++ b/exec/broker.js @@ -887,7 +887,7 @@ MQTT.Connection.prototype.send = function(pkt) { if (this.tx_buf === '') this.tx_once = this.sock.once('write', MQTT.Connection.txData); this.tx_buf += pkt.serialize(); - log(LOG_INFO, "Sending " + MQTT.typename[pkt.type] + ' packet.'); + log(LOG_INFO, "Sending " + MQTT.typename[pkt.type] + ' packet to '+this.client_id); return true; //for (var i = 0; i < data.length; i++) //log(LOG_INFO, format("Byte %u: %2$02x (%2$d)", i, ascii(data[i]))); @@ -1388,8 +1388,9 @@ MQTT.Connection.prototype.handlePacket = function() { } } this['handle'+MQTT.typename[this.rx_packet.type]](); - if (this.sock !== null) + if (this.sock !== null) { this.rx_once = this.sock.once('read', MQTT.Connection.rxPacket); + } if (this.keep_alive > 0) { this.keep_alive_timeout = js.setTimeout(MQTT.Connection.timeToDie, (this.keep_alive * 1000) * 1.5, this); js.clearTimeout(this.keep_alive_timeout); @@ -1897,7 +1898,7 @@ MQTT.Packet.gotTypeFlags = function(conn) { MQTT.Packet.parseRemaining = function(conn) { conn.rx_callback = null; conn.rx_need = 0; - log(LOG_INFO, "Got " + MQTT.typename[conn.rx_packet.type] + " packet."); + log(LOG_INFO, "Got " + MQTT.typename[conn.rx_packet.type] + " packet from "+conn.client_id); js.setImmediate(conn.handlePacket, conn); }; @@ -1921,10 +1922,12 @@ MQTT.Packet.newBytes = function() { return; } } - if (this.connection.rx_need) + if (this.connection.rx_need) { this.connection.rx_once = this.once('read', MQTT.Packet.newBytes); - else + } + else if(this.connection.rx_once === null) { this.connection.rx_once = this.once('read', MQTT.Connection.rxPacket); + } } catch (e) { this.connection.error(e);