Skip to content
Snippets Groups Projects
Commit c5aaba6c authored by Deucе's avatar Deucе :ok_hand_tone4:
Browse files

Add some documentation about how the script works.

As always when writing documentation, fix things that aren't quite
right... we now schedule nextPacket, but run each packet to
completion as long as we have all the bytes.
parent d1863a4e
No related branches found
No related tags found
No related merge requests found
Pipeline #8117 passed
"use strict";
require("userdefs.js", "USER_DELETED");
/*
* How this works:
* Data input comes in via the socket using the MQTT.Connection.rxPacket
* or MQTT.Packet.newBytes callbacks these are one-shot events traced
* in connection.rx_once. They both set rx_once to null on entry.
*
* MQTT.Connection.rxPacket is for the start of a new packet, and
* MQTT.Packet.newBytes is for when a partial packet is in the buffer.
*
* The MQTT.Connection.rxPacket is only installed at the start of the
* connection and by MQTT.Connection.nextPacket
*
* Both rxPacket and nextPacket create a new Packet object and call
* recv() on it. The main difference is the this object. rxPacket this
* is the socket, and nextPacket this is the Connection.
*
* Once Packet.recv() is called, it sets Connection.rx_need and
* Connection.rx_callback. Once there's rx_need bytes in the buffer,
* rx_callback is called. This is performed by Connection.parseBytes
* As long as there's rx_need bytes left in the buffer, rx_callback
* is called in a loop. Once a full packet is received, rx_need gets
* set to zero, and parseBytes schedules nextPacket immediately.
*
* The flow here is:
* rx_need = 1, rx_callback = gotTypeFlags
* This subclasses the Packet
* rx_need = 1, rx_callback = gotRemainingLengthByte
* This will increment rx_need until the whole
* VBI is received, then set rx_need to the
* remaining size and rx_callback = parseRemaining
*
* parseRemaining will set rx_need = 0 and rx_callback = null then calls
* handlePacket, which will call handleTYPE where TYPE is the upper-case
* control message type. It then schedules nextPacket immediately.
*/
// MQTT class
function MQTT() {
......@@ -1387,14 +1423,12 @@ MQTT.Connection.prototype.handlePacket = function() {
}
}
this['handle'+MQTT.typename[this.rx_packet.type]]();
if (this.sock !== null) {
this.rx_once = this.sock.once('read', MQTT.Connection.rxPacket);
}
if (this.keep_alive > 0) {
this.keep_alive_timeout = js.setTimeout(MQTT.Connection.timeToDie, (this.keep_alive * 1000) * 1.5, this);
js.clearTimeout(this.keep_alive_timeout);
this.keep_alive_timeout = null;
}
js.setImmediate(MQTT.Connection.nextPacket, this);
}
catch (e) {
log(LOG_WARNING, e.toSource());
......@@ -1513,6 +1547,7 @@ MQTT.Connection.prototype.getProperties = function(is_will) {
};
MQTT.Connection.prototype.parseBytes = function() {
// As long as we have enough bytes, keep processing...
while (this.rx_need && this.rx_buf.length >= this.rx_need) {
try {
this.rx_callback(this);
......@@ -1524,17 +1559,14 @@ MQTT.Connection.prototype.parseBytes = function() {
}
}
// If we need bytes, and we're not waiting for them, add handler
if (this.rx_need && this.rx_once === null) {
this.rx_once = this.sock.once('read', MQTT.Packet.newBytes);
if (this.rx_need) {
if (this.rx_once === null)
this.rx_once = this.sock.once('read', MQTT.Packet.newBytes);
}
// If we're waiting for bytes, schedule the next packet
else if(this.rx_buf.length > 0) {
// Otherwise, we're done with this packet and need to start another
else {
js.setImmediate(MQTT.Connection.nextPacket, this);
}
// Otherwise, wait for new bytes from socket and see what happens
else if(this.rx_once === null) {
this.rx_once = this.sock.once('read', MQTT.Connection.rxPacket);
}
};
// MQTT.Connection.SubscriptionOptions class
......@@ -1923,13 +1955,13 @@ MQTT.Packet.parseRemaining = function(conn) {
conn.rx_callback = null;
conn.rx_need = 0;
//log(LOG_INFO, "Got " + MQTT.typename[conn.rx_packet.type] + " packet from "+conn.client_id);
js.setImmediate(conn.handlePacket, conn);
conn.handlePacket();
};
MQTT.Packet.newBytes = function() {
this.connection.rx_once = null;
try {
var data = this.recv(16384);
this.connection.rx_once = null;
if (data == null) {
// Error, tear down connection...
this.connection.tearDown();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment