Integrated the new service manager stuff with Client and

Server.  (Untested at the moment but straight forward.)
Also fixed a small but silent bug in DefaultServer when
closing out endpoints that were never fully connected.
Garbage was left around in the "connecting" data structure.
This commit is contained in:
Paul Speed 2015-04-26 00:57:02 -04:00
parent 1eb2ba7276
commit 35155c6b5b
5 changed files with 83 additions and 8 deletions

View File

@ -72,6 +72,12 @@ public interface Client extends MessageConnection
* be able to connect to. * be able to connect to.
*/ */
public int getVersion(); public int getVersion();
/**
* Returns the manager for client services. Client services extend
* the functionality of the client.
*/
public ClientServiceManager getServices();
/** /**
* Sends a message to the server. * Sends a message to the server.

View File

@ -33,6 +33,8 @@ package com.jme3.network;
import java.util.Collection; import java.util.Collection;
import com.jme3.network.service.HostedServiceManager;
/** /**
* Represents a host that can send and receive messages to * Represents a host that can send and receive messages to
* a set of remote client connections. * a set of remote client connections.
@ -54,6 +56,12 @@ public interface Server
*/ */
public int getVersion(); public int getVersion();
/**
* Returns the manager for hosted services. Hosted services extend
* the functionality of the server.
*/
public HostedServiceManager getServices();
/** /**
* Sends the specified message to all connected clients. * Sends the specified message to all connected clients.
*/ */

View File

@ -37,6 +37,7 @@ import com.jme3.network.kernel.Connector;
import com.jme3.network.message.ChannelInfoMessage; import com.jme3.network.message.ChannelInfoMessage;
import com.jme3.network.message.ClientRegistrationMessage; import com.jme3.network.message.ClientRegistrationMessage;
import com.jme3.network.message.DisconnectMessage; import com.jme3.network.message.DisconnectMessage;
import com.jme3.network.service.ClientServiceManager;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.*; import java.util.*;
@ -54,7 +55,7 @@ import java.util.logging.Logger;
*/ */
public class DefaultClient implements Client public class DefaultClient implements Client
{ {
static Logger log = Logger.getLogger(DefaultClient.class.getName()); static final Logger log = Logger.getLogger(DefaultClient.class.getName());
// First two channels are reserved for reliable and // First two channels are reserved for reliable and
// unreliable. Note: channels are endpoint specific so these // unreliable. Note: channels are endpoint specific so these
@ -80,10 +81,13 @@ public class DefaultClient implements Client
private ConnectorFactory connectorFactory; private ConnectorFactory connectorFactory;
private ClientServiceManager services;
public DefaultClient( String gameName, int version ) public DefaultClient( String gameName, int version )
{ {
this.gameName = gameName; this.gameName = gameName;
this.version = version; this.version = version;
this.services = new ClientServiceManager(this);
} }
public DefaultClient( String gameName, int version, Connector reliable, Connector fast, public DefaultClient( String gameName, int version, Connector reliable, Connector fast,
@ -200,6 +204,11 @@ public class DefaultClient implements Client
{ {
return version; return version;
} }
public ClientServiceManager getServices()
{
return services;
}
public void send( Message message ) public void send( Message message )
{ {
@ -260,7 +269,7 @@ public class DefaultClient implements Client
{ {
checkRunning(); checkRunning();
closeConnections( null ); closeConnections( null );
} }
protected void closeConnections( DisconnectInfo info ) protected void closeConnections( DisconnectInfo info )
@ -268,6 +277,10 @@ public class DefaultClient implements Client
if( !isRunning ) if( !isRunning )
return; return;
// Let the services get a chance to stop before we
// kill the connection.
services.stop();
// Send a close message // Send a close message
// Tell the thread it's ok to die // Tell the thread it's ok to die
@ -285,6 +298,9 @@ public class DefaultClient implements Client
fireDisconnected(info); fireDisconnected(info);
isRunning = false; isRunning = false;
// Terminate the services
services.terminate();
} }
public void addClientStateListener( ClientStateListener listener ) public void addClientStateListener( ClientStateListener listener )
@ -329,6 +345,9 @@ public class DefaultClient implements Client
protected void fireConnected() protected void fireConnected()
{ {
// Let the services know we are finally started
services.start();
for( ClientStateListener l : stateListeners ) { for( ClientStateListener l : stateListeners ) {
l.clientConnected( this ); l.clientConnected( this );
} }

View File

@ -37,6 +37,7 @@ import com.jme3.network.kernel.Kernel;
import com.jme3.network.message.ChannelInfoMessage; import com.jme3.network.message.ChannelInfoMessage;
import com.jme3.network.message.ClientRegistrationMessage; import com.jme3.network.message.ClientRegistrationMessage;
import com.jme3.network.message.DisconnectMessage; import com.jme3.network.message.DisconnectMessage;
import com.jme3.network.service.HostedServiceManager;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.*; import java.util.*;
@ -55,7 +56,7 @@ import java.util.logging.Logger;
*/ */
public class DefaultServer implements Server public class DefaultServer implements Server
{ {
static Logger log = Logger.getLogger(DefaultServer.class.getName()); static final Logger log = Logger.getLogger(DefaultServer.class.getName());
// First two channels are reserved for reliable and // First two channels are reserved for reliable and
// unreliable // unreliable
@ -85,6 +86,8 @@ public class DefaultServer implements Server
= new MessageListenerRegistry<HostedConnection>(); = new MessageListenerRegistry<HostedConnection>();
private List<ConnectionListener> connectionListeners = new CopyOnWriteArrayList<ConnectionListener>(); private List<ConnectionListener> connectionListeners = new CopyOnWriteArrayList<ConnectionListener>();
private HostedServiceManager services;
public DefaultServer( String gameName, int version, Kernel reliable, Kernel fast ) public DefaultServer( String gameName, int version, Kernel reliable, Kernel fast )
{ {
if( reliable == null ) if( reliable == null )
@ -92,6 +95,7 @@ public class DefaultServer implements Server
this.gameName = gameName; this.gameName = gameName;
this.version = version; this.version = version;
this.services = new HostedServiceManager(this);
reliableAdapter = new KernelAdapter( this, reliable, dispatcher, true ); reliableAdapter = new KernelAdapter( this, reliable, dispatcher, true );
channels.add( reliableAdapter ); channels.add( reliableAdapter );
@ -110,6 +114,11 @@ public class DefaultServer implements Server
{ {
return version; return version;
} }
public HostedServiceManager getServices()
{
return services;
}
public int addChannel( int port ) public int addChannel( int port )
{ {
@ -164,7 +173,10 @@ public class DefaultServer implements Server
ka.start(); ka.start();
} }
isRunning = true; isRunning = true;
// Start the services
services.start();
} }
public boolean isRunning() public boolean isRunning()
@ -177,13 +189,20 @@ public class DefaultServer implements Server
if( !isRunning ) if( !isRunning )
throw new IllegalStateException( "Server is not started." ); throw new IllegalStateException( "Server is not started." );
// First stop the services since we are about to
// kill the connections they are using
services.stop();
try { try {
// Kill the adpaters, they will kill the kernels // Kill the adpaters, they will kill the kernels
for( KernelAdapter ka : channels ) { for( KernelAdapter ka : channels ) {
ka.close(); ka.close();
} }
isRunning = false; isRunning = false;
// Now terminate all of the services
services.terminate();
} catch( InterruptedException e ) { } catch( InterruptedException e ) {
throw new RuntimeException( "Interrupted while closing", e ); throw new RuntimeException( "Interrupted while closing", e );
} }
@ -396,6 +415,18 @@ public class DefaultServer implements Server
return endpointConnections.get(endpoint); return endpointConnections.get(endpoint);
} }
protected void removeConnecting( Endpoint p )
{
// No easy lookup for connecting Connections
// from endpoint.
for( Map.Entry<Long,Connection> e : connecting.entrySet() ) {
if( e.getValue().hasEndpoint(p) ) {
connecting.remove(e.getKey());
return;
}
}
}
protected void connectionClosed( Endpoint p ) protected void connectionClosed( Endpoint p )
{ {
if( p.isConnected() ) { if( p.isConnected() ) {
@ -411,10 +442,10 @@ public class DefaultServer implements Server
// Also note: this method will be called multiple times per // Also note: this method will be called multiple times per
// HostedConnection if it has multiple endpoints. // HostedConnection if it has multiple endpoints.
Connection removed = null; Connection removed;
synchronized( this ) { synchronized( this ) {
// Just in case the endpoint was still connecting // Just in case the endpoint was still connecting
connecting.values().remove(p); removeConnecting(p);
// And the regular management // And the regular management
removed = (Connection)endpointConnections.remove(p); removed = (Connection)endpointConnections.remove(p);
@ -452,6 +483,16 @@ public class DefaultServer implements Server
id = nextId.getAndIncrement(); id = nextId.getAndIncrement();
channels = new Endpoint[channelCount]; channels = new Endpoint[channelCount];
} }
boolean hasEndpoint( Endpoint p )
{
for( Endpoint e : channels ) {
if( p == e ) {
return true;
}
}
return false;
}
void setChannel( int channel, Endpoint p ) void setChannel( int channel, Endpoint p )
{ {
@ -557,6 +598,7 @@ public class DefaultServer implements Server
return Collections.unmodifiableSet(sessionData.keySet()); return Collections.unmodifiableSet(sessionData.keySet());
} }
@Override
public String toString() public String toString()
{ {
return "Connection[ id=" + id + ", reliable=" + channels[CH_RELIABLE] return "Connection[ id=" + id + ", reliable=" + channels[CH_RELIABLE]

View File

@ -53,7 +53,7 @@ public interface Service<S> {
public void start(); public void start();
/** /**
* Called when the service is shutting down. All services * Called when the service manager is shutting down. All services
* are stopped and any service manager resources are closed * are stopped and any service manager resources are closed
* before the services are terminated. * before the services are terminated.
*/ */