Commit 6ed3f6dc authored by Rob Swindell's avatar Rob Swindell 💬
Browse files

MQTT support overhaul, mainly for SBBSCTRL compatibility

The only difference in the data/scheme is that the "error" topic (error log) is now under each server rather than each host. I don't *think* there are any other changes from the MQTT consumer side.

Still not done: subscribing (e.g. support for recycle or node-spy-input via MQTT) and NT services support.

This change also includes a cool feature that will prompt the sysop if there's a timeout (30 seconds) while waiting for servers to shutdown gracefully and giving the sysop the option to abort (Cancel) the wait (and shutdown ungracefully) or continue the wait (OK).
parent 30942ff2
Pipeline #3563 passed with stage
in 6 minutes and 24 seconds
......@@ -171,6 +171,7 @@ static void thread_up(void* p, BOOL up, BOOL setuid)
threads++;
else if(threads>0)
threads--;
mqtt_thread_count(&MainForm->bbs_startup.mqtt, TOPIC_HOST, threads);
ReleaseMutex(mutex);
}
......@@ -188,6 +189,7 @@ void socket_open(void* p, BOOL open)
sockets++;
else if(sockets>0)
sockets--;
mqtt_socket_count(&MainForm->bbs_startup.mqtt, TOPIC_HOST, sockets);
ReleaseMutex(mutex);
}
......@@ -201,8 +203,10 @@ static void client_add(void* p, BOOL add)
if(add) {
clients++;
total_clients++;
mqtt_served_count(&MainForm->bbs_startup.mqtt, TOPIC_HOST, total_clients);
} else if(clients>0)
clients--;
mqtt_client_count(&MainForm->bbs_startup.mqtt, TOPIC_HOST, clients);
}
static void client_on(void* p, BOOL on, int sock, client_t* client, BOOL update)
......@@ -212,13 +216,15 @@ static void client_on(void* p, BOOL on, int sock, client_t* client, BOOL update)
time_t t;
static HANDLE mutex;
TListItem* Item;
mqtt_client_on(&MainForm->bbs_startup.mqtt, on, sock, client, update);
if(!mutex)
mutex=CreateMutex(NULL,false,NULL);
WaitForSingleObject(mutex,INFINITE);
WaitForSingleObject(ClientForm->ListMutex,INFINITE);
/* Search for exising entry for this socket */
/* Search for existing entry for this socket */
for(i=0;i<ClientForm->ListView->Items->Count;i++) {
if(ClientForm->ListView->Items->Item[i]->Caption.ToIntDef(0)==sock)
break;
......@@ -332,6 +338,7 @@ static void bbs_set_state(void* p, enum server_state state)
{
TelnetForm->Status->Caption = server_state_str(state);
mqtt_server_state(&MainForm->bbs_startup.mqtt, state);
switch(state) {
case SERVER_STOPPED:
MainForm->TelnetStart->Enabled=true;
......@@ -429,6 +436,7 @@ static void services_set_state(void* p, enum server_state state)
{
ServicesForm->Status->Caption = server_state_str(state);
mqtt_server_state(&MainForm->services_startup.mqtt, state);
switch(state) {
case SERVER_STOPPED:
MainForm->ServicesStart->Enabled=true;
......@@ -495,6 +503,7 @@ static void mail_set_state(void* p, enum server_state state)
{
MailForm->Status->Caption = server_state_str(state);
mqtt_server_state(&MainForm->mail_startup.mqtt, state);
switch(state) {
case SERVER_STOPPED:
MainForm->MailStart->Enabled=true;
......@@ -590,6 +599,7 @@ static void ftp_set_state(void* p, enum server_state state)
{
FtpForm->Status->Caption = server_state_str(state);
mqtt_server_state(&MainForm->ftp_startup.mqtt, state);
switch(state) {
case SERVER_STOPPED:
MainForm->FtpStart->Enabled=true;
......@@ -658,6 +668,7 @@ static void web_set_state(void* p, enum server_state state)
{
WebForm->Status->Caption = server_state_str(state);
mqtt_server_state(&MainForm->web_startup.mqtt, state);
switch(state) {
case SERVER_STOPPED:
MainForm->WebStart->Enabled=true;
......@@ -782,6 +793,7 @@ __fastcall TMainForm::TMainForm(TComponent* Owner)
memset(&bbs_startup,0,sizeof(bbs_startup));
bbs_startup.size=sizeof(bbs_startup);
bbs_startup.type = SERVER_TERM;
bbs_startup.cbdata=&bbs_log_list;
bbs_startup.event_cbdata=&event_log_list;
bbs_startup.first_node=1;
......@@ -802,6 +814,7 @@ __fastcall TMainForm::TMainForm(TComponent* Owner)
memset(&mail_startup,0,sizeof(mail_startup));
mail_startup.size=sizeof(mail_startup);
mail_startup.type = SERVER_MAIL;
mail_startup.cbdata=&mail_log_list;
mail_startup.smtp_port=IPPORT_SMTP;
mail_startup.relay_port=IPPORT_SMTP;
......@@ -824,6 +837,7 @@ __fastcall TMainForm::TMainForm(TComponent* Owner)
memset(&ftp_startup,0,sizeof(ftp_startup));
ftp_startup.size=sizeof(ftp_startup);
ftp_startup.type = SERVER_FTP;
ftp_startup.cbdata=&ftp_log_list;
ftp_startup.port=IPPORT_FTP;
ftp_startup.lputs=lputs;
......@@ -842,6 +856,7 @@ __fastcall TMainForm::TMainForm(TComponent* Owner)
memset(&web_startup,0,sizeof(web_startup));
web_startup.size=sizeof(web_startup);
web_startup.type = SERVER_WEB;
web_startup.cbdata=&web_log_list;
web_startup.lputs=lputs;
web_startup.errormsg=errormsg;
......@@ -855,6 +870,7 @@ __fastcall TMainForm::TMainForm(TComponent* Owner)
memset(&services_startup,0,sizeof(services_startup));
services_startup.size=sizeof(services_startup);
services_startup.type = SERVER_SERVICES;
services_startup.cbdata=&services_log_list;
services_startup.lputs=lputs;
services_startup.errormsg=errormsg;
......@@ -1038,6 +1054,7 @@ BOOL __fastcall TMainForm::servicesServiceEnabled(void)
//---------------------------------------------------------------------------
void __fastcall TMainForm::FormClose(TObject *Sender, TCloseAction &Action)
{
mqtt_terminating(&bbs_startup.mqtt);
UpTimer->Enabled=false; /* Stop updating the status bar */
StatsTimer->Enabled=false;
......@@ -1055,19 +1072,25 @@ void __fastcall TMainForm::FormClose(TObject *Sender, TCloseAction &Action)
|| (FtpStop->Enabled && !ftpServiceEnabled())
|| (WebStop->Enabled && !webServiceEnabled())
|| (ServicesStop->Enabled && !servicesServiceEnabled())) {
if(time(NULL)-start>30)
break;
if(time(NULL)-start>30) {
if(Application->MessageBox("Abort wait for servers to terminate?"
,"Synchronet Server Still Running", MB_OKCANCEL) == IDOK)
break;
start = time(NULL);
}
Application->ProcessMessages();
YIELD();
}
StatusBar->Panels->Items[STATUSBAR_LAST_PANEL]->Text="Closing...";
Application->ProcessMessages();
mqtt_terminating(&bbs_startup.mqtt);
LogTimer->Enabled=false;
ServiceStatusTimer->Enabled=false;
NodeForm->Timer->Enabled=false;
ClientForm->Timer->Enabled=false;
mqtt_shutdown(&bbs_startup.mqtt);
}
//---------------------------------------------------------------------------
void __fastcall TMainForm::FormCloseQuery(TObject *Sender, bool &CanClose)
......@@ -1947,7 +1970,21 @@ void __fastcall TMainForm::StartupTimerTick(TObject *Sender)
StartupTimer->Interval = 2500; // Let 'em see the logo for a bit
StartupTimer->Enabled = true;
} else {
if(!bbsServiceEnabled()) {
mqtt_startup(&bbs_startup.mqtt, &cfg, SERVER_TERM, ver()
,/* lputs: */NULL
,/* shared_client_list: */TRUE);
ftp_startup.mqtt = bbs_startup.mqtt;
ftp_startup.mqtt.server_type = SERVER_FTP;
web_startup.mqtt = bbs_startup.mqtt;
web_startup.mqtt.server_type = SERVER_WEB;
mail_startup.mqtt = bbs_startup.mqtt;
mail_startup.mqtt.server_type = SERVER_MAIL;
services_startup.mqtt = bbs_startup.mqtt;
services_startup.mqtt.server_type = SERVER_SERVICES;
}
DisplayMainPanels(Sender);
mqtt_online(&bbs_startup.mqtt);
}
Initialized=true;
}
......
......@@ -142,8 +142,8 @@ static int lprintf(int level, const char *fmt, ...)
if(level <= LOG_ERR) {
char errmsg[sizeof(sbuf)+16];
errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name, sbuf);
SAFEPRINTF2(errmsg, "%s %s", server_abbrev, sbuf);
errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name, errmsg);
if(startup!=NULL && startup->errormsg!=NULL)
startup->errormsg(startup->cbdata,level,errmsg);
}
......@@ -196,8 +196,11 @@ static char* server_host_name(void)
static void set_state(enum server_state state)
{
if(startup != NULL && startup->set_state != NULL)
startup->set_state(startup->cbdata, state);
if(startup != NULL) {
if(startup->set_state != NULL)
startup->set_state(startup->cbdata, state);
mqtt_server_state(&startup->mqtt, state);
}
}
static void update_clients(void)
......@@ -206,7 +209,7 @@ static void update_clients(void)
uint32_t count = protected_uint32_value(active_clients);
if(startup->clients != NULL)
startup->clients(startup->cbdata, count);
mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "client_count", count);
mqtt_client_count(&startup->mqtt, TOPIC_SERVER, count);
}
}
......@@ -230,7 +233,7 @@ static void thread_up(BOOL setuid)
if(startup != NULL) {
if(startup->thread_up != NULL)
startup->thread_up(startup->cbdata,TRUE, setuid);
mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "thread_count", protected_uint32_value(thread_count));
mqtt_thread_count(&startup->mqtt, TOPIC_SERVER, protected_uint32_value(thread_count));
}
}
......@@ -240,7 +243,7 @@ static int32_t thread_down(void)
if(startup != NULL) {
if(startup->thread_up != NULL)
startup->thread_up(startup->cbdata,FALSE, FALSE);
mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "thread_count", count);
mqtt_thread_count(&startup->mqtt, TOPIC_SERVER, count);
}
return count;
}
......@@ -4952,7 +4955,7 @@ void ftp_server(void* arg)
}
set_state(SERVER_INIT);
mqtt_pub_strval(&startup->mqtt, TOPIC_SERVER, "version", ftp_ver());
mqtt_server_version(&startup->mqtt, ftp_ver());
uptime=0;
served=0;
......@@ -5093,7 +5096,7 @@ void ftp_server(void* arg)
set_state(SERVER_READY);
lprintf(LOG_INFO,"FTP Server thread started");
mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "max_clients", startup->max_clients);
mqtt_client_max(&startup->mqtt, startup->max_clients);
while(ftp_set!=NULL && !terminate_server) {
YIELD();
......
......@@ -26,7 +26,7 @@
typedef struct {
DWORD size; /* sizeof(ftp_startup_t) */
STARTUP_COMMON_ELEMENTS
WORD port;
WORD max_clients;
#define FTP_DEFAULT_MAX_CLIENTS 10
......@@ -34,7 +34,6 @@ typedef struct {
#define FTP_DEFAULT_MAX_INACTIVITY 300
WORD qwk_timeout;
#define FTP_DEFAULT_QWK_TIMEOUT 600
WORD sem_chk_freq; /* semaphore file checking frequency (in seconds) */
struct in_addr outgoing4;
struct in6_addr outgoing6;
str_list_t interfaces;
......@@ -42,47 +41,10 @@ typedef struct {
struct in6_addr pasv_ip6_addr;
WORD pasv_port_low;
WORD pasv_port_high;
DWORD options; /* See FTP_OPT definitions */
int64_t min_fsize; /* Minimum file size accepted for upload */
int64_t max_fsize; /* Maximum file size accepted for upload (0=unlimited) */
void* cbdata; /* Private data passed to callbacks */
/* Callbacks (NULL if unused) */
int (*lputs)(void*, int level, const char* msg);
void (*errormsg)(void*, int level, const char* msg);
void (*set_state)(void*, enum server_state);
void (*recycle)(void*);
void (*terminated)(void*, int code);
void (*clients)(void*, int active);
void (*thread_up)(void*, BOOL up, BOOL setuid);
void (*socket_open)(void*, BOOL open);
void (*client_on)(void*, BOOL on, int sock, client_t*, BOOL update);
BOOL (*seteuid)(BOOL user);
BOOL (*setuid)(BOOL force);
/* Paths */
char ctrl_dir[INI_MAX_VALUE_LEN];
char index_file_name[64];
char temp_dir[INI_MAX_VALUE_LEN];
char ini_fname[INI_MAX_VALUE_LEN];
/* Misc */
char host_name[128];
BOOL recycle_now;
BOOL shutdown_now;
int log_level;
uint bind_retry_count; /* Number of times to retry bind() calls */
uint bind_retry_delay; /* Time to wait between each bind() retry */
struct startup_sound_settings sound;
/* Login Attempt parameters */
struct login_attempt_settings login_attempt;
link_list_t* login_attempt_list;
uint max_concurrent_connections;
struct mqtt mqtt;
char index_file_name[64];
} ftp_startup_t;
......
......@@ -150,9 +150,7 @@ extern "C" int errorlog(scfg_t* cfg, struct mqtt* mqtt, int level, const char* h
SAFEPRINTF2(subject, "%s %sERROR occurred", host, level <= LOG_CRIT ? "CRITICAL " : "");
notify(cfg, cfg->node_erruser, subject, text);
}
if(mqtt != NULL) {
mqtt_pub_strval(mqtt, TOPIC_HOST, "error", text);
}
mqtt_errormsg(mqtt, level, text);
return 0;
}
......
......@@ -191,8 +191,8 @@ static int lprintf(int level, const char *fmt, ...)
if(level <= LOG_ERR) {
char errmsg[sizeof(sbuf)+16];
errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name, sbuf), stats.errors++;
SAFEPRINTF2(errmsg, "%s %s", server_abbrev, sbuf);
errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name,errmsg), stats.errors++;
if(startup!=NULL && startup->errormsg!=NULL)
startup->errormsg(startup->cbdata,level,errmsg);
}
......@@ -248,8 +248,11 @@ static char* server_host_name(void)
static void set_state(enum server_state state)
{
if(startup != NULL && startup->set_state != NULL)
startup->set_state(startup->cbdata, state);
if(startup != NULL) {
if(startup->set_state != NULL)
startup->set_state(startup->cbdata, state);
mqtt_server_state(&startup->mqtt, state);
}
}
static void update_clients(void)
......@@ -257,7 +260,7 @@ static void update_clients(void)
if(startup != NULL) {
if(startup->clients != NULL)
startup->clients(startup->cbdata,protected_uint32_value(active_clients)+active_sendmail);
mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "client_count", protected_uint32_value(active_clients));
mqtt_client_count(&startup->mqtt, TOPIC_SERVER, protected_uint32_value(active_clients));
}
}
......@@ -281,7 +284,7 @@ static void thread_up(BOOL setuid)
if(startup != NULL) {
if(startup->thread_up != NULL)
startup->thread_up(startup->cbdata,TRUE,setuid);
mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "thread_count", protected_uint32_value(thread_count));
mqtt_thread_count(&startup->mqtt, TOPIC_SERVER, protected_uint32_value(thread_count));
}
}
......@@ -291,7 +294,7 @@ static int32_t thread_down(void)
if(startup != NULL) {
if(startup->thread_up != NULL)
startup->thread_up(startup->cbdata,FALSE,FALSE);
mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "thread_count", count);
mqtt_thread_count(&startup->mqtt, TOPIC_SERVER, count);
}
return count;
}
......@@ -310,7 +313,7 @@ void mail_open_socket(SOCKET sock, void* cb_protocol)
stats.sockets++;
if(startup != NULL)
mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "socket_count", stats.sockets);
mqtt_socket_count(&startup->mqtt, TOPIC_SERVER, stats.sockets);
}
void mail_close_socket_cb(SOCKET sock, void* cb_protocol)
......@@ -319,7 +322,7 @@ void mail_close_socket_cb(SOCKET sock, void* cb_protocol)
startup->socket_open(startup->cbdata,FALSE);
stats.sockets--;
if(startup != NULL)
mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "socket_count", stats.sockets);
mqtt_socket_count(&startup->mqtt, TOPIC_SERVER, stats.sockets);
}
int mail_close_socket(SOCKET *sock, int *sess)
......@@ -339,7 +342,7 @@ int mail_close_socket(SOCKET *sock, int *sess)
startup->socket_open(startup->cbdata,FALSE);
stats.sockets--;
if(startup != NULL)
mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "socket_count", stats.sockets);
mqtt_socket_count(&startup->mqtt, TOPIC_SERVER, stats.sockets);
if(result!=0) {
if(ERROR_VALUE!=ENOTSOCK)
lprintf(LOG_WARNING,"%04d !ERROR %d closing socket",*sock, ERROR_VALUE);
......@@ -6052,7 +6055,7 @@ void mail_server(void* arg)
set_state(SERVER_INIT);
mqtt_pub_strval(&startup->mqtt, TOPIC_SERVER, "version", mail_ver());
mqtt_server_version(&startup->mqtt, mail_ver());
ZERO_VAR(js_server_props);
SAFEPRINTF3(js_server_props.version,"%s %s%c",server_name, VERSION, REVISION);
......@@ -6273,7 +6276,7 @@ void mail_server(void* arg)
set_state(SERVER_READY);
lprintf(LOG_INFO,"Mail Server thread started");
mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "max_clients", startup->max_clients);
mqtt_client_max(&startup->mqtt, startup->max_clients);
while(!terminated && !terminate_server) {
YIELD();
......
......@@ -27,7 +27,7 @@
typedef struct {
DWORD size; /* sizeof(mail_startup_t) */
STARTUP_COMMON_ELEMENTS
WORD smtp_port;
WORD pop3_port;
WORD pop3s_port;
......@@ -46,37 +46,16 @@ typedef struct {
#define MAIL_DEFAULT_LINES_PER_YIELD 10
WORD max_recipients;
#define MAIL_DEFAULT_MAX_RECIPIENTS 100
WORD sem_chk_freq; /* semaphore file checking frequency (in seconds) */
struct in_addr outgoing4;
struct in6_addr outgoing6;
str_list_t interfaces;
str_list_t pop3_interfaces;
DWORD options; /* See MAIL_OPT definitions */
DWORD max_msg_size; /* Max msg size in bytes (0=unlimited) */
#define MAIL_DEFAULT_MAX_MSG_SIZE (20*1024*1024) /* 20MB */
DWORD max_msgs_waiting; /* Max msgs in user's inbox (0=unlimited) */
#define MAIL_DEFAULT_MAX_MSGS_WAITING 100
DWORD connect_timeout; /* in seconds, for non-blocking connect (0=blocking socket) */
#define MAIL_DEFAULT_CONNECT_TIMEOUT 30 /* seconds */
void* cbdata; /* Private data passed to callbacks */
/* Callbacks (NULL if unused) */
int (*lputs)(void*, int level, const char* msg);
void (*errormsg)(void*, int level, const char* msg);
void (*set_state)(void*, enum server_state);
void (*recycle)(void*);
void (*terminated)(void*, int code);
void (*clients)(void*, int active);
void (*thread_up)(void*, BOOL up, BOOL setuid);
void (*socket_open)(void*, BOOL open);
void (*client_on)(void*, BOOL on, int sock, client_t*, BOOL update);
BOOL (*seteuid)(BOOL user);
BOOL (*setuid)(BOOL force);
/* Paths */
char ctrl_dir[INI_MAX_VALUE_LEN];
char temp_dir[INI_MAX_VALUE_LEN];
char ini_fname[INI_MAX_VALUE_LEN];
/* Strings */
char dns_server[128];
......@@ -88,13 +67,7 @@ typedef struct {
char pop3_sound[INI_MAX_VALUE_LEN];
/* Misc */
char host_name[128];
BOOL recycle_now;
BOOL shutdown_now;
int log_level;
int tls_error_level; /* Cap the severity of TLS error log messages */
uint bind_retry_count; /* Number of times to retry bind() calls */
uint bind_retry_delay; /* Time to wait between each bind() retry */
/* Relay Server */
char relay_server[128];
......@@ -105,13 +78,7 @@ typedef struct {
/* JavaScript operating parameters */
js_startup_t js;
struct startup_sound_settings sound;
/* Login Attempt parameters */
struct login_attempt_settings login_attempt;
link_list_t* login_attempt_list;
uint max_concurrent_connections;
struct mqtt mqtt;
} mail_startup_t;
......
......@@ -160,8 +160,11 @@ static bbs_startup_t* startup=NULL;
static void set_state(enum server_state state)
{
if(startup != NULL && startup->set_state != NULL)
startup->set_state(startup->cbdata, state);
if(startup != NULL) {
if(startup->set_state != NULL)
startup->set_state(startup->cbdata, state);
mqtt_server_state(&startup->mqtt, state);
}
}
static void update_clients()
......@@ -169,7 +172,7 @@ static void update_clients()
if(startup != NULL) {
if(startup->clients != NULL)
startup->clients(startup->cbdata,protected_uint32_value(node_threads_running));
mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "client_count", protected_uint32_value(node_threads_running));
mqtt_client_count(&startup->mqtt, TOPIC_SERVER, protected_uint32_value(node_threads_running));
}
}
......@@ -206,8 +209,8 @@ int lputs(int level, const char* str)
mqtt_lputs(&startup->mqtt, TOPIC_SERVER, level, str);
if(level <= LOG_ERR) {
char errmsg[1024];
errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name, str);
SAFEPRINTF2(errmsg, "%s %s", server_abbrev, str);
errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name, errmsg);
if(startup!=NULL && startup->errormsg!=NULL)
startup->errormsg(startup->cbdata,level,errmsg);
}
......@@ -233,8 +236,8 @@ int eputs(int level, const char *str)
if(level <= LOG_ERR) {
char errmsg[1024];
errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name, str);
SAFEPRINTF(errmsg, "evnt %s", str);
errorlog(&scfg, &startup->mqtt, level, startup==NULL ? NULL:startup->host_name, errmsg);
if(startup!=NULL && startup->errormsg!=NULL)
startup->errormsg(startup->cbdata, level, errmsg);
}
......@@ -4834,6 +4837,7 @@ void bbs_thread(void* arg)
}
if(startup->size!=sizeof(bbs_startup_t)) { // verify size
int sz = sizeof(bbs_startup_t);
sbbs_beep(100,500);
sbbs_beep(300,500);
sbbs_beep(100,500);
......@@ -4937,7 +4941,7 @@ void bbs_thread(void* arg)
cleanup(1);
return;
}
mqtt_pub_strval(&startup->mqtt, TOPIC_SERVER, "version", bbs_ver());
mqtt_server_version(&startup->mqtt, bbs_ver());
t=time(NULL);
lprintf(LOG_INFO,"Initializing on %.24s with options: %x"
......@@ -5229,7 +5233,7 @@ NO_SSH:
set_state(SERVER_READY);
lprintf(LOG_INFO,"Terminal Server thread started for nodes %d through %d", first_node, last_node);
mqtt_pub_uintval(&startup->mqtt, TOPIC_SERVER, "max_clients", (last_node - first_node) + 1);
mqtt_client_max(&startup->mqtt, (last_node - first_node) + 1);
while(!terminate_server) {
YIELD();
......
......@@ -22,9 +22,24 @@
#include <string.h>
#include "mqtt.h"
#include "startup.h"
int mqtt_init(struct mqtt* mqtt, scfg_t* cfg, const char* host, const char* server)
const char* server_type_desc(enum server_type type)
{
switch(type) {
case SERVER_TERM: return "term";
case SERVER_FTP: return "ftp";
case SERVER_WEB: return "web";
case SERVER_MAIL: return "mail";
case SERVER_SERVICES: return "srvc";
}
return "???";
}
int mqtt_init(struct mqtt* mqtt, scfg_t* cfg, enum server_type type)
{
char hostname[256]="undefined-hostname";
if(mqtt == NULL || cfg == NULL)
return MQTT_FAILURE;
if(!cfg->mqtt.enabled)
......@@ -32,8 +47,14 @@ int mqtt_init(struct mqtt* mqtt, scfg_t* cfg, const char* host, const char* serv
if(mqtt != NULL) {
memset(mqtt, 0, sizeof(*mqtt));
mqtt->cfg = cfg;
mqtt->host = host;
mqtt->server = server;
mqtt->server_type = type;
listInit(&mqtt->client_list, LINK_LIST_MUTEX);
#ifdef _WIN32
WSADATA WSAData;
WSAStartup(MAKEWORD(1,1), &WSAData);
#endif
gethostname(hostname, sizeof(hostname));
mqtt->host = strdup(hostname);
#ifdef USE_MOSQUITTO
return mosquitto_lib_init();
#endif
......@@ -54,7 +75,7 @@ static char* format_topic(struct mqtt* mqtt, enum topic_depth depth, char* str,
safe_snprintf(str, size, "sbbs/%s/%s/%s", mqtt->cfg->sys_id, mqtt->host, sbuf);
break;
case TOPIC_SERVER:
safe_snprintf(str, size, "sbbs/%s/%s/%s/%s", mqtt->cfg->sys_id, mqtt->host, mqtt->server, sbuf);
safe_snprintf(str, size, "sbbs/%s/%s/%s/%s", mqtt->cfg->sys_id, mqtt->host, server_type_desc(mqtt->server_type), sbuf);
break;
case TOPIC_EVENT:
safe_snprintf(str, size, "sbbs/%s/%s/event/%s", mqtt->cfg->sys_id, mqtt->host, sbuf);
......@@ -277,8 +298,10 @@ void mqtt_close(struct mqtt* mqtt)
if(mqtt->handle != NULL) {
mosquitto_destroy(mqtt->handle);
mqtt->handle = NULL;
listFree(&mqtt->client_list);
}
#endif