/* Synchronet MQTT (publish/subscribe messaging protocol) functions */ /**************************************************************************** * @format.tab-size 4 (Plain Text/Source Code File Header) * * @format.use-tabs true (see http://www.synchro.net/ptsc_hdr.html) * * * * Copyright Rob Swindell - http://www.synchro.net/copyright.html * * * * This program is free software; you can redistribute it and/or * * modify it under the terms of the GNU General Public License * * as published by the Free Software Foundation; either version 2 * * of the License, or (at your option) any later version. * * See the GNU General Public License for more details: gpl.txt or * * http://www.fsf.org/copyleft/gpl.html * * * * For Synchronet coding style and modification guidelines, see * * http://www.synchro.net/source.html * * * * Note: If this box doesn't appear square, then you need to fix your tabs. * ****************************************************************************/ #include <string.h> #include "mqtt.h" int mqtt_init(struct mqtt* mqtt, scfg_t* cfg, const char* host, const char* server) { if(mqtt == NULL || cfg == NULL) return MQTT_FAILURE; if(!cfg->mqtt.enabled) return MQTT_SUCCESS; if(mqtt != NULL) { memset(mqtt, 0, sizeof(*mqtt)); mqtt->cfg = cfg; mqtt->host = host; mqtt->server = server; #ifdef USE_MOSQUITTO return mosquitto_lib_init(); #endif } return MQTT_FAILURE; } static char* format_topic(struct mqtt* mqtt, enum topic_depth depth, char* str, size_t size, const char* sbuf) { switch(depth) { case TOPIC_ROOT: safe_snprintf(str, size, "sbbs/%s", sbuf); break; case TOPIC_BBS: safe_snprintf(str, size, "sbbs/%s/%s", mqtt->cfg->sys_id, sbuf); break; case TOPIC_HOST: safe_snprintf(str, size, "sbbs/%s/%s/%s", mqtt->cfg->sys_id, mqtt->host, sbuf); break; case TOPIC_SERVER: safe_snprintf(str, size, "sbbs/%s/%s/%s/%s", mqtt->cfg->sys_id, mqtt->host, mqtt->server, sbuf); break; case TOPIC_EVENT: safe_snprintf(str, size, "sbbs/%s/%s/event/%s", mqtt->cfg->sys_id, mqtt->host, sbuf); break; default: safe_snprintf(str, size, "%s", sbuf); break; } return str; } char* mqtt_topic(struct mqtt* mqtt, enum topic_depth depth, char* str, size_t size, const char* fmt, ...) { va_list argptr; char sbuf[1024]; va_start(argptr, fmt); vsnprintf(sbuf, sizeof(sbuf), fmt, argptr); sbuf[sizeof(sbuf) - 1]=0; va_end(argptr); return format_topic(mqtt, depth, str, size, sbuf); } static int mqtt_sub(struct mqtt* mqtt, const char* topic) { if(mqtt == NULL || mqtt->cfg == NULL) return MQTT_FAILURE; if(!mqtt->cfg->mqtt.enabled) return MQTT_SUCCESS; #ifdef USE_MOSQUITTO if(mqtt->handle != NULL && topic != NULL) { return mosquitto_subscribe(mqtt->handle, /* mid: */NULL, topic, mqtt->cfg->mqtt.subscribe_qos); } #endif return MQTT_FAILURE; } int mqtt_subscribe(struct mqtt* mqtt, enum topic_depth depth, char* str, size_t size, const char* fmt, ...) { va_list argptr; char sbuf[1024]; va_start(argptr, fmt); vsnprintf(sbuf, sizeof(sbuf), fmt, argptr); sbuf[sizeof(sbuf) - 1]=0; va_end(argptr); format_topic(mqtt, depth, str, size, sbuf); return mqtt_sub(mqtt, str); } int mqtt_lputs(struct mqtt* mqtt, enum topic_depth depth, int level, const char* str) { if(mqtt == NULL || mqtt->cfg == NULL) return MQTT_FAILURE; if(!mqtt->cfg->mqtt.enabled) return MQTT_SUCCESS; if(level > mqtt->cfg->mqtt.log_level) return MQTT_SUCCESS; #ifdef USE_MOSQUITTO if(mqtt->handle != NULL && str != NULL) { char sub[128]; mqtt_topic(mqtt, depth, sub, sizeof(sub), "log/%d", level); char lvl[32]; sprintf(lvl, "%d", level); mosquitto_publish_v5(mqtt->handle, /* mid: */NULL, /* topic: */sub, /* payloadlen */strlen(str), /* payload */str, /* qos */mqtt->cfg->mqtt.publish_qos, /* retain */false, /* properties */NULL); mqtt_topic(mqtt, depth, sub, sizeof(sub), "log"); mosquitto_property* props = NULL; mosquitto_property_add_string_pair(&props, MQTT_PROP_USER_PROPERTY, "level", lvl); int result = mosquitto_publish_v5(mqtt->handle, /* mid: */NULL, /* topic: */sub, /* payloadlen */strlen(str), /* payload */str, /* qos */mqtt->cfg->mqtt.publish_qos, /* retain */false, /* properties */props); mosquitto_property_free_all(&props); return result; } #endif return MQTT_FAILURE; } int mqtt_pub_noval(struct mqtt* mqtt, enum topic_depth depth, const char* key) { if(mqtt == NULL || mqtt->cfg == NULL) return MQTT_FAILURE; if(!mqtt->cfg->mqtt.enabled) return MQTT_SUCCESS; #ifdef USE_MOSQUITTO if(mqtt != NULL && mqtt->handle != NULL) { char sub[128]; mqtt_topic(mqtt, depth, sub, sizeof(sub), "%s", key); return mosquitto_publish_v5(mqtt->handle, /* mid: */NULL, /* topic: */sub, /* payloadlen */0, /* payload */NULL, /* qos */mqtt->cfg->mqtt.publish_qos, /* retain */true, /* properties */NULL); } #endif return MQTT_FAILURE; } int mqtt_pub_strval(struct mqtt* mqtt, enum topic_depth depth, const char* key, const char* str) { if(mqtt == NULL || mqtt->cfg == NULL) return MQTT_FAILURE; if(!mqtt->cfg->mqtt.enabled) return MQTT_SUCCESS; #ifdef USE_MOSQUITTO if(mqtt != NULL && mqtt->handle != NULL) { char sub[128]; mqtt_topic(mqtt, depth, sub, sizeof(sub), "%s", key); return mosquitto_publish_v5(mqtt->handle, /* mid: */NULL, /* topic: */sub, /* payloadlen */strlen(str), /* payload */str, /* qos */mqtt->cfg->mqtt.publish_qos, /* retain */true, /* properties */NULL); } #endif return MQTT_FAILURE; } int mqtt_pub_uintval(struct mqtt* mqtt, enum topic_depth depth, const char* key, ulong value) { if(mqtt == NULL || mqtt->cfg == NULL) return MQTT_FAILURE; if(!mqtt->cfg->mqtt.enabled) return MQTT_SUCCESS; #ifdef USE_MOSQUITTO if(mqtt != NULL && mqtt->handle != NULL) { char str[128]; sprintf(str, "%lu", value); char sub[128]; mqtt_topic(mqtt, depth, sub, sizeof(sub), "%s", key); return mosquitto_publish_v5(mqtt->handle, /* mid: */NULL, /* topic: */sub, /* payloadlen */strlen(str), /* payload */str, /* qos */mqtt->cfg->mqtt.publish_qos, /* retain */true, /* properties */NULL); } #endif return MQTT_FAILURE; } int mqtt_pub_message(struct mqtt* mqtt, enum topic_depth depth, const char* key, const void* buf, size_t len) { if(mqtt == NULL || mqtt->cfg == NULL) return MQTT_FAILURE; if(!mqtt->cfg->mqtt.enabled) return MQTT_SUCCESS; #ifdef USE_MOSQUITTO if(mqtt != NULL && mqtt->handle != NULL) { char sub[128]; mqtt_topic(mqtt, depth, sub, sizeof(sub), "%s", key); return mosquitto_publish_v5(mqtt->handle, /* mid: */NULL, /* topic: */sub, /* payloadlen */len, /* payload */buf, /* qos */mqtt->cfg->mqtt.publish_qos, /* retain */false, /* properties */NULL); } #endif return MQTT_FAILURE; } char* mqtt_libver(char* str, size_t size) { #ifdef USE_MOSQUITTO int major, minor, revision; mosquitto_lib_version(&major, &minor, &revision); safe_snprintf(str, size, "mosquitto %d.%d.%d", major, minor, revision); return str; #else return NULL; #endif } int mqtt_open(struct mqtt* mqtt) { char client_id[256]; if(mqtt == NULL) return MQTT_FAILURE; if(mqtt->handle != NULL) // already open return MQTT_FAILURE; SAFEPRINTF(client_id, "sbbs-%s", mqtt->host); #ifdef USE_MOSQUITTO mqtt->handle = mosquitto_new(client_id, /* clean_session: */true, /* userdata: */mqtt); return mqtt->handle == NULL ? MQTT_FAILURE : MQTT_SUCCESS; #else return MQTT_FAILURE; #endif } void mqtt_close(struct mqtt* mqtt) { #ifdef USE_MOSQUITTO if(mqtt->handle != NULL) { mosquitto_destroy(mqtt->handle); mqtt->handle = NULL; } #endif } static int pw_callback(char* buf, int size, int rwflag, void* userdata) { struct mqtt* mqtt = (struct mqtt*)userdata; strncpy(buf, mqtt->cfg->mqtt.tls.keypass, size); return strlen(mqtt->cfg->mqtt.tls.keypass); } int mqtt_connect(struct mqtt* mqtt, const char* bind_address) { if(mqtt == NULL || mqtt->handle == NULL || mqtt->cfg == NULL) return MQTT_FAILURE; #ifdef USE_MOSQUITTO char topic[128]; char* username = mqtt->cfg->mqtt.username; char* password = mqtt->cfg->mqtt.password; if(*username == '\0') username = NULL; if(*password == '\0') password = NULL; mosquitto_int_option(mqtt->handle, MOSQ_OPT_PROTOCOL_VERSION, mqtt->cfg->mqtt.protocol_version); mosquitto_username_pw_set(mqtt->handle, username, password); const char* value = "disconnected"; mosquitto_will_set(mqtt->handle ,mqtt_topic(mqtt, TOPIC_HOST, topic, sizeof(topic), "status") ,strlen(value), value, /* QOS: */2, /* retain: */true); if(mqtt->cfg->mqtt.tls.mode == MQTT_TLS_CERT) { char* certfile = NULL; char* keyfile = NULL; if(mqtt->cfg->mqtt.tls.certfile[0] && mqtt->cfg->mqtt.tls.keyfile[0]) { certfile = mqtt->cfg->mqtt.tls.certfile; keyfile = mqtt->cfg->mqtt.tls.keyfile; } int result = mosquitto_tls_set(mqtt->handle, mqtt->cfg->mqtt.tls.cafile, NULL, // capath certfile, keyfile, pw_callback); if(result != MOSQ_ERR_SUCCESS) return result; } else if(mqtt->cfg->mqtt.tls.mode == MQTT_TLS_PSK) { int result = mosquitto_tls_psk_set(mqtt->handle, mqtt->cfg->mqtt.tls.psk, mqtt->cfg->mqtt.tls.identity, NULL // ciphers (default) ); if(result != MOSQ_ERR_SUCCESS) return result; } return mosquitto_connect_bind(mqtt->handle, mqtt->cfg->mqtt.broker_addr, mqtt->cfg->mqtt.broker_port, mqtt->cfg->mqtt.keepalive, bind_address); #else return MQTT_FAILURE; #endif } int mqtt_disconnect(struct mqtt* mqtt) { if(mqtt == NULL || mqtt->handle == NULL) return MQTT_FAILURE; #ifdef USE_MOSQUITTO return mosquitto_disconnect(mqtt->handle); #else return MQTT_FAILURE; #endif } int mqtt_thread_start(struct mqtt* mqtt) { if(mqtt == NULL || mqtt->handle == NULL) return MQTT_FAILURE; #ifdef USE_MOSQUITTO return mosquitto_loop_start(mqtt->handle); #else return MQTT_FAILURE; #endif } int mqtt_thread_stop(struct mqtt* mqtt) { if(mqtt == NULL || mqtt->handle == NULL) return MQTT_FAILURE; #ifdef USE_MOSQUITTO return mosquitto_loop_stop(mqtt->handle, /* force: */false); #else return MQTT_FAILURE; #endif }