diff --git a/src/sbbs3/con_out.cpp b/src/sbbs3/con_out.cpp index 5853507e10781302e007e92de108241a104dc6a5..2bd08a6fdc37b5fa24f8453190a3b99f9eb448a1 100644 --- a/src/sbbs3/con_out.cpp +++ b/src/sbbs3/con_out.cpp @@ -573,7 +573,7 @@ bool sbbs_t::update_nodeterm(void) if(cfg.mqtt.enabled) { char str[256]; char topic[128]; - SAFEPRINTF(topic, "node%u/terminal", cfg.node_num); + SAFEPRINTF(topic, "nodes/%u/terminal", cfg.node_num); snprintf(str, sizeof(str), "%lu\t%lu\t%s\t%s\t%s\t%lx\t%lx\t%lx" ,cols ,rows diff --git a/src/sbbs3/main.cpp b/src/sbbs3/main.cpp index c990c1dc9e6611caac2dd3dc1855057ef8957519..9802284ac0ce4e4ac65f8b3302569183e5e85522 100644 --- a/src/sbbs3/main.cpp +++ b/src/sbbs3/main.cpp @@ -2292,7 +2292,7 @@ void output_thread(void* arg) if(sbbs->cfg.node_num) { SAFEPRINTF(node,"Node %d",sbbs->cfg.node_num); - SAFEPRINTF(spy_topic, "node%d/output", sbbs->cfg.node_num); + SAFEPRINTF(spy_topic, "nodes/%d/output", sbbs->cfg.node_num); } else SAFECOPY(node,sbbs->client_name); #ifdef _DEBUG @@ -4413,7 +4413,7 @@ void node_thread(void* arg) long tused = (long)(now - sbbs->logontime); if(tused < 0) tused = 0; - SAFEPRINTF(topic, "node%u/laston", sbbs->cfg.node_num); + SAFEPRINTF(topic, "nodes/%u/laston", sbbs->cfg.node_num); snprintf(str, sizeof(str), "%u\t%s\t%s", sbbs->useron.number, sbbs->useron.alias, sectostr(tused, tmp)); mqtt_pub_strval(&mqtt, TOPIC_BBS, topic, str); } @@ -4993,7 +4993,7 @@ void bbs_thread(void* arg) startup->last_node=scfg.sys_nodes; } - mqtt_pub_uintval(&mqtt, TOPIC_BBS, "node_count", scfg.sys_nodes); + mqtt_pub_uintval(&mqtt, TOPIC_BBS, "nodes", scfg.sys_nodes); /* Create missing directories */ lprintf(LOG_INFO,"Verifying/creating data directories"); diff --git a/src/sbbs3/mqtt.c b/src/sbbs3/mqtt.c index 785fdd172edd172f9f274851cf9eadf8de980701..86d54b37ffd8349a680a2fcca6571c638425cdcf 100644 --- a/src/sbbs3/mqtt.c +++ b/src/sbbs3/mqtt.c @@ -84,12 +84,21 @@ static char* format_topic(struct mqtt* mqtt, enum server_type type, enum topic_d case TOPIC_BBS: safe_snprintf(str, size, "sbbs/%s/%s", mqtt->cfg->sys_id, sbuf); break; + case TOPIC_BBS_LEVEL: + safe_snprintf(str, size, "sbbs/%s", mqtt->cfg->sys_id); + break; case TOPIC_HOST: safe_snprintf(str, size, "sbbs/%s/%s/%s", mqtt->cfg->sys_id, mqtt->host, sbuf); break; + case TOPIC_HOST_LEVEL: + safe_snprintf(str, size, "sbbs/%s/%s", mqtt->cfg->sys_id, mqtt->host); + break; case TOPIC_SERVER: safe_snprintf(str, size, "sbbs/%s/%s/%s/%s", mqtt->cfg->sys_id, mqtt->host, server_type_desc(type), sbuf); break; + case TOPIC_SERVER_LEVEL: + safe_snprintf(str, size, "sbbs/%s/%s/%s", mqtt->cfg->sys_id, mqtt->host, server_type_desc(type)); + break; case TOPIC_EVENT: safe_snprintf(str, size, "sbbs/%s/%s/event/%s", mqtt->cfg->sys_id, mqtt->host, sbuf); break; @@ -104,12 +113,14 @@ static char* format_topic(struct mqtt* mqtt, enum server_type type, enum topic_d char* mqtt_topic(struct mqtt* mqtt, enum topic_depth depth, char* str, size_t size, const char* fmt, ...) { va_list argptr; - char sbuf[1024]; + char sbuf[1024]=""; - va_start(argptr, fmt); - vsnprintf(sbuf, sizeof(sbuf), fmt, argptr); - sbuf[sizeof(sbuf) - 1]=0; - va_end(argptr); + if(fmt != NULL) { + va_start(argptr, fmt); + vsnprintf(sbuf, sizeof(sbuf), fmt, argptr); + sbuf[sizeof(sbuf) - 1]=0; + va_end(argptr); + } return format_topic(mqtt, mqtt->startup->type, depth, str, size, sbuf); } @@ -350,7 +361,7 @@ int mqtt_connect(struct mqtt* mqtt, const char* bind_address) char value[128]; server_state_str(value, sizeof(value), SERVER_DISCONNECTED); mosquitto_will_set(mqtt->handle - ,mqtt_topic(mqtt, TOPIC_SERVER, topic, sizeof(topic), "state") + ,mqtt_topic(mqtt, TOPIC_SERVER_LEVEL, topic, sizeof(topic), NULL) ,strlen(value), value, /* QOS: */2, /* retain: */true); if(mqtt->cfg->mqtt.tls.mode == MQTT_TLS_CERT) { char* certfile = NULL; @@ -446,7 +457,7 @@ static void mqtt_message_received(struct mosquitto* mosq, void* cbdata, const st if(mqtt->startup->type == SERVER_TERM) { bbs_startup_t* bbs_startup = (bbs_startup_t*)mqtt->startup; for(int i = bbs_startup->first_node; i <= bbs_startup->last_node; i++) { - mqtt_topic(mqtt, TOPIC_BBS, topic, sizeof(topic), "node%d/input", i); + mqtt_topic(mqtt, TOPIC_BBS, topic, sizeof(topic), "nodes/%d/input", i); if(strcmp(msg->topic, topic) != 0) continue; if(bbs_startup->node_inbuf != NULL && bbs_startup->node_inbuf[i - 1] != NULL) @@ -500,6 +511,8 @@ int mqtt_startup(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup, const } } mqtt_server_state(mqtt, SERVER_INIT); + mqtt_pub_strval(mqtt, TOPIC_BBS_LEVEL, NULL, mqtt->cfg->sys_name); + mqtt_pub_strval(mqtt, TOPIC_HOST_LEVEL, NULL, startup->host_name); mqtt_pub_strval(mqtt, TOPIC_SERVER, "version", version); mqtt_pub_uintval(mqtt, TOPIC_SERVER, "served", mqtt->served); @@ -510,7 +523,7 @@ int mqtt_startup(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup, const bbs_startup_t* bbs_startup = (bbs_startup_t*)startup; char str[128]; for(int i = bbs_startup->first_node; i <= bbs_startup->last_node; i++) { - mqtt_subscribe(mqtt, TOPIC_BBS, str, sizeof(str), "node%d/input", i); + mqtt_subscribe(mqtt, TOPIC_BBS, str, sizeof(str), "nodes/%d/input", i); } } mqtt_pub_noval(mqtt, TOPIC_SERVER, "recycle"); @@ -523,8 +536,33 @@ int mqtt_startup(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup, const int mqtt_server_state(struct mqtt* mqtt, enum server_state state) { char str[128]; - server_state_str(str, sizeof(str), state); - return mqtt_pub_strval(mqtt, TOPIC_SERVER, "state", str); + + if(mqtt == NULL || mqtt->cfg == NULL) + return MQTT_FAILURE; + + if(mqtt->cfg->mqtt.verbose) { + char tmp[256]; + char errors[64] = ""; + if(mqtt->error_count) + snprintf(errors, sizeof(errors), "%lu errors", mqtt->error_count); + char served[64] = ""; + if(mqtt->served) + snprintf(served, sizeof(served), "%lu served", mqtt->served); + char max_clients[64] = ""; + if(mqtt->max_clients) + snprintf(max_clients, sizeof(max_clients), "/%u", mqtt->max_clients); + char clients[64] = ""; + if(mqtt->client_list.count) + snprintf(clients, sizeof(clients), "%u%s clients", mqtt->client_list.count, max_clients); + snprintf(str, sizeof(str), "%s\t%s\t%s\t%s" + ,server_state_str(tmp, sizeof(tmp), state) + ,clients + ,served + ,errors); + } else + server_state_str(str, sizeof(str), state); + mqtt->server_state = state; + return mqtt_pub_strval(mqtt, TOPIC_SERVER_LEVEL, NULL, str); } int mqtt_errormsg(struct mqtt* mqtt, int level, const char* msg) @@ -533,11 +571,18 @@ int mqtt_errormsg(struct mqtt* mqtt, int level, const char* msg) return MQTT_FAILURE; ++mqtt->error_count; mqtt_pub_uintval(mqtt, TOPIC_SERVER, "error_count", mqtt->error_count); + if(mqtt->cfg->mqtt.verbose) + mqtt_server_state(mqtt, mqtt->server_state); return mqtt_pub_strval(mqtt, TOPIC_BBS, "error", msg); } int mqtt_client_max(struct mqtt* mqtt, ulong count) { + if(mqtt == NULL || mqtt->cfg == NULL) + return MQTT_FAILURE; + mqtt->max_clients = count; + if(mqtt->cfg->mqtt.verbose) + mqtt_server_state(mqtt, mqtt->server_state); return mqtt_pub_uintval(mqtt, TOPIC_SERVER, "max_clients", count); } @@ -581,11 +626,20 @@ int mqtt_client_on(struct mqtt* mqtt, BOOL on, int sock, client_t* client, BOOL strListJoin(list, buf, sizeof(buf), "\n"); strListFree(&list); - mqtt_pub_uintval(mqtt, TOPIC_SERVER, "client_count", mqtt->client_list.count); + mqtt_client_count(mqtt); mqtt_pub_uintval(mqtt, TOPIC_SERVER, "served", mqtt->served); return mqtt_pub_strval(mqtt, TOPIC_SERVER, "client_list", buf); } +int mqtt_client_count(struct mqtt* mqtt) +{ + if(mqtt == NULL || mqtt->cfg == NULL) + return MQTT_FAILURE; + if(mqtt->cfg->mqtt.verbose) + mqtt_server_state(mqtt, mqtt->server_state); + return mqtt_pub_uintval(mqtt, TOPIC_SERVER, "client_count", mqtt->client_list.count); +} + void mqtt_shutdown(struct mqtt* mqtt) { if(mqtt != NULL && mqtt->cfg != NULL && mqtt->cfg->mqtt.enabled) { diff --git a/src/sbbs3/mqtt.h b/src/sbbs3/mqtt.h index a1f67e5a0ba84fe7cb40941c5d7d7ea41af2f222..1bfd57ecc65fda6186408bed5679eadce13d6617 100644 --- a/src/sbbs3/mqtt.h +++ b/src/sbbs3/mqtt.h @@ -43,19 +43,24 @@ struct mqtt { mqtt_handle_t handle; scfg_t* cfg; char* host; + ulong max_clients; ulong error_count; ulong served; link_list_t client_list; struct startup* startup; + enum server_state server_state; }; enum topic_depth { TOPIC_OTHER, - TOPIC_ROOT, // sbbs/* - TOPIC_BBS, // sbbs/BBS-ID/* - TOPIC_HOST, // sbbs/BBS-ID/hostname/* - TOPIC_EVENT, // sbbs/BBS-ID/event/* - TOPIC_SERVER // sbbs/BBS-ID/server/* + TOPIC_ROOT, // sbbs/* + TOPIC_BBS, // sbbs/BBS-ID/* + TOPIC_BBS_LEVEL, // sbbs/BBS-ID + TOPIC_HOST, // sbbs/BBS-ID/hostname/* + TOPIC_HOST_LEVEL, // sbbs/BBS-DI/hostname + TOPIC_EVENT, // sbbs/BBS-ID/event/* + TOPIC_SERVER, // sbbs/BBS-ID/server/* + TOPIC_SERVER_LEVEL, // sbbs/BBS-ID/server }; #define MQTT_SUCCESS 0 // Same as MOSQ_ERR_SUCCESS @@ -89,6 +94,7 @@ DLLEXPORT int mqtt_thread_start(struct mqtt*); DLLEXPORT int mqtt_thread_stop(struct mqtt*); DLLEXPORT int mqtt_client_on(struct mqtt*, BOOL on, int sock, client_t* client, BOOL update); DLLEXPORT int mqtt_client_max(struct mqtt*, ulong count); +DLLEXPORT int mqtt_client_count(struct mqtt*); #ifdef __cplusplus } diff --git a/src/sbbs3/putnode.cpp b/src/sbbs3/putnode.cpp index 835cd4173b96a6f61a6589879809f3cb588d75f9..ac175b377ce9d2cdce982c0ce280d47efe49e7f6 100644 --- a/src/sbbs3/putnode.cpp +++ b/src/sbbs3/putnode.cpp @@ -102,10 +102,10 @@ int sbbs_t::putnodedat(uint number, node_t* node) ,node->extaux ,node->errors ); - SAFEPRINTF(topic, "node%u/status", number + 1); + SAFEPRINTF(topic, "nodes/%u/status", number + 1); int result = mqtt_pub_strval(mqtt, TOPIC_BBS, topic, str); - if(result == MQTT_SUCCESS) { - SAFEPRINTF(topic, "node%u", number + 1); + if(result == MQTT_SUCCESS && cfg.mqtt.verbose) { + SAFEPRINTF(topic, "nodes/%u", number + 1); result = mqtt_pub_strval(mqtt, TOPIC_BBS, topic ,nodestatus(&cfg, node, str, sizeof(str), number + 1)); } diff --git a/src/sbbs3/scfg/scfgnet.c b/src/sbbs3/scfg/scfgnet.c index b6afa8a7903f0a6ecc4e5bfb9300a7bf0abe5913..b80bd6cfee6ef6c623b6b0709c870d3cdcdf0c98 100644 --- a/src/sbbs3/scfg/scfgnet.c +++ b/src/sbbs3/scfg/scfgnet.c @@ -169,6 +169,7 @@ void mqtt_cfg() sprintf(opt[i++], "%-20s%s", "Password", cfg.mqtt.password); sprintf(opt[i++], "%-20s%u seconds", "Keep-alive", cfg.mqtt.keepalive); sprintf(opt[i++], "%-20s%s", "Protocol Version", mqttVersion[cfg.mqtt.protocol_version - 3]); + sprintf(opt[i++], "%-20s%s", "Publish Verbosity", cfg.mqtt.verbose ? "High" : "Low"); sprintf(opt[i++], "%-20s%s", "Publish QOS", mqttQOS[cfg.mqtt.publish_qos]); sprintf(opt[i++], "%-20s%s", "Subscribe QOS", mqttQOS[cfg.mqtt.subscribe_qos]); sprintf(opt[i++], "%-20s%s", "Log Level", logLevelStringList[cfg.mqtt.log_level]); @@ -265,16 +266,19 @@ void mqtt_cfg() cfg.mqtt.protocol_version = 3 + i; break; case 7: + cfg.mqtt.verbose = !cfg.mqtt.verbose; + break; + case 8: i = cfg.mqtt.publish_qos; if((i = uifc.list(WIN_MID|WIN_SAV, 0, 0, 0, &i, 0, "Quality of Service for Publishing", mqttQOS)) >= 0) cfg.mqtt.publish_qos = i; break; - case 8: + case 9: i = cfg.mqtt.subscribe_qos; if((i = uifc.list(WIN_MID|WIN_SAV, 0, 0, 0, &i, 0, "Quality of Service for Subscriptions", mqttQOS)) >= 0) cfg.mqtt.subscribe_qos = i; break; - case 9: + case 10: uifc.helpbuf = "~ MQTT Log Level ~\n" "\n" @@ -287,7 +291,7 @@ void mqtt_cfg() if(i>=0 && i<=LOG_DEBUG) cfg.mqtt.log_level=i; break; - case 10: + case 11: uifc.helpbuf = "~ Encryption via TLS ~\n" "\n" @@ -303,7 +307,7 @@ void mqtt_cfg() if(i >= 0) cfg.mqtt.tls.mode = i; break; - case 11: + case 12: if(cfg.mqtt.tls.mode == MQTT_TLS_CERT) { uifc.helpbuf = "~ CA Certificate File ~\n" @@ -323,7 +327,7 @@ void mqtt_cfg() ,cfg.mqtt.tls.psk, sizeof(cfg.mqtt.tls.psk) - 1, K_EDIT); } break; - case 12: + case 13: if(cfg.mqtt.tls.mode == MQTT_TLS_CERT) { uifc.helpbuf = "~ Client Certificate File ~\n" @@ -347,7 +351,7 @@ void mqtt_cfg() ,cfg.mqtt.tls.identity, sizeof(cfg.mqtt.tls.identity) - 1, K_EDIT); } break; - case 13: + case 14: uifc.helpbuf = "~ Private Key File ~\n" "\n" @@ -359,7 +363,7 @@ void mqtt_cfg() uifc.input(WIN_MID|WIN_SAV, 0, 0, "Private Key File" ,cfg.mqtt.tls.keyfile, sizeof(cfg.mqtt.tls.keyfile) - 1, K_EDIT); break; - case 14: + case 15: uifc.helpbuf = "~ Private Key File Password ~\n" "\n" diff --git a/src/sbbs3/scfgdefs.h b/src/sbbs3/scfgdefs.h index dbc37f58025cafac7b9e0fa7c31ab640b2fd3b76..37aa071d8caca9a298e33d46725149ccb9039ebd 100644 --- a/src/sbbs3/scfgdefs.h +++ b/src/sbbs3/scfgdefs.h @@ -365,6 +365,7 @@ typedef struct { struct mqtt_cfg { BOOL enabled; + BOOL verbose; char broker_addr[128]; uint16_t broker_port; char username[256]; diff --git a/src/sbbs3/scfglib1.c b/src/sbbs3/scfglib1.c index 4884d0ec03de275cc634f0286d7d06be3ca3c30a..5e843bd285809f2643263ba0be78580250d15ac8 100644 --- a/src/sbbs3/scfglib1.c +++ b/src/sbbs3/scfglib1.c @@ -205,6 +205,7 @@ BOOL read_main_cfg(scfg_t* cfg, char* error, size_t maxerrlen) /*****************/ section = iniGetParsedSection(sections, "mqtt", /* cut: */TRUE); cfg->mqtt.enabled = iniGetBool(section, NULL, "enabled", FALSE); + cfg->mqtt.verbose = iniGetBool(section, NULL, "verbose", TRUE); SAFECOPY(cfg->mqtt.username, iniGetString(section, NULL, "username", "", value)); SAFECOPY(cfg->mqtt.password, iniGetString(section, NULL, "password", "", value)); SAFECOPY(cfg->mqtt.broker_addr, iniGetString(section, NULL, "broker_addr", "127.0.0.1", value)); diff --git a/src/sbbs3/scfgsave.c b/src/sbbs3/scfgsave.c index 8e878bc3a26edada291b1d3103df9b87869eaca1..045b4b0f87aeb67db14205c55814b00903447c28 100644 --- a/src/sbbs3/scfgsave.c +++ b/src/sbbs3/scfgsave.c @@ -211,6 +211,7 @@ BOOL write_main_cfg(scfg_t* cfg, int backup_level) { const char* name = "MQTT"; iniSetBool(&ini, name, "Enabled", cfg->mqtt.enabled, NULL); + iniSetBool(&ini, name, "Verbose", cfg->mqtt.verbose, NULL); iniSetString(&ini, name, "Broker_addr", cfg->mqtt.broker_addr, NULL); iniSetUInt16(&ini, name, "Broker_port", cfg->mqtt.broker_port, NULL); iniSetInteger(&ini, name, "Protocol_version", cfg->mqtt.protocol_version, NULL);