Skip to content
Snippets Groups Projects
Commit f99713f3 authored by Rob Swindell's avatar Rob Swindell :speech_balloon:
Browse files

Don't publish to the initial server topics (e.g. version) until after connect

Fix for issue #884

We must wait until the connect_callback() has been called to publish these
messages. Using mqtt.server_version as the indicator of an initial startup
connection versus a re-connection (due to broker-connection loss), i.e.
setting to NULL after initial connection.
parent 2adf8468
No related branches found
No related tags found
No related merge requests found
Pipeline #8652 passed
......@@ -534,6 +534,10 @@ static void mqtt_connect_callback(struct mosquitto* mosq, void* cbdata, int rc)
mqtt_subscribe(mqtt, TOPIC_HOST, str, sizeof(str), "pause");
mqtt_subscribe(mqtt, TOPIC_SERVER, str, sizeof(str), "resume");
mqtt_subscribe(mqtt, TOPIC_HOST, str, sizeof(str), "resume");
if (mqtt->server_version != NULL) {
mqtt_server_startup(mqtt);
mqtt->server_version = NULL;
}
}
else
mqtt->connect_error = rc;
......@@ -648,6 +652,7 @@ static void mqtt_message_received(struct mosquitto* mosq, void* cbdata, const st
}
#endif // USE_MOSQUITTO
// 'version' argument should not point to stack memory as it'll be read later (i.e. in connect callback)
int mqtt_startup(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup, const char* version
, int (*lputs)(int level, const char* str))
{
......@@ -674,6 +679,7 @@ int mqtt_startup(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup, const
lprintf(lputs, LOG_ERR, "MQTT error %d starting pub/sub thread", result);
mqtt_close(mqtt);
} else {
mqtt->server_version = version;
#ifdef USE_MOSQUITTO
if (mqtt->handle != NULL) {
mosquitto_connect_callback_set(mqtt->handle, mqtt_connect_callback);
......@@ -692,13 +698,6 @@ int mqtt_startup(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup, const
}
}
}
mqtt_server_state(mqtt, SERVER_INIT);
mqtt_pub_strval(mqtt, TOPIC_BBS_LEVEL, NULL, mqtt->cfg->sys_name);
mqtt_pub_strval(mqtt, TOPIC_HOST_LEVEL, NULL, startup->host_name);
mqtt_pub_strval(mqtt, TOPIC_SERVER, "version", version);
mqtt_pub_uintval(mqtt, TOPIC_SERVER, "served", mqtt->served);
mqtt_pub_uintval(mqtt, TOPIC_SERVER, "highwater", 0);
mqtt_pub_uintval(mqtt, TOPIC_SERVER, "error_count", mqtt->error_count);
#ifdef USE_MOSQUITTO
if (mqtt->handle != NULL)
......@@ -708,6 +707,18 @@ int mqtt_startup(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup, const
return result;
}
int mqtt_server_startup(struct mqtt* mqtt)
{
int result = mqtt_server_state(mqtt, SERVER_INIT);
mqtt_pub_strval(mqtt, TOPIC_BBS_LEVEL, NULL, mqtt->cfg->sys_name);
mqtt_pub_strval(mqtt, TOPIC_HOST_LEVEL, NULL, mqtt->startup->host_name);
mqtt_pub_strval(mqtt, TOPIC_SERVER, "version", mqtt->server_version);
mqtt_pub_uintval(mqtt, TOPIC_SERVER, "served", mqtt->served);
mqtt_pub_uintval(mqtt, TOPIC_SERVER, "highwater", 0);
mqtt_pub_uintval(mqtt, TOPIC_SERVER, "error_count", mqtt->error_count);
return result;
}
int mqtt_server_state(struct mqtt* mqtt, enum server_state state)
{
char str[128];
......
......@@ -52,6 +52,7 @@ struct mqtt {
link_list_t client_list;
struct startup* startup;
enum server_state server_state;
const char* server_version;
};
enum topic_depth {
......@@ -78,6 +79,7 @@ DLLEXPORT int mqtt_init(struct mqtt*, scfg_t*, struct startup*);
DLLEXPORT int mqtt_startup(struct mqtt*, scfg_t*, struct startup*, const char* version
, int (*lputs)(int level, const char* str));
DLLEXPORT int mqtt_online(struct mqtt*);
DLLEXPORT int mqtt_server_startup(struct mqtt*);
DLLEXPORT int mqtt_server_state(struct mqtt*, enum server_state);
DLLEXPORT int mqtt_errormsg(struct mqtt*, int level, const char*);
DLLEXPORT int mqtt_terminating(struct mqtt*);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment