diff --git a/exec/commservice.js b/exec/commservice.js index 2d9fc0e8613fe190b6e94cc0d77ef856fc67f3b6..07d7447d67f280f11ec7ec0da56c35bd6186c7bc 100644 --- a/exec/commservice.js +++ b/exec/commservice.js @@ -29,201 +29,178 @@ */ load("funclib.js"); +load("synchronet-json.js"); -const version= "$Revision$"; -//Connection type constants -const REMOTE= "*"; -const LOCAL= "&"; -const FILESYNC= "@"; -const QUERY= "?"; +const version= "$Revision$"; +const hub_address= argv[0]; //default: bbs.thebrokenbubble.com +const hub_port= argv[1]; //default: 10088 +const is_hub= (hub_address&&hub_port)?false:true; -const hub_address= argv[0]; -const hub_port= argv[1]; -const connection_timeout= 5;//SECONDS -const connection_interval= 10; -const connection_attempts= 10; +const REMOTE= "*"; +const LOCAL= "&"; +const FILESYNC= "@"; +const QUERY= "?"; +const CONNECTION_TIMEOUT= 5;//SECONDS +const CONNECTION_INTERVAL= 10; +const CONNECTION_ATTEMPTS= 10; +const MAX_BUFFER= 512; +const MAX_RECV= 10240; -var local_sessions=[]; -var remote_sessions=[]; -var data_queue=[]; var hub=new Server(hub_address,hub_port); +var data_queue=[]; +var remote_sessions=[]; +var local_sessions=[]; +var awaiting_auth=[]; -//server.socket.debug = false; -//server.socket.nonblocking = true; - +server.socket.nonblocking = true; //main service loop -while(!js.terminated) +function cycle() { - try { - sock_cycle(); - if(!server.socket.poll(0.1)) - { - if(js.terminated) break; - continue; + while(!js.terminated) { + try { + if(sessions_active()) { + authenticate(); + inbound(); + outbound(); + } + if(!server.socket.poll(0.1)) { + if(js.terminated) break; + continue; + } + store_socket(server.socket.accept(),awaiting_auth); + } catch(e) { + debug(e,LOG_WARNING); + break; } - store_socket(server.socket.accept()); - } catch(e) { - debug(e,LOG_WARNING); } + if(hub.enabled) hub.disconnect(); } -if(hub.enabled) hub.disconnect(); - -function sock_cycle() +function inbound() { - if(sessions_active()) { - if(hub.enabled) hub.cycle(); - inbound(); - outbound(); - //send_updates(); - } else { - if(hub.enabled) hub.disconnect(); + if(hub.enabled) hub.receive(); + for(var s in local_sessions) { + receive_from(local_sessions[s]); } + receive_from(remote_sessions); } -function inbound() +function outbound() { - //read data from each socket - for(var l in local_sessions) { - for(var s=0;s<local_sessions[l].length;) { - var session=local_sessions[l][s]; - if(!session.receive()) delete_session(local_sessions[l],s); - else s++; - } + if(hub.enabled) { + hub.cycle(); } - for(var r=0;r<remote_sessions.length;) { - var session=remote_sessions[r]; - if(!session.receive()) delete_session(remote_sessions,r); - else r++; + for(var a=0;a<awaiting_auth.length;a++) { + if(!awaiting_auth[a].cycle()) { + delete_session(awaiting_auth,a); + a--; + } } -} -function outbound() -{ - //send all data to hub and to appropriate LOCAL socket group - while(data_queue.length) { - var data=data_queue.shift(); - if(hub.enabled) hub.send(data); - for(var r=0;r<remote_sessions.length;) { - if(!remote_sessions[r].send(data)) delete_session(remote_sessions,r); - else r++; + for(var r=0;r<remote_sessions.length;r++) { + if(!remote_sessions[r].cycle()) { + delete_session(remote_sessions,r); + r--; } - for(var s=0;local_sessions[data.id] && s<local_sessions[data.id].length;) { - if(!local_sessions[data.id][s].send(data)) delete_session(local_sessions[data.id],s); - else s++; + } + for(var s in local_sessions) { + for(var i=0;i<local_sessions[s].length;i++) { + if(!local_sessions[s][i].cycle()) { + delete_session(local_sessions[s],i); + i--; + } } } } -function parse_inbound(socket) +function receive_from(sock_array) { - if(!testSocket(socket)) { - debug("socket test failed",LOG_WARNING); - return false; + var socks=socket_select(sock_array); + for(var s in socks) { + var sock=sock_array[socks[s]]; + var data=sock.recvline(MAX_RECV); + if(data == null) return false; + data=JSON.parse(data); + queue(sock,data); } - var incoming=socket.recvline(1024,connection_timeout); - if(incoming != null) { - return js.eval(incoming); - } else return false; } -function socket_send(data,socket) +function authenticate() { - if(!testSocket(socket)) return false; - if(data.descriptor != socket.descriptor) { - var d=data.toSource() + "\r\n"; - socket.write(d); + var socks=socket_select(awaiting_auth); + for(var s in socks) { + var sock=awaiting_auth[socks[s]]; + if(!sock) continue; + debug("authenticating socket"); + var data=sock.recvline(); + if(data == null) continue; + data=JSON.parse(data); + switch(data.context) { + case LOCAL: + if(!local_sessions[data.id]) local_sessions[data.id]=[]; + local_sessions[data.id].push(sock); + log("local connection: " + data.alias + " running " + data.id); + break; + case REMOTE: + if(data.version==version) { + remote_sessions.push(sock); + log("remote connection: " + data.system); + break; + } else { + debug("incompatible with remote server version: " + data.version,LOG_WARNING); + sock.close(); + continue; + } + default: + debug("unknown connection type: " + data.context); + sock.close(); + continue; + } + sock.id=data.id; + sock.context=data.context; + sock.alias=data.alias; + sock.system=data.system; + server.client_add(sock); + awaiting_auth.splice(socks[s],1); + queue(sock,data); + log("clients: " + server.clients); } - return true; } -function delete_session(array,index) -{ - log("session terminated: " + array[index].address); - server.client_remove(array[index].sock); - array.splice(index,1); - log("clients: " + server.clients); -} -function store_socket(sock) -{ - debug("connection from " + sock.remote_ip_address,LOG_DEBUG); - //receive connection identifier from incoming socket connection (should always be first transmission) - var data=parse_inbound(sock); - if(!data) { - debug("timed out waiting for handshake",LOG_WARNING); - sock.write("please visit http://cvs.synchro.net and verify that your files are up to date\r\n"); - sock.close(); - return false; - } - if(!data.context) { - debug("invalid handshake",LOG_WARNING); - sock.write("please visit http://cvs.synchro.net and verify that your files are up to date\r\n"); - sock.close(); - return false; - } - switch(data.context) - { - case REMOTE: - add_remote_session(sock,data); - break; - case LOCAL: - add_local_session(sock,data); - break; +function queue(sock,data) +{ + data.descriptor=sock.descriptor; + switch(data.type) { case FILESYNC: - handle_filesync(sock,data); + if(!is_hub && hub.enabled) { + hub.enqueue(data); + } else if(is_hub && sock.context != LOCAL) { + handle_filesync(sock,data); + } break; case QUERY: - handle_query(sock,data); + if(!is_hub && hub.enabled) { + hub.enqueue(data); + } else if(is_hub) { + handle_query(sock,data); + } break; default: - debug("unknown connection type: " + data.context,LOG_WARNING); - sock.close(); - return false; - } - return true; -} -function add_remote_session(sock,data) -{ - if(!data.system) { - debug("invalid handshake",LOG_WARNING); - sock.write("please visit http://cvs.synchro.net and verify that your files are up to date\r\n"); - sock.close(); - return false; - } - var response=new Object(); - response.version=version; - - if(data.version != version) { - debug("incompatible with REMOTE server version: " + data.version,LOG_WARNING); - response.response="UPDATE"; - socket_send(response,sock); - sock.close(); - return false; - } - - log("REMOTE connection: " + data.system + "(" + sock.remote_ip_address + ")"); - response.response="OK"; - socket_send(response,sock); - remote_sessions.push(new Session(sock,data)); - server.client_add(sock); - log("clients: " + server.clients); - return true; -} -function add_local_session(sock,data) -{ - if(!data.alias || !data.id) { - debug("invalid handshake",LOG_WARNING); - debug(data,LOG_DEBUG); - sock.write("please visit http://cvs.synchro.net and verify that your files are up to date\r\n"); - sock.close(); - return false; + if(hub.enabled) { + hub.enqueue(data); + } + for(var r=0;r<remote_sessions.length;r++) { + remote_sessions[r].enqueue(data); + } + for(var s=0;local_sessions[data.id] && s<local_sessions[data.id].length;s++) { + local_sessions[data.id][s].enqueue(data); + } + break; } - log("LOCAL connection: " + data.alias); - if(!local_sessions[data.id]) local_sessions[data.id]=[]; - local_sessions[data.id].push(new Session(sock,data)); - server.client_add(sock); - log("clients: " + server.clients); - return true; } function sessions_active() { - if(remote_sessions.length) return true; - for(var l in local_sessions) { - if(local_sessions[l].length) return true; + if(awaiting_auth.length>0 || remote_sessions.length>0) { + return true; + } + for(var s in local_sessions) { + if(local_sessions[s].length>0) { + return true; + } } return false; } @@ -240,8 +217,161 @@ function count_remote_sockets() { return remote_sessions.length; } -function handle_query(socket,query) + +//FILESYNC +function handle_filesync(socket,query) /* Hub only! */ +{ + try { + var module=get_module(query.id); + if(!module) return false; + switch(query.command) + { + case "trysend": + sync_remote(socket,module.working_directory,query); + break; + case "tryrecv": + sync_local(socket,module.working_directory,query); + if(is_hub && socket.context==LOCAL) send_updates(socket,module.working_directory,query); + break; + case "dorecv": + if(recv_file(socket,module.working_directory,query)) { + if(is_hub) send_updates(socket,module.working_directory,query); + } + break; + case "dosend": + send_file(socket,module.working_directory,query); + break; + default: + debug("unknown sync request: " + query.command,LOG_WARNING); + break; + } + } catch(e) { + debug("FILESYNC ERROR: " + e,LOG_WARNING); + } + +} +function sync_remote(socket,dir,query) +{ + var files=directory(dir + file_getname(query.filemask)); + if(files.length>0) debug("sending " + files.length + " files",LOG_DEBUG); + else { + debug("file(s) not found: " + dir + query.filemask,LOG_WARNING); + return false; + } + for(var f=0;f<files.length;f++) { + query.command="tryrecv"; + query.filemask=file_getname(files[f]); + query.filedate=file_date(files[f]); + query.descriptor=server.socket.descriptor; + socket.enqueue(query); + } +} +function send_updates(socket,dir,query) +{ + for(r=0;r<remote_sessions.length;r++) { + if(remote_sessions[r].descriptor != socket.descriptor) { + log("sending node updates: " + query.filemask); + sync_remote(remote_sessions[r],dir,query); + } + } +} +function send_file(socket,dir,query) +{ + /* + "data" object already contains the properties + needed by commservice to recognize the information + as a file request + */ + data=load_file(dir + file_getname(query.filemask)); + if(data) { + data.descriptor=server.socket.descriptor; + data.id=query.id; + data.command="dorecv"; + data.type=FILESYNC; + socket.enqueue(data); + debug("file sent: " + data.filemask,LOG_DEBUG); + } +} +function sync_local(socket,dir,query) +{ + var r_filedate=query.filedate; + var l_filedate=file_date(dir+query.filemask); + if(compare_dates(l_filedate,r_filedate)) { + query.command="dosend"; + query.descriptor=server.socket.descriptor; + socket.enqueue(query); + } else { + debug("skipping file: " + query.filemask); + debug("local date: " + l_filedate + " remote date: " + r_filedate); + } +} +function recv_file(socket,dir,query) +{ + var filename=dir+file_getname(query.filemask); + log("receiving file: " + filename); + var file=new File(filename + ".tmp"); + file.open('w',true); + file.base64=true; + if(!file.is_open) { + log(LOG_WARNING,"error opening file: " + file.name); + return false; + } + file.writeAll(query.file); + log("received: " + query.filesize + " bytes"); + file.close(); + if(file_exists(filename+".bck")) file_remove(filename+".bck"); + file_rename(filename,filename+".bck"); + file_rename(file.name,filename); + file_utime(filename,time(),query.filedate); + return true; +} + +/* Non-socket file functions */ +function compare_dates(local,remote) +{ + if(local<0) return true; + //will reject files with a time_t older than the local copy + //if(local>remote) return true; + //will treat numbers with a difference of 1 or 0 as the same, due to issues with some file systems + if(Math.abs(local-remote)>1) return true; + else return false; +} +function load_file(filename) +{ + var d=new Object(); + var f=new File(filename); + f.open('r',true); + if(!f.is_open) { + log(LOG_WARNING,"error opening file: " + f.name); + return false; + } + f.base64=true; + d.file=f.readAll(); + f.close(); + d.filesize=file_size(filename); + d.filedate=file_date(filename); + d.filemask=file_getname(filename); + return d; +} +function get_module(id) { + var comminit=new File(system.ctrl_dir + "filesync.ini"); + comminit.open('r'); + var match=comminit.iniGetSections(id); + if(!match.length) { + debug("module not found, aborting: " + id,LOG_WARNING); + return false; + } + var module=comminit.iniGetObject(id); + comminit.close(); + return module; +} +//END FILESYNC + +//TODO QUERIES +function handle_query(socket,query) /* Hub only! */ +{ + log("query from: " + socket.remote_ip_address); switch(query.task) { case "who": @@ -265,35 +395,6 @@ function handle_query(socket,query) break; } } -function handle_filesync(socket,query) -{ - log("file sync request from: " + socket.remote_ip_address); - //background.push(load(true,"filesync.js",socket.descriptor,query.toSource(),hub.address,hub.port)); -} -function send_updates() -{ - if(background.length) - { - var bgq=background.shift(); - if(bgq.data_waiting) - { - var data=bgq.read(); - if(data) { - for(var r=0;r<remote_sessions.length;r++) - { - var session=remote_sessions[r]; - if(data.remote_ip_address != session.address) - { - load(true,"filesync.js",session.descriptor,data.toSource(),hub.address,hub.port); - } - } - } - } - else background.push(bgq); - } -} - -//TODO function whos_online(socket) { } @@ -315,153 +416,85 @@ function notify_sysop(msg) } //END TODO -function Session(socket,packet) -{ - this.sock=socket; - this.context=packet.context; - this.id=packet.id; - this.alias=packet.alias; - this.system=packet.system; - this.address=socket.remote_ip_address; - this.port=socket.remote_port; - this.descriptor=socket.descriptor; - this.receive=function() - { - if(!testSocket(this.sock)) return false; - if(this.sock.data_waiting) { - //store data in master array to be distributed later - var data=parse_inbound(this.sock); - if(data) { - switch(data.context) - { - case FILESYNC: - handle_filesync(this.sock,data); - break; - case QUERY: - handle_query(this.sock,data); - break; - default: - data.id=data.id?data.id:this.id; - data.descriptor=this.descriptor; - data_queue.push(data); - break; - } - } - } - return true; - } - this.send=function(data) - { - if(!socket_send(data,this.sock)) return false; - return true; - } -} function Server(addr,port) { this.address=addr; this.port=port; this.sock=false; - this.queue=[]; this.enabled=true; + this.queue=""; this.last_attempt=0; this.attempts=0; this.connect=function() { debug("connecting to " + this.address + " on port " + this.port,LOG_DEBUG); - this.sock=new Socket(); - this.sock.nonblocking=true; - this.sock.connect(this.address,this.port,connection_timeout); - if(this.sock.is_connected) + this.sock=get_server_socket(); + this.sock.connect(this.address,this.port,CONNECTION_TIMEOUT); + if(!this.sock.is_connected) { - //send node identifier and bbs name so hub knows this is a distribution point - var hello=new Object(); - hello.system=system.name; - hello.context=REMOTE; - hello.version=version; - socket_send(hello,this.sock); - var response=parse_inbound(this.sock); - if(response) { - switch(response.response) - { - case "OK": - debug("connection to " + this.address + " successful",LOG_DEBUG); - debug("REMOTE server " + response.version,LOG_DEBUG); - this.attempts=0; - this.last_attempt=0; - return true; - case "UPDATE": - var msg="service file out of date, please visit http://cvs.synchro.net/ to update to " + response.version; - debug(msg,LOG_WARNING); - notify_sysop(msg); - this.sock.close(); - this.enabled=false; - return false; - default: - debug("unknown response",LOG_WARNING); - debug(response,LOG_WARNING); - this.sock.close(); - break; - } - } - else { - debug("no hub response received",LOG_DEBUG); - this.sock.close(); - } - } else { - debug("connection to " + this.address + " failed with error " + this.sock.error,LOG_WARNING); - } - this.attempts++; - this.last_attempt=time(); - return false; + this.attempts++; + this.last_attempt=time(); + return false; + } + return true; } this.disconnect=function() { if(this.sock.is_connected) { log("disconnecting from server: " + this.address); - while(this.queue.length) { - var data=this.queue.shift(); - socket_send(data,this.sock); + while(this.sock.queue.length) { + if(!this.cycle()) break; } this.sock.close(); } } - this.cycle=function() + this.receive=function() { - if(!this.sock.is_connected) { - if(this.attempts>=connection_attempts) { + if(!this.sock || !this.sock.is_connected) { + if(this.attempts>=CONNECTION_ATTEMPTS) { this.enabled=false; return false; } - if((time()-this.last_attempt)>=connection_interval) { + if((time()-this.last_attempt)>=CONNECTION_INTERVAL) { this.connect(); } } - if(this.sock.is_connected) { - if(this.queue.length) { - var data=this.queue.shift(); - socket_send(data,this.sock); + if(this.sock.is_connected && this.sock.data_waiting) { + var data=this.sock.recvline(MAX_RECV); + if(data == null) return false; + data=JSON.parse(data); + switch(data.type) { + case FILESYNC: + handle_filesync(this.sock,data); + break; + case QUERY: + handle_query(this.sock,data); + break; + default: + queue(this.sock,data); + break; } - this.receive(); } } - this.receive=function() + this.enqueue=function(data) + { + this.sock.enqueue(data); + } + this.cycle=function() { - if(!testSocket(this.sock)) return false; - if(this.sock.data_waiting) { - //store data in master array to be distributed later - var data=parse_inbound(this.sock); - if(data) { - data.descriptor=this.sock.descriptor; - data_queue.push(data); + if(!this.sock || !this.sock.is_connected) { + if(this.attempts>=CONNECTION_ATTEMPTS) { + this.enabled=false; + return false; + } + if((time()-this.last_attempt)>=CONNECTION_INTERVAL) { + this.connect(); } } + if(this.sock) this.sock.cycle(); return true; } - this.send=function(data) - { - if(this.sock && (data.descriptor != this.sock.descriptor)) this.queue.push(data); - } if(!addr && !port) { this.enabled=false; @@ -471,3 +504,50 @@ function Server(addr,port) this.enabled=false; } } +function get_server_socket() +{ + var sock=new Socket(); + sock.queue=""; + sock.nonblocking=true; + sock.enqueue=function(data) { + if(data.descriptor==this.descriptor) return false; + data.context=REMOTE; + data.version=version; + data.system=system.name; + this.queue+=JSON.stringify(data)+"\r\n"; + } + sock.cycle=sock_cycle; + return sock; +} +function delete_session(array,index) +{ + log("session terminated: " + array[index].remote_ip_address); + server.client_remove(array[index]); + array[index].close(); + array.splice(index,1); + log("clients: " + server.clients); +} +function store_socket(sock,array) +{ + debug("connection from " + sock.remote_ip_address,LOG_DEBUG); + sock.nonblocking=true; + sock.queue=""; + sock.enqueue=sock_enqueue; + sock.cycle=sock_cycle; + array.push(sock); +} +function sock_cycle() +{ + if(!this.is_connected) return false; + if(this.queue.length>0 && this.write(this.queue.substr(0,MAX_BUFFER))) { + this.queue=this.queue.substr(MAX_BUFFER); + } + return true; +} +function sock_enqueue(data) +{ + if(data.descriptor==this.descriptor) return false; + this.queue+=JSON.stringify(data)+"\r\n"; +} + +cycle(); \ No newline at end of file