From 45ac901687848c9012e129a574546138c28eba35 Mon Sep 17 00:00:00 2001 From: "Rob Swindell (on ChromeOS)" <rob@synchro.net> Date: Sat, 27 May 2023 12:36:50 -0700 Subject: [PATCH] Add subscribe support --- src/sbbs3/js_mqtt.c | 170 ++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 157 insertions(+), 13 deletions(-) diff --git a/src/sbbs3/js_mqtt.c b/src/sbbs3/js_mqtt.c index 66d1ff609b..b7812ea366 100644 --- a/src/sbbs3/js_mqtt.c +++ b/src/sbbs3/js_mqtt.c @@ -28,6 +28,7 @@ typedef struct int retval; mqtt_handle_t handle; struct mqtt_cfg cfg; + msg_queue_t q; } private_t; @@ -38,9 +39,12 @@ static void js_finalize_mqtt(JSContext* cx, JSObject* obj) if((p = (private_t*)JS_GetPrivate(cx,obj)) == NULL) return; - if(p->handle != NULL) - mosquitto_destroy(p->handle); + if(p->handle != NULL) { + mosquitto_loop_stop(p->handle, /* force: */true); + mosquitto_destroy(p->handle); + } + msgQueueFree(&p->q); free(p); JS_SetPrivate(cx, obj, NULL); @@ -166,7 +170,7 @@ static JSBool js_publish(JSContext* cx, uintN argc, jsval *arglist) jsrefcount rc; bool retain = false; - JS_SET_RVAL(cx, arglist, JSVAL_VOID); + JS_SET_RVAL(cx, arglist, JSVAL_FALSE); if((p = (private_t*)js_GetClassPrivate(cx, obj, &js_mqtt_class)) == NULL) return JS_FALSE; @@ -175,7 +179,6 @@ static JSBool js_publish(JSContext* cx, uintN argc, jsval *arglist) if(!js_argc(cx, argc, 2)) return JS_FALSE; - JS_SET_RVAL(cx, arglist, JSVAL_FALSE); if(p->handle == NULL) return JS_TRUE; @@ -214,6 +217,107 @@ static JSBool js_publish(JSContext* cx, uintN argc, jsval *arglist) return JS_TRUE; } +static JSBool js_subscribe(JSContext* cx, uintN argc, jsval *arglist) +{ + JSObject* obj = JS_THIS_OBJECT(cx, arglist); + jsval* argv = JS_ARGV(cx, arglist); + char* topic = NULL; + private_t* p; + jsrefcount rc; + + JS_SET_RVAL(cx, arglist, JSVAL_FALSE); + + if((p = (private_t*)js_GetClassPrivate(cx, obj, &js_mqtt_class)) == NULL) + return JS_FALSE; + + int qos = p->cfg.subscribe_qos; + if(!js_argc(cx, argc, 1)) + return JS_FALSE; + + if(p->handle == NULL) + return JS_TRUE; + + uintN argn = 0; + if(argn < argc && JSVAL_IS_NUMBER(argv[argn])) { + qos = JSVAL_TO_INT(argv[argn]); + ++argn; + } + JSVALUE_TO_MSTRING(cx, argv[argn], topic, NULL); + HANDLE_PENDING(cx, topic); + ++argn; + + rc = JS_SUSPENDREQUEST(cx); + p->retval = mosquitto_subscribe(p->handle, /* msg-id: */NULL, topic, qos); + JS_SET_RVAL(cx, arglist, BOOLEAN_TO_JSVAL(p->retval == MOSQ_ERR_SUCCESS)); + free(topic); + JS_RESUMEREQUEST(cx, rc); + + return JS_TRUE; +} + +static JSBool js_read(JSContext* cx, uintN argc, jsval *arglist) +{ + JSObject* obj = JS_THIS_OBJECT(cx, arglist); + jsval* argv = JS_ARGV(cx, arglist); + private_t* p; + jsrefcount rc; + int timeout = 0; + bool verbose = false; + struct mosquitto_message* msg; + + JS_SET_RVAL(cx, arglist, JSVAL_FALSE); + + if((p = (private_t*)js_GetClassPrivate(cx, obj, &js_mqtt_class)) == NULL) + return JS_FALSE; + + if(p->handle == NULL) + return JS_TRUE; + + uintN argn = 0; + if(argn < argc && JSVAL_IS_NUMBER(argv[argn])) { + timeout = JSVAL_TO_INT(argv[argn]); + ++argn; + } + if(argn < argc && JSVAL_IS_BOOLEAN(argv[argn])) { + verbose = JSVAL_TO_BOOLEAN(argv[argn]); + ++argn; + } + + rc = JS_SUSPENDREQUEST(cx); + msg = msgQueueRead(&p->q, timeout); + if(msg != NULL) { + if(verbose) { + JSString* str = JS_NewStringCopyZ(cx, msg->topic); + obj = JS_NewObject(cx, NULL, NULL, obj); + if(obj != NULL) { + JS_DefineProperty(cx, obj, "topic" + ,STRING_TO_JSVAL(str) + ,NULL,NULL,JSPROP_ENUMERATE); + str = JS_NewStringCopyN(cx, msg->payload, msg->payloadlen); + JS_DefineProperty(cx, obj, "data" + ,STRING_TO_JSVAL(str) + ,NULL,NULL,JSPROP_ENUMERATE); + JS_DefineProperty(cx, obj, "mid", INT_TO_JSVAL(msg->mid) + ,NULL,NULL,JSPROP_ENUMERATE); + JS_DefineProperty(cx, obj, "qos", INT_TO_JSVAL(msg->qos) + ,NULL,NULL,JSPROP_ENUMERATE); + JS_DefineProperty(cx, obj, "retain", BOOLEAN_TO_JSVAL(msg->retain) + ,NULL,NULL,JSPROP_ENUMERATE); + + JS_SET_RVAL(cx, arglist, OBJECT_TO_JSVAL(obj)); + } + } else { + JSString* str = JS_NewStringCopyN(cx, msg->payload, msg->payloadlen); + if(str != NULL) + JS_SET_RVAL(cx, arglist, STRING_TO_JSVAL(str)); + } + mosquitto_message_free(&msg); + } + JS_RESUMEREQUEST(cx, rc); + + return JS_TRUE; +} + /* Properites */ enum { MQTT_PROP_ERROR @@ -233,6 +337,8 @@ enum { ,MQTT_PROP_TLS_KEYPASS ,MQTT_PROP_TLS_PSK ,MQTT_PROP_TLS_IDENTITY + ,MQTT_PROP_DATA_WAITING + ,MQTT_PROP_READ_LEVEL }; #ifdef BUILD_JSDOCS @@ -254,6 +360,8 @@ static char* com_prop_desc[] = { ,"Private key file password" ,"TLS Pre-Shared-Key" ,"TLS PSK Identity" + ,"<i>true</i> if messages are waiting to be read" + ,"Number of messages waiting to be read" ,NULL }; #endif @@ -447,6 +555,12 @@ static JSBool js_mqtt_get(JSContext* cx, JSObject* obj, jsid id, jsval *vp) *vp = STRING_TO_JSVAL(js_str); rc = JS_SUSPENDREQUEST(cx); break; + case MQTT_PROP_DATA_WAITING: + *vp = BOOLEAN_TO_JSVAL(INT_TO_BOOL(msgQueueReadLevel(&p->q))); + break; + case MQTT_PROP_READ_LEVEL: + *vp = INT_TO_JSVAL(msgQueueReadLevel(&p->q)); + break; } JS_RESUMEREQUEST(cx, rc); @@ -475,21 +589,37 @@ static jsSyncPropertySpec js_mqtt_properties[] = { { "tls_key_password" ,MQTT_PROP_TLS_KEYPASS ,MQTT_PROP_FLAGS, 320 }, { "tls_psk" ,MQTT_PROP_TLS_PSK ,MQTT_PROP_FLAGS, 320 }, { "tls_psk_identity" ,MQTT_PROP_TLS_IDENTITY ,MQTT_PROP_FLAGS, 320 }, + { "data_waiting" ,MQTT_PROP_DATA_WAITING ,MQTT_PROP_ROFLAGS, 320 }, + { "read_level" ,MQTT_PROP_READ_LEVEL ,MQTT_PROP_ROFLAGS, 320 }, {0} }; static jsSyncMethodSpec js_mqtt_functions[] = { - {"connect", js_connect, 0, JSTYPE_BOOLEAN, JSDOCSTR("[string broker_address] [,number broker_port] [,string username] [,string password]") - ,JSDOCSTR("Connect to an MQTT broker") - ,320 + {"connect", js_connect, 0, JSTYPE_BOOLEAN + ,JSDOCSTR("[string broker_address] [,number broker_port] [,string username] [,string password]") + ,JSDOCSTR("Connect to an MQTT broker") + ,320 + }, + {"disconnect", js_disconnect, 0, JSTYPE_VOID + ,JSDOCSTR("") + ,JSDOCSTR("Close an open connection to the MQTT broker") + ,320 }, - {"disconnect", js_disconnect, 0, JSTYPE_VOID, JSDOCSTR("") - ,JSDOCSTR("Close an open connection to the MQTT broker") - ,320 + {"publish", js_publish, 4, JSTYPE_BOOLEAN + ,JSDOCSTR("[bool retain=false,] [number qos,] topic, data") + ,JSDOCSTR("Publish a string to specified topic") + ,320 }, - {"publish", js_publish, 4, JSTYPE_BOOLEAN, JSDOCSTR("[bool retain=false,] [number qos,] topic, data") - ,JSDOCSTR("Publish a string to specified topic") - ,320 + {"subscribe", js_subscribe, 2, JSTYPE_BOOLEAN + ,JSDOCSTR("[number qos,] topic") + ,JSDOCSTR("Subscribe to specified topic at (optional) QOS level") + ,320 + }, + {"read", js_read, 0, JSTYPE_STRING + ,JSDOCSTR("[timeout=0] [,bool verbpose=false]") + ,JSDOCSTR("Read next message, optionally waiting for <i>timeout</i> milliseconds, " + "returns an object when <i>verbose</i> is <tt>true</tt>") + ,320 }, {0} }; @@ -532,6 +662,17 @@ JSClass js_mqtt_class = { ,js_finalize_mqtt /* finalize */ }; +static void mqtt_message_received(struct mosquitto* mosq, void* cbdata, const struct mosquitto_message* msg) +{ + private_t* p = (private_t*)cbdata; + + if(p != NULL && msg != NULL) { + struct mosquitto_message m; + if(mosquitto_message_copy(&m, msg) == MOSQ_ERR_SUCCESS) + msgQueueWrite(&p->q, &m, sizeof m); + } +} + static JSBool js_mqtt_constructor(JSContext* cx, uintN argc, jsval *arglist) { JSObject* obj; @@ -559,6 +700,7 @@ static JSBool js_mqtt_constructor(JSContext* cx, uintN argc, jsval *arglist) return JS_FALSE; } memset(p, 0, sizeof *p); + msgQueueInit(&p->q, /* flags: */0); p->cfg = scfg->mqtt; p->handle = mosquitto_new(client_id, /* clean_session: */true, /* userdata: */p); free(client_id); @@ -567,6 +709,8 @@ static JSBool js_mqtt_constructor(JSContext* cx, uintN argc, jsval *arglist) free(p); return JS_FALSE; } + mosquitto_message_callback_set(p->handle, mqtt_message_received); + mosquitto_loop_start(p->handle); #ifdef BUILD_JSDOCS js_DescribeSyncObject(cx,obj,"Class used for MQTT communications",320); -- GitLab