Skip to content
Snippets Groups Projects
Commit 45ac9016 authored by Rob Swindell's avatar Rob Swindell :speech_balloon:
Browse files

Add subscribe support

parent 0655a1ff
No related branches found
No related tags found
1 merge request!463MRC mods by Codefenix (2024-10-20)
......@@ -28,6 +28,7 @@ typedef struct
int retval;
mqtt_handle_t handle;
struct mqtt_cfg cfg;
msg_queue_t q;
} private_t;
......@@ -38,9 +39,12 @@ static void js_finalize_mqtt(JSContext* cx, JSObject* obj)
if((p = (private_t*)JS_GetPrivate(cx,obj)) == NULL)
return;
if(p->handle != NULL)
mosquitto_destroy(p->handle);
if(p->handle != NULL) {
mosquitto_loop_stop(p->handle, /* force: */true);
mosquitto_destroy(p->handle);
}
msgQueueFree(&p->q);
free(p);
JS_SetPrivate(cx, obj, NULL);
......@@ -166,7 +170,7 @@ static JSBool js_publish(JSContext* cx, uintN argc, jsval *arglist)
jsrefcount rc;
bool retain = false;
JS_SET_RVAL(cx, arglist, JSVAL_VOID);
JS_SET_RVAL(cx, arglist, JSVAL_FALSE);
if((p = (private_t*)js_GetClassPrivate(cx, obj, &js_mqtt_class)) == NULL)
return JS_FALSE;
......@@ -175,7 +179,6 @@ static JSBool js_publish(JSContext* cx, uintN argc, jsval *arglist)
if(!js_argc(cx, argc, 2))
return JS_FALSE;
JS_SET_RVAL(cx, arglist, JSVAL_FALSE);
if(p->handle == NULL)
return JS_TRUE;
......@@ -214,6 +217,107 @@ static JSBool js_publish(JSContext* cx, uintN argc, jsval *arglist)
return JS_TRUE;
}
static JSBool js_subscribe(JSContext* cx, uintN argc, jsval *arglist)
{
JSObject* obj = JS_THIS_OBJECT(cx, arglist);
jsval* argv = JS_ARGV(cx, arglist);
char* topic = NULL;
private_t* p;
jsrefcount rc;
JS_SET_RVAL(cx, arglist, JSVAL_FALSE);
if((p = (private_t*)js_GetClassPrivate(cx, obj, &js_mqtt_class)) == NULL)
return JS_FALSE;
int qos = p->cfg.subscribe_qos;
if(!js_argc(cx, argc, 1))
return JS_FALSE;
if(p->handle == NULL)
return JS_TRUE;
uintN argn = 0;
if(argn < argc && JSVAL_IS_NUMBER(argv[argn])) {
qos = JSVAL_TO_INT(argv[argn]);
++argn;
}
JSVALUE_TO_MSTRING(cx, argv[argn], topic, NULL);
HANDLE_PENDING(cx, topic);
++argn;
rc = JS_SUSPENDREQUEST(cx);
p->retval = mosquitto_subscribe(p->handle, /* msg-id: */NULL, topic, qos);
JS_SET_RVAL(cx, arglist, BOOLEAN_TO_JSVAL(p->retval == MOSQ_ERR_SUCCESS));
free(topic);
JS_RESUMEREQUEST(cx, rc);
return JS_TRUE;
}
static JSBool js_read(JSContext* cx, uintN argc, jsval *arglist)
{
JSObject* obj = JS_THIS_OBJECT(cx, arglist);
jsval* argv = JS_ARGV(cx, arglist);
private_t* p;
jsrefcount rc;
int timeout = 0;
bool verbose = false;
struct mosquitto_message* msg;
JS_SET_RVAL(cx, arglist, JSVAL_FALSE);
if((p = (private_t*)js_GetClassPrivate(cx, obj, &js_mqtt_class)) == NULL)
return JS_FALSE;
if(p->handle == NULL)
return JS_TRUE;
uintN argn = 0;
if(argn < argc && JSVAL_IS_NUMBER(argv[argn])) {
timeout = JSVAL_TO_INT(argv[argn]);
++argn;
}
if(argn < argc && JSVAL_IS_BOOLEAN(argv[argn])) {
verbose = JSVAL_TO_BOOLEAN(argv[argn]);
++argn;
}
rc = JS_SUSPENDREQUEST(cx);
msg = msgQueueRead(&p->q, timeout);
if(msg != NULL) {
if(verbose) {
JSString* str = JS_NewStringCopyZ(cx, msg->topic);
obj = JS_NewObject(cx, NULL, NULL, obj);
if(obj != NULL) {
JS_DefineProperty(cx, obj, "topic"
,STRING_TO_JSVAL(str)
,NULL,NULL,JSPROP_ENUMERATE);
str = JS_NewStringCopyN(cx, msg->payload, msg->payloadlen);
JS_DefineProperty(cx, obj, "data"
,STRING_TO_JSVAL(str)
,NULL,NULL,JSPROP_ENUMERATE);
JS_DefineProperty(cx, obj, "mid", INT_TO_JSVAL(msg->mid)
,NULL,NULL,JSPROP_ENUMERATE);
JS_DefineProperty(cx, obj, "qos", INT_TO_JSVAL(msg->qos)
,NULL,NULL,JSPROP_ENUMERATE);
JS_DefineProperty(cx, obj, "retain", BOOLEAN_TO_JSVAL(msg->retain)
,NULL,NULL,JSPROP_ENUMERATE);
JS_SET_RVAL(cx, arglist, OBJECT_TO_JSVAL(obj));
}
} else {
JSString* str = JS_NewStringCopyN(cx, msg->payload, msg->payloadlen);
if(str != NULL)
JS_SET_RVAL(cx, arglist, STRING_TO_JSVAL(str));
}
mosquitto_message_free(&msg);
}
JS_RESUMEREQUEST(cx, rc);
return JS_TRUE;
}
/* Properites */
enum {
MQTT_PROP_ERROR
......@@ -233,6 +337,8 @@ enum {
,MQTT_PROP_TLS_KEYPASS
,MQTT_PROP_TLS_PSK
,MQTT_PROP_TLS_IDENTITY
,MQTT_PROP_DATA_WAITING
,MQTT_PROP_READ_LEVEL
};
#ifdef BUILD_JSDOCS
......@@ -254,6 +360,8 @@ static char* com_prop_desc[] = {
,"Private key file password"
,"TLS Pre-Shared-Key"
,"TLS PSK Identity"
,"<i>true</i> if messages are waiting to be read"
,"Number of messages waiting to be read"
,NULL
};
#endif
......@@ -447,6 +555,12 @@ static JSBool js_mqtt_get(JSContext* cx, JSObject* obj, jsid id, jsval *vp)
*vp = STRING_TO_JSVAL(js_str);
rc = JS_SUSPENDREQUEST(cx);
break;
case MQTT_PROP_DATA_WAITING:
*vp = BOOLEAN_TO_JSVAL(INT_TO_BOOL(msgQueueReadLevel(&p->q)));
break;
case MQTT_PROP_READ_LEVEL:
*vp = INT_TO_JSVAL(msgQueueReadLevel(&p->q));
break;
}
JS_RESUMEREQUEST(cx, rc);
......@@ -475,21 +589,37 @@ static jsSyncPropertySpec js_mqtt_properties[] = {
{ "tls_key_password" ,MQTT_PROP_TLS_KEYPASS ,MQTT_PROP_FLAGS, 320 },
{ "tls_psk" ,MQTT_PROP_TLS_PSK ,MQTT_PROP_FLAGS, 320 },
{ "tls_psk_identity" ,MQTT_PROP_TLS_IDENTITY ,MQTT_PROP_FLAGS, 320 },
{ "data_waiting" ,MQTT_PROP_DATA_WAITING ,MQTT_PROP_ROFLAGS, 320 },
{ "read_level" ,MQTT_PROP_READ_LEVEL ,MQTT_PROP_ROFLAGS, 320 },
{0}
};
static jsSyncMethodSpec js_mqtt_functions[] = {
{"connect", js_connect, 0, JSTYPE_BOOLEAN, JSDOCSTR("[string broker_address] [,number broker_port] [,string username] [,string password]")
,JSDOCSTR("Connect to an MQTT broker")
,320
{"connect", js_connect, 0, JSTYPE_BOOLEAN
,JSDOCSTR("[string broker_address] [,number broker_port] [,string username] [,string password]")
,JSDOCSTR("Connect to an MQTT broker")
,320
},
{"disconnect", js_disconnect, 0, JSTYPE_VOID
,JSDOCSTR("")
,JSDOCSTR("Close an open connection to the MQTT broker")
,320
},
{"disconnect", js_disconnect, 0, JSTYPE_VOID, JSDOCSTR("")
,JSDOCSTR("Close an open connection to the MQTT broker")
,320
{"publish", js_publish, 4, JSTYPE_BOOLEAN
,JSDOCSTR("[bool retain=false,] [number qos,] topic, data")
,JSDOCSTR("Publish a string to specified topic")
,320
},
{"publish", js_publish, 4, JSTYPE_BOOLEAN, JSDOCSTR("[bool retain=false,] [number qos,] topic, data")
,JSDOCSTR("Publish a string to specified topic")
,320
{"subscribe", js_subscribe, 2, JSTYPE_BOOLEAN
,JSDOCSTR("[number qos,] topic")
,JSDOCSTR("Subscribe to specified topic at (optional) QOS level")
,320
},
{"read", js_read, 0, JSTYPE_STRING
,JSDOCSTR("[timeout=0] [,bool verbpose=false]")
,JSDOCSTR("Read next message, optionally waiting for <i>timeout</i> milliseconds, "
"returns an object when <i>verbose</i> is <tt>true</tt>")
,320
},
{0}
};
......@@ -532,6 +662,17 @@ JSClass js_mqtt_class = {
,js_finalize_mqtt /* finalize */
};
static void mqtt_message_received(struct mosquitto* mosq, void* cbdata, const struct mosquitto_message* msg)
{
private_t* p = (private_t*)cbdata;
if(p != NULL && msg != NULL) {
struct mosquitto_message m;
if(mosquitto_message_copy(&m, msg) == MOSQ_ERR_SUCCESS)
msgQueueWrite(&p->q, &m, sizeof m);
}
}
static JSBool js_mqtt_constructor(JSContext* cx, uintN argc, jsval *arglist)
{
JSObject* obj;
......@@ -559,6 +700,7 @@ static JSBool js_mqtt_constructor(JSContext* cx, uintN argc, jsval *arglist)
return JS_FALSE;
}
memset(p, 0, sizeof *p);
msgQueueInit(&p->q, /* flags: */0);
p->cfg = scfg->mqtt;
p->handle = mosquitto_new(client_id, /* clean_session: */true, /* userdata: */p);
free(client_id);
......@@ -567,6 +709,8 @@ static JSBool js_mqtt_constructor(JSContext* cx, uintN argc, jsval *arglist)
free(p);
return JS_FALSE;
}
mosquitto_message_callback_set(p->handle, mqtt_message_received);
mosquitto_loop_start(p->handle);
#ifdef BUILD_JSDOCS
js_DescribeSyncObject(cx,obj,"Class used for MQTT communications",320);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment