Skip to content
Snippets Groups Projects
Commit a4140ad3 authored by Rick Parrish's avatar Rick Parrish Committed by Rob Swindell
Browse files

Add a connect callback handler.

parent 985369ad
No related branches found
No related tags found
No related merge requests found
......@@ -467,6 +467,28 @@ static int lprintf(int (*lputs)(int level, const char* str), int level, const ch
#ifdef USE_MOSQUITTO
static void mqtt_connect_callback(struct mosquitto* mosq, void* cbdata, int rc)
{
struct mqtt* mqtt = (struct mqtt*)cbdata;
char str[128];
if (rc == MQTT_SUCCESS) {
if(mqtt->startup->type == SERVER_TERM) {
bbs_startup_t* bbs_startup = (bbs_startup_t*)mqtt->startup;
for(int i = bbs_startup->first_node; i <= bbs_startup->last_node; i++) {
mqtt_subscribe(mqtt, TOPIC_BBS, str, sizeof(str), "node/%d/input", i);
mqtt_subscribe(mqtt, TOPIC_BBS, str, sizeof(str), "node/%d/set/#", i);
}
mqtt_subscribe(mqtt, TOPIC_BBS, str, sizeof(str), "exec");
mqtt_subscribe(mqtt, TOPIC_BBS, str, sizeof(str), "call");
}
mqtt_pub_noval(mqtt, TOPIC_SERVER, "recycle");
mqtt_pub_noval(mqtt, TOPIC_SERVER, "client");
mqtt_subscribe(mqtt, TOPIC_SERVER, str, sizeof(str), "recycle");
mqtt_subscribe(mqtt, TOPIC_HOST, str, sizeof(str), "recycle");
}
}
static ulong mqtt_message_value(const struct mosquitto_message* msg, ulong deflt)
{
if(msg->payloadlen < 1)
......@@ -581,6 +603,10 @@ int mqtt_startup(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup, const
lprintf(lputs, LOG_ERR, "Error %d starting pub/sub thread", result);
mqtt_close(mqtt);
} else {
#ifdef USE_MOSQUITTO
if(mqtt->handle != NULL)
mosquitto_connect_callback_set(mqtt->handle, mqtt_connect_callback);
#endif
lprintf(lputs, LOG_DEBUG, "MQTT connecting to broker %s:%u", cfg->mqtt.broker_addr, cfg->mqtt.broker_port);
result = mqtt_connect(mqtt, /* bind_address: */NULL);
if(result == MQTT_SUCCESS) {
......@@ -603,20 +629,6 @@ int mqtt_startup(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup, const
if(mqtt->handle != NULL)
mosquitto_message_callback_set(mqtt->handle, mqtt_message_received);
#endif
if(mqtt->startup->type == SERVER_TERM) {
bbs_startup_t* bbs_startup = (bbs_startup_t*)startup;
char str[128];
for(int i = bbs_startup->first_node; i <= bbs_startup->last_node; i++) {
mqtt_subscribe(mqtt, TOPIC_BBS, str, sizeof(str), "node/%d/input", i);
mqtt_subscribe(mqtt, TOPIC_BBS, str, sizeof(str), "node/%d/set/#", i);
}
mqtt_subscribe(mqtt, TOPIC_BBS, str, sizeof(str), "exec");
mqtt_subscribe(mqtt, TOPIC_BBS, str, sizeof(str), "call");
}
mqtt_pub_noval(mqtt, TOPIC_SERVER, "recycle");
mqtt_pub_noval(mqtt, TOPIC_SERVER, "client");
mqtt_subscribe(mqtt, TOPIC_SERVER, str, sizeof(str), "recycle");
mqtt_subscribe(mqtt, TOPIC_HOST, str, sizeof(str), "recycle");
return result;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment