Skip to content
Snippets Groups Projects
Commit a4985939 authored by mcmlxxix's avatar mcmlxxix
Browse files

Where do I begin? see DIFF

parent 9b2cc301
Branches
Tags
No related merge requests found
......@@ -29,201 +29,178 @@
*/
load("funclib.js");
load("synchronet-json.js");
const version= "$Revision$";
//Connection type constants
const REMOTE= "*";
const LOCAL= "&";
const FILESYNC= "@";
const QUERY= "?";
const version= "$Revision$";
const hub_address= argv[0]; //default: bbs.thebrokenbubble.com
const hub_port= argv[1]; //default: 10088
const is_hub= (hub_address&&hub_port)?false:true;
const hub_address= argv[0];
const hub_port= argv[1];
const connection_timeout= 5;//SECONDS
const connection_interval= 10;
const connection_attempts= 10;
const REMOTE= "*";
const LOCAL= "&";
const FILESYNC= "@";
const QUERY= "?";
const CONNECTION_TIMEOUT= 5;//SECONDS
const CONNECTION_INTERVAL= 10;
const CONNECTION_ATTEMPTS= 10;
const MAX_BUFFER= 512;
const MAX_RECV= 10240;
var local_sessions=[];
var remote_sessions=[];
var data_queue=[];
var hub=new Server(hub_address,hub_port);
var data_queue=[];
var remote_sessions=[];
var local_sessions=[];
var awaiting_auth=[];
//server.socket.debug = false;
//server.socket.nonblocking = true;
server.socket.nonblocking = true;
//main service loop
while(!js.terminated)
function cycle()
{
try {
sock_cycle();
if(!server.socket.poll(0.1))
{
if(js.terminated) break;
continue;
while(!js.terminated) {
try {
if(sessions_active()) {
authenticate();
inbound();
outbound();
}
if(!server.socket.poll(0.1)) {
if(js.terminated) break;
continue;
}
store_socket(server.socket.accept(),awaiting_auth);
} catch(e) {
debug(e,LOG_WARNING);
break;
}
store_socket(server.socket.accept());
} catch(e) {
debug(e,LOG_WARNING);
}
if(hub.enabled) hub.disconnect();
}
if(hub.enabled) hub.disconnect();
function sock_cycle()
function inbound()
{
if(sessions_active()) {
if(hub.enabled) hub.cycle();
inbound();
outbound();
//send_updates();
} else {
if(hub.enabled) hub.disconnect();
if(hub.enabled) hub.receive();
for(var s in local_sessions) {
receive_from(local_sessions[s]);
}
receive_from(remote_sessions);
}
function inbound()
function outbound()
{
//read data from each socket
for(var l in local_sessions) {
for(var s=0;s<local_sessions[l].length;) {
var session=local_sessions[l][s];
if(!session.receive()) delete_session(local_sessions[l],s);
else s++;
}
if(hub.enabled) {
hub.cycle();
}
for(var r=0;r<remote_sessions.length;) {
var session=remote_sessions[r];
if(!session.receive()) delete_session(remote_sessions,r);
else r++;
for(var a=0;a<awaiting_auth.length;a++) {
if(!awaiting_auth[a].cycle()) {
delete_session(awaiting_auth,a);
a--;
}
}
}
function outbound()
{
//send all data to hub and to appropriate LOCAL socket group
while(data_queue.length) {
var data=data_queue.shift();
if(hub.enabled) hub.send(data);
for(var r=0;r<remote_sessions.length;) {
if(!remote_sessions[r].send(data)) delete_session(remote_sessions,r);
else r++;
for(var r=0;r<remote_sessions.length;r++) {
if(!remote_sessions[r].cycle()) {
delete_session(remote_sessions,r);
r--;
}
for(var s=0;local_sessions[data.id] && s<local_sessions[data.id].length;) {
if(!local_sessions[data.id][s].send(data)) delete_session(local_sessions[data.id],s);
else s++;
}
for(var s in local_sessions) {
for(var i=0;i<local_sessions[s].length;i++) {
if(!local_sessions[s][i].cycle()) {
delete_session(local_sessions[s],i);
i--;
}
}
}
}
function parse_inbound(socket)
function receive_from(sock_array)
{
if(!testSocket(socket)) {
debug("socket test failed",LOG_WARNING);
return false;
var socks=socket_select(sock_array);
for(var s in socks) {
var sock=sock_array[socks[s]];
var data=sock.recvline(MAX_RECV);
if(data == null) return false;
data=JSON.parse(data);
queue(sock,data);
}
var incoming=socket.recvline(1024,connection_timeout);
if(incoming != null) {
return js.eval(incoming);
} else return false;
}
function socket_send(data,socket)
function authenticate()
{
if(!testSocket(socket)) return false;
if(data.descriptor != socket.descriptor) {
var d=data.toSource() + "\r\n";
socket.write(d);
var socks=socket_select(awaiting_auth);
for(var s in socks) {
var sock=awaiting_auth[socks[s]];
if(!sock) continue;
debug("authenticating socket");
var data=sock.recvline();
if(data == null) continue;
data=JSON.parse(data);
switch(data.context) {
case LOCAL:
if(!local_sessions[data.id]) local_sessions[data.id]=[];
local_sessions[data.id].push(sock);
log("local connection: " + data.alias + " running " + data.id);
break;
case REMOTE:
if(data.version==version) {
remote_sessions.push(sock);
log("remote connection: " + data.system);
break;
} else {
debug("incompatible with remote server version: " + data.version,LOG_WARNING);
sock.close();
continue;
}
default:
debug("unknown connection type: " + data.context);
sock.close();
continue;
}
sock.id=data.id;
sock.context=data.context;
sock.alias=data.alias;
sock.system=data.system;
server.client_add(sock);
awaiting_auth.splice(socks[s],1);
queue(sock,data);
log("clients: " + server.clients);
}
return true;
}
function delete_session(array,index)
{
log("session terminated: " + array[index].address);
server.client_remove(array[index].sock);
array.splice(index,1);
log("clients: " + server.clients);
}
function store_socket(sock)
{
debug("connection from " + sock.remote_ip_address,LOG_DEBUG);
//receive connection identifier from incoming socket connection (should always be first transmission)
var data=parse_inbound(sock);
if(!data) {
debug("timed out waiting for handshake",LOG_WARNING);
sock.write("please visit http://cvs.synchro.net and verify that your files are up to date\r\n");
sock.close();
return false;
}
if(!data.context) {
debug("invalid handshake",LOG_WARNING);
sock.write("please visit http://cvs.synchro.net and verify that your files are up to date\r\n");
sock.close();
return false;
}
switch(data.context)
{
case REMOTE:
add_remote_session(sock,data);
break;
case LOCAL:
add_local_session(sock,data);
break;
function queue(sock,data)
{
data.descriptor=sock.descriptor;
switch(data.type) {
case FILESYNC:
handle_filesync(sock,data);
if(!is_hub && hub.enabled) {
hub.enqueue(data);
} else if(is_hub && sock.context != LOCAL) {
handle_filesync(sock,data);
}
break;
case QUERY:
handle_query(sock,data);
if(!is_hub && hub.enabled) {
hub.enqueue(data);
} else if(is_hub) {
handle_query(sock,data);
}
break;
default:
debug("unknown connection type: " + data.context,LOG_WARNING);
sock.close();
return false;
}
return true;
}
function add_remote_session(sock,data)
{
if(!data.system) {
debug("invalid handshake",LOG_WARNING);
sock.write("please visit http://cvs.synchro.net and verify that your files are up to date\r\n");
sock.close();
return false;
}
var response=new Object();
response.version=version;
if(data.version != version) {
debug("incompatible with REMOTE server version: " + data.version,LOG_WARNING);
response.response="UPDATE";
socket_send(response,sock);
sock.close();
return false;
}
log("REMOTE connection: " + data.system + "(" + sock.remote_ip_address + ")");
response.response="OK";
socket_send(response,sock);
remote_sessions.push(new Session(sock,data));
server.client_add(sock);
log("clients: " + server.clients);
return true;
}
function add_local_session(sock,data)
{
if(!data.alias || !data.id) {
debug("invalid handshake",LOG_WARNING);
debug(data,LOG_DEBUG);
sock.write("please visit http://cvs.synchro.net and verify that your files are up to date\r\n");
sock.close();
return false;
if(hub.enabled) {
hub.enqueue(data);
}
for(var r=0;r<remote_sessions.length;r++) {
remote_sessions[r].enqueue(data);
}
for(var s=0;local_sessions[data.id] && s<local_sessions[data.id].length;s++) {
local_sessions[data.id][s].enqueue(data);
}
break;
}
log("LOCAL connection: " + data.alias);
if(!local_sessions[data.id]) local_sessions[data.id]=[];
local_sessions[data.id].push(new Session(sock,data));
server.client_add(sock);
log("clients: " + server.clients);
return true;
}
function sessions_active()
{
if(remote_sessions.length) return true;
for(var l in local_sessions) {
if(local_sessions[l].length) return true;
if(awaiting_auth.length>0 || remote_sessions.length>0) {
return true;
}
for(var s in local_sessions) {
if(local_sessions[s].length>0) {
return true;
}
}
return false;
}
......@@ -240,8 +217,161 @@ function count_remote_sockets()
{
return remote_sessions.length;
}
function handle_query(socket,query)
//FILESYNC
function handle_filesync(socket,query) /* Hub only! */
{
try {
var module=get_module(query.id);
if(!module) return false;
switch(query.command)
{
case "trysend":
sync_remote(socket,module.working_directory,query);
break;
case "tryrecv":
sync_local(socket,module.working_directory,query);
if(is_hub && socket.context==LOCAL) send_updates(socket,module.working_directory,query);
break;
case "dorecv":
if(recv_file(socket,module.working_directory,query)) {
if(is_hub) send_updates(socket,module.working_directory,query);
}
break;
case "dosend":
send_file(socket,module.working_directory,query);
break;
default:
debug("unknown sync request: " + query.command,LOG_WARNING);
break;
}
} catch(e) {
debug("FILESYNC ERROR: " + e,LOG_WARNING);
}
}
function sync_remote(socket,dir,query)
{
var files=directory(dir + file_getname(query.filemask));
if(files.length>0) debug("sending " + files.length + " files",LOG_DEBUG);
else {
debug("file(s) not found: " + dir + query.filemask,LOG_WARNING);
return false;
}
for(var f=0;f<files.length;f++) {
query.command="tryrecv";
query.filemask=file_getname(files[f]);
query.filedate=file_date(files[f]);
query.descriptor=server.socket.descriptor;
socket.enqueue(query);
}
}
function send_updates(socket,dir,query)
{
for(r=0;r<remote_sessions.length;r++) {
if(remote_sessions[r].descriptor != socket.descriptor) {
log("sending node updates: " + query.filemask);
sync_remote(remote_sessions[r],dir,query);
}
}
}
function send_file(socket,dir,query)
{
/*
"data" object already contains the properties
needed by commservice to recognize the information
as a file request
*/
data=load_file(dir + file_getname(query.filemask));
if(data) {
data.descriptor=server.socket.descriptor;
data.id=query.id;
data.command="dorecv";
data.type=FILESYNC;
socket.enqueue(data);
debug("file sent: " + data.filemask,LOG_DEBUG);
}
}
function sync_local(socket,dir,query)
{
var r_filedate=query.filedate;
var l_filedate=file_date(dir+query.filemask);
if(compare_dates(l_filedate,r_filedate)) {
query.command="dosend";
query.descriptor=server.socket.descriptor;
socket.enqueue(query);
} else {
debug("skipping file: " + query.filemask);
debug("local date: " + l_filedate + " remote date: " + r_filedate);
}
}
function recv_file(socket,dir,query)
{
var filename=dir+file_getname(query.filemask);
log("receiving file: " + filename);
var file=new File(filename + ".tmp");
file.open('w',true);
file.base64=true;
if(!file.is_open) {
log(LOG_WARNING,"error opening file: " + file.name);
return false;
}
file.writeAll(query.file);
log("received: " + query.filesize + " bytes");
file.close();
if(file_exists(filename+".bck")) file_remove(filename+".bck");
file_rename(filename,filename+".bck");
file_rename(file.name,filename);
file_utime(filename,time(),query.filedate);
return true;
}
/* Non-socket file functions */
function compare_dates(local,remote)
{
if(local<0) return true;
//will reject files with a time_t older than the local copy
//if(local>remote) return true;
//will treat numbers with a difference of 1 or 0 as the same, due to issues with some file systems
if(Math.abs(local-remote)>1) return true;
else return false;
}
function load_file(filename)
{
var d=new Object();
var f=new File(filename);
f.open('r',true);
if(!f.is_open) {
log(LOG_WARNING,"error opening file: " + f.name);
return false;
}
f.base64=true;
d.file=f.readAll();
f.close();
d.filesize=file_size(filename);
d.filedate=file_date(filename);
d.filemask=file_getname(filename);
return d;
}
function get_module(id)
{
var comminit=new File(system.ctrl_dir + "filesync.ini");
comminit.open('r');
var match=comminit.iniGetSections(id);
if(!match.length) {
debug("module not found, aborting: " + id,LOG_WARNING);
return false;
}
var module=comminit.iniGetObject(id);
comminit.close();
return module;
}
//END FILESYNC
//TODO QUERIES
function handle_query(socket,query) /* Hub only! */
{
log("query from: " + socket.remote_ip_address);
switch(query.task)
{
case "who":
......@@ -265,35 +395,6 @@ function handle_query(socket,query)
break;
}
}
function handle_filesync(socket,query)
{
log("file sync request from: " + socket.remote_ip_address);
//background.push(load(true,"filesync.js",socket.descriptor,query.toSource(),hub.address,hub.port));
}
function send_updates()
{
if(background.length)
{
var bgq=background.shift();
if(bgq.data_waiting)
{
var data=bgq.read();
if(data) {
for(var r=0;r<remote_sessions.length;r++)
{
var session=remote_sessions[r];
if(data.remote_ip_address != session.address)
{
load(true,"filesync.js",session.descriptor,data.toSource(),hub.address,hub.port);
}
}
}
}
else background.push(bgq);
}
}
//TODO
function whos_online(socket)
{
}
......@@ -315,153 +416,85 @@ function notify_sysop(msg)
}
//END TODO
function Session(socket,packet)
{
this.sock=socket;
this.context=packet.context;
this.id=packet.id;
this.alias=packet.alias;
this.system=packet.system;
this.address=socket.remote_ip_address;
this.port=socket.remote_port;
this.descriptor=socket.descriptor;
this.receive=function()
{
if(!testSocket(this.sock)) return false;
if(this.sock.data_waiting) {
//store data in master array to be distributed later
var data=parse_inbound(this.sock);
if(data) {
switch(data.context)
{
case FILESYNC:
handle_filesync(this.sock,data);
break;
case QUERY:
handle_query(this.sock,data);
break;
default:
data.id=data.id?data.id:this.id;
data.descriptor=this.descriptor;
data_queue.push(data);
break;
}
}
}
return true;
}
this.send=function(data)
{
if(!socket_send(data,this.sock)) return false;
return true;
}
}
function Server(addr,port)
{
this.address=addr;
this.port=port;
this.sock=false;
this.queue=[];
this.enabled=true;
this.queue="";
this.last_attempt=0;
this.attempts=0;
this.connect=function()
{
debug("connecting to " + this.address + " on port " + this.port,LOG_DEBUG);
this.sock=new Socket();
this.sock.nonblocking=true;
this.sock.connect(this.address,this.port,connection_timeout);
if(this.sock.is_connected)
this.sock=get_server_socket();
this.sock.connect(this.address,this.port,CONNECTION_TIMEOUT);
if(!this.sock.is_connected)
{
//send node identifier and bbs name so hub knows this is a distribution point
var hello=new Object();
hello.system=system.name;
hello.context=REMOTE;
hello.version=version;
socket_send(hello,this.sock);
var response=parse_inbound(this.sock);
if(response) {
switch(response.response)
{
case "OK":
debug("connection to " + this.address + " successful",LOG_DEBUG);
debug("REMOTE server " + response.version,LOG_DEBUG);
this.attempts=0;
this.last_attempt=0;
return true;
case "UPDATE":
var msg="service file out of date, please visit http://cvs.synchro.net/ to update to " + response.version;
debug(msg,LOG_WARNING);
notify_sysop(msg);
this.sock.close();
this.enabled=false;
return false;
default:
debug("unknown response",LOG_WARNING);
debug(response,LOG_WARNING);
this.sock.close();
break;
}
}
else {
debug("no hub response received",LOG_DEBUG);
this.sock.close();
}
} else {
debug("connection to " + this.address + " failed with error " + this.sock.error,LOG_WARNING);
}
this.attempts++;
this.last_attempt=time();
return false;
this.attempts++;
this.last_attempt=time();
return false;
}
return true;
}
this.disconnect=function()
{
if(this.sock.is_connected) {
log("disconnecting from server: " + this.address);
while(this.queue.length) {
var data=this.queue.shift();
socket_send(data,this.sock);
while(this.sock.queue.length) {
if(!this.cycle()) break;
}
this.sock.close();
}
}
this.cycle=function()
this.receive=function()
{
if(!this.sock.is_connected) {
if(this.attempts>=connection_attempts) {
if(!this.sock || !this.sock.is_connected) {
if(this.attempts>=CONNECTION_ATTEMPTS) {
this.enabled=false;
return false;
}
if((time()-this.last_attempt)>=connection_interval) {
if((time()-this.last_attempt)>=CONNECTION_INTERVAL) {
this.connect();
}
}
if(this.sock.is_connected) {
if(this.queue.length) {
var data=this.queue.shift();
socket_send(data,this.sock);
if(this.sock.is_connected && this.sock.data_waiting) {
var data=this.sock.recvline(MAX_RECV);
if(data == null) return false;
data=JSON.parse(data);
switch(data.type) {
case FILESYNC:
handle_filesync(this.sock,data);
break;
case QUERY:
handle_query(this.sock,data);
break;
default:
queue(this.sock,data);
break;
}
this.receive();
}
}
this.receive=function()
this.enqueue=function(data)
{
this.sock.enqueue(data);
}
this.cycle=function()
{
if(!testSocket(this.sock)) return false;
if(this.sock.data_waiting) {
//store data in master array to be distributed later
var data=parse_inbound(this.sock);
if(data) {
data.descriptor=this.sock.descriptor;
data_queue.push(data);
if(!this.sock || !this.sock.is_connected) {
if(this.attempts>=CONNECTION_ATTEMPTS) {
this.enabled=false;
return false;
}
if((time()-this.last_attempt)>=CONNECTION_INTERVAL) {
this.connect();
}
}
if(this.sock) this.sock.cycle();
return true;
}
this.send=function(data)
{
if(this.sock && (data.descriptor != this.sock.descriptor)) this.queue.push(data);
}
if(!addr && !port) {
this.enabled=false;
......@@ -471,3 +504,50 @@ function Server(addr,port)
this.enabled=false;
}
}
function get_server_socket()
{
var sock=new Socket();
sock.queue="";
sock.nonblocking=true;
sock.enqueue=function(data) {
if(data.descriptor==this.descriptor) return false;
data.context=REMOTE;
data.version=version;
data.system=system.name;
this.queue+=JSON.stringify(data)+"\r\n";
}
sock.cycle=sock_cycle;
return sock;
}
function delete_session(array,index)
{
log("session terminated: " + array[index].remote_ip_address);
server.client_remove(array[index]);
array[index].close();
array.splice(index,1);
log("clients: " + server.clients);
}
function store_socket(sock,array)
{
debug("connection from " + sock.remote_ip_address,LOG_DEBUG);
sock.nonblocking=true;
sock.queue="";
sock.enqueue=sock_enqueue;
sock.cycle=sock_cycle;
array.push(sock);
}
function sock_cycle()
{
if(!this.is_connected) return false;
if(this.queue.length>0 && this.write(this.queue.substr(0,MAX_BUFFER))) {
this.queue=this.queue.substr(MAX_BUFFER);
}
return true;
}
function sock_enqueue(data)
{
if(data.descriptor==this.descriptor) return false;
this.queue+=JSON.stringify(data)+"\r\n";
}
cycle();
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment