Skip to content
Snippets Groups Projects
Commit 45ac9016 authored by Rob Swindell's avatar Rob Swindell :speech_balloon:
Browse files

Add subscribe support

parent 0655a1ff
No related branches found
No related tags found
1 merge request!463MRC mods by Codefenix (2024-10-20)
...@@ -28,6 +28,7 @@ typedef struct ...@@ -28,6 +28,7 @@ typedef struct
int retval; int retval;
mqtt_handle_t handle; mqtt_handle_t handle;
struct mqtt_cfg cfg; struct mqtt_cfg cfg;
msg_queue_t q;
} private_t; } private_t;
...@@ -38,9 +39,12 @@ static void js_finalize_mqtt(JSContext* cx, JSObject* obj) ...@@ -38,9 +39,12 @@ static void js_finalize_mqtt(JSContext* cx, JSObject* obj)
if((p = (private_t*)JS_GetPrivate(cx,obj)) == NULL) if((p = (private_t*)JS_GetPrivate(cx,obj)) == NULL)
return; return;
if(p->handle != NULL)
mosquitto_destroy(p->handle);
if(p->handle != NULL) {
mosquitto_loop_stop(p->handle, /* force: */true);
mosquitto_destroy(p->handle);
}
msgQueueFree(&p->q);
free(p); free(p);
JS_SetPrivate(cx, obj, NULL); JS_SetPrivate(cx, obj, NULL);
...@@ -166,7 +170,7 @@ static JSBool js_publish(JSContext* cx, uintN argc, jsval *arglist) ...@@ -166,7 +170,7 @@ static JSBool js_publish(JSContext* cx, uintN argc, jsval *arglist)
jsrefcount rc; jsrefcount rc;
bool retain = false; bool retain = false;
JS_SET_RVAL(cx, arglist, JSVAL_VOID); JS_SET_RVAL(cx, arglist, JSVAL_FALSE);
if((p = (private_t*)js_GetClassPrivate(cx, obj, &js_mqtt_class)) == NULL) if((p = (private_t*)js_GetClassPrivate(cx, obj, &js_mqtt_class)) == NULL)
return JS_FALSE; return JS_FALSE;
...@@ -175,7 +179,6 @@ static JSBool js_publish(JSContext* cx, uintN argc, jsval *arglist) ...@@ -175,7 +179,6 @@ static JSBool js_publish(JSContext* cx, uintN argc, jsval *arglist)
if(!js_argc(cx, argc, 2)) if(!js_argc(cx, argc, 2))
return JS_FALSE; return JS_FALSE;
JS_SET_RVAL(cx, arglist, JSVAL_FALSE);
if(p->handle == NULL) if(p->handle == NULL)
return JS_TRUE; return JS_TRUE;
...@@ -214,6 +217,107 @@ static JSBool js_publish(JSContext* cx, uintN argc, jsval *arglist) ...@@ -214,6 +217,107 @@ static JSBool js_publish(JSContext* cx, uintN argc, jsval *arglist)
return JS_TRUE; return JS_TRUE;
} }
static JSBool js_subscribe(JSContext* cx, uintN argc, jsval *arglist)
{
JSObject* obj = JS_THIS_OBJECT(cx, arglist);
jsval* argv = JS_ARGV(cx, arglist);
char* topic = NULL;
private_t* p;
jsrefcount rc;
JS_SET_RVAL(cx, arglist, JSVAL_FALSE);
if((p = (private_t*)js_GetClassPrivate(cx, obj, &js_mqtt_class)) == NULL)
return JS_FALSE;
int qos = p->cfg.subscribe_qos;
if(!js_argc(cx, argc, 1))
return JS_FALSE;
if(p->handle == NULL)
return JS_TRUE;
uintN argn = 0;
if(argn < argc && JSVAL_IS_NUMBER(argv[argn])) {
qos = JSVAL_TO_INT(argv[argn]);
++argn;
}
JSVALUE_TO_MSTRING(cx, argv[argn], topic, NULL);
HANDLE_PENDING(cx, topic);
++argn;
rc = JS_SUSPENDREQUEST(cx);
p->retval = mosquitto_subscribe(p->handle, /* msg-id: */NULL, topic, qos);
JS_SET_RVAL(cx, arglist, BOOLEAN_TO_JSVAL(p->retval == MOSQ_ERR_SUCCESS));
free(topic);
JS_RESUMEREQUEST(cx, rc);
return JS_TRUE;
}
static JSBool js_read(JSContext* cx, uintN argc, jsval *arglist)
{
JSObject* obj = JS_THIS_OBJECT(cx, arglist);
jsval* argv = JS_ARGV(cx, arglist);
private_t* p;
jsrefcount rc;
int timeout = 0;
bool verbose = false;
struct mosquitto_message* msg;
JS_SET_RVAL(cx, arglist, JSVAL_FALSE);
if((p = (private_t*)js_GetClassPrivate(cx, obj, &js_mqtt_class)) == NULL)
return JS_FALSE;
if(p->handle == NULL)
return JS_TRUE;
uintN argn = 0;
if(argn < argc && JSVAL_IS_NUMBER(argv[argn])) {
timeout = JSVAL_TO_INT(argv[argn]);
++argn;
}
if(argn < argc && JSVAL_IS_BOOLEAN(argv[argn])) {
verbose = JSVAL_TO_BOOLEAN(argv[argn]);
++argn;
}
rc = JS_SUSPENDREQUEST(cx);
msg = msgQueueRead(&p->q, timeout);
if(msg != NULL) {
if(verbose) {
JSString* str = JS_NewStringCopyZ(cx, msg->topic);
obj = JS_NewObject(cx, NULL, NULL, obj);
if(obj != NULL) {
JS_DefineProperty(cx, obj, "topic"
,STRING_TO_JSVAL(str)
,NULL,NULL,JSPROP_ENUMERATE);
str = JS_NewStringCopyN(cx, msg->payload, msg->payloadlen);
JS_DefineProperty(cx, obj, "data"
,STRING_TO_JSVAL(str)
,NULL,NULL,JSPROP_ENUMERATE);
JS_DefineProperty(cx, obj, "mid", INT_TO_JSVAL(msg->mid)
,NULL,NULL,JSPROP_ENUMERATE);
JS_DefineProperty(cx, obj, "qos", INT_TO_JSVAL(msg->qos)
,NULL,NULL,JSPROP_ENUMERATE);
JS_DefineProperty(cx, obj, "retain", BOOLEAN_TO_JSVAL(msg->retain)
,NULL,NULL,JSPROP_ENUMERATE);
JS_SET_RVAL(cx, arglist, OBJECT_TO_JSVAL(obj));
}
} else {
JSString* str = JS_NewStringCopyN(cx, msg->payload, msg->payloadlen);
if(str != NULL)
JS_SET_RVAL(cx, arglist, STRING_TO_JSVAL(str));
}
mosquitto_message_free(&msg);
}
JS_RESUMEREQUEST(cx, rc);
return JS_TRUE;
}
/* Properites */ /* Properites */
enum { enum {
MQTT_PROP_ERROR MQTT_PROP_ERROR
...@@ -233,6 +337,8 @@ enum { ...@@ -233,6 +337,8 @@ enum {
,MQTT_PROP_TLS_KEYPASS ,MQTT_PROP_TLS_KEYPASS
,MQTT_PROP_TLS_PSK ,MQTT_PROP_TLS_PSK
,MQTT_PROP_TLS_IDENTITY ,MQTT_PROP_TLS_IDENTITY
,MQTT_PROP_DATA_WAITING
,MQTT_PROP_READ_LEVEL
}; };
#ifdef BUILD_JSDOCS #ifdef BUILD_JSDOCS
...@@ -254,6 +360,8 @@ static char* com_prop_desc[] = { ...@@ -254,6 +360,8 @@ static char* com_prop_desc[] = {
,"Private key file password" ,"Private key file password"
,"TLS Pre-Shared-Key" ,"TLS Pre-Shared-Key"
,"TLS PSK Identity" ,"TLS PSK Identity"
,"<i>true</i> if messages are waiting to be read"
,"Number of messages waiting to be read"
,NULL ,NULL
}; };
#endif #endif
...@@ -447,6 +555,12 @@ static JSBool js_mqtt_get(JSContext* cx, JSObject* obj, jsid id, jsval *vp) ...@@ -447,6 +555,12 @@ static JSBool js_mqtt_get(JSContext* cx, JSObject* obj, jsid id, jsval *vp)
*vp = STRING_TO_JSVAL(js_str); *vp = STRING_TO_JSVAL(js_str);
rc = JS_SUSPENDREQUEST(cx); rc = JS_SUSPENDREQUEST(cx);
break; break;
case MQTT_PROP_DATA_WAITING:
*vp = BOOLEAN_TO_JSVAL(INT_TO_BOOL(msgQueueReadLevel(&p->q)));
break;
case MQTT_PROP_READ_LEVEL:
*vp = INT_TO_JSVAL(msgQueueReadLevel(&p->q));
break;
} }
JS_RESUMEREQUEST(cx, rc); JS_RESUMEREQUEST(cx, rc);
...@@ -475,22 +589,38 @@ static jsSyncPropertySpec js_mqtt_properties[] = { ...@@ -475,22 +589,38 @@ static jsSyncPropertySpec js_mqtt_properties[] = {
{ "tls_key_password" ,MQTT_PROP_TLS_KEYPASS ,MQTT_PROP_FLAGS, 320 }, { "tls_key_password" ,MQTT_PROP_TLS_KEYPASS ,MQTT_PROP_FLAGS, 320 },
{ "tls_psk" ,MQTT_PROP_TLS_PSK ,MQTT_PROP_FLAGS, 320 }, { "tls_psk" ,MQTT_PROP_TLS_PSK ,MQTT_PROP_FLAGS, 320 },
{ "tls_psk_identity" ,MQTT_PROP_TLS_IDENTITY ,MQTT_PROP_FLAGS, 320 }, { "tls_psk_identity" ,MQTT_PROP_TLS_IDENTITY ,MQTT_PROP_FLAGS, 320 },
{ "data_waiting" ,MQTT_PROP_DATA_WAITING ,MQTT_PROP_ROFLAGS, 320 },
{ "read_level" ,MQTT_PROP_READ_LEVEL ,MQTT_PROP_ROFLAGS, 320 },
{0} {0}
}; };
static jsSyncMethodSpec js_mqtt_functions[] = { static jsSyncMethodSpec js_mqtt_functions[] = {
{"connect", js_connect, 0, JSTYPE_BOOLEAN, JSDOCSTR("[string broker_address] [,number broker_port] [,string username] [,string password]") {"connect", js_connect, 0, JSTYPE_BOOLEAN
,JSDOCSTR("[string broker_address] [,number broker_port] [,string username] [,string password]")
,JSDOCSTR("Connect to an MQTT broker") ,JSDOCSTR("Connect to an MQTT broker")
,320 ,320
}, },
{"disconnect", js_disconnect, 0, JSTYPE_VOID, JSDOCSTR("") {"disconnect", js_disconnect, 0, JSTYPE_VOID
,JSDOCSTR("")
,JSDOCSTR("Close an open connection to the MQTT broker") ,JSDOCSTR("Close an open connection to the MQTT broker")
,320 ,320
}, },
{"publish", js_publish, 4, JSTYPE_BOOLEAN, JSDOCSTR("[bool retain=false,] [number qos,] topic, data") {"publish", js_publish, 4, JSTYPE_BOOLEAN
,JSDOCSTR("[bool retain=false,] [number qos,] topic, data")
,JSDOCSTR("Publish a string to specified topic") ,JSDOCSTR("Publish a string to specified topic")
,320 ,320
}, },
{"subscribe", js_subscribe, 2, JSTYPE_BOOLEAN
,JSDOCSTR("[number qos,] topic")
,JSDOCSTR("Subscribe to specified topic at (optional) QOS level")
,320
},
{"read", js_read, 0, JSTYPE_STRING
,JSDOCSTR("[timeout=0] [,bool verbpose=false]")
,JSDOCSTR("Read next message, optionally waiting for <i>timeout</i> milliseconds, "
"returns an object when <i>verbose</i> is <tt>true</tt>")
,320
},
{0} {0}
}; };
...@@ -532,6 +662,17 @@ JSClass js_mqtt_class = { ...@@ -532,6 +662,17 @@ JSClass js_mqtt_class = {
,js_finalize_mqtt /* finalize */ ,js_finalize_mqtt /* finalize */
}; };
static void mqtt_message_received(struct mosquitto* mosq, void* cbdata, const struct mosquitto_message* msg)
{
private_t* p = (private_t*)cbdata;
if(p != NULL && msg != NULL) {
struct mosquitto_message m;
if(mosquitto_message_copy(&m, msg) == MOSQ_ERR_SUCCESS)
msgQueueWrite(&p->q, &m, sizeof m);
}
}
static JSBool js_mqtt_constructor(JSContext* cx, uintN argc, jsval *arglist) static JSBool js_mqtt_constructor(JSContext* cx, uintN argc, jsval *arglist)
{ {
JSObject* obj; JSObject* obj;
...@@ -559,6 +700,7 @@ static JSBool js_mqtt_constructor(JSContext* cx, uintN argc, jsval *arglist) ...@@ -559,6 +700,7 @@ static JSBool js_mqtt_constructor(JSContext* cx, uintN argc, jsval *arglist)
return JS_FALSE; return JS_FALSE;
} }
memset(p, 0, sizeof *p); memset(p, 0, sizeof *p);
msgQueueInit(&p->q, /* flags: */0);
p->cfg = scfg->mqtt; p->cfg = scfg->mqtt;
p->handle = mosquitto_new(client_id, /* clean_session: */true, /* userdata: */p); p->handle = mosquitto_new(client_id, /* clean_session: */true, /* userdata: */p);
free(client_id); free(client_id);
...@@ -567,6 +709,8 @@ static JSBool js_mqtt_constructor(JSContext* cx, uintN argc, jsval *arglist) ...@@ -567,6 +709,8 @@ static JSBool js_mqtt_constructor(JSContext* cx, uintN argc, jsval *arglist)
free(p); free(p);
return JS_FALSE; return JS_FALSE;
} }
mosquitto_message_callback_set(p->handle, mqtt_message_received);
mosquitto_loop_start(p->handle);
#ifdef BUILD_JSDOCS #ifdef BUILD_JSDOCS
js_DescribeSyncObject(cx,obj,"Class used for MQTT communications",320); js_DescribeSyncObject(cx,obj,"Class used for MQTT communications",320);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment