diff --git a/engine/src/networking/com/jme3/network/kernel/udp/UdpEndpoint.java b/engine/src/networking/com/jme3/network/kernel/udp/UdpEndpoint.java index 872a7cb92..7a99c57ba 100644 --- a/engine/src/networking/com/jme3/network/kernel/udp/UdpEndpoint.java +++ b/engine/src/networking/com/jme3/network/kernel/udp/UdpEndpoint.java @@ -114,10 +114,17 @@ public class UdpEndpoint implements Endpoint if( !isConnected() ) { throw new KernelException( "Endpoint is not connected:" + this ); } + + try { DatagramPacket p = new DatagramPacket( data.array(), data.position(), data.remaining(), address ); - socket.send(p); + + // Just queue it up for the kernel threads to write + // out + kernel.enqueueWrite( this, p ); + + //socket.send(p); } catch( IOException e ) { throw new KernelException( "Error sending datagram to:" + address, e ); } diff --git a/engine/src/networking/com/jme3/network/kernel/udp/UdpKernel.java b/engine/src/networking/com/jme3/network/kernel/udp/UdpKernel.java index 728d4c171..1ed8fbc2b 100644 --- a/engine/src/networking/com/jme3/network/kernel/udp/UdpKernel.java +++ b/engine/src/networking/com/jme3/network/kernel/udp/UdpKernel.java @@ -37,6 +37,8 @@ import java.net.*; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; @@ -57,6 +59,8 @@ public class UdpKernel extends AbstractKernel private InetSocketAddress address; private HostThread thread; + private ExecutorService writer = Executors.newFixedThreadPool(2); + // The nature of UDP means that even through a firewall, // a user would have to have a unique address+port since UDP // can't really be NAT'ed. @@ -104,6 +108,7 @@ public class UdpKernel extends AbstractKernel try { thread.close(); + writer.shutdown(); thread = null; } catch( IOException e ) { throw new KernelException( "Error closing host connection:" + address, e ); @@ -120,8 +125,13 @@ public class UdpKernel extends AbstractKernel if( reliable ) throw new UnsupportedOperationException( "Reliable send not supported by this kernel." ); - // We ignore the copy flag because we know all outbound traffic - // goes instantly. + if( copy ) + { + // Copy the data just once + byte[] temp = new byte[data.remaining()]; + System.arraycopy(data.array(), data.position(), temp, 0, data.remaining()); + data = ByteBuffer.wrap(temp); + } // Hand it to all of the endpoints that match our routing for( UdpEndpoint p : socketEndpoints.values() ) { @@ -177,6 +187,40 @@ public class UdpKernel extends AbstractKernel addEnvelope( env ); } + protected void enqueueWrite( Endpoint endpoint, DatagramPacket packet ) + { + writer.execute( new MessageWriter(endpoint, packet) ); + } + + protected class MessageWriter implements Runnable + { + private Endpoint endpoint; + private DatagramPacket packet; + + public MessageWriter( Endpoint endpoint, DatagramPacket packet ) + { + this.endpoint = endpoint; + this.packet = packet; + } + + public void run() + { + // Not guaranteed to always work but an extra datagram + // to a dead connection isn't so big of a deal. + if( !endpoint.isConnected() ) { + return; + } + + try { + thread.getSocket().send(packet); + } catch( Exception e ) { + KernelException exc = new KernelException( "Error sending datagram to:" + address, e ); + exc.fillInStackTrace(); + reportError(exc); + } + } + } + protected class HostThread extends Thread { private DatagramSocket socket;