Fixes to keep the kernel adapter threads from hanging.

Closing the underlying kernel didn't wake up any readers
and so the read() operation in the adapter was never returning.
KernelAdapter.close() was modified to call join() after closing
the underlying kernel so that the method won't complete until
all of the threads are done.
Then the kernels were modified to wakeup the readers (and this
is now standardized in a base class method) so that they don't
hang forever.
This commit is contained in:
Paul Speed 2016-01-16 03:28:10 -05:00
parent 84a38d1363
commit ac672df63a
4 changed files with 26 additions and 20 deletions

View File

@ -112,6 +112,8 @@ public class KernelAdapter extends Thread
// Kill the kernel
kernel.terminate();
join();
}
protected void reportError( Endpoint p, Object context, Exception e )
@ -119,7 +121,9 @@ public class KernelAdapter extends Thread
// Should really be queued up so the outer thread can
// retrieve them. For now we'll just log it. FIXME
log.log( Level.SEVERE, "Unhandled error, endpoint:" + p + ", context:" + context, e );
//if( p.isConnected() )
System.out.println("Is p connected:" + p.isConnected());
// In lieu of other options, at least close the endpoint
p.close();
}

View File

@ -76,6 +76,18 @@ public abstract class AbstractKernel implements Kernel
log.log( Level.SEVERE, "Unhanddled kernel error", e );
}
protected void wakeupReader() {
// If there are no pending messages then add one so that the
// kernel-user knows to wake up if it is only listening for
// envelopes.
if( !hasEnvelopes() ) {
// Note: this is not really a race condition. At worst, our
// event has already been handled by now and it does no harm
// to check again.
addEnvelope( EVENTS_PENDING );
}
}
protected long nextEndpointId()
{
return nextId.getAndIncrement();

View File

@ -106,6 +106,9 @@ public class SelectorKernel extends AbstractKernel
try {
thread.close();
thread = null;
// Need to let any caller waiting for a read() wakeup
wakeupReader();
} catch( IOException e ) {
throw new KernelException( "Error closing host connection:" + address, e );
}
@ -164,15 +167,7 @@ public class SelectorKernel extends AbstractKernel
// Enqueue an endpoint event for the listeners
addEvent( EndpointEvent.createRemove( this, p ) );
// If there are no pending messages then add one so that the
// kernel-user knows to wake up if it is only listening for
// envelopes.
if( !hasEnvelopes() ) {
// Note: this is not really a race condition. At worst, our
// event has already been handled by now and it does no harm
// to check again.
addEnvelope( EVENTS_PENDING );
}
wakeupReader();
}
/**

View File

@ -110,6 +110,9 @@ public class UdpKernel extends AbstractKernel
thread.close();
writer.shutdown();
thread = null;
// Need to let any caller waiting for a read() wakeup
wakeupReader();
} catch( IOException e ) {
throw new KernelException( "Error closing host connection:" + address, e );
}
@ -169,16 +172,8 @@ public class UdpKernel extends AbstractKernel
log.log( Level.FINE, "Socket endpoints size:{0}", socketEndpoints.size() );
addEvent( EndpointEvent.createRemove( this, p ) );
// If there are no pending messages then add one so that the
// kernel-user knows to wake up if it is only listening for
// envelopes.
if( !hasEnvelopes() ) {
// Note: this is not really a race condition. At worst, our
// event has already been handled by now and it does no harm
// to check again.
addEnvelope( EVENTS_PENDING );
}
wakeupReader();
}
protected void newData( DatagramPacket packet )