Skip to content
Snippets Groups Projects
Commit cbb76cb4 authored by mcmlxxix's avatar mcmlxxix
Browse files

support optional timeout in db queries. default: nonblocking writes, blocking...

support optional timeout in db queries. default: nonblocking writes, blocking reads. timeout 0 = nonblocking r/w with acknowledgement. timeout > 0 = blocking r/w for duration of timeout with acknowledgement (or read result)
parent 5b991ccd
No related branches found
No related tags found
No related merge requests found
......@@ -77,7 +77,8 @@ function JSONClient(serverAddr,serverPort) {
CONNECTION_TIMEOUT: 5,
PING_INTERVAL: 60*1000,
PING_TIMEOUT: 10*1000,
RECV_TIMEOUT: 10
SOCK_TIMEOUT: 10*1000,
TIMEOUT: -1
};
this.socket=undefined;
......@@ -106,15 +107,21 @@ function JSONClient(serverAddr,serverPort) {
oper:"SUBSCRIBE",
nick:user?user.alias:undefined,
system:system?system.name:undefined,
location:location
location:location,
timeout:this.settings.TIMEOUT
});
if(this.settings.TIMEOUT >= 0)
return this.wait();
}
this.unsubscribe=function(scope,location) {
this.send(scope,"QUERY",{
oper:"UNSUBSCRIBE",
location:location
location:location,
timeout:this.settings.TIMEOUT
});
if(this.settings.TIMEOUT >= 0)
return this.wait();
}
/* lock an object */
......@@ -122,13 +129,16 @@ function JSONClient(serverAddr,serverPort) {
this.send(scope,"QUERY",{
location:location,
oper:"LOCK",
data:lock
});
data:lock,
timeout:this.settings.TIMEOUT
});
if(this.settings.TIMEOUT >= 0)
return this.wait();
}
/* unlock an object */
this.unlock = function(scope,location) {
this.lock(scope,location,-1);
return this.lock(scope,location,-1);
}
/* read object data (lock for reading or writing, blocking) */
......@@ -136,9 +146,10 @@ function JSONClient(serverAddr,serverPort) {
this.send(scope,"QUERY",{
oper:"READ",
location:location,
lock:lock
});
return this.wait("RESPONSE");
lock:lock,
timeout:this.settings.TIMEOUT
});
return this.wait();
}
/* read object keys (lock for reading or writing, blocking) */
......@@ -146,9 +157,10 @@ function JSONClient(serverAddr,serverPort) {
this.send(scope,"QUERY",{
oper:"KEYS",
location:location,
lock:lock
});
return this.wait("RESPONSE");
lock:lock,
timeout:this.settings.TIMEOUT
});
return this.wait();
}
/* shift object data (lock for reading or writing, blocking) */
......@@ -156,9 +168,10 @@ function JSONClient(serverAddr,serverPort) {
this.send(scope,"QUERY",{
oper:"SHIFT",
location:location,
lock:lock
lock:lock,
timeout:this.settings.TIMEOUT
});
return this.wait("RESPONSE");
return this.wait();
}
/* pop object data (lock for reading or writing, blocking) */
......@@ -166,9 +179,10 @@ function JSONClient(serverAddr,serverPort) {
this.send(scope,"QUERY",{
oper:"POP",
location:location,
lock:lock
lock:lock,
timeout:this.settings.TIMEOUT
});
return this.wait("RESPONSE");
return this.wait();
}
/* store object data (lock for writing) */
......@@ -177,8 +191,11 @@ function JSONClient(serverAddr,serverPort) {
oper:"WRITE",
location:location,
data:data,
lock:lock
lock:lock,
timeout:this.settings.TIMEOUT
});
if(this.settings.TIMEOUT >= 0)
return this.wait(this.settings.TIMEOUT);
}
/* store object data (lock for writing) */
......@@ -187,8 +204,11 @@ function JSONClient(serverAddr,serverPort) {
oper:"WRITE",
location:location,
data:undefined,
lock:lock
lock:lock,
timeout:this.settings.TIMEOUT
});
if(this.settings.TIMEOUT >= 0)
return this.wait();
}
/* unshift object data (lock for writing) */
......@@ -197,8 +217,11 @@ function JSONClient(serverAddr,serverPort) {
oper:"UNSHIFT",
location:location,
data:data,
lock:lock
lock:lock,
timeout:this.settings.TIMEOUT
});
if(this.settings.TIMEOUT >= 0)
return this.wait();
}
/* push object data (lock for writing) */
......@@ -207,8 +230,11 @@ function JSONClient(serverAddr,serverPort) {
oper:"PUSH",
location:location,
data:data,
lock:lock
lock:lock,
timeout:this.settings.TIMEOUT
});
if(this.settings.TIMEOUT >= 0)
return this.wait();
}
/* package a query and send through the socket */
......@@ -245,23 +271,20 @@ function JSONClient(serverAddr,serverPort) {
}
/* do not return until the expected response is received */
this.wait=function(func) {
var start = time();
this.wait=function() {
var start = Date.now();
do {
var packet = this.receive();
if(!packet)
continue;
else if(packet.func == func)
else if(packet.func == "RESPONSE")
return packet.data;
else if(typeof this.callback == "function")
this.callback(packet.data);
else
this.updates.push(packet.data);
} while(time() - start < this.settings.RECV_TIMEOUT);
log(LOG_ERROR,"timed out waiting for server response");
exit();
} while(Date.now() - start < this.settings.SOCK_TIMEOUT);
throw("timed out waiting for server response");
}
/* check socket for data, and process it if a callback is specified */
......@@ -288,18 +311,22 @@ function JSONClient(serverAddr,serverPort) {
this.who=function(scope,location) {
this.send(scope,"QUERY",{
oper:"WHO",
location:location
location:location,
timeout:this.settings.TIMEOUT
});
return this.wait("RESPONSE");
if(this.settings.TIMEOUT >= 0)
return this.wait();
}
/* retrieve the overall lock and subscription status of an object */
this.status=function(scope,location) {
this.send(scope,"QUERY",{
oper:"STATUS",
location:location
location:location,
timeout:this.settings.TIMEOUT
});
return this.wait("RESPONSE");
if(this.settings.TIMEOUT >= 0)
return this.wait();
}
this.connect();
......
......@@ -49,7 +49,7 @@ function JSONdb (fileName) {
this.shadow={};
/* queued array of data requests (get/put/create/delete) */
this.queue=[];
this.queue={};
/* general list of db subscribers (for quick release if no subscriptions) */
this.subscriptions=[];
......@@ -64,6 +64,7 @@ function JSONdb (fileName) {
KEEP_READABLE:false,
READ_ONLY:false,
UPDATES:false,
DEFAULT_TIMEOUT:0
};
/* lock constants */
......@@ -108,8 +109,9 @@ function JSONdb (fileName) {
/*************************** database methods ****************************/
/* subscribe a client to an object */
this.subscribe = function(client,record) {
/* TODO: expire existing subscribers after certain amount of time, maybe */
this.subscribe = function(request,record) {
var client = request.client;
/* TODO: expire existing subscribers after certain amount of time, maybe */
if(!this.subscriptions[client.id]) {
this.subscriptions[client.id] = {};
}
......@@ -125,7 +127,8 @@ function JSONdb (fileName) {
};
/* unsubscribe a client from an object */
this.unsubscribe = function(client,record) {
this.unsubscribe = function(request,record) {
var client = request.client;
if(this.subscriptions[client.id][record.location]) {
delete record.shadow[record.child_name]._subscribers[client.id];
delete this.subscriptions[client.id][record.location];
......@@ -141,8 +144,9 @@ function JSONdb (fileName) {
/* The point of a read lock is *only* to prevent someone from
getting a write lock... just a simple counter is usually enough. */
this.lock = function(client,record,lock_type) {
switch(lock_type) {
this.lock = function(request,record) {
var client = request.client;
switch(request.data) {
/* if the client wants to read... */
case locks.READ:
/* if this is a duplicate lock attempt */
......@@ -159,7 +163,7 @@ function JSONdb (fileName) {
/* otherwise, we can read lock */
else {
record.shadow[record.child_name]._lock[client.id] = new Lock(
lock_type,
request.data,
Date.now()
);
return true;
......@@ -170,7 +174,7 @@ function JSONdb (fileName) {
/* if the record isnt locked at all, we can lock */
case locks.NONE:
record.shadow[record.child_name]._lock[client.id] = new Lock(
lock_type,
request.data,
Date.now()
);
return true;
......@@ -197,7 +201,7 @@ function JSONdb (fileName) {
/* ...and the record isnt locked, lock for writing and remove flag */
case locks.NONE:
record.shadow[record.child_name]._lock[client.id] = new Lock(
lock_type,
request.data,
Date.now()
);
record.shadow[record.child_name]._lock_pending=false;
......@@ -222,13 +226,15 @@ function JSONdb (fileName) {
this.error(client,errors.NOT_LOCKED);
return true;
}
case locks.NONE:
return true;
break
}
/* fallthrough? */
return false;
};
/* server's data retrieval method (not directly called by client) */
this.read = function(client,record) {
this.read = function(request,record) {
var client = request.client;
/* if the requested data does not exist, result is undefined */
if(record.data === undefined) {
send_packet(client,undefined,"RESPONSE");
......@@ -246,7 +252,8 @@ function JSONdb (fileName) {
};
/* pop a record off the end of an array */
this.pop = function(client,record) {
this.pop = function(request,record) {
var client = request.client;
/* if the requested data does not exist, result is undefined */
if(record.data === undefined) {
send_packet(client,undefined,"RESPONSE");
......@@ -268,7 +275,8 @@ function JSONdb (fileName) {
}
/* shift a record off the end of an array */
this.shift = function(client,record) {
this.shift = function(request,record) {
var client = request.client;
/* if the requested data does not exist, result is undefined */
if(record.data === undefined) {
send_packet(client,undefined,"RESPONSE");
......@@ -290,7 +298,9 @@ function JSONdb (fileName) {
}
/* push a record onto the end of an array */
this.push = function(client,record,data) {
this.push = function(request,record) {
var client = request.client;
var data = request.data;
/* if the requested data does not exist, result is undefined */
if(record.data === undefined) {
this.error(client,errors.OBJECT_NOT_FOUND);
......@@ -316,7 +326,9 @@ function JSONdb (fileName) {
}
/* push a record onto the end of an array */
this.unshift = function(client,record,data) {
this.unshift = function(request,record) {
var client = request.client;
var data = request.data;
/* if the requested data does not exist, result is undefined */
if(record.data === undefined) {
this.error(client,errors.OBJECT_NOT_FOUND);
......@@ -342,8 +354,9 @@ function JSONdb (fileName) {
}
/* server's data submission method (not directly called by client) */
this.write = function(client,record,data) {
this.write = function(request,record) {
var client = request.client;
var data = request.data;
/* if this client has this record locked */
if(record.info.lock[client.id] &&
record.info.lock[client.id].type == locks.WRITE) {
......@@ -360,7 +373,8 @@ function JSONdb (fileName) {
};
/* retrieve a list of object keys */
this.keys = function(client,record) {
this.keys = function(request,record) {
var client = request.client;
var keys=[];
/* if the requested data does not exist, result is undefined */
if(record.data === undefined) {
......@@ -381,7 +395,8 @@ function JSONdb (fileName) {
}
/* remove a record from the database (requires WRITE_LOCK) */
this.remove = function(client,record) {
this.remove = function(request,record) {
var client = request.client;
/* if the requested data does not exist, do nothing */
if(record.data === undefined)
return true;
......@@ -399,7 +414,8 @@ function JSONdb (fileName) {
};
/* return an object representing a record's overall subscriber and lock status */
this.status = function(client,record) {
this.status = function(request,record) {
var client = request.client;
var sub=[];
for(var s in record.info.subscribers)
sub.push(s);
......@@ -414,7 +430,8 @@ function JSONdb (fileName) {
}
/* return a list of subscriptions and associated IP address for clients */
this.who = function(client,record) {
this.who = function(request,record) {
var client = request.client;
var data = get_subscriber_list(record);
send_packet(client,data,"RESPONSE");
return true;
......@@ -447,28 +464,26 @@ function JSONdb (fileName) {
client.system = query.system;
}
/* if an operation is requested */
if(query.oper !== undefined) {
if(!valid_oper(query.oper)) {
this.error(client,errors.INVALID_OPER);
return false;
}
if(query.oper == "LOCK" && !valid_lock(query.data)) {
this.error(client,errors.INVALID_LOCK);
return false;
}
/* if the requested operation is invalid, error */
if(!valid_oper(query.oper)) {
this.error(client,errors.INVALID_OPER);
return false;
}
request = new Request(client,query.oper,parent_name,child_name,query.data);
/* push this query into a queue to be processed at the next response cycle (this.cycle()) */;
q.push(request);
/* if the requested lock is invalid, error */
if(!valid_lock(query.lock)) {
this.error(client,errors.INVALID_LOCK);
return false;
}
q.push(new Request(
client,query.oper,parent_name,child_name,query.data,query.timeout
));
/* push this query into a queue to be processed at the next response cycle (this.cycle()) */;
/* if there is an attached lock operation, process accordingly */
if(query.lock !== undefined) {
if(!valid_lock(query.lock)) {
this.error(client,errors.INVALID_LOCK);
return false;
}
if(query.lock !== locks.NONE) {
/* put lock ahead of the operation in request queue */
q.unshift(new Request(
client,"LOCK",parent_name,child_name,query.lock
......@@ -480,7 +495,9 @@ function JSONdb (fileName) {
}
/* add the temporary queue to the main queue */
this.queue=this.queue.concat(q);
if(!this.queue[client.id])
this.queue[client.id]=[];
this.queue[client.id]=this.queue[client.id].concat(q);
}
/* generate an error object and send it to client */
......@@ -583,9 +600,9 @@ function JSONdb (fileName) {
/* schedule client for subscription and lock release */
this.release = function(client) {
this.queue.push(new Request(
client,"CLOSE"
));
if(!this.queue[client.id])
this.queue[client.id] = [];
this.queue[client.id].push(new Request(client,"CLOSE"));
}
/* release any locks or subscriptions held by a disconnected client */
......@@ -596,96 +613,97 @@ function JSONdb (fileName) {
/* release any subscriptions the client holds */
cancel_subscriptions(client,this.subscriptions[client.id]);
/* remove any remaining queries from the queue */
/* remove any remaining client queries */
fuh_queue(client,this.queue);
return true;
};
/* main "loop" called by server */
this.cycle = function(timestamp) {
this.cycle = function() {
/* process request queue, removing successful operations */
for(var r=0;r<this.queue.length;r++) {
var request=this.queue[r];
var result=false;
/* if we have been asked to clear a client's locks, DEW IT */
if(request.oper.toUpperCase() == "CLOSE") {
this.queue.splice(r--,1);
this.close(request.client);
continue;
}
/* locate the requested record within the database */
var record=identify_remains.call(
this,request.client,request.parent_name,request.child_name,
request.oper.toUpperCase() == "WRITE"
);
if(!record) {
log(LOG_WARNING,"db: bad request removed from queue");
this.queue.splice(r--,1);
for(var c in this.queue) {
if(!this.queue[c] || this.queue[c].length == 0) {
delete this.queue[c];
continue;
}
switch(request.oper.toUpperCase()) {
case "READ":
result=this.read(request.client,record);
break;
case "POP":
result=this.pop(request.client,record);
break;
case "SHIFT":
result=this.shift(request.client,record);
break;
case "WRITE":
result=this.write(request.client,record,request.data);
break;
case "KEYS":
result=this.keys(request.client,record);
break;
case "PUSH":
result=this.push(request.client,record,request.data);
break;
case "UNSHIFT":
result=this.unshift(request.client,record,request.data);
break;
case "DELETE":
result=this.remove(request.client,record);
break;
case "SUBSCRIBE":
result=this.subscribe(request.client,record);
break;
case "UNSUBSCRIBE":
result=this.unsubscribe(request.client,record);
break;
case "LOCK":
result=this.lock(request.client,record,request.data);
break;
case "STATUS":
result=this.status(request.client,record);
break;
case "WHO":
result=this.who(request.client,record);
break;
}
if(result == true) {
log(LOG_DEBUG,format("db: %s %s %s OK",
request.client.id,request.oper,record.location));
this.queue.splice(r--,1);
}
else {
log(LOG_DEBUG,format("db: %s %s %s FAILED",
request.client.id,request.oper,record.location));
for(var r=0;r<this.queue[c].length;r++) {
var request=this.queue[c][r];
var result=false;
/* if we have been asked to clear a client's locks, DEW IT */
if(request.oper.toUpperCase() == "CLOSE") {
this.queue[c].splice(r--,1);
this.close(request.client);
break;
}
/* locate the requested record within the database */
var record=identify_remains.call(
this,request.client,request.parent_name,request.child_name,
request.oper.toUpperCase() == "WRITE"
);
/* if there was an error parsing object location, delete request */
if(!record) {
log(LOG_WARNING,"db: bad request removed from queue");
this.error(request.client,errors.INVALID_REQUEST);
this.queue[c].splice(r--,1);
continue;
}
switch(request.oper.toUpperCase()) {
case "READ":
result=this.read(request,record);
break;
case "POP":
result=this.pop(request,record);
break;
case "SHIFT":
result=this.shift(request,record);
break;
case "WRITE":
result=this.write(request,record);
break;
case "KEYS":
result=this.keys(request,record);
break;
case "PUSH":
result=this.push(request,record);
break;
case "UNSHIFT":
result=this.unshift(request,record);
break;
case "DELETE":
result=this.remove(request,record);
break;
case "SUBSCRIBE":
result=this.subscribe(request,record);
break;
case "UNSUBSCRIBE":
result=this.unsubscribe(request,record);
break;
case "LOCK":
result=this.lock(request,record);
break;
case "STATUS":
result=this.status(request,record);
break;
case "WHO":
result=this.who(request,record);
break;
}
/* if the request did not succeed, move to the next user queue */
if(!check_result(request,record,result))
break;
this.queue[c].splice(r--,1);
}
}
if(!this.settings.UPDATES)
return;
if(!this.settings.SAVE_INTERVAL > 0)
return;
if(!timestamp)
timestamp = Date.now();
if(timestamp - this.settings.LAST_SAVE < (this.settings.SAVE_INTERVAL * 1000))
if(Date.now() - this.settings.LAST_SAVE < (this.settings.SAVE_INTERVAL * 1000))
return;
this.save();
};
......@@ -728,12 +746,14 @@ function JSONdb (fileName) {
/* request object generated by queue() method
contains the requested object parent, the specific child property requested,
data (in the case of a PUT operation ) */
function Request(client,operation,parent_name,child_name,data) {
function Request(client,operation,parent_name,child_name,data,timeout) {
this.client=client;
this.oper=operation;
this.parent_name=parent_name;
this.child_name=child_name;
this.data=data;
this.time=Date.now();
this.timeout=timeout;
}
/*************************** database functions ****************************/
......@@ -830,14 +850,12 @@ function JSONdb (fileName) {
/* remove any remaining client queries from queue */
function fuh_queue(client,queue) {
for(var c=0;c<queue.length;c++) {
var query = queue[c];
if(query.client.id == client.id) {
log(LOG_DEBUG,format("removing query: %s %s.%s",
query.oper,query.parent_name,query.child_name));
queue.splice(c--,1);
}
while(queue[client.id] && queue[client.id].length > 0) {
var query = queue[client.id].shift();
log(LOG_DEBUG,format("removing query: %s %s.%s",
query.oper,query.parent_name,query.child_name));
}
delete queue[client.id];
}
/* release subscriptions on an object recursively */
......@@ -879,6 +897,38 @@ function JSONdb (fileName) {
return info;
}
/* check the result of a request */
function check_result(request,record,result) {
if(result) {
log(LOG_DEBUG,format("db: %s %s %s OK",
request.client.id,request.oper,record.location));
if(request.timeout >= 0) {
switch(request.oper.toUpperCase()) {
case "WRITE":
case "PUSH":
case "UNSHIFT":
case "DELETE":
case "SUBSCRIBE":
case "UNSUBSCRIBE":
case "LOCK":
//notify client of success on non-read operations
send_packet(request.client,true,"RESPONSE");
break;
}
}
return true;
}
else {
log(LOG_DEBUG,format("db: %s %s %s FAILED",
request.client.id,request.oper,record.location));
if(request.timeout >= 0 && timeout_expired(request)) {
send_packet(request.client,false,"RESPONSE");
return true;
}
return false;
}
}
/* send updates of this object to all subscribers */
function send_data_updates(client,record) {
var data = {
......@@ -966,6 +1016,11 @@ function JSONdb (fileName) {
return str;
}
/* calculate timeout */
function timeout_expired(request) {
return (Date.now() - request.time >= request.timeout);
}
/* count object members */
function count(obj) {
var c=0;
......@@ -989,3 +1044,5 @@ function JSONdb (fileName) {
};
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment