From e94281e6cd8efcf69f7dafeb8a3e128aa503e535 Mon Sep 17 00:00:00 2001
From: Rob Swindell <rob@synchro.net>
Date: Sun, 29 Jan 2023 20:19:10 -0800
Subject: [PATCH] The 3rd great MQTT data scheme update (sorry Nelgin)

- Most published messages (besides log entries) have a timestamp (in ISO8601 format) prepended and tab-separated
- The order and number of elements in client messages (list and activities) has been updated, now includes user number
- Server client lists are now published to .../SERVER/client/list
- Server client activities (connect, disconnect, update) are now published to .../SERVER/client/action/#
- Server client count is now published to .../SERVER/client (with the maximum client count, if applicable)
- Server states are now just represented by name (e.g. initializing, ready, stopping, stopped) and not number
- BBS errors are logged to sbbs/BBS/action/error/LEVEL (where LEVEL is the log level name, e.g. "critical" or "error')
- All server hack-attempts, SPAM attempts, logins, logouts, uploads, downloads, are published to sbbs/BBS/action/ACTION/*
- Chat pages are published to sbbs/BBS/action/page/node/#
- New users (on the terminal server) are published to sbbs/BBS/action/newuser
- Posted messages and executed external programs (on the terminal server) are published to sbbs/BBS/action/ACTION/CODE topic
- The event thread started/stopped status is published to .../SERVER/event

Yeah, the wiki will get updated soon to reflect/document all these changes
---
 src/sbbs3/chat.cpp     |   4 +
 src/sbbs3/download.cpp |   1 +
 src/sbbs3/ftpsrvr.c    |  10 +-
 src/sbbs3/logfile.cpp  |  20 ++--
 src/sbbs3/logon.cpp    |   2 +
 src/sbbs3/logout.cpp   |   2 +
 src/sbbs3/mailsrvr.c   |   3 +
 src/sbbs3/main.cpp     |  20 +---
 src/sbbs3/mqtt.c       | 213 ++++++++++++++++++++++++++++++++---------
 src/sbbs3/mqtt.h       |  16 +++-
 src/sbbs3/newuser.cpp  |   7 ++
 src/sbbs3/postmsg.cpp  |  10 +-
 src/sbbs3/sbbs.h       |  16 ++--
 src/sbbs3/server.h     |   3 +-
 src/sbbs3/services.c   |   4 +
 src/sbbs3/upload.cpp   |   2 +-
 src/sbbs3/websrvr.c    |   7 +-
 src/sbbs3/xtrn_sec.cpp |  12 ++-
 18 files changed, 260 insertions(+), 92 deletions(-)

diff --git a/src/sbbs3/chat.cpp b/src/sbbs3/chat.cpp
index ee0f3d3147..f2c1342d7f 100644
--- a/src/sbbs3/chat.cpp
+++ b/src/sbbs3/chat.cpp
@@ -697,6 +697,10 @@ bool sbbs_t::sysop_page(void)
 			sprintf(str, "%s paged you to chat", useron.alias);
 			notify(&cfg, 1, str, NULL);
 			ftouch(syspage_semfile);
+			char topic[128];
+			SAFEPRINTF(topic, "page/node/%u", cfg.node_num);
+			snprintf(str, sizeof(str), "%u\t%s", useron.number, useron.alias);
+			mqtt_pub_timestamped_msg(mqtt, TOPIC_BBS_ACTION, topic, time(NULL), str);
 		}
 		for(i=0;i<cfg.total_pages;i++)
 			if(chk_ar(cfg.page[i]->ar,&useron,&client))
diff --git a/src/sbbs3/download.cpp b/src/sbbs3/download.cpp
index 5cef24c570..e6ca6319ff 100644
--- a/src/sbbs3/download.cpp
+++ b/src/sbbs3/download.cpp
@@ -46,6 +46,7 @@ void sbbs_t::downloadedfile(file_t* f)
 	logline("D-",str);
 
 	user_downloaded_file(&cfg, &useron, &client, f->dir, f->name, length);
+	mqtt_file_download(mqtt, &useron, f, length, &client);
 
 	user_event(EVENT_DOWNLOAD);
 }
diff --git a/src/sbbs3/ftpsrvr.c b/src/sbbs3/ftpsrvr.c
index 9bde1f7f5f..076d457773 100644
--- a/src/sbbs3/ftpsrvr.c
+++ b/src/sbbs3/ftpsrvr.c
@@ -820,13 +820,15 @@ static void send_thread(void* arg)
 								,getfname(xfer.filename)
 								,prefix
 								,username,tmp); 
-						putsmsg(&scfg,uploader.number,str); 
+						putsmsg(&scfg,uploader.number,str);
 					}
 				}
 			}
 			if(!xfer.tmpfile && !xfer.delfile && !(scfg.dir[f.dir]->misc&DIR_NOSTAT))
 				inc_download_stats(&scfg, 1, (ulong)total);
-		}	
+
+			mqtt_file_download(&mqtt, xfer.user, &f, total, xfer.client);
+		}
 
 		if(xfer.credits) {
 			user_downloaded(&scfg, xfer.user, 1, total);
@@ -1104,6 +1106,8 @@ static void receive_thread(void* arg)
 			}
 			if(!(scfg.dir[f.dir]->misc&DIR_NOSTAT))
 				inc_upload_stats(&scfg, 1, (ulong)total);
+
+			mqtt_file_upload(&mqtt, xfer.user, &f, total, xfer.client);
 		}
 		/* Send ACK */
 		sockprintf(xfer.ctrl_sock,sess,"226 Upload complete (%lu cps).",cps);
@@ -2561,6 +2565,7 @@ static void ctrl_thread(void* arg)
 			SAFECOPY(user.ipaddr,host_ip);
 			user.logontime=(time32_t)logintime;
 			putuserdat(&scfg, &user);
+			mqtt_user_login(&mqtt, &client);
 
 #ifdef _WIN32
 			if(startup->sound.login[0] && !sound_muted(&scfg)) 
@@ -4808,6 +4813,7 @@ static void ctrl_thread(void* arg)
 		/* Update User Statistics */
 		if(!logoutuserdat(&scfg, &user, time(NULL), logintime))
 			lprintf(LOG_ERR,"%04d <%s> !ERROR in logoutuserdat", sock, user.alias);
+		mqtt_user_logout(&mqtt, &client, logintime);
 		lprintf(LOG_INFO,"%04d <%s> logged off", sock, user.alias);
 #ifdef _WIN32
 		if(startup->sound.logout[0] && !sound_muted(&scfg)) 
diff --git a/src/sbbs3/logfile.cpp b/src/sbbs3/logfile.cpp
index ad47803192..686e5b8888 100644
--- a/src/sbbs3/logfile.cpp
+++ b/src/sbbs3/logfile.cpp
@@ -32,7 +32,7 @@ extern "C" BOOL hacklog(scfg_t* cfg, struct mqtt* mqtt, const char* prot, const
 	char	fname[MAX_PATH+1];
 	FILE*	fp;
 	char	ip[INET6_ADDRSTRLEN];
-	time32_t now=time32(NULL);
+	time_t	now = time(NULL);
 
 	SAFEPRINTF(fname, "%shack.log", cfg->logs_dir);
 
@@ -43,7 +43,7 @@ extern "C" BOOL hacklog(scfg_t* cfg, struct mqtt* mqtt, const char* prot, const
 	fprintf(fp,"SUSPECTED %s HACK ATTEMPT for user '%s' on %.24s%sUsing port %u at %s [%s]%s"
 		,prot
 		,user
-		,timestr(cfg,now,tstr)
+		,timestr(cfg, (time32_t)now, tstr)
 		,log_line_ending
 		,inet_addrport(addr)
 		,host
@@ -56,11 +56,13 @@ extern "C" BOOL hacklog(scfg_t* cfg, struct mqtt* mqtt, const char* prot, const
 	fcloselog(fp);
 
 	if(mqtt != NULL) {
+		char topic[128];
 		char str[1024];
 		if(text == NULL)
 			text= "";
-		snprintf(str, sizeof(str), "%s\t%s\t%u\t%s\t%s\t%s", prot, user, inet_addrport(addr), host, ip, text);
-		mqtt_pub_strval(mqtt, TOPIC_BBS, "hack", str);
+		snprintf(str, sizeof(str), "%s\t%u\t%s\t%s\t%s", user, inet_addrport(addr), host, ip, text);
+		snprintf(topic, sizeof(topic), "hack/%s", prot);
+		mqtt_pub_timestamped_msg(mqtt, TOPIC_BBS_ACTION, topic, now, str);
 	}
 
 	return true;
@@ -79,7 +81,7 @@ extern "C" BOOL spamlog(scfg_t* cfg, struct mqtt* mqtt, char* prot, char* action
 	char	tstr[64];
 	char	fname[MAX_PATH+1];
 	FILE*	fp;
-	time32_t now=time32(NULL);
+	time_t	now = time(NULL);
 
 	SAFEPRINTF(fname, "%sspam.log", cfg->logs_dir);
 
@@ -97,7 +99,7 @@ extern "C" BOOL spamlog(scfg_t* cfg, struct mqtt* mqtt, char* prot, char* action
 	fprintf(fp, "SUSPECTED %s SPAM %s on %.24s%sHost: %s [%s]%sFrom: %.128s %s%s"
 		,prot
 		,action
-		,timestr(cfg,now,tstr)
+		,timestr(cfg, (time32_t)now, tstr)
 		,log_line_ending
 		,host
 		,ip_addr
@@ -113,10 +115,12 @@ extern "C" BOOL spamlog(scfg_t* cfg, struct mqtt* mqtt, char* prot, char* action
 
 	if(mqtt != NULL) {
 		char str[1024];
+		char topic[128];
 		if(reason == NULL)
 			reason = (char*)"";
-		snprintf(str, sizeof(str), "%s\t%s\t%s\t%s\t%s\t%s\t%s", prot, action, host, ip_addr, from, to_user, reason);
-		mqtt_pub_strval(mqtt, TOPIC_BBS, "spam", str);
+		snprintf(str, sizeof(str), "%s\t%s\t%s\t%s\t%s\t%s", prot, host, ip_addr, from, to_user, reason);
+		snprintf(topic, sizeof(topic), "spam/%s", action);
+		mqtt_pub_timestamped_msg(mqtt, TOPIC_BBS_ACTION, topic, now, str);
 	}
 
 	return true;
diff --git a/src/sbbs3/logon.cpp b/src/sbbs3/logon.cpp
index 834766bca3..bceb41e34e 100644
--- a/src/sbbs3/logon.cpp
+++ b/src/sbbs3/logon.cpp
@@ -431,6 +431,8 @@ bool sbbs_t::logon()
 	getmsgptrs();
 	sys_status|=SS_USERON;          /* moved from further down */
 
+	mqtt_user_login(mqtt, &client);
+
 	if(useron.rest&FLAG('Q')) {
 		safe_snprintf(str, sizeof(str), "(%04u)  %-25s  QWK Network Connection"
 			,useron.number,useron.alias);
diff --git a/src/sbbs3/logout.cpp b/src/sbbs3/logout.cpp
index a64943a34d..fd6dd27e76 100644
--- a/src/sbbs3/logout.cpp
+++ b/src/sbbs3/logout.cpp
@@ -153,6 +153,8 @@ void sbbs_t::logout()
 		PlaySound(startup->sound.logout, NULL, SND_ASYNC|SND_FILENAME);
 #endif
 
+	mqtt_user_logout(mqtt, &client, logontime);
+
 	lprintf(LOG_DEBUG, "logout completed");
 }
 
diff --git a/src/sbbs3/mailsrvr.c b/src/sbbs3/mailsrvr.c
index e9eacf659e..7199c24e5d 100644
--- a/src/sbbs3/mailsrvr.c
+++ b/src/sbbs3/mailsrvr.c
@@ -1366,6 +1366,8 @@ static void pop3_thread(void* arg)
 		if(startup->sound.login[0] && !sound_muted(&scfg)) 
 			PlaySound(startup->sound.login, NULL, SND_ASYNC|SND_FILENAME);
 #endif
+		mqtt_user_login(&mqtt, &client);
+
 		SAFEPRINTF(smb.file,"%smail",scfg.data_dir);
 		if(smb_islocked(&smb)) {
 			lprintf(LOG_WARNING,"%04d %s <%s> !MAIL BASE LOCKED: %s",socket, client.protocol, user.alias, smb.last_error);
@@ -1737,6 +1739,7 @@ static void pop3_thread(void* arg)
 #endif
 			if(!logoutuserdat(&scfg,&user,time(NULL),client.time))
 				lprintf(LOG_ERR,"%04d %s <%s> !ERROR in logoutuserdat", socket, client.protocol, user.alias);
+			mqtt_user_logout(&mqtt, &client, client.time);
 		}
 
 	} while(0);
diff --git a/src/sbbs3/main.cpp b/src/sbbs3/main.cpp
index d243fb8681..5da7673d37 100644
--- a/src/sbbs3/main.cpp
+++ b/src/sbbs3/main.cpp
@@ -231,7 +231,7 @@ int eputs(int level, const char *str)
 	if(*str == 0)
 		return 0;
 
-	mqtt_lputs(&mqtt, TOPIC_EVENT, level, str);
+	mqtt_lputs(&mqtt, TOPIC_HOST_EVENT, level, str);
 
 	if(level <= LOG_ERR) {
 		char errmsg[1024];
@@ -2612,6 +2612,8 @@ void event_thread(void* arg)
 	SetThreadName("sbbs/events");
 	thread_up(TRUE /* setuid */);
 
+	mqtt_pub_strval(&mqtt, TOPIC_HOST, "event", "thread started");
+
 #ifdef JAVASCRIPT
 	if(!(startup->options&BBS_OPT_NO_JAVASCRIPT)) {
 		if((sbbs->js_cx = sbbs->js_init(&sbbs->js_runtime, &sbbs->js_glob, "event")) == NULL) /* This must be done in the context of the events thread */
@@ -3248,11 +3250,12 @@ void event_thread(void* arg)
 
 	sbbs->event_thread_running = false;
 
+	mqtt_pub_strval(&mqtt, TOPIC_HOST, "event", "thread stopped");
+
 	thread_down();
 	sbbs->lprintf(LOG_INFO,"BBS Events thread terminated");
 }
 
-
 //****************************************************************************
 sbbs_t::sbbs_t(ushort node_num, union xp_sockaddr *addr, size_t addr_len, const char* name, SOCKET sd,
 			   scfg_t* global_cfg, char* global_text[], client_t* client_info, bool is_event_thread)
@@ -4159,8 +4162,6 @@ void sbbs_t::reset_logon_vars(void)
     lbuflen=0;
     timeleft_warn=0;
 	keybufbot=keybuftop=0;
-    logon_uls=logon_ulb=logon_dls=logon_dlb=0;
-    logon_posts=logon_emails=logon_fbacks=0;
     usrgrps=usrlibs=0;
     curgrp=curlib=0;
 	for(i=0;i<cfg.total_libs;i++)
@@ -4407,17 +4408,6 @@ void node_thread(void* arg)
 		fclose(fp);
 	}
 
-	if(sbbs->useron.number) {
-		char topic[128];
-		char tmp[32];
-		long tused = (long)(now - sbbs->logontime);
-		if(tused < 0)
-			tused = 0;
-		SAFEPRINTF(topic, "node/%u/laston", sbbs->cfg.node_num);
-		snprintf(str, sizeof(str), "%u\t%s\t%s", sbbs->useron.number, sbbs->useron.alias, sectostr(tused, tmp));
-		mqtt_pub_strval(&mqtt, TOPIC_BBS, topic, str);
-	}
-
 	if(sbbs->sys_status&SS_DAILY) {	// New day, run daily events/maintenance
 		sbbs->daily_maint();
 	}
diff --git a/src/sbbs3/mqtt.c b/src/sbbs3/mqtt.c
index 239155fbc7..a2e46d421e 100644
--- a/src/sbbs3/mqtt.c
+++ b/src/sbbs3/mqtt.c
@@ -24,6 +24,7 @@
 #include "mqtt.h"
 #include "startup.h"
 #include "xpdatetime.h"
+#include "date_str.h"
 
 const char* server_type_desc(enum server_type type)
 {
@@ -47,11 +48,26 @@ const char* server_state_desc(enum server_state state)
 		case SERVER_READY: return "ready";
 		case SERVER_RELOADING: return "reloading";
 		case SERVER_STOPPING: return "stopping";
-		case SERVER_DISCONNECTED: return "disconnected";
 		default: return "???";
 	}
 }
 
+const char* log_level_desc(int level)
+{
+	switch(level) {
+		case LOG_DEBUG: return "debug";
+		case LOG_INFO: return "info";
+		case LOG_NOTICE: return "notice";
+		case LOG_WARNING: return "warn";
+		case LOG_ERR: return "error";
+		default:
+			if(level <= LOG_CRIT)
+				return "critical";
+			break;
+	}
+	return "????";
+}
+
 int mqtt_init(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup)
 {
 	char hostname[256]="undefined-hostname";
@@ -82,17 +98,23 @@ static char* format_topic(struct mqtt* mqtt, enum server_type type, enum topic_d
 		case TOPIC_ROOT:
 			safe_snprintf(str, size, "sbbs/%s", sbuf);
 			break;
+		case TOPIC_BBS_LEVEL:
+			safe_snprintf(str, size, "sbbs/%s", mqtt->cfg->sys_id);
+			break;
 		case TOPIC_BBS:
 			safe_snprintf(str, size, "sbbs/%s/%s", mqtt->cfg->sys_id, sbuf);
 			break;
-		case TOPIC_BBS_LEVEL:
-			safe_snprintf(str, size, "sbbs/%s", mqtt->cfg->sys_id);
+		case TOPIC_BBS_ACTION:
+			safe_snprintf(str, size, "sbbs/%s/action/%s", mqtt->cfg->sys_id, sbuf);
+			break;
+		case TOPIC_HOST_LEVEL:
+			safe_snprintf(str, size, "sbbs/%s/host/%s", mqtt->cfg->sys_id, mqtt->host);
 			break;
 		case TOPIC_HOST:
 			safe_snprintf(str, size, "sbbs/%s/host/%s/%s", mqtt->cfg->sys_id, mqtt->host, sbuf);
 			break;
-		case TOPIC_HOST_LEVEL:
-			safe_snprintf(str, size, "sbbs/%s/host/%s", mqtt->cfg->sys_id, mqtt->host);
+		case TOPIC_HOST_EVENT:
+			safe_snprintf(str, size, "sbbs/%s/host/%s/event/%s", mqtt->cfg->sys_id, mqtt->host, sbuf);
 			break;
 		case TOPIC_SERVER:
 			safe_snprintf(str, size, "sbbs/%s/host/%s/server/%s/%s", mqtt->cfg->sys_id, mqtt->host, server_type_desc(type), sbuf);
@@ -100,9 +122,6 @@ static char* format_topic(struct mqtt* mqtt, enum server_type type, enum topic_d
 		case TOPIC_SERVER_LEVEL:
 			safe_snprintf(str, size, "sbbs/%s/host/%s/server/%s", mqtt->cfg->sys_id, mqtt->host, server_type_desc(type));
 			break;
-		case TOPIC_EVENT:
-			safe_snprintf(str, size, "sbbs/%s/host/%s/event/%s", mqtt->cfg->sys_id, mqtt->host, sbuf);
-			break;
 		case TOPIC_OTHER:
 		default:
 			safe_snprintf(str, size, "%s", sbuf);
@@ -227,6 +246,20 @@ int mqtt_pub_strval(struct mqtt* mqtt, enum topic_depth depth, const char* key,
 	return MQTT_FAILURE;
 }
 
+int mqtt_pub_timestamped_msg(struct mqtt* mqtt, enum topic_depth depth, const char* key, time_t t, const char* msg)
+{
+	char timestamp[64];
+	char str[1024];
+
+	if(mqtt == NULL || mqtt->cfg == NULL)
+		return MQTT_FAILURE;
+	if(!mqtt->cfg->mqtt.enabled)
+		return MQTT_SUCCESS;
+	time_to_isoDateTimeStr(t, xpTimeZone_local(), timestamp, sizeof(timestamp));
+	SAFEPRINTF2(str, "%s\t%s", timestamp, msg);
+	return mqtt_pub_strval(mqtt, depth, key, str);
+}
+
 int mqtt_pub_uintval(struct mqtt* mqtt, enum topic_depth depth, const char* key, ulong value)
 {
 	if(mqtt == NULL || mqtt->cfg == NULL)
@@ -323,12 +356,6 @@ static int pw_callback(char* buf, int size, int rwflag, void* userdata)
 	return strlen(mqtt->cfg->mqtt.tls.keypass);
 }
 
-static char* server_state_str(char* str, size_t size, enum server_state state)
-{
-	snprintf(str, size, "%u-%s", state, server_state_desc(state));
-	return str;
-}
-
 int mqtt_connect(struct mqtt* mqtt, const char* bind_address)
 {
 	if(mqtt == NULL || mqtt->handle == NULL || mqtt->cfg == NULL)
@@ -345,7 +372,7 @@ int mqtt_connect(struct mqtt* mqtt, const char* bind_address)
 	mosquitto_int_option(mqtt->handle, MOSQ_OPT_PROTOCOL_VERSION, mqtt->cfg->mqtt.protocol_version);
 	mosquitto_username_pw_set(mqtt->handle, username, password);
 	char value[128];
-	server_state_str(value, sizeof(value), SERVER_DISCONNECTED);
+	SAFECOPY(value, "DISCONNECTED");
 	mosquitto_will_set(mqtt->handle
 		,mqtt_topic(mqtt, TOPIC_SERVER_LEVEL, topic, sizeof(topic), NULL)
 		,strlen(value), value, /* QOS: */2, /* retain: */true);
@@ -514,8 +541,7 @@ int mqtt_startup(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup, const
 		}
 	}
 	mqtt_pub_noval(mqtt, TOPIC_SERVER, "recycle");
-	mqtt_pub_noval(mqtt, TOPIC_SERVER, "client_list");
-	mqtt_pub_uintval(mqtt, TOPIC_SERVER, "client_count", 0);
+	mqtt_pub_noval(mqtt, TOPIC_SERVER, "client");
 	mqtt_subscribe(mqtt, TOPIC_SERVER, str, sizeof(str), "recycle");
 	mqtt_subscribe(mqtt, TOPIC_HOST, str, sizeof(str), "recycle");
 
@@ -525,7 +551,6 @@ int mqtt_startup(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup, const
 int mqtt_server_state(struct mqtt* mqtt, enum server_state state)
 {
 	char str[128];
-	char tmp[256];
 
 	if(mqtt == NULL || mqtt->cfg == NULL)
 		return MQTT_FAILURE;
@@ -544,34 +569,35 @@ int mqtt_server_state(struct mqtt* mqtt, enum server_state state)
 		if(mqtt->client_list.count)
 			snprintf(clients, sizeof(clients), "%lu%s clients", mqtt->client_list.count, max_clients);
 		snprintf(str, sizeof(str), "%s\t%s\t%s\t%s"
-			,server_state_str(tmp, sizeof(tmp), state)
+			,server_state_desc(state)
 			,clients
 			,served
 			,errors);
 	} else
-		server_state_str(str, sizeof(str), state);
+		SAFECOPY(str, server_state_desc(state));
 	int result = mqtt_pub_strval(mqtt, TOPIC_SERVER_LEVEL, NULL, str);
 	if(mqtt->server_state != state) {
-		mqtt->server_state = state;
 		char topic[128];
-		time_t t = time(NULL);
-		snprintf(topic, sizeof(topic), "state/%s", server_state_str(tmp, sizeof(tmp), state));
-		safe_snprintf(str, sizeof(str), "%" PRIu32 "T%06" PRIu32 "%d"
-			,time_to_isoDate(t), time_to_isoTime(t), xpTimeZone_local());
-		result = mqtt_pub_strval(mqtt, TOPIC_SERVER, topic, str);
+		snprintf(topic, sizeof(topic), "state/%s", server_state_desc(state));
+		result = mqtt_pub_timestamped_msg(mqtt, TOPIC_SERVER, topic, time(NULL), server_state_desc(mqtt->server_state));
+		mqtt->server_state = state;
 	}
 	return result;
 }
 
 int mqtt_errormsg(struct mqtt* mqtt, int level, const char* msg)
 {
+	char topic[128];
+	time_t t = time(NULL);
+
 	if(mqtt == NULL || mqtt->cfg == NULL)
 		return MQTT_FAILURE;
 	++mqtt->error_count;
 	mqtt_pub_uintval(mqtt, TOPIC_SERVER, "error_count", mqtt->error_count);
 	if(mqtt->cfg->mqtt.verbose)
 		mqtt_server_state(mqtt, mqtt->server_state);
-	return mqtt_pub_strval(mqtt, TOPIC_BBS, "error", msg);
+	snprintf(topic, sizeof(topic), "error/%s", log_level_desc(level));
+	return mqtt_pub_timestamped_msg(mqtt, TOPIC_BBS_ACTION, topic, t, msg);
 }
 
 int mqtt_client_max(struct mqtt* mqtt, ulong count)
@@ -579,13 +605,30 @@ int mqtt_client_max(struct mqtt* mqtt, ulong count)
 	if(mqtt == NULL || mqtt->cfg == NULL)
 		return MQTT_FAILURE;
 	mqtt->max_clients = count;
-	if(mqtt->cfg->mqtt.verbose)
-		mqtt_server_state(mqtt, mqtt->server_state);
-	return mqtt_pub_uintval(mqtt, TOPIC_SERVER, "max_clients", count);
+	return mqtt_client_count(mqtt);
+}
+
+static void format_client_info(char* str, size_t size, int sock, client_t* client, time_t t)
+{
+	char timestamp[32];
+	
+	snprintf(str, size, "%s\t%s\t%d\t%s\t%s\t%s\t%hu\t%d"
+		,time_to_isoDateTimeStr(t, xpTimeZone_local(), timestamp, sizeof(timestamp))
+		,client->protocol
+		,client->usernum
+		,client->user
+		,client->addr
+		,client->host
+		,client->port
+		,sock
+		);
 }
 
 int mqtt_client_on(struct mqtt* mqtt, BOOL on, int sock, client_t* client, BOOL update)
 {
+	#define MAX_CLIENT_STRLEN 512
+	char str[MAX_CLIENT_STRLEN + 1];
+
 	if(mqtt == NULL || mqtt->cfg == NULL)
 		return MQTT_FAILURE;
 
@@ -597,31 +640,31 @@ int mqtt_client_on(struct mqtt* mqtt, BOOL on, int sock, client_t* client, BOOL
 		if(update) {
 			list_node_t*	node;
 
-			if((node=listFindTaggedNode(&mqtt->client_list, sock)) != NULL)
+			if((node=listFindTaggedNode(&mqtt->client_list, sock)) != NULL) {
 				memcpy(node->data, client, sizeof(client_t));
+				format_client_info(str, sizeof(str), sock, client, time(NULL));
+				mqtt_pub_strval(mqtt, TOPIC_SERVER, "client/action/update", str);
+			}
 		} else {
 			listAddNodeData(&mqtt->client_list, client, sizeof(client_t), sock, LAST_NODE);
+			format_client_info(str, sizeof(str), sock, client, client->time);
+			mqtt_pub_strval(mqtt, TOPIC_SERVER, "client/action/connect", str);
 		}
 	} else {
-		listRemoveTaggedNode(&mqtt->client_list, sock, /* free_data: */TRUE);
+		client = listRemoveTaggedNode(&mqtt->client_list, sock, /* free_data: */FALSE);
+		if(client != NULL) {
+			format_client_info(str, sizeof(str), sock, client, time(NULL));
+			mqtt_pub_strval(mqtt, TOPIC_SERVER, "client/action/disconnect", str);
+			FREE_AND_NULL(client);
+		}
 		mqtt->served++;
 	}
 
-	#define MAX_CLIENT_STRLEN 512
 	str_list_t list = strListInit();
 	size_t client_count = 0;
 	for(list_node_t* node = mqtt->client_list.first; node != NULL; node = node->next) {
-		char str[MAX_CLIENT_STRLEN + 1];
 		client_t* client = node->data;
-		snprintf(str, sizeof(str), "%ld\t%s\t%s\t%s\t%s\t%u\t%lu"
-			,node->tag
-			,client->protocol
-			,client->user
-			,client->addr
-			,client->host
-			,client->port
-			,(ulong)client->time
-			);
+		format_client_info(str, sizeof(str), node->tag, client, client->time);
 		strListPush(&list, str);
 		client_count++;
 	}
@@ -636,18 +679,72 @@ int mqtt_client_on(struct mqtt* mqtt, BOOL on, int sock, client_t* client, BOOL
 
 	mqtt_client_count(mqtt);
 	mqtt_pub_uintval(mqtt, TOPIC_SERVER, "served", mqtt->served);
-	int result = mqtt_pub_strval(mqtt, TOPIC_SERVER, "client_list", buf);
+	int result = mqtt_pub_strval(mqtt, TOPIC_SERVER, "client/list", buf);
 	free(buf);
 	return result;
 }
 
+static int mqtt_user_login(struct mqtt* mqtt, client_t* client)
+{
+	char str[128];
+	char topic[128];
+
+	if(mqtt == NULL || mqtt->cfg == NULL || client == NULL)
+		return MQTT_FAILURE;
+
+	if(!mqtt->cfg->mqtt.enabled)
+		return MQTT_SUCCESS;
+
+	snprintf(topic, sizeof(topic), "login/%s", client->protocol);
+	snprintf(str, sizeof(str), "%u\t%s\t%s\t%s"
+		,client->usernum
+		,client->user
+		,client->addr
+		,client->host
+		);
+	return mqtt_pub_timestamped_msg(mqtt, TOPIC_BBS_ACTION, topic, time(NULL), str);
+}
+
+static int mqtt_user_logout(struct mqtt* mqtt, client_t* client, time_t logintime)
+{
+	char str[128];
+	char tmp[128];
+	char topic[128];
+
+	if(mqtt == NULL || mqtt->cfg == NULL || client == NULL)
+		return MQTT_FAILURE;
+
+	if(!mqtt->cfg->mqtt.enabled)
+		return MQTT_SUCCESS;
+
+	long tused = (long)(time(NULL) - logintime);
+	if(tused < 0)
+		tused = 0;
+	snprintf(topic, sizeof(topic), "logout/%s", client->protocol);
+	snprintf(str, sizeof(str), "%u\t%s\t%s\t%s\t%s"
+		,client->usernum
+		,client->user
+		,client->addr
+		,client->host
+		,sectostr(tused, tmp)
+		);
+	return mqtt_pub_timestamped_msg(mqtt, TOPIC_BBS_ACTION, topic, time(NULL), str);
+}
+
+
 int mqtt_client_count(struct mqtt* mqtt)
 {
+	char str[128];
+
 	if(mqtt == NULL || mqtt->cfg == NULL)
 		return MQTT_FAILURE;
 	if(mqtt->cfg->mqtt.verbose)
 		mqtt_server_state(mqtt, mqtt->server_state);
-	return mqtt_pub_uintval(mqtt, TOPIC_SERVER, "client_count", mqtt->client_list.count);
+	if(mqtt->max_clients)
+		snprintf(str, sizeof(str), "%ld total\t%ld max", mqtt->client_list.count, mqtt->max_clients);
+	else
+		snprintf(str, sizeof(str), "%ld total", mqtt->client_list.count);
+	return mqtt_pub_strval(mqtt, TOPIC_SERVER, "client", str);
 }
 
 void mqtt_shutdown(struct mqtt* mqtt)
@@ -658,3 +755,29 @@ void mqtt_shutdown(struct mqtt* mqtt)
 		mqtt_close(mqtt);
 	}
 }
+
+static int mqtt_file_xfer(struct mqtt* mqtt, user_t* user, file_t* f, off_t bytes, client_t* client, const char* xfer)
+{
+	if(mqtt == NULL || mqtt->cfg == NULL || user == NULL || f == NULL || client == NULL)
+		return MQTT_FAILURE;
+
+	if(!mqtt->cfg->mqtt.enabled)
+		return MQTT_SUCCESS;
+
+	char str[256];
+	char topic[128];
+	snprintf(topic, sizeof(topic), "%s/%s", xfer, mqtt->cfg->dir[f->dir]->code);
+	snprintf(str, sizeof(str), "%u\t%s\t%u\t%s\t%" PRIdOFF "\t%s"
+		,user->number, user->alias, user->uls, f->name, bytes, client->protocol);
+	return mqtt_pub_timestamped_msg(mqtt, TOPIC_BBS_ACTION, topic, time(NULL), str);
+}
+
+int mqtt_file_upload(struct mqtt* mqtt, user_t* user, file_t* f, off_t bytes, client_t* client)
+{
+	return mqtt_file_xfer(mqtt, user, f, bytes, client, "upload");
+}
+
+int mqtt_file_download(struct mqtt* mqtt, user_t* user, file_t* f, off_t bytes, client_t* client)
+{
+	return mqtt_file_xfer(mqtt, user, f, bytes, client, "download");
+}
diff --git a/src/sbbs3/mqtt.h b/src/sbbs3/mqtt.h
index 9a0010d69f..8b1598767d 100644
--- a/src/sbbs3/mqtt.h
+++ b/src/sbbs3/mqtt.h
@@ -54,13 +54,14 @@ struct mqtt {
 enum topic_depth {
 	TOPIC_OTHER,
 	TOPIC_ROOT,			// sbbs/*
-	TOPIC_BBS,			// sbbs/BBSID/*
 	TOPIC_BBS_LEVEL,	// sbbs/BBSID
-	TOPIC_HOST,			// sbbs/BBSID/host/HOSTNAME/*
+	TOPIC_BBS,			// sbbs/BBSID/*
+	TOPIC_BBS_ACTION,	// sbbs/BBSID/action/*
 	TOPIC_HOST_LEVEL,	// sbbs/BBSID/host/HOSTNAME
-	TOPIC_EVENT,		// sbbs/BBSID/event/*
-	TOPIC_SERVER,		// sbbs/BBSID/server/SERVER/*
-	TOPIC_SERVER_LEVEL, // sbbs/BBSID/server/SERVER
+	TOPIC_HOST,			// sbbs/BBSID/host/HOSTNAME/*
+	TOPIC_HOST_EVENT,	// sbbs/BBSID/host/HOSTNAME/event/*
+	TOPIC_SERVER_LEVEL, // sbbs/BBSID/host/HOSTNAME/server/SERVER
+	TOPIC_SERVER,		// sbbs/BBSID/host/HOSTNAME/server/SERVER/*
 };
 
 #define MQTT_SUCCESS 0 // Same as MOSQ_ERR_SUCCESS
@@ -86,6 +87,7 @@ DLLEXPORT int mqtt_pub_noval(struct mqtt*, enum topic_depth, const char* key);
 DLLEXPORT int mqtt_pub_strval(struct mqtt*, enum topic_depth, const char* key, const char* str);
 DLLEXPORT int mqtt_pub_uintval(struct mqtt*, enum topic_depth, const char* key, ulong value);
 DLLEXPORT int mqtt_pub_message(struct mqtt*, enum topic_depth, const char* key, const void* buf, size_t len, BOOL retain);
+DLLEXPORT int mqtt_pub_timestamped_msg(struct mqtt*, enum topic_depth, const char* key, time_t, const char* msg);
 DLLEXPORT int mqtt_open(struct mqtt*);
 DLLEXPORT void mqtt_close(struct mqtt*);
 DLLEXPORT int mqtt_connect(struct mqtt*, const char* bind_address);
@@ -95,6 +97,10 @@ DLLEXPORT int mqtt_thread_stop(struct mqtt*);
 DLLEXPORT int mqtt_client_on(struct mqtt*, BOOL on, int sock, client_t* client, BOOL update);
 DLLEXPORT int mqtt_client_max(struct mqtt*, ulong count);
 DLLEXPORT int mqtt_client_count(struct mqtt*);
+DLLEXPORT int mqtt_user_login(struct mqtt*, client_t*);
+DLLEXPORT int mqtt_user_logout(struct mqtt*, client_t*, time_t);
+DLLEXPORT int mqtt_file_upload(struct mqtt*, user_t*, file_t*, off_t size, client_t*);
+DLLEXPORT int mqtt_file_download(struct mqtt*, user_t*, file_t*, off_t size, client_t*);
 
 #ifdef __cplusplus
 }
diff --git a/src/sbbs3/newuser.cpp b/src/sbbs3/newuser.cpp
index decd830eb8..b35a181155 100644
--- a/src/sbbs3/newuser.cpp
+++ b/src/sbbs3/newuser.cpp
@@ -449,6 +449,13 @@ BOOL sbbs_t::newuser()
 	}
 	SAFEPRINTF2(str,"Created user record #%u: %s",useron.number,useron.alias);
 	logline(nulstr,str);
+
+	snprintf(str, sizeof(str), "%u\t%s"
+		,useron.number, useron.alias);
+	char topic[128];
+	snprintf(str, sizeof(str), "newuser/%s", client.protocol);
+	mqtt_pub_timestamped_msg(mqtt, TOPIC_BBS_ACTION, topic, answertime, str);
+
 	if(cfg.new_sif[0]) {
 		SAFEPRINTF2(str,"%suser/%4.4u.dat",cfg.data_dir,useron.number);
 		create_sif_dat(cfg.new_sif,str); 
diff --git a/src/sbbs3/postmsg.cpp b/src/sbbs3/postmsg.cpp
index 0325d51d77..1bf2ab8926 100644
--- a/src/sbbs3/postmsg.cpp
+++ b/src/sbbs3/postmsg.cpp
@@ -328,10 +328,16 @@ bool sbbs_t::postmsg(uint subnum, long wm_mode, smb_t* resmb, smbmsg_t* remsg)
 	user_posted_msg(&cfg, &useron, 1);
 	bprintf(text[Posted],cfg.grp[cfg.sub[subnum]->grp]->sname
 		,cfg.sub[subnum]->lname);
-	sprintf(str,"posted on %s %s"
-		,cfg.grp[cfg.sub[subnum]->grp]->sname,cfg.sub[subnum]->lname);
+	sprintf(str,"posted to %s on %s %s"
+		,touser, cfg.grp[cfg.sub[subnum]->grp]->sname,cfg.sub[subnum]->lname);
 	logline("P+",str);
 
+	char topic[128];
+	snprintf(topic, sizeof(topic), "post/%s", cfg.sub[subnum]->code);
+	snprintf(str, sizeof(str), "%u\t%s\t%u\t%u\t%s\t%s"
+		,useron.number, useron.alias, useron.ptoday, useron.posts, touser, title);
+	mqtt_pub_timestamped_msg(mqtt, TOPIC_BBS_ACTION, topic, time(NULL), str);
+
 	if(!(msgattr & MSG_ANONYMOUS)
 		&& stricmp(touser, "All") != 0
 		&& (remsg == NULL || remsg->from_net.type == NET_NONE)) {
diff --git a/src/sbbs3/sbbs.h b/src/sbbs3/sbbs.h
index 8abe162875..ac8f923af2 100644
--- a/src/sbbs3/sbbs.h
+++ b/src/sbbs3/sbbs.h
@@ -576,13 +576,13 @@ public:
 	long 	sys_status; 	/* System Status */
 	subscan_t	*subscan;	/* User sub configuration/scan info */
 
-	int64_t	logon_ulb,		/* Upload Bytes This Call */
-			logon_dlb,		/* Download Bytes This Call */
-			logon_uls,		/* Uploads This Call */
-			logon_dls;		/* Downloads This Call */
-	ulong	logon_posts,	/* Posts This Call */
-			logon_emails,	/* Emails This Call */
-			logon_fbacks;	/* Feedbacks This Call */
+	int64_t	logon_ulb=0,	/* Upload Bytes This Call */
+			logon_dlb=0;	/* Download Bytes This Call */
+	ulong	logon_uls=0,	/* Uploads This Call */
+			logon_dls=0;	/* Downloads This Call */
+	ulong	logon_posts=0,	/* Posts This Call */
+			logon_emails=0,	/* Emails This Call */
+			logon_fbacks=0;	/* Feedbacks This Call */
 	uchar	logon_ml;		/* Security level of the user upon logon */
 
 	uint 	main_cmds;		/* Number of Main Commands this call */
@@ -1105,7 +1105,7 @@ public:
 	int		xtrn_sec(const char* section = "");	/* The external program section  */
 	void	xtrndat(const char* name, const char* dropdir, uchar type, ulong tleft
 				,ulong misc);
-	bool	exec_xtrn(uint xtrnnum);			/* Executes online external program */
+	bool	exec_xtrn(uint xtrnnum, bool user_event = false);	/* Executes online external program */
 	bool	user_event(user_event_t);			/* Executes user event(s) */
 	void	moduserdat(uint xtrnnum);
 	const char* xtrn_dropdir(const xtrn_t*, char* buf, size_t);
diff --git a/src/sbbs3/server.h b/src/sbbs3/server.h
index 1a3e048c7f..79617ee38e 100644
--- a/src/sbbs3/server.h
+++ b/src/sbbs3/server.h
@@ -37,8 +37,7 @@ enum server_state {
 	SERVER_INIT,
 	SERVER_READY,
 	SERVER_RELOADING,
-	SERVER_STOPPING,
-	SERVER_DISCONNECTED
+	SERVER_STOPPING
 };
 
 #endif /* Don't add anything after this line */
diff --git a/src/sbbs3/services.c b/src/sbbs3/services.c
index 31ba6ec5d8..ce0b2aca25 100644
--- a/src/sbbs3/services.c
+++ b/src/sbbs3/services.c
@@ -481,6 +481,8 @@ js_login(JSContext *cx, uintN argc, jsval *arglist)
 		PlaySound(startup->sound.login, NULL, SND_ASYNC|SND_FILENAME);
 #endif
 
+	mqtt_user_login(&mqtt, client->client);
+
 	return(JS_TRUE);
 }
 
@@ -508,6 +510,8 @@ js_logout(JSContext *cx, uintN argc, jsval *arglist)
 		lprintf(LOG_INFO,"%04d %s Logging out %s"
 			,client->socket,client->service->protocol,client->user.alias);
 
+	mqtt_user_logout(&mqtt, client->client, client->logintime);
+
 	memset(&client->user,0,sizeof(client->user));
 	JS_RESUMEREQUEST(cx, rc);
 
diff --git a/src/sbbs3/upload.cpp b/src/sbbs3/upload.cpp
index 41ec312228..ed3b9d3151 100644
--- a/src/sbbs3/upload.cpp
+++ b/src/sbbs3/upload.cpp
@@ -193,7 +193,7 @@ bool sbbs_t::uploadfile(file_t* f)
 			useron.cdt = adjustuserval(&cfg, useron.number, USER_CDT
 				,(int64_t)(f->cost * (cfg.dir[f->dir]->up_pct/100.0))); 
 	}
-
+	mqtt_file_upload(mqtt, &useron, f, length, &client);
 	user_event(EVENT_UPLOAD);
 
 	return true;
diff --git a/src/sbbs3/websrvr.c b/src/sbbs3/websrvr.c
index 1e7b4f9caf..07d1eec5dc 100644
--- a/src/sbbs3/websrvr.c
+++ b/src/sbbs3/websrvr.c
@@ -1623,6 +1623,7 @@ void http_logon(http_session_t * session, user_t *usr)
 		SAFECOPY(session->user.ipaddr, session->host_ip);
 		session->user.logontime = (time32_t)session->logon_time;
 		putuserdat(&scfg, &session->user);
+		mqtt_user_login(&mqtt, &session->client);
 	}
 	SAFECOPY(session->client.user, session->username);
 	session->client.usernum = session->user.number;
@@ -1650,6 +1651,8 @@ void http_logoff(http_session_t* session, SOCKET socket, int line)
 	if(startup->sound.logout[0] && !sound_muted(&scfg))
 		PlaySound(startup->sound.logout, NULL, SND_ASYNC|SND_FILENAME);
 #endif
+
+	mqtt_user_logout(&mqtt, &session->client, session->logon_time);
 }
 
 BOOL http_checkuser(http_session_t * session)
@@ -6153,8 +6156,10 @@ static void respond(http_session_t * session)
 				e = 1;
 			lprintf(LOG_INFO, "%04d Sent file: %s (%"PRIdOFF" bytes, %ld cps)"
 				,session->socket, session->req.physical_path, snt, (long)(snt / e));
-			if(session->parsed_vpath == PARSED_VPATH_FULL)
+			if(session->parsed_vpath == PARSED_VPATH_FULL) {
 				user_downloaded_file(&scfg, &session->user, &session->client, session->file.dir, session->file.name, snt);
+				mqtt_file_download(&mqtt, &session->user, &session->file, snt, &session->client);
+			}
 		}
 	}
 	session->req.finished=TRUE;
diff --git a/src/sbbs3/xtrn_sec.cpp b/src/sbbs3/xtrn_sec.cpp
index f803eb431b..71e8939295 100644
--- a/src/sbbs3/xtrn_sec.cpp
+++ b/src/sbbs3/xtrn_sec.cpp
@@ -1255,7 +1255,7 @@ const char* sbbs_t::xtrn_dropdir(const xtrn_t* xtrn, char* buf, size_t maxlen)
 /****************************************************************************/
 /* This function handles configured external program execution. 			*/
 /****************************************************************************/
-bool sbbs_t::exec_xtrn(uint xtrnnum)
+bool sbbs_t::exec_xtrn(uint xtrnnum, bool user_event)
 {
 	char str[256],path[MAX_PATH+1],dropdir[MAX_PATH+1],name[32],c;
 	uint i;
@@ -1372,7 +1372,7 @@ bool sbbs_t::exec_xtrn(uint xtrnnum)
 	xtrndat(name,dropdir,cfg.xtrn[xtrnnum]->type,tleft,cfg.xtrn[xtrnnum]->misc);
 	if(!online)
 		return(false);
-	SAFEPRINTF(str, "running external program: %s", cfg.xtrn[xtrnnum]->name);
+	snprintf(str, sizeof(str), "running external %s: %s", cfg.xtrn[xtrnnum]->name, user_event ? "user event" : "program");
 	logline("X-",str);
 	if(cfg.xtrn[xtrnnum]->cmd[0]!='*' && logfile_fp!=NULL) {
 		fclose(logfile_fp);
@@ -1418,6 +1418,12 @@ bool sbbs_t::exec_xtrn(uint xtrnnum)
 	}
 
 	start=time(NULL);
+
+	char topic[128];
+	snprintf(topic, sizeof(topic), "exec/%s", cfg.xtrn[xtrnnum]->code);
+	snprintf(str, sizeof(str), "%u\t%s", useron.number, useron.alias);
+	mqtt_pub_timestamped_msg(mqtt, TOPIC_BBS_ACTION, topic, start, str);
+
 	external(cmdstr(cfg.xtrn[xtrnnum]->cmd, drop_file, startup_dir, NULL, mode)
 		,mode
 		,cfg.xtrn[xtrnnum]->path);
@@ -1487,7 +1493,7 @@ bool sbbs_t::user_event(user_event_t event)
 			|| !chk_ar(cfg.xtrn[i]->run_ar,&useron,&client)
 			|| !chk_ar(cfg.xtrnsec[cfg.xtrn[i]->sec]->ar,&useron,&client))
 			continue;
-		success=exec_xtrn(i); 
+		success=exec_xtrn(i, /* user_event: */true); 
 	}
 
 	return(success);
-- 
GitLab