diff --git a/exec/broker.js b/exec/broker.js index b625fd5509a6e7b2c5742177da38936b6b02c283..f02c3b1fe174c119cffc609dc5019f14903593da 100644 --- a/exec/broker.js +++ b/exec/broker.js @@ -47,6 +47,10 @@ function MQTT() { // MQTT.Topic objects this.topics = {}; + + // Callbacks + this.connect_callback = null; + this.disconnect_callback = null; }; // Static data @@ -978,8 +982,11 @@ MQTT.Connection.prototype.tearDown = function() { this.sock = null; this.rx_buf = ''; } - if (this.broker.connected[this.client_id] !== undefined) + if (this.broker.connected[this.client_id] !== undefined) { delete this.broker.connected[this.client_id]; + if (this.broker.disconnect_callback !== null) + this.broker.disconnect_callback(); + } if (this.will !== null) { if (this.will.properties[24] === undefined || this.will.properties[24] === 0) { this.rx_packet = this.will; @@ -1147,6 +1154,8 @@ MQTT.Connection.prototype.handleCONNECT = function() { js.setImmediate(this.serviceTxQueue, this); this.tx_service_pending = true; } + if (this.broker.connect_callback !== null) + this.broker.connect_callback(); }; MQTT.Connection.prototype.handleSUBSCRIBE = function() { @@ -2623,10 +2632,22 @@ for (var i = 1; i <= system.last_user; i++) { MQTT.psk[usr.alias.toLowerCase()] = usr.security.password.toLowerCase(); } +function connect_callback() +{ + server.client_add(); +} + +function disconnect_callback() +{ + server.client_remove(); +} + var broker = new MQTT(); var s; try { s = server.socket; + broker.connect_callback = connect_callback; + broker.disconnect_callback = disconnect_callback; } catch(e) { s = new ListeningSocket(["0.0.0.0", "::0"], 8883, 'MQTT');