Commits (3)
  • Rob Swindell's avatar
    Use int-sized enums for API compatible with MSVC built DLLs · f4c01684
    Rob Swindell authored
    e.g. sbbs.dll
    f4c01684
  • Rob Swindell's avatar
    Include git branch/hash in sbbsctrl version info · 30942ff2
    Rob Swindell authored
    And export a simple ver() function to be used in MainFormUnit.cpp (for mqtt stuff)
    30942ff2
  • Rob Swindell's avatar
    MQTT support overhaul, mainly for SBBSCTRL compatibility · 6ed3f6dc
    Rob Swindell authored
    The only difference in the data/scheme is that the "error" topic (error log) is now under each server rather than each host. I don't *think* there are any other changes from the MQTT consumer side.
    
    Still not done: subscribing (e.g. support for recycle or node-spy-input via MQTT) and NT services support.
    
    This change also includes a cool feature that will prompt the sysop if there's a timeout (30 seconds) while waiting for servers to shutdown gracefully and giving the sysop the option to abort (Cancel) the wait (and shutdown ungracefully) or continue the wait (OK).
    6ed3f6dc
/* Synchronet Control Panel (GUI Borland C++ Builder Project for Win32) */
/* $Id: AboutBoxFormUnit.cpp,v 1.14 2020/04/15 05:27:14 rswindell Exp $ */
/****************************************************************************
* @format.tab-size 4 (Plain Text/Source Code File Header) *
* @format.use-tabs true (see http://www.synchro.net/ptsc_hdr.html) *
......@@ -15,21 +13,9 @@
* See the GNU General Public License for more details: gpl.txt or *
* http://www.fsf.org/copyleft/gpl.html *
* *
* Anonymous FTP access to the most recent released source is available at *
* ftp://vert.synchro.net, ftp://cvs.synchro.net and ftp://ftp.synchro.net *
* *
* Anonymous CVS access to the development source and modification history *
* is available at cvs.synchro.net:/cvsroot/sbbs, example: *
* cvs -d :pserver:anonymous@cvs.synchro.net:/cvsroot/sbbs login *
* (just hit return, no password is necessary) *
* cvs -d :pserver:anonymous@cvs.synchro.net:/cvsroot/sbbs checkout src *
* *
* For Synchronet coding style and modification guidelines, see *
* http://www.synchro.net/source.html *
* *
* You are encouraged to submit any modifications (preferably in Unix diff *
* format) via e-mail to mods@synchro.net *
* *
* Note: If this box doesn't appear square, then you need to fix your tabs. *
****************************************************************************/
......@@ -47,6 +33,8 @@
#include "ftpsrvr.h"
#include "websrvr.h"
#include "services.h"
#include "git_branch.h"
#include "git_hash.h"
//---------------------------------------------------------------------------
#pragma package(smart_init)
......@@ -57,11 +45,9 @@ __fastcall TAboutBoxForm::TAboutBoxForm(TComponent* Owner)
: TForm(Owner)
{
}
//---------------------------------------------------------------------------
void __fastcall TAboutBoxForm::FormShow(TObject *Sender)
{
Credits->Lines->Clear();
const char* ver(void)
{
DWORD i;
unsigned int len=GetFileVersionInfoSize(
Application->ExeName.c_str() // pointer to filename string
......@@ -69,8 +55,7 @@ void __fastcall TAboutBoxForm::FormShow(TObject *Sender)
);
BYTE* buf=(BYTE *)malloc(len);
if(buf==NULL) {
Credits->Lines->Add("MALLOC ERROR!");
return;
return "MALLOC ERROR!";
}
GetFileVersionInfo(
Application->ExeName.c_str() // pointer to filename string
......@@ -94,9 +79,9 @@ void __fastcall TAboutBoxForm::FormShow(TObject *Sender)
,__BORLANDC__>>8
,__BORLANDC__&0xff);
char ver[256];
static char ver[256];
wsprintf(ver,"Synchronet Control Panel v%u.%u.%u.%u%s%s "
"Compiled %s %s with %s"
"Compiled %s/%s %s %s with %s"
,Ver->dwFileVersionMS>>16
,Ver->dwFileVersionMS&0xffff
,Ver->dwFileVersionLS>>16
......@@ -105,21 +90,31 @@ void __fastcall TAboutBoxForm::FormShow(TObject *Sender)
" Debug" : ""
,Ver->dwFileFlags&VS_FF_PRERELEASE ?
" Pre-release" : ""
,GIT_BRANCH, GIT_HASH
,__DATE__, __TIME__, compiler
);
return ver;
}
//---------------------------------------------------------------------------
void __fastcall TAboutBoxForm::FormShow(TObject *Sender)
{
Credits->Lines->Clear();
Credits->Lines->Add(bbs_ver());
Credits->Lines->Add(mail_ver());
Credits->Lines->Add(ftp_ver());
Credits->Lines->Add(web_ver());
Credits->Lines->Add(services_ver());
Credits->Lines->Add(ver);
Credits->Lines->Add(ver());
Credits->Lines->Add("Synchronet Local Spy ANSI Terminal Emulation"
+ CopyRight);
Credits->Lines->Add(AnsiString(js_ver())
+ " (c) 1998 Netscape Communications Corp.");
#ifdef USE_CRYPTLIB
char ver[256];
wsprintf(ver,"Cryptlib v%u.%u.%u"
,(CRYPTLIB_VERSION/100)
,(CRYPTLIB_VERSION/10)%10
......
/* Synchronet Control Panel (GUI Borland C++ Builder Project for Win32) */
/* $Id: AboutBoxFormUnit.h,v 1.6 2018/07/24 01:11:28 rswindell Exp $ */
/****************************************************************************
* @format.tab-size 4 (Plain Text/Source Code File Header) *
* @format.use-tabs true (see http://www.synchro.net/ptsc_hdr.html) *
......@@ -15,21 +13,9 @@
* See the GNU General Public License for more details: gpl.txt or *
* http://www.fsf.org/copyleft/gpl.html *
* *
* Anonymous FTP access to the most recent released source is available at *
* ftp://vert.synchro.net, ftp://cvs.synchro.net and ftp://ftp.synchro.net *
* *
* Anonymous CVS access to the development source and modification history *
* is available at cvs.synchro.net:/cvsroot/sbbs, example: *
* cvs -d :pserver:anonymous@cvs.synchro.net:/cvsroot/sbbs login *
* (just hit return, no password is necessary) *
* cvs -d :pserver:anonymous@cvs.synchro.net:/cvsroot/sbbs checkout src *
* *
* For Synchronet coding style and modification guidelines, see *
* http://www.synchro.net/source.html *
* *
* You are encouraged to submit any modifications (preferably in Unix diff *
* format) via e-mail to mods@synchro.net *
* *
* Note: If this box doesn't appear square, then you need to fix your tabs. *
****************************************************************************/
//---------------------------------------------------------------------------
......@@ -43,6 +29,7 @@
#include <ExtCtrls.hpp>
#include <Graphics.hpp>
#include "MainFormUnit.h"
const char* ver(void);
//---------------------------------------------------------------------------
class TAboutBoxForm : public TForm
{
......
......@@ -171,6 +171,7 @@ static void thread_up(void* p, BOOL up, BOOL setuid)
threads++;
else if(threads>0)
threads--;
mqtt_thread_count(&MainForm->bbs_startup.mqtt, TOPIC_HOST, threads);
ReleaseMutex(mutex);
}
......@@ -188,6 +189,7 @@ void socket_open(void* p, BOOL open)
sockets++;
else if(sockets>0)
sockets--;
mqtt_socket_count(&MainForm->bbs_startup.mqtt, TOPIC_HOST, sockets);
ReleaseMutex(mutex);
}
......@@ -201,8 +203,10 @@ static void client_add(void* p, BOOL add)
if(add) {
clients++;
total_clients++;
mqtt_served_count(&MainForm->bbs_startup.mqtt, TOPIC_HOST, total_clients);
} else if(clients>0)
clients--;
mqtt_client_count(&MainForm->bbs_startup.mqtt, TOPIC_HOST, clients);
}
static void client_on(void* p, BOOL on, int sock, client_t* client, BOOL update)
......@@ -212,13 +216,15 @@ static void client_on(void* p, BOOL on, int sock, client_t* client, BOOL update)
time_t t;
static HANDLE mutex;
TListItem* Item;
mqtt_client_on(&MainForm->bbs_startup.mqtt, on, sock, client, update);
if(!mutex)
mutex=CreateMutex(NULL,false,NULL);
WaitForSingleObject(mutex,INFINITE);
WaitForSingleObject(ClientForm->ListMutex,INFINITE);
/* Search for exising entry for this socket */
/* Search for existing entry for this socket */
for(i=0;i<ClientForm->ListView->Items->Count;i++) {
if(ClientForm->ListView->Items->Item[i]->Caption.ToIntDef(0)==sock)
break;
......@@ -332,6 +338,7 @@ static void bbs_set_state(void* p, enum server_state state)
{
TelnetForm->Status->Caption = server_state_str(state);
mqtt_server_state(&MainForm->bbs_startup.mqtt, state);
switch(state) {
case SERVER_STOPPED:
MainForm->TelnetStart->Enabled=true;
......@@ -429,6 +436,7 @@ static void services_set_state(void* p, enum server_state state)
{
ServicesForm->Status->Caption = server_state_str(state);
mqtt_server_state(&MainForm->services_startup.mqtt, state);
switch(state) {
case SERVER_STOPPED:
MainForm->ServicesStart->Enabled=true;
......@@ -495,6 +503,7 @@ static void mail_set_state(void* p, enum server_state state)
{
MailForm->Status->Caption = server_state_str(state);
mqtt_server_state(&MainForm->mail_startup.mqtt, state);
switch(state) {
case SERVER_STOPPED:
MainForm->MailStart->Enabled=true;
......@@ -590,6 +599,7 @@ static void ftp_set_state(void* p, enum server_state state)
{
FtpForm->Status->Caption = server_state_str(state);
mqtt_server_state(&MainForm->ftp_startup.mqtt, state);
switch(state) {
case SERVER_STOPPED:
MainForm->FtpStart->Enabled=true;
......@@ -658,6 +668,7 @@ static void web_set_state(void* p, enum server_state state)
{
WebForm->Status->Caption = server_state_str(state);
mqtt_server_state(&MainForm->web_startup.mqtt, state);
switch(state) {
case SERVER_STOPPED:
MainForm->WebStart->Enabled=true;
......@@ -782,6 +793,7 @@ __fastcall TMainForm::TMainForm(TComponent* Owner)
memset(&bbs_startup,0,sizeof(bbs_startup));
bbs_startup.size=sizeof(bbs_startup);
bbs_startup.type = SERVER_TERM;
bbs_startup.cbdata=&bbs_log_list;
bbs_startup.event_cbdata=&event_log_list;
bbs_startup.first_node=1;
......@@ -802,6 +814,7 @@ __fastcall TMainForm::TMainForm(TComponent* Owner)
memset(&mail_startup,0,sizeof(mail_startup));
mail_startup.size=sizeof(mail_startup);
mail_startup.type = SERVER_MAIL;
mail_startup.cbdata=&mail_log_list;
mail_startup.smtp_port=IPPORT_SMTP;
mail_startup.relay_port=IPPORT_SMTP;
......@@ -824,6 +837,7 @@ __fastcall TMainForm::TMainForm(TComponent* Owner)
memset(&ftp_startup,0,sizeof(ftp_startup));
ftp_startup.size=sizeof(ftp_startup);
ftp_startup.type = SERVER_FTP;
ftp_startup.cbdata=&ftp_log_list;
ftp_startup.port=IPPORT_FTP;
ftp_startup.lputs=lputs;
......@@ -842,6 +856,7 @@ __fastcall TMainForm::TMainForm(TComponent* Owner)
memset(&web_startup,0,sizeof(web_startup));
web_startup.size=sizeof(web_startup);
web_startup.type = SERVER_WEB;
web_startup.cbdata=&web_log_list;
web_startup.lputs=lputs;
web_startup.errormsg=errormsg;
......@@ -855,6 +870,7 @@ __fastcall TMainForm::TMainForm(TComponent* Owner)
memset(&services_startup,0,sizeof(services_startup));
services_startup.size=sizeof(services_startup);
services_startup.type = SERVER_SERVICES;
services_startup.cbdata=&services_log_list;
services_startup.lputs=lputs;
services_startup.errormsg=errormsg;
......@@ -1038,6 +1054,7 @@ BOOL __fastcall TMainForm::servicesServiceEnabled(void)
//---------------------------------------------------------------------------
void __fastcall TMainForm::FormClose(TObject *Sender, TCloseAction &Action)
{
mqtt_terminating(&bbs_startup.mqtt);
UpTimer->Enabled=false; /* Stop updating the status bar */
StatsTimer->Enabled=false;
......@@ -1055,19 +1072,25 @@ void __fastcall TMainForm::FormClose(TObject *Sender, TCloseAction &Action)
|| (FtpStop->Enabled && !ftpServiceEnabled())
|| (WebStop->Enabled && !webServiceEnabled())
|| (ServicesStop->Enabled && !servicesServiceEnabled())) {
if(time(NULL)-start>30)
break;
if(time(NULL)-start>30) {
if(Application->MessageBox("Abort wait for servers to terminate?"
,"Synchronet Server Still Running", MB_OKCANCEL) == IDOK)
break;
start = time(NULL);
}
Application->ProcessMessages();
YIELD();
}
StatusBar->Panels->Items[STATUSBAR_LAST_PANEL]->Text="Closing...";
Application->ProcessMessages();
mqtt_terminating(&bbs_startup.mqtt);
LogTimer->Enabled=false;
ServiceStatusTimer->Enabled=false;
NodeForm->Timer->Enabled=false;
ClientForm->Timer->Enabled=false;
mqtt_shutdown(&bbs_startup.mqtt);
}
//---------------------------------------------------------------------------
void __fastcall TMainForm::FormCloseQuery(TObject *Sender, bool &CanClose)
......@@ -1947,7 +1970,21 @@ void __fastcall TMainForm::StartupTimerTick(TObject *Sender)
StartupTimer->Interval = 2500; // Let 'em see the logo for a bit
StartupTimer->Enabled = true;
} else {
if(!bbsServiceEnabled()) {
mqtt_startup(&bbs_startup.mqtt, &cfg, SERVER_TERM, ver()
,/* lputs: */NULL
,/* shared_client_list: */TRUE);
ftp_startup.mqtt = bbs_startup.mqtt;
ftp_startup.mqtt.server_type = SERVER_FTP;
web_startup.mqtt = bbs_startup.mqtt;
web_startup.mqtt.server_type = SERVER_WEB;
mail_startup.mqtt = bbs_startup.mqtt;
mail_startup.mqtt.server_type = SERVER_MAIL;
services_startup.mqtt = bbs_startup.mqtt;
services_startup.mqtt.server_type = SERVER_SERVICES;
}
DisplayMainPanels(Sender);
mqtt_online(&bbs_startup.mqtt);
}
Initialized=true;
}
......
......@@ -62,7 +62,7 @@
-I$(BCB)\include -I$(BCB)\include\vcl
-I..\..\..\3rdp\win32.release\cryptlib\include -I..\..\comio -src_suffix
cpp -DSBBS -DRINGBUF_EVENT -DRINGBUF_MUTEX -DUSE_CRYPTLIB -D_DEBUG -boa"/>
<CFLAG1 value="-Od -H=$(BCB)\lib\vcl60.csm -Hc -Vx -Ve -X- -r- -a8 -b- -k -y -v -vi- -c
<CFLAG1 value="-Od -H=$(BCB)\lib\vcl60.csm -Hc -Vx -Ve -X- -r- -a8 -b -k -y -v -vi- -c
-tW -tWM"/>
<PFLAGS value="-$Y+ -$W -$O- -$A8 -v -JPHNE -M"/>
<RFLAGS value=""/>
......
......@@ -142,8 +142,8 @@ static int lprintf(int level, const char *fmt, ...)
if(level <= LOG_ERR) {
char errmsg[sizeof(sbuf)+16];
errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name, sbuf);
SAFEPRINTF2(errmsg, "%s %s", server_abbrev, sbuf);
errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name, errmsg);
if(startup!=NULL && startup->errormsg!=NULL)
startup->errormsg(startup->cbdata,level,errmsg);
}
......@@ -196,8 +196,11 @@ static char* server_host_name(void)
static void set_state(enum server_state state)
{
if(startup != NULL && startup->set_state != NULL)
startup->set_state(startup->cbdata, state);
if(startup != NULL) {
if(startup->set_state != NULL)
startup->set_state(startup->cbdata, state);
mqtt_server_state(&startup->mqtt, state);
}
}
static void update_clients(void)
......@@ -206,7 +209,7 @@ static void update_clients(void)
uint32_t count = protected_uint32_value(active_clients);
if(startup->clients != NULL)
startup->clients(startup->cbdata, count);
mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "client_count", count);
mqtt_client_count(&startup->mqtt, TOPIC_SERVER, count);
}
}
......@@ -230,7 +233,7 @@ static void thread_up(BOOL setuid)
if(startup != NULL) {
if(startup->thread_up != NULL)
startup->thread_up(startup->cbdata,TRUE, setuid);
mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "thread_count", protected_uint32_value(thread_count));
mqtt_thread_count(&startup->mqtt, TOPIC_SERVER, protected_uint32_value(thread_count));
}
}
......@@ -240,7 +243,7 @@ static int32_t thread_down(void)
if(startup != NULL) {
if(startup->thread_up != NULL)
startup->thread_up(startup->cbdata,FALSE, FALSE);
mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "thread_count", count);
mqtt_thread_count(&startup->mqtt, TOPIC_SERVER, count);
}
return count;
}
......@@ -4952,7 +4955,7 @@ void ftp_server(void* arg)
}
set_state(SERVER_INIT);
mqtt_pub_strval(&startup->mqtt, TOPIC_SERVER, "version", ftp_ver());
mqtt_server_version(&startup->mqtt, ftp_ver());
uptime=0;
served=0;
......@@ -5093,7 +5096,7 @@ void ftp_server(void* arg)
set_state(SERVER_READY);
lprintf(LOG_INFO,"FTP Server thread started");
mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "max_clients", startup->max_clients);
mqtt_client_max(&startup->mqtt, startup->max_clients);
while(ftp_set!=NULL && !terminate_server) {
YIELD();
......
......@@ -26,7 +26,7 @@
typedef struct {
DWORD size; /* sizeof(ftp_startup_t) */
STARTUP_COMMON_ELEMENTS
WORD port;
WORD max_clients;
#define FTP_DEFAULT_MAX_CLIENTS 10
......@@ -34,7 +34,6 @@ typedef struct {
#define FTP_DEFAULT_MAX_INACTIVITY 300
WORD qwk_timeout;
#define FTP_DEFAULT_QWK_TIMEOUT 600
WORD sem_chk_freq; /* semaphore file checking frequency (in seconds) */
struct in_addr outgoing4;
struct in6_addr outgoing6;
str_list_t interfaces;
......@@ -42,47 +41,10 @@ typedef struct {
struct in6_addr pasv_ip6_addr;
WORD pasv_port_low;
WORD pasv_port_high;
DWORD options; /* See FTP_OPT definitions */
int64_t min_fsize; /* Minimum file size accepted for upload */
int64_t max_fsize; /* Maximum file size accepted for upload (0=unlimited) */
void* cbdata; /* Private data passed to callbacks */
/* Callbacks (NULL if unused) */
int (*lputs)(void*, int level, const char* msg);
void (*errormsg)(void*, int level, const char* msg);
void (*set_state)(void*, enum server_state);
void (*recycle)(void*);
void (*terminated)(void*, int code);
void (*clients)(void*, int active);
void (*thread_up)(void*, BOOL up, BOOL setuid);
void (*socket_open)(void*, BOOL open);
void (*client_on)(void*, BOOL on, int sock, client_t*, BOOL update);
BOOL (*seteuid)(BOOL user);
BOOL (*setuid)(BOOL force);
/* Paths */
char ctrl_dir[INI_MAX_VALUE_LEN];
char index_file_name[64];
char temp_dir[INI_MAX_VALUE_LEN];
char ini_fname[INI_MAX_VALUE_LEN];
/* Misc */
char host_name[128];
BOOL recycle_now;
BOOL shutdown_now;
int log_level;
uint bind_retry_count; /* Number of times to retry bind() calls */
uint bind_retry_delay; /* Time to wait between each bind() retry */
struct startup_sound_settings sound;
/* Login Attempt parameters */
struct login_attempt_settings login_attempt;
link_list_t* login_attempt_list;
uint max_concurrent_connections;
struct mqtt mqtt;
char index_file_name[64];
} ftp_startup_t;
......
......@@ -150,9 +150,7 @@ extern "C" int errorlog(scfg_t* cfg, struct mqtt* mqtt, int level, const char* h
SAFEPRINTF2(subject, "%s %sERROR occurred", host, level <= LOG_CRIT ? "CRITICAL " : "");
notify(cfg, cfg->node_erruser, subject, text);
}
if(mqtt != NULL) {
mqtt_pub_strval(mqtt, TOPIC_HOST, "error", text);
}
mqtt_errormsg(mqtt, level, text);
return 0;
}
......
......@@ -191,8 +191,8 @@ static int lprintf(int level, const char *fmt, ...)
if(level <= LOG_ERR) {
char errmsg[sizeof(sbuf)+16];
errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name, sbuf), stats.errors++;
SAFEPRINTF2(errmsg, "%s %s", server_abbrev, sbuf);
errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name,errmsg), stats.errors++;
if(startup!=NULL && startup->errormsg!=NULL)
startup->errormsg(startup->cbdata,level,errmsg);
}
......@@ -248,8 +248,11 @@ static char* server_host_name(void)
static void set_state(enum server_state state)
{
if(startup != NULL && startup->set_state != NULL)
startup->set_state(startup->cbdata, state);
if(startup != NULL) {
if(startup->set_state != NULL)
startup->set_state(startup->cbdata, state);
mqtt_server_state(&startup->mqtt, state);
}
}
static void update_clients(void)
......@@ -257,7 +260,7 @@ static void update_clients(void)
if(startup != NULL) {
if(startup->clients != NULL)
startup->clients(startup->cbdata,protected_uint32_value(active_clients)+active_sendmail);
mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "client_count", protected_uint32_value(active_clients));
mqtt_client_count(&startup->mqtt, TOPIC_SERVER, protected_uint32_value(active_clients));
}
}
......@@ -281,7 +284,7 @@ static void thread_up(BOOL setuid)
if(startup != NULL) {
if(startup->thread_up != NULL)
startup->thread_up(startup->cbdata,TRUE,setuid);
mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "thread_count", protected_uint32_value(thread_count));
mqtt_thread_count(&startup->mqtt, TOPIC_SERVER, protected_uint32_value(thread_count));
}
}
......@@ -291,7 +294,7 @@ static int32_t thread_down(void)
if(startup != NULL) {
if(startup->thread_up != NULL)
startup->thread_up(startup->cbdata,FALSE,FALSE);
mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "thread_count", count);
mqtt_thread_count(&startup->mqtt, TOPIC_SERVER, count);
}
return count;
}
......@@ -310,7 +313,7 @@ void mail_open_socket(SOCKET sock, void* cb_protocol)
stats.sockets++;
if(startup != NULL)
mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "socket_count", stats.sockets);
mqtt_socket_count(&startup->mqtt, TOPIC_SERVER, stats.sockets);
}
void mail_close_socket_cb(SOCKET sock, void* cb_protocol)
......@@ -319,7 +322,7 @@ void mail_close_socket_cb(SOCKET sock, void* cb_protocol)
startup->socket_open(startup->cbdata,FALSE);
stats.sockets--;
if(startup != NULL)
mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "socket_count", stats.sockets);
mqtt_socket_count(&startup->mqtt, TOPIC_SERVER, stats.sockets);
}
int mail_close_socket(SOCKET *sock, int *sess)
......@@ -339,7 +342,7 @@ int mail_close_socket(SOCKET *sock, int *sess)
startup->socket_open(startup->cbdata,FALSE);
stats.sockets--;
if(startup != NULL)
mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "socket_count", stats.sockets);
mqtt_socket_count(&startup->mqtt, TOPIC_SERVER, stats.sockets);
if(result!=0) {
if(ERROR_VALUE!=ENOTSOCK)
lprintf(LOG_WARNING,"%04d !ERROR %d closing socket",*sock, ERROR_VALUE);
......@@ -6052,7 +6055,7 @@ void mail_server(void* arg)
set_state(SERVER_INIT);
mqtt_pub_strval(&startup->mqtt, TOPIC_SERVER, "version", mail_ver());
mqtt_server_version(&startup->mqtt, mail_ver());
ZERO_VAR(js_server_props);
SAFEPRINTF3(js_server_props.version,"%s %s%c",server_name, VERSION, REVISION);
......@@ -6273,7 +6276,7 @@ void mail_server(void* arg)
set_state(SERVER_READY);
lprintf(LOG_INFO,"Mail Server thread started");
mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "max_clients", startup->max_clients);
mqtt_client_max(&startup->mqtt, startup->max_clients);
while(!terminated && !terminate_server) {
YIELD();
......
......@@ -27,7 +27,7 @@
typedef struct {
DWORD size; /* sizeof(mail_startup_t) */
STARTUP_COMMON_ELEMENTS
WORD smtp_port;
WORD pop3_port;
WORD pop3s_port;
......@@ -46,37 +46,16 @@ typedef struct {
#define MAIL_DEFAULT_LINES_PER_YIELD 10
WORD max_recipients;
#define MAIL_DEFAULT_MAX_RECIPIENTS 100
WORD sem_chk_freq; /* semaphore file checking frequency (in seconds) */
struct in_addr outgoing4;
struct in6_addr outgoing6;
str_list_t interfaces;
str_list_t pop3_interfaces;
DWORD options; /* See MAIL_OPT definitions */
DWORD max_msg_size; /* Max msg size in bytes (0=unlimited) */
#define MAIL_DEFAULT_MAX_MSG_SIZE (20*1024*1024) /* 20MB */
DWORD max_msgs_waiting; /* Max msgs in user's inbox (0=unlimited) */
#define MAIL_DEFAULT_MAX_MSGS_WAITING 100
DWORD connect_timeout; /* in seconds, for non-blocking connect (0=blocking socket) */
#define MAIL_DEFAULT_CONNECT_TIMEOUT 30 /* seconds */
void* cbdata; /* Private data passed to callbacks */
/* Callbacks (NULL if unused) */
int (*lputs)(void*, int level, const char* msg);
void (*errormsg)(void*, int level, const char* msg);
void (*set_state)(void*, enum server_state);
void (*recycle)(void*);
void (*terminated)(void*, int code);
void (*clients)(void*, int active);
void (*thread_up)(void*, BOOL up, BOOL setuid);
void (*socket_open)(void*, BOOL open);
void (*client_on)(void*, BOOL on, int sock, client_t*, BOOL update);
BOOL (*seteuid)(BOOL user);
BOOL (*setuid)(BOOL force);
/* Paths */
char ctrl_dir[INI_MAX_VALUE_LEN];
char temp_dir[INI_MAX_VALUE_LEN];
char ini_fname[INI_MAX_VALUE_LEN];
/* Strings */
char dns_server[128];
......@@ -88,13 +67,7 @@ typedef struct {
char pop3_sound[INI_MAX_VALUE_LEN];
/* Misc */
char host_name[128];
BOOL recycle_now;
BOOL shutdown_now;
int log_level;
int tls_error_level; /* Cap the severity of TLS error log messages */
uint bind_retry_count; /* Number of times to retry bind() calls */
uint bind_retry_delay; /* Time to wait between each bind() retry */
/* Relay Server */
char relay_server[128];
......@@ -105,13 +78,7 @@ typedef struct {
/* JavaScript operating parameters */
js_startup_t js;
struct startup_sound_settings sound;
/* Login Attempt parameters */
struct login_attempt_settings login_attempt;
link_list_t* login_attempt_list;
uint max_concurrent_connections;
struct mqtt mqtt;
} mail_startup_t;
......
......@@ -160,8 +160,11 @@ static bbs_startup_t* startup=NULL;
static void set_state(enum server_state state)
{
if(startup != NULL && startup->set_state != NULL)
startup->set_state(startup->cbdata, state);
if(startup != NULL) {
if(startup->set_state != NULL)
startup->set_state(startup->cbdata, state);
mqtt_server_state(&startup->mqtt, state);
}
}
static void update_clients()
......@@ -169,7 +172,7 @@ static void update_clients()
if(startup != NULL) {
if(startup->clients != NULL)
startup->clients(startup->cbdata,protected_uint32_value(node_threads_running));
mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "client_count", protected_uint32_value(node_threads_running));
mqtt_client_count(&startup->mqtt, TOPIC_SERVER, protected_uint32_value(node_threads_running));
}
}
......@@ -206,8 +209,8 @@ int lputs(int level, const char* str)
mqtt_lputs(&startup->mqtt, TOPIC_SERVER, level, str);
if(level <= LOG_ERR) {
char errmsg[1024];
errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name, str);
SAFEPRINTF2(errmsg, "%s %s", server_abbrev, str);
errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name, errmsg);
if(startup!=NULL && startup->errormsg!=NULL)
startup->errormsg(startup->cbdata,level,errmsg);
}
......@@ -233,8 +236,8 @@ int eputs(int level, const char *str)
if(level <= LOG_ERR) {
char errmsg[1024];
errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name, str);
SAFEPRINTF(errmsg, "evnt %s", str);
errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name, errmsg);
if(startup!=NULL && startup->errormsg!=NULL)
startup->errormsg(startup->cbdata, level, errmsg);
}
......@@ -4834,6 +4837,7 @@ void bbs_thread(void* arg)
}
if(startup->size!=sizeof(bbs_startup_t)) { // verify size
int sz = sizeof(bbs_startup_t);
sbbs_beep(100,500);
sbbs_beep(300,500);
sbbs_beep(100,500);
......@@ -4937,7 +4941,7 @@ void bbs_thread(void* arg)
cleanup(1);
return;
}
mqtt_pub_strval(&startup->mqtt, TOPIC_SERVER, "version", bbs_ver());
mqtt_server_version(&startup->mqtt, bbs_ver());
t=time(NULL);
lprintf(LOG_INFO,"Initializing on %.24s with options: %x"
......@@ -5229,7 +5233,7 @@ NO_SSH:
set_state(SERVER_READY);
lprintf(LOG_INFO,"Terminal Server thread started for nodes %d through %d", first_node, last_node);
mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "max_clients", (last_node - first_node) + 1);
mqtt_client_max(&startup->mqtt, (last_node - first_node) + 1);
while(!terminate_server) {
YIELD();
......
......@@ -22,9 +22,24 @@
#include <string.h>
#include "mqtt.h"
#include "startup.h"
int mqtt_init(struct mqtt* mqtt, scfg_t* cfg, const char* host, const char* server)
const char* server_type_desc(enum server_type type)
{
switch(type) {
case SERVER_TERM: return "term";
case SERVER_FTP: return "ftp";
case SERVER_WEB: return "web";
case SERVER_MAIL: return "mail";
case SERVER_SERVICES: return "srvc";
}
return "???";
}
int mqtt_init(struct mqtt* mqtt, scfg_t* cfg, enum server_type type)
{
char hostname[256]="undefined-hostname";
if(mqtt == NULL || cfg == NULL)
return MQTT_FAILURE;
if(!cfg->mqtt.enabled)
......@@ -32,8 +47,14 @@ int mqtt_init(struct mqtt* mqtt, scfg_t* cfg, const char* host, const char* serv
if(mqtt != NULL) {
memset(mqtt, 0, sizeof(*mqtt));
mqtt->cfg = cfg;
mqtt->host = host;
mqtt->server = server;
mqtt->server_type = type;
listInit(&mqtt->client_list, LINK_LIST_MUTEX);
#ifdef _WIN32
WSADATA WSAData;
WSAStartup(MAKEWORD(1,1), &WSAData);
#endif
gethostname(hostname, sizeof(hostname));
mqtt->host = strdup(hostname);
#ifdef USE_MOSQUITTO
return mosquitto_lib_init();
#endif
......@@ -54,7 +75,7 @@ static char* format_topic(struct mqtt* mqtt, enum topic_depth depth, char* str,
safe_snprintf(str, size, "sbbs/%s/%s/%s", mqtt->cfg->sys_id, mqtt->host, sbuf);
break;
case TOPIC_SERVER:
safe_snprintf(str, size, "sbbs/%s/%s/%s/%s", mqtt->cfg->sys_id, mqtt->host, mqtt->server, sbuf);
safe_snprintf(str, size, "sbbs/%s/%s/%s/%s", mqtt->cfg->sys_id, mqtt->host, server_type_desc(mqtt->server_type), sbuf);
break;
case TOPIC_EVENT:
safe_snprintf(str, size, "sbbs/%s/%s/event/%s", mqtt->cfg->sys_id, mqtt->host, sbuf);
......@@ -277,8 +298,10 @@ void mqtt_close(struct mqtt* mqtt)
if(mqtt->handle != NULL) {
mosquitto_destroy(mqtt->handle);
mqtt->handle = NULL;
listFree(&mqtt->client_list);
}
#endif
FREE_AND_NULL(mqtt->host);
}
static int pw_callback(char* buf, int size, int rwflag, void* userdata)
......@@ -379,3 +402,171 @@ int mqtt_thread_stop(struct mqtt* mqtt)
#endif
}
static int lprintf(int (*lputs)(int level, const char* str), int level, const char *fmt, ...)
{
va_list argptr;
char sbuf[1024];
if(lputs == NULL)
return -1;
va_start(argptr,fmt);
vsnprintf(sbuf,sizeof(sbuf),fmt,argptr);
sbuf[sizeof(sbuf)-1]=0;
va_end(argptr);
return lputs(level,sbuf);
}
int mqtt_startup(struct mqtt* mqtt, scfg_t* cfg, enum server_type type, const char* version
,int (*lputs)(int level, const char* str)
,BOOL shared_client_list)
{
int result = MQTT_FAILURE;
char str[128];
if(mqtt == NULL)
return MQTT_FAILURE;
if(cfg->mqtt.enabled) {
result = mqtt_init(mqtt, cfg, type);
if(result != MQTT_SUCCESS) {
lprintf(lputs, LOG_INFO, "MQTT init failure: %d", result);
} else {
lprintf(lputs, LOG_INFO, "MQTT lib: %s", mqtt_libver(str, sizeof(str)));
result = mqtt_open(mqtt);
if(result != MQTT_SUCCESS) {
lprintf(lputs, LOG_ERR, "MQTT open failure: %d", result);
} else {
result = mqtt_thread_start(mqtt);
if(result != MQTT_SUCCESS) {
lprintf(lputs, LOG_ERR, "Error %d starting pub/sub thread", result);
mqtt_close(mqtt);
} else {
lprintf(lputs, LOG_INFO, "MQTT connecting to broker %s:%u", cfg->mqtt.broker_addr, cfg->mqtt.broker_port);
result = mqtt_connect(mqtt, /* bind_address: */NULL);
if(result == MQTT_SUCCESS) {
lprintf(lputs, LOG_INFO, "MQTT broker-connect (%s:%d) successful", cfg->mqtt.broker_addr, cfg->mqtt.broker_port);
} else {
lprintf(lputs, LOG_ERR, "MQTT broker-connect (%s:%d) failure: %d", cfg->mqtt.broker_addr, cfg->mqtt.broker_port, result);
mqtt_close(mqtt);
}
}
}
}
}
mqtt->shared_client_list = shared_client_list;
mqtt_pub_strval(mqtt, TOPIC_HOST, "version", version);
mqtt_pub_strval(mqtt, TOPIC_HOST, "status", "initializing");
for(enum topic_depth depth = TOPIC_HOST; depth <= TOPIC_SERVER; depth++) {
mqtt_pub_noval(mqtt, depth, "error_count");
mqtt_pub_noval(mqtt, depth, "thread_count");
mqtt_pub_noval(mqtt, depth, "socket_count");
mqtt_pub_noval(mqtt, depth, "client_count");
mqtt_pub_noval(mqtt, depth, "client_list");
mqtt_pub_noval(mqtt, depth, "served");
}
return result;
}
int mqtt_online(struct mqtt* mqtt)
{
return mqtt_pub_strval(mqtt, TOPIC_HOST, "status", "online");
}
int mqtt_server_state(struct mqtt* mqtt, enum server_state state)
{
return mqtt_pub_uintval(mqtt, TOPIC_SERVER, "state", state);
}
int mqtt_server_version(struct mqtt* mqtt, const char* str)
{
return mqtt_pub_strval(mqtt, TOPIC_SERVER, "version", str);
}
int mqtt_errormsg(struct mqtt* mqtt, int level, const char* msg)
{
if(mqtt == NULL)
return MQTT_FAILURE;
++mqtt->error_count;
mqtt_pub_uintval(mqtt, TOPIC_SERVER, "error_count", mqtt->error_count);
return mqtt_pub_strval(mqtt, TOPIC_HOST, "error", msg);
}
int mqtt_thread_count(struct mqtt* mqtt, enum topic_depth depth, ulong count)
{
return mqtt_pub_uintval(mqtt, depth, "thread_count", count);
}
int mqtt_socket_count(struct mqtt* mqtt, enum topic_depth depth, ulong count)
{
return mqtt_pub_uintval(mqtt, depth, "socket_count", count);
}
int mqtt_client_count(struct mqtt* mqtt, enum topic_depth depth, ulong count)
{
return mqtt_pub_uintval(mqtt, depth, "client_count", count);
}
int mqtt_client_max(struct mqtt* mqtt, ulong count)
{
return mqtt_pub_uintval(mqtt, TOPIC_SERVER, "max_clients", count);
}
int mqtt_client_on(struct mqtt* mqtt, BOOL on, int sock, client_t* client, BOOL update)
{
if(mqtt == NULL)
return MQTT_FAILURE;
listLock(&mqtt->client_list);
if(on) {
if(update) {
list_node_t* node;
if((node=listFindTaggedNode(&mqtt->client_list, sock)) != NULL)
memcpy(node->data, client, sizeof(client_t));
} else {
listAddNodeData(&mqtt->client_list, client, sizeof(client_t), sock, LAST_NODE);
mqtt->served++;
}
} else
listRemoveTaggedNode(&mqtt->client_list, sock, /* free_data: */TRUE);
str_list_t list = strListInit();
for(list_node_t* node = mqtt->client_list.first; node != NULL; node = node->next) {
client_t* client = node->data;
strListAppendFormat(&list, "%ld\t%s\t%s\t%s\t%s\t%u\t%lu"
,node->tag
,client->protocol
,client->user
,client->addr
,client->host
,client->port
,(ulong)client->time
);
}
listUnlock(&mqtt->client_list);
char buf[1024]; // TODO
strListJoin(list, buf, sizeof(buf), "\n");
strListFree(&list);
mqtt_client_count(mqtt, mqtt->shared_client_list ? TOPIC_HOST : TOPIC_SERVER, mqtt->client_list.count);
mqtt_served_count(mqtt, TOPIC_SERVER, mqtt->served);
return mqtt_pub_strval(mqtt, mqtt->shared_client_list ? TOPIC_HOST : TOPIC_SERVER, "client_list", buf);
}
int mqtt_served_count(struct mqtt* mqtt, enum topic_depth depth, ulong count)
{
return mqtt_pub_uintval(mqtt, depth, "served", count);
}
int mqtt_terminating(struct mqtt* mqtt)
{
return mqtt_pub_strval(mqtt, TOPIC_HOST, "status", "terminating");
}
void mqtt_shutdown(struct mqtt* mqtt)
{
mqtt_pub_strval(mqtt, TOPIC_HOST, "status", "offline");
mqtt_disconnect(mqtt);
mqtt_thread_stop(mqtt);
mqtt_close(mqtt);
}
......@@ -19,10 +19,13 @@
* Note: If this box doesn't appear square, then you need to fix your tabs. *
****************************************************************************/
#ifndef MQTT_H
#define MQTT_H
#ifndef MQTT_H_
#define MQTT_H_
#include "scfgdefs.h"
#include "client.h"
#include "server.h"
#include "link_list.h"
#include "dllexport.h"
#include <stdarg.h>
......@@ -38,8 +41,12 @@
struct mqtt {
mqtt_handle_t handle;
scfg_t* cfg;
const char* host;
const char* server;
char* host;
enum server_type server_type;
ulong error_count;
ulong served;
link_list_t client_list;
BOOL shared_client_list;
};
enum topic_depth {
......@@ -57,7 +64,16 @@ enum topic_depth {
extern "C" {
#endif
DLLEXPORT int mqtt_init(struct mqtt*, scfg_t*, const char* host, const char* server);
DLLEXPORT int mqtt_init(struct mqtt*, scfg_t*, enum server_type);
DLLEXPORT int mqtt_startup(struct mqtt*, scfg_t*, enum server_type, const char* version
,int (*lputs)(int level, const char* str)
,BOOL shared_client_list);
DLLEXPORT int mqtt_online(struct mqtt*);
DLLEXPORT int mqtt_server_state(struct mqtt*, enum server_state);
DLLEXPORT int mqtt_server_version(struct mqtt*, const char*);
DLLEXPORT int mqtt_errormsg(struct mqtt*, int level, const char*);
DLLEXPORT int mqtt_terminating(struct mqtt*);
DLLEXPORT void mqtt_shutdown(struct mqtt*);
DLLEXPORT char* mqtt_libver(char* str, size_t size);
DLLEXPORT char* mqtt_topic(struct mqtt*, enum topic_depth, char* str, size_t size, const char* fmt, ...);
DLLEXPORT int mqtt_subscribe(struct mqtt*, enum topic_depth, char* str, size_t size, const char* fmt, ...);
......@@ -72,9 +88,15 @@ DLLEXPORT int mqtt_connect(struct mqtt*, const char* bind_address);
DLLEXPORT int mqtt_disconnect(struct mqtt*);
DLLEXPORT int mqtt_thread_start(struct mqtt*);
DLLEXPORT int mqtt_thread_stop(struct mqtt*);
DLLEXPORT int mqtt_thread_count(struct mqtt*, enum topic_depth, ulong count);
DLLEXPORT int mqtt_socket_count(struct mqtt*, enum topic_depth, ulong count);
DLLEXPORT int mqtt_served_count(struct mqtt*, enum topic_depth, ulong count);
DLLEXPORT int mqtt_client_count(struct mqtt*, enum topic_depth, ulong count);
DLLEXPORT int mqtt_client_on(struct mqtt*, BOOL on, int sock, client_t* client, BOOL update);
DLLEXPORT int mqtt_client_max(struct mqtt*, ulong count);
#ifdef __cplusplus
}
#endif
#endif // MQTT_H
#endif // MQTT_H_
......@@ -246,11 +246,11 @@ static void notify_systemd(const char* new_status)
}
#endif
static int log_puts(int level, char *str)
static int log_puts(int level, const char *str)
{
static pthread_mutex_t mutex;
static BOOL mutex_initialized;
char *p;
const char *p;
#ifdef __unix__
if (is_daemon) {
......@@ -291,7 +291,7 @@ static int log_puts(int level, char *str)
return(prompt_len);
}
static int lputs(int level, char *str)
static int lputs(int level, const char *str)
{
if(str != NULL && *str != '\0')
mqtt_lputs(&bbs_startup.mqtt, TOPIC_HOST, level, str);
......@@ -301,7 +301,6 @@ static int lputs(int level, char *str)
static void errormsg(void *cbdata, int level, const char *msg)
{
error_count++;
mqtt_pub_uintval(&bbs_startup.mqtt, TOPIC_HOST, "error_count", error_count);
}
static int lprintf(int level, const char *fmt, ...)
......@@ -363,31 +362,6 @@ static void mqtt_message_received(struct mosquitto* mosq, void* cbdata, const st
return;
}
}
#ifdef MOSQUITTO_LOG
static void mqtt_log_msg(struct mosquitto* moq, void* cbdata, int level, const char* str)
{
static FILE* fp;
if(fp == NULL) {
char path[MAX_PATH + 1];
SAFEPRINTF(path, "%smqtt.log", scfg.logs_dir);
fp = fopen(path, "a");
if(fp == NULL)
lprintf(LOG_ERR, "Error %d opening %s", errno, path);
}
time_t now = time(NULL);
char tmp[32];
fprintf(fp, "%.24s %x %s\n", ctime_r(&now, tmp), level, str);
}
#endif
static void mqtt_disconnected(struct mosquitto* mosq , void* cbdata, int reason)
{
char msg[1024];
SAFEPRINTF(msg, "MQTT broker disconnected, reason: %d", reason);
log_puts(LOG_INFO, msg);
}
#endif // USE_MOSQUITTO
#ifdef __unix__
......@@ -656,14 +630,12 @@ static BOOL winsock_cleanup(void)
static void set_state(void* cbdata, enum server_state state)
{
enum server_type server_type = (enum server_type)cbdata;
enum server_type server_type = ((struct startup*)cbdata)->type;
server_state[server_type] = state;
#ifdef USE_SYSTEMD
notify_systemd(NULL);
#endif
mqtt_pub_uintval(mqtt[server_type], TOPIC_SERVER, "state", state);
#ifdef _THREAD_SUID_BROKEN
if(state == SERVER_READY) {
if(thread_suid_broken) {
......@@ -699,14 +671,14 @@ static void thread_up(void* p, BOOL up, BOOL setuid)
thread_count++;
else if(thread_count>0)
thread_count--;
mqtt_pub_uintval(&bbs_startup.mqtt, TOPIC_HOST, "thread_count", thread_count);
mqtt_thread_count(&bbs_startup.mqtt, TOPIC_HOST, thread_count);
pthread_mutex_unlock(&mutex);
lputs(LOG_INFO,NULL); /* update displayed stats */
}
static void socket_open(void* cbdata, BOOL open)
{
enum server_type server_type = (enum server_type)cbdata;
enum server_type server_type = ((struct startup*)cbdata)->type;
static pthread_mutex_t mutex;
static BOOL mutex_initialized;
......@@ -720,38 +692,11 @@ static void socket_open(void* cbdata, BOOL open)
socket_count++;
else if(socket_count>0)
socket_count--;
mqtt_pub_uintval(mqtt[server_type], TOPIC_HOST, "socket_count", socket_count);
mqtt_socket_count(mqtt[server_type], TOPIC_HOST, socket_count);
pthread_mutex_unlock(&mutex);
lputs(LOG_INFO,NULL); /* update displayed stats */
}
static void pub_client_list()
{
if(bbs_startup.mqtt.handle != NULL) {
list_node_t* node;
str_list_t list = strListInit();
listLock(&client_list);
for(node=client_list.first; node!=NULL; node=node->next) {
client_t* client=node->data;
strListAppendFormat(&list, "%ld\t%s\t%s\t%s\t%s\t%u\t%lu"
,node->tag
,client->protocol
,client->user
,client->addr
,client->host
,client->port
,(ulong)client->time
);
}
listUnlock(&client_list);
char buf[1024];
strListJoin(list, buf, sizeof(buf), "\n");
strListFree(&list);
mqtt_pub_strval(&bbs_startup.mqtt, TOPIC_HOST, "client_list", buf);
}
}
static void client_on(void* p, BOOL on, int sock, client_t* client, BOOL update)
{
if(on) {
......@@ -769,9 +714,9 @@ static void client_on(void* p, BOOL on, int sock, client_t* client, BOOL update)
} else
listRemoveTaggedNode(&client_list, sock, /* free_data: */TRUE);
pub_client_list();
mqtt_pub_uintval(&bbs_startup.mqtt, TOPIC_HOST, "client_count", client_list.count);
mqtt_pub_uintval(&bbs_startup.mqtt, TOPIC_HOST, "served", served);
mqtt_client_on(&bbs_startup.mqtt, on, sock, client, update);
mqtt_client_count(&bbs_startup.mqtt, TOPIC_HOST, client_list.count);
mqtt_served_count(&bbs_startup.mqtt, TOPIC_HOST, served);
lputs(LOG_INFO,NULL); /* update displayed stats */
}
......@@ -1117,7 +1062,7 @@ void recycle(void* cbdata)
mail_startup_t* mail=NULL;
services_startup_t* services=NULL;
switch((enum server_type)cbdata) {
switch(((struct startup*)cbdata)->type) {
case SERVER_TERM:
bbs = &bbs_startup;
break;
......@@ -1340,7 +1285,8 @@ int main(int argc, char** argv)
/* Initialize BBS startup structure */
memset(&bbs_startup,0,sizeof(bbs_startup));
bbs_startup.size=sizeof(bbs_startup);
bbs_startup.cbdata = (void*)SERVER_TERM;
bbs_startup.type = SERVER_TERM;
bbs_startup.cbdata = &bbs_startup;
bbs_startup.log_level = LOG_DEBUG;
bbs_startup.lputs=bbs_lputs;
bbs_startup.event_lputs=event_lputs;
......@@ -1360,7 +1306,8 @@ int main(int argc, char** argv)
/* Initialize FTP startup structure */
memset(&ftp_startup,0,sizeof(ftp_startup));
ftp_startup.size=sizeof(ftp_startup);
ftp_startup.cbdata = (void*)SERVER_FTP;
ftp_startup.type = SERVER_FTP;
ftp_startup.cbdata = &ftp_startup;
ftp_startup.log_level = LOG_DEBUG;
ftp_startup.lputs=ftp_lputs;
ftp_startup.errormsg=errormsg;
......@@ -1380,7 +1327,8 @@ int main(int argc, char** argv)
/* Initialize Web Server startup structure */
memset(&web_startup,0,sizeof(web_startup));
web_startup.size=sizeof(web_startup);
web_startup.cbdata = (void*)SERVER_WEB;
web_startup.type = SERVER_WEB;
web_startup.cbdata = &web_startup;
web_startup.log_level = LOG_DEBUG;
web_startup.lputs=web_lputs;
web_startup.errormsg=errormsg;
......@@ -1399,7 +1347,8 @@ int main(int argc, char** argv)
/* Initialize Mail Server startup structure */
memset(&mail_startup,0,sizeof(mail_startup));
mail_startup.size=sizeof(mail_startup);
mail_startup.cbdata = (void*)SERVER_MAIL;
mail_startup.type = SERVER_MAIL;
mail_startup.cbdata = &mail_startup;
mail_startup.log_level = LOG_DEBUG;
mail_startup.lputs=mail_lputs;
mail_startup.errormsg=errormsg;
......@@ -1418,7 +1367,8 @@ int main(int argc, char** argv)
/* Initialize Services startup structure */
memset(&services_startup,0,sizeof(services_startup));
services_startup.size=sizeof(services_startup);
services_startup.cbdata = (void*)SERVER_SERVICES;
services_startup.type = SERVER_SERVICES;
services_startup.cbdata = &services_startup;
services_startup.log_level = LOG_DEBUG;
services_startup.lputs=services_lputs;
services_startup.errormsg=errormsg;
......@@ -1838,60 +1788,23 @@ int main(int argc, char** argv)
#endif // __unix__
if(scfg.mqtt.enabled) {
int result = mqtt_init(&bbs_startup.mqtt, &scfg, host_name, "term");
if(result != MQTT_SUCCESS) {
lprintf(LOG_INFO, "MQTT init failure: %d", result);
} else {
lprintf(LOG_INFO, "MQTT lib: %s", mqtt_libver(str, sizeof(str)));
result = mqtt_open(&bbs_startup.mqtt);
if(result != MQTT_SUCCESS) {
lprintf(LOG_ERR, "MQTT open failure: %d", result);
} else {
result = mqtt_thread_start(&bbs_startup.mqtt);
if(result != MQTT_SUCCESS) {
lprintf(LOG_ERR, "Error %d starting pub/sub thread", result);
mqtt_close(&bbs_startup.mqtt);
} else {
lprintf(LOG_INFO, "MQTT connecting to broker %s:%u", scfg.mqtt.broker_addr, scfg.mqtt.broker_port);
result = mqtt_connect(&bbs_startup.mqtt, /* bind_address: */NULL);
if(result == MQTT_SUCCESS) {
lprintf(LOG_INFO, "MQTT broker-connect (%s:%d) successful", scfg.mqtt.broker_addr, scfg.mqtt.broker_port);
} else {
lprintf(LOG_ERR, "MQTT broker-connect (%s:%d) failure: %d", scfg.mqtt.broker_addr, scfg.mqtt.broker_port, result);
mqtt_close(&bbs_startup.mqtt);
}
}
}
}
}
mqtt_startup(&bbs_startup.mqtt, &scfg, SERVER_TERM, sbbscon_ver(), log_puts, /* shared_client_list: */TRUE);
mqtt[SERVER_TERM] = &bbs_startup.mqtt;
mail_startup.mqtt = bbs_startup.mqtt;
mail_startup.mqtt.server = "mail";
mail_startup.mqtt.server_type = SERVER_MAIL;
mqtt[SERVER_MAIL] = &mail_startup.mqtt;
ftp_startup.mqtt = bbs_startup.mqtt;
ftp_startup.mqtt.server = "ftp";
ftp_startup.mqtt.server_type = SERVER_FTP;
mqtt[SERVER_FTP] = &ftp_startup.mqtt;
web_startup.mqtt = bbs_startup.mqtt;
web_startup.mqtt.server = "web";
web_startup.mqtt.server_type = SERVER_WEB;
mqtt[SERVER_WEB] = &web_startup.mqtt;
services_startup.mqtt = bbs_startup.mqtt;
services_startup.mqtt.server = "srvc";
services_startup.mqtt.server_type = SERVER_SERVICES;
mqtt[SERVER_SERVICES] = &services_startup.mqtt;
mqtt_pub_strval(&bbs_startup.mqtt, TOPIC_HOST, "version", sbbscon_ver());
mqtt_pub_strval(&bbs_startup.mqtt, TOPIC_HOST, "status", "initializing");
#ifdef USE_MOSQUITTO
if(bbs_startup.mqtt.handle != NULL) {
#ifdef MOSQUITTO_LOG
mosquitto_log_callback_set(bbs_startup.mqtt.handle, mqtt_log_msg);
#endif
mqtt_pub_noval(&bbs_startup.mqtt, TOPIC_HOST, "error_count");
mqtt_pub_noval(&bbs_startup.mqtt, TOPIC_HOST, "thread_count");
mqtt_pub_noval(&bbs_startup.mqtt, TOPIC_HOST, "socket_count");
mqtt_pub_noval(&bbs_startup.mqtt, TOPIC_HOST, "client_count");
mqtt_pub_noval(&bbs_startup.mqtt, TOPIC_HOST, "served");
mosquitto_disconnect_callback_set(bbs_startup.mqtt.handle, mqtt_disconnected);
mosquitto_message_callback_set(bbs_startup.mqtt.handle, mqtt_message_received);
for(int i = bbs_startup.first_node; i <= bbs_startup.last_node; i++) {
mqtt_subscribe(&bbs_startup.mqtt, TOPIC_BBS, str, sizeof(str), "node%d/input", i);
......@@ -2017,7 +1930,8 @@ int main(int argc, char** argv)
if(run_web)
_beginthread((void(*)(void*))web_server,0,&web_startup);
mqtt_pub_strval(&bbs_startup.mqtt, TOPIC_HOST, "status", "online");
mqtt_online(&bbs_startup.mqtt);
#ifdef __unix__
uid_t uid = getuid();
if(uid != 0 && !capabilities_set) { /* are we running as a normal user? */
......@@ -2328,16 +2242,13 @@ int main(int argc, char** argv)
}
}
mqtt_pub_strval(&bbs_startup.mqtt, TOPIC_HOST, "status", "terminating");
mqtt_terminating(&bbs_startup.mqtt);
terminate();
/* erase the prompt */
printf("\r%*s\r",prompt_len,"");
mqtt_pub_strval(&bbs_startup.mqtt, TOPIC_HOST, "status", "offline");
mqtt_disconnect(&bbs_startup.mqtt);
mqtt_thread_stop(&bbs_startup.mqtt);
mqtt_close(&bbs_startup.mqtt);
mqtt_shutdown(&bbs_startup.mqtt);
return(0);
}
/* Synchronet Server values */
/****************************************************************************
* @format.tab-size 4 (Plain Text/Source Code File Header) *
* @format.use-tabs true (see http://www.synchro.net/ptsc_hdr.html) *
* *
* Copyright Rob Swindell - http://www.synchro.net/copyright.html *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* See the GNU General Public License for more details: gpl.txt or *
* http://www.fsf.org/copyleft/gpl.html *
* *
* For Synchronet coding style and modification guidelines, see *
* http://www.synchro.net/source.html *
* *
* Note: If this box doesn't appear square, then you need to fix your tabs. *
****************************************************************************/
#ifndef SERVER_H_
#define SERVER_H_
enum server_type {
SERVER_TERM,
SERVER_MAIL,
SERVER_FTP,
SERVER_WEB,
SERVER_SERVICES,
SERVER_COUNT
};
// Approximate systemd service states (see sd_notify)
enum server_state {
SERVER_STOPPED,
SERVER_INIT,
SERVER_READY,
SERVER_RELOADING,
SERVER_STOPPING
};
#endif /* Don't add anything after this line */
......@@ -125,8 +125,8 @@ static int lprintf(int level, const char *fmt, ...)
if(level <= LOG_ERR) {
char errmsg[sizeof(sbuf)+16];
errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name, sbuf);
SAFEPRINTF2(errmsg, "%s %s", server_abbrev, sbuf);
errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name, errmsg);
if(startup!=NULL && startup->errormsg!=NULL)
startup->errormsg(startup->cbdata,level,errmsg);
}
......@@ -179,8 +179,11 @@ static char* server_host_name(void)
static void set_state(enum server_state state)
{
if(startup != NULL && startup->set_state != NULL)
startup->set_state(startup->cbdata, state);
if(startup != NULL) {
if(startup->set_state != NULL)
startup->set_state(startup->cbdata, state);
mqtt_server_state(&startup->mqtt, state);
}
}
static ulong active_clients(void)
......@@ -1857,7 +1860,7 @@ void services_thread(void* arg)
}
set_state(SERVER_INIT);
mqtt_pub_strval(&startup->mqtt, TOPIC_SERVER, "version", services_ver());
mqtt_server_version(&startup->mqtt, services_ver());
#ifdef _THREAD_SUID_BROKEN
if(thread_suid_broken)
......
......@@ -24,52 +24,15 @@
typedef struct {
DWORD size; /* sizeof(bbs_struct_t) */
STARTUP_COMMON_ELEMENTS
struct in_addr outgoing4;
struct in6_addr outgoing6;
str_list_t interfaces;
DWORD options; /* See BBS_OPT definitions */
WORD sem_chk_freq; /* semaphore file checking frequency (in seconds) */
void* cbdata; /* Private data passed to callbacks */
/* Callbacks (NULL if unused) */