Skip to content
Snippets Groups Projects
Commit cefca3c6 authored by Rob Swindell's avatar Rob Swindell :speech_balloon:
Browse files

Track broker-connected status via Mosquitto connect/disconnect callbacks

I think this might fix issue #781. I suspect that SBBS (the MQTT client) is
being disconnected by the server ("due to protocol error") *after* the
call to mosquitto_connect_bind() is successful. We don't have any correponding
log output for this case, but at least we can track the connection status
accurately using the Mosquitto client callbacks and not try to publish when
we're not connected.
parent 9b6138f2
No related branches found
No related tags found
No related merge requests found
Pipeline #6592 passed
......@@ -484,6 +484,7 @@ static void mqtt_connect_callback(struct mosquitto* mosq, void* cbdata, int rc)
char str[128];
if (rc == MQTT_SUCCESS) {
mqtt->connected = true;
if(mqtt->startup->type == SERVER_TERM) {
bbs_startup_t* bbs_startup = (bbs_startup_t*)mqtt->startup;
for(int i = bbs_startup->first_node; i <= bbs_startup->last_node; ++i) {
......@@ -501,6 +502,15 @@ static void mqtt_connect_callback(struct mosquitto* mosq, void* cbdata, int rc)
mqtt_subscribe(mqtt, TOPIC_SERVER, str, sizeof(str), "resume");
mqtt_subscribe(mqtt, TOPIC_HOST, str, sizeof(str), "resume");
}
else
mqtt->connect_error = rc;
}
static void mqtt_disconnect_callback(struct mosquitto* mosq, void* cbdata, int rc)
{
struct mqtt* mqtt = (struct mqtt*)cbdata;
mqtt->disconnect_reason = rc;
mqtt->connected = false;
}
static ulong mqtt_message_value(const struct mosquitto_message* msg, ulong deflt)
......@@ -632,13 +642,14 @@ int mqtt_startup(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup, const
mqtt_close(mqtt);
} else {
#ifdef USE_MOSQUITTO
if(mqtt->handle != NULL)
if(mqtt->handle != NULL) {
mosquitto_connect_callback_set(mqtt->handle, mqtt_connect_callback);
mosquitto_disconnect_callback_set(mqtt->handle, mqtt_disconnect_callback);
}
#endif
lprintf(lputs, LOG_INFO, "MQTT connecting to broker %s:%u", cfg->mqtt.broker_addr, cfg->mqtt.broker_port);
result = mqtt_connect(mqtt, /* bind_address: */NULL);
if(result == MQTT_SUCCESS) {
mqtt->connected = true;
lprintf(lputs, LOG_DEBUG, "MQTT broker-connect (%s:%d) successful", cfg->mqtt.broker_addr, cfg->mqtt.broker_port);
mqtt_pub_noval(mqtt, TOPIC_SERVER, "client");
} else {
......@@ -891,10 +902,7 @@ int mqtt_client_count(struct mqtt* mqtt)
void mqtt_shutdown(struct mqtt* mqtt)
{
if(mqtt != NULL && mqtt->cfg != NULL && mqtt->cfg->mqtt.enabled) {
if(mqtt->connected) {
if(mqtt_disconnect(mqtt) == MQTT_SUCCESS)
mqtt->connected = false;
}
mqtt_disconnect(mqtt);
mqtt_thread_stop(mqtt);
mqtt_close(mqtt);
}
......
......@@ -44,6 +44,8 @@ struct mqtt {
scfg_t* cfg;
char* host;
bool connected;
int connect_error;
int disconnect_reason;
ulong max_clients;
ulong error_count;
ulong served;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment