Skip to content
Snippets Groups Projects
Commit 5b3b3de1 authored by mcmlxxix's avatar mcmlxxix
Browse files

release locks and subscriptions when client disconnects. send object location...

release locks and subscriptions when client disconnects. send object location and object itself with update packets.
parent 6c4b1eb3
No related branches found
No related tags found
No related merge requests found
load("synchronet-json.js");
/*
JSON database - for Synchronet 3.15a+ (2011)
......@@ -11,17 +10,16 @@ load("synchronet-json.js");
methods:
- Repository.query(client,query);
- Repository.cycle(Date.now());
- Repository.load();
- Repository.save(Date.now());
- Repository.lock(client,record,child_name,lock_type);
- Repository.read(client,record,child_name);
- Repository.write(client,record,child_name,data);
- Repository.create(client,record,child_name,data);
- Repository.delete(client,record,child_name);
- Repository.subscribe(client,record,child_name);
- Repository.unsubscribe(client,record,child_name);
- Database.query(client,query);
- Database.cycle(Date.now());
- Database.load();
- Database.save(Date.now());
- Database.lock(client,record,child_name,lock_type);
- Database.read(client,record,child_name);
- Database.write(client,record,child_name,data);
- Database.delete(client,record,child_name);
- Database.subscribe(client,record,child_name);
- Database.unsubscribe(client,record,child_name);
optional arguments:
......@@ -29,11 +27,13 @@ load("synchronet-json.js");
*/
if(!this.Repository) {
if(!this.Database) {
Repository = new (function() {
Database = new (function() {
this.VERSION = "$Revision$".split(' ')[1];
var settings={
this.settings={
/* record lock constants, incremental (do not change the order or value) */
LOCK_UNLOCK:-1,
......@@ -57,8 +57,8 @@ Repository = new (function() {
ERROR_DUPLICATE_LOCK:5
};
/**************************** repository objects *****************************/
/* locking object generated by Repository.lock() method
/**************************** database objects *****************************/
/* locking object generated by Database.lock() method
contains the type of lock requested, and the time at which the request came in
TODO: possibly "expire" unfulfilled lock requests after a certain period */
function Lock(lock_type,timestamp) {
......@@ -70,29 +70,7 @@ Repository = new (function() {
function Error(error_num,error_desc) {
this.operation="ERROR";
this.error_num=error_num;
switch(error_num) {
case settings.ERROR_INVALID_REQUEST:
this.error_desc="Invalid record request";
break;
case settings.ERROR_OBJECT_NOT_FOUND:
this.error_desc="Record not found";
break;
case settings.ERROR_NOT_LOCKED:
this.error_desc="Record not locked";
break;
case settings.ERROR_LOCKED_WRITE:
this.error_desc="Record locked for writing";
break;
case settings.ERROR_LOCKED_READ:
this.error_desc="Record locked for reading";
break;
case settings.ERROR_DUPLICATE_LOCK:
this.error_desc="Duplicate record lock request";
break;
default:
this.error_desc="Unknown error";
break;
}
this.error_desc=error_desc;
}
/* shadow properties generated by composite_sketch()
......@@ -107,13 +85,15 @@ Repository = new (function() {
contains the requested object, its shadow object,
the prevailing lock state of that object (affected by parents/children),
and a list of subscribers associated with that object or its parents */
function Record(data,shadow,info) {
function Record(data,shadow,parent_name,child_name,info) {
this.data=data;
this.shadow=shadow;
this.parent_name=parent_name;
this.child_name=child_name;
this.info=info;
}
/* request object generated by Repository.queue() method
/* request object generated by Database.queue() method
contains the requested object parent, the specific child property requested,
data (in the case of a PUT operation ) */
function Request(client,operation,parent_name,child_name,data) {
......@@ -131,15 +111,13 @@ Repository = new (function() {
it would also allow for a much simpler mechanism for removing
games you no longer wish to host (simply delete the db file and edit your service inis) */
/*************************** repository data ******************************/
/*************************** database data ******************************/
/* database storage file */
if(file_exists(argv[0])) {
if(argv[0])
this.file=new File(argv[0]);
}
else {
this.file=new File(system.data_dir + "db.json");
}
else
this.file=undefined;
/* master database object */
this.data={};
......@@ -149,48 +127,54 @@ Repository = new (function() {
/* queued array of data requests (get/put/create/delete) */
this.queue=[];
/*************************** repository methods ****************************/
/*************************** database methods ****************************/
/* subscribe a client to an object */
this.subscribe = function(client,record,child_name) {
record.shadow[child_name]._subscribers[client.id]=client;
this.subscribe = function(client,record) {
record.shadow[record.child_name]._subscribers[client.id]=client;
/* TODO: track duplicate subscribers (deny?) and
expire existing subscribers after certain amount of time, maybe */
return true;
};
/* unsubscribe a client from an object */
this.unsubscribe = function(client,record,child_name) {
delete record.shadow[child_name]._subscribers[client.id];
this.unsubscribe = function(client,record) {
delete record.shadow[record.child_name]._subscribers[client.id];
/* TODO: validate existing subscription
before attempting to delete */
return true;
};
/* The point of a read lock is *only* to prevent someone from
getting a write lock... just a simple counter is usually enough. */
this.lock = function(client,record,child_name,lock_type) {
this.lock = function(client,record,lock_type) {
switch(lock_type) {
/* if the client wants to read... */
case settings.LOCK_READ:
case this.settings.LOCK_READ:
/* if this is a duplicate lock attempt */
if(record.info.lock[client.id]) {
this.error(client,this.settings.ERROR_DUPLICATE_LOCK);
return true;
}
switch(record.info.lock_type) {
case settings.LOCK_READ:
case this.settings.LOCK_READ:
/* if there are any pending write locks, deny */
if(record.info.lock_pending) {
return false;
}
/* otherwise, we can read lock */
else {
record.shadow[child_name]._lock[client.id] = new Lock(
record.shadow[record.child_name]._lock[client.id] = new Lock(
lock_type,
Date.now()
);
return true;
}
/* we cant lock a record that is already locked for reading */
case settings.LOCK_WRITE:
case this.settings.LOCK_WRITE:
return false;
/* if the record isnt locked at all, we can lock */
case settings.LOCK_NONE:
record.shadow[child_name]._lock[client.id] = new Lock(
case this.settings.LOCK_NONE:
record.shadow[record.child_name]._lock[client.id] = new Lock(
lock_type,
Date.now()
);
......@@ -198,33 +182,39 @@ Repository = new (function() {
}
break;
/* if the client wants to write... */
case settings.LOCK_WRITE:
case this.settings.LOCK_WRITE:
/* if this is a duplicate lock attempt */
if(record.info.lock[client.id]) {
this.error(client,this.settings.ERROR_DUPLICATE_LOCK);
return true;
}
switch(record.info.lock_type) {
/* ...and the record is already locked, flag for pending write lock */
case settings.LOCK_READ:
case settings.LOCK_WRITE:
record.shadow[child_name]._lock_pending=true;
case this.settings.LOCK_READ:
case this.settings.LOCK_WRITE:
record.shadow[record.child_name]._lock_pending=true;
return false;
/* ...and the record isnt locked, lock for writing and remove flag */
case settings.LOCK_NONE:
record.shadow[child_name]._lock[client.id] = new Lock(
case this.settings.LOCK_NONE:
record.shadow[record.child_name]._lock[client.id] = new Lock(
lock_type,
Date.now()
);
record.shadow[child_name]._lock_pending=false;
record.shadow[record.child_name]._lock_pending=false;
return true;
}
break;
/* if the client wants to unlock, check credentials */
case settings.LOCK_UNLOCK:
case this.settings.LOCK_UNLOCK:
/* if the client has a lock on this record, release the lock */
if(record.shadow[child_name]._lock[client.id]) {
delete record.shadow[child_name]._lock[client.id];
if(record.shadow[record.child_name]._lock[client.id]) {
delete record.shadow[record.child_name]._lock[client.id];
return true;
}
/* otherwise deny */
else {
return false;
this.error(client,this.settings.ERROR_NOT_LOCKED);
return true;
}
}
/* fallthrough? */
......@@ -232,163 +222,188 @@ Repository = new (function() {
};
/* server's data retrieval method (not directly called by client) */
this.read = function(client,record,child_name) {
this.read = function(client,record) {
/* if this client has this record locked, read */
if(record.shadow[child_name]._lock[client.id]) {
client.send(record.data[child_name]);
if(record.info.lock[client.id]) {
var data = clone(record.data[record.child_name]);
send_packet(client,data,"RESPONSE");
return true;
}
/* if there is no lock for this client, error */
else {
client.send(new Error(settings.ERROR_NOT_LOCKED));
return false;
}
};
/* server's data submission method (not directly called by client) */
this.write = function(client,record,child_name,data) {
this.write = function(client,record,data) {
/* if this client has this record locked */
if(record.shadow[child_name]._lock[client.id] &&
record.shadow[child_name]._lock[client.id].type == settings.LOCK_WRITE) {
record.data=data;
record.shadow=composite_sketch(data);
if(record.info.lock[client.id] &&
record.info.lock[client.id].type == this.settings.LOCK_WRITE) {
record.data[record.child_name]=data;
/* populate this object's children with shadow objects */
composite_sketch(record.data[record.child_name],record.shadow[record.child_name]);
/* send data updates to all subscribers */
send_updates(record);
this.updates=true;
return true;
}
/* if there is no lock for this client, error */
else {
client.send(new Error(settings.ERROR_NOT_LOCKED));
return false;
}
};
/* remove a record from the database (requires WRITE_LOCK) */
this.delete = function(client,record,child_name) {
this.delete = function(client,record) {
/* if this client has this record locked */
if(record.shadow[child_name]._lock[client.id] &&
record.shadow[child_name]._lock[client.id].type == settings.LOCK_WRITE) {
delete record.data[child_name];
delete record.shadow[child_name];
if(record.shadow[record.child_name]._lock[client.id] &&
record.shadow[record.child_name]._lock[client.id].type == this.settings.LOCK_WRITE) {
delete record.data[record.child_name];
delete record.shadow[record.child_name];
/* send data updates to all subscribers */
send_updates(record);
this.updates=true;
return true;
}
/* if there is no lock for this client, error */
else {
client.send(new Error(settings.ERROR_NOT_LOCKED));
this.error(client,this.settings.ERROR_NOT_LOCKED);
return false;
}
};
/* create a new database record (requires WRITE_LOCK)
TODO: it seems this method may end up doing the exact same thing
as the put() method, do we need it? can we lock an object that
doesn't yet exist? does it matter? */
this.create = function(client,record,child_name,data) {
/* let parent adopt this new child */
record.data[child_name] = data;
/* create shadow data for this object */
record.shadow[child_name] = composite_sketch(data);
/* send data updates to all subscribers */
send_updates(record);
/* return an object representing a record's overall subscriber and lock status */
this.status = function(client,record) {
var sub=[];
for(var s in record.info.subscribers)
sub.push(s);
var data={
subscribers:sub,
lock:record.info.lock_type,
pending:record.info.lock_pending,
location:record.parent_name + "." + record.child_name
}
send_packet(client,data,"RESPONSE");
return true;
}
/* TODO: track subscribers and distribute updates */
return true;
};
/* generic query handler, will process locks, reads, writes, and unlocks
and put them into the appropriate queues */
this.query = function(client,query) {
/* strip the last child identifier from the string */
var parent_name = query.object_name.substring(0,query.object_name.lastIndexOf("."));
var parent_name = query.location.substring(0,query.location.lastIndexOf("."));
/* store the child name */
var child_name = query.object_name.substr(query.object_name.lastIndexOf(".")+1);
var child_name = query.location.substr(query.location.lastIndexOf(".")+1);
/* temporary array for queue additions */
var q=[];
/* push this query into a queue to be processed at the next response cycle (this.cycle()) */;
q.push(new Request(
client,
query.operation,
parent_name,
child_name,
query.data
));
/* if an operation is requested */
if(query.operation !== undefined) {
request = new Request(client,query.operation,parent_name,child_name,query.data);
/* push this query into a queue to be processed at the next response cycle (this.cycle()) */;
q.push(request);
}
/* if there is an attached lock operation, process accordingly */
switch(query.lock) {
/* if this is a read or write lock, put it ahead of the operation in the request queue */
case settings.LOCK_READ:
case settings.LOCK_WRITE:
q.unshift(new Request(
client,
"LOCK",
parent_name,
child_name,
query.lock
));
break;
/* if this is an unlock, process it after the operation in the request queue */
case settings.LOCK_UNLOCK:
q.push(new Request(
client,
"LOCK",
parent_name,
child_name,
query.lock
));
break;
/* if no lock has been specified, do nothing */
case settings.LOCK_NONE:
break;
if(query.lock !== undefined) {
request = new Request(client,"LOCK",parent_name,child_name,query.lock);
/* if this is a read or write lock, put it ahead of the operation in the request queue */
if(query.lock == this.settings.LOCK_UNLOCK)
q.unshift(request);
/* if this is an unlock, process it after the operation in the request queue */
else if(query.lock == this.settings.LOCK_READ || query.lock == this.settings.LOCK_WRITE)
q.push(request);
}
/* add the temporary queue to the main queue */
this.queue=this.queue.concat(q);
}
/* generate an error object and send it to client */
this.error = function(client,error_num) {
var error_desc="Unknown error";
switch(error_num) {
case this.settings.ERROR_INVALID_REQUEST:
error_desc="Invalid record request";
break;
case this.settings.ERROR_OBJECT_NOT_FOUND:
error_desc="Record not found";
break;
case this.settings.ERROR_NOT_LOCKED:
error_desc="Record not locked";
break;
case this.settings.ERROR_LOCKED_WRITE:
error_desc="Record locked for writing";
break;
case this.settings.ERROR_LOCKED_READ:
error_desc="Record locked for reading";
break;
case this.settings.ERROR_DUPLICATE_LOCK:
error_desc="Duplicate record lock request";
break;
}
var error=new Error(error_num,error_desc);
send_packet(client,error,"ERROR");
}
/* internal periodic data storage method
TODO: this should probably eventually be a background
thread to prevent lag when writing a large database to file */
this.save = function(timestamp) {
/* strip _location tags from data before saving */
dismember(this.data);
//TODO: create n backups before overwriting data file
this.file.open("w");
// This function gets called every 30 seconds or so
// And flushes all objects to disk in case of crash
// Also, this is called on clean exit.
this.file.write(JSON.stringify(this.data));
this.file.close();
settings.LAST_SAVE=timestamp;
if(!this.file)
return;
if(!timestamp)
timestamp = Date.now();
/* if we are due for a data update, save everything to file */
if(timestamp - this.settings.LAST_SAVE >= this.settings.AUTO_SAVE
&& this.updates) {
//TODO: create n backups before overwriting data file
this.file.open("w");
// This function gets called every 30 seconds or so
// And flushes all objects to disk in case of crash
// Also, this is called on clean exit.
this.file.write(JSON.stringify(this.data));
this.file.close();
this.settings.LAST_SAVE=timestamp;
}
};
/* data initialization (likely happens only once) */
this.load = function() {
this.file.open("r");
this.data=JSON.parse(this.file.readAll(settings.FILE_BUFFER).join('\n'));
this.file.close();
if(!this.file)
return;
if(file_exists(this.file.name)) {
this.file.open("r");
this.data = JSON.parse(
this.file.readAll(this.settings.FILE_BUFFER).join('\n')
);
this.file.close();
}
};
/* create a copy of data object keys and give them
locking properties */
this.init=function() {
this.init = function() {
this.load();
this.shadow=composite_sketch(this.data);
/* initialize autosave timer */
settings.LAST_SAVE = Date.now();
this.settings.LAST_SAVE = Date.now();
log(LOG_INFO,"JSON database initialized (v" + this.VERSION + ")");
}
/* main "loop" called by server */
this.cycle=function(timestamp) {
if(!timestamp)
timestamp = Date.now();
/* release any locks or subscriptions held by a disconnected client */
this.release = function(client_id) {
free_prisoner(client_id,this.shadow);
for(var c=0;c<this.queue.length;c++) {
if(this.queue[c].client.id == client_id)
this.queue.splice(c--,1);
}
}
/* if we are due for a data update, save everything to file */
if(timestamp - settings.LAST_SAVE >= settings.AUTO_SAVE)
this.save(timestamp);
/* main "loop" called by server */
this.cycle = function(timestamp) {
this.save();
/* process request queue, removing successful operations */
for(var r=0;r<this.queue.length;r++) {
......@@ -396,137 +411,166 @@ Repository = new (function() {
var result=false;
/* locate the requested record within the database */
var record=identify_remains.call(this,request.client,request.parent_name);
var record=identify_remains.call(this,request.client,request.parent_name,request.child_name);
if(!record) {
log(LOG_DEBUG,"db: bad request removed from queue");
this.queue.splice(r--,1);
}
switch(request.operation.toUpperCase()) {
case "READ":
result=this.read(request.client,record,request.child_name);
result=this.read(request.client,record);
break;
case "WRITE":
result=this.write(request.client,record,request.child_name,request.data);
result=this.write(request.client,record,request.data);
break;
case "DELETE":
result=this.delete(request.client,record,request.child_name);
result=this.delete(request.client,record);
break;
case "CREATE":
result=this.create(request.client,record,request.child_name,request.data);
result=this.create(request.client,record,request.data);
break;
case "SUBSCRIBE":
result=this.subscribe(request.client,record,request.child_name);
result=this.subscribe(request.client,record);
break;
case "UNSUBSCRIBE":
result=this.unsubscribe(request.client,record,request.child_name);
result=this.unsubscribe(request.client,record);
break;
case "LOCK":
result=this.lock(request.client,record,request.child_name,request.data);
result=this.lock(request.client,record,request.data);
break;
case "STATUS":
result=this.status(request.client,record);
break;
}
if(result == true) {
log(LOG_DEBUG,
"done: " +
log(LOG_DEBUG,"db: " +
request.operation + " " +
request.parent_name + "." +
request.child_name
request.child_name + " OK"
);
this.queue.splice(r--,1);
}
else {
log(LOG_DEBUG,
"fail: " +
log(LOG_DEBUG,"db: " +
request.operation + " " +
request.parent_name + "." +
request.child_name
);
request.child_name + " FAILED"
);
}
}
}
};
/*************************** repository functions ****************************/
/*************************** database functions ****************************/
/* traverse object and create a shadow copy of the object structure
for record locking and subscribers, and create hash names for database objects */
function composite_sketch(obj,name,shadow,location) {
/* generate serialized location (maybe this can be removed )*/
if(!location)
location=name;
else
location+="."+name;
for record locking and subscribers, and create location names for database objects */
function composite_sketch(obj,shadow) {
/* create shadow object */
shadow=new Shadow();
if(!shadow)
shadow=new Shadow();
/* iterate object members */
for(var p in obj) {
shadow[p]=composite_sketch(obj[p],p,shadow[p],location);
if(typeof obj[p] == "object")
shadow[p]=composite_sketch(obj[p],shadow[p]);
}
/* assign generated location id to this object */
obj._location=location;
/* returns an object containing the passed objects property keys
with their own lock, subscribers, and name properties
also adds a location property to keys of original object */
return shadow;
}
/* traverse object and remove location (hash name) tags */
function dismember(obj) {
delete obj._location;
for(var p in obj) {
dismember(obj[p]);
}
}
/* parse an object hash name and return the object (ex: dicewarz2.games.1.players.1.tiles.0)
/* parse an object location name and return the object (ex: dicewarz2.games.1.players.1.tiles.0)
an object containing the corresponding data and its shadow object */
function identify_remains(client,object_name) {
function identify_remains(client,parent_name,child_name) {
var p=object_name.split(/\./);
/* if the data request is invalid or empty, return an empty object */
var p=parent_name.split(/\./);
/* if the data request is invalid or empty, return an error */
if(!p) {
// Error!
client.send(new Error(settings.ERROR_INVALID_REQUEST));
this.error(client,this.settings.ERROR_INVALID_REQUEST);
return false;
}
var data=this.data;
var shadow=this.shadow;
var info={
lock_type:settings.LOCK_NONE,
lock_pending:false
lock:[],
lock_type:this.settings.LOCK_NONE,
lock_pending:false,
subscribers:[]
}
/* iterate through split object name checking the keys against the database and
checking the lock statuses against the shadow copy */
for each(var c in p) {
verify(data,shadow,c);
/* keep track of current object, and store the immediate parent of the request object */
data=data[c];
shadow=shadow[c];
/* if this key doesnt exist in the database, return error */
if(!data || !shadow) {
client.send(new Error(settings.ERROR_OBJECT_NOT_FOUND));
return false;
}
/* check the current object's lock and subscriber status along the way */
info = investigate(shadow,info);
}
verify(data,shadow,child_name);
/* continue on through the selected shadow object's children to check for locked children */
info = search_party(data,shadow,info);
info = search_party(data[child_name],shadow[child_name],info);
/* return selected database object, shadow object, and overall lock status of the chosen tree */
return new Record(data,shadow,info);
return new Record(data,shadow,parent_name,child_name,info);
}
/* if the requested child object does not exist, create it */
function verify(data,shadow,child_name) {
if(!data[child_name]) {
log(LOG_DEBUG,"creating new data: " + child_name);
data[child_name]={};
}
if(!shadow[child_name]) {
log(LOG_DEBUG,"creating new shadow: " + child_name);
shadow[child_name]=new Shadow();
}
}
/* release subscriptions and locks on an object recursively */
function free_prisoner(client_id,shadow) {
if(shadow._lock) {
if(shadow._lock[client_id]) {
log(LOG_DEBUG,"releasing lock: " + client_id);
delete shadow._lock[client_id];
}
}
if(shadow._subscribers) {
if(shadow._subscribers[client_id]) {
log(LOG_DEBUG,"releasing subscriber: " + client_id);
delete shadow._subscribers[client_id];
}
}
for(var s in shadow) {
if(typeof shadow[s] == "object")
free_prisoner(client_id,shadow[s]);
}
}
/* make a copy of an object to avoid modifying the original */
function clone(obj) {
var newobj=eval(obj.toSource());
return newobj;
}
/* return the prevailing lock type and pending lock status for an object */
function investigate(shadow, info) {
/* if we havent found a write locked record yet, keep searching */
if(info.lock_type == undefined) {
for(var l in shadow._lock) {
info.lock_type = shadow._lock[l].type;
break;
info.lock[l] = shadow._lock[l];
}
}
for(var s in shadow._subscribers) {
......@@ -552,12 +596,24 @@ Repository = new (function() {
/* send updates of this object to all subscribers */
function send_updates(record) {
for each(var c in record.info.subscribers) {
c.send(JSON.stringify(record.data)+"\r\n");
var data = {
location:record.parent_name + "." + record.child_name,
data:record.data[record.child_name]
};
send_packet(c,data,"UPDATE");
}
}
/* send data packet to a client */
function send_packet(client,object,func) {
var data={
func:func,
data:object
};
client.sendJSON(data);
}
/* constructor */
this.load();
this.init();
})();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment