The KernelAdapter now keeps a rolling message buffer
for the endpoints when using a reliable connection. There is no guarantee that the buffers going out from a client won't be chopped up by the networking stack in various ways. It was always the intent to accumulate them like this (the client side already does after all) but it was an oversight. It's a testament to modern networking that this hasn't come up in practice yet. git-svn-id: https://jmonkeyengine.googlecode.com/svn/trunk@7449 75d07b2b-3a1a-0410-a2c5-0572b91ccdca
This commit is contained in:
parent
8406f0058e
commit
17221395dc
@ -34,7 +34,9 @@ package com.jme3.network.base;
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.logging.Level;
|
import java.util.logging.Level;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
@ -70,6 +72,10 @@ public class KernelAdapter extends Thread
|
|||||||
private MessageListener<HostedConnection> messageDispatcher;
|
private MessageListener<HostedConnection> messageDispatcher;
|
||||||
private AtomicBoolean go = new AtomicBoolean(true);
|
private AtomicBoolean go = new AtomicBoolean(true);
|
||||||
|
|
||||||
|
// Keeps track of the in-progress messages that are received
|
||||||
|
// on reliable connections
|
||||||
|
private Map<Endpoint, MessageProtocol> messageBuffers = new ConcurrentHashMap<Endpoint,MessageProtocol>();
|
||||||
|
|
||||||
// Marks the messages as reliable or not if they came
|
// Marks the messages as reliable or not if they came
|
||||||
// through this connector.
|
// through this connector.
|
||||||
private boolean reliable;
|
private boolean reliable;
|
||||||
@ -107,6 +113,10 @@ public class KernelAdapter extends Thread
|
|||||||
|
|
||||||
protected void connectionClosed( Endpoint p )
|
protected void connectionClosed( Endpoint p )
|
||||||
{
|
{
|
||||||
|
// Remove any message buffer we've been accumulating
|
||||||
|
// on behalf of this endpoing
|
||||||
|
messageBuffers.remove(p);
|
||||||
|
|
||||||
server.connectionClosed(p);
|
server.connectionClosed(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -156,16 +166,49 @@ public class KernelAdapter extends Thread
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected MessageProtocol getMessageBuffer( Endpoint p )
|
||||||
|
{
|
||||||
|
if( !reliable ) {
|
||||||
|
// Since UDP comes in packets and they aren't split
|
||||||
|
// up, there is no reason to buffer. In fact, there would
|
||||||
|
// be a down side because there is no way for us to reliably
|
||||||
|
// clean these up later since we'd create another one for
|
||||||
|
// any random UDP packet that comes to the port.
|
||||||
|
return new MessageProtocol();
|
||||||
|
} else {
|
||||||
|
// See if we already have one
|
||||||
|
MessageProtocol result = messageBuffers.get(p);
|
||||||
|
if( result != null ) {
|
||||||
|
result = new MessageProtocol();
|
||||||
|
messageBuffers.put(p, result);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protected void createAndDispatch( Envelope env )
|
protected void createAndDispatch( Envelope env )
|
||||||
{
|
{
|
||||||
MessageProtocol protocol = new MessageProtocol();
|
MessageProtocol protocol = getMessageBuffer(env.getSource());
|
||||||
|
|
||||||
byte[] data = env.getData();
|
byte[] data = env.getData();
|
||||||
ByteBuffer buffer = ByteBuffer.wrap(data);
|
ByteBuffer buffer = ByteBuffer.wrap(data);
|
||||||
|
|
||||||
int count = protocol.addBuffer( buffer );
|
int count = protocol.addBuffer( buffer );
|
||||||
if( count == 0 )
|
if( count == 0 ) {
|
||||||
|
// This can happen if there was only a partial message
|
||||||
|
// received. However, this should never happen for unreliable
|
||||||
|
// connections.
|
||||||
|
if( !reliable ) {
|
||||||
|
// Log some additional information about the packet.
|
||||||
|
int len = Math.min( 10, data.length );
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
for( int i = 0; i < len; i++ ) {
|
||||||
|
sb.append( "[" + Integer.toHexString(data[i]) + "]" );
|
||||||
|
}
|
||||||
|
log.log( Level.INFO, "First 10 bytes of incomplete nessage:" + sb );
|
||||||
throw new RuntimeException( "Envelope contained incomplete data:" + env );
|
throw new RuntimeException( "Envelope contained incomplete data:" + env );
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Should be complete... and maybe we should check but we don't
|
// Should be complete... and maybe we should check but we don't
|
||||||
Message m = null;
|
Message m = null;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user