Skip to content
Snippets Groups Projects
Commit 9b6138f2 authored by Rob Swindell's avatar Rob Swindell :speech_balloon:
Browse files

Don't attempt to publish MQTT messages unless/until connected to broker

For cases where an mqtt struct is shared between threads without concurrency
control. I'm making this improvement in light of research into issue #781,
though I don't expect this change to fix the reported issue.

The reported error seems to come from the event thread (publishing node
status upon starting to run the "DAILY" event) when a broker connection was
not successful, however the reporter (Nelgin) may not have had debug-level
logging turned on, so didn't capture the successful broker-connect log
message. I think the broker connection *was* successful and perhaps then
terminated by the broker ("due to protocol error"?).
parent e86b0a10
No related branches found
No related tags found
No related merge requests found
...@@ -606,7 +606,7 @@ bool sbbs_t::update_nodeterm(void) ...@@ -606,7 +606,7 @@ bool sbbs_t::update_nodeterm(void)
} }
strListFree(&ini); strListFree(&ini);
if(cfg.mqtt.enabled) { if(mqtt->connected) {
char str[256]; char str[256];
char topic[128]; char topic[128];
SAFEPRINTF(topic, "node/%u/terminal", cfg.node_num); SAFEPRINTF(topic, "node/%u/terminal", cfg.node_num);
......
...@@ -2684,7 +2684,7 @@ void output_thread(void* arg) ...@@ -2684,7 +2684,7 @@ void output_thread(void* arg)
,node, result, i); ,node, result, i);
} }
/* Spy on the user remotely */ /* Spy on the user remotely */
if(sbbs->cfg.mqtt.enabled && mqtt.handle != NULL) { if(mqtt.connected) {
int result = mqtt_pub_message(&mqtt, TOPIC_BBS, spy_topic, buf+bufbot, i, /* retain: */false); int result = mqtt_pub_message(&mqtt, TOPIC_BBS, spy_topic, buf+bufbot, i, /* retain: */false);
if(result != MQTT_SUCCESS) if(result != MQTT_SUCCESS)
lprintf(LOG_WARNING, "%s ERROR %d (%d) publishing node output (%u bytes): %s" lprintf(LOG_WARNING, "%s ERROR %d (%d) publishing node output (%u bytes): %s"
......
...@@ -77,6 +77,7 @@ int mqtt_init(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup) ...@@ -77,6 +77,7 @@ int mqtt_init(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup)
if(mqtt == NULL || cfg == NULL || startup == NULL) if(mqtt == NULL || cfg == NULL || startup == NULL)
return MQTT_FAILURE; return MQTT_FAILURE;
mqtt->connected = false;
if(!cfg->mqtt.enabled) if(!cfg->mqtt.enabled)
return MQTT_SUCCESS; return MQTT_SUCCESS;
mqtt->handle = NULL; mqtt->handle = NULL;
...@@ -155,7 +156,7 @@ static int mqtt_sub(struct mqtt* mqtt, const char* topic) ...@@ -155,7 +156,7 @@ static int mqtt_sub(struct mqtt* mqtt, const char* topic)
{ {
if(mqtt == NULL || mqtt->cfg == NULL) if(mqtt == NULL || mqtt->cfg == NULL)
return MQTT_FAILURE; return MQTT_FAILURE;
if(!mqtt->cfg->mqtt.enabled) if(!mqtt->connected)
return MQTT_SUCCESS; return MQTT_SUCCESS;
#ifdef USE_MOSQUITTO #ifdef USE_MOSQUITTO
if(mqtt->handle != NULL && topic != NULL) { if(mqtt->handle != NULL && topic != NULL) {
...@@ -184,7 +185,7 @@ int mqtt_lputs(struct mqtt* mqtt, enum topic_depth depth, int level, const char* ...@@ -184,7 +185,7 @@ int mqtt_lputs(struct mqtt* mqtt, enum topic_depth depth, int level, const char*
{ {
if(mqtt == NULL || mqtt->cfg == NULL) if(mqtt == NULL || mqtt->cfg == NULL)
return MQTT_FAILURE; return MQTT_FAILURE;
if(!mqtt->cfg->mqtt.enabled) if(!mqtt->connected)
return MQTT_SUCCESS; return MQTT_SUCCESS;
if(level > mqtt->cfg->mqtt.log_level) if(level > mqtt->cfg->mqtt.log_level)
return MQTT_SUCCESS; return MQTT_SUCCESS;
...@@ -239,7 +240,7 @@ int mqtt_pub_strval(struct mqtt* mqtt, enum topic_depth depth, const char* key, ...@@ -239,7 +240,7 @@ int mqtt_pub_strval(struct mqtt* mqtt, enum topic_depth depth, const char* key,
{ {
if(mqtt == NULL || mqtt->cfg == NULL) if(mqtt == NULL || mqtt->cfg == NULL)
return MQTT_FAILURE; return MQTT_FAILURE;
if(!mqtt->cfg->mqtt.enabled) if(!mqtt->connected)
return MQTT_SUCCESS; return MQTT_SUCCESS;
#ifdef USE_MOSQUITTO #ifdef USE_MOSQUITTO
if(mqtt->handle != NULL) { if(mqtt->handle != NULL) {
...@@ -265,7 +266,7 @@ int mqtt_pub_timestamped_msg(struct mqtt* mqtt, enum topic_depth depth, const ch ...@@ -265,7 +266,7 @@ int mqtt_pub_timestamped_msg(struct mqtt* mqtt, enum topic_depth depth, const ch
if(mqtt == NULL || mqtt->cfg == NULL) if(mqtt == NULL || mqtt->cfg == NULL)
return MQTT_FAILURE; return MQTT_FAILURE;
if(!mqtt->cfg->mqtt.enabled) if(!mqtt->connected)
return MQTT_SUCCESS; return MQTT_SUCCESS;
time_to_isoDateTimeStr(t, xpTimeZone_local(), timestamp, sizeof(timestamp)); time_to_isoDateTimeStr(t, xpTimeZone_local(), timestamp, sizeof(timestamp));
SAFEPRINTF2(str, "%s\t%s", timestamp, msg); SAFEPRINTF2(str, "%s\t%s", timestamp, msg);
...@@ -276,7 +277,7 @@ int mqtt_pub_uintval(struct mqtt* mqtt, enum topic_depth depth, const char* key, ...@@ -276,7 +277,7 @@ int mqtt_pub_uintval(struct mqtt* mqtt, enum topic_depth depth, const char* key,
{ {
if(mqtt == NULL || mqtt->cfg == NULL) if(mqtt == NULL || mqtt->cfg == NULL)
return MQTT_FAILURE; return MQTT_FAILURE;
if(!mqtt->cfg->mqtt.enabled) if(!mqtt->connected)
return MQTT_SUCCESS; return MQTT_SUCCESS;
#ifdef USE_MOSQUITTO #ifdef USE_MOSQUITTO
if(mqtt->handle != NULL) { if(mqtt->handle != NULL) {
...@@ -301,7 +302,7 @@ int mqtt_pub_message(struct mqtt* mqtt, enum topic_depth depth, const char* key, ...@@ -301,7 +302,7 @@ int mqtt_pub_message(struct mqtt* mqtt, enum topic_depth depth, const char* key,
{ {
if(mqtt == NULL || mqtt->cfg == NULL) if(mqtt == NULL || mqtt->cfg == NULL)
return MQTT_FAILURE; return MQTT_FAILURE;
if(!mqtt->cfg->mqtt.enabled) if(!mqtt->connected)
return MQTT_SUCCESS; return MQTT_SUCCESS;
#ifdef USE_MOSQUITTO #ifdef USE_MOSQUITTO
if(mqtt->handle != NULL) { if(mqtt->handle != NULL) {
...@@ -627,7 +628,7 @@ int mqtt_startup(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup, const ...@@ -627,7 +628,7 @@ int mqtt_startup(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup, const
} else { } else {
result = mqtt_thread_start(mqtt); result = mqtt_thread_start(mqtt);
if(result != MQTT_SUCCESS) { if(result != MQTT_SUCCESS) {
lprintf(lputs, LOG_ERR, "Error %d starting pub/sub thread", result); lprintf(lputs, LOG_ERR, "MQTT error %d starting pub/sub thread", result);
mqtt_close(mqtt); mqtt_close(mqtt);
} else { } else {
#ifdef USE_MOSQUITTO #ifdef USE_MOSQUITTO
...@@ -637,6 +638,7 @@ int mqtt_startup(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup, const ...@@ -637,6 +638,7 @@ int mqtt_startup(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup, const
lprintf(lputs, LOG_INFO, "MQTT connecting to broker %s:%u", cfg->mqtt.broker_addr, cfg->mqtt.broker_port); lprintf(lputs, LOG_INFO, "MQTT connecting to broker %s:%u", cfg->mqtt.broker_addr, cfg->mqtt.broker_port);
result = mqtt_connect(mqtt, /* bind_address: */NULL); result = mqtt_connect(mqtt, /* bind_address: */NULL);
if(result == MQTT_SUCCESS) { if(result == MQTT_SUCCESS) {
mqtt->connected = true;
lprintf(lputs, LOG_DEBUG, "MQTT broker-connect (%s:%d) successful", cfg->mqtt.broker_addr, cfg->mqtt.broker_port); lprintf(lputs, LOG_DEBUG, "MQTT broker-connect (%s:%d) successful", cfg->mqtt.broker_addr, cfg->mqtt.broker_port);
mqtt_pub_noval(mqtt, TOPIC_SERVER, "client"); mqtt_pub_noval(mqtt, TOPIC_SERVER, "client");
} else { } else {
...@@ -746,7 +748,7 @@ int mqtt_client_on(struct mqtt* mqtt, bool on, int sock, client_t* client, bool ...@@ -746,7 +748,7 @@ int mqtt_client_on(struct mqtt* mqtt, bool on, int sock, client_t* client, bool
if(mqtt == NULL || mqtt->cfg == NULL) if(mqtt == NULL || mqtt->cfg == NULL)
return MQTT_FAILURE; return MQTT_FAILURE;
if(!mqtt->cfg->mqtt.enabled) if(!mqtt->connected)
return MQTT_SUCCESS; return MQTT_SUCCESS;
listLock(&mqtt->client_list); listLock(&mqtt->client_list);
...@@ -806,7 +808,7 @@ int mqtt_user_login_fail(struct mqtt* mqtt, client_t* client, const char* userna ...@@ -806,7 +808,7 @@ int mqtt_user_login_fail(struct mqtt* mqtt, client_t* client, const char* userna
if(mqtt == NULL || mqtt->cfg == NULL || client == NULL) if(mqtt == NULL || mqtt->cfg == NULL || client == NULL)
return MQTT_FAILURE; return MQTT_FAILURE;
if(!mqtt->cfg->mqtt.enabled) if(!mqtt->connected)
return MQTT_SUCCESS; return MQTT_SUCCESS;
if(username == NULL) if(username == NULL)
...@@ -829,7 +831,7 @@ int mqtt_user_login(struct mqtt* mqtt, client_t* client) ...@@ -829,7 +831,7 @@ int mqtt_user_login(struct mqtt* mqtt, client_t* client)
if(mqtt == NULL || mqtt->cfg == NULL || client == NULL) if(mqtt == NULL || mqtt->cfg == NULL || client == NULL)
return MQTT_FAILURE; return MQTT_FAILURE;
if(!mqtt->cfg->mqtt.enabled) if(!mqtt->connected)
return MQTT_SUCCESS; return MQTT_SUCCESS;
snprintf(topic, sizeof(topic), "login/%s", client->protocol); snprintf(topic, sizeof(topic), "login/%s", client->protocol);
...@@ -852,7 +854,7 @@ int mqtt_user_logout(struct mqtt* mqtt, client_t* client, time_t logintime) ...@@ -852,7 +854,7 @@ int mqtt_user_logout(struct mqtt* mqtt, client_t* client, time_t logintime)
if(mqtt == NULL || mqtt->cfg == NULL || client == NULL) if(mqtt == NULL || mqtt->cfg == NULL || client == NULL)
return MQTT_FAILURE; return MQTT_FAILURE;
if(!mqtt->cfg->mqtt.enabled) if(!mqtt->connected)
return MQTT_SUCCESS; return MQTT_SUCCESS;
long tused = (long)(time(NULL) - logintime); long tused = (long)(time(NULL) - logintime);
...@@ -889,7 +891,10 @@ int mqtt_client_count(struct mqtt* mqtt) ...@@ -889,7 +891,10 @@ int mqtt_client_count(struct mqtt* mqtt)
void mqtt_shutdown(struct mqtt* mqtt) void mqtt_shutdown(struct mqtt* mqtt)
{ {
if(mqtt != NULL && mqtt->cfg != NULL && mqtt->cfg->mqtt.enabled) { if(mqtt != NULL && mqtt->cfg != NULL && mqtt->cfg->mqtt.enabled) {
mqtt_disconnect(mqtt); if(mqtt->connected) {
if(mqtt_disconnect(mqtt) == MQTT_SUCCESS)
mqtt->connected = false;
}
mqtt_thread_stop(mqtt); mqtt_thread_stop(mqtt);
mqtt_close(mqtt); mqtt_close(mqtt);
} }
...@@ -903,7 +908,7 @@ static int mqtt_file_xfer(struct mqtt* mqtt, user_t* user, int dirnum, const cha ...@@ -903,7 +908,7 @@ static int mqtt_file_xfer(struct mqtt* mqtt, user_t* user, int dirnum, const cha
if(!is_valid_dirnum(mqtt->cfg, dirnum)) if(!is_valid_dirnum(mqtt->cfg, dirnum))
return MQTT_FAILURE; return MQTT_FAILURE;
if(!mqtt->cfg->mqtt.enabled) if(!mqtt->connected)
return MQTT_SUCCESS; return MQTT_SUCCESS;
char str[256]; char str[256];
......
...@@ -43,6 +43,7 @@ struct mqtt { ...@@ -43,6 +43,7 @@ struct mqtt {
mqtt_handle_t handle; mqtt_handle_t handle;
scfg_t* cfg; scfg_t* cfg;
char* host; char* host;
bool connected;
ulong max_clients; ulong max_clients;
ulong error_count; ulong error_count;
ulong served; ulong served;
......
...@@ -90,7 +90,7 @@ int sbbs_t::putnodedat(uint number, node_t* node) ...@@ -90,7 +90,7 @@ int sbbs_t::putnodedat(uint number, node_t* node)
} }
pthread_mutex_unlock(&nodefile_mutex); pthread_mutex_unlock(&nodefile_mutex);
if(cfg.mqtt.enabled && mqtt->handle != NULL) { if(mqtt->connected) {
int result = mqtt_putnodedat(mqtt, number + 1, node); int result = mqtt_putnodedat(mqtt, number + 1, node);
if(result != MQTT_SUCCESS) if(result != MQTT_SUCCESS)
lprintf(LOG_WARNING, "ERROR %d (%d) publishing node status", result, errno); lprintf(LOG_WARNING, "ERROR %d (%d) publishing node status", result, errno);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment