From 9b6138f2b579e90cf7e9dd97bd4c81d942f97065 Mon Sep 17 00:00:00 2001 From: "Rob Swindell (on Windows 11)" <rob@synchro.net> Date: Sat, 7 Sep 2024 14:42:55 -0700 Subject: [PATCH] Don't attempt to publish MQTT messages unless/until connected to broker For cases where an mqtt struct is shared between threads without concurrency control. I'm making this improvement in light of research into issue #781, though I don't expect this change to fix the reported issue. The reported error seems to come from the event thread (publishing node status upon starting to run the "DAILY" event) when a broker connection was not successful, however the reporter (Nelgin) may not have had debug-level logging turned on, so didn't capture the successful broker-connect log message. I think the broker connection *was* successful and perhaps then terminated by the broker ("due to protocol error"?). --- src/sbbs3/con_out.cpp | 2 +- src/sbbs3/main.cpp | 2 +- src/sbbs3/mqtt.c | 31 ++++++++++++++++++------------- src/sbbs3/mqtt.h | 1 + src/sbbs3/putnode.cpp | 2 +- 5 files changed, 22 insertions(+), 16 deletions(-) diff --git a/src/sbbs3/con_out.cpp b/src/sbbs3/con_out.cpp index 9410d62215..32e8d19d1c 100644 --- a/src/sbbs3/con_out.cpp +++ b/src/sbbs3/con_out.cpp @@ -606,7 +606,7 @@ bool sbbs_t::update_nodeterm(void) } strListFree(&ini); - if(cfg.mqtt.enabled) { + if(mqtt->connected) { char str[256]; char topic[128]; SAFEPRINTF(topic, "node/%u/terminal", cfg.node_num); diff --git a/src/sbbs3/main.cpp b/src/sbbs3/main.cpp index e8c5f2fda1..6cb7daac40 100644 --- a/src/sbbs3/main.cpp +++ b/src/sbbs3/main.cpp @@ -2684,7 +2684,7 @@ void output_thread(void* arg) ,node, result, i); } /* Spy on the user remotely */ - if(sbbs->cfg.mqtt.enabled && mqtt.handle != NULL) { + if(mqtt.connected) { int result = mqtt_pub_message(&mqtt, TOPIC_BBS, spy_topic, buf+bufbot, i, /* retain: */false); if(result != MQTT_SUCCESS) lprintf(LOG_WARNING, "%s ERROR %d (%d) publishing node output (%u bytes): %s" diff --git a/src/sbbs3/mqtt.c b/src/sbbs3/mqtt.c index 664f0675be..cc4f74802d 100644 --- a/src/sbbs3/mqtt.c +++ b/src/sbbs3/mqtt.c @@ -77,6 +77,7 @@ int mqtt_init(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup) if(mqtt == NULL || cfg == NULL || startup == NULL) return MQTT_FAILURE; + mqtt->connected = false; if(!cfg->mqtt.enabled) return MQTT_SUCCESS; mqtt->handle = NULL; @@ -155,7 +156,7 @@ static int mqtt_sub(struct mqtt* mqtt, const char* topic) { if(mqtt == NULL || mqtt->cfg == NULL) return MQTT_FAILURE; - if(!mqtt->cfg->mqtt.enabled) + if(!mqtt->connected) return MQTT_SUCCESS; #ifdef USE_MOSQUITTO if(mqtt->handle != NULL && topic != NULL) { @@ -184,7 +185,7 @@ int mqtt_lputs(struct mqtt* mqtt, enum topic_depth depth, int level, const char* { if(mqtt == NULL || mqtt->cfg == NULL) return MQTT_FAILURE; - if(!mqtt->cfg->mqtt.enabled) + if(!mqtt->connected) return MQTT_SUCCESS; if(level > mqtt->cfg->mqtt.log_level) return MQTT_SUCCESS; @@ -239,7 +240,7 @@ int mqtt_pub_strval(struct mqtt* mqtt, enum topic_depth depth, const char* key, { if(mqtt == NULL || mqtt->cfg == NULL) return MQTT_FAILURE; - if(!mqtt->cfg->mqtt.enabled) + if(!mqtt->connected) return MQTT_SUCCESS; #ifdef USE_MOSQUITTO if(mqtt->handle != NULL) { @@ -265,7 +266,7 @@ int mqtt_pub_timestamped_msg(struct mqtt* mqtt, enum topic_depth depth, const ch if(mqtt == NULL || mqtt->cfg == NULL) return MQTT_FAILURE; - if(!mqtt->cfg->mqtt.enabled) + if(!mqtt->connected) return MQTT_SUCCESS; time_to_isoDateTimeStr(t, xpTimeZone_local(), timestamp, sizeof(timestamp)); SAFEPRINTF2(str, "%s\t%s", timestamp, msg); @@ -276,7 +277,7 @@ int mqtt_pub_uintval(struct mqtt* mqtt, enum topic_depth depth, const char* key, { if(mqtt == NULL || mqtt->cfg == NULL) return MQTT_FAILURE; - if(!mqtt->cfg->mqtt.enabled) + if(!mqtt->connected) return MQTT_SUCCESS; #ifdef USE_MOSQUITTO if(mqtt->handle != NULL) { @@ -301,7 +302,7 @@ int mqtt_pub_message(struct mqtt* mqtt, enum topic_depth depth, const char* key, { if(mqtt == NULL || mqtt->cfg == NULL) return MQTT_FAILURE; - if(!mqtt->cfg->mqtt.enabled) + if(!mqtt->connected) return MQTT_SUCCESS; #ifdef USE_MOSQUITTO if(mqtt->handle != NULL) { @@ -627,7 +628,7 @@ int mqtt_startup(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup, const } else { result = mqtt_thread_start(mqtt); if(result != MQTT_SUCCESS) { - lprintf(lputs, LOG_ERR, "Error %d starting pub/sub thread", result); + lprintf(lputs, LOG_ERR, "MQTT error %d starting pub/sub thread", result); mqtt_close(mqtt); } else { #ifdef USE_MOSQUITTO @@ -637,6 +638,7 @@ int mqtt_startup(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup, const lprintf(lputs, LOG_INFO, "MQTT connecting to broker %s:%u", cfg->mqtt.broker_addr, cfg->mqtt.broker_port); result = mqtt_connect(mqtt, /* bind_address: */NULL); if(result == MQTT_SUCCESS) { + mqtt->connected = true; lprintf(lputs, LOG_DEBUG, "MQTT broker-connect (%s:%d) successful", cfg->mqtt.broker_addr, cfg->mqtt.broker_port); mqtt_pub_noval(mqtt, TOPIC_SERVER, "client"); } else { @@ -746,7 +748,7 @@ int mqtt_client_on(struct mqtt* mqtt, bool on, int sock, client_t* client, bool if(mqtt == NULL || mqtt->cfg == NULL) return MQTT_FAILURE; - if(!mqtt->cfg->mqtt.enabled) + if(!mqtt->connected) return MQTT_SUCCESS; listLock(&mqtt->client_list); @@ -806,7 +808,7 @@ int mqtt_user_login_fail(struct mqtt* mqtt, client_t* client, const char* userna if(mqtt == NULL || mqtt->cfg == NULL || client == NULL) return MQTT_FAILURE; - if(!mqtt->cfg->mqtt.enabled) + if(!mqtt->connected) return MQTT_SUCCESS; if(username == NULL) @@ -829,7 +831,7 @@ int mqtt_user_login(struct mqtt* mqtt, client_t* client) if(mqtt == NULL || mqtt->cfg == NULL || client == NULL) return MQTT_FAILURE; - if(!mqtt->cfg->mqtt.enabled) + if(!mqtt->connected) return MQTT_SUCCESS; snprintf(topic, sizeof(topic), "login/%s", client->protocol); @@ -852,7 +854,7 @@ int mqtt_user_logout(struct mqtt* mqtt, client_t* client, time_t logintime) if(mqtt == NULL || mqtt->cfg == NULL || client == NULL) return MQTT_FAILURE; - if(!mqtt->cfg->mqtt.enabled) + if(!mqtt->connected) return MQTT_SUCCESS; long tused = (long)(time(NULL) - logintime); @@ -889,7 +891,10 @@ int mqtt_client_count(struct mqtt* mqtt) void mqtt_shutdown(struct mqtt* mqtt) { if(mqtt != NULL && mqtt->cfg != NULL && mqtt->cfg->mqtt.enabled) { - mqtt_disconnect(mqtt); + if(mqtt->connected) { + if(mqtt_disconnect(mqtt) == MQTT_SUCCESS) + mqtt->connected = false; + } mqtt_thread_stop(mqtt); mqtt_close(mqtt); } @@ -903,7 +908,7 @@ static int mqtt_file_xfer(struct mqtt* mqtt, user_t* user, int dirnum, const cha if(!is_valid_dirnum(mqtt->cfg, dirnum)) return MQTT_FAILURE; - if(!mqtt->cfg->mqtt.enabled) + if(!mqtt->connected) return MQTT_SUCCESS; char str[256]; diff --git a/src/sbbs3/mqtt.h b/src/sbbs3/mqtt.h index 8c1c4cd1c2..27543f777a 100644 --- a/src/sbbs3/mqtt.h +++ b/src/sbbs3/mqtt.h @@ -43,6 +43,7 @@ struct mqtt { mqtt_handle_t handle; scfg_t* cfg; char* host; + bool connected; ulong max_clients; ulong error_count; ulong served; diff --git a/src/sbbs3/putnode.cpp b/src/sbbs3/putnode.cpp index c42e97379b..1f905eba57 100644 --- a/src/sbbs3/putnode.cpp +++ b/src/sbbs3/putnode.cpp @@ -90,7 +90,7 @@ int sbbs_t::putnodedat(uint number, node_t* node) } pthread_mutex_unlock(&nodefile_mutex); - if(cfg.mqtt.enabled && mqtt->handle != NULL) { + if(mqtt->connected) { int result = mqtt_putnodedat(mqtt, number + 1, node); if(result != MQTT_SUCCESS) lprintf(LOG_WARNING, "ERROR %d (%d) publishing node status", result, errno); -- GitLab