Newer
Older
/* Synchronet MQTT (publish/subscribe messaging protocol) functions */
/****************************************************************************
* @format.tab-size 4 (Plain Text/Source Code File Header) *
* @format.use-tabs true (see http://www.synchro.net/ptsc_hdr.html) *
* *
* Copyright Rob Swindell - http://www.synchro.net/copyright.html *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* See the GNU General Public License for more details: gpl.txt or *
* http://www.fsf.org/copyleft/gpl.html *
* *
* For Synchronet coding style and modification guidelines, see *
* http://www.synchro.net/source.html *
* *
* Note: If this box doesn't appear square, then you need to fix your tabs. *
****************************************************************************/
#include <string.h>
#include "mqtt.h"
#include "startup.h"
#include "xpdatetime.h"
#include "date_str.h"
const char* server_type_desc(enum server_type type)
switch(type) {
case SERVER_TERM: return "term";
case SERVER_FTP: return "ftp";
case SERVER_WEB: return "web";
case SERVER_MAIL: return "mail";
case SERVER_SERVICES: return "srvc";
case SERVER_COUNT:
default:
return "???";
const char* server_state_desc(enum server_state state)
{
switch(state) {
case SERVER_STOPPED: return "stopped";
case SERVER_INIT: return "initializing";
case SERVER_READY: return "ready";
case SERVER_RELOADING: return "reloading";
case SERVER_STOPPING: return "stopping";
default: return "???";
}
}
const char* log_level_desc(int level)
{
switch(level) {
case LOG_DEBUG: return "debug";
case LOG_INFO: return "info";
case LOG_NOTICE: return "notice";
case LOG_WARNING: return "warn";
case LOG_ERR: return "error";
default:
if(level <= LOG_CRIT)
return "critical";
break;
}
return "????";
}
int mqtt_init(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup)
{
char hostname[256]="undefined-hostname";
if(mqtt == NULL || cfg == NULL || startup == NULL)
return MQTT_FAILURE;
if(!cfg->mqtt.enabled)
return MQTT_SUCCESS;
mqtt->handle = NULL;
mqtt->cfg = cfg;
mqtt->startup = startup;
listInit(&mqtt->client_list, LINK_LIST_MUTEX);
WSADATA WSAData;
WSAStartup(MAKEWORD(1,1), &WSAData);
#ifdef USE_MOSQUITTO
#endif
return MQTT_FAILURE;
}
static char* format_topic(struct mqtt* mqtt, enum server_type type, enum topic_depth depth, char* str, size_t size, const char* sbuf)
{
switch(depth) {
case TOPIC_ROOT:
safe_snprintf(str, size, "sbbs/%s", sbuf);
break;
case TOPIC_BBS_LEVEL:
safe_snprintf(str, size, "sbbs/%s", mqtt->cfg->sys_id);
break;
case TOPIC_BBS:
safe_snprintf(str, size, "sbbs/%s/%s", mqtt->cfg->sys_id, sbuf);
case TOPIC_BBS_ACTION:
safe_snprintf(str, size, "sbbs/%s/action/%s", mqtt->cfg->sys_id, sbuf);
break;
case TOPIC_HOST_LEVEL:
safe_snprintf(str, size, "sbbs/%s/host/%s", mqtt->cfg->sys_id, mqtt->host);
case TOPIC_HOST:
safe_snprintf(str, size, "sbbs/%s/host/%s/%s", mqtt->cfg->sys_id, mqtt->host, sbuf);
case TOPIC_HOST_EVENT:
safe_snprintf(str, size, "sbbs/%s/host/%s/event/%s", mqtt->cfg->sys_id, mqtt->host, sbuf);
case TOPIC_SERVER:
safe_snprintf(str, size, "sbbs/%s/host/%s/server/%s/%s", mqtt->cfg->sys_id, mqtt->host, server_type_desc(type), sbuf);
safe_snprintf(str, size, "sbbs/%s/host/%s/server/%s", mqtt->cfg->sys_id, mqtt->host, server_type_desc(type));
default:
safe_snprintf(str, size, "%s", sbuf);
break;
}
return str;
}
char* mqtt_topic(struct mqtt* mqtt, enum topic_depth depth, char* str, size_t size, const char* fmt, ...)
{
va_list argptr;
if(fmt != NULL) {
va_start(argptr, fmt);
vsnprintf(sbuf, sizeof(sbuf), fmt, argptr);
sbuf[sizeof(sbuf) - 1]=0;
va_end(argptr);
}
return format_topic(mqtt, mqtt->startup->type, depth, str, size, sbuf);
static int mqtt_sub(struct mqtt* mqtt, const char* topic)
return MQTT_SUCCESS;
#ifdef USE_MOSQUITTO
if(mqtt->handle != NULL && topic != NULL) {
return mosquitto_subscribe(mqtt->handle, /* msg-id: */NULL, topic, mqtt->cfg->mqtt.subscribe_qos);
}
#endif
return MQTT_FAILURE;
}
int mqtt_subscribe(struct mqtt* mqtt, enum topic_depth depth, char* str, size_t size, const char* fmt, ...)
{
va_list argptr;
char sbuf[1024];
va_start(argptr, fmt);
vsnprintf(sbuf, sizeof(sbuf), fmt, argptr);
sbuf[sizeof(sbuf) - 1]=0;
va_end(argptr);
format_topic(mqtt, mqtt->startup->type, depth, str, size, sbuf);
int mqtt_lputs(struct mqtt* mqtt, enum topic_depth depth, int level, const char* str)
return MQTT_SUCCESS;
#ifdef USE_MOSQUITTO
char sub[128];
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
if(mqtt->cfg->mqtt.protocol_version < 5) {
mqtt_topic(mqtt, depth, sub, sizeof(sub), "log/%d", level);
result = mosquitto_publish_v5(mqtt->handle,
/* mid: */NULL,
/* topic: */sub,
/* payloadlen */strlen(str),
/* payload */str,
/* qos */mqtt->cfg->mqtt.publish_qos,
/* retain */true,
/* properties */NULL);
} else {
mqtt_topic(mqtt, depth, sub, sizeof(sub), "log");
char lvl[32];
sprintf(lvl, "%d", level);
mosquitto_property* props = NULL;
mosquitto_property_add_string_pair(&props, MQTT_PROP_USER_PROPERTY, "level", lvl);
result = mosquitto_publish_v5(mqtt->handle,
/* mid: */NULL,
/* topic: */sub,
/* payloadlen */strlen(str),
/* payload */str,
/* qos */mqtt->cfg->mqtt.publish_qos,
/* retain */true,
/* properties */props);
mosquitto_property_free_all(&props);
}
return result;
}
#endif
return MQTT_FAILURE;
}
int mqtt_pub_noval(struct mqtt* mqtt, enum topic_depth depth, const char* key)
return mqtt_pub_strval(mqtt, depth, key, NULL);
int mqtt_pub_strval(struct mqtt* mqtt, enum topic_depth depth, const char* key, const char* str)
return MQTT_SUCCESS;
#ifdef USE_MOSQUITTO
char sub[128];
mqtt_topic(mqtt, depth, sub, sizeof(sub), "%s", key);
return mosquitto_publish_v5(mqtt->handle,
/* mid: */NULL,
/* topic: */sub,
/* payloadlen */(str == NULL) ? 0 : strlen(str),
/* payload */str,
/* retain */true,
/* properties */NULL);
}
#endif
return MQTT_FAILURE;
}
int mqtt_pub_timestamped_msg(struct mqtt* mqtt, enum topic_depth depth, const char* key, time_t t, const char* msg)
{
char timestamp[64];
char str[1024];
if(mqtt == NULL || mqtt->cfg == NULL)
return MQTT_FAILURE;
if(!mqtt->cfg->mqtt.enabled)
return MQTT_SUCCESS;
time_to_isoDateTimeStr(t, xpTimeZone_local(), timestamp, sizeof(timestamp));
SAFEPRINTF2(str, "%s\t%s", timestamp, msg);
return mqtt_pub_strval(mqtt, depth, key, str);
}
int mqtt_pub_uintval(struct mqtt* mqtt, enum topic_depth depth, const char* key, ulong value)
return MQTT_SUCCESS;
#ifdef USE_MOSQUITTO
char str[128];
sprintf(str, "%lu", value);
char sub[128];
mqtt_topic(mqtt, depth, sub, sizeof(sub), "%s", key);
return mosquitto_publish_v5(mqtt->handle,
/* mid: */NULL,
/* topic: */sub,
/* payloadlen */strlen(str),
/* payload */str,
/* retain */true,
/* properties */NULL);
}
#endif
return MQTT_FAILURE;
}
int mqtt_pub_message(struct mqtt* mqtt, enum topic_depth depth, const char* key, const void* buf, size_t len, BOOL retain)
return MQTT_SUCCESS;
#ifdef USE_MOSQUITTO
char sub[128];
mqtt_topic(mqtt, depth, sub, sizeof(sub), "%s", key);
return mosquitto_publish_v5(mqtt->handle,
/* mid: */NULL,
/* topic: */sub,
/* payloadlen */len,
/* payload */buf,
/* retain */retain,
/* properties */NULL);
}
#endif
return MQTT_FAILURE;
}
char* mqtt_libver(char* str, size_t size)
{
#ifdef USE_MOSQUITTO
int major, minor, revision;
mosquitto_lib_version(&major, &minor, &revision);
safe_snprintf(str, size, "mosquitto %d.%d.%d", major, minor, revision);
return str;
#else
return NULL;
#endif
}
return MQTT_FAILURE;
snprintf(client_id, sizeof(client_id), "sbbs-%s-%s-%s", mqtt->cfg->sys_id, mqtt->host, server_type_desc(mqtt->startup->type));
#ifdef USE_MOSQUITTO
mqtt->handle = mosquitto_new(client_id, /* clean_session: */true, /* userdata: */mqtt);
return mqtt->handle == NULL ? MQTT_FAILURE : MQTT_SUCCESS;
#else
return MQTT_FAILURE;
#endif
}
{
#ifdef USE_MOSQUITTO
if(mqtt->handle != NULL) {
mosquitto_destroy(mqtt->handle);
mqtt->handle = NULL;
listFree(&mqtt->client_list);
static int pw_callback(char* buf, int size, int rwflag, void* userdata)
{
struct mqtt* mqtt = (struct mqtt*)userdata;
strncpy(buf, mqtt->cfg->mqtt.tls.keypass, size);
return strlen(mqtt->cfg->mqtt.tls.keypass);
int mqtt_connect(struct mqtt* mqtt, const char* bind_address)
{
if(mqtt == NULL || mqtt->handle == NULL || mqtt->cfg == NULL)
return MQTT_FAILURE;
#ifdef USE_MOSQUITTO
char* username = mqtt->cfg->mqtt.username;
char* password = mqtt->cfg->mqtt.password;
if(*username == '\0')
username = NULL;
if(*password == '\0')
password = NULL;
mosquitto_int_option(mqtt->handle, MOSQ_OPT_PROTOCOL_VERSION, mqtt->cfg->mqtt.protocol_version);
mosquitto_username_pw_set(mqtt->handle, username, password);
char value[128];
SAFECOPY(value, "DISCONNECTED");
,mqtt_topic(mqtt, TOPIC_SERVER_LEVEL, topic, sizeof(topic), NULL)
,strlen(value), value, /* QOS: */2, /* retain: */true);
if(mqtt->cfg->mqtt.tls.mode == MQTT_TLS_CERT) {
if(mqtt->cfg->mqtt.tls.certfile[0] && mqtt->cfg->mqtt.tls.keyfile[0]) {
certfile = mqtt->cfg->mqtt.tls.certfile;
keyfile = mqtt->cfg->mqtt.tls.keyfile;
int result = mosquitto_tls_set(mqtt->handle,
mqtt->cfg->mqtt.tls.cafile,
NULL, // capath
certfile,
keyfile,
pw_callback);
if(result != MOSQ_ERR_SUCCESS)
return result;
}
else if(mqtt->cfg->mqtt.tls.mode == MQTT_TLS_PSK) {
int result = mosquitto_tls_psk_set(mqtt->handle,
mqtt->cfg->mqtt.tls.psk,
mqtt->cfg->mqtt.tls.identity,
NULL // ciphers (default)
);
if(result != MOSQ_ERR_SUCCESS)
return result;
}
return mosquitto_connect_bind(mqtt->handle,
mqtt->cfg->mqtt.broker_addr,
mqtt->cfg->mqtt.broker_port,
mqtt->cfg->mqtt.keepalive,
bind_address);
#else
return MQTT_FAILURE;
#endif
}
return MQTT_FAILURE;
#ifdef USE_MOSQUITTO
#else
return MQTT_FAILURE;
#endif
}
return MQTT_FAILURE;
#ifdef USE_MOSQUITTO
#else
return MQTT_FAILURE;
#endif
}
return MQTT_FAILURE;
#ifdef USE_MOSQUITTO
return mosquitto_loop_stop(mqtt->handle, /* force: */false);
#else
return MQTT_FAILURE;
#endif
}
static int lprintf(int (*lputs)(int level, const char* str), int level, const char *fmt, ...)
{
va_list argptr;
char sbuf[1024];
if(lputs == NULL)
return -1;
va_start(argptr,fmt);
vsnprintf(sbuf,sizeof(sbuf),fmt,argptr);
sbuf[sizeof(sbuf)-1]=0;
va_end(argptr);
static void mqtt_message_received(struct mosquitto* mosq, void* cbdata, const struct mosquitto_message* msg)
if(mqtt->startup->type == SERVER_TERM) {
bbs_startup_t* bbs_startup = (bbs_startup_t*)mqtt->startup;
for(int i = bbs_startup->first_node; i <= bbs_startup->last_node; i++) {
mqtt_topic(mqtt, TOPIC_BBS, topic, sizeof(topic), "node/%d/input", i);
if(strcmp(msg->topic, topic) != 0)
continue;
if(bbs_startup->node_inbuf != NULL && bbs_startup->node_inbuf[i - 1] != NULL)
RingBufWrite(bbs_startup->node_inbuf[i - 1], msg->payload, msg->payloadlen);
return;
}
}
if(strcmp(msg->topic, mqtt_topic(mqtt, TOPIC_HOST, topic, sizeof(topic), "recycle")) == 0
|| strcmp(msg->topic, mqtt_topic(mqtt, TOPIC_SERVER, topic, sizeof(topic), "recycle")) == 0) {
mqtt->startup->recycle_now = true;
return;
int mqtt_startup(struct mqtt* mqtt, scfg_t* cfg, struct startup* startup, const char* version
,int (*lputs)(int level, const char* str))
{
int result = MQTT_FAILURE;
char str[128];
if(mqtt == NULL || cfg == NULL || version == NULL)
return MQTT_FAILURE;
if(!cfg->mqtt.enabled)
return MQTT_SUCCESS;
if(result != MQTT_SUCCESS) {
lprintf(lputs, LOG_ERR, "MQTT init failure: %d", result);
lprintf(lputs, LOG_DEBUG, "MQTT lib: %s", mqtt_libver(str, sizeof(str)));
result = mqtt_open(mqtt);
if(result != MQTT_SUCCESS) {
lprintf(lputs, LOG_ERR, "MQTT open failure: %d", result);
if(result != MQTT_SUCCESS) {
lprintf(lputs, LOG_ERR, "Error %d starting pub/sub thread", result);
lprintf(lputs, LOG_DEBUG, "MQTT connecting to broker %s:%u", cfg->mqtt.broker_addr, cfg->mqtt.broker_port);
result = mqtt_connect(mqtt, /* bind_address: */NULL);
if(result == MQTT_SUCCESS) {
lprintf(lputs, LOG_INFO, "MQTT broker-connect (%s:%d) successful", cfg->mqtt.broker_addr, cfg->mqtt.broker_port);
lprintf(lputs, LOG_ERR, "MQTT broker-connect (%s:%d) failure: %d", cfg->mqtt.broker_addr, cfg->mqtt.broker_port, result);
}
}
}
}
mqtt_pub_strval(mqtt, TOPIC_BBS_LEVEL, NULL, mqtt->cfg->sys_name);
mqtt_pub_strval(mqtt, TOPIC_HOST_LEVEL, NULL, startup->host_name);
mqtt_pub_strval(mqtt, TOPIC_SERVER, "version", version);
mqtt_pub_uintval(mqtt, TOPIC_SERVER, "served", mqtt->served);
if(mqtt->handle != NULL)
mosquitto_message_callback_set(mqtt->handle, mqtt_message_received);
bbs_startup_t* bbs_startup = (bbs_startup_t*)startup;
char str[128];
for(int i = bbs_startup->first_node; i <= bbs_startup->last_node; i++) {
mqtt_subscribe(mqtt, TOPIC_BBS, str, sizeof(str), "node/%d/input", i);
mqtt_pub_noval(mqtt, TOPIC_SERVER, "recycle");
mqtt_pub_noval(mqtt, TOPIC_SERVER, "client");
mqtt_subscribe(mqtt, TOPIC_SERVER, str, sizeof(str), "recycle");
mqtt_subscribe(mqtt, TOPIC_HOST, str, sizeof(str), "recycle");
return result;
}
int mqtt_server_state(struct mqtt* mqtt, enum server_state state)
if(mqtt == NULL || mqtt->cfg == NULL)
return MQTT_FAILURE;
if(mqtt->cfg->mqtt.verbose) {
char errors[64] = "";
if(mqtt->error_count)
snprintf(errors, sizeof(errors), "%lu error%s", mqtt->error_count, mqtt->error_count > 1 ? "s" : "");
char served[64] = "";
if(mqtt->served)
snprintf(served, sizeof(served), "%lu served", mqtt->served);
char max_clients[64] = "";
if(mqtt->max_clients)
snprintf(max_clients, sizeof(max_clients), "/%lu", mqtt->max_clients);
char clients[64] = "";
if(mqtt->client_list.count)
snprintf(clients, sizeof(clients), "%lu%s clients", mqtt->client_list.count, max_clients);
snprintf(str, sizeof(str), "%s\t%s\t%s\t%s"
,server_state_desc(state)
,clients
,served
,errors);
} else
SAFECOPY(str, server_state_desc(state));
int result = mqtt_pub_strval(mqtt, TOPIC_SERVER_LEVEL, NULL, str);
if(mqtt->server_state != state) {
char topic[128];
snprintf(topic, sizeof(topic), "state/%s", server_state_desc(state));
result = mqtt_pub_timestamped_msg(mqtt, TOPIC_SERVER, topic, time(NULL), server_state_desc(mqtt->server_state));
mqtt->server_state = state;
}
return result;
int mqtt_errormsg(struct mqtt* mqtt, int level, const char* msg)
char topic[128];
time_t t = time(NULL);
if(mqtt == NULL || mqtt->cfg == NULL)
return MQTT_FAILURE;
++mqtt->error_count;
mqtt_pub_uintval(mqtt, TOPIC_SERVER, "error_count", mqtt->error_count);
if(mqtt->cfg->mqtt.verbose)
mqtt_server_state(mqtt, mqtt->server_state);
snprintf(topic, sizeof(topic), "error/%s", log_level_desc(level));
return mqtt_pub_timestamped_msg(mqtt, TOPIC_BBS_ACTION, topic, t, msg);
int mqtt_client_max(struct mqtt* mqtt, ulong count)
if(mqtt == NULL || mqtt->cfg == NULL)
return MQTT_FAILURE;
mqtt->max_clients = count;
return mqtt_client_count(mqtt);
}
static void format_client_info(char* str, size_t size, int sock, client_t* client, time_t t)
{
char timestamp[32];
snprintf(str, size, "%s\t%s\t%d\t%s\t%s\t%s\t%hu\t%d"
,time_to_isoDateTimeStr(t, xpTimeZone_local(), timestamp, sizeof(timestamp))
,client->protocol
,client->usernum
,client->user
,client->addr
,client->host
,client->port
,sock
);
int mqtt_client_on(struct mqtt* mqtt, BOOL on, int sock, client_t* client, BOOL update)
#define MAX_CLIENT_STRLEN 512
char str[MAX_CLIENT_STRLEN + 1];
return MQTT_FAILURE;
return MQTT_SUCCESS;
if(on) {
if(update) {
list_node_t* node;
if((node=listFindTaggedNode(&mqtt->client_list, sock)) != NULL) {
memcpy(node->data, client, sizeof(client_t));
format_client_info(str, sizeof(str), sock, client, time(NULL));
mqtt_pub_strval(mqtt, TOPIC_SERVER, "client/action/update", str);
}
listAddNodeData(&mqtt->client_list, client, sizeof(client_t), sock, LAST_NODE);
format_client_info(str, sizeof(str), sock, client, client->time);
mqtt_pub_strval(mqtt, TOPIC_SERVER, "client/action/connect", str);
client = listRemoveTaggedNode(&mqtt->client_list, sock, /* free_data: */FALSE);
if(client != NULL) {
format_client_info(str, sizeof(str), sock, client, time(NULL));
mqtt_pub_strval(mqtt, TOPIC_SERVER, "client/action/disconnect", str);
FREE_AND_NULL(client);
}
str_list_t list = strListInit();
for(list_node_t* node = mqtt->client_list.first; node != NULL; node = node->next) {
client_t* client = node->data;
format_client_info(str, sizeof(str), node->tag, client, client->time);
strListPush(&list, str);
client_count++;
}
char* buf = NULL;
if(client_count > 0) {
size_t buflen = client_count * MAX_CLIENT_STRLEN * 2;
buf = malloc(buflen);
strListJoin(list, buf, buflen, "\n");
}
strListFree(&list);
mqtt_pub_uintval(mqtt, TOPIC_SERVER, "served", mqtt->served);
int result = mqtt_pub_strval(mqtt, TOPIC_SERVER, "client/list", buf);
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
static int mqtt_user_login(struct mqtt* mqtt, client_t* client)
{
char str[128];
char topic[128];
if(mqtt == NULL || mqtt->cfg == NULL || client == NULL)
return MQTT_FAILURE;
if(!mqtt->cfg->mqtt.enabled)
return MQTT_SUCCESS;
snprintf(topic, sizeof(topic), "login/%s", client->protocol);
snprintf(str, sizeof(str), "%u\t%s\t%s\t%s"
,client->usernum
,client->user
,client->addr
,client->host
);
return mqtt_pub_timestamped_msg(mqtt, TOPIC_BBS_ACTION, topic, time(NULL), str);
}
static int mqtt_user_logout(struct mqtt* mqtt, client_t* client, time_t logintime)
{
char str[128];
char tmp[128];
char topic[128];
if(mqtt == NULL || mqtt->cfg == NULL || client == NULL)
return MQTT_FAILURE;
if(!mqtt->cfg->mqtt.enabled)
return MQTT_SUCCESS;
long tused = (long)(time(NULL) - logintime);
if(tused < 0)
tused = 0;
snprintf(topic, sizeof(topic), "logout/%s", client->protocol);
snprintf(str, sizeof(str), "%u\t%s\t%s\t%s\t%s"
,client->usernum
,client->user
,client->addr
,client->host
,sectostr(tused, tmp)
);
return mqtt_pub_timestamped_msg(mqtt, TOPIC_BBS_ACTION, topic, time(NULL), str);
}
int mqtt_client_count(struct mqtt* mqtt)
{
if(mqtt == NULL || mqtt->cfg == NULL)
return MQTT_FAILURE;
if(mqtt->cfg->mqtt.verbose)
mqtt_server_state(mqtt, mqtt->server_state);
if(mqtt->max_clients)
snprintf(str, sizeof(str), "%ld total\t%ld max", mqtt->client_list.count, mqtt->max_clients);
else
snprintf(str, sizeof(str), "%ld total", mqtt->client_list.count);
return mqtt_pub_strval(mqtt, TOPIC_SERVER, "client", str);
if(mqtt != NULL && mqtt->cfg != NULL && mqtt->cfg->mqtt.enabled) {
mqtt_disconnect(mqtt);
mqtt_thread_stop(mqtt);
mqtt_close(mqtt);
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
static int mqtt_file_xfer(struct mqtt* mqtt, user_t* user, file_t* f, off_t bytes, client_t* client, const char* xfer)
{
if(mqtt == NULL || mqtt->cfg == NULL || user == NULL || f == NULL || client == NULL)
return MQTT_FAILURE;
if(!mqtt->cfg->mqtt.enabled)
return MQTT_SUCCESS;
char str[256];
char topic[128];
snprintf(topic, sizeof(topic), "%s/%s", xfer, mqtt->cfg->dir[f->dir]->code);
snprintf(str, sizeof(str), "%u\t%s\t%u\t%s\t%" PRIdOFF "\t%s"
,user->number, user->alias, user->uls, f->name, bytes, client->protocol);
return mqtt_pub_timestamped_msg(mqtt, TOPIC_BBS_ACTION, topic, time(NULL), str);
}
int mqtt_file_upload(struct mqtt* mqtt, user_t* user, file_t* f, off_t bytes, client_t* client)
{
return mqtt_file_xfer(mqtt, user, f, bytes, client, "upload");
}
int mqtt_file_download(struct mqtt* mqtt, user_t* user, file_t* f, off_t bytes, client_t* client)
{
return mqtt_file_xfer(mqtt, user, f, bytes, client, "download");
}