Skip to content
Snippets Groups Projects
Commit 86e9f99a authored by Deucе's avatar Deucе :ok_hand_tone4:
Browse files

Do some of that refactor

Add a nextPacket() method that will get the next packet
Add a Connection.parseBytes() method to consume the rx_buf
Simplify Packet.newBytes() to only do socket recv()
Clear RX buffer when closing socket
parent a0b9a6db
No related branches found
No related tags found
No related merge requests found
......@@ -864,6 +864,17 @@ MQTT.Connection.rxPacket = function() {
}
};
MQTT.Connection.nextPacket = function() {
if (this.rx_buf.length > 0) {
this.rx_packet = new MQTT.Packet();
this.rx_packet.recv(this.sock);
}
else {
if (this.rx_once === null && this.sock !== null)
this.rx_once = this.sock.once('read', MQTT.Connection.rxPacket);
}
};
MQTT.Connection.txData = function() {
this.connection.tx_once = null;
var bytes = this.send(this.connection.tx_buf);
......@@ -930,6 +941,7 @@ MQTT.Connection.prototype.tearDown = function() {
if (sock !== null) {
sock.close();
this.sock = null;
this.rx_buf = '';
}
if (this.broker.connected[this.client_id] !== undefined)
delete this.broker.connected[this.client_id];
......@@ -1516,6 +1528,31 @@ MQTT.Connection.prototype.getProperties = function(is_will) {
return ret;
};
MQTT.Connection.prototype.parseBytes = function() {
while (this.rx_need && this.rx_buf.length >= this.rx_need) {
try {
this.rx_callback(this);
}
catch (e) {
log(LOG_WARNING, e.toSource());
this.error(e);
return;
}
}
// If we need bytes, and we're not waiting for them, add handler
if (this.rx_need && this.rx_once === null) {
this.rx_once = this.sock.once('read', MQTT.Packet.newBytes);
}
// If we're waiting for bytes, schedule the next packet
else if(this.rx_buf.length > 0) {
js.setImmediate(MQTT.Connection.nextPacket, this);
}
// Otherwise, wait for new bytes from socket and see what happens
else if(this.rx_once === null) {
this.rx_once = this.sock.once('read', MQTT.Connection.rxPacket);
}
};
// MQTT.Connection.SubscriptionOptions class
MQTT.Connection.SubscriptionOptions = function(value) {
......@@ -1907,35 +1944,15 @@ MQTT.Packet.parseRemaining = function(conn) {
MQTT.Packet.newBytes = function() {
try {
if (this.connection.rx_buf.length < this.connection.rx_need) {
var data = this.recv(16384);
this.connection.rx_once = null;
if (data == null) {
// Error, tear down connection...
this.connection.tearDown();
return;
}
this.connection.rx_buf += data;
}
while (this.connection.rx_need && this.connection.rx_buf.length >= this.connection.rx_need) {
try {
this.connection.rx_callback(this.connection);
}
catch (e) {
log(LOG_WARNING, e.toSource());
this.connection.error(e);
return;
}
}
if (this.connection.rx_need) {
this.connection.rx_once = this.once('read', MQTT.Packet.newBytes);
}
else if(this.connection.rx_buf.length > 0) {
js.setImmediate(MQTT.Connection.rxPacket, this);
}
else if(this.connection.rx_once === null) {
this.connection.rx_once = this.once('read', MQTT.Connection.rxPacket);
var data = this.recv(16384);
this.connection.rx_once = null;
if (data == null) {
// Error, tear down connection...
this.connection.tearDown();
return;
}
this.connection.rx_buf += data;
this.connection.parseBytes();
}
catch (e) {
this.connection.error(e);
......@@ -1987,10 +2004,7 @@ MQTT.Packet.prototype.recv = function(sock) {
return;
sock.connection.rx_need = 1;
sock.connection.rx_callback = MQTT.Packet.gotTypeFlags;
if (sock.connection.rx_buf.length > 0)
MQTT.Packet.newBytes.call(sock);
else
sock.connection.rx_once = sock.once('read', MQTT.Packet.newBytes);
sock.connection.parseBytes();
};
MQTT.Packet.prototype.serialize = function() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment