@ -37,6 +37,8 @@ import java.net.*;
import java.nio.ByteBuffer ;
import java.util.* ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.concurrent.Executors ;
import java.util.concurrent.ExecutorService ;
import java.util.concurrent.atomic.AtomicBoolean ;
import java.util.logging.Level ;
import java.util.logging.Logger ;
@ -57,6 +59,8 @@ public class UdpKernel extends AbstractKernel
private InetSocketAddress address ;
private HostThread thread ;
private ExecutorService writer = Executors . newFixedThreadPool ( 2 ) ;
// The nature of UDP means that even through a firewall,
// a user would have to have a unique address+port since UDP
// can't really be NAT'ed.
@ -104,6 +108,7 @@ public class UdpKernel extends AbstractKernel
try {
thread . close ( ) ;
writer . shutdown ( ) ;
thread = null ;
} catch ( IOException e ) {
throw new KernelException ( "Error closing host connection:" + address , e ) ;
@ -120,8 +125,13 @@ public class UdpKernel extends AbstractKernel
if ( reliable )
throw new UnsupportedOperationException ( "Reliable send not supported by this kernel." ) ;
// We ignore the copy flag because we know all outbound traffic
// goes instantly.
if ( copy )
{
// Copy the data just once
byte [ ] temp = new byte [ data . remaining ( ) ] ;
System . arraycopy ( data . array ( ) , data . position ( ) , temp , 0 , data . remaining ( ) ) ;
data = ByteBuffer . wrap ( temp ) ;
}
// Hand it to all of the endpoints that match our routing
for ( UdpEndpoint p : socketEndpoints . values ( ) ) {
@ -177,6 +187,40 @@ public class UdpKernel extends AbstractKernel
addEnvelope ( env ) ;
}
protected void enqueueWrite ( Endpoint endpoint , DatagramPacket packet )
{
writer . execute ( new MessageWriter ( endpoint , packet ) ) ;
}
protected class MessageWriter implements Runnable
{
private Endpoint endpoint ;
private DatagramPacket packet ;
public MessageWriter ( Endpoint endpoint , DatagramPacket packet )
{
this . endpoint = endpoint ;
this . packet = packet ;
}
public void run ( )
{
// Not guaranteed to always work but an extra datagram
// to a dead connection isn't so big of a deal.
if ( ! endpoint . isConnected ( ) ) {
return ;
}
try {
thread . getSocket ( ) . send ( packet ) ;
} catch ( Exception e ) {
KernelException exc = new KernelException ( "Error sending datagram to:" + address , e ) ;
exc . fillInStackTrace ( ) ;
reportError ( exc ) ;
}
}
}
protected class HostThread extends Thread
{
private DatagramSocket socket ;