Skip to content
Snippets Groups Projects
Commit ec90a8d9 authored by Rob Swindell's avatar Rob Swindell :speech_balloon:
Browse files

MQTT improvements

Don't double-publish log messages when not in daemon-mode
Publishing to <host>/recycle will recycle all servers.
Log a successful broker connection upon startup.
parent d82d729b
No related branches found
No related tags found
No related merge requests found
...@@ -240,13 +240,12 @@ static void notify_systemd(const char* new_status) ...@@ -240,13 +240,12 @@ static void notify_systemd(const char* new_status)
} }
#endif #endif
static int lputs(int level, char *str) static int log_puts(int level, char *str)
{ {
static pthread_mutex_t mutex; static pthread_mutex_t mutex;
static BOOL mutex_initialized; static BOOL mutex_initialized;
char *p; char *p;
mqtt_lputs(&bbs_startup.mqtt, TOPIC_HOST, level, str);
#ifdef __unix__ #ifdef __unix__
if (is_daemon) { if (is_daemon) {
if(str!=NULL) { if(str!=NULL) {
...@@ -286,6 +285,13 @@ static int lputs(int level, char *str) ...@@ -286,6 +285,13 @@ static int lputs(int level, char *str)
return(prompt_len); return(prompt_len);
} }
static int lputs(int level, char *str)
{
if(str != NULL && *str != '\0')
mqtt_lputs(&bbs_startup.mqtt, TOPIC_HOST, level, str);
return log_puts(level, str);
}
static void errormsg(void *cbdata, int level, const char *msg) static void errormsg(void *cbdata, int level, const char *msg)
{ {
error_count++; error_count++;
...@@ -304,11 +310,20 @@ static int lprintf(int level, const char *fmt, ...) ...@@ -304,11 +310,20 @@ static int lprintf(int level, const char *fmt, ...)
return(lputs(level,sbuf)); return(lputs(level,sbuf));
} }
static void recycle_all()
{
bbs_startup.recycle_now = TRUE;
ftp_startup.recycle_now = TRUE;
web_startup.recycle_now = TRUE;
mail_startup.recycle_now = TRUE;
services_startup.recycle_now = TRUE;
}
#ifdef USE_MOSQUITTO #ifdef USE_MOSQUITTO
void mqtt_message_received(struct mosquitto* mosq, void* cbdata, const struct mosquitto_message* msg) void mqtt_message_received(struct mosquitto* mosq, void* cbdata, const struct mosquitto_message* msg)
{ {
char topic[128]; char topic[128];
lprintf(LOG_DEBUG, "MQTT message received on %s", msg->topic); lprintf(LOG_DEBUG, "MQTT message received (%d bytes) on %s", msg->payloadlen, msg->topic);
for(int i = bbs_startup.first_node; i <= bbs_startup.last_node; i++) { for(int i = bbs_startup.first_node; i <= bbs_startup.last_node; i++) {
mqtt_topic(&bbs_startup.mqtt, TOPIC_BBS, topic, sizeof(topic), "node%d/input", i); mqtt_topic(&bbs_startup.mqtt, TOPIC_BBS, topic, sizeof(topic), "node%d/input", i);
if(strcmp(msg->topic, topic) != 0) if(strcmp(msg->topic, topic) != 0)
...@@ -317,28 +332,27 @@ void mqtt_message_received(struct mosquitto* mosq, void* cbdata, const struct mo ...@@ -317,28 +332,27 @@ void mqtt_message_received(struct mosquitto* mosq, void* cbdata, const struct mo
RingBufWrite(bbs_startup.node_inbuf[i - 1], msg->payload, msg->payloadlen); RingBufWrite(bbs_startup.node_inbuf[i - 1], msg->payload, msg->payloadlen);
return; return;
} }
mqtt_topic(&bbs_startup.mqtt, TOPIC_SERVER, topic, sizeof(topic), "recycle"); if(strcmp(msg->topic, mqtt_topic(&bbs_startup.mqtt, TOPIC_HOST, topic, sizeof(topic), "recycle")) == 0) {
if(strcmp(msg->topic, topic) == 0) { recycle_all();
return;
}
if(strcmp(msg->topic, mqtt_topic(&bbs_startup.mqtt, TOPIC_SERVER, topic, sizeof(topic), "recycle")) == 0) {
bbs_startup.recycle_now = true; bbs_startup.recycle_now = true;
return; return;
} }
mqtt_topic(&ftp_startup.mqtt, TOPIC_SERVER, topic, sizeof(topic), "recycle"); if(strcmp(msg->topic, mqtt_topic(&ftp_startup.mqtt, TOPIC_SERVER, topic, sizeof(topic), "recycle")) == 0) {
if(strcmp(msg->topic, topic) == 0) {
ftp_startup.recycle_now = true; ftp_startup.recycle_now = true;
return; return;
} }
mqtt_topic(&web_startup.mqtt, TOPIC_SERVER, topic, sizeof(topic), "recycle"); if(strcmp(msg->topic, mqtt_topic(&web_startup.mqtt, TOPIC_SERVER, topic, sizeof(topic), "recycle")) == 0) {
if(strcmp(msg->topic, topic) == 0) {
web_startup.recycle_now = true; web_startup.recycle_now = true;
return; return;
} }
mqtt_topic(&mail_startup.mqtt, TOPIC_SERVER, topic, sizeof(topic), "recycle"); if(strcmp(msg->topic, mqtt_topic(&mail_startup.mqtt, TOPIC_SERVER, topic, sizeof(topic), "recycle")) == 0) {
if(strcmp(msg->topic, topic) == 0) {
mail_startup.recycle_now = true; mail_startup.recycle_now = true;
return; return;
} }
mqtt_topic(&services_startup.mqtt, TOPIC_SERVER, topic, sizeof(topic), "recycle"); if(strcmp(msg->topic, mqtt_topic(&services_startup.mqtt, TOPIC_SERVER, topic, sizeof(topic), "recycle")) == 0) {
if(strcmp(msg->topic, topic) == 0) {
services_startup.recycle_now = true; services_startup.recycle_now = true;
return; return;
} }
...@@ -765,7 +779,7 @@ static int bbs_lputs(void* p, int level, const char *str) ...@@ -765,7 +779,7 @@ static int bbs_lputs(void* p, int level, const char *str)
sprintf(logline,"%sterm %.*s",tstr,(int)sizeof(logline)-70,str); sprintf(logline,"%sterm %.*s",tstr,(int)sizeof(logline)-70,str);
truncsp(logline); truncsp(logline);
lputs(level,logline); log_puts(level,logline);
return(strlen(logline)+1); return(strlen(logline)+1);
} }
...@@ -811,7 +825,7 @@ static int ftp_lputs(void* p, int level, const char *str) ...@@ -811,7 +825,7 @@ static int ftp_lputs(void* p, int level, const char *str)
sprintf(logline,"%sftp %.*s",tstr,(int)sizeof(logline)-70,str); sprintf(logline,"%sftp %.*s",tstr,(int)sizeof(logline)-70,str);
truncsp(logline); truncsp(logline);
lputs(level,logline); log_puts(level,logline);
return(strlen(logline)+1); return(strlen(logline)+1);
} }
...@@ -852,7 +866,7 @@ static int mail_lputs(void* p, int level, const char *str) ...@@ -852,7 +866,7 @@ static int mail_lputs(void* p, int level, const char *str)
sprintf(logline,"%smail %.*s",tstr,(int)sizeof(logline)-70,str); sprintf(logline,"%smail %.*s",tstr,(int)sizeof(logline)-70,str);
truncsp(logline); truncsp(logline);
lputs(level,logline); log_puts(level,logline);
return(strlen(logline)+1); return(strlen(logline)+1);
} }
...@@ -893,7 +907,7 @@ static int services_lputs(void* p, int level, const char *str) ...@@ -893,7 +907,7 @@ static int services_lputs(void* p, int level, const char *str)
sprintf(logline,"%ssrvc %.*s",tstr,(int)sizeof(logline)-70,str); sprintf(logline,"%ssrvc %.*s",tstr,(int)sizeof(logline)-70,str);
truncsp(logline); truncsp(logline);
lputs(level,logline); log_puts(level,logline);
return(strlen(logline)+1); return(strlen(logline)+1);
} }
...@@ -934,7 +948,7 @@ static int event_lputs(void* p, int level, const char *str) ...@@ -934,7 +948,7 @@ static int event_lputs(void* p, int level, const char *str)
sprintf(logline,"%sevnt %.*s",tstr,(int)sizeof(logline)-70,str); sprintf(logline,"%sevnt %.*s",tstr,(int)sizeof(logline)-70,str);
truncsp(logline); truncsp(logline);
lputs(level,logline); log_puts(level,logline);
return(strlen(logline)+1); return(strlen(logline)+1);
} }
...@@ -975,7 +989,7 @@ static int web_lputs(void* p, int level, const char *str) ...@@ -975,7 +989,7 @@ static int web_lputs(void* p, int level, const char *str)
sprintf(logline,"%sweb %.*s",tstr,(int)sizeof(logline)-70,str); sprintf(logline,"%sweb %.*s",tstr,(int)sizeof(logline)-70,str);
truncsp(logline); truncsp(logline);
lputs(level,logline); log_puts(level,logline);
return(strlen(logline)+1); return(strlen(logline)+1);
} }
...@@ -1143,11 +1157,7 @@ void _sighandler_rerun(int sig) ...@@ -1143,11 +1157,7 @@ void _sighandler_rerun(int sig)
lputs(LOG_NOTICE," Got HUP (rerun) signal"); lputs(LOG_NOTICE," Got HUP (rerun) signal");
bbs_startup.recycle_now=TRUE; recycle_all();
ftp_startup.recycle_now=TRUE;
web_startup.recycle_now=TRUE;
mail_startup.recycle_now=TRUE;
services_startup.recycle_now=TRUE;
} }
static void handle_sigs(void) static void handle_sigs(void)
...@@ -1779,7 +1789,9 @@ int main(int argc, char** argv) ...@@ -1779,7 +1789,9 @@ int main(int argc, char** argv)
} else { } else {
lprintf(LOG_INFO, "MQTT connecting to broker %s:%u", scfg.mqtt.broker_addr, scfg.mqtt.broker_port); lprintf(LOG_INFO, "MQTT connecting to broker %s:%u", scfg.mqtt.broker_addr, scfg.mqtt.broker_port);
result = mqtt_connect(&bbs_startup.mqtt, /* bind_address: */NULL); result = mqtt_connect(&bbs_startup.mqtt, /* bind_address: */NULL);
if(result != MQTT_SUCCESS) { if(result == MQTT_SUCCESS) {
lprintf(LOG_INFO, "MQTT broker-connect (%s:d) successful", scfg.mqtt.broker_addr, scfg.mqtt.broker_port);
} else {
lprintf(LOG_ERR, "MQTT broker-connect (%s:%d) failure: %d", scfg.mqtt.broker_addr, scfg.mqtt.broker_port, result); lprintf(LOG_ERR, "MQTT broker-connect (%s:%d) failure: %d", scfg.mqtt.broker_addr, scfg.mqtt.broker_port, result);
mqtt_close(&bbs_startup.mqtt); mqtt_close(&bbs_startup.mqtt);
} }
...@@ -1812,6 +1824,7 @@ int main(int argc, char** argv) ...@@ -1812,6 +1824,7 @@ int main(int argc, char** argv)
for(int i = bbs_startup.first_node; i <= bbs_startup.last_node; i++) { for(int i = bbs_startup.first_node; i <= bbs_startup.last_node; i++) {
mqtt_subscribe(&bbs_startup.mqtt, TOPIC_BBS, str, sizeof(str), "node%d/input", i); mqtt_subscribe(&bbs_startup.mqtt, TOPIC_BBS, str, sizeof(str), "node%d/input", i);
} }
mqtt_subscribe(&bbs_startup.mqtt, TOPIC_HOST, str, sizeof(str), "recycle");
mqtt_subscribe(&bbs_startup.mqtt, TOPIC_SERVER, str, sizeof(str), "recycle"); mqtt_subscribe(&bbs_startup.mqtt, TOPIC_SERVER, str, sizeof(str), "recycle");
mqtt_subscribe(&ftp_startup.mqtt, TOPIC_SERVER, str, sizeof(str), "recycle"); mqtt_subscribe(&ftp_startup.mqtt, TOPIC_SERVER, str, sizeof(str), "recycle");
mqtt_subscribe(&web_startup.mqtt, TOPIC_SERVER, str, sizeof(str), "recycle"); mqtt_subscribe(&web_startup.mqtt, TOPIC_SERVER, str, sizeof(str), "recycle");
...@@ -2162,11 +2175,7 @@ int main(int argc, char** argv) ...@@ -2162,11 +2175,7 @@ int main(int argc, char** argv)
services_startup.shutdown_now=TRUE; services_startup.shutdown_now=TRUE;
} }
else { else {
bbs_startup.recycle_now=TRUE; recycle_all();
ftp_startup.recycle_now=TRUE;
web_startup.recycle_now=TRUE;
mail_startup.recycle_now=TRUE;
services_startup.recycle_now=TRUE;
} }
break; break;
case 'C': case 'C':
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment