Pulled the ExecutorService stuff out and replaced it with a

plain single thread.  This gives more control over the queuing
behavior as with an ThreadPoolExecutor it is difficult to implement
blocking execute() behavior.  Also, this avoids creating an
extra object per-message.
Anyway, this code now implements a blocking queue instead of
a boundless queue.  It's set to 16,000 messages which should only
bottom out in the worst of cases.  I was seeing it in the throughput
tests where the sockets were backing up and the queues were 
consuming all of the heap until an out of memory error occurred.
Outbound messaging is only throttled this way on the client.
Servers typically wouldn't do this sort of spamming anyway.


git-svn-id: https://jmonkeyengine.googlecode.com/svn/trunk@7977 75d07b2b-3a1a-0410-a2c5-0572b91ccdca
3.0
PSp..om 14 years ago
parent d62c1311fb
commit d67adb8266
  1. 94
      engine/src/networking/com/jme3/network/base/ConnectorAdapter.java

@ -32,19 +32,16 @@
package com.jme3.network.base;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import com.jme3.network.ErrorListener;
import com.jme3.network.Message;
import com.jme3.network.MessageListener;
import com.jme3.network.kernel.Connector;
import com.jme3.network.kernel.ConnectorException;
import com.jme3.network.kernel.NamedThreadFactory;
import com.jme3.network.serializing.Serializer;
import java.util.concurrent.BlockingQueue;
/**
* Wraps a single Connector and forwards new messages
@ -64,18 +61,22 @@ import com.jme3.network.serializing.Serializer;
*/
public class ConnectorAdapter extends Thread
{
private static final int OUTBOUND_BACKLOG = 16000;
private Connector connector;
private MessageListener<Object> dispatcher;
private ErrorListener<Object> errorHandler;
private AtomicBoolean go = new AtomicBoolean(true);
private BlockingQueue<ByteBuffer> outbound;
// Writes messages out on a background thread
private ExecutorService writer;
private WriterThread writer;
// Marks the messages as reliable or not if they came
// through this connector.
private boolean reliable;
public ConnectorAdapter( Connector connector, MessageListener<Object> dispatcher,
ErrorListener<Object> errorHandler, boolean reliable )
{
@ -84,9 +85,34 @@ public class ConnectorAdapter extends Thread
this.dispatcher = dispatcher;
this.errorHandler = errorHandler;
this.reliable = reliable;
setDaemon(true);
writer = Executors.newFixedThreadPool(1,
new NamedThreadFactory(String.valueOf(connector) + "-writer", true));
setDaemon(true);
// The backlog makes sure that the outbound channel blocks once
// a certain backlog level is reached. It is set high so that it
// is only reached in the worst cases... which are usually things like
// raw throughput tests. Technically, a saturated TCP channel could
// back up quite a bit if the buffers are full and the socket has
// stalled but 16,000 messages is still a big backlog.
outbound = new ArrayBlockingQueue<ByteBuffer>(OUTBOUND_BACKLOG);
// Note: this technically adds a potential deadlock case
// with the above code where there wasn't one before. For example,
// if a TCP outbound queue fills to capacity and a client sends
// in such a way that they block TCP message handling then if the HostedConnection
// on the server is similarly blocked then the TCP network buffers may
// all get full and no outbound messages move and we forever block
// on the queue.
// However, in practice this can't really happen... or at least it's
// the sign of other really bad things.
// First, currently the server-side outbound queues are all unbounded and
// so won't ever block the handling of messages if the outbound channel is full.
// Second, there would have to be a huge amount of data backlog for this
// to ever occur anyway.
// Third, it's a sign of a really poor architecture if 16,000 messages
// can go out in a way that blocks reads.
writer = new WriterThread();
writer.start();
}
public void close()
@ -107,7 +133,11 @@ public class ConnectorAdapter extends Thread
public void write( ByteBuffer data )
{
writer.execute( new MessageWriter(data) );
try {
outbound.put( data );
} catch( InterruptedException e ) {
throw new RuntimeException( "Interrupted while waiting for queue to drain", e );
}
}
protected void handleError( Exception e )
@ -147,28 +177,40 @@ public class ConnectorAdapter extends Thread
handleError( e );
}
}
protected class MessageWriter implements Runnable
protected class WriterThread extends Thread
{
private ByteBuffer data;
public MessageWriter( ByteBuffer data )
public WriterThread()
{
this.data = data;
super( String.valueOf(connector) + "-writer" );
}
public void run()
public void shutdown()
{
interrupt();
}
private void write( ByteBuffer data )
{
if( !go.get() )
return;
try {
connector.write(data);
} catch( Exception e ) {
handleError( e );
}
}
}
}
public void run()
{
while( go.get() ) {
try {
ByteBuffer data = outbound.take();
write(data);
} catch( InterruptedException e ) {
if( !go.get() )
return;
throw new RuntimeException( "Interrupted waiting for data", e );
}
}
}
}
}

Loading…
Cancel
Save