From 6ed3f6dc98a01516ad1005dd7faf864f9715d56a Mon Sep 17 00:00:00 2001 From: Rob Swindell <rob@synchro.net> Date: Sun, 1 Jan 2023 20:25:36 -0800 Subject: [PATCH] MQTT support overhaul, mainly for SBBSCTRL compatibility The only difference in the data/scheme is that the "error" topic (error log) is now under each server rather than each host. I don't *think* there are any other changes from the MQTT consumer side. Still not done: subscribing (e.g. support for recycle or node-spy-input via MQTT) and NT services support. This change also includes a cool feature that will prompt the sysop if there's a timeout (30 seconds) while waiting for servers to shutdown gracefully and giving the sysop the option to abort (Cancel) the wait (and shutdown ungracefully) or continue the wait (OK). --- src/sbbs3/ctrl/MainFormUnit.cpp | 43 ++++++- src/sbbs3/ftpsrvr.c | 19 +-- src/sbbs3/ftpsrvr.h | 42 +------ src/sbbs3/logfile.cpp | 4 +- src/sbbs3/mailsrvr.c | 25 ++-- src/sbbs3/mailsrvr.h | 35 +----- src/sbbs3/main.cpp | 18 +-- src/sbbs3/mqtt.c | 199 +++++++++++++++++++++++++++++++- src/sbbs3/mqtt.h | 34 +++++- src/sbbs3/sbbscon.c | 149 +++++------------------- src/sbbs3/server.h | 43 +++++++ src/sbbs3/services.c | 11 +- src/sbbs3/services.h | 39 +------ src/sbbs3/startup.h | 86 ++++++-------- src/sbbs3/websrvr.c | 15 ++- src/sbbs3/websrvr.h | 36 +----- 16 files changed, 433 insertions(+), 365 deletions(-) create mode 100644 src/sbbs3/server.h diff --git a/src/sbbs3/ctrl/MainFormUnit.cpp b/src/sbbs3/ctrl/MainFormUnit.cpp index 1cf6906fe9..39b4505728 100644 --- a/src/sbbs3/ctrl/MainFormUnit.cpp +++ b/src/sbbs3/ctrl/MainFormUnit.cpp @@ -171,6 +171,7 @@ static void thread_up(void* p, BOOL up, BOOL setuid) threads++; else if(threads>0) threads--; + mqtt_thread_count(&MainForm->bbs_startup.mqtt, TOPIC_HOST, threads); ReleaseMutex(mutex); } @@ -188,6 +189,7 @@ void socket_open(void* p, BOOL open) sockets++; else if(sockets>0) sockets--; + mqtt_socket_count(&MainForm->bbs_startup.mqtt, TOPIC_HOST, sockets); ReleaseMutex(mutex); } @@ -201,8 +203,10 @@ static void client_add(void* p, BOOL add) if(add) { clients++; total_clients++; + mqtt_served_count(&MainForm->bbs_startup.mqtt, TOPIC_HOST, total_clients); } else if(clients>0) clients--; + mqtt_client_count(&MainForm->bbs_startup.mqtt, TOPIC_HOST, clients); } static void client_on(void* p, BOOL on, int sock, client_t* client, BOOL update) @@ -212,13 +216,15 @@ static void client_on(void* p, BOOL on, int sock, client_t* client, BOOL update) time_t t; static HANDLE mutex; TListItem* Item; + + mqtt_client_on(&MainForm->bbs_startup.mqtt, on, sock, client, update); if(!mutex) mutex=CreateMutex(NULL,false,NULL); WaitForSingleObject(mutex,INFINITE); WaitForSingleObject(ClientForm->ListMutex,INFINITE); - /* Search for exising entry for this socket */ + /* Search for existing entry for this socket */ for(i=0;i<ClientForm->ListView->Items->Count;i++) { if(ClientForm->ListView->Items->Item[i]->Caption.ToIntDef(0)==sock) break; @@ -332,6 +338,7 @@ static void bbs_set_state(void* p, enum server_state state) { TelnetForm->Status->Caption = server_state_str(state); + mqtt_server_state(&MainForm->bbs_startup.mqtt, state); switch(state) { case SERVER_STOPPED: MainForm->TelnetStart->Enabled=true; @@ -429,6 +436,7 @@ static void services_set_state(void* p, enum server_state state) { ServicesForm->Status->Caption = server_state_str(state); + mqtt_server_state(&MainForm->services_startup.mqtt, state); switch(state) { case SERVER_STOPPED: MainForm->ServicesStart->Enabled=true; @@ -495,6 +503,7 @@ static void mail_set_state(void* p, enum server_state state) { MailForm->Status->Caption = server_state_str(state); + mqtt_server_state(&MainForm->mail_startup.mqtt, state); switch(state) { case SERVER_STOPPED: MainForm->MailStart->Enabled=true; @@ -590,6 +599,7 @@ static void ftp_set_state(void* p, enum server_state state) { FtpForm->Status->Caption = server_state_str(state); + mqtt_server_state(&MainForm->ftp_startup.mqtt, state); switch(state) { case SERVER_STOPPED: MainForm->FtpStart->Enabled=true; @@ -658,6 +668,7 @@ static void web_set_state(void* p, enum server_state state) { WebForm->Status->Caption = server_state_str(state); + mqtt_server_state(&MainForm->web_startup.mqtt, state); switch(state) { case SERVER_STOPPED: MainForm->WebStart->Enabled=true; @@ -782,6 +793,7 @@ __fastcall TMainForm::TMainForm(TComponent* Owner) memset(&bbs_startup,0,sizeof(bbs_startup)); bbs_startup.size=sizeof(bbs_startup); + bbs_startup.type = SERVER_TERM; bbs_startup.cbdata=&bbs_log_list; bbs_startup.event_cbdata=&event_log_list; bbs_startup.first_node=1; @@ -802,6 +814,7 @@ __fastcall TMainForm::TMainForm(TComponent* Owner) memset(&mail_startup,0,sizeof(mail_startup)); mail_startup.size=sizeof(mail_startup); + mail_startup.type = SERVER_MAIL; mail_startup.cbdata=&mail_log_list; mail_startup.smtp_port=IPPORT_SMTP; mail_startup.relay_port=IPPORT_SMTP; @@ -824,6 +837,7 @@ __fastcall TMainForm::TMainForm(TComponent* Owner) memset(&ftp_startup,0,sizeof(ftp_startup)); ftp_startup.size=sizeof(ftp_startup); + ftp_startup.type = SERVER_FTP; ftp_startup.cbdata=&ftp_log_list; ftp_startup.port=IPPORT_FTP; ftp_startup.lputs=lputs; @@ -842,6 +856,7 @@ __fastcall TMainForm::TMainForm(TComponent* Owner) memset(&web_startup,0,sizeof(web_startup)); web_startup.size=sizeof(web_startup); + web_startup.type = SERVER_WEB; web_startup.cbdata=&web_log_list; web_startup.lputs=lputs; web_startup.errormsg=errormsg; @@ -855,6 +870,7 @@ __fastcall TMainForm::TMainForm(TComponent* Owner) memset(&services_startup,0,sizeof(services_startup)); services_startup.size=sizeof(services_startup); + services_startup.type = SERVER_SERVICES; services_startup.cbdata=&services_log_list; services_startup.lputs=lputs; services_startup.errormsg=errormsg; @@ -1038,6 +1054,7 @@ BOOL __fastcall TMainForm::servicesServiceEnabled(void) //--------------------------------------------------------------------------- void __fastcall TMainForm::FormClose(TObject *Sender, TCloseAction &Action) { + mqtt_terminating(&bbs_startup.mqtt); UpTimer->Enabled=false; /* Stop updating the status bar */ StatsTimer->Enabled=false; @@ -1055,19 +1072,25 @@ void __fastcall TMainForm::FormClose(TObject *Sender, TCloseAction &Action) || (FtpStop->Enabled && !ftpServiceEnabled()) || (WebStop->Enabled && !webServiceEnabled()) || (ServicesStop->Enabled && !servicesServiceEnabled())) { - if(time(NULL)-start>30) - break; + if(time(NULL)-start>30) { + if(Application->MessageBox("Abort wait for servers to terminate?" + ,"Synchronet Server Still Running", MB_OKCANCEL) == IDOK) + break; + start = time(NULL); + } Application->ProcessMessages(); YIELD(); } StatusBar->Panels->Items[STATUSBAR_LAST_PANEL]->Text="Closing..."; Application->ProcessMessages(); + mqtt_terminating(&bbs_startup.mqtt); LogTimer->Enabled=false; ServiceStatusTimer->Enabled=false; NodeForm->Timer->Enabled=false; ClientForm->Timer->Enabled=false; + mqtt_shutdown(&bbs_startup.mqtt); } //--------------------------------------------------------------------------- void __fastcall TMainForm::FormCloseQuery(TObject *Sender, bool &CanClose) @@ -1947,7 +1970,21 @@ void __fastcall TMainForm::StartupTimerTick(TObject *Sender) StartupTimer->Interval = 2500; // Let 'em see the logo for a bit StartupTimer->Enabled = true; } else { + if(!bbsServiceEnabled()) { + mqtt_startup(&bbs_startup.mqtt, &cfg, SERVER_TERM, ver() + ,/* lputs: */NULL + ,/* shared_client_list: */TRUE); + ftp_startup.mqtt = bbs_startup.mqtt; + ftp_startup.mqtt.server_type = SERVER_FTP; + web_startup.mqtt = bbs_startup.mqtt; + web_startup.mqtt.server_type = SERVER_WEB; + mail_startup.mqtt = bbs_startup.mqtt; + mail_startup.mqtt.server_type = SERVER_MAIL; + services_startup.mqtt = bbs_startup.mqtt; + services_startup.mqtt.server_type = SERVER_SERVICES; + } DisplayMainPanels(Sender); + mqtt_online(&bbs_startup.mqtt); } Initialized=true; } diff --git a/src/sbbs3/ftpsrvr.c b/src/sbbs3/ftpsrvr.c index 11716d9a8c..abf01b247a 100644 --- a/src/sbbs3/ftpsrvr.c +++ b/src/sbbs3/ftpsrvr.c @@ -142,8 +142,8 @@ static int lprintf(int level, const char *fmt, ...) if(level <= LOG_ERR) { char errmsg[sizeof(sbuf)+16]; + errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name, sbuf); SAFEPRINTF2(errmsg, "%s %s", server_abbrev, sbuf); - errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name, errmsg); if(startup!=NULL && startup->errormsg!=NULL) startup->errormsg(startup->cbdata,level,errmsg); } @@ -196,8 +196,11 @@ static char* server_host_name(void) static void set_state(enum server_state state) { - if(startup != NULL && startup->set_state != NULL) - startup->set_state(startup->cbdata, state); + if(startup != NULL) { + if(startup->set_state != NULL) + startup->set_state(startup->cbdata, state); + mqtt_server_state(&startup->mqtt, state); + } } static void update_clients(void) @@ -206,7 +209,7 @@ static void update_clients(void) uint32_t count = protected_uint32_value(active_clients); if(startup->clients != NULL) startup->clients(startup->cbdata, count); - mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "client_count", count); + mqtt_client_count(&startup->mqtt, TOPIC_SERVER, count); } } @@ -230,7 +233,7 @@ static void thread_up(BOOL setuid) if(startup != NULL) { if(startup->thread_up != NULL) startup->thread_up(startup->cbdata,TRUE, setuid); - mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "thread_count", protected_uint32_value(thread_count)); + mqtt_thread_count(&startup->mqtt, TOPIC_SERVER, protected_uint32_value(thread_count)); } } @@ -240,7 +243,7 @@ static int32_t thread_down(void) if(startup != NULL) { if(startup->thread_up != NULL) startup->thread_up(startup->cbdata,FALSE, FALSE); - mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "thread_count", count); + mqtt_thread_count(&startup->mqtt, TOPIC_SERVER, count); } return count; } @@ -4952,7 +4955,7 @@ void ftp_server(void* arg) } set_state(SERVER_INIT); - mqtt_pub_strval(&startup->mqtt, TOPIC_SERVER, "version", ftp_ver()); + mqtt_server_version(&startup->mqtt, ftp_ver()); uptime=0; served=0; @@ -5093,7 +5096,7 @@ void ftp_server(void* arg) set_state(SERVER_READY); lprintf(LOG_INFO,"FTP Server thread started"); - mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "max_clients", startup->max_clients); + mqtt_client_max(&startup->mqtt, startup->max_clients); while(ftp_set!=NULL && !terminate_server) { YIELD(); diff --git a/src/sbbs3/ftpsrvr.h b/src/sbbs3/ftpsrvr.h index df48c6e52e..19a4935e49 100644 --- a/src/sbbs3/ftpsrvr.h +++ b/src/sbbs3/ftpsrvr.h @@ -26,7 +26,7 @@ typedef struct { - DWORD size; /* sizeof(ftp_startup_t) */ + STARTUP_COMMON_ELEMENTS WORD port; WORD max_clients; #define FTP_DEFAULT_MAX_CLIENTS 10 @@ -34,7 +34,6 @@ typedef struct { #define FTP_DEFAULT_MAX_INACTIVITY 300 WORD qwk_timeout; #define FTP_DEFAULT_QWK_TIMEOUT 600 - WORD sem_chk_freq; /* semaphore file checking frequency (in seconds) */ struct in_addr outgoing4; struct in6_addr outgoing6; str_list_t interfaces; @@ -42,47 +41,10 @@ typedef struct { struct in6_addr pasv_ip6_addr; WORD pasv_port_low; WORD pasv_port_high; - DWORD options; /* See FTP_OPT definitions */ int64_t min_fsize; /* Minimum file size accepted for upload */ int64_t max_fsize; /* Maximum file size accepted for upload (0=unlimited) */ - - void* cbdata; /* Private data passed to callbacks */ - - /* Callbacks (NULL if unused) */ - int (*lputs)(void*, int level, const char* msg); - void (*errormsg)(void*, int level, const char* msg); - void (*set_state)(void*, enum server_state); - void (*recycle)(void*); - void (*terminated)(void*, int code); - void (*clients)(void*, int active); - void (*thread_up)(void*, BOOL up, BOOL setuid); - void (*socket_open)(void*, BOOL open); - void (*client_on)(void*, BOOL on, int sock, client_t*, BOOL update); - BOOL (*seteuid)(BOOL user); - BOOL (*setuid)(BOOL force); - - /* Paths */ - char ctrl_dir[INI_MAX_VALUE_LEN]; - char index_file_name[64]; - char temp_dir[INI_MAX_VALUE_LEN]; - char ini_fname[INI_MAX_VALUE_LEN]; - - /* Misc */ - char host_name[128]; - BOOL recycle_now; - BOOL shutdown_now; - int log_level; - uint bind_retry_count; /* Number of times to retry bind() calls */ - uint bind_retry_delay; /* Time to wait between each bind() retry */ - - struct startup_sound_settings sound; - - /* Login Attempt parameters */ - struct login_attempt_settings login_attempt; - link_list_t* login_attempt_list; - uint max_concurrent_connections; - struct mqtt mqtt; + char index_file_name[64]; } ftp_startup_t; diff --git a/src/sbbs3/logfile.cpp b/src/sbbs3/logfile.cpp index e4be0d93bf..a727fcf0e6 100644 --- a/src/sbbs3/logfile.cpp +++ b/src/sbbs3/logfile.cpp @@ -150,9 +150,7 @@ extern "C" int errorlog(scfg_t* cfg, struct mqtt* mqtt, int level, const char* h SAFEPRINTF2(subject, "%s %sERROR occurred", host, level <= LOG_CRIT ? "CRITICAL " : ""); notify(cfg, cfg->node_erruser, subject, text); } - if(mqtt != NULL) { - mqtt_pub_strval(mqtt, TOPIC_HOST, "error", text); - } + mqtt_errormsg(mqtt, level, text); return 0; } diff --git a/src/sbbs3/mailsrvr.c b/src/sbbs3/mailsrvr.c index 6330bc92ac..9ea556ed73 100644 --- a/src/sbbs3/mailsrvr.c +++ b/src/sbbs3/mailsrvr.c @@ -191,8 +191,8 @@ static int lprintf(int level, const char *fmt, ...) if(level <= LOG_ERR) { char errmsg[sizeof(sbuf)+16]; + errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name, sbuf), stats.errors++; SAFEPRINTF2(errmsg, "%s %s", server_abbrev, sbuf); - errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name,errmsg), stats.errors++; if(startup!=NULL && startup->errormsg!=NULL) startup->errormsg(startup->cbdata,level,errmsg); } @@ -248,8 +248,11 @@ static char* server_host_name(void) static void set_state(enum server_state state) { - if(startup != NULL && startup->set_state != NULL) - startup->set_state(startup->cbdata, state); + if(startup != NULL) { + if(startup->set_state != NULL) + startup->set_state(startup->cbdata, state); + mqtt_server_state(&startup->mqtt, state); + } } static void update_clients(void) @@ -257,7 +260,7 @@ static void update_clients(void) if(startup != NULL) { if(startup->clients != NULL) startup->clients(startup->cbdata,protected_uint32_value(active_clients)+active_sendmail); - mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "client_count", protected_uint32_value(active_clients)); + mqtt_client_count(&startup->mqtt, TOPIC_SERVER, protected_uint32_value(active_clients)); } } @@ -281,7 +284,7 @@ static void thread_up(BOOL setuid) if(startup != NULL) { if(startup->thread_up != NULL) startup->thread_up(startup->cbdata,TRUE,setuid); - mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "thread_count", protected_uint32_value(thread_count)); + mqtt_thread_count(&startup->mqtt, TOPIC_SERVER, protected_uint32_value(thread_count)); } } @@ -291,7 +294,7 @@ static int32_t thread_down(void) if(startup != NULL) { if(startup->thread_up != NULL) startup->thread_up(startup->cbdata,FALSE,FALSE); - mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "thread_count", count); + mqtt_thread_count(&startup->mqtt, TOPIC_SERVER, count); } return count; } @@ -310,7 +313,7 @@ void mail_open_socket(SOCKET sock, void* cb_protocol) stats.sockets++; if(startup != NULL) - mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "socket_count", stats.sockets); + mqtt_socket_count(&startup->mqtt, TOPIC_SERVER, stats.sockets); } void mail_close_socket_cb(SOCKET sock, void* cb_protocol) @@ -319,7 +322,7 @@ void mail_close_socket_cb(SOCKET sock, void* cb_protocol) startup->socket_open(startup->cbdata,FALSE); stats.sockets--; if(startup != NULL) - mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "socket_count", stats.sockets); + mqtt_socket_count(&startup->mqtt, TOPIC_SERVER, stats.sockets); } int mail_close_socket(SOCKET *sock, int *sess) @@ -339,7 +342,7 @@ int mail_close_socket(SOCKET *sock, int *sess) startup->socket_open(startup->cbdata,FALSE); stats.sockets--; if(startup != NULL) - mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "socket_count", stats.sockets); + mqtt_socket_count(&startup->mqtt, TOPIC_SERVER, stats.sockets); if(result!=0) { if(ERROR_VALUE!=ENOTSOCK) lprintf(LOG_WARNING,"%04d !ERROR %d closing socket",*sock, ERROR_VALUE); @@ -6052,7 +6055,7 @@ void mail_server(void* arg) set_state(SERVER_INIT); - mqtt_pub_strval(&startup->mqtt, TOPIC_SERVER, "version", mail_ver()); + mqtt_server_version(&startup->mqtt, mail_ver()); ZERO_VAR(js_server_props); SAFEPRINTF3(js_server_props.version,"%s %s%c",server_name, VERSION, REVISION); @@ -6273,7 +6276,7 @@ void mail_server(void* arg) set_state(SERVER_READY); lprintf(LOG_INFO,"Mail Server thread started"); - mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "max_clients", startup->max_clients); + mqtt_client_max(&startup->mqtt, startup->max_clients); while(!terminated && !terminate_server) { YIELD(); diff --git a/src/sbbs3/mailsrvr.h b/src/sbbs3/mailsrvr.h index 7d6d10489a..b32c9f13d4 100644 --- a/src/sbbs3/mailsrvr.h +++ b/src/sbbs3/mailsrvr.h @@ -27,7 +27,7 @@ typedef struct { - DWORD size; /* sizeof(mail_startup_t) */ + STARTUP_COMMON_ELEMENTS WORD smtp_port; WORD pop3_port; WORD pop3s_port; @@ -46,37 +46,16 @@ typedef struct { #define MAIL_DEFAULT_LINES_PER_YIELD 10 WORD max_recipients; #define MAIL_DEFAULT_MAX_RECIPIENTS 100 - WORD sem_chk_freq; /* semaphore file checking frequency (in seconds) */ struct in_addr outgoing4; struct in6_addr outgoing6; str_list_t interfaces; str_list_t pop3_interfaces; - DWORD options; /* See MAIL_OPT definitions */ DWORD max_msg_size; /* Max msg size in bytes (0=unlimited) */ #define MAIL_DEFAULT_MAX_MSG_SIZE (20*1024*1024) /* 20MB */ DWORD max_msgs_waiting; /* Max msgs in user's inbox (0=unlimited) */ #define MAIL_DEFAULT_MAX_MSGS_WAITING 100 DWORD connect_timeout; /* in seconds, for non-blocking connect (0=blocking socket) */ #define MAIL_DEFAULT_CONNECT_TIMEOUT 30 /* seconds */ - void* cbdata; /* Private data passed to callbacks */ - - /* Callbacks (NULL if unused) */ - int (*lputs)(void*, int level, const char* msg); - void (*errormsg)(void*, int level, const char* msg); - void (*set_state)(void*, enum server_state); - void (*recycle)(void*); - void (*terminated)(void*, int code); - void (*clients)(void*, int active); - void (*thread_up)(void*, BOOL up, BOOL setuid); - void (*socket_open)(void*, BOOL open); - void (*client_on)(void*, BOOL on, int sock, client_t*, BOOL update); - BOOL (*seteuid)(BOOL user); - BOOL (*setuid)(BOOL force); - - /* Paths */ - char ctrl_dir[INI_MAX_VALUE_LEN]; - char temp_dir[INI_MAX_VALUE_LEN]; - char ini_fname[INI_MAX_VALUE_LEN]; /* Strings */ char dns_server[128]; @@ -88,13 +67,7 @@ typedef struct { char pop3_sound[INI_MAX_VALUE_LEN]; /* Misc */ - char host_name[128]; - BOOL recycle_now; - BOOL shutdown_now; - int log_level; int tls_error_level; /* Cap the severity of TLS error log messages */ - uint bind_retry_count; /* Number of times to retry bind() calls */ - uint bind_retry_delay; /* Time to wait between each bind() retry */ /* Relay Server */ char relay_server[128]; @@ -105,13 +78,7 @@ typedef struct { /* JavaScript operating parameters */ js_startup_t js; - struct startup_sound_settings sound; - - /* Login Attempt parameters */ - struct login_attempt_settings login_attempt; - link_list_t* login_attempt_list; uint max_concurrent_connections; - struct mqtt mqtt; } mail_startup_t; diff --git a/src/sbbs3/main.cpp b/src/sbbs3/main.cpp index 4ab9c90989..d369e9bd7e 100644 --- a/src/sbbs3/main.cpp +++ b/src/sbbs3/main.cpp @@ -160,8 +160,11 @@ static bbs_startup_t* startup=NULL; static void set_state(enum server_state state) { - if(startup != NULL && startup->set_state != NULL) - startup->set_state(startup->cbdata, state); + if(startup != NULL) { + if(startup->set_state != NULL) + startup->set_state(startup->cbdata, state); + mqtt_server_state(&startup->mqtt, state); + } } static void update_clients() @@ -169,7 +172,7 @@ static void update_clients() if(startup != NULL) { if(startup->clients != NULL) startup->clients(startup->cbdata,protected_uint32_value(node_threads_running)); - mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "client_count", protected_uint32_value(node_threads_running)); + mqtt_client_count(&startup->mqtt, TOPIC_SERVER, protected_uint32_value(node_threads_running)); } } @@ -206,8 +209,8 @@ int lputs(int level, const char* str) mqtt_lputs(&startup->mqtt, TOPIC_SERVER, level, str); if(level <= LOG_ERR) { char errmsg[1024]; + errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name, str); SAFEPRINTF2(errmsg, "%s %s", server_abbrev, str); - errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name, errmsg); if(startup!=NULL && startup->errormsg!=NULL) startup->errormsg(startup->cbdata,level,errmsg); } @@ -233,8 +236,8 @@ int eputs(int level, const char *str) if(level <= LOG_ERR) { char errmsg[1024]; + errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name, str); SAFEPRINTF(errmsg, "evnt %s", str); - errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name, errmsg); if(startup!=NULL && startup->errormsg!=NULL) startup->errormsg(startup->cbdata, level, errmsg); } @@ -4834,6 +4837,7 @@ void bbs_thread(void* arg) } if(startup->size!=sizeof(bbs_startup_t)) { // verify size + int sz = sizeof(bbs_startup_t); sbbs_beep(100,500); sbbs_beep(300,500); sbbs_beep(100,500); @@ -4937,7 +4941,7 @@ void bbs_thread(void* arg) cleanup(1); return; } - mqtt_pub_strval(&startup->mqtt, TOPIC_SERVER, "version", bbs_ver()); + mqtt_server_version(&startup->mqtt, bbs_ver()); t=time(NULL); lprintf(LOG_INFO,"Initializing on %.24s with options: %x" @@ -5229,7 +5233,7 @@ NO_SSH: set_state(SERVER_READY); lprintf(LOG_INFO,"Terminal Server thread started for nodes %d through %d", first_node, last_node); - mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "max_clients", (last_node - first_node) + 1); + mqtt_client_max(&startup->mqtt, (last_node - first_node) + 1); while(!terminate_server) { YIELD(); diff --git a/src/sbbs3/mqtt.c b/src/sbbs3/mqtt.c index c5587feadf..1b4aba8556 100644 --- a/src/sbbs3/mqtt.c +++ b/src/sbbs3/mqtt.c @@ -22,9 +22,24 @@ #include <string.h> #include "mqtt.h" +#include "startup.h" -int mqtt_init(struct mqtt* mqtt, scfg_t* cfg, const char* host, const char* server) +const char* server_type_desc(enum server_type type) { + switch(type) { + case SERVER_TERM: return "term"; + case SERVER_FTP: return "ftp"; + case SERVER_WEB: return "web"; + case SERVER_MAIL: return "mail"; + case SERVER_SERVICES: return "srvc"; + } + return "???"; +} + +int mqtt_init(struct mqtt* mqtt, scfg_t* cfg, enum server_type type) +{ + char hostname[256]="undefined-hostname"; + if(mqtt == NULL || cfg == NULL) return MQTT_FAILURE; if(!cfg->mqtt.enabled) @@ -32,8 +47,14 @@ int mqtt_init(struct mqtt* mqtt, scfg_t* cfg, const char* host, const char* serv if(mqtt != NULL) { memset(mqtt, 0, sizeof(*mqtt)); mqtt->cfg = cfg; - mqtt->host = host; - mqtt->server = server; + mqtt->server_type = type; + listInit(&mqtt->client_list, LINK_LIST_MUTEX); +#ifdef _WIN32 + WSADATA WSAData; + WSAStartup(MAKEWORD(1,1), &WSAData); +#endif + gethostname(hostname, sizeof(hostname)); + mqtt->host = strdup(hostname); #ifdef USE_MOSQUITTO return mosquitto_lib_init(); #endif @@ -54,7 +75,7 @@ static char* format_topic(struct mqtt* mqtt, enum topic_depth depth, char* str, safe_snprintf(str, size, "sbbs/%s/%s/%s", mqtt->cfg->sys_id, mqtt->host, sbuf); break; case TOPIC_SERVER: - safe_snprintf(str, size, "sbbs/%s/%s/%s/%s", mqtt->cfg->sys_id, mqtt->host, mqtt->server, sbuf); + safe_snprintf(str, size, "sbbs/%s/%s/%s/%s", mqtt->cfg->sys_id, mqtt->host, server_type_desc(mqtt->server_type), sbuf); break; case TOPIC_EVENT: safe_snprintf(str, size, "sbbs/%s/%s/event/%s", mqtt->cfg->sys_id, mqtt->host, sbuf); @@ -277,8 +298,10 @@ void mqtt_close(struct mqtt* mqtt) if(mqtt->handle != NULL) { mosquitto_destroy(mqtt->handle); mqtt->handle = NULL; + listFree(&mqtt->client_list); } #endif + FREE_AND_NULL(mqtt->host); } static int pw_callback(char* buf, int size, int rwflag, void* userdata) @@ -379,3 +402,171 @@ int mqtt_thread_stop(struct mqtt* mqtt) #endif } +static int lprintf(int (*lputs)(int level, const char* str), int level, const char *fmt, ...) +{ + va_list argptr; + char sbuf[1024]; + + if(lputs == NULL) + return -1; + va_start(argptr,fmt); + vsnprintf(sbuf,sizeof(sbuf),fmt,argptr); + sbuf[sizeof(sbuf)-1]=0; + va_end(argptr); + return lputs(level,sbuf); +} + +int mqtt_startup(struct mqtt* mqtt, scfg_t* cfg, enum server_type type, const char* version + ,int (*lputs)(int level, const char* str) + ,BOOL shared_client_list) +{ + int result = MQTT_FAILURE; + char str[128]; + + if(mqtt == NULL) + return MQTT_FAILURE; + + if(cfg->mqtt.enabled) { + result = mqtt_init(mqtt, cfg, type); + if(result != MQTT_SUCCESS) { + lprintf(lputs, LOG_INFO, "MQTT init failure: %d", result); + } else { + lprintf(lputs, LOG_INFO, "MQTT lib: %s", mqtt_libver(str, sizeof(str))); + result = mqtt_open(mqtt); + if(result != MQTT_SUCCESS) { + lprintf(lputs, LOG_ERR, "MQTT open failure: %d", result); + } else { + result = mqtt_thread_start(mqtt); + if(result != MQTT_SUCCESS) { + lprintf(lputs, LOG_ERR, "Error %d starting pub/sub thread", result); + mqtt_close(mqtt); + } else { + lprintf(lputs, LOG_INFO, "MQTT connecting to broker %s:%u", cfg->mqtt.broker_addr, cfg->mqtt.broker_port); + result = mqtt_connect(mqtt, /* bind_address: */NULL); + if(result == MQTT_SUCCESS) { + lprintf(lputs, LOG_INFO, "MQTT broker-connect (%s:%d) successful", cfg->mqtt.broker_addr, cfg->mqtt.broker_port); + } else { + lprintf(lputs, LOG_ERR, "MQTT broker-connect (%s:%d) failure: %d", cfg->mqtt.broker_addr, cfg->mqtt.broker_port, result); + mqtt_close(mqtt); + } + } + } + } + } + mqtt->shared_client_list = shared_client_list; + mqtt_pub_strval(mqtt, TOPIC_HOST, "version", version); + mqtt_pub_strval(mqtt, TOPIC_HOST, "status", "initializing"); + for(enum topic_depth depth = TOPIC_HOST; depth <= TOPIC_SERVER; depth++) { + mqtt_pub_noval(mqtt, depth, "error_count"); + mqtt_pub_noval(mqtt, depth, "thread_count"); + mqtt_pub_noval(mqtt, depth, "socket_count"); + mqtt_pub_noval(mqtt, depth, "client_count"); + mqtt_pub_noval(mqtt, depth, "client_list"); + mqtt_pub_noval(mqtt, depth, "served"); + } + return result; +} + +int mqtt_online(struct mqtt* mqtt) +{ + return mqtt_pub_strval(mqtt, TOPIC_HOST, "status", "online"); +} + +int mqtt_server_state(struct mqtt* mqtt, enum server_state state) +{ + return mqtt_pub_uintval(mqtt, TOPIC_SERVER, "state", state); +} + +int mqtt_server_version(struct mqtt* mqtt, const char* str) +{ + return mqtt_pub_strval(mqtt, TOPIC_SERVER, "version", str); +} + +int mqtt_errormsg(struct mqtt* mqtt, int level, const char* msg) +{ + if(mqtt == NULL) + return MQTT_FAILURE; + ++mqtt->error_count; + mqtt_pub_uintval(mqtt, TOPIC_SERVER, "error_count", mqtt->error_count); + return mqtt_pub_strval(mqtt, TOPIC_HOST, "error", msg); +} + +int mqtt_thread_count(struct mqtt* mqtt, enum topic_depth depth, ulong count) +{ + return mqtt_pub_uintval(mqtt, depth, "thread_count", count); +} + +int mqtt_socket_count(struct mqtt* mqtt, enum topic_depth depth, ulong count) +{ + return mqtt_pub_uintval(mqtt, depth, "socket_count", count); +} + +int mqtt_client_count(struct mqtt* mqtt, enum topic_depth depth, ulong count) +{ + return mqtt_pub_uintval(mqtt, depth, "client_count", count); +} + +int mqtt_client_max(struct mqtt* mqtt, ulong count) +{ + return mqtt_pub_uintval(mqtt, TOPIC_SERVER, "max_clients", count); +} + +int mqtt_client_on(struct mqtt* mqtt, BOOL on, int sock, client_t* client, BOOL update) +{ + if(mqtt == NULL) + return MQTT_FAILURE; + + listLock(&mqtt->client_list); + if(on) { + if(update) { + list_node_t* node; + + if((node=listFindTaggedNode(&mqtt->client_list, sock)) != NULL) + memcpy(node->data, client, sizeof(client_t)); + } else { + listAddNodeData(&mqtt->client_list, client, sizeof(client_t), sock, LAST_NODE); + mqtt->served++; + } + } else + listRemoveTaggedNode(&mqtt->client_list, sock, /* free_data: */TRUE); + + str_list_t list = strListInit(); + for(list_node_t* node = mqtt->client_list.first; node != NULL; node = node->next) { + client_t* client = node->data; + strListAppendFormat(&list, "%ld\t%s\t%s\t%s\t%s\t%u\t%lu" + ,node->tag + ,client->protocol + ,client->user + ,client->addr + ,client->host + ,client->port + ,(ulong)client->time + ); + } + listUnlock(&mqtt->client_list); + char buf[1024]; // TODO + strListJoin(list, buf, sizeof(buf), "\n"); + strListFree(&list); + + mqtt_client_count(mqtt, mqtt->shared_client_list ? TOPIC_HOST : TOPIC_SERVER, mqtt->client_list.count); + mqtt_served_count(mqtt, TOPIC_SERVER, mqtt->served); + return mqtt_pub_strval(mqtt, mqtt->shared_client_list ? TOPIC_HOST : TOPIC_SERVER, "client_list", buf); +} + +int mqtt_served_count(struct mqtt* mqtt, enum topic_depth depth, ulong count) +{ + return mqtt_pub_uintval(mqtt, depth, "served", count); +} + +int mqtt_terminating(struct mqtt* mqtt) +{ + return mqtt_pub_strval(mqtt, TOPIC_HOST, "status", "terminating"); +} + +void mqtt_shutdown(struct mqtt* mqtt) +{ + mqtt_pub_strval(mqtt, TOPIC_HOST, "status", "offline"); + mqtt_disconnect(mqtt); + mqtt_thread_stop(mqtt); + mqtt_close(mqtt); +} diff --git a/src/sbbs3/mqtt.h b/src/sbbs3/mqtt.h index 0115eb537f..0f91ad5911 100644 --- a/src/sbbs3/mqtt.h +++ b/src/sbbs3/mqtt.h @@ -19,10 +19,13 @@ * Note: If this box doesn't appear square, then you need to fix your tabs. * ****************************************************************************/ -#ifndef MQTT_H -#define MQTT_H +#ifndef MQTT_H_ +#define MQTT_H_ #include "scfgdefs.h" +#include "client.h" +#include "server.h" +#include "link_list.h" #include "dllexport.h" #include <stdarg.h> @@ -38,8 +41,12 @@ struct mqtt { mqtt_handle_t handle; scfg_t* cfg; - const char* host; - const char* server; + char* host; + enum server_type server_type; + ulong error_count; + ulong served; + link_list_t client_list; + BOOL shared_client_list; }; enum topic_depth { @@ -57,7 +64,16 @@ enum topic_depth { extern "C" { #endif -DLLEXPORT int mqtt_init(struct mqtt*, scfg_t*, const char* host, const char* server); +DLLEXPORT int mqtt_init(struct mqtt*, scfg_t*, enum server_type); +DLLEXPORT int mqtt_startup(struct mqtt*, scfg_t*, enum server_type, const char* version + ,int (*lputs)(int level, const char* str) + ,BOOL shared_client_list); +DLLEXPORT int mqtt_online(struct mqtt*); +DLLEXPORT int mqtt_server_state(struct mqtt*, enum server_state); +DLLEXPORT int mqtt_server_version(struct mqtt*, const char*); +DLLEXPORT int mqtt_errormsg(struct mqtt*, int level, const char*); +DLLEXPORT int mqtt_terminating(struct mqtt*); +DLLEXPORT void mqtt_shutdown(struct mqtt*); DLLEXPORT char* mqtt_libver(char* str, size_t size); DLLEXPORT char* mqtt_topic(struct mqtt*, enum topic_depth, char* str, size_t size, const char* fmt, ...); DLLEXPORT int mqtt_subscribe(struct mqtt*, enum topic_depth, char* str, size_t size, const char* fmt, ...); @@ -72,9 +88,15 @@ DLLEXPORT int mqtt_connect(struct mqtt*, const char* bind_address); DLLEXPORT int mqtt_disconnect(struct mqtt*); DLLEXPORT int mqtt_thread_start(struct mqtt*); DLLEXPORT int mqtt_thread_stop(struct mqtt*); +DLLEXPORT int mqtt_thread_count(struct mqtt*, enum topic_depth, ulong count); +DLLEXPORT int mqtt_socket_count(struct mqtt*, enum topic_depth, ulong count); +DLLEXPORT int mqtt_served_count(struct mqtt*, enum topic_depth, ulong count); +DLLEXPORT int mqtt_client_count(struct mqtt*, enum topic_depth, ulong count); +DLLEXPORT int mqtt_client_on(struct mqtt*, BOOL on, int sock, client_t* client, BOOL update); +DLLEXPORT int mqtt_client_max(struct mqtt*, ulong count); #ifdef __cplusplus } #endif -#endif // MQTT_H +#endif // MQTT_H_ diff --git a/src/sbbs3/sbbscon.c b/src/sbbs3/sbbscon.c index 3486beba0d..3f8d3839e1 100644 --- a/src/sbbs3/sbbscon.c +++ b/src/sbbs3/sbbscon.c @@ -246,11 +246,11 @@ static void notify_systemd(const char* new_status) } #endif -static int log_puts(int level, char *str) +static int log_puts(int level, const char *str) { static pthread_mutex_t mutex; static BOOL mutex_initialized; - char *p; + const char *p; #ifdef __unix__ if (is_daemon) { @@ -291,7 +291,7 @@ static int log_puts(int level, char *str) return(prompt_len); } -static int lputs(int level, char *str) +static int lputs(int level, const char *str) { if(str != NULL && *str != '\0') mqtt_lputs(&bbs_startup.mqtt, TOPIC_HOST, level, str); @@ -301,7 +301,6 @@ static int lputs(int level, char *str) static void errormsg(void *cbdata, int level, const char *msg) { error_count++; - mqtt_pub_uintval(&bbs_startup.mqtt, TOPIC_HOST, "error_count", error_count); } static int lprintf(int level, const char *fmt, ...) @@ -363,31 +362,6 @@ static void mqtt_message_received(struct mosquitto* mosq, void* cbdata, const st return; } } - -#ifdef MOSQUITTO_LOG -static void mqtt_log_msg(struct mosquitto* moq, void* cbdata, int level, const char* str) -{ - static FILE* fp; - - if(fp == NULL) { - char path[MAX_PATH + 1]; - SAFEPRINTF(path, "%smqtt.log", scfg.logs_dir); - fp = fopen(path, "a"); - if(fp == NULL) - lprintf(LOG_ERR, "Error %d opening %s", errno, path); - } - time_t now = time(NULL); - char tmp[32]; - fprintf(fp, "%.24s %x %s\n", ctime_r(&now, tmp), level, str); -} -#endif - -static void mqtt_disconnected(struct mosquitto* mosq , void* cbdata, int reason) -{ - char msg[1024]; - SAFEPRINTF(msg, "MQTT broker disconnected, reason: %d", reason); - log_puts(LOG_INFO, msg); -} #endif // USE_MOSQUITTO #ifdef __unix__ @@ -656,14 +630,12 @@ static BOOL winsock_cleanup(void) static void set_state(void* cbdata, enum server_state state) { - enum server_type server_type = (enum server_type)cbdata; + enum server_type server_type = ((struct startup*)cbdata)->type; server_state[server_type] = state; #ifdef USE_SYSTEMD notify_systemd(NULL); #endif - mqtt_pub_uintval(mqtt[server_type], TOPIC_SERVER, "state", state); - #ifdef _THREAD_SUID_BROKEN if(state == SERVER_READY) { if(thread_suid_broken) { @@ -699,14 +671,14 @@ static void thread_up(void* p, BOOL up, BOOL setuid) thread_count++; else if(thread_count>0) thread_count--; - mqtt_pub_uintval(&bbs_startup.mqtt, TOPIC_HOST, "thread_count", thread_count); + mqtt_thread_count(&bbs_startup.mqtt, TOPIC_HOST, thread_count); pthread_mutex_unlock(&mutex); lputs(LOG_INFO,NULL); /* update displayed stats */ } static void socket_open(void* cbdata, BOOL open) { - enum server_type server_type = (enum server_type)cbdata; + enum server_type server_type = ((struct startup*)cbdata)->type; static pthread_mutex_t mutex; static BOOL mutex_initialized; @@ -720,38 +692,11 @@ static void socket_open(void* cbdata, BOOL open) socket_count++; else if(socket_count>0) socket_count--; - mqtt_pub_uintval(mqtt[server_type], TOPIC_HOST, "socket_count", socket_count); + mqtt_socket_count(mqtt[server_type], TOPIC_HOST, socket_count); pthread_mutex_unlock(&mutex); lputs(LOG_INFO,NULL); /* update displayed stats */ } -static void pub_client_list() -{ - if(bbs_startup.mqtt.handle != NULL) { - list_node_t* node; - str_list_t list = strListInit(); - - listLock(&client_list); - for(node=client_list.first; node!=NULL; node=node->next) { - client_t* client=node->data; - strListAppendFormat(&list, "%ld\t%s\t%s\t%s\t%s\t%u\t%lu" - ,node->tag - ,client->protocol - ,client->user - ,client->addr - ,client->host - ,client->port - ,(ulong)client->time - ); - } - listUnlock(&client_list); - char buf[1024]; - strListJoin(list, buf, sizeof(buf), "\n"); - strListFree(&list); - mqtt_pub_strval(&bbs_startup.mqtt, TOPIC_HOST, "client_list", buf); - } -} - static void client_on(void* p, BOOL on, int sock, client_t* client, BOOL update) { if(on) { @@ -769,9 +714,9 @@ static void client_on(void* p, BOOL on, int sock, client_t* client, BOOL update) } else listRemoveTaggedNode(&client_list, sock, /* free_data: */TRUE); - pub_client_list(); - mqtt_pub_uintval(&bbs_startup.mqtt, TOPIC_HOST, "client_count", client_list.count); - mqtt_pub_uintval(&bbs_startup.mqtt, TOPIC_HOST, "served", served); + mqtt_client_on(&bbs_startup.mqtt, on, sock, client, update); + mqtt_client_count(&bbs_startup.mqtt, TOPIC_HOST, client_list.count); + mqtt_served_count(&bbs_startup.mqtt, TOPIC_HOST, served); lputs(LOG_INFO,NULL); /* update displayed stats */ } @@ -1117,7 +1062,7 @@ void recycle(void* cbdata) mail_startup_t* mail=NULL; services_startup_t* services=NULL; - switch((enum server_type)cbdata) { + switch(((struct startup*)cbdata)->type) { case SERVER_TERM: bbs = &bbs_startup; break; @@ -1340,7 +1285,8 @@ int main(int argc, char** argv) /* Initialize BBS startup structure */ memset(&bbs_startup,0,sizeof(bbs_startup)); bbs_startup.size=sizeof(bbs_startup); - bbs_startup.cbdata = (void*)SERVER_TERM; + bbs_startup.type = SERVER_TERM; + bbs_startup.cbdata = &bbs_startup; bbs_startup.log_level = LOG_DEBUG; bbs_startup.lputs=bbs_lputs; bbs_startup.event_lputs=event_lputs; @@ -1360,7 +1306,8 @@ int main(int argc, char** argv) /* Initialize FTP startup structure */ memset(&ftp_startup,0,sizeof(ftp_startup)); ftp_startup.size=sizeof(ftp_startup); - ftp_startup.cbdata = (void*)SERVER_FTP; + ftp_startup.type = SERVER_FTP; + ftp_startup.cbdata = &ftp_startup; ftp_startup.log_level = LOG_DEBUG; ftp_startup.lputs=ftp_lputs; ftp_startup.errormsg=errormsg; @@ -1380,7 +1327,8 @@ int main(int argc, char** argv) /* Initialize Web Server startup structure */ memset(&web_startup,0,sizeof(web_startup)); web_startup.size=sizeof(web_startup); - web_startup.cbdata = (void*)SERVER_WEB; + web_startup.type = SERVER_WEB; + web_startup.cbdata = &web_startup; web_startup.log_level = LOG_DEBUG; web_startup.lputs=web_lputs; web_startup.errormsg=errormsg; @@ -1399,7 +1347,8 @@ int main(int argc, char** argv) /* Initialize Mail Server startup structure */ memset(&mail_startup,0,sizeof(mail_startup)); mail_startup.size=sizeof(mail_startup); - mail_startup.cbdata = (void*)SERVER_MAIL; + mail_startup.type = SERVER_MAIL; + mail_startup.cbdata = &mail_startup; mail_startup.log_level = LOG_DEBUG; mail_startup.lputs=mail_lputs; mail_startup.errormsg=errormsg; @@ -1418,7 +1367,8 @@ int main(int argc, char** argv) /* Initialize Services startup structure */ memset(&services_startup,0,sizeof(services_startup)); services_startup.size=sizeof(services_startup); - services_startup.cbdata = (void*)SERVER_SERVICES; + services_startup.type = SERVER_SERVICES; + services_startup.cbdata = &services_startup; services_startup.log_level = LOG_DEBUG; services_startup.lputs=services_lputs; services_startup.errormsg=errormsg; @@ -1838,60 +1788,23 @@ int main(int argc, char** argv) #endif // __unix__ - if(scfg.mqtt.enabled) { - int result = mqtt_init(&bbs_startup.mqtt, &scfg, host_name, "term"); - if(result != MQTT_SUCCESS) { - lprintf(LOG_INFO, "MQTT init failure: %d", result); - } else { - lprintf(LOG_INFO, "MQTT lib: %s", mqtt_libver(str, sizeof(str))); - result = mqtt_open(&bbs_startup.mqtt); - if(result != MQTT_SUCCESS) { - lprintf(LOG_ERR, "MQTT open failure: %d", result); - } else { - result = mqtt_thread_start(&bbs_startup.mqtt); - if(result != MQTT_SUCCESS) { - lprintf(LOG_ERR, "Error %d starting pub/sub thread", result); - mqtt_close(&bbs_startup.mqtt); - } else { - lprintf(LOG_INFO, "MQTT connecting to broker %s:%u", scfg.mqtt.broker_addr, scfg.mqtt.broker_port); - result = mqtt_connect(&bbs_startup.mqtt, /* bind_address: */NULL); - if(result == MQTT_SUCCESS) { - lprintf(LOG_INFO, "MQTT broker-connect (%s:%d) successful", scfg.mqtt.broker_addr, scfg.mqtt.broker_port); - } else { - lprintf(LOG_ERR, "MQTT broker-connect (%s:%d) failure: %d", scfg.mqtt.broker_addr, scfg.mqtt.broker_port, result); - mqtt_close(&bbs_startup.mqtt); - } - } - } - } - } + mqtt_startup(&bbs_startup.mqtt, &scfg, SERVER_TERM, sbbscon_ver(), log_puts, /* shared_client_list: */TRUE); mqtt[SERVER_TERM] = &bbs_startup.mqtt; mail_startup.mqtt = bbs_startup.mqtt; - mail_startup.mqtt.server = "mail"; + mail_startup.mqtt.server_type = SERVER_MAIL; mqtt[SERVER_MAIL] = &mail_startup.mqtt; ftp_startup.mqtt = bbs_startup.mqtt; - ftp_startup.mqtt.server = "ftp"; + ftp_startup.mqtt.server_type = SERVER_FTP; mqtt[SERVER_FTP] = &ftp_startup.mqtt; web_startup.mqtt = bbs_startup.mqtt; - web_startup.mqtt.server = "web"; + web_startup.mqtt.server_type = SERVER_WEB; mqtt[SERVER_WEB] = &web_startup.mqtt; services_startup.mqtt = bbs_startup.mqtt; - services_startup.mqtt.server = "srvc"; + services_startup.mqtt.server_type = SERVER_SERVICES; mqtt[SERVER_SERVICES] = &services_startup.mqtt; - mqtt_pub_strval(&bbs_startup.mqtt, TOPIC_HOST, "version", sbbscon_ver()); - mqtt_pub_strval(&bbs_startup.mqtt, TOPIC_HOST, "status", "initializing"); #ifdef USE_MOSQUITTO if(bbs_startup.mqtt.handle != NULL) { -#ifdef MOSQUITTO_LOG - mosquitto_log_callback_set(bbs_startup.mqtt.handle, mqtt_log_msg); -#endif - mqtt_pub_noval(&bbs_startup.mqtt, TOPIC_HOST, "error_count"); - mqtt_pub_noval(&bbs_startup.mqtt, TOPIC_HOST, "thread_count"); - mqtt_pub_noval(&bbs_startup.mqtt, TOPIC_HOST, "socket_count"); - mqtt_pub_noval(&bbs_startup.mqtt, TOPIC_HOST, "client_count"); - mqtt_pub_noval(&bbs_startup.mqtt, TOPIC_HOST, "served"); - mosquitto_disconnect_callback_set(bbs_startup.mqtt.handle, mqtt_disconnected); mosquitto_message_callback_set(bbs_startup.mqtt.handle, mqtt_message_received); for(int i = bbs_startup.first_node; i <= bbs_startup.last_node; i++) { mqtt_subscribe(&bbs_startup.mqtt, TOPIC_BBS, str, sizeof(str), "node%d/input", i); @@ -2017,7 +1930,8 @@ int main(int argc, char** argv) if(run_web) _beginthread((void(*)(void*))web_server,0,&web_startup); - mqtt_pub_strval(&bbs_startup.mqtt, TOPIC_HOST, "status", "online"); + mqtt_online(&bbs_startup.mqtt); + #ifdef __unix__ uid_t uid = getuid(); if(uid != 0 && !capabilities_set) { /* are we running as a normal user? */ @@ -2328,16 +2242,13 @@ int main(int argc, char** argv) } } - mqtt_pub_strval(&bbs_startup.mqtt, TOPIC_HOST, "status", "terminating"); + mqtt_terminating(&bbs_startup.mqtt); terminate(); /* erase the prompt */ printf("\r%*s\r",prompt_len,""); - mqtt_pub_strval(&bbs_startup.mqtt, TOPIC_HOST, "status", "offline"); - mqtt_disconnect(&bbs_startup.mqtt); - mqtt_thread_stop(&bbs_startup.mqtt); - mqtt_close(&bbs_startup.mqtt); + mqtt_shutdown(&bbs_startup.mqtt); return(0); } diff --git a/src/sbbs3/server.h b/src/sbbs3/server.h new file mode 100644 index 0000000000..79617ee38e --- /dev/null +++ b/src/sbbs3/server.h @@ -0,0 +1,43 @@ +/* Synchronet Server values */ + +/**************************************************************************** + * @format.tab-size 4 (Plain Text/Source Code File Header) * + * @format.use-tabs true (see http://www.synchro.net/ptsc_hdr.html) * + * * + * Copyright Rob Swindell - http://www.synchro.net/copyright.html * + * * + * This program is free software; you can redistribute it and/or * + * modify it under the terms of the GNU General Public License * + * as published by the Free Software Foundation; either version 2 * + * of the License, or (at your option) any later version. * + * See the GNU General Public License for more details: gpl.txt or * + * http://www.fsf.org/copyleft/gpl.html * + * * + * For Synchronet coding style and modification guidelines, see * + * http://www.synchro.net/source.html * + * * + * Note: If this box doesn't appear square, then you need to fix your tabs. * + ****************************************************************************/ + +#ifndef SERVER_H_ +#define SERVER_H_ + +enum server_type { + SERVER_TERM, + SERVER_MAIL, + SERVER_FTP, + SERVER_WEB, + SERVER_SERVICES, + SERVER_COUNT +}; + +// Approximate systemd service states (see sd_notify) +enum server_state { + SERVER_STOPPED, + SERVER_INIT, + SERVER_READY, + SERVER_RELOADING, + SERVER_STOPPING +}; + +#endif /* Don't add anything after this line */ diff --git a/src/sbbs3/services.c b/src/sbbs3/services.c index 817077a148..abe81aa1b7 100644 --- a/src/sbbs3/services.c +++ b/src/sbbs3/services.c @@ -125,8 +125,8 @@ static int lprintf(int level, const char *fmt, ...) if(level <= LOG_ERR) { char errmsg[sizeof(sbuf)+16]; + errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name, sbuf); SAFEPRINTF2(errmsg, "%s %s", server_abbrev, sbuf); - errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name, errmsg); if(startup!=NULL && startup->errormsg!=NULL) startup->errormsg(startup->cbdata,level,errmsg); } @@ -179,8 +179,11 @@ static char* server_host_name(void) static void set_state(enum server_state state) { - if(startup != NULL && startup->set_state != NULL) - startup->set_state(startup->cbdata, state); + if(startup != NULL) { + if(startup->set_state != NULL) + startup->set_state(startup->cbdata, state); + mqtt_server_state(&startup->mqtt, state); + } } static ulong active_clients(void) @@ -1857,7 +1860,7 @@ void services_thread(void* arg) } set_state(SERVER_INIT); - mqtt_pub_strval(&startup->mqtt, TOPIC_SERVER, "version", services_ver()); + mqtt_server_version(&startup->mqtt, services_ver()); #ifdef _THREAD_SUID_BROKEN if(thread_suid_broken) diff --git a/src/sbbs3/services.h b/src/sbbs3/services.h index f2b80f39aa..7ccaf3adb3 100644 --- a/src/sbbs3/services.h +++ b/src/sbbs3/services.h @@ -24,52 +24,15 @@ typedef struct { - DWORD size; /* sizeof(bbs_struct_t) */ + STARTUP_COMMON_ELEMENTS struct in_addr outgoing4; struct in6_addr outgoing6; str_list_t interfaces; - DWORD options; /* See BBS_OPT definitions */ - WORD sem_chk_freq; /* semaphore file checking frequency (in seconds) */ - - void* cbdata; /* Private data passed to callbacks */ - - /* Callbacks (NULL if unused) */ - int (*lputs)(void*, int level, const char*); /* Log - put string */ - void (*errormsg)(void*, int level, const char* msg); - void (*set_state)(void*, enum server_state); - void (*recycle)(void*); - void (*terminated)(void*, int code); - void (*clients)(void*, int active); - void (*thread_up)(void*, BOOL up, BOOL setuid); - void (*socket_open)(void*, BOOL open); - void (*client_on)(void*, BOOL on, int sock, client_t*, BOOL update); - BOOL (*seteuid)(BOOL user); - BOOL (*setuid)(BOOL force); - - /* Paths */ - char ctrl_dir[INI_MAX_VALUE_LEN]; - char temp_dir[INI_MAX_VALUE_LEN]; - char ini_fname[INI_MAX_VALUE_LEN]; // sbbs.ini path/filename char services_ini[128]; // services.ini filename - /* Misc */ - char host_name[128]; - BOOL recycle_now; - BOOL shutdown_now; - int log_level; - uint bind_retry_count; /* Number of times to retry bind() calls */ - uint bind_retry_delay; /* Time to wait between each bind() retry */ - - struct startup_sound_settings sound; - /* JavaScript operating parameters */ js_startup_t js; - /* Login Attempt parameters */ - struct login_attempt_settings login_attempt; - link_list_t* login_attempt_list; - struct mqtt mqtt; - } services_startup_t; #if 0 diff --git a/src/sbbs3/startup.h b/src/sbbs3/startup.h index dcf8d59ac4..bfd775fe03 100644 --- a/src/sbbs3/startup.h +++ b/src/sbbs3/startup.h @@ -24,6 +24,7 @@ #include <stddef.h> /* offsetof */ #include "client.h" +#include "server.h" #include "ringbuf.h" #include "semwrap.h" /* sem_t */ #include "ini_file.h" /* INI_MAX_VALUE_LEN */ @@ -80,27 +81,46 @@ typedef struct { } global_startup_t; -enum server_type { - SERVER_TERM, - SERVER_MAIL, - SERVER_FTP, - SERVER_WEB, - SERVER_SERVICES, - SERVER_COUNT -}; +// The elements common to all Synchronet server startup structures +// C++ doesn't support anonymous structs, or I would have just used one here :-( +#define STARTUP_COMMON_ELEMENTS \ + int size; \ + int type; \ + uint32_t options; \ + int log_level; \ + void* cbdata; \ + int (*lputs)(void*, int level, const char*); \ + void (*errormsg)(void*, int level, const char* msg); \ + void (*set_state)(void*, enum server_state); \ + void (*recycle)(void*); \ + void (*terminated)(void*, int code); \ + void (*clients)(void*, int active); \ + void (*thread_up)(void*, BOOL up, BOOL setuid); \ + void (*socket_open)(void*, BOOL open); \ + void (*client_on)(void*, BOOL on, int sock, client_t*, BOOL update); \ + BOOL (*seteuid)(BOOL user); \ + BOOL (*setuid)(BOOL force); \ + char ctrl_dir[INI_MAX_VALUE_LEN]; \ + char temp_dir[INI_MAX_VALUE_LEN]; \ + char ini_fname[INI_MAX_VALUE_LEN]; \ + char host_name[128]; \ + BOOL recycle_now; \ + BOOL shutdown_now; \ + int sem_chk_freq; \ + uint bind_retry_count; \ + uint bind_retry_delay; \ + struct startup_sound_settings sound; \ + struct login_attempt_settings login_attempt; \ + link_list_t* login_attempt_list; \ + struct mqtt mqtt; -// Aproximate systemd service states (see sd_notify) -enum server_state { - SERVER_STOPPED, - SERVER_INIT, - SERVER_READY, - SERVER_RELOADING, - SERVER_STOPPING +struct startup { + STARTUP_COMMON_ELEMENTS }; typedef struct { - DWORD size; /* sizeof(bbs_struct_t) */ + STARTUP_COMMON_ELEMENTS WORD first_node; WORD last_node; WORD telnet_port; @@ -111,61 +131,31 @@ typedef struct { WORD ssh_connect_timeout; WORD outbuf_highwater_mark; /* output block size control */ WORD outbuf_drain_timeout; - WORD sem_chk_freq; /* semaphore file checking frequency (in seconds) */ struct in_addr outgoing4; struct in6_addr outgoing6; str_list_t telnet_interfaces; - uint32_t options; /* See BBS_OPT definitions */ str_list_t rlogin_interfaces; str_list_t ssh_interfaces; RingBuf** node_spybuf; /* Spy output buffer (each node) */ RingBuf** node_inbuf; /* User input buffer (each node) */ - void* cbdata; /* Private data passed to callbacks */ void* event_cbdata; /* Private data passed to event_lputs callback */ - - /* Callbacks (NULL if unused) */ - int (*lputs)(void*, int , const char*); /* Log - put string */ - int (*event_lputs)(void*, int, const char*); /* Event log - put string */ - void (*errormsg)(void*, int level, const char* msg); - void (*set_state)(void*, enum server_state); - void (*recycle)(void*); - void (*terminated)(void*, int code); - void (*clients)(void*, int active); - void (*thread_up)(void*, BOOL up, BOOL setuid); - void (*socket_open)(void*, BOOL open); - void (*client_on)(void*, BOOL on, int sock, client_t*, BOOL update); - BOOL (*seteuid)(BOOL user); /* Set Unix uid for thread (bind) */ - BOOL (*setuid)(BOOL force); + int (*event_lputs)(void*, int, const char*); /* Paths */ - char ctrl_dir[INI_MAX_VALUE_LEN]; char dosemu_path[INI_MAX_VALUE_LEN]; char dosemuconf_path[INI_MAX_VALUE_LEN]; - char temp_dir[INI_MAX_VALUE_LEN]; - char ini_fname[INI_MAX_VALUE_LEN]; char web_file_vpath_prefix[INI_MAX_VALUE_LEN]; /* Miscellaneous */ + uint max_concurrent_connections; BOOL usedosemu; char xtrn_term_ansi[32]; /* external ANSI terminal type (e.g. "ansi-bbs") */ char xtrn_term_dumb[32]; /* external dumb terminal type (e.g. "dumb") */ - char host_name[128]; - BOOL recycle_now; - BOOL shutdown_now; - int log_level; - uint bind_retry_count; /* Number of times to retry bind() calls */ - uint bind_retry_delay; /* Time to wait between each bind() retry */ /* JavaScript operating parameters */ js_startup_t js; - struct startup_sound_settings sound; - struct login_attempt_settings login_attempt; - link_list_t* login_attempt_list; - uint max_concurrent_connections; - struct mqtt mqtt; - } bbs_startup_t; #define DEFAULT_SEM_CHK_FREQ 2 diff --git a/src/sbbs3/websrvr.c b/src/sbbs3/websrvr.c index 5b943ba099..3ddea1a73d 100644 --- a/src/sbbs3/websrvr.c +++ b/src/sbbs3/websrvr.c @@ -550,8 +550,8 @@ static int lprintf(int level, const char *fmt, ...) if(level <= LOG_ERR) { char errmsg[sizeof(sbuf)+16]; + errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name, sbuf); SAFEPRINTF2(errmsg, "%s %s", server_abbrev, sbuf); - errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name, errmsg); if(startup!=NULL && startup->errormsg!=NULL) startup->errormsg(startup->cbdata,level,errmsg); } @@ -738,8 +738,11 @@ static char* server_host_name(void) static void set_state(enum server_state state) { - if(startup != NULL && startup->set_state != NULL) - startup->set_state(startup->cbdata, state); + if(startup != NULL) { + if(startup->set_state != NULL) + startup->set_state(startup->cbdata, state); + mqtt_server_state(&startup->mqtt, state); + } } static void update_clients(void) @@ -748,7 +751,7 @@ static void update_clients(void) uint32_t count = protected_uint32_value(active_clients); if(startup->clients!=NULL) startup->clients(startup->cbdata, count); - mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "client_count", count); + mqtt_client_count(&startup->mqtt, TOPIC_SERVER, count); } } @@ -7003,7 +7006,7 @@ void web_server(void* arg) } set_state(SERVER_INIT); - mqtt_pub_strval(&startup->mqtt, TOPIC_SERVER, "version", web_ver()); + mqtt_server_version(&startup->mqtt, web_ver()); #ifdef _THREAD_SUID_BROKEN if(thread_suid_broken) @@ -7197,7 +7200,7 @@ void web_server(void* arg) set_state(SERVER_READY); lprintf(LOG_INFO,"Web Server thread started"); - mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "max_clients", startup->max_clients); + mqtt_client_max(&startup->mqtt, startup->max_clients); while(!terminated && !terminate_server) { YIELD(); diff --git a/src/sbbs3/websrvr.h b/src/sbbs3/websrvr.h index b8cad6d8cc..bdc0a6fde1 100644 --- a/src/sbbs3/websrvr.h +++ b/src/sbbs3/websrvr.h @@ -27,72 +27,40 @@ #include "semwrap.h" /* sem_t */ typedef struct { - size_t size; /* sizeof(web_startup_t) */ + + STARTUP_COMMON_ELEMENTS uint16_t max_clients; #define WEB_DEFAULT_MAX_CLIENTS 0 /* 0=unlimited */ uint16_t max_inactivity; #define WEB_DEFAULT_MAX_INACTIVITY 120 /* seconds */ uint16_t max_cgi_inactivity; #define WEB_DEFAULT_MAX_CGI_INACTIVITY 120 /* seconds */ - uint16_t sem_chk_freq; /* semaphore file checking frequency (in seconds) */ - uint32_t options; uint16_t port; uint16_t tls_port; str_list_t interfaces; str_list_t tls_interfaces; - void* cbdata; /* Private data passed to callbacks */ - - /* Callbacks (NULL if unused) */ - int (*lputs)(void*, int level, const char* msg); - void (*errormsg)(void*, int level, const char* msg); - void (*set_state)(void*, enum server_state); - void (*recycle)(void*); - void (*terminated)(void*, int code); - void (*clients)(void*, int active); - void (*thread_up)(void*, BOOL up, BOOL setuid); - void (*socket_open)(void*, BOOL open); - void (*client_on)(void*, BOOL on, int sock, client_t*, BOOL update); - BOOL (*seteuid)(BOOL user); - BOOL (*setuid)(BOOL); - /* Paths */ char ssjs_ext[16]; /* Server-Side JavaScript file extension */ char** cgi_ext; /* CGI Extensions */ char cgi_dir[INI_MAX_VALUE_LEN]; /* relative to root_dir (all files executable) */ - char ctrl_dir[INI_MAX_VALUE_LEN]; char root_dir[INI_MAX_VALUE_LEN]; /* HTML root directory */ char error_dir[INI_MAX_VALUE_LEN]; /* relative to root_dir */ - char temp_dir[INI_MAX_VALUE_LEN]; char** index_file_name; /* Index filenames */ char logfile_base[INI_MAX_VALUE_LEN]; /* Logfile base name (date is appended) */ - char ini_fname[INI_MAX_VALUE_LEN]; char file_index_script[INI_MAX_VALUE_LEN]; char file_vpath_prefix[INI_MAX_VALUE_LEN]; BOOL file_vpath_for_vhosts; /* Misc */ - char host_name[128]; - BOOL recycle_now; - BOOL shutdown_now; - int log_level; int tls_error_level; /* Cap the severity of TLS error log messages */ - uint bind_retry_count; /* Number of times to retry bind() calls */ - uint bind_retry_delay; /* Time to wait between each bind() retry */ char default_cgi_content[128]; char default_auth_list[128]; uint16_t outbuf_drain_timeout; - struct startup_sound_settings sound; - /* JavaScript operating parameters */ js_startup_t js; - /* Login Attempt parameters */ - struct login_attempt_settings login_attempt; - link_list_t* login_attempt_list; - struct mqtt mqtt; - } web_startup_t; #if defined(STARTUP_INIT_FIELD_TABLES) -- GitLab