From fe1bd03c5138ba8af256d084da4c4dd2157bdc18 Mon Sep 17 00:00:00 2001 From: "PSp..om" Date: Fri, 16 Dec 2011 08:43:33 +0000 Subject: [PATCH] Unified how UDP and TCP kernels are handled into a more general "channel" list. This is to pave the way for additional channels. git-svn-id: https://jmonkeyengine.googlecode.com/svn/trunk@8935 75d07b2b-3a1a-0410-a2c5-0572b91ccdca --- .../com/jme3/network/base/DefaultServer.java | 141 +++++++++++------- 1 file changed, 84 insertions(+), 57 deletions(-) diff --git a/engine/src/networking/com/jme3/network/base/DefaultServer.java b/engine/src/networking/com/jme3/network/base/DefaultServer.java index b628cadb8..628e8ab54 100644 --- a/engine/src/networking/com/jme3/network/base/DefaultServer.java +++ b/engine/src/networking/com/jme3/network/base/DefaultServer.java @@ -55,13 +55,20 @@ import java.util.logging.Logger; public class DefaultServer implements Server { static Logger log = Logger.getLogger(DefaultServer.class.getName()); + + // First two channels are reserved for reliable and + // unreliable + private static final int CH_RELIABLE = 0; + private static final int CH_UNRELIABLE = 1; private boolean isRunning = false; private AtomicInteger nextId = new AtomicInteger(0); private String gameName; private int version; + //private KernelFactory kernelFactory = KernelFactory.DEFAULT; private KernelAdapter reliableAdapter; private KernelAdapter fastAdapter; + private List channels = new ArrayList(); private Redispatch dispatcher = new Redispatch(); private Map connections = new ConcurrentHashMap(); private Map endpointConnections @@ -87,6 +94,9 @@ public class DefaultServer implements Server if( fast != null ) { fastAdapter = new KernelAdapter( this, fast, dispatcher, false ); } + + channels.add( reliableAdapter ); + channels.add( fastAdapter ); } public String getGameName() @@ -105,15 +115,13 @@ public class DefaultServer implements Server throw new IllegalStateException( "Server is already started." ); // Initialize the kernels - reliableAdapter.initialize(); - if( fastAdapter != null ) { - fastAdapter.initialize(); + for( KernelAdapter ka : channels ) { + ka.initialize(); } // Start em up - reliableAdapter.start(); - if( fastAdapter != null ) { - fastAdapter.start(); + for( KernelAdapter ka : channels ) { + ka.start(); } isRunning = true; @@ -131,11 +139,10 @@ public class DefaultServer implements Server try { // Kill the adpaters, they will kill the kernels - if( fastAdapter != null ) { - fastAdapter.close(); + for( KernelAdapter ka : channels ) { + ka.close(); } - reliableAdapter.close(); isRunning = false; } catch( InterruptedException e ) { throw new RuntimeException( "Interrupted while closing", e ); @@ -240,10 +247,14 @@ public class DefaultServer implements Server } } + protected int getChannel( KernelAdapter ka ) + { + return channels.indexOf(ka); + } + protected void registerClient( KernelAdapter ka, Endpoint p, ClientRegistrationMessage m ) { Connection addedConnection = null; - Connection bootedConnection = null; // generally this will only be called by one thread but it's // important enough I won't take chances @@ -255,25 +266,20 @@ public class DefaultServer implements Server // See if we already have one Connection c = connecting.remove(tempId); if( c == null ) { - c = new Connection(); + c = new Connection(channels.size()); log.log( Level.FINE, "Registering client for endpoint, pass 1:{0}.", p ); } else { log.log( Level.FINE, "Refining client registration for endpoint:{0}.", p ); } - // Fill in what we now know - if( ka == fastAdapter ) { - c.fast = p; - - if( c.reliable == null ) { - // Tuck it away for later - connecting.put(tempId, c); - } - - } else { - // It must be the reliable one - c.reliable = p; + // Fill in what we now know + int channel = getChannel(ka); + c.setChannel(channel, p); + log.log( Level.FINE, "Setting up channel:{0}", channel ); + // If it's channel 0 then this is the initial connection + // and we will send the connection information + if( channel == CH_RELIABLE ) { // Validate the name and version which is only sent // over the reliable connection at this point. if( !getGameName().equals(m.getGameName()) @@ -286,28 +292,29 @@ public class DefaultServer implements Server c.close( "Server client mismatch, server:" + getGameName() + " v" + getVersion() + " client:" + m.getGameName() + " v" + m.getVersion() ); return; - } - - if( c.fast == null && fastAdapter != null ) { - // Still waiting for the fast connection to - // register - connecting.put(tempId, c); } + + // Else send the extra channel information to the client + // TBD } - - if( !connecting.containsKey(tempId) ) { - + + if( c.isComplete() ) { // Then we are fully connected if( connections.put( c.getId(), c ) == null ) { - if( c.fast != null ) { - endpointConnections.put( c.fast, c ); - } - endpointConnections.put( c.reliable, c ); + for( Endpoint cp : c.channels ) { + if( cp == null ) + continue; + endpointConnections.put( cp, c ); + } addedConnection = c; } - } + } else { + // Need to keep getting channels so we'll keep it in + // the map + connecting.put(tempId, c); + } } // Best to do this outside of the synch block to avoid @@ -338,10 +345,10 @@ public class DefaultServer implements Server log.log( Level.INFO, "Connection closed:{0}.", p ); // Try to find the endpoint in all ways that it might - // exist. Note: by this point the channel is closed - // already. + // exist. Note: by this point the raw network channel is + // closed already. - // Also note: this method will be called twice per + // Also note: this method will be called multiple times per // HostedConnection if it has two endpoints. Connection removed = null; @@ -354,6 +361,9 @@ public class DefaultServer implements Server if( removed != null ) { connections.remove( removed.getId() ); } + + log.log( Level.FINE, "Connections size:{0}", connections.size() ); + log.log( Level.FINE, "Endpoint mappings size:{0}", endpointConnections.size() ); } // Better not to fire events while we hold a lock @@ -369,15 +379,32 @@ public class DefaultServer implements Server protected class Connection implements HostedConnection { private int id; - private Endpoint reliable; - private Endpoint fast; private boolean closed; - + private Endpoint[] channels; + private int setChannelCount = 0; + private Map sessionData = new ConcurrentHashMap(); - public Connection() + public Connection( int channelCount ) { id = nextId.getAndIncrement(); + channels = new Endpoint[channelCount]; + } + + void setChannel( int channel, Endpoint p ) + { + if( channels[channel] != null && channels[channel] != p ) { + throw new RuntimeException( "Channel has already been set:" + channel + + " = " + channels[channel] + ", cannot be set to:" + p ); + } + channels[channel] = p; + if( p != null ) + setChannelCount++; + } + + boolean isComplete() + { + return setChannelCount == channels.length; } public Server getServer() @@ -392,16 +419,16 @@ public class DefaultServer implements Server public String getAddress() { - return reliable == null ? null : reliable.getAddress(); + return channels[CH_RELIABLE] == null ? null : channels[CH_RELIABLE].getAddress(); } public void send( Message message ) { ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null); - if( message.isReliable() || fast == null ) { - reliable.send( buffer ); + if( message.isReliable() || channels[CH_UNRELIABLE] == null ) { + channels[CH_RELIABLE].send( buffer ); } else { - fast.send( buffer ); + channels[CH_UNRELIABLE].send( buffer ); } } @@ -411,14 +438,13 @@ public class DefaultServer implements Server return; closed = true; - // Make sure both endpoints are closed. Note: reliable + // Make sure all endpoints are closed. Note: reliable // should always already be closed through all paths that I // can conceive... but it doesn't hurt to be sure. - if( reliable != null && reliable.isConnected() ) { - reliable.close(); - } - if( fast != null && fast.isConnected() ) { - fast.close(); + for( Endpoint p : channels ) { + if( p == null ) + continue; + p.close(); } fireConnectionRemoved( this ); @@ -437,10 +463,10 @@ public class DefaultServer implements Server // fast will be cleaned up as a side-effect // when closeConnection() is called by the // connectionClosed() endpoint callback. - if( reliable != null ) { + if( channels[CH_RELIABLE] != null ) { // Close with flush so we make sure our // message gets out - reliable.close(true); + channels[CH_RELIABLE].close(true); } } @@ -464,7 +490,8 @@ public class DefaultServer implements Server public String toString() { - return "Connection[ id=" + id + ", reliable=" + reliable + ", fast=" + fast + " ]"; + return "Connection[ id=" + id + ", reliable=" + channels[CH_RELIABLE] + + ", fast=" + channels[CH_UNRELIABLE] + " ]"; } }