diff --git a/engine/src/networking/com/jme3/network/base/DefaultServer.java b/engine/src/networking/com/jme3/network/base/DefaultServer.java index 437622414..27765a338 100644 --- a/engine/src/networking/com/jme3/network/base/DefaultServer.java +++ b/engine/src/networking/com/jme3/network/base/DefaultServer.java @@ -409,14 +409,12 @@ public class DefaultServer implements Server m.setReliable( true ); send( m ); - // Note: without a way to flush the pending messages - // during close, the above message may never - // go out. - // Just close the reliable endpoint // fast will be cleaned up as a side-effect if( reliable != null ) { - reliable.close(); + // Close with flush so we make sure our + // message gets out + reliable.close(true); } } diff --git a/engine/src/networking/com/jme3/network/kernel/Endpoint.java b/engine/src/networking/com/jme3/network/kernel/Endpoint.java index 67dde8739..93ac59adc 100644 --- a/engine/src/networking/com/jme3/network/kernel/Endpoint.java +++ b/engine/src/networking/com/jme3/network/kernel/Endpoint.java @@ -73,7 +73,16 @@ public interface Endpoint public void send( ByteBuffer data ); /** - * Closes this endpoint. + * Closes this endpoint without flushing any of its + * currently enqueued outbound data. */ public void close(); + + /** + * Closes this endpoint, optionally flushing any queued + * data before closing. As soon as this method is called, + * ne send() calls will fail with an exception... even while + * close() is still flushing the earlier queued messages. + */ + public void close(boolean flushData); } diff --git a/engine/src/networking/com/jme3/network/kernel/tcp/NioEndpoint.java b/engine/src/networking/com/jme3/network/kernel/tcp/NioEndpoint.java index e51363e81..ea0c2b4c7 100644 --- a/engine/src/networking/com/jme3/network/kernel/tcp/NioEndpoint.java +++ b/engine/src/networking/com/jme3/network/kernel/tcp/NioEndpoint.java @@ -50,10 +50,13 @@ import com.jme3.network.kernel.*; */ public class NioEndpoint implements Endpoint { + protected static final ByteBuffer CLOSE_MARKER = ByteBuffer.allocate(0); + private long id; private SocketChannel socket; private SelectorKernel kernel; private ConcurrentLinkedQueue outbound = new ConcurrentLinkedQueue(); + private boolean closing = false; public NioEndpoint( SelectorKernel kernel, long id, SocketChannel socket ) { @@ -69,6 +72,21 @@ public class NioEndpoint implements Endpoint public void close() { + close(false); + } + + public void close( boolean flushData ) + { + if( flushData ) { + closing = true; + + // Enqueue a close marker message to let the server + // know we should close + send( CLOSE_MARKER, false, true ); + + return; + } + try { kernel.closeEndpoint(this); } catch( IOException e ) { @@ -142,7 +160,13 @@ public class NioEndpoint implements Endpoint } public void send( ByteBuffer data ) - { + { + if( data == null ) { + throw new IllegalArgumentException( "Data cannot be null." ); + } + if( closing ) { + throw new KernelException( "Endpoint has been closed:" + socket ); + } send( data, true, true ); } diff --git a/engine/src/networking/com/jme3/network/kernel/tcp/SelectorKernel.java b/engine/src/networking/com/jme3/network/kernel/tcp/SelectorKernel.java index fc955577f..c91aef66c 100644 --- a/engine/src/networking/com/jme3/network/kernel/tcp/SelectorKernel.java +++ b/engine/src/networking/com/jme3/network/kernel/tcp/SelectorKernel.java @@ -354,6 +354,14 @@ public class SelectorKernel extends AbstractKernel // We will send what we can and move on. ByteBuffer current = p.peekPending(); + if( current == NioEndpoint.CLOSE_MARKER ) { + // This connection wants to be closed now + closeEndpoint(p); + + // Nothing more to do + return; + } + c.write( current ); // If we wrote all of that packet then we need to remove it diff --git a/engine/src/networking/com/jme3/network/kernel/udp/UdpEndpoint.java b/engine/src/networking/com/jme3/network/kernel/udp/UdpEndpoint.java index 9b72e66cd..306674807 100644 --- a/engine/src/networking/com/jme3/network/kernel/udp/UdpEndpoint.java +++ b/engine/src/networking/com/jme3/network/kernel/udp/UdpEndpoint.java @@ -74,6 +74,15 @@ public class UdpEndpoint implements Endpoint public void close() { + close( false ); + } + + public void close( boolean flush ) + { + // No real reason to flush UDP traffic yet... especially + // when considering that the outbound UDP isn't even + // queued. + try { kernel.closeEndpoint(this); } catch( IOException e ) {