Synchronet now requires the libarchive development package (e.g. libarchive-dev on Debian-based Linux distros, libarchive.org for more info) to build successfully.

Commit 53884cce authored by rswindell's avatar rswindell

Always use a passthru (proxy) socket when executing external programs that

use socket I/O (e.g. sexyz, door32.sys doors, sync-xsdk doors) - not just when
the client is connected via SSH. This solves a number of problems:
- programs that change socket options
- programs that don't support non-CP437 charsets (e.g. PETSCII, UTF-8)
- programs that don't support Telnet-IAC escaping

This greatly simplifies (reduces the need for) input_thread locking.

The passthru_thread now handles chunks of up to 4000 bytes per transfer
between socket and ringbuf, instead of a char at a time, greatly improving the
performance.
parent b22f0643
......@@ -404,7 +404,6 @@ bool sbbs_t::answer()
if(!(telnet_mode&TELNET_MODE_OFF)) {
/* Stop the input thread from writing to the telnet_* vars */
pthread_mutex_lock(&input_thread_mutex);
input_thread_mutex_locked = true;
if(stricmp(telnet_terminal,"sexpots")==0) { /* dial-up connection (via SexPOTS) */
SAFEPRINTF2(str,"%s connection detected at %lu bps", terminal, cur_rate);
......@@ -446,7 +445,6 @@ bool sbbs_t::answer()
if(telnet_rows >= TERM_ROWS_MIN && telnet_rows <= TERM_ROWS_MAX)
rows = telnet_rows;
pthread_mutex_unlock(&input_thread_mutex);
input_thread_mutex_locked = false;
}
lprintf(LOG_INFO, "terminal type: %lux%lu %s", cols, rows, terminal);
SAFECOPY(client_ipaddr, cid); /* Over-ride IP address with Caller-ID info */
......
......@@ -1113,6 +1113,7 @@ js_print(JSContext *cx, uintN argc, jsval *arglist)
}
else for (i=0; i < argc; i++) {
JSVALUE_TO_RASTRING(cx, argv[i], cstr, &cstr_sz, NULL);
if(cstr==NULL)
return(JS_FALSE);
rc=JS_SUSPENDREQUEST(cx);
......@@ -1857,10 +1858,8 @@ js_lock_input(JSContext *cx, uintN argc, jsval *arglist)
rc=JS_SUSPENDREQUEST(cx);
if(lock) {
pthread_mutex_lock(&sbbs->input_thread_mutex);
sbbs->input_thread_mutex_locked=true;
} else if(sbbs->input_thread_mutex_locked) {
} else {
pthread_mutex_unlock(&sbbs->input_thread_mutex);
sbbs->input_thread_mutex_locked=false;
}
JS_RESUMEREQUEST(cx, rc);
......
......@@ -2060,6 +2060,11 @@ void input_thread(void *arg)
if(wr > (int)sizeof(telbuf))
lprintf(LOG_ERR,"!TELBUF OVERFLOW (%d>%d)",wr,(int)sizeof(telbuf));
if(sbbs->passthru_socket_active == true) {
sendsocket(sbbs->passthru_socket, (char*)wrbuf, wr);
continue;
}
/* First level Ctrl-C checking */
if(!(sbbs->cfg.ctrlkey_passthru&(1<<CTRL_C))
&& sbbs->rio_abortable
......@@ -2101,142 +2106,20 @@ void input_thread(void *arg)
,sbbs->cfg.node_num, total_recv, total_pkts);
}
#ifdef USE_CRYPTLIB
/*
* This thread copies anything received from the client to the passthru_socket
* It can only do that when the input thread is locked.
* Luckily, the input thread is currently locked exactly when we want it to be.
* Since the passthru socket is 8-bit clean and does NOT use a protocol,
* we must handle telnet stuff HERE.
* However, for JS stuff, direct operations on client_socket should generally
* be done to the passthru_socket instead... THIS is the biggest problem here.
*/
void passthru_output_thread(void* arg)
{
fd_set socket_set;
sbbs_t *sbbs = (sbbs_t*) arg;
struct timeval tv;
int i;
BYTE inbuf[4000];
BYTE telbuf[sizeof(inbuf)];
BYTE *wrbuf;
int rd;
int wr;
SetThreadName("sbbs/ptOutput");
thread_up(FALSE /* setuid */);
while(sbbs->client_socket!=INVALID_SOCKET && sbbs->passthru_socket!=INVALID_SOCKET && !terminate_server) {
if(!sbbs->input_thread_mutex_locked) {
SLEEP(1);
continue;
}
FD_ZERO(&socket_set);
FD_SET(sbbs->client_socket,&socket_set);
tv.tv_sec=1;
tv.tv_usec=0;
if((i=select(sbbs->client_socket+1,&socket_set,NULL,NULL,&tv))<1) {
if(i==0) {
YIELD(); /* This kludge is necessary on some Linux distros */
continue; /* to allow other threads to lock the input_thread_mutex */
}
if(sbbs->client_socket==INVALID_SOCKET)
break;
if(ERROR_VALUE == ENOTSOCK)
lprintf(LOG_NOTICE,"Node %d socket closed by peer on input->select", sbbs->cfg.node_num);
else if(ERROR_VALUE==ESHUTDOWN)
lprintf(LOG_NOTICE,"Node %d socket shutdown on input->select", sbbs->cfg.node_num);
else if(ERROR_VALUE==EINTR)
lprintf(LOG_DEBUG,"Node %d passthru output thread interrupted",sbbs->cfg.node_num);
else if(ERROR_VALUE==ECONNRESET)
lprintf(LOG_NOTICE,"Node %d connection reset by peer on input->select", sbbs->cfg.node_num);
else if(ERROR_VALUE==ECONNABORTED)
lprintf(LOG_NOTICE,"Node %d connection aborted by peer on input->select", sbbs->cfg.node_num);
else
lprintf(LOG_WARNING,"Node %d !ERROR %d ->select socket %d"
,sbbs->cfg.node_num, ERROR_VALUE, sbbs->client_socket);
break;
}
if(sbbs->client_socket==INVALID_SOCKET)
break;
rd=sizeof(inbuf);
if(sbbs->ssh_mode) {
pthread_mutex_lock(&sbbs->ssh_mutex);
if(!cryptStatusOK(crypt_pop_channel_data(sbbs, (char*)inbuf, rd, &i))) {
GCES(rd, sbbs->cfg.node_num, sbbs->ssh_session, "popping data");
rd=0;
} else {
if(!i) {
pthread_mutex_unlock(&sbbs->ssh_mutex);
continue;
}
rd=i;
}
pthread_mutex_unlock(&sbbs->ssh_mutex);
}
else
rd = recv(sbbs->client_socket, (char*)inbuf, rd, 0);
if(rd == SOCKET_ERROR)
{
if(ERROR_VALUE == ENOTSOCK)
lprintf(LOG_NOTICE,"Node %d socket closed by peer on receive", sbbs->cfg.node_num);
else if(ERROR_VALUE==ECONNRESET)
lprintf(LOG_NOTICE,"Node %d connection reset by peer on receive", sbbs->cfg.node_num);
else if(ERROR_VALUE==ESHUTDOWN)
lprintf(LOG_NOTICE,"Node %d socket shutdown on receive", sbbs->cfg.node_num);
else if(ERROR_VALUE==ECONNABORTED)
lprintf(LOG_NOTICE,"Node %d connection aborted by peer on receive", sbbs->cfg.node_num);
else
lprintf(LOG_WARNING,"Node %d !ERROR %d receiving from socket %d"
,sbbs->cfg.node_num, ERROR_VALUE, sbbs->client_socket);
break;
}
if(rd == 0)
{
lprintf(LOG_DEBUG,"Node %d passthru input socket disconnected", sbbs->cfg.node_num);
break;
}
// telbuf and wr are modified to reflect telnet escaped data
wr=rd;
if(sbbs->telnet_mode&TELNET_MODE_OFF)
wrbuf=inbuf;
else
wrbuf=telnet_interpret(sbbs, inbuf, rd, telbuf, wr);
if(wr > (int)sizeof(telbuf))
lprintf(LOG_ERR,"!TELBUF OVERFLOW (%d>%d)",wr,(int)sizeof(telbuf));
/*
* TODO: This should check for writability etc.
*/
sendsocket(sbbs->passthru_socket, (char*)wrbuf, wr);
}
sbbs->passthru_output_thread_running = false;
}
/*
* This thread simply copies anything it manages to read from the
* passthru_socket into the output ringbuffer.
*/
void passthru_input_thread(void* arg)
void passthru_thread(void* arg)
{
fd_set r_set;
sbbs_t *sbbs = (sbbs_t*) arg;
struct timeval tv;
BYTE ch;
int i;
int rd;
char inbuf[4000];
SetThreadName("sbbs/ptInput");
SetThreadName("sbbs/passthru");
thread_up(FALSE /* setuid */);
while(sbbs->passthru_socket!=INVALID_SOCKET && !terminate_server) {
......@@ -2271,37 +2154,37 @@ void passthru_input_thread(void* arg)
if(!RingBufFree(&sbbs->outbuf))
continue;
i = recv(sbbs->passthru_socket, (char*)(&ch), 1, 0);
rd = recv(sbbs->passthru_socket, inbuf, sizeof(inbuf), 0);
if(i == SOCKET_ERROR)
if(rd == SOCKET_ERROR)
{
if(ERROR_VALUE == ENOTSOCK)
lprintf(LOG_NOTICE,"Node %d passthru socket closed by peer on receive", sbbs->cfg.node_num);
else if(ERROR_VALUE==ECONNRESET)
lprintf(LOG_NOTICE,"Node %d passthru connection reset by peer on receive", sbbs->cfg.node_num);
else if(ERROR_VALUE==ESHUTDOWN)
lprintf(LOG_NOTICE,"Node %d passthru socket shutdown on receive", sbbs->cfg.node_num);
else if(ERROR_VALUE==ECONNABORTED)
lprintf(LOG_NOTICE,"Node %d passthru connection aborted by peer on receive", sbbs->cfg.node_num);
else
lprintf(LOG_WARNING,"Node %d !ERROR %d receiving from passthru socket %d"
,sbbs->cfg.node_num, ERROR_VALUE, sbbs->passthru_socket);
break;
if(ERROR_VALUE == ENOTSOCK)
lprintf(LOG_NOTICE,"Node %d passthru socket closed by peer on receive", sbbs->cfg.node_num);
else if(ERROR_VALUE==ECONNRESET)
lprintf(LOG_NOTICE,"Node %d passthru connection reset by peer on receive", sbbs->cfg.node_num);
else if(ERROR_VALUE==ESHUTDOWN)
lprintf(LOG_NOTICE,"Node %d passthru socket shutdown on receive", sbbs->cfg.node_num);
else if(ERROR_VALUE==ECONNABORTED)
lprintf(LOG_NOTICE,"Node %d passthru connection aborted by peer on receive", sbbs->cfg.node_num);
else
lprintf(LOG_WARNING,"Node %d !ERROR %d receiving from passthru socket %d"
,sbbs->cfg.node_num, ERROR_VALUE, sbbs->passthru_socket);
break;
}
if(i == 0)
if(rd == 0)
{
lprintf(LOG_DEBUG,"Node %d SSH passthru disconnected", sbbs->cfg.node_num);
lprintf(LOG_DEBUG,"Node %d passthru disconnected", sbbs->cfg.node_num);
break;
}
if(sbbs->xtrn_mode & EX_BIN) {
if(!RingBufWrite(&sbbs->outbuf, &ch, 1)) {
if(!RingBufWrite(&sbbs->outbuf, (BYTE*)inbuf, rd)) {
lprintf(LOG_ERR,"Cannot pass from passthru socket to outbuf");
break;
}
} else {
sbbs->rputs((char*)&ch, sizeof(ch));
sbbs->rputs(inbuf, rd);
}
}
if(sbbs->passthru_socket!=INVALID_SOCKET) {
......@@ -2310,9 +2193,8 @@ void passthru_input_thread(void* arg)
}
thread_down();
sbbs->passthru_input_thread_running = false;
sbbs->passthru_thread_running = false;
}
#endif
void output_thread(void* arg)
{
......@@ -3367,7 +3249,8 @@ sbbs_t::sbbs_t(ushort node_num, union xp_sockaddr *addr, size_t addr_len, const
event_thread_running = false;
input_thread_running = false;
output_thread_running = false;
input_thread_mutex_locked = false;
passthru_socket_active = false;
passthru_thread_running = false;
if(client_info==NULL)
memset(&client,0,sizeof(client));
......@@ -3395,8 +3278,6 @@ sbbs_t::sbbs_t(ushort node_num, union xp_sockaddr *addr, size_t addr_len, const
#ifdef USE_CRYPTLIB
ssh_mode=false;
ssh_mutex_created=false;
passthru_input_thread_running = false;
passthru_output_thread_running = false;
#endif
rio_abortable=false;
......@@ -3840,7 +3721,7 @@ sbbs_t::~sbbs_t()
node_inbuf[cfg.node_num-1]=NULL;
if(!input_thread_running)
RingBufDispose(&inbuf);
if(!output_thread_running && !passthru_input_thread_running)
if(!output_thread_running && !passthru_thread_running)
RingBufDispose(&outbuf);
if(telnet_ack_event!=NULL)
......@@ -4601,9 +4482,7 @@ void node_thread(void* arg)
// Wait for all node threads to terminate
if(sbbs->input_thread_running || sbbs->output_thread_running
#ifdef USE_CRYPTLIB
|| sbbs->passthru_input_thread_running || sbbs->passthru_output_thread_running
#endif
|| sbbs->passthru_thread_running
) {
lprintf(LOG_INFO,"Node %d Waiting for %s to terminate..."
,sbbs->cfg.node_num
......@@ -4613,9 +4492,7 @@ void node_thread(void* arg)
time_t start=time(NULL);
while(sbbs->input_thread_running
|| sbbs->output_thread_running
#ifdef USE_CRYPTLIB
|| sbbs->passthru_input_thread_running || sbbs->passthru_output_thread_running
#endif
|| sbbs->passthru_thread_running
) {
if(time(NULL)-start>TIMEOUT_THREAD_WAIT) {
lprintf(LOG_NOTICE,"Node %d !TIMEOUT waiting for %s to terminate"
......@@ -5839,8 +5716,8 @@ NO_SSH:
new_node->sys_status|=SS_RLOGIN;
new_node->telnet_mode|=TELNET_MODE_OFF; // RLogin does not use Telnet commands
}
#ifdef USE_CRYPTLIB
if(ssh) {
{
/* TODO: IPv6? */
SOCKET tmp_sock;
SOCKADDR_IN tmp_addr={0};
......@@ -5932,17 +5809,17 @@ NO_SSH:
goto NO_PASSTHRU;
}
close_socket(tmp_sock);
new_node->passthru_output_thread_running = true;
_beginthread(passthru_output_thread, 0, new_node);
new_node->passthru_input_thread_running = true;
_beginthread(passthru_input_thread, 0, new_node);
NO_PASSTHRU:
SAFECOPY(new_node->connection,"SSH");
new_node->node_connection=NODE_CONNECTION_SSH;
new_node->sys_status|=SS_SSH;
new_node->telnet_mode|=TELNET_MODE_OFF; // SSH does not use Telnet commands
new_node->ssh_session=sbbs->ssh_session;
new_node->passthru_thread_running = true;
_beginthread(passthru_thread, 0, new_node);
NO_PASSTHRU:
if(ssh) {
SAFECOPY(new_node->connection,"SSH");
new_node->node_connection=NODE_CONNECTION_SSH;
new_node->sys_status|=SS_SSH;
new_node->telnet_mode|=TELNET_MODE_OFF; // SSH does not use Telnet commands
new_node->ssh_session=sbbs->ssh_session;
}
/* Wait for pending data to be sent then turn off ssh_mode for uber-output */
while(sbbs->output_thread_running && RingBufFull(&sbbs->outbuf))
SLEEP(1);
......@@ -5951,7 +5828,6 @@ NO_PASSTHRU:
sbbs->ssh_session=0; // Don't allow subsequent SSH connections to affect this one (!)
pthread_mutex_unlock(&sbbs->ssh_mutex);
}
#endif
protected_uint32_adjust(&node_threads_running, 1);
new_node->input_thread_running = true;
......@@ -6035,20 +5911,18 @@ NO_PASSTHRU:
lprintf(LOG_INFO, "Done waiting for system output thread to terminate");
}
// Wait for BBS passthru input thread to terminate
if(sbbs->passthru_input_thread_running || sbbs->passthru_output_thread_running) {
lprintf(LOG_INFO,"Waiting for passthru I/O threads to terminate...");
// Wait for BBS passthru thread to terminate
if(sbbs->passthru_thread_running) {
lprintf(LOG_INFO,"Waiting for passthru thread to terminate...");
start=time(NULL);
while(sbbs->passthru_input_thread_running || sbbs->passthru_output_thread_running) {
while(sbbs->passthru_thread_running) {
if(time(NULL)-start>TIMEOUT_THREAD_WAIT) {
lprintf(LOG_ERR,"!TIMEOUT waiting for passthru %s thread to terminate"
,sbbs->passthru_input_thread_running && sbbs->passthru_output_thread_running ? "I/O"
: sbbs->passthru_input_thread_running ? "input" : "output");
lprintf(LOG_ERR,"!TIMEOUT waiting for passthru thread to terminate");
break;
}
mswait(100);
}
lprintf(LOG_INFO, "Done waiting for passthru I/O threads to terminate");
lprintf(LOG_INFO, "Done waiting for passthru threads to terminate");
}
// Set all nodes' status to OFFLINE
......@@ -6065,7 +5939,7 @@ NO_PASSTHRU:
delete events;
}
if(sbbs->passthru_input_thread_running || sbbs->output_thread_running)
if(sbbs->passthru_thread_running || sbbs->output_thread_running)
lprintf(LOG_ERR,"!System I/O thread still running, can't delete");
else
delete sbbs;
......
......@@ -327,14 +327,12 @@ public:
char client_ident[128];
char client_ipaddr[INET6_ADDRSTRLEN];
char local_addr[INET6_ADDRSTRLEN];
#ifdef USE_CRYPTLIB
CRYPT_SESSION ssh_session;
int session_channel;
bool ssh_mode;
SOCKET passthru_socket;
bool passthru_output_thread_running;
bool passthru_input_thread_running;
#endif
bool passthru_socket_active;
bool passthru_thread_running;
scfg_t cfg;
......@@ -347,7 +345,6 @@ public:
RingBuf outbuf;
HANDLE input_thread;
pthread_mutex_t input_thread_mutex;
bool input_thread_mutex_locked; // by someone other than the input_thread
bool input_thread_mutex_created;
pthread_mutex_t ssh_mutex;
bool ssh_mutex_created;
......
......@@ -703,15 +703,8 @@ int sbbs_t::external(const char* cmdline, long mode, const char* startup_dir)
}
if(native && !(mode&EX_OFFLINE)) {
if(!(mode&EX_STDIN) && input_thread_running) {
pthread_mutex_lock(&input_thread_mutex);
input_thread_mutex_locked=true;
}
if(!(mode&EX_STDOUT)) { /* Native Socket I/O program */
/* Enable the Nagle algorithm */
BOOL nodelay=FALSE;
setsockopt(client_socket,IPPROTO_TCP,TCP_NODELAY,(char*)&nodelay,sizeof(nodelay));
if(!(mode&EX_STDIN)) {
passthru_socket_active = true;
}
}
......@@ -732,10 +725,7 @@ int sbbs_t::external(const char* cmdline, long mode, const char* startup_dir)
if(!success) {
XTRN_CLEANUP;
if(input_thread_mutex_locked && input_thread_running) {
pthread_mutex_unlock(&input_thread_mutex);
input_thread_mutex_locked=false;
}
passthru_socket_active = false;
SetLastError(last_error); /* Restore LastError */
errormsg(WHERE, ERR_EXEC, realcmdline, mode);
SetLastError(last_error); /* Restore LastError */
......@@ -1105,19 +1095,7 @@ int sbbs_t::external(const char* cmdline, long mode, const char* startup_dir)
if(!(mode&EX_OFFLINE)) { /* !off-line execution */
if(native) {
/* Re-enable blocking (incase disabled by xtrn program) */
ulong l=0;
ioctlsocket(client_socket, FIONBIO, &l);
/* Re-set socket options */
if(set_socket_options(&cfg, client_socket, client.protocol, str, sizeof(str)))
lprintf(LOG_ERR,"%04d !ERROR %s",client_socket, str);
if(input_thread_mutex_locked && input_thread_running) {
pthread_mutex_unlock(&input_thread_mutex);
input_thread_mutex_locked=false;
}
passthru_socket_active = false;
}
curatr=~0; // Can't guarantee current attributes
......@@ -1693,11 +1671,8 @@ int sbbs_t::external(const char* cmdline, long mode, const char* startup_dir)
#endif
}
if(!(mode&EX_STDIN) && input_thread_running) {
lprintf(LOG_DEBUG,"Locking input thread mutex");
if(pthread_mutex_lock(&input_thread_mutex)!=0)
errormsg(WHERE,ERR_LOCK,"input_thread_mutex",0);
input_thread_mutex_locked=true;
if(!(mode&EX_STDIN)) {
passthru_socket_active = true;
}
if(!(mode&EX_NOLOG) && pipe(err_pipe)!=0) {
......@@ -1723,11 +1698,7 @@ int sbbs_t::external(const char* cmdline, long mode, const char* startup_dir)
winsize.ws_row=rows;
winsize.ws_col=cols;
if((pid=forkpty(&in_pipe[1],NULL,&term,&winsize))==-1) {
if(input_thread_mutex_locked && input_thread_running) {
if(pthread_mutex_unlock(&input_thread_mutex)!=0)
errormsg(WHERE,ERR_UNLOCK,"input_thread_mutex",0);
input_thread_mutex_locked=false;
}
passthru_socket_active = false;
errormsg(WHERE,ERR_EXEC,fullcmdline,0);
return(-1);
}
......@@ -1747,11 +1718,7 @@ int sbbs_t::external(const char* cmdline, long mode, const char* startup_dir)
if((pid=FORK())==-1) {
if(input_thread_mutex_locked && input_thread_running) {
if(pthread_mutex_unlock(&input_thread_mutex)!=0)
errormsg(WHERE,ERR_UNLOCK,"input_thread_mutex",0);
input_thread_mutex_locked=false;
}
passthru_socket_active = false;
errormsg(WHERE,ERR_EXEC,fullcmdline,0);
return(-1);
}
......@@ -2003,13 +1970,6 @@ int sbbs_t::external(const char* cmdline, long mode, const char* startup_dir)
close(in_pipe[1]);
close(out_pipe[0]);
}
#if 0
else {
/* Enable the Nagle algorithm */
int nodelay=FALSE;
setsockopt(client_socket,IPPROTO_TCP,TCP_NODELAY,(char*)&nodelay,sizeof(nodelay));
}
#endif
if(mode&EX_NOLOG)
waitpid(pid, &i, 0);
else {
......@@ -2040,14 +2000,6 @@ int sbbs_t::external(const char* cmdline, long mode, const char* startup_dir)
}
if(!(mode&EX_OFFLINE)) { /* !off-line execution */
/* Re-enable blocking (incase disabled by xtrn program) */
ulong l=0;
ioctlsocket(client_socket, FIONBIO, &l);
/* Re-set socket options */
if(set_socket_options(&cfg, client_socket, client.protocol, str, sizeof(str)))
lprintf(LOG_ERR,"%04d !ERROR %s",client_socket, str);
curatr=~0; // Can't guarantee current attributes
attr(LIGHTGRAY); // Force to "normal"
......@@ -2060,11 +2012,7 @@ int sbbs_t::external(const char* cmdline, long mode, const char* startup_dir)
if(!(mode&EX_NOLOG))
close(err_pipe[0]);
if(input_thread_mutex_locked && input_thread_running) {
if(pthread_mutex_unlock(&input_thread_mutex)!=0)
errormsg(WHERE,ERR_UNLOCK,"input_thread_mutex",0);
input_thread_mutex_locked=false;
}
passthru_socket_active = false;
return(errorlevel = WEXITSTATUS(i));
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment