Skip to content
Snippets Groups Projects
Commit a7077e52 authored by Deucе's avatar Deucе :ok_hand_tone4:
Browse files

More fixes... it seems broadly usefulish now

- Increment bytes required for every extra VBI byte we learn about
- If sock is null, don't look at sock.connection
- Set sock to null after close
- Don't read from sock unless we need some bytes
- If we already have some bytes, parse them

There's a couple hacks in here that should be refactored though.
parent dfd5de5e
Branches
Tags
No related merge requests found
......@@ -915,7 +915,8 @@ MQTT.Connection.timeToDie = function() {
MQTT.Connection.prototype.tearDown = function() {
var sock = this.sock;
this.sock.connection = null;
if (sock !== null)
this.sock.connection = null;
if (this.rx_listener !== null)
removeEventListener(this.rx_listener);
if (this.rx_once != null) {
......@@ -928,6 +929,7 @@ MQTT.Connection.prototype.tearDown = function() {
}
this.sock = null;
sock.close();
this.sock = null;
if (this.broker.connected[this.client_id] !== undefined)
delete this.broker.connected[this.client_id];
if (this.will !== null) {
......@@ -1904,14 +1906,16 @@ MQTT.Packet.parseRemaining = function(conn) {
MQTT.Packet.newBytes = function() {
try {
var data = this.recv(16384);
this.connection.rx_once = null;
if (data == null) {
// Error, tear down connection...
this.connection.tearDown();
return;
if (this.connection.rx_buf.length < this.connection.rx_need) {
var data = this.recv(16384);
this.connection.rx_once = null;
if (data == null) {
// Error, tear down connection...
this.connection.tearDown();
return;
}
this.connection.rx_buf += data;
}
this.connection.rx_buf += data;
while (this.connection.rx_need && this.connection.rx_buf.length >= this.connection.rx_need) {
try {
this.connection.rx_callback(this.connection);
......@@ -1925,6 +1929,9 @@ MQTT.Packet.newBytes = function() {
if (this.connection.rx_need) {
this.connection.rx_once = this.once('read', MQTT.Packet.newBytes);
}
else if(this.connection.rx_buf.length > 0) {
js.setImmediate(MQTT.Connection.rxPacket, this);
}
else if(this.connection.rx_once === null) {
this.connection.rx_once = this.once('read', MQTT.Connection.rxPacket);
}
......@@ -1945,7 +1952,7 @@ MQTT.Packet.gotRemainingLengthByte = function(conn) {
}
}
else {
conn.rx_need = 1;
conn.rx_need++;
}
};
......@@ -1979,7 +1986,10 @@ MQTT.Packet.prototype.recv = function(sock) {
return;
sock.connection.rx_need = 1;
sock.connection.rx_callback = MQTT.Packet.gotTypeFlags;
sock.connection.rx_once = sock.once('read', MQTT.Packet.newBytes);
if (sock.connection.rx_buf.length > 0)
MQTT.Packet.newBytes.call(sock);
else
sock.connection.rx_once = sock.once('read', MQTT.Packet.newBytes);
};
MQTT.Packet.prototype.serialize = function() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment