Commit 10fd9c94 authored by Rob Swindell's avatar Rob Swindell 💬
Browse files

Publish some topics directly to topic levels

<server>/state is now published directly to the <sever> topic level (the "state" topic goes away).

Moved <bbs-id>/node# topics to <bbs-id>/nodes/# (allows better wild-card subscription per MQTT standards).
Publishing total node count to <bbs-id>/nodes rather than <bbs-id>/node_count

Introduced "publish verbosity" control (defaults to "High") - if you want nice human readable (and sometimes redundant) topic messages, leave this set to "High". To reduce traffic, set to "Low". It's expected that purpose-built Synchronet/MQTT clients should work equally-well with either setting, but when using generic MQTT clients/browser, "high" verbosity is nice. The human-readable node status is only published when "high" verbosity is enabled.

Publishing the BBS name to the <bbs-id> topic, the instance hostname (as configured in sbbs.ini) to the <host> topic. Makes the hierarchy much more clear when using an MQTT browser like MQTT Explorer.
parent 5b77181d
Pipeline #3592 passed with stage
in 6 minutes and 44 seconds
......@@ -573,7 +573,7 @@ bool sbbs_t::update_nodeterm(void)
if(cfg.mqtt.enabled) {
char str[256];
char topic[128];
SAFEPRINTF(topic, "node%u/terminal", cfg.node_num);
SAFEPRINTF(topic, "nodes/%u/terminal", cfg.node_num);
snprintf(str, sizeof(str), "%lu\t%lu\t%s\t%s\t%s\t%lx\t%lx\t%lx"
,cols
,rows
......
......@@ -2292,7 +2292,7 @@ void output_thread(void* arg)
if(sbbs->cfg.node_num) {
SAFEPRINTF(node,"Node %d",sbbs->cfg.node_num);
SAFEPRINTF(spy_topic, "node%d/output", sbbs->cfg.node_num);
SAFEPRINTF(spy_topic, "nodes/%d/output", sbbs->cfg.node_num);
} else
SAFECOPY(node,sbbs->client_name);
#ifdef _DEBUG
......@@ -4413,7 +4413,7 @@ void node_thread(void* arg)
long tused = (long)(now - sbbs->logontime);
if(tused < 0)
tused = 0;
SAFEPRINTF(topic, "node%u/laston", sbbs->cfg.node_num);
SAFEPRINTF(topic, "nodes/%u/laston", sbbs->cfg.node_num);
snprintf(str, sizeof(str), "%u\t%s\t%s", sbbs->useron.number, sbbs->useron.alias, sectostr(tused, tmp));
mqtt_pub_strval(&mqtt, TOPIC_BBS, topic, str);
}
......@@ -4993,7 +4993,7 @@ void bbs_thread(void* arg)
startup->last_node=scfg.sys_nodes;
}
mqtt_pub_uintval(&mqtt, TOPIC_BBS, "node_count", scfg.sys_nodes);
mqtt_pub_uintval(&mqtt, TOPIC_BBS, "nodes", scfg.sys_nodes);
/* Create missing directories */
lprintf(LOG_INFO,"Verifying/creating data directories");
......
......@@ -84,12 +84,21 @@ static char* format_topic(struct mqtt* mqtt, enum server_type type, enum topic_d
case TOPIC_BBS:
safe_snprintf(str, size, "sbbs/%s/%s", mqtt->cfg->sys_id, sbuf);
break;
case TOPIC_BBS_LEVEL:
safe_snprintf(str, size, "sbbs/%s", mqtt->cfg->sys_id);
break;
case TOPIC_HOST:
safe_snprintf(str, size, "sbbs/%s/%s/%s", mqtt->cfg->sys_id, mqtt->host, sbuf);
break;
case TOPIC_HOST_LEVEL:
safe_snprintf(str, size, "sbbs/%s/%s", mqtt->cfg->sys_id, mqtt->host);
break;
case TOPIC_SERVER:
safe_snprintf(str, size, "sbbs/%s/%s/%s/%s", mqtt->cfg->sys_id, mqtt->host, server_type_desc(type), sbuf);
break;
case TOPIC_SERVER_LEVEL:
safe_snprintf(str, size, "sbbs/%s/%s/%s", mqtt->cfg->sys_id, mqtt->host, server_type_desc(type));
break;
case TOPIC_EVENT:
safe_snprintf(str, size, "sbbs/%s/%s/event/%s", mqtt->cfg->sys_id, mqtt->host, sbuf);
break;
......@@ -104,12 +113,14 @@ static char* format_topic(struct mqtt* mqtt, enum server_type type, enum topic_d
char* mqtt_topic(struct mqtt* mqtt, enum topic_depth depth, char* str, size_t size, const char* fmt, ...)
{
va_list argptr;
char sbuf[1024];
char sbuf[1024]="";
va_start(argptr, fmt);
vsnprintf(sbuf, sizeof(sbuf), fmt, argptr);
sbuf[sizeof(sbuf) - 1]=0;
va_end(argptr);
if(fmt != NULL) {
va_start(argptr, fmt);
vsnprintf(sbuf, sizeof(sbuf), fmt, argptr);
sbuf[sizeof(sbuf) - 1]=0;
va_end(argptr);
}
return format_topic(mqtt, mqtt->startup->type, depth, str, size, sbuf);
}
......@@ -350,7 +361,7 @@ int mqtt_connect(struct mqtt* mqtt, const char* bind_address)
char value[128];
server_state_str(value, sizeof(value), SERVER_DISCONNECTED);
mosquitto_will_set(mqtt->handle
,mqtt_topic(mqtt, TOPIC_SERVER, topic, sizeof(topic), "state")
,mqtt_topic(mqtt, TOPIC_SERVER_LEVEL, topic, sizeof(topic), NULL)
,strlen(value), value, /* QOS: */2, /* retain: */true);
if(mqtt->cfg->mqtt.tls.mode == MQTT_TLS_CERT) {
char* certfile = NULL;
......@@ -446,7 +457,7 @@ static void mqtt_message_received(struct mosquitto* mosq, void* cbdata, const st
if(mqtt->startup->type == SERVER_TERM) {
bbs_startup_t* bbs_startup = (bbs_startup_t*)mqtt->startup;
for(int i = bbs_startup->first_node; i <= bbs_startup->last_node; i++) {
mqtt_topic(mqtt, TOPIC_BBS, topic, sizeof(topic), "node%d/input", i);
mqtt_topic(mqtt, TOPIC_BBS, topic, sizeof(topic), "nodes/%d/input", i);
if(strcmp(msg->topic, topic) != 0)
continue;
if(bbs_startup->node_inbuf != NULL && bbs_startup->node_inbuf[i - 1] != NULL)
......@@ -500,6 +511,8 @@ int mqtt_startup(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup, const
}
}
mqtt_server_state(mqtt, SERVER_INIT);
mqtt_pub_strval(mqtt, TOPIC_BBS_LEVEL, NULL, mqtt->cfg->sys_name);
mqtt_pub_strval(mqtt, TOPIC_HOST_LEVEL, NULL, startup->host_name);
mqtt_pub_strval(mqtt, TOPIC_SERVER, "version", version);
mqtt_pub_uintval(mqtt, TOPIC_SERVER, "served", mqtt->served);
......@@ -510,7 +523,7 @@ int mqtt_startup(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup, const
bbs_startup_t* bbs_startup = (bbs_startup_t*)startup;
char str[128];
for(int i = bbs_startup->first_node; i <= bbs_startup->last_node; i++) {
mqtt_subscribe(mqtt, TOPIC_BBS, str, sizeof(str), "node%d/input", i);
mqtt_subscribe(mqtt, TOPIC_BBS, str, sizeof(str), "nodes/%d/input", i);
}
}
mqtt_pub_noval(mqtt, TOPIC_SERVER, "recycle");
......@@ -523,8 +536,33 @@ int mqtt_startup(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup, const
int mqtt_server_state(struct mqtt* mqtt, enum server_state state)
{
char str[128];
server_state_str(str, sizeof(str), state);
return mqtt_pub_strval(mqtt, TOPIC_SERVER, "state", str);
if(mqtt == NULL || mqtt->cfg == NULL)
return MQTT_FAILURE;
if(mqtt->cfg->mqtt.verbose) {
char tmp[256];
char errors[64] = "";
if(mqtt->error_count)
snprintf(errors, sizeof(errors), "%lu errors", mqtt->error_count);
char served[64] = "";
if(mqtt->served)
snprintf(served, sizeof(served), "%lu served", mqtt->served);
char max_clients[64] = "";
if(mqtt->max_clients)
snprintf(max_clients, sizeof(max_clients), "/%u", mqtt->max_clients);
char clients[64] = "";
if(mqtt->client_list.count)
snprintf(clients, sizeof(clients), "%u%s clients", mqtt->client_list.count, max_clients);
snprintf(str, sizeof(str), "%s\t%s\t%s\t%s"
,server_state_str(tmp, sizeof(tmp), state)
,clients
,served
,errors);
} else
server_state_str(str, sizeof(str), state);
mqtt->server_state = state;
return mqtt_pub_strval(mqtt, TOPIC_SERVER_LEVEL, NULL, str);
}
int mqtt_errormsg(struct mqtt* mqtt, int level, const char* msg)
......@@ -533,11 +571,18 @@ int mqtt_errormsg(struct mqtt* mqtt, int level, const char* msg)
return MQTT_FAILURE;
++mqtt->error_count;
mqtt_pub_uintval(mqtt, TOPIC_SERVER, "error_count", mqtt->error_count);
if(mqtt->cfg->mqtt.verbose)
mqtt_server_state(mqtt, mqtt->server_state);
return mqtt_pub_strval(mqtt, TOPIC_BBS, "error", msg);
}
int mqtt_client_max(struct mqtt* mqtt, ulong count)
{
if(mqtt == NULL || mqtt->cfg == NULL)
return MQTT_FAILURE;
mqtt->max_clients = count;
if(mqtt->cfg->mqtt.verbose)
mqtt_server_state(mqtt, mqtt->server_state);
return mqtt_pub_uintval(mqtt, TOPIC_SERVER, "max_clients", count);
}
......@@ -581,11 +626,20 @@ int mqtt_client_on(struct mqtt* mqtt, BOOL on, int sock, client_t* client, BOOL
strListJoin(list, buf, sizeof(buf), "\n");
strListFree(&list);
mqtt_pub_uintval(mqtt, TOPIC_SERVER, "client_count", mqtt->client_list.count);
mqtt_client_count(mqtt);
mqtt_pub_uintval(mqtt, TOPIC_SERVER, "served", mqtt->served);
return mqtt_pub_strval(mqtt, TOPIC_SERVER, "client_list", buf);
}
int mqtt_client_count(struct mqtt* mqtt)
{
if(mqtt == NULL || mqtt->cfg == NULL)
return MQTT_FAILURE;
if(mqtt->cfg->mqtt.verbose)
mqtt_server_state(mqtt, mqtt->server_state);
return mqtt_pub_uintval(mqtt, TOPIC_SERVER, "client_count", mqtt->client_list.count);
}
void mqtt_shutdown(struct mqtt* mqtt)
{
if(mqtt != NULL && mqtt->cfg != NULL && mqtt->cfg->mqtt.enabled) {
......
......@@ -43,19 +43,24 @@ struct mqtt {
mqtt_handle_t handle;
scfg_t* cfg;
char* host;
ulong max_clients;
ulong error_count;
ulong served;
link_list_t client_list;
struct startup* startup;
enum server_state server_state;
};
enum topic_depth {
TOPIC_OTHER,
TOPIC_ROOT, // sbbs/*
TOPIC_BBS, // sbbs/BBS-ID/*
TOPIC_HOST, // sbbs/BBS-ID/hostname/*
TOPIC_EVENT, // sbbs/BBS-ID/event/*
TOPIC_SERVER // sbbs/BBS-ID/server/*
TOPIC_ROOT, // sbbs/*
TOPIC_BBS, // sbbs/BBS-ID/*
TOPIC_BBS_LEVEL, // sbbs/BBS-ID
TOPIC_HOST, // sbbs/BBS-ID/hostname/*
TOPIC_HOST_LEVEL, // sbbs/BBS-DI/hostname
TOPIC_EVENT, // sbbs/BBS-ID/event/*
TOPIC_SERVER, // sbbs/BBS-ID/server/*
TOPIC_SERVER_LEVEL, // sbbs/BBS-ID/server
};
#define MQTT_SUCCESS 0 // Same as MOSQ_ERR_SUCCESS
......@@ -89,6 +94,7 @@ DLLEXPORT int mqtt_thread_start(struct mqtt*);
DLLEXPORT int mqtt_thread_stop(struct mqtt*);
DLLEXPORT int mqtt_client_on(struct mqtt*, BOOL on, int sock, client_t* client, BOOL update);
DLLEXPORT int mqtt_client_max(struct mqtt*, ulong count);
DLLEXPORT int mqtt_client_count(struct mqtt*);
#ifdef __cplusplus
}
......
......@@ -102,10 +102,10 @@ int sbbs_t::putnodedat(uint number, node_t* node)
,node->extaux
,node->errors
);
SAFEPRINTF(topic, "node%u/status", number + 1);
SAFEPRINTF(topic, "nodes/%u/status", number + 1);
int result = mqtt_pub_strval(mqtt, TOPIC_BBS, topic, str);
if(result == MQTT_SUCCESS) {
SAFEPRINTF(topic, "node%u", number + 1);
if(result == MQTT_SUCCESS && cfg.mqtt.verbose) {
SAFEPRINTF(topic, "nodes/%u", number + 1);
result = mqtt_pub_strval(mqtt, TOPIC_BBS, topic
,nodestatus(&cfg, node, str, sizeof(str), number + 1));
}
......
......@@ -169,6 +169,7 @@ void mqtt_cfg()
sprintf(opt[i++], "%-20s%s", "Password", cfg.mqtt.password);
sprintf(opt[i++], "%-20s%u seconds", "Keep-alive", cfg.mqtt.keepalive);
sprintf(opt[i++], "%-20s%s", "Protocol Version", mqttVersion[cfg.mqtt.protocol_version - 3]);
sprintf(opt[i++], "%-20s%s", "Publish Verbosity", cfg.mqtt.verbose ? "High" : "Low");
sprintf(opt[i++], "%-20s%s", "Publish QOS", mqttQOS[cfg.mqtt.publish_qos]);
sprintf(opt[i++], "%-20s%s", "Subscribe QOS", mqttQOS[cfg.mqtt.subscribe_qos]);
sprintf(opt[i++], "%-20s%s", "Log Level", logLevelStringList[cfg.mqtt.log_level]);
......@@ -265,16 +266,19 @@ void mqtt_cfg()
cfg.mqtt.protocol_version = 3 + i;
break;
case 7:
cfg.mqtt.verbose = !cfg.mqtt.verbose;
break;
case 8:
i = cfg.mqtt.publish_qos;
if((i = uifc.list(WIN_MID|WIN_SAV, 0, 0, 0, &i, 0, "Quality of Service for Publishing", mqttQOS)) >= 0)
cfg.mqtt.publish_qos = i;
break;
case 8:
case 9:
i = cfg.mqtt.subscribe_qos;
if((i = uifc.list(WIN_MID|WIN_SAV, 0, 0, 0, &i, 0, "Quality of Service for Subscriptions", mqttQOS)) >= 0)
cfg.mqtt.subscribe_qos = i;
break;
case 9:
case 10:
uifc.helpbuf =
"~ MQTT Log Level ~\n"
"\n"
......@@ -287,7 +291,7 @@ void mqtt_cfg()
if(i>=0 && i<=LOG_DEBUG)
cfg.mqtt.log_level=i;
break;
case 10:
case 11:
uifc.helpbuf =
"~ Encryption via TLS ~\n"
"\n"
......@@ -303,7 +307,7 @@ void mqtt_cfg()
if(i >= 0)
cfg.mqtt.tls.mode = i;
break;
case 11:
case 12:
if(cfg.mqtt.tls.mode == MQTT_TLS_CERT) {
uifc.helpbuf =
"~ CA Certificate File ~\n"
......@@ -323,7 +327,7 @@ void mqtt_cfg()
,cfg.mqtt.tls.psk, sizeof(cfg.mqtt.tls.psk) - 1, K_EDIT);
}
break;
case 12:
case 13:
if(cfg.mqtt.tls.mode == MQTT_TLS_CERT) {
uifc.helpbuf =
"~ Client Certificate File ~\n"
......@@ -347,7 +351,7 @@ void mqtt_cfg()
,cfg.mqtt.tls.identity, sizeof(cfg.mqtt.tls.identity) - 1, K_EDIT);
}
break;
case 13:
case 14:
uifc.helpbuf =
"~ Private Key File ~\n"
"\n"
......@@ -359,7 +363,7 @@ void mqtt_cfg()
uifc.input(WIN_MID|WIN_SAV, 0, 0, "Private Key File"
,cfg.mqtt.tls.keyfile, sizeof(cfg.mqtt.tls.keyfile) - 1, K_EDIT);
break;
case 14:
case 15:
uifc.helpbuf =
"~ Private Key File Password ~\n"
"\n"
......
......@@ -365,6 +365,7 @@ typedef struct {
struct mqtt_cfg {
BOOL enabled;
BOOL verbose;
char broker_addr[128];
uint16_t broker_port;
char username[256];
......
......@@ -205,6 +205,7 @@ BOOL read_main_cfg(scfg_t* cfg, char* error, size_t maxerrlen)
/*****************/
section = iniGetParsedSection(sections, "mqtt", /* cut: */TRUE);
cfg->mqtt.enabled = iniGetBool(section, NULL, "enabled", FALSE);
cfg->mqtt.verbose = iniGetBool(section, NULL, "verbose", TRUE);
SAFECOPY(cfg->mqtt.username, iniGetString(section, NULL, "username", "", value));
SAFECOPY(cfg->mqtt.password, iniGetString(section, NULL, "password", "", value));
SAFECOPY(cfg->mqtt.broker_addr, iniGetString(section, NULL, "broker_addr", "127.0.0.1", value));
......
......@@ -211,6 +211,7 @@ BOOL write_main_cfg(scfg_t* cfg, int backup_level)
{
const char* name = "MQTT";
iniSetBool(&ini, name, "Enabled", cfg->mqtt.enabled, NULL);
iniSetBool(&ini, name, "Verbose", cfg->mqtt.verbose, NULL);
iniSetString(&ini, name, "Broker_addr", cfg->mqtt.broker_addr, NULL);
iniSetUInt16(&ini, name, "Broker_port", cfg->mqtt.broker_port, NULL);
iniSetInteger(&ini, name, "Protocol_version", cfg->mqtt.protocol_version, NULL);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment