Unified how UDP and TCP kernels are handled into a

more general "channel" list.  This is to pave the way
for additional channels.


git-svn-id: https://jmonkeyengine.googlecode.com/svn/trunk@8935 75d07b2b-3a1a-0410-a2c5-0572b91ccdca
3.0
PSp..om 13 years ago
parent 2d0def329d
commit fe1bd03c51
  1. 129
      engine/src/networking/com/jme3/network/base/DefaultServer.java

@ -56,12 +56,19 @@ public class DefaultServer implements Server
{ {
static Logger log = Logger.getLogger(DefaultServer.class.getName()); static Logger log = Logger.getLogger(DefaultServer.class.getName());
// First two channels are reserved for reliable and
// unreliable
private static final int CH_RELIABLE = 0;
private static final int CH_UNRELIABLE = 1;
private boolean isRunning = false; private boolean isRunning = false;
private AtomicInteger nextId = new AtomicInteger(0); private AtomicInteger nextId = new AtomicInteger(0);
private String gameName; private String gameName;
private int version; private int version;
//private KernelFactory kernelFactory = KernelFactory.DEFAULT;
private KernelAdapter reliableAdapter; private KernelAdapter reliableAdapter;
private KernelAdapter fastAdapter; private KernelAdapter fastAdapter;
private List<KernelAdapter> channels = new ArrayList<KernelAdapter>();
private Redispatch dispatcher = new Redispatch(); private Redispatch dispatcher = new Redispatch();
private Map<Integer,HostedConnection> connections = new ConcurrentHashMap<Integer,HostedConnection>(); private Map<Integer,HostedConnection> connections = new ConcurrentHashMap<Integer,HostedConnection>();
private Map<Endpoint,HostedConnection> endpointConnections private Map<Endpoint,HostedConnection> endpointConnections
@ -87,6 +94,9 @@ public class DefaultServer implements Server
if( fast != null ) { if( fast != null ) {
fastAdapter = new KernelAdapter( this, fast, dispatcher, false ); fastAdapter = new KernelAdapter( this, fast, dispatcher, false );
} }
channels.add( reliableAdapter );
channels.add( fastAdapter );
} }
public String getGameName() public String getGameName()
@ -105,15 +115,13 @@ public class DefaultServer implements Server
throw new IllegalStateException( "Server is already started." ); throw new IllegalStateException( "Server is already started." );
// Initialize the kernels // Initialize the kernels
reliableAdapter.initialize(); for( KernelAdapter ka : channels ) {
if( fastAdapter != null ) { ka.initialize();
fastAdapter.initialize();
} }
// Start em up // Start em up
reliableAdapter.start(); for( KernelAdapter ka : channels ) {
if( fastAdapter != null ) { ka.start();
fastAdapter.start();
} }
isRunning = true; isRunning = true;
@ -131,11 +139,10 @@ public class DefaultServer implements Server
try { try {
// Kill the adpaters, they will kill the kernels // Kill the adpaters, they will kill the kernels
if( fastAdapter != null ) { for( KernelAdapter ka : channels ) {
fastAdapter.close(); ka.close();
} }
reliableAdapter.close();
isRunning = false; isRunning = false;
} catch( InterruptedException e ) { } catch( InterruptedException e ) {
throw new RuntimeException( "Interrupted while closing", e ); throw new RuntimeException( "Interrupted while closing", e );
@ -240,10 +247,14 @@ public class DefaultServer implements Server
} }
} }
protected int getChannel( KernelAdapter ka )
{
return channels.indexOf(ka);
}
protected void registerClient( KernelAdapter ka, Endpoint p, ClientRegistrationMessage m ) protected void registerClient( KernelAdapter ka, Endpoint p, ClientRegistrationMessage m )
{ {
Connection addedConnection = null; Connection addedConnection = null;
Connection bootedConnection = null;
// generally this will only be called by one thread but it's // generally this will only be called by one thread but it's
// important enough I won't take chances // important enough I won't take chances
@ -255,25 +266,20 @@ public class DefaultServer implements Server
// See if we already have one // See if we already have one
Connection c = connecting.remove(tempId); Connection c = connecting.remove(tempId);
if( c == null ) { if( c == null ) {
c = new Connection(); c = new Connection(channels.size());
log.log( Level.FINE, "Registering client for endpoint, pass 1:{0}.", p ); log.log( Level.FINE, "Registering client for endpoint, pass 1:{0}.", p );
} else { } else {
log.log( Level.FINE, "Refining client registration for endpoint:{0}.", p ); log.log( Level.FINE, "Refining client registration for endpoint:{0}.", p );
} }
// Fill in what we now know // Fill in what we now know
if( ka == fastAdapter ) { int channel = getChannel(ka);
c.fast = p; c.setChannel(channel, p);
log.log( Level.FINE, "Setting up channel:{0}", channel );
if( c.reliable == null ) {
// Tuck it away for later
connecting.put(tempId, c);
}
} else {
// It must be the reliable one
c.reliable = p;
// If it's channel 0 then this is the initial connection
// and we will send the connection information
if( channel == CH_RELIABLE ) {
// Validate the name and version which is only sent // Validate the name and version which is only sent
// over the reliable connection at this point. // over the reliable connection at this point.
if( !getGameName().equals(m.getGameName()) if( !getGameName().equals(m.getGameName())
@ -288,25 +294,26 @@ public class DefaultServer implements Server
return; return;
} }
if( c.fast == null && fastAdapter != null ) { // Else send the extra channel information to the client
// Still waiting for the fast connection to // TBD
// register
connecting.put(tempId, c);
}
} }
if( !connecting.containsKey(tempId) ) { if( c.isComplete() ) {
// Then we are fully connected // Then we are fully connected
if( connections.put( c.getId(), c ) == null ) { if( connections.put( c.getId(), c ) == null ) {
if( c.fast != null ) { for( Endpoint cp : c.channels ) {
endpointConnections.put( c.fast, c ); if( cp == null )
continue;
endpointConnections.put( cp, c );
} }
endpointConnections.put( c.reliable, c );
addedConnection = c; addedConnection = c;
} }
} else {
// Need to keep getting channels so we'll keep it in
// the map
connecting.put(tempId, c);
} }
} }
@ -338,10 +345,10 @@ public class DefaultServer implements Server
log.log( Level.INFO, "Connection closed:{0}.", p ); log.log( Level.INFO, "Connection closed:{0}.", p );
// Try to find the endpoint in all ways that it might // Try to find the endpoint in all ways that it might
// exist. Note: by this point the channel is closed // exist. Note: by this point the raw network channel is
// already. // closed already.
// Also note: this method will be called twice per // Also note: this method will be called multiple times per
// HostedConnection if it has two endpoints. // HostedConnection if it has two endpoints.
Connection removed = null; Connection removed = null;
@ -354,6 +361,9 @@ public class DefaultServer implements Server
if( removed != null ) { if( removed != null ) {
connections.remove( removed.getId() ); connections.remove( removed.getId() );
} }
log.log( Level.FINE, "Connections size:{0}", connections.size() );
log.log( Level.FINE, "Endpoint mappings size:{0}", endpointConnections.size() );
} }
// Better not to fire events while we hold a lock // Better not to fire events while we hold a lock
@ -369,15 +379,32 @@ public class DefaultServer implements Server
protected class Connection implements HostedConnection protected class Connection implements HostedConnection
{ {
private int id; private int id;
private Endpoint reliable;
private Endpoint fast;
private boolean closed; private boolean closed;
private Endpoint[] channels;
private int setChannelCount = 0;
private Map<String,Object> sessionData = new ConcurrentHashMap<String,Object>(); private Map<String,Object> sessionData = new ConcurrentHashMap<String,Object>();
public Connection() public Connection( int channelCount )
{ {
id = nextId.getAndIncrement(); id = nextId.getAndIncrement();
channels = new Endpoint[channelCount];
}
void setChannel( int channel, Endpoint p )
{
if( channels[channel] != null && channels[channel] != p ) {
throw new RuntimeException( "Channel has already been set:" + channel
+ " = " + channels[channel] + ", cannot be set to:" + p );
}
channels[channel] = p;
if( p != null )
setChannelCount++;
}
boolean isComplete()
{
return setChannelCount == channels.length;
} }
public Server getServer() public Server getServer()
@ -392,16 +419,16 @@ public class DefaultServer implements Server
public String getAddress() public String getAddress()
{ {
return reliable == null ? null : reliable.getAddress(); return channels[CH_RELIABLE] == null ? null : channels[CH_RELIABLE].getAddress();
} }
public void send( Message message ) public void send( Message message )
{ {
ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null); ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
if( message.isReliable() || fast == null ) { if( message.isReliable() || channels[CH_UNRELIABLE] == null ) {
reliable.send( buffer ); channels[CH_RELIABLE].send( buffer );
} else { } else {
fast.send( buffer ); channels[CH_UNRELIABLE].send( buffer );
} }
} }
@ -411,14 +438,13 @@ public class DefaultServer implements Server
return; return;
closed = true; closed = true;
// Make sure both endpoints are closed. Note: reliable // Make sure all endpoints are closed. Note: reliable
// should always already be closed through all paths that I // should always already be closed through all paths that I
// can conceive... but it doesn't hurt to be sure. // can conceive... but it doesn't hurt to be sure.
if( reliable != null && reliable.isConnected() ) { for( Endpoint p : channels ) {
reliable.close(); if( p == null )
} continue;
if( fast != null && fast.isConnected() ) { p.close();
fast.close();
} }
fireConnectionRemoved( this ); fireConnectionRemoved( this );
@ -437,10 +463,10 @@ public class DefaultServer implements Server
// fast will be cleaned up as a side-effect // fast will be cleaned up as a side-effect
// when closeConnection() is called by the // when closeConnection() is called by the
// connectionClosed() endpoint callback. // connectionClosed() endpoint callback.
if( reliable != null ) { if( channels[CH_RELIABLE] != null ) {
// Close with flush so we make sure our // Close with flush so we make sure our
// message gets out // message gets out
reliable.close(true); channels[CH_RELIABLE].close(true);
} }
} }
@ -464,7 +490,8 @@ public class DefaultServer implements Server
public String toString() public String toString()
{ {
return "Connection[ id=" + id + ", reliable=" + reliable + ", fast=" + fast + " ]"; return "Connection[ id=" + id + ", reliable=" + channels[CH_RELIABLE]
+ ", fast=" + channels[CH_UNRELIABLE] + " ]";
} }
} }

Loading…
Cancel
Save