Skip to content
Snippets Groups Projects
Commit a7077e52 authored by Deucе's avatar Deucе :ok_hand_tone4:
Browse files

More fixes... it seems broadly usefulish now

- Increment bytes required for every extra VBI byte we learn about
- If sock is null, don't look at sock.connection
- Set sock to null after close
- Don't read from sock unless we need some bytes
- If we already have some bytes, parse them

There's a couple hacks in here that should be refactored though.
parent dfd5de5e
Branches
Tags
No related merge requests found
...@@ -915,6 +915,7 @@ MQTT.Connection.timeToDie = function() { ...@@ -915,6 +915,7 @@ MQTT.Connection.timeToDie = function() {
MQTT.Connection.prototype.tearDown = function() { MQTT.Connection.prototype.tearDown = function() {
var sock = this.sock; var sock = this.sock;
if (sock !== null)
this.sock.connection = null; this.sock.connection = null;
if (this.rx_listener !== null) if (this.rx_listener !== null)
removeEventListener(this.rx_listener); removeEventListener(this.rx_listener);
...@@ -928,6 +929,7 @@ MQTT.Connection.prototype.tearDown = function() { ...@@ -928,6 +929,7 @@ MQTT.Connection.prototype.tearDown = function() {
} }
this.sock = null; this.sock = null;
sock.close(); sock.close();
this.sock = null;
if (this.broker.connected[this.client_id] !== undefined) if (this.broker.connected[this.client_id] !== undefined)
delete this.broker.connected[this.client_id]; delete this.broker.connected[this.client_id];
if (this.will !== null) { if (this.will !== null) {
...@@ -1904,6 +1906,7 @@ MQTT.Packet.parseRemaining = function(conn) { ...@@ -1904,6 +1906,7 @@ MQTT.Packet.parseRemaining = function(conn) {
MQTT.Packet.newBytes = function() { MQTT.Packet.newBytes = function() {
try { try {
if (this.connection.rx_buf.length < this.connection.rx_need) {
var data = this.recv(16384); var data = this.recv(16384);
this.connection.rx_once = null; this.connection.rx_once = null;
if (data == null) { if (data == null) {
...@@ -1912,6 +1915,7 @@ MQTT.Packet.newBytes = function() { ...@@ -1912,6 +1915,7 @@ MQTT.Packet.newBytes = function() {
return; return;
} }
this.connection.rx_buf += data; this.connection.rx_buf += data;
}
while (this.connection.rx_need && this.connection.rx_buf.length >= this.connection.rx_need) { while (this.connection.rx_need && this.connection.rx_buf.length >= this.connection.rx_need) {
try { try {
this.connection.rx_callback(this.connection); this.connection.rx_callback(this.connection);
...@@ -1925,6 +1929,9 @@ MQTT.Packet.newBytes = function() { ...@@ -1925,6 +1929,9 @@ MQTT.Packet.newBytes = function() {
if (this.connection.rx_need) { if (this.connection.rx_need) {
this.connection.rx_once = this.once('read', MQTT.Packet.newBytes); this.connection.rx_once = this.once('read', MQTT.Packet.newBytes);
} }
else if(this.connection.rx_buf.length > 0) {
js.setImmediate(MQTT.Connection.rxPacket, this);
}
else if(this.connection.rx_once === null) { else if(this.connection.rx_once === null) {
this.connection.rx_once = this.once('read', MQTT.Connection.rxPacket); this.connection.rx_once = this.once('read', MQTT.Connection.rxPacket);
} }
...@@ -1945,7 +1952,7 @@ MQTT.Packet.gotRemainingLengthByte = function(conn) { ...@@ -1945,7 +1952,7 @@ MQTT.Packet.gotRemainingLengthByte = function(conn) {
} }
} }
else { else {
conn.rx_need = 1; conn.rx_need++;
} }
}; };
...@@ -1979,6 +1986,9 @@ MQTT.Packet.prototype.recv = function(sock) { ...@@ -1979,6 +1986,9 @@ MQTT.Packet.prototype.recv = function(sock) {
return; return;
sock.connection.rx_need = 1; sock.connection.rx_need = 1;
sock.connection.rx_callback = MQTT.Packet.gotTypeFlags; sock.connection.rx_callback = MQTT.Packet.gotTypeFlags;
if (sock.connection.rx_buf.length > 0)
MQTT.Packet.newBytes.call(sock);
else
sock.connection.rx_once = sock.once('read', MQTT.Packet.newBytes); sock.connection.rx_once = sock.once('read', MQTT.Packet.newBytes);
}; };
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment