Commits (1)
  • Rob Swindell's avatar
    Over-overhaul of MQTT support · a8a06fd6
    Rob Swindell authored
    Each Synchronet server is now its own MQTT client. This means there's no
    longer any MQTT logic in the Synchronet "hosts" (e.g. sbbscon.c, ctrl/*.cpp)
    and none needed for SBBS NT services (they'll "just work" with MQTT).
    
    This also means that just about everything (except for nodes, spam and hack)
    is now published per-server (in the sbbs/BBS-ID/hostname/server/ topic branch)
    and if you want aggregated totals or client lists, you'll have to do that in
    your own MQTT client or dashboard.
    
    I also removed the publishing of thread_count and socket_count topics as
    they weren't universally supported across all servers and are of questionable
    value. They can be added back later if determined to be useful.
    a8a06fd6
......@@ -582,7 +582,7 @@ bool sbbs_t::update_nodeterm(void)
,mouse_mode
,console
);
mqtt_pub_strval((struct startup*)startup, TOPIC_BBS, topic, str);
mqtt_pub_strval(mqtt, TOPIC_BBS, topic, str);
}
return result;
}
......
......@@ -171,7 +171,6 @@ static void thread_up(void* p, BOOL up, BOOL setuid)
threads++;
else if(threads>0)
threads--;
mqtt_thread_count((struct startup*)&MainForm->bbs_startup, TOPIC_HOST, threads);
ReleaseMutex(mutex);
}
......@@ -189,7 +188,6 @@ void socket_open(void* p, BOOL open)
sockets++;
else if(sockets>0)
sockets--;
mqtt_socket_count((struct startup*)&MainForm->bbs_startup, TOPIC_HOST, sockets);
ReleaseMutex(mutex);
}
......@@ -203,10 +201,8 @@ static void client_add(void* p, BOOL add)
if(add) {
clients++;
total_clients++;
mqtt_served_count((struct startup*)&MainForm->bbs_startup, TOPIC_HOST, total_clients);
} else if(clients>0)
clients--;
mqtt_client_count((struct startup*)&MainForm->bbs_startup, TOPIC_HOST, clients);
}
static void client_on(void* p, BOOL on, int sock, client_t* client, BOOL update)
......@@ -217,8 +213,6 @@ static void client_on(void* p, BOOL on, int sock, client_t* client, BOOL update)
static HANDLE mutex;
TListItem* Item;
mqtt_client_on((struct startup*)&MainForm->bbs_startup, on, sock, client, update);
if(!mutex)
mutex=CreateMutex(NULL,false,NULL);
WaitForSingleObject(mutex,INFINITE);
......@@ -338,7 +332,6 @@ static void bbs_set_state(void* p, enum server_state state)
{
TelnetForm->Status->Caption = server_state_str(state);
mqtt_server_state((struct startup*)&MainForm->bbs_startup, state);
switch(state) {
case SERVER_STOPPED:
MainForm->TelnetStart->Enabled=true;
......@@ -436,7 +429,6 @@ static void services_set_state(void* p, enum server_state state)
{
ServicesForm->Status->Caption = server_state_str(state);
mqtt_server_state((struct startup*)&MainForm->services_startup, state);
switch(state) {
case SERVER_STOPPED:
MainForm->ServicesStart->Enabled=true;
......@@ -503,7 +495,6 @@ static void mail_set_state(void* p, enum server_state state)
{
MailForm->Status->Caption = server_state_str(state);
mqtt_server_state((struct startup*)&MainForm->mail_startup, state);
switch(state) {
case SERVER_STOPPED:
MainForm->MailStart->Enabled=true;
......@@ -599,7 +590,6 @@ static void ftp_set_state(void* p, enum server_state state)
{
FtpForm->Status->Caption = server_state_str(state);
mqtt_server_state((struct startup*)&MainForm->ftp_startup, state);
switch(state) {
case SERVER_STOPPED:
MainForm->FtpStart->Enabled=true;
......@@ -668,7 +658,6 @@ static void web_set_state(void* p, enum server_state state)
{
WebForm->Status->Caption = server_state_str(state);
mqtt_server_state((struct startup*)&MainForm->web_startup, state);
switch(state) {
case SERVER_STOPPED:
MainForm->WebStart->Enabled=true;
......@@ -1054,7 +1043,6 @@ BOOL __fastcall TMainForm::servicesServiceEnabled(void)
//---------------------------------------------------------------------------
void __fastcall TMainForm::FormClose(TObject *Sender, TCloseAction &Action)
{
mqtt_terminating((struct startup*)&bbs_startup);
UpTimer->Enabled=false; /* Stop updating the status bar */
StatsTimer->Enabled=false;
......@@ -1083,14 +1071,12 @@ void __fastcall TMainForm::FormClose(TObject *Sender, TCloseAction &Action)
}
StatusBar->Panels->Items[STATUSBAR_LAST_PANEL]->Text="Closing...";
Application->ProcessMessages();
mqtt_terminating((struct startup*)&bbs_startup);
LogTimer->Enabled=false;
ServiceStatusTimer->Enabled=false;
NodeForm->Timer->Enabled=false;
ClientForm->Timer->Enabled=false;
mqtt_shutdown((struct startup*)&bbs_startup);
}
//---------------------------------------------------------------------------
void __fastcall TMainForm::FormCloseQuery(TObject *Sender, bool &CanClose)
......@@ -1970,22 +1956,7 @@ void __fastcall TMainForm::StartupTimerTick(TObject *Sender)
StartupTimer->Interval = 2500; // Let 'em see the logo for a bit
StartupTimer->Enabled = true;
} else {
if(!bbsServiceEnabled()) {
bbs_startup.startup[SERVER_TERM] = (struct startup*)&bbs_startup;
bbs_startup.startup[SERVER_FTP] = (struct startup*)&ftp_startup;
bbs_startup.startup[SERVER_WEB] = (struct startup*)&web_startup;
bbs_startup.startup[SERVER_MAIL] = (struct startup*)&mail_startup;
bbs_startup.startup[SERVER_SERVICES] = (struct startup*)&services_startup;
mqtt_startup((struct startup*)&bbs_startup, &cfg, ver()
,/* lputs: */NULL
,/* shared_client_list: */TRUE);
ftp_startup.mqtt = bbs_startup.mqtt;
web_startup.mqtt = bbs_startup.mqtt;
mail_startup.mqtt = bbs_startup.mqtt;
services_startup.mqtt = bbs_startup.mqtt;
}
DisplayMainPanels(Sender);
mqtt_online((struct startup*)&bbs_startup);
}
Initialized=true;
}
......
......@@ -78,6 +78,7 @@ const char* server_abbrev = "ftp";
static ftp_startup_t* startup=NULL;
static scfg_t scfg;
static struct mqtt mqtt;
static struct xpms_set *ftp_set = NULL;
static protected_uint32_t active_clients;
static protected_uint32_t thread_count;
......@@ -127,6 +128,28 @@ BOOL dir_op(scfg_t* cfg, user_t* user, client_t* client, uint dirnum)
return is_user_dirop(cfg, dirnum, user, client);
}
static int lputs(int level, const char* str)
{
mqtt_lputs(&mqtt, TOPIC_SERVER, level, str);
if(level <= LOG_ERR) {
char errmsg[1024];
errorlog(&scfg, &mqtt, level, startup == NULL ? NULL : startup->host_name, str);
SAFEPRINTF2(errmsg, "%s %s", server_abbrev, str);
if(startup != NULL && startup->errormsg != NULL)
startup->errormsg(startup->cbdata, level, errmsg);
}
if(startup == NULL || startup->lputs == NULL || str == NULL || level > startup->log_level)
return 0;
#if defined(_WIN32)
if(IsBadCodePtr((FARPROC)startup->lputs))
return 0;
#endif
return startup->lputs(startup->cbdata,level,str);
}
#if defined(__GNUC__) // Catch printf-format errors with lprintf
static int lprintf(int level, const char *fmt, ...) __attribute__ ((format (printf, 2, 3)));
#endif
......@@ -140,25 +163,7 @@ static int lprintf(int level, const char *fmt, ...)
sbuf[sizeof(sbuf)-1]=0;
va_end(argptr);
if(level <= LOG_ERR) {
char errmsg[sizeof(sbuf)+16];
errorlog(&scfg, (struct startup*)startup, level, startup==NULL ? NULL:startup->host_name, sbuf);
SAFEPRINTF2(errmsg, "%s %s", server_abbrev, sbuf);
if(startup!=NULL && startup->errormsg!=NULL)
startup->errormsg(startup->cbdata,level,errmsg);
}
if(startup != NULL)
mqtt_lputs((struct startup*)startup, TOPIC_SERVER, level, sbuf);
if(startup==NULL || startup->lputs==NULL || level > startup->log_level)
return(0);
#if defined(_WIN32)
if(IsBadCodePtr((FARPROC)startup->lputs))
return(0);
#endif
return startup->lputs(startup->cbdata,level,sbuf);
return lputs(level, sbuf);
}
#ifdef _WINSOCKAPI_
......@@ -199,7 +204,7 @@ static void set_state(enum server_state state)
if(startup != NULL) {
if(startup->set_state != NULL)
startup->set_state(startup->cbdata, state);
mqtt_server_state((struct startup*)startup, state);
mqtt_server_state(&mqtt, state);
}
}
......@@ -209,7 +214,6 @@ static void update_clients(void)
uint32_t count = protected_uint32_value(active_clients);
if(startup->clients != NULL)
startup->clients(startup->cbdata, count);
mqtt_client_count((struct startup*)startup, TOPIC_SERVER, count);
}
}
......@@ -219,6 +223,7 @@ static void client_on(SOCKET sock, client_t* client, BOOL update)
listAddNodeData(&current_connections, client->addr, strlen(client->addr) + 1, sock, LAST_NODE);
if(startup!=NULL && startup->client_on!=NULL)
startup->client_on(startup->cbdata,TRUE,sock,client,update);
mqtt_client_on(&mqtt, TRUE, sock, client, update);
}
static void client_off(SOCKET sock)
......@@ -226,6 +231,7 @@ static void client_off(SOCKET sock)
listRemoveTaggedNode(&current_connections, sock, /* free_data */TRUE);
if(startup!=NULL && startup->client_on!=NULL)
startup->client_on(startup->cbdata,FALSE,sock,NULL,FALSE);
mqtt_client_on(&mqtt, FALSE, sock, NULL, FALSE);
}
static void thread_up(BOOL setuid)
......@@ -233,7 +239,6 @@ static void thread_up(BOOL setuid)
if(startup != NULL) {
if(startup->thread_up != NULL)
startup->thread_up(startup->cbdata,TRUE, setuid);
mqtt_thread_count((struct startup*)startup, TOPIC_SERVER, protected_uint32_value(thread_count));
}
}
......@@ -243,7 +248,6 @@ static int32_t thread_down(void)
if(startup != NULL) {
if(startup->thread_up != NULL)
startup->thread_up(startup->cbdata,FALSE, FALSE);
mqtt_thread_count((struct startup*)startup, TOPIC_SERVER, count);
}
return count;
}
......@@ -1726,7 +1730,7 @@ static BOOL ftp_hacklog(char* prot, char* user, char* text, char* host, union xp
PlaySound(startup->sound.hack, NULL, SND_ASYNC|SND_FILENAME);
#endif
return hacklog(&scfg, (struct startup*)startup, prot, user, text, host, addr);
return hacklog(&scfg, &mqtt, prot, user, text, host, addr);
}
/****************************************************************************/
......@@ -4888,6 +4892,8 @@ static void cleanup(int code, int line)
thread_down();
if(terminate_server || code)
lprintf(LOG_INFO,"#### FTP Server thread terminated (%lu clients served)", served);
set_state(SERVER_STOPPED);
mqtt_shutdown(&mqtt);
if(startup!=NULL && startup->terminated!=NULL)
startup->terminated(startup->cbdata,code);
}
......@@ -4955,8 +4961,6 @@ void ftp_server(void* arg)
}
set_state(SERVER_INIT);
mqtt_server_version((struct startup*)startup, ftp_ver());
uptime=0;
served=0;
startup->recycle_now=FALSE;
......@@ -5019,6 +5023,8 @@ void ftp_server(void* arg)
break;
}
mqtt_startup(&mqtt, &scfg, (struct startup*)startup, ftp_ver(), lputs);
if((t=checktime())!=0) { /* Check binary time */
lprintf(LOG_ERR,"!TIME PROBLEM (%ld)",t);
}
......@@ -5064,8 +5070,6 @@ void ftp_server(void* arg)
update_clients();
strlwr(scfg.sys_id); /* Use lower-case unix-looking System ID for group name */
/* open a socket and wait for a client */
ftp_set = xpms_create(startup->bind_retry_count, startup->bind_retry_delay, lprintf);
......@@ -5096,7 +5100,7 @@ void ftp_server(void* arg)
set_state(SERVER_READY);
lprintf(LOG_INFO,"FTP Server thread started");
mqtt_client_max((struct startup*)startup, startup->max_clients);
mqtt_client_max(&mqtt, startup->max_clients);
while(ftp_set!=NULL && !terminate_server) {
YIELD();
......@@ -5212,6 +5216,4 @@ void ftp_server(void* arg)
} while(!terminate_server);
protected_uint32_destroy(thread_count);
set_state(SERVER_STOPPED);
}
......@@ -20,12 +20,13 @@
****************************************************************************/
#include "sbbs.h"
#include "mqtt.h"
#include "git_branch.h"
#include "git_hash.h"
const char* log_line_ending = "\r\n";
extern "C" BOOL hacklog(scfg_t* cfg, struct startup* startup, const char* prot, const char* user, const char* text, const char* host, union xp_sockaddr* addr)
extern "C" BOOL hacklog(scfg_t* cfg, struct mqtt* mqtt, const char* prot, const char* user, const char* text, const char* host, union xp_sockaddr* addr)
{
char tstr[64];
char fname[MAX_PATH+1];
......@@ -54,12 +55,12 @@ extern "C" BOOL hacklog(scfg_t* cfg, struct startup* startup, const char* prot,
fputs(log_line_ending, fp);
fcloselog(fp);
if(startup != NULL) {
if(mqtt != NULL) {
char str[1024];
if(text == NULL)
text= "";
snprintf(str, sizeof(str), "%s\t%s\t%u\t%s\t%s\t%s", prot, user, inet_addrport(addr), host, ip, text);
mqtt_pub_strval(startup, TOPIC_HOST, "hack", str);
mqtt_pub_strval(mqtt, TOPIC_HOST, "hack", str);
}
return true;
......@@ -67,10 +68,10 @@ extern "C" BOOL hacklog(scfg_t* cfg, struct startup* startup, const char* prot,
BOOL sbbs_t::hacklog(const char* prot, const char* text)
{
return ::hacklog(&cfg, (struct startup*)startup, prot, useron.alias, text, client_name, &client_addr);
return ::hacklog(&cfg, mqtt, prot, useron.alias, text, client_name, &client_addr);
}
extern "C" BOOL spamlog(scfg_t* cfg, struct startup* startup, char* prot, char* action
extern "C" BOOL spamlog(scfg_t* cfg, struct mqtt* mqtt, char* prot, char* action
,char* reason, char* host, char* ip_addr
,char* to, char* from)
{
......@@ -110,18 +111,18 @@ extern "C" BOOL spamlog(scfg_t* cfg, struct startup* startup, char* prot, char*
fputs(log_line_ending, fp);
fcloselog(fp);
if(startup != NULL) {
if(mqtt != NULL) {
char str[1024];
if(reason == NULL)
reason = (char*)"";
snprintf(str, sizeof(str), "%s\t%s\t%s\t%s\t%s\t%s\t%s", prot, action, host, ip_addr, from, to_user, reason);
mqtt_pub_strval(startup, TOPIC_HOST, "spam", str);
mqtt_pub_strval(mqtt, TOPIC_HOST, "spam", str);
}
return true;
}
extern "C" int errorlog(scfg_t* cfg, struct startup* startup, int level, const char* host, const char* text)
extern "C" int errorlog(scfg_t* cfg, struct mqtt* mqtt, int level, const char* host, const char* text)
{
FILE* fp;
char buf[128];
......@@ -150,7 +151,7 @@ extern "C" int errorlog(scfg_t* cfg, struct startup* startup, int level, const c
SAFEPRINTF2(subject, "%s %sERROR occurred", host, level <= LOG_CRIT ? "CRITICAL " : "");
notify(cfg, cfg->node_erruser, subject, text);
}
mqtt_errormsg(startup, level, text);
mqtt_errormsg(mqtt, level, text);
return 0;
}
......
......@@ -145,7 +145,7 @@ void sbbs_t::badlogin(const char* user, const char* passwd, const char* protocol
count=loginFailure(startup->login_attempt_list, addr, protocol, user, passwd);
if(user!=NULL && startup->login_attempt.hack_threshold && count>=startup->login_attempt.hack_threshold) {
getnameinfo(&addr->addr, addr_len, host_name, sizeof(host_name), NULL, 0, NI_NAMEREQD);
::hacklog(&cfg, (struct startup*)startup, reason, user, passwd, host_name, addr);
::hacklog(&cfg, mqtt, reason, user, passwd, host_name, addr);
#ifdef _WIN32
if(startup->sound.hack[0] && !sound_muted(&cfg))
PlaySound(startup->sound.hack, NULL, SND_ASYNC|SND_FILENAME);
......
......@@ -94,6 +94,7 @@ static link_list_t current_logins;
static link_list_t current_connections;
static bool savemsg_mutex_created = false;
static pthread_mutex_t savemsg_mutex;
static struct mqtt mqtt;
static const char* servprot_smtp = "SMTP";
static const char* servprot_submission = "SMTP";
......@@ -176,6 +177,33 @@ typedef struct {
} \
} while(0)
static int lputs(int level, const char* str)
{
mqtt_lputs(&mqtt, TOPIC_SERVER, level, str);
if(level <= LOG_ERR) {
char errmsg[1024];
errorlog(&scfg, &mqtt, level, startup == NULL ? NULL : startup->host_name, str);
stats.errors++;
SAFEPRINTF2(errmsg, "%s %s", server_abbrev, str);
if(startup != NULL && startup->errormsg != NULL)
startup->errormsg(startup->cbdata, level, errmsg);
}
if(level <= LOG_CRIT)
stats.crit_errors++;
if(startup == NULL || startup->lputs == NULL || str == NULL || level > startup->log_level)
return 0;
#if defined(_WIN32)
if(IsBadCodePtr((FARPROC)startup->lputs))
return 0;
#endif
return startup->lputs(startup->cbdata,level,str);
}
#if defined(__GNUC__) // Catch printf-format errors with lprintf
static int lprintf(int level, const char *fmt, ...) __attribute__ ((format (printf, 2, 3)));
#endif
......@@ -189,29 +217,7 @@ static int lprintf(int level, const char *fmt, ...)
sbuf[sizeof(sbuf)-1]=0;
va_end(argptr);
if(level <= LOG_ERR) {
char errmsg[sizeof(sbuf)+16];
errorlog(&scfg, (struct startup*)startup, level, startup==NULL ? NULL:startup->host_name, sbuf), stats.errors++;
SAFEPRINTF2(errmsg, "%s %s", server_abbrev, sbuf);
if(startup!=NULL && startup->errormsg!=NULL)
startup->errormsg(startup->cbdata,level,errmsg);
}
if(level <= LOG_CRIT)
stats.crit_errors++;
if(startup != NULL)
mqtt_lputs((struct startup*)startup, TOPIC_SERVER, level, sbuf);
if(startup==NULL || startup->lputs==NULL || level > startup->log_level)
return(0);
#if defined(_WIN32)
if(IsBadCodePtr((FARPROC)startup->lputs))
return(0);
#endif
return(startup->lputs(startup->cbdata, level, condense_whitespace(sbuf)));
return lputs(level, condense_whitespace(sbuf));
}
#ifdef _WINSOCKAPI_
......@@ -251,7 +257,7 @@ static void set_state(enum server_state state)
if(startup != NULL) {
if(startup->set_state != NULL)
startup->set_state(startup->cbdata, state);
mqtt_server_state((struct startup*)startup, state);
mqtt_server_state(&mqtt, state);
}
}
......@@ -260,7 +266,6 @@ static void update_clients(void)
if(startup != NULL) {
if(startup->clients != NULL)
startup->clients(startup->cbdata,protected_uint32_value(active_clients)+active_sendmail);
mqtt_client_count((struct startup*)startup, TOPIC_SERVER, protected_uint32_value(active_clients));
}
}
......@@ -270,6 +275,7 @@ static void client_on(SOCKET sock, client_t* client, BOOL update)
listAddNodeData(&current_connections, client->addr, strlen(client->addr)+1, sock, LAST_NODE);
if(startup!=NULL && startup->client_on!=NULL)
startup->client_on(startup->cbdata,TRUE,sock,client,update);
mqtt_client_on(&mqtt, TRUE, sock, client, update);
}
static void client_off(SOCKET sock)
......@@ -277,6 +283,7 @@ static void client_off(SOCKET sock)
listRemoveTaggedNode(&current_connections, sock, /* free_data */TRUE);
if(startup!=NULL && startup->client_on!=NULL)
startup->client_on(startup->cbdata,FALSE,sock,NULL,FALSE);
mqtt_client_on(&mqtt, FALSE, sock, NULL, FALSE);
}
static void thread_up(BOOL setuid)
......@@ -284,7 +291,6 @@ static void thread_up(BOOL setuid)
if(startup != NULL) {
if(startup->thread_up != NULL)
startup->thread_up(startup->cbdata,TRUE,setuid);
mqtt_thread_count((struct startup*)startup, TOPIC_SERVER, protected_uint32_value(thread_count));
}
}
......@@ -294,7 +300,6 @@ static int32_t thread_down(void)
if(startup != NULL) {
if(startup->thread_up != NULL)
startup->thread_up(startup->cbdata,FALSE,FALSE);
mqtt_thread_count((struct startup*)startup, TOPIC_SERVER, count);
}
return count;
}
......@@ -312,8 +317,6 @@ void mail_open_socket(SOCKET sock, void* cb_protocol)
lprintf(LOG_ERR,"%04d %s !ERROR %s", sock, protocol, error);
stats.sockets++;
if(startup != NULL)
mqtt_socket_count((struct startup*)startup, TOPIC_SERVER, stats.sockets);
}
void mail_close_socket_cb(SOCKET sock, void* cb_protocol)
......@@ -321,8 +324,6 @@ void mail_close_socket_cb(SOCKET sock, void* cb_protocol)
if(startup!=NULL && startup->socket_open!=NULL)
startup->socket_open(startup->cbdata,FALSE);
stats.sockets--;
if(startup != NULL)
mqtt_socket_count((struct startup*)startup, TOPIC_SERVER, stats.sockets);
}
int mail_close_socket(SOCKET *sock, int *sess)
......@@ -341,8 +342,6 @@ int mail_close_socket(SOCKET *sock, int *sess)
if(startup!=NULL && startup->socket_open!=NULL)
startup->socket_open(startup->cbdata,FALSE);
stats.sockets--;
if(startup != NULL)
mqtt_socket_count((struct startup*)startup, TOPIC_SERVER, stats.sockets);
if(result!=0) {
if(ERROR_VALUE!=ENOTSOCK)
lprintf(LOG_WARNING,"%04d !ERROR %d closing socket",*sock, ERROR_VALUE);
......@@ -981,7 +980,7 @@ static void badlogin(SOCKET sock, CRYPT_SESSION sess, const char* prot, const ch
SAFEPRINTF(reason,"%s LOGIN", prot);
count=loginFailure(startup->login_attempt_list, addr, prot, user, passwd);
if(startup->login_attempt.hack_threshold && count>=startup->login_attempt.hack_threshold) {
hacklog(&scfg, (struct startup*)startup, reason, user, passwd, host, addr);
hacklog(&scfg, &mqtt, reason, user, passwd, host, addr);
#ifdef _WIN32
if(startup->sound.hack[0] && !sound_muted(&scfg))
PlaySound(startup->sound.hack, NULL, SND_ASYNC|SND_FILENAME);
......@@ -1916,7 +1915,7 @@ static BOOL chk_email_addr(SOCKET socket, const char* prot, char* p, char* host_
lprintf(LOG_NOTICE,"%04d %s [%s] !BLOCKED %s e-mail address: %s"
,socket, prot, host_ip, source, addr);
SAFEPRINTF2(tmp,"Blocked %s e-mail address: %s", source, addr);
spamlog(&scfg, (struct startup*)startup, (char*)prot, "REFUSED", tmp, host_name, host_ip, to, from);
spamlog(&scfg, &mqtt, (char*)prot, "REFUSED", tmp, host_name, host_ip, to, from);
return(FALSE);
}
......@@ -3112,7 +3111,7 @@ static void smtp_thread(void* arg)
,socket, client.protocol, dnsbl_ip, dnsbl, host_name, inet_ntoa(dnsbl_result));
if(startup->options&MAIL_OPT_DNSBL_REFUSE) {
SAFEPRINTF2(str,"Listed on %s as %s", dnsbl, inet_ntoa(dnsbl_result));
spamlog(&scfg, (struct startup*)startup, (char*)client.protocol, "SESSION REFUSED", str, host_name, dnsbl_ip, NULL, NULL);
spamlog(&scfg, &mqtt, (char*)client.protocol, "SESSION REFUSED", str, host_name, dnsbl_ip, NULL, NULL);
sockprintf(socket,client.protocol,session
,"550 Mail from %s refused due to listing at %s"
,dnsbl_ip, dnsbl);
......@@ -3238,7 +3237,7 @@ static void smtp_thread(void* arg)
lprintf(LOG_NOTICE,"%04d %s %s !FILTERING TWIT-LISTED SENDER: '%s' <%s> (%lu total)"
,socket, client.protocol, client_id, sender, sender_addr, ++stats.msgs_refused);
SAFEPRINTF2(tmp,"Twit-listed sender: '%s' <%s>", sender, sender_addr);
spamlog(&scfg, (struct startup*)startup, (char*)client.protocol, "REFUSED", tmp, host_name, host_ip, rcpt_addr, reverse_path);
spamlog(&scfg, &mqtt, (char*)client.protocol, "REFUSED", tmp, host_name, host_ip, rcpt_addr, reverse_path);
sockprintf(socket,client.protocol,session, "554 Sender not allowed.");
continue;
}
......@@ -3512,7 +3511,7 @@ static void smtp_thread(void* arg)
,socket, client.protocol, client_id, p, reverse_path, ++stats.msgs_refused);
SAFEPRINTF2(tmp,"Blocked subject (%s) from: %s"
,p, reverse_path);
spamlog(&scfg, (struct startup*)startup, (char*)client.protocol, "REFUSED"
spamlog(&scfg, &mqtt, (char*)client.protocol, "REFUSED"
,tmp, host_name, host_ip, rcpt_addr, reverse_path);
errmsg="554 Subject not allowed.";
smb_error=SMB_FAILURE;
......@@ -3615,7 +3614,7 @@ static void smtp_thread(void* arg)
}
if(startup->dnsbl_hdr[0] || startup->dnsbl_tag[0]) {
SAFEPRINTF2(str,"Listed on %s as %s", dnsbl, inet_ntoa(dnsbl_result));
spamlog(&scfg, (struct startup*)startup, (char*)client.protocol, "TAGGED", str, host_name, dnsbl_ip, rcpt_addr, reverse_path);
spamlog(&scfg, &mqtt, (char*)client.protocol, "TAGGED", str, host_name, dnsbl_ip, rcpt_addr, reverse_path);
}
}
if(dnsbl_recvhdr) /* DNSBL-listed IP found in Received header? */
......@@ -3804,7 +3803,7 @@ static void smtp_thread(void* arg)
);
lprintf(LOG_NOTICE,"%04d %s %s Message from %s %s", socket, client.protocol, client_id, sender_info, str);
if(!is_spam) {
spamlog(&scfg, (struct startup*)startup, (char*)client.protocol, "IGNORED"
spamlog(&scfg, &mqtt, (char*)client.protocol, "IGNORED"
,str, host_name, host_ip, rcpt_addr, reverse_path);
is_spam=TRUE;
}
......@@ -3849,7 +3848,7 @@ static void smtp_thread(void* arg)
SAFEPRINTF2(str,"Listed on %s as %s", dnsbl, inet_ntoa(dnsbl_result));
lprintf(LOG_NOTICE,"%04d %s %s !IGNORED MAIL from %s to <%s> from server: %s (%lu total)"
,socket, client.protocol, client_id, sender_info, rcpt_addr, str, ++stats.msgs_ignored);
spamlog(&scfg, (struct startup*)startup, (char*)client.protocol, "IGNORED"
spamlog(&scfg, &mqtt, (char*)client.protocol, "IGNORED"
,str, host_name, dnsbl_ip, rcpt_addr, reverse_path);
}
/* pretend we received it */
......@@ -4508,7 +4507,7 @@ static void smtp_thread(void* arg)
lprintf(LOG_NOTICE,"%04d %s %s !MAXIMUM RECIPIENTS (%d) REACHED"
,socket, client.protocol, client_id, startup->max_recipients);
SAFEPRINTF(tmp,"Maximum recipient count (%d)",startup->max_recipients);
spamlog(&scfg, (struct startup*)startup, (char*)client.protocol, "REFUSED", tmp
spamlog(&scfg, &mqtt, (char*)client.protocol, "REFUSED", tmp
,host_name, host_ip, rcpt_addr, reverse_path);
sockprintf(socket,client.protocol,session, "452 Too many recipients");
stats.msgs_refused++;
......@@ -4530,7 +4529,7 @@ static void smtp_thread(void* arg)
,socket, client.protocol, client_id, scfg.level_emailperday[relay_user.level], relay_user.number, relay_user.alias);
SAFEPRINTF2(tmp,"Maximum emails per day (%u) for %s"
,scfg.level_emailperday[relay_user.level], relay_user.alias);
spamlog(&scfg, (struct startup*)startup, (char*)client.protocol, "REFUSED", tmp
spamlog(&scfg, &mqtt, (char*)client.protocol, "REFUSED", tmp
,host_name, host_ip, rcpt_addr, reverse_path);
sockprintf(socket,client.protocol,session, "452 Too many emails today");
stats.msgs_refused++;
......@@ -4551,7 +4550,7 @@ static void smtp_thread(void* arg)
filter_ip(&scfg, client.protocol, reason, host_name, host_ip, reverse_path, spam_block);
strcat(tmp," and BLOCKED");
}
spamlog(&scfg, (struct startup*)startup, (char*)client.protocol, tmp, "Attempted recipient in SPAM BAIT list"
spamlog(&scfg, &mqtt, (char*)client.protocol, tmp, "Attempted recipient in SPAM BAIT list"
,host_name, host_ip, rcpt_addr, reverse_path);
dnsbl_result.s_addr=0;
}
......@@ -4572,7 +4571,7 @@ static void smtp_thread(void* arg)
lprintf(LOG_NOTICE,"%04d %s %s !REFUSED MAIL from blacklisted server (%lu total)"
,socket, client.protocol, client_id, ++stats.sessions_refused);
SAFEPRINTF2(str,"Listed on %s as %s", dnsbl, inet_ntoa(dnsbl_result));
spamlog(&scfg, (struct startup*)startup, (char*)client.protocol, "REFUSED", str, host_name, host_ip, rcpt_addr, reverse_path);
spamlog(&scfg, &mqtt, (char*)client.protocol, "REFUSED", str, host_name, host_ip, rcpt_addr, reverse_path);
sockprintf(socket,client.protocol,session
,"550 Mail from %s refused due to listing at %s"
,host_ip, dnsbl);
......@@ -4666,7 +4665,7 @@ static void smtp_thread(void* arg)
lprintf(LOG_WARNING,"%04d %s %s !ILLEGAL RELAY ATTEMPT from %s [%s] to %s"
,socket, client.protocol, client_id, reverse_path, host_ip, p);
SAFEPRINTF(tmp,"Relay attempt to: %s", p);
spamlog(&scfg, (struct startup*)startup, (char*)client.protocol, "REFUSED", tmp, host_name, host_ip, rcpt_addr, reverse_path);
spamlog(&scfg, &mqtt, (char*)client.protocol, "REFUSED", tmp, host_name, host_ip, rcpt_addr, reverse_path);
if(startup->options&MAIL_OPT_ALLOW_RELAY)
sockprintf(socket,client.protocol,session, "553 Relaying through this server "
"requires authentication. "
......@@ -5981,6 +5980,8 @@ static void cleanup(int code)
lprintf(LOG_INFO,"#### Mail Server thread terminated (%s)",str);
}
set_state(SERVER_STOPPED);
mqtt_shutdown(&mqtt);
if(startup!=NULL && startup->terminated!=NULL)
startup->terminated(startup->cbdata,code);
}
......@@ -6055,8 +6056,6 @@ void mail_server(void* arg)
set_state(SERVER_INIT);
mqtt_server_version((struct startup*)startup, mail_ver());
ZERO_VAR(js_server_props);
SAFEPRINTF3(js_server_props.version,"%s %s%c",server_name, VERSION, REVISION);
js_server_props.version_detail=mail_ver();
......@@ -6141,6 +6140,8 @@ void mail_server(void* arg)
return;
}
mqtt_startup(&mqtt, &scfg, (struct startup*)startup, mail_ver(), lputs);
if(startup->temp_dir[0])
SAFECOPY(scfg.temp_dir,startup->temp_dir);
else
......@@ -6276,7 +6277,7 @@ void mail_server(void* arg)
set_state(SERVER_READY);
lprintf(LOG_INFO,"Mail Server thread started");
mqtt_client_max((struct startup*)startup, startup->max_clients);
mqtt_client_max(&mqtt, startup->max_clients);
while(!terminated && !terminate_server) {
YIELD();
......@@ -6446,6 +6447,4 @@ void mail_server(void* arg)
} while(!terminate_server);
protected_uint32_destroy(thread_count);
set_state(SERVER_STOPPED);
}
......@@ -90,8 +90,8 @@ SOCKET uspy_socket[MAX_NODES]; /* UNIX domain spy sockets */
#endif
SOCKET node_socket[MAX_NODES];
struct xpms_set *ts_set;
//static sbbs_t* sbbs=NULL;
static scfg_t scfg;
static struct mqtt mqtt;
static char * text[TOTAL_TEXT];
static scfg_t node_scfg[MAX_NODES];
static char * node_text[MAX_NODES][TOTAL_TEXT];
......@@ -163,7 +163,7 @@ static void set_state(enum server_state state)
if(startup != NULL) {
if(startup->set_state != NULL)
startup->set_state(startup->cbdata, state);
mqtt_server_state((struct startup*)startup, state);
mqtt_server_state(&mqtt, state);
}
}
......@@ -172,7 +172,6 @@ static void update_clients()
if(startup != NULL) {
if(startup->clients != NULL)
startup->clients(startup->cbdata,protected_uint32_value(node_threads_running));
mqtt_client_count((struct startup*)startup, TOPIC_SERVER, protected_uint32_value(node_threads_running));
}
}
......@@ -182,6 +181,7 @@ void client_on(SOCKET sock, client_t* client, BOOL update)
listAddNodeData(&current_connections, client->addr, strlen(client->addr)+1, sock, LAST_NODE);
if(startup!=NULL && startup->client_on!=NULL)
startup->client_on(startup->cbdata,TRUE,sock,client,update);
mqtt_client_on(&mqtt, TRUE, sock, client, update);
}
static void client_off(SOCKET sock)
......@@ -189,6 +189,7 @@ static void client_off(SOCKET sock)
listRemoveTaggedNode(&current_connections, sock, /* free_data */TRUE);
if(startup!=NULL && startup->client_on!=NULL)
startup->client_on(startup->cbdata,FALSE,sock,NULL,FALSE);
mqtt_client_on(&mqtt, FALSE, sock, NULL, FALSE);
}
static void thread_up(BOOL setuid)
......@@ -205,11 +206,10 @@ static void thread_down()
int lputs(int level, const char* str)
{
if(startup != NULL)
mqtt_lputs((struct startup*)startup, TOPIC_SERVER, level, str);
mqtt_lputs(&mqtt, TOPIC_SERVER, level, str);
if(level <= LOG_ERR) {
char errmsg[1024];
errorlog(&scfg, (struct startup*)startup, level, startup==NULL ? NULL:startup->host_name, str);
errorlog(&scfg, &mqtt, level, startup==NULL ? NULL:startup->host_name, str);
SAFEPRINTF2(errmsg, "%s %s", server_abbrev, str);
if(startup!=NULL && startup->errormsg!=NULL)
startup->errormsg(startup->cbdata,level,errmsg);
......@@ -231,12 +231,11 @@ int eputs(int level, const char *str)
if(*str == 0)
return 0;
if(startup != NULL)
mqtt_lputs((struct startup*)startup, TOPIC_EVENT, level, str);
mqtt_lputs(&mqtt, TOPIC_EVENT, level, str);
if(level <= LOG_ERR) {
char errmsg[1024];
errorlog(&scfg, (struct startup*)startup, level, startup==NULL ? NULL:startup->host_name, str);
errorlog(&scfg, &mqtt, level, startup==NULL ? NULL:startup->host_name, str);
SAFEPRINTF(errmsg, "evnt %s", str);
if(startup!=NULL && startup->errormsg!=NULL)
startup->errormsg(startup->cbdata, level, errmsg);
......@@ -2483,7 +2482,7 @@ void output_thread(void* arg)
}
/* Spy on the user remotely */
if(sbbs->cfg.mqtt.enabled) {
int result = mqtt_pub_message((struct startup*)startup, TOPIC_BBS, spy_topic, buf+bufbot, i);
int result = mqtt_pub_message(&mqtt, TOPIC_BBS, spy_topic, buf+bufbot, i);
if(result != MQTT_SUCCESS)
lprintf(LOG_WARNING, "%s ERROR %d (%d) publishing node output (%u bytes): %s"
,node, result, errno, i, spy_topic);
......@@ -3272,6 +3271,7 @@ sbbs_t::sbbs_t(ushort node_num, union xp_sockaddr *addr, size_t addr_len, const
,sd, global_cfg->node_misc);
startup = ::startup; // Convert from global to class member
mqtt = &::mqtt;
memcpy(&cfg, global_cfg, sizeof(cfg));
cfg.node_num = node_num; // Restore the node_num passed to the constructor
......@@ -4410,7 +4410,7 @@ void node_thread(void* arg)
char topic[128];
SAFEPRINTF(topic, "node%u/laston", sbbs->cfg.node_num);
SAFEPRINTF2(str, "%u\t%s", sbbs->useron.number, sbbs->useron.alias);
mqtt_pub_strval((struct startup*)startup, TOPIC_BBS, topic, str);
mqtt_pub_strval(&mqtt, TOPIC_BBS, topic, str);
}
if(sbbs->sys_status&SS_DAILY) { // New day, run daily events/maintenance
......@@ -4793,6 +4793,8 @@ static void cleanup(int code)
thread_down();
if(terminate_server || code)
lprintf(LOG_INFO,"Terminal Server thread terminated (%lu clients served)", served);
set_state(SERVER_STOPPED);
mqtt_shutdown(&mqtt);
if(startup->terminated!=NULL)
startup->terminated(startup->cbdata,code);
}
......@@ -4940,7 +4942,6 @@ void bbs_thread(void* arg)
cleanup(1);
return;
}
mqtt_server_version((struct startup*)startup, bbs_ver());
t=time(NULL);
lprintf(LOG_INFO,"Initializing on %.24s with options: %x"
......@@ -4962,6 +4963,8 @@ void bbs_thread(void* arg)
return;
}
mqtt_startup(&mqtt, &scfg, (struct startup*)startup, bbs_ver(), lputs);
if(scfg.total_shells < 1) {
lprintf(LOG_CRIT, "At least one command shell must be configured (e.g. in SCFG->Command Shells)");
cleanup(1);
......@@ -4985,7 +4988,7 @@ void bbs_thread(void* arg)
startup->last_node=scfg.sys_nodes;
}
mqtt_pub_uintval((struct startup*)startup, TOPIC_BBS, "node_count", scfg.sys_nodes);
mqtt_pub_uintval(&mqtt, TOPIC_BBS, "node_count", scfg.sys_nodes);
/* Create missing directories */
lprintf(LOG_INFO,"Verifying/creating data directories");
......@@ -5232,7 +5235,7 @@ NO_SSH:
set_state(SERVER_READY);
lprintf(LOG_INFO,"Terminal Server thread started for nodes %d through %d", first_node, last_node);
mqtt_client_max((struct startup*)startup, (last_node - first_node) + 1);
mqtt_client_max(&mqtt, (last_node - first_node) + 1);
while(!terminate_server) {
YIELD();
......@@ -5927,5 +5930,4 @@ NO_SSH:
} while(!terminate_server);
set_state(SERVER_STOPPED);
}
This diff is collapsed.
......@@ -27,8 +27,7 @@
#include "server.h"
#include "link_list.h"
#include "dllexport.h"
struct startup;
#include "startup.h"
#include <stdarg.h>
......@@ -47,7 +46,7 @@ struct mqtt {
ulong error_count;
ulong served;
link_list_t client_list;
BOOL shared_instance;
struct startup* startup;
};
enum topic_depth {
......@@ -66,36 +65,30 @@ enum topic_depth {
extern "C" {
#endif
DLLEXPORT int mqtt_init(struct startup*, scfg_t*);
DLLEXPORT int mqtt_startup(struct startup*, scfg_t*, const char* version
,int (*lputs)(int level, const char* str)
,BOOL shared_instance);
DLLEXPORT int mqtt_online(struct startup*);
DLLEXPORT int mqtt_server_state(struct startup*, enum server_state);
DLLEXPORT int mqtt_server_version(struct startup*, const char*);
DLLEXPORT int mqtt_errormsg(struct startup*, int level, const char*);
DLLEXPORT int mqtt_terminating(struct startup*);
DLLEXPORT void mqtt_shutdown(struct startup*);
DLLEXPORT int mqtt_init(struct mqtt*, scfg_t*, struct startup*);
DLLEXPORT int mqtt_startup(struct mqtt*, scfg_t*, struct startup*, const char* version
,int (*lputs)(int level, const char* str));
DLLEXPORT int mqtt_online(struct mqtt*);
DLLEXPORT int mqtt_server_state(struct mqtt*, enum server_state);
DLLEXPORT int mqtt_errormsg(struct mqtt*, int level, const char*);
DLLEXPORT int mqtt_terminating(struct mqtt*);
DLLEXPORT void mqtt_shutdown(struct mqtt*);
DLLEXPORT char* mqtt_libver(char* str, size_t size);
DLLEXPORT char* mqtt_topic(struct startup*, enum topic_depth, char* str, size_t size, const char* fmt, ...);
DLLEXPORT int mqtt_subscribe(struct startup*, enum topic_depth, char* str, size_t size, const char* fmt, ...);
DLLEXPORT int mqtt_lputs(struct startup*, enum topic_depth, int level, const char* str);
DLLEXPORT int mqtt_pub_noval(struct startup*, enum topic_depth, const char* key);
DLLEXPORT int mqtt_pub_strval(struct startup*, enum topic_depth, const char* key, const char* str);
DLLEXPORT int mqtt_pub_uintval(struct startup*, enum topic_depth, const char* key, ulong value);
DLLEXPORT int mqtt_pub_message(struct startup*, enum topic_depth, const char* key, const void* buf, size_t len);
DLLEXPORT int mqtt_open(struct startup*);
DLLEXPORT void mqtt_close(struct startup*);
DLLEXPORT int mqtt_connect(struct startup*, const char* bind_address);
DLLEXPORT int mqtt_disconnect(struct startup*);
DLLEXPORT int mqtt_thread_start(struct startup*);
DLLEXPORT int mqtt_thread_stop(struct startup*);
DLLEXPORT int mqtt_thread_count(struct startup*, enum topic_depth, ulong count);
DLLEXPORT int mqtt_socket_count(struct startup*, enum topic_depth, ulong count);
DLLEXPORT int mqtt_served_count(struct startup*, enum topic_depth, ulong count);
DLLEXPORT int mqtt_client_count(struct startup*, enum topic_depth, ulong count);
DLLEXPORT int mqtt_client_on(struct startup*, BOOL on, int sock, client_t* client, BOOL update);
DLLEXPORT int mqtt_client_max(struct startup*, ulong count);
DLLEXPORT char* mqtt_topic(struct mqtt*, enum topic_depth, char* str, size_t size, const char* fmt, ...);
DLLEXPORT int mqtt_subscribe(struct mqtt*, enum topic_depth, char* str, size_t size, const char* fmt, ...);
DLLEXPORT int mqtt_lputs(struct mqtt*, enum topic_depth, int level, const char* str);
DLLEXPORT int mqtt_pub_noval(struct mqtt*, enum topic_depth, const char* key);
DLLEXPORT int mqtt_pub_strval(struct mqtt*, enum topic_depth, const char* key, const char* str);
DLLEXPORT int mqtt_pub_uintval(struct mqtt*, enum topic_depth, const char* key, ulong value);
DLLEXPORT int mqtt_pub_message(struct mqtt*, enum topic_depth, const char* key, const void* buf, size_t len);
DLLEXPORT int mqtt_open(struct mqtt*);
DLLEXPORT void mqtt_close(struct mqtt*);
DLLEXPORT int mqtt_connect(struct mqtt*, const char* bind_address);
DLLEXPORT int mqtt_disconnect(struct mqtt*);
DLLEXPORT int mqtt_thread_start(struct mqtt*);
DLLEXPORT int mqtt_thread_stop(struct mqtt*);
DLLEXPORT int mqtt_client_on(struct mqtt*, BOOL on, int sock, client_t* client, BOOL update);
DLLEXPORT int mqtt_client_max(struct mqtt*, ulong count);
#ifdef __cplusplus
}
......
......@@ -103,7 +103,7 @@ int sbbs_t::putnodedat(uint number, node_t* node)
,node->errors
);
SAFEPRINTF(topic, "node%u/status", number + 1);
int result = mqtt_pub_strval((struct startup*)startup, TOPIC_BBS, topic, str);
int result = mqtt_pub_strval(mqtt, TOPIC_BBS, topic, str);
if(result != MQTT_SUCCESS)
lprintf(LOG_WARNING, "ERROR %d (%d) publishing node status: %s", result, errno, topic);
}
......
......@@ -296,6 +296,7 @@ extern int thread_suid_broken; /* NPTL is no longer broken */
#include "msgdate.h"
#include "getmail.h"
#include "msg_id.h"
#include "mqtt.h"
#if defined(JAVASCRIPT)
enum js_event_type {
......@@ -417,6 +418,7 @@ public:
bool passthru_thread_running;
scfg_t cfg;
struct mqtt* mqtt=NULL;
enum ansiState {
ansiState_none // No sequence
......@@ -1246,11 +1248,11 @@ extern "C" {
DLLEXPORT int notify(scfg_t*, uint usernumber, const char* subject, const char* msg);
/* logfile.cpp */
DLLEXPORT int errorlog(scfg_t* cfg, struct startup*, int level, const char* host, const char* text);
DLLEXPORT int errorlog(scfg_t* cfg, struct mqtt*, int level, const char* host, const char* text);
DLLEXPORT BOOL hacklog(scfg_t* cfg, struct startup*, const char* prot, const char* user, const char* text
DLLEXPORT BOOL hacklog(scfg_t* cfg, struct mqtt*, const char* prot, const char* user, const char* text
,const char* host, union xp_sockaddr* addr);
DLLEXPORT BOOL spamlog(scfg_t* cfg, struct startup*, char* prot, char* action, char* reason
DLLEXPORT BOOL spamlog(scfg_t* cfg, struct mqtt*, char* prot, char* action, char* reason
,char* host, char* ip_addr, char* to, char* from);
/* data.cpp */
......
......@@ -44,7 +44,6 @@
#include "mailsrvr.h" /* mail_startup_t, mail_server */
#include "services.h" /* services_startup_t, services_thread */
#include "ver.h"
#include "mqtt.h"
/* XPDEV headers */
#include "conwrap.h" /* kbhit/getch */
......@@ -245,7 +244,7 @@ static void notify_systemd(const char* new_status)
}
#endif
static int log_puts(int level, const char *str)
static int lputs(int level, const char *str)
{
static pthread_mutex_t mutex;
static BOOL mutex_initialized;
......@@ -290,13 +289,6 @@ static int log_puts(int level, const char *str)
return(prompt_len);
}
static int lputs(int level, const char *str)
{
if(str != NULL && *str != '\0')
mqtt_lputs((struct startup*)&bbs_startup, TOPIC_HOST, level, str);
return log_puts(level, str);
}
static void errormsg(void *cbdata, int level, const char *msg)
{
error_count++;
......@@ -629,14 +621,12 @@ static void thread_up(void* p, BOOL up, BOOL setuid)
thread_count++;
else if(thread_count>0)
thread_count--;
mqtt_thread_count((struct startup*)&bbs_startup, TOPIC_HOST, thread_count);
pthread_mutex_unlock(&mutex);
lputs(LOG_INFO,NULL); /* update displayed stats */
}
static void socket_open(void* cbdata, BOOL open)
static void socket_open(void* p, BOOL open)
{
enum server_type server_type = ((struct startup*)cbdata)->type;
static pthread_mutex_t mutex;
static BOOL mutex_initialized;
......@@ -650,7 +640,6 @@ static void socket_open(void* cbdata, BOOL open)
socket_count++;
else if(socket_count>0)
socket_count--;
mqtt_socket_count(bbs_startup.startup[server_type], TOPIC_HOST, socket_count);
pthread_mutex_unlock(&mutex);
lputs(LOG_INFO,NULL); /* update displayed stats */
}
......@@ -672,8 +661,6 @@ static void client_on(void* p, BOOL on, int sock, client_t* client, BOOL update)
} else
listRemoveTaggedNode(&client_list, sock, /* free_data: */TRUE);
mqtt_client_on((struct startup*)&bbs_startup, on, sock, client, update);
lputs(LOG_INFO,NULL); /* update displayed stats */
}
......@@ -713,7 +700,7 @@ static int bbs_lputs(void* p, int level, const char *str)
sprintf(logline,"%sterm %.*s",tstr,(int)sizeof(logline)-70,str);
truncsp(logline);
log_puts(level,logline);
lputs(level,logline);
return(strlen(logline)+1);
}
......@@ -759,7 +746,7 @@ static int ftp_lputs(void* p, int level, const char *str)
sprintf(logline,"%sftp %.*s",tstr,(int)sizeof(logline)-70,str);
truncsp(logline);
log_puts(level,logline);
lputs(level,logline);
return(strlen(logline)+1);
}
......@@ -800,7 +787,7 @@ static int mail_lputs(void* p, int level, const char *str)
sprintf(logline,"%smail %.*s",tstr,(int)sizeof(logline)-70,str);
truncsp(logline);
log_puts(level,logline);
lputs(level,logline);
return(strlen(logline)+1);
}
......@@ -841,7 +828,7 @@ static int services_lputs(void* p, int level, const char *str)
sprintf(logline,"%ssrvc %.*s",tstr,(int)sizeof(logline)-70,str);
truncsp(logline);
log_puts(level,logline);
lputs(level,logline);
return(strlen(logline)+1);
}
......@@ -882,7 +869,7 @@ static int event_lputs(void* p, int level, const char *str)
sprintf(logline,"%sevnt %.*s",tstr,(int)sizeof(logline)-70,str);
truncsp(logline);
log_puts(level,logline);
lputs(level,logline);
return(strlen(logline)+1);
}
......@@ -923,7 +910,7 @@ static int web_lputs(void* p, int level, const char *str)
sprintf(logline,"%sweb %.*s",tstr,(int)sizeof(logline)-70,str);
truncsp(logline);
log_puts(level,logline);
lputs(level,logline);
return(strlen(logline)+1);
}
......@@ -1759,17 +1746,6 @@ int main(int argc, char** argv)
#endif // __unix__
bbs_startup.startup[SERVER_TERM] = (struct startup*)&bbs_startup;
bbs_startup.startup[SERVER_MAIL] = (struct startup*)&mail_startup;
bbs_startup.startup[SERVER_FTP] = (struct startup*)&ftp_startup;
bbs_startup.startup[SERVER_WEB] = (struct startup*)&web_startup;
bbs_startup.startup[SERVER_SERVICES] = (struct startup*)&services_startup;
mqtt_startup((struct startup*)&bbs_startup, &scfg, sbbscon_ver(), log_puts, /* shared_client_list: */TRUE);
mail_startup.mqtt = bbs_startup.mqtt;
ftp_startup.mqtt = bbs_startup.mqtt;
web_startup.mqtt = bbs_startup.mqtt;
services_startup.mqtt = bbs_startup.mqtt;
#ifdef _THREAD_SUID_BROKEN
/* check if we're using NPTL */
/* Old (2.2) systems don't have this. */
......@@ -1870,8 +1846,6 @@ int main(int argc, char** argv)
if(run_web)
_beginthread((void(*)(void*))web_server,0,&web_startup);
mqtt_online((struct startup*)&bbs_startup);
#ifdef __unix__
uid_t uid = getuid();
if(uid != 0 && !capabilities_set) { /* are we running as a normal user? */
......@@ -2182,13 +2156,10 @@ int main(int argc, char** argv)
}
}
mqtt_terminating((struct startup*)&bbs_startup);
terminate();
/* erase the prompt */
printf("\r%*s\r",prompt_len,"");
mqtt_shutdown((struct startup*)&bbs_startup);
return(0);
}
......@@ -37,7 +37,8 @@ enum server_state {
SERVER_INIT,
SERVER_READY,
SERVER_RELOADING,
SERVER_STOPPING
SERVER_STOPPING,
SERVER_DISCONNECTED
};
#endif /* Don't add anything after this line */
......@@ -65,6 +65,7 @@ static ulong served=0;
static str_list_t recycle_semfiles;
static str_list_t shutdown_semfiles;
static protected_uint32_t threads_pending_start;
static struct mqtt mqtt;
typedef struct {
/* These are sysop-configurable */
......@@ -110,6 +111,28 @@ typedef struct {
static service_t *service=NULL;
static unsigned int services=0;
static int lputs(int level, const char* str)
{
mqtt_lputs(&mqtt, TOPIC_SERVER, level, str);
if(level <= LOG_ERR) {
char errmsg[1024];
errorlog(&scfg, &mqtt, level, startup == NULL ? NULL : startup->host_name, str);
SAFEPRINTF2(errmsg, "%s %s", server_abbrev, str);
if(startup != NULL && startup->errormsg != NULL)
startup->errormsg(startup->cbdata, level, errmsg);
}
if(startup == NULL || startup->lputs == NULL || str == NULL || level > startup->log_level)
return 0;
#if defined(_WIN32)
if(IsBadCodePtr((FARPROC)startup->lputs))
return 0;
#endif
return startup->lputs(startup->cbdata,level,str);
}
#if defined(__GNUC__) // Catch printf-format errors with lprintf
static int lprintf(int level, const char *fmt, ...) __attribute__ ((format (printf, 2, 3)));
#endif
......@@ -123,26 +146,7 @@ static int lprintf(int level, const char *fmt, ...)
sbuf[sizeof(sbuf)-1]=0;
va_end(argptr);
if(level <= LOG_ERR) {
char errmsg[sizeof(sbuf)+16];
errorlog(&scfg, (struct startup*)startup, level, startup==NULL ? NULL:startup->host_name, sbuf);
SAFEPRINTF2(errmsg, "%s %s", server_abbrev, sbuf);
if(startup!=NULL && startup->errormsg!=NULL)
startup->errormsg(startup->cbdata,level,errmsg);
}
if(startup != NULL)
mqtt_lputs((struct startup*)startup, TOPIC_SERVER, level, sbuf);
if(startup==NULL || startup->lputs==NULL || level > startup->log_level)
return(0);
#if defined(_WIN32)
if(IsBadCodePtr((FARPROC)startup->lputs))
return(0);
#endif
return(startup->lputs(startup->cbdata,level,sbuf));
return lputs(level, sbuf);
}
#ifdef _WINSOCKAPI_
......@@ -182,7 +186,7 @@ static void set_state(enum server_state state)
if(startup != NULL) {
if(startup->set_state != NULL)
startup->set_state(startup->cbdata, state);
mqtt_server_state((struct startup*)startup, state);
mqtt_server_state(&mqtt, state);
}
}
......@@ -207,12 +211,14 @@ static void client_on(SOCKET sock, client_t* client, BOOL update)
{
if(startup!=NULL && startup->client_on!=NULL)
startup->client_on(startup->cbdata,TRUE,sock,client,update);
mqtt_client_on(&mqtt, TRUE, sock, client, update);
}
static void client_off(SOCKET sock)
{
if(startup!=NULL && startup->client_on!=NULL)
startup->client_on(startup->cbdata,FALSE,sock,NULL,FALSE);
mqtt_client_on(&mqtt, FALSE, sock, NULL, FALSE);
}
static void thread_up(BOOL setuid)
......@@ -336,7 +342,7 @@ static void badlogin(SOCKET sock, char* prot, char* user, char* passwd, char* ho
SAFEPRINTF(reason,"%s LOGIN", prot);
count=loginFailure(startup->login_attempt_list, addr, prot, user, passwd);
if(startup->login_attempt.hack_threshold && count>=startup->login_attempt.hack_threshold) {
hacklog(&scfg, (struct startup*)startup, reason, user, passwd, host, addr);
hacklog(&scfg, &mqtt, reason, user, passwd, host, addr);
#ifdef _WIN32
if(startup->sound.hack[0] && !sound_muted(&scfg))
PlaySound(startup->sound.hack, NULL, SND_ASYNC|SND_FILENAME);
......@@ -1716,6 +1722,8 @@ static void cleanup(int code)
thread_down();
if(terminated || code)
lprintf(LOG_INFO,"#### Services thread terminated (%lu clients served)",served);
set_state(SERVER_STOPPED);
mqtt_shutdown(&mqtt);
if(startup!=NULL && startup->terminated!=NULL)
startup->terminated(startup->cbdata,code);
}
......@@ -1860,8 +1868,6 @@ void services_thread(void* arg)
}
set_state(SERVER_INIT);
mqtt_server_version((struct startup*)startup, services_ver());
#ifdef _THREAD_SUID_BROKEN
if(thread_suid_broken)
startup->seteuid(TRUE);
......@@ -1922,6 +1928,8 @@ void services_thread(void* arg)
return;
}
mqtt_startup(&mqtt, &scfg, (struct startup*)startup, services_ver(), lputs);
if(startup->temp_dir[0])
SAFECOPY(scfg.temp_dir,startup->temp_dir);
else
......@@ -2400,6 +2408,4 @@ void services_thread(void* arg)
}
} while(!terminated);
set_state(SERVER_STOPPED);
}
......@@ -33,7 +33,6 @@
#define LINK_LIST_THREADSAFE
#endif
#include "link_list.h"
#include "mqtt.h"
typedef struct {
ulong max_bytes; /* max allocated bytes before garbage collection */
......@@ -111,9 +110,7 @@ typedef struct {
uint bind_retry_delay; \
struct startup_sound_settings sound; \
struct login_attempt_settings login_attempt; \
link_list_t* login_attempt_list; \
struct mqtt mqtt; \
struct startup* startup[SERVER_COUNT];
link_list_t* login_attempt_list;
struct startup {
STARTUP_COMMON_ELEMENTS
......
......@@ -123,6 +123,7 @@ static js_server_props_t js_server_props;
static str_list_t recycle_semfiles;
static str_list_t shutdown_semfiles;
static str_list_t cgi_env;
static struct mqtt mqtt;
static named_string_t** mime_types;
static named_string_t** cgi_handlers;
......@@ -535,6 +536,28 @@ time_gm(struct tm *tm)
return (t < 0 ? (time_t) -1 : t);
}
static int lputs(int level, const char* str)
{
mqtt_lputs(&mqtt, TOPIC_SERVER, level, str);
if(level <= LOG_ERR) {
char errmsg[1024];
errorlog(&scfg, &mqtt, level, startup == NULL ? NULL : startup->host_name, str);
SAFEPRINTF2(errmsg, "%s %s", server_abbrev, str);
if(startup != NULL && startup->errormsg != NULL)
startup->errormsg(startup->cbdata, level, errmsg);
}
if(startup == NULL || startup->lputs == NULL || str == NULL || level > startup->log_level)
return 0;
#if defined(_WIN32)
if(IsBadCodePtr((FARPROC)startup->lputs))
return 0;
#endif
return startup->lputs(startup->cbdata,level,str);
}
#if defined(__GNUC__) // Catch printf-format errors with lprintf
static int lprintf(int level, const char *fmt, ...) __attribute__ ((format (printf, 2, 3)));
#endif
......@@ -548,26 +571,7 @@ static int lprintf(int level, const char *fmt, ...)
sbuf[sizeof(sbuf)-1]=0;
va_end(argptr);
if(level <= LOG_ERR) {
char errmsg[sizeof(sbuf)+16];
errorlog(&scfg, (struct startup*)startup, level, startup==NULL ? NULL:startup->host_name, sbuf);
SAFEPRINTF2(errmsg, "%s %s", server_abbrev, sbuf);
if(startup!=NULL && startup->errormsg!=NULL)
startup->errormsg(startup->cbdata,level,errmsg);
}
if(startup != NULL)
mqtt_lputs((struct startup*)startup, TOPIC_SERVER, level, sbuf);
if(startup==NULL || startup->lputs==NULL || level > startup->log_level)
return(0);
#if defined(_WIN32)
if(IsBadCodePtr((FARPROC)startup->lputs))
return(0);
#endif
return(startup->lputs(startup->cbdata,level,sbuf));
return lputs(level, sbuf);
}
static int writebuf(http_session_t *session, const char *buf, size_t len)
......@@ -741,7 +745,7 @@ static void set_state(enum server_state state)
if(startup != NULL) {
if(startup->set_state != NULL)
startup->set_state(startup->cbdata, state);
mqtt_server_state((struct startup*)startup, state);
mqtt_server_state(&mqtt, state);
}
}
......@@ -751,7 +755,6 @@ static void update_clients(void)
uint32_t count = protected_uint32_value(active_clients);
if(startup->clients!=NULL)
startup->clients(startup->cbdata, count);
mqtt_client_count((struct startup*)startup, TOPIC_SERVER, count);
}
}
......@@ -759,12 +762,14 @@ static void client_on(SOCKET sock, client_t* client, BOOL update)
{
if(startup!=NULL && startup->client_on!=NULL)
startup->client_on(startup->cbdata,TRUE,sock,client,update);
mqtt_client_on(&mqtt, TRUE, sock, client, update);