diff --git a/src/sbbs3/con_out.cpp b/src/sbbs3/con_out.cpp index 9410d62215291b0fab798d4975ad29c3a2abe235..32e8d19d1c1537fb47471c8af259554786777ba0 100644 --- a/src/sbbs3/con_out.cpp +++ b/src/sbbs3/con_out.cpp @@ -606,7 +606,7 @@ bool sbbs_t::update_nodeterm(void) } strListFree(&ini); - if(cfg.mqtt.enabled) { + if(mqtt->connected) { char str[256]; char topic[128]; SAFEPRINTF(topic, "node/%u/terminal", cfg.node_num); diff --git a/src/sbbs3/main.cpp b/src/sbbs3/main.cpp index e8c5f2fda18c595c3a48f1a462eb6cfe7765356e..6cb7daac4089fb995bbd3406f345845183b8b088 100644 --- a/src/sbbs3/main.cpp +++ b/src/sbbs3/main.cpp @@ -2684,7 +2684,7 @@ void output_thread(void* arg) ,node, result, i); } /* Spy on the user remotely */ - if(sbbs->cfg.mqtt.enabled && mqtt.handle != NULL) { + if(mqtt.connected) { int result = mqtt_pub_message(&mqtt, TOPIC_BBS, spy_topic, buf+bufbot, i, /* retain: */false); if(result != MQTT_SUCCESS) lprintf(LOG_WARNING, "%s ERROR %d (%d) publishing node output (%u bytes): %s" diff --git a/src/sbbs3/mqtt.c b/src/sbbs3/mqtt.c index 664f0675befa2c06d5d550cd465d88d13dec2961..cc4f74802d26e33d886d2a98fc55b2d246275b89 100644 --- a/src/sbbs3/mqtt.c +++ b/src/sbbs3/mqtt.c @@ -77,6 +77,7 @@ int mqtt_init(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup) if(mqtt == NULL || cfg == NULL || startup == NULL) return MQTT_FAILURE; + mqtt->connected = false; if(!cfg->mqtt.enabled) return MQTT_SUCCESS; mqtt->handle = NULL; @@ -155,7 +156,7 @@ static int mqtt_sub(struct mqtt* mqtt, const char* topic) { if(mqtt == NULL || mqtt->cfg == NULL) return MQTT_FAILURE; - if(!mqtt->cfg->mqtt.enabled) + if(!mqtt->connected) return MQTT_SUCCESS; #ifdef USE_MOSQUITTO if(mqtt->handle != NULL && topic != NULL) { @@ -184,7 +185,7 @@ int mqtt_lputs(struct mqtt* mqtt, enum topic_depth depth, int level, const char* { if(mqtt == NULL || mqtt->cfg == NULL) return MQTT_FAILURE; - if(!mqtt->cfg->mqtt.enabled) + if(!mqtt->connected) return MQTT_SUCCESS; if(level > mqtt->cfg->mqtt.log_level) return MQTT_SUCCESS; @@ -239,7 +240,7 @@ int mqtt_pub_strval(struct mqtt* mqtt, enum topic_depth depth, const char* key, { if(mqtt == NULL || mqtt->cfg == NULL) return MQTT_FAILURE; - if(!mqtt->cfg->mqtt.enabled) + if(!mqtt->connected) return MQTT_SUCCESS; #ifdef USE_MOSQUITTO if(mqtt->handle != NULL) { @@ -265,7 +266,7 @@ int mqtt_pub_timestamped_msg(struct mqtt* mqtt, enum topic_depth depth, const ch if(mqtt == NULL || mqtt->cfg == NULL) return MQTT_FAILURE; - if(!mqtt->cfg->mqtt.enabled) + if(!mqtt->connected) return MQTT_SUCCESS; time_to_isoDateTimeStr(t, xpTimeZone_local(), timestamp, sizeof(timestamp)); SAFEPRINTF2(str, "%s\t%s", timestamp, msg); @@ -276,7 +277,7 @@ int mqtt_pub_uintval(struct mqtt* mqtt, enum topic_depth depth, const char* key, { if(mqtt == NULL || mqtt->cfg == NULL) return MQTT_FAILURE; - if(!mqtt->cfg->mqtt.enabled) + if(!mqtt->connected) return MQTT_SUCCESS; #ifdef USE_MOSQUITTO if(mqtt->handle != NULL) { @@ -301,7 +302,7 @@ int mqtt_pub_message(struct mqtt* mqtt, enum topic_depth depth, const char* key, { if(mqtt == NULL || mqtt->cfg == NULL) return MQTT_FAILURE; - if(!mqtt->cfg->mqtt.enabled) + if(!mqtt->connected) return MQTT_SUCCESS; #ifdef USE_MOSQUITTO if(mqtt->handle != NULL) { @@ -627,7 +628,7 @@ int mqtt_startup(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup, const } else { result = mqtt_thread_start(mqtt); if(result != MQTT_SUCCESS) { - lprintf(lputs, LOG_ERR, "Error %d starting pub/sub thread", result); + lprintf(lputs, LOG_ERR, "MQTT error %d starting pub/sub thread", result); mqtt_close(mqtt); } else { #ifdef USE_MOSQUITTO @@ -637,6 +638,7 @@ int mqtt_startup(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup, const lprintf(lputs, LOG_INFO, "MQTT connecting to broker %s:%u", cfg->mqtt.broker_addr, cfg->mqtt.broker_port); result = mqtt_connect(mqtt, /* bind_address: */NULL); if(result == MQTT_SUCCESS) { + mqtt->connected = true; lprintf(lputs, LOG_DEBUG, "MQTT broker-connect (%s:%d) successful", cfg->mqtt.broker_addr, cfg->mqtt.broker_port); mqtt_pub_noval(mqtt, TOPIC_SERVER, "client"); } else { @@ -746,7 +748,7 @@ int mqtt_client_on(struct mqtt* mqtt, bool on, int sock, client_t* client, bool if(mqtt == NULL || mqtt->cfg == NULL) return MQTT_FAILURE; - if(!mqtt->cfg->mqtt.enabled) + if(!mqtt->connected) return MQTT_SUCCESS; listLock(&mqtt->client_list); @@ -806,7 +808,7 @@ int mqtt_user_login_fail(struct mqtt* mqtt, client_t* client, const char* userna if(mqtt == NULL || mqtt->cfg == NULL || client == NULL) return MQTT_FAILURE; - if(!mqtt->cfg->mqtt.enabled) + if(!mqtt->connected) return MQTT_SUCCESS; if(username == NULL) @@ -829,7 +831,7 @@ int mqtt_user_login(struct mqtt* mqtt, client_t* client) if(mqtt == NULL || mqtt->cfg == NULL || client == NULL) return MQTT_FAILURE; - if(!mqtt->cfg->mqtt.enabled) + if(!mqtt->connected) return MQTT_SUCCESS; snprintf(topic, sizeof(topic), "login/%s", client->protocol); @@ -852,7 +854,7 @@ int mqtt_user_logout(struct mqtt* mqtt, client_t* client, time_t logintime) if(mqtt == NULL || mqtt->cfg == NULL || client == NULL) return MQTT_FAILURE; - if(!mqtt->cfg->mqtt.enabled) + if(!mqtt->connected) return MQTT_SUCCESS; long tused = (long)(time(NULL) - logintime); @@ -889,7 +891,10 @@ int mqtt_client_count(struct mqtt* mqtt) void mqtt_shutdown(struct mqtt* mqtt) { if(mqtt != NULL && mqtt->cfg != NULL && mqtt->cfg->mqtt.enabled) { - mqtt_disconnect(mqtt); + if(mqtt->connected) { + if(mqtt_disconnect(mqtt) == MQTT_SUCCESS) + mqtt->connected = false; + } mqtt_thread_stop(mqtt); mqtt_close(mqtt); } @@ -903,7 +908,7 @@ static int mqtt_file_xfer(struct mqtt* mqtt, user_t* user, int dirnum, const cha if(!is_valid_dirnum(mqtt->cfg, dirnum)) return MQTT_FAILURE; - if(!mqtt->cfg->mqtt.enabled) + if(!mqtt->connected) return MQTT_SUCCESS; char str[256]; diff --git a/src/sbbs3/mqtt.h b/src/sbbs3/mqtt.h index 8c1c4cd1c286d2a03f139b454bf83eaa9a0dec66..27543f777a0e6eab019aad96558ec481c1b72f99 100644 --- a/src/sbbs3/mqtt.h +++ b/src/sbbs3/mqtt.h @@ -43,6 +43,7 @@ struct mqtt { mqtt_handle_t handle; scfg_t* cfg; char* host; + bool connected; ulong max_clients; ulong error_count; ulong served; diff --git a/src/sbbs3/putnode.cpp b/src/sbbs3/putnode.cpp index c42e97379bc4212895d8adb6047992c292b40bae..1f905eba57af5822d1255cae9363d40fe931ac7e 100644 --- a/src/sbbs3/putnode.cpp +++ b/src/sbbs3/putnode.cpp @@ -90,7 +90,7 @@ int sbbs_t::putnodedat(uint number, node_t* node) } pthread_mutex_unlock(&nodefile_mutex); - if(cfg.mqtt.enabled && mqtt->handle != NULL) { + if(mqtt->connected) { int result = mqtt_putnodedat(mqtt, number + 1, node); if(result != MQTT_SUCCESS) lprintf(LOG_WARNING, "ERROR %d (%d) publishing node status", result, errno);