diff --git a/engine/src/networking/com/jme3/network/base/ConnectorAdapter.java b/engine/src/networking/com/jme3/network/base/ConnectorAdapter.java index 1dad892b1..1a39c7109 100644 --- a/engine/src/networking/com/jme3/network/base/ConnectorAdapter.java +++ b/engine/src/networking/com/jme3/network/base/ConnectorAdapter.java @@ -32,19 +32,16 @@ package com.jme3.network.base; -import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.Executors; -import java.util.concurrent.ExecutorService; import com.jme3.network.ErrorListener; import com.jme3.network.Message; import com.jme3.network.MessageListener; import com.jme3.network.kernel.Connector; import com.jme3.network.kernel.ConnectorException; -import com.jme3.network.kernel.NamedThreadFactory; -import com.jme3.network.serializing.Serializer; +import java.util.concurrent.BlockingQueue; /** * Wraps a single Connector and forwards new messages @@ -64,18 +61,22 @@ import com.jme3.network.serializing.Serializer; */ public class ConnectorAdapter extends Thread { + private static final int OUTBOUND_BACKLOG = 16000; + private Connector connector; private MessageListener dispatcher; private ErrorListener errorHandler; private AtomicBoolean go = new AtomicBoolean(true); - + + private BlockingQueue outbound; + // Writes messages out on a background thread - private ExecutorService writer; + private WriterThread writer; // Marks the messages as reliable or not if they came // through this connector. private boolean reliable; - + public ConnectorAdapter( Connector connector, MessageListener dispatcher, ErrorListener errorHandler, boolean reliable ) { @@ -84,9 +85,34 @@ public class ConnectorAdapter extends Thread this.dispatcher = dispatcher; this.errorHandler = errorHandler; this.reliable = reliable; - setDaemon(true); - writer = Executors.newFixedThreadPool(1, - new NamedThreadFactory(String.valueOf(connector) + "-writer", true)); + setDaemon(true); + + // The backlog makes sure that the outbound channel blocks once + // a certain backlog level is reached. It is set high so that it + // is only reached in the worst cases... which are usually things like + // raw throughput tests. Technically, a saturated TCP channel could + // back up quite a bit if the buffers are full and the socket has + // stalled but 16,000 messages is still a big backlog. + outbound = new ArrayBlockingQueue(OUTBOUND_BACKLOG); + + // Note: this technically adds a potential deadlock case + // with the above code where there wasn't one before. For example, + // if a TCP outbound queue fills to capacity and a client sends + // in such a way that they block TCP message handling then if the HostedConnection + // on the server is similarly blocked then the TCP network buffers may + // all get full and no outbound messages move and we forever block + // on the queue. + // However, in practice this can't really happen... or at least it's + // the sign of other really bad things. + // First, currently the server-side outbound queues are all unbounded and + // so won't ever block the handling of messages if the outbound channel is full. + // Second, there would have to be a huge amount of data backlog for this + // to ever occur anyway. + // Third, it's a sign of a really poor architecture if 16,000 messages + // can go out in a way that blocks reads. + + writer = new WriterThread(); + writer.start(); } public void close() @@ -107,7 +133,11 @@ public class ConnectorAdapter extends Thread public void write( ByteBuffer data ) { - writer.execute( new MessageWriter(data) ); + try { + outbound.put( data ); + } catch( InterruptedException e ) { + throw new RuntimeException( "Interrupted while waiting for queue to drain", e ); + } } protected void handleError( Exception e ) @@ -147,28 +177,40 @@ public class ConnectorAdapter extends Thread handleError( e ); } } - - protected class MessageWriter implements Runnable + + protected class WriterThread extends Thread { - private ByteBuffer data; - - public MessageWriter( ByteBuffer data ) + public WriterThread() { - this.data = data; + super( String.valueOf(connector) + "-writer" ); } - - public void run() + + public void shutdown() + { + interrupt(); + } + + private void write( ByteBuffer data ) { - if( !go.get() ) - return; - try { connector.write(data); } catch( Exception e ) { handleError( e ); } - } - } - + } + public void run() + { + while( go.get() ) { + try { + ByteBuffer data = outbound.take(); + write(data); + } catch( InterruptedException e ) { + if( !go.get() ) + return; + throw new RuntimeException( "Interrupted waiting for data", e ); + } + } + } + } }