diff --git a/src/sbbs3/mqtt.c b/src/sbbs3/mqtt.c index c302044a371b8cf2baa08fdfe7a7ba3443262a28..d6d9f42d5563accee2cf2e0470728f120b68bd2e 100644 --- a/src/sbbs3/mqtt.c +++ b/src/sbbs3/mqtt.c @@ -467,6 +467,28 @@ static int lprintf(int (*lputs)(int level, const char* str), int level, const ch #ifdef USE_MOSQUITTO +static void mqtt_connect_callback(struct mosquitto* mosq, void* cbdata, int rc) +{ + struct mqtt* mqtt = (struct mqtt*)cbdata; + char str[128]; + + if (rc == MQTT_SUCCESS) { + if(mqtt->startup->type == SERVER_TERM) { + bbs_startup_t* bbs_startup = (bbs_startup_t*)mqtt->startup; + for(int i = bbs_startup->first_node; i <= bbs_startup->last_node; i++) { + mqtt_subscribe(mqtt, TOPIC_BBS, str, sizeof(str), "node/%d/input", i); + mqtt_subscribe(mqtt, TOPIC_BBS, str, sizeof(str), "node/%d/set/#", i); + } + mqtt_subscribe(mqtt, TOPIC_BBS, str, sizeof(str), "exec"); + mqtt_subscribe(mqtt, TOPIC_BBS, str, sizeof(str), "call"); + } + mqtt_pub_noval(mqtt, TOPIC_SERVER, "recycle"); + mqtt_pub_noval(mqtt, TOPIC_SERVER, "client"); + mqtt_subscribe(mqtt, TOPIC_SERVER, str, sizeof(str), "recycle"); + mqtt_subscribe(mqtt, TOPIC_HOST, str, sizeof(str), "recycle"); + } +} + static ulong mqtt_message_value(const struct mosquitto_message* msg, ulong deflt) { if(msg->payloadlen < 1) @@ -581,6 +603,10 @@ int mqtt_startup(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup, const lprintf(lputs, LOG_ERR, "Error %d starting pub/sub thread", result); mqtt_close(mqtt); } else { +#ifdef USE_MOSQUITTO + if(mqtt->handle != NULL) + mosquitto_connect_callback_set(mqtt->handle, mqtt_connect_callback); +#endif lprintf(lputs, LOG_DEBUG, "MQTT connecting to broker %s:%u", cfg->mqtt.broker_addr, cfg->mqtt.broker_port); result = mqtt_connect(mqtt, /* bind_address: */NULL); if(result == MQTT_SUCCESS) { @@ -603,20 +629,6 @@ int mqtt_startup(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup, const if(mqtt->handle != NULL) mosquitto_message_callback_set(mqtt->handle, mqtt_message_received); #endif - if(mqtt->startup->type == SERVER_TERM) { - bbs_startup_t* bbs_startup = (bbs_startup_t*)startup; - char str[128]; - for(int i = bbs_startup->first_node; i <= bbs_startup->last_node; i++) { - mqtt_subscribe(mqtt, TOPIC_BBS, str, sizeof(str), "node/%d/input", i); - mqtt_subscribe(mqtt, TOPIC_BBS, str, sizeof(str), "node/%d/set/#", i); - } - mqtt_subscribe(mqtt, TOPIC_BBS, str, sizeof(str), "exec"); - mqtt_subscribe(mqtt, TOPIC_BBS, str, sizeof(str), "call"); - } - mqtt_pub_noval(mqtt, TOPIC_SERVER, "recycle"); - mqtt_pub_noval(mqtt, TOPIC_SERVER, "client"); - mqtt_subscribe(mqtt, TOPIC_SERVER, str, sizeof(str), "recycle"); - mqtt_subscribe(mqtt, TOPIC_HOST, str, sizeof(str), "recycle"); return result; }