Refactored the base networking imlpementation's "MessageProtocol" to be an

interface and strictly implement the to/from ByteBuffer protocol.  In the future
this will allow the message protocol to be swappable.  Also moved out the buffering
aspect since often buffers need to be created 'on the fly' and are separate from
the protocol.  This will allow me to play with fixing the issue related to messages
being deserialized before the serialization registry message has been processed by
swapping out the GreedyMessageBuffer implementation for a LazyMessageBuffer implementation.
fix-openal-soft-deadlink
Paul Speed 5 years ago
parent a9afcecc41
commit 1c37d5a92d
  1. 11
      jme3-networking/src/main/java/com/jme3/network/base/ConnectorAdapter.java
  2. 10
      jme3-networking/src/main/java/com/jme3/network/base/DefaultClient.java
  3. 16
      jme3-networking/src/main/java/com/jme3/network/base/DefaultServer.java
  4. 22
      jme3-networking/src/main/java/com/jme3/network/base/KernelAdapter.java
  5. 65
      jme3-networking/src/main/java/com/jme3/network/base/MessageBuffer.java
  6. 153
      jme3-networking/src/main/java/com/jme3/network/base/MessageProtocol.java
  7. 157
      jme3-networking/src/main/java/com/jme3/network/base/protocol/GreedyMessageBuffer.java
  8. 102
      jme3-networking/src/main/java/com/jme3/network/base/protocol/SerializerMessageProtocol.java

@ -65,6 +65,7 @@ public class ConnectorAdapter extends Thread
private MessageListener<Object> dispatcher;
private ErrorListener<Object> errorHandler;
private AtomicBoolean go = new AtomicBoolean(true);
private MessageProtocol protocol;
private BlockingQueue<ByteBuffer> outbound;
@ -75,11 +76,13 @@ public class ConnectorAdapter extends Thread
// through this connector.
private boolean reliable;
public ConnectorAdapter( Connector connector, MessageListener<Object> dispatcher,
public ConnectorAdapter( Connector connector, MessageProtocol protocol,
MessageListener<Object> dispatcher,
ErrorListener<Object> errorHandler, boolean reliable )
{
super( String.valueOf(connector) );
this.connector = connector;
this.protocol = protocol;
this.dispatcher = dispatcher;
this.errorHandler = errorHandler;
this.reliable = reliable;
@ -151,7 +154,7 @@ public class ConnectorAdapter extends Thread
public void run()
{
MessageProtocol protocol = new MessageProtocol();
MessageBuffer messageBuffer = protocol.createBuffer();
try {
while( go.get() ) {
@ -166,10 +169,10 @@ public class ConnectorAdapter extends Thread
}
}
protocol.addBuffer( buffer );
messageBuffer.addBytes(buffer);
Message m = null;
while( (m = protocol.getMessage()) != null ) {
while( (m = messageBuffer.pollMessage()) != null ) {
m.setReliable( reliable );
dispatch( m );
}

@ -33,6 +33,7 @@ package com.jme3.network.base;
import com.jme3.network.*;
import com.jme3.network.ClientStateListener.DisconnectInfo;
import com.jme3.network.base.protocol.SerializerMessageProtocol;
import com.jme3.network.kernel.Connector;
import com.jme3.network.message.ChannelInfoMessage;
import com.jme3.network.message.ClientRegistrationMessage;
@ -83,6 +84,7 @@ public class DefaultClient implements Client
private ConnectorFactory connectorFactory;
private ClientServiceManager services;
private MessageProtocol protocol = new SerializerMessageProtocol();
public DefaultClient( String gameName, int version )
{
@ -114,9 +116,9 @@ public class DefaultClient implements Client
throw new IllegalStateException( "Channels already exist." );
this.connectorFactory = connectorFactory;
channels.add(new ConnectorAdapter(reliable, dispatcher, dispatcher, true));
channels.add(new ConnectorAdapter(reliable, protocol, dispatcher, dispatcher, true));
if( fast != null ) {
channels.add(new ConnectorAdapter(fast, dispatcher, dispatcher, false));
channels.add(new ConnectorAdapter(fast, protocol, dispatcher, dispatcher, false));
} else {
// Add the null adapter to keep the indexes right
channels.add(null);
@ -279,7 +281,7 @@ public class DefaultClient implements Client
buffer.clear();
// Convert the message to bytes
buffer = MessageProtocol.messageToBuffer(message, buffer);
buffer = protocol.toByteBuffer(message, buffer);
// Since we share the buffer between invocations, we will need to
// copy this message's part out of it. This is because we actually
@ -431,7 +433,7 @@ public class DefaultClient implements Client
try {
for( int i = 0; i < ports.length; i++ ) {
Connector c = connectorFactory.createConnector( i, ports[i] );
ConnectorAdapter ca = new ConnectorAdapter(c, dispatcher, dispatcher, true);
ConnectorAdapter ca = new ConnectorAdapter(c, protocol, dispatcher, dispatcher, true);
int ch = channels.size();
channels.add( ca );

@ -32,6 +32,7 @@
package com.jme3.network.base;
import com.jme3.network.*;
import com.jme3.network.base.protocol.SerializerMessageProtocol;
import com.jme3.network.kernel.Endpoint;
import com.jme3.network.kernel.Kernel;
import com.jme3.network.message.ChannelInfoMessage;
@ -88,6 +89,7 @@ public class DefaultServer implements Server
private final List<ConnectionListener> connectionListeners = new CopyOnWriteArrayList<ConnectionListener>();
private HostedServiceManager services;
private MessageProtocol protocol = new SerializerMessageProtocol();
public DefaultServer( String gameName, int version, Kernel reliable, Kernel fast )
{
@ -99,10 +101,10 @@ public class DefaultServer implements Server
this.services = new HostedServiceManager(this);
addStandardServices();
reliableAdapter = new KernelAdapter( this, reliable, dispatcher, true );
reliableAdapter = new KernelAdapter(this, reliable, protocol, dispatcher, true);
channels.add( reliableAdapter );
if( fast != null ) {
fastAdapter = new KernelAdapter( this, fast, dispatcher, false );
fastAdapter = new KernelAdapter(this, fast, protocol, dispatcher, false);
channels.add( fastAdapter );
}
}
@ -153,7 +155,7 @@ public class DefaultServer implements Server
alternatePorts.add(port);
Kernel kernel = kernelFactory.createKernel(result, port);
channels.add( new KernelAdapter(this, kernel, dispatcher, true) );
channels.add( new KernelAdapter(this, kernel, protocol, dispatcher, true) );
return result;
} catch( IOException e ) {
@ -238,7 +240,7 @@ public class DefaultServer implements Server
if( connections.isEmpty() )
return;
ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
ByteBuffer buffer = protocol.toByteBuffer(message, null);
FilterAdapter adapter = filter == null ? null : new FilterAdapter(filter);
@ -263,7 +265,7 @@ public class DefaultServer implements Server
checkChannel(channel);
ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
ByteBuffer buffer = protocol.toByteBuffer(message, null);
FilterAdapter adapter = filter == null ? null : new FilterAdapter(filter);
@ -579,7 +581,7 @@ public class DefaultServer implements Server
if( log.isLoggable(Level.FINER) ) {
log.log(Level.FINER, "send({0})", message);
}
ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
ByteBuffer buffer = protocol.toByteBuffer(message, null);
if( message.isReliable() || channels[CH_UNRELIABLE] == null ) {
channels[CH_RELIABLE].send( buffer );
} else {
@ -594,7 +596,7 @@ public class DefaultServer implements Server
log.log(Level.FINER, "send({0}, {1})", new Object[]{channel, message});
}
checkChannel(channel);
ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
ByteBuffer buffer = protocol.toByteBuffer(message, null);
channels[channel+CH_FIRST].send(buffer);
}

@ -71,20 +71,23 @@ public class KernelAdapter extends Thread
private MessageListener<HostedConnection> messageDispatcher;
private AtomicBoolean go = new AtomicBoolean(true);
private MessageProtocol protocol;
// Keeps track of the in-progress messages that are received
// on reliable connections
private Map<Endpoint, MessageProtocol> messageBuffers = new ConcurrentHashMap<Endpoint,MessageProtocol>();
private Map<Endpoint, MessageBuffer> messageBuffers = new ConcurrentHashMap<>();
// Marks the messages as reliable or not if they came
// through this connector.
private boolean reliable;
public KernelAdapter( DefaultServer server, Kernel kernel, MessageListener<HostedConnection> messageDispatcher,
public KernelAdapter( DefaultServer server, Kernel kernel, MessageProtocol protocol, MessageListener<HostedConnection> messageDispatcher,
boolean reliable )
{
super( String.valueOf(kernel) );
this.server = server;
this.kernel = kernel;
this.protocol = protocol;
this.messageDispatcher = messageDispatcher;
this.reliable = reliable;
setDaemon(true);
@ -190,7 +193,7 @@ public class KernelAdapter extends Thread
}
}
protected MessageProtocol getMessageBuffer( Endpoint p )
protected MessageBuffer getMessageBuffer( Endpoint p )
{
if( !reliable ) {
// Since UDP comes in packets and they aren't split
@ -198,12 +201,12 @@ public class KernelAdapter extends Thread
// be a down side because there is no way for us to reliably
// clean these up later since we'd create another one for
// any random UDP packet that comes to the port.
return new MessageProtocol();
return protocol.createBuffer();
} else {
// See if we already have one
MessageProtocol result = messageBuffers.get(p);
MessageBuffer result = messageBuffers.get(p);
if( result == null ) {
result = new MessageProtocol();
result = protocol.createBuffer();
messageBuffers.put(p, result);
}
return result;
@ -212,13 +215,12 @@ public class KernelAdapter extends Thread
protected void createAndDispatch( Envelope env )
{
MessageProtocol protocol = getMessageBuffer(env.getSource());
MessageBuffer protocol = getMessageBuffer(env.getSource());
byte[] data = env.getData();
ByteBuffer buffer = ByteBuffer.wrap(data);
int count = protocol.addBuffer( buffer );
if( count == 0 ) {
if( !protocol.addBytes(buffer) ) {
// This can happen if there was only a partial message
// received. However, this should never happen for unreliable
// connections.
@ -236,7 +238,7 @@ public class KernelAdapter extends Thread
// Should be complete... and maybe we should check but we don't
Message m = null;
while( (m = protocol.getMessage()) != null ) {
while( (m = protocol.pollMessage()) != null ) {
m.setReliable(reliable);
dispatch(env.getSource(), m);
}

@ -0,0 +1,65 @@
/*
* Copyright (c) 2009-2019 jMonkeyEngine
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* * Neither the name of 'jMonkeyEngine' nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.jme3.network.base;
import java.nio.ByteBuffer;
import com.jme3.network.Message;
/**
* Accumulates network data into Message objects. This allows
* random chunks of bytes to be assembled into messages even if
* the buffer boundaries don't line up.
*
* @author Paul Speed
*/
public interface MessageBuffer {
/**
* Returns the next message in the buffer or null if there are no more
* messages in the buffer.
*/
public Message pollMessage();
/**
* Returns true if there is a message waiting in the buffer.
*/
public boolean hasMessages();
/**
* Adds byte data to the message buffer. Returns true if there is
* a message waiting after this call.
*/
public boolean addBytes( ByteBuffer buffer );
}

@ -31,159 +31,26 @@
*/
package com.jme3.network.base;
import com.jme3.network.Message;
import com.jme3.network.serializing.Serializer;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import com.jme3.network.Message;
/**
* Consolidates the conversion of messages to/from byte buffers
* and provides a rolling message buffer. ByteBuffers can be
* pushed in and messages will be extracted, accumulated, and
* available for retrieval. This is not thread safe and is meant
* to be used within a single message processing thread.
* available for retrieval. The MessageBuffers returned are generally
* not thread safe and are meant to be used within a single message
* processing thread. MessageProtocol implementations themselves should
* be thread safe.
*
* <p>The protocol is based on a simple length + data format
* where two bytes represent the (short) length of the data
* and the rest is the raw data for the Serializers class.</p>
* <p>The specific serialization protocol used is up to the implementation.</p>
*
* @version $Revision$
* @author Paul Speed
*/
public class MessageProtocol
{
private final LinkedList<Message> messages = new LinkedList<Message>();
private ByteBuffer current;
private int size;
private Byte carry;
/**
* Converts a message to a ByteBuffer using the Serializer
* and the (short length) + data protocol. If target is null
* then a 32k byte buffer will be created and filled.
*/
public static ByteBuffer messageToBuffer( Message message, ByteBuffer target )
{
// Could let the caller pass their own in
ByteBuffer buffer = target == null ? ByteBuffer.allocate( 32767 + 2 ) : target;
try {
buffer.position( 2 );
Serializer.writeClassAndObject( buffer, message );
buffer.flip();
short dataLength = (short)(buffer.remaining() - 2);
buffer.putShort( dataLength );
buffer.position( 0 );
return buffer;
} catch( IOException e ) {
throw new RuntimeException( "Error serializing message", e );
}
}
/**
* Retrieves and removes an extracted message from the accumulated buffer
* or returns null if there are no more messages.
*/
public Message getMessage()
{
if( messages.isEmpty() ) {
return null;
}
return messages.removeFirst();
}
/**
* Adds the specified buffer, extracting the contained messages
* and making them available to getMessage(). The left over
* data is buffered to be combined with future data.
*
* @return The total number of queued messages after this call.
*/
public int addBuffer( ByteBuffer buffer )
{
// push the data from the buffer into as
// many messages as we can
while( buffer.remaining() > 0 ) {
if( current == null ) {
// If we have a left over carry then we need to
// do manual processing to get the short value
if( carry != null ) {
byte high = carry;
byte low = buffer.get();
size = (high & 0xff) << 8 | (low & 0xff);
carry = null;
}
else if( buffer.remaining() < 2 ) {
// It's possible that the supplied buffer only has one
// byte in it... and in that case we will get an underflow
// when attempting to read the short below.
// It has to be 1 or we'd never get here... but one
// isn't enough so we stash it away.
carry = buffer.get();
break;
} else {
// We are not currently reading an object so
// grab the size.
// Note: this is somewhat limiting... int would
// be better.
size = buffer.getShort();
}
// Allocate the buffer into which we'll feed the
// data as we get it
current = ByteBuffer.allocate(size);
}
if( current.remaining() <= buffer.remaining() ) {
// We have at least one complete object so
// copy what we can into current, create a message,
// and then continue pulling from buffer.
// Artificially set the limit so we don't overflow
int extra = buffer.remaining() - current.remaining();
buffer.limit( buffer.position() + current.remaining() );
// Now copy the data
current.put( buffer );
current.flip();
// Now set the limit back to a good value
buffer.limit( buffer.position() + extra );
createMessage( current );
current = null;
} else {
// Not yet a complete object so just copy what we have
current.put( buffer );
}
}
return messages.size();
}
/**
* Creates a message from the properly sized byte buffer
* and adds it to the messages queue.
*/
protected void createMessage( ByteBuffer buffer )
{
try {
Object obj = Serializer.readClassAndObject( buffer );
Message m = (Message)obj;
messages.add(m);
} catch( IOException e ) {
throw new RuntimeException( "Error deserializing object, class ID:" + buffer.getShort(0), e );
}
}
public interface MessageProtocol {
public ByteBuffer toByteBuffer( Message message, ByteBuffer target );
public Message toMessage( ByteBuffer bytes );
public MessageBuffer createBuffer();
}

@ -0,0 +1,157 @@
/*
* Copyright (c) 2009-2019 jMonkeyEngine
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* * Neither the name of 'jMonkeyEngine' nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.jme3.network.base.protocol;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import com.jme3.network.Message;
import com.jme3.network.base.MessageBuffer;
import com.jme3.network.base.MessageProtocol;
/**
* A MessageBuffer implementation that will aggressively create
* all messages as byte data comes in. In other words, if there
* are four messages in the ByteBuffer passed to addBuffer() then
* all of the messages will be deserialized during that call and
* queued up for later return. The down side is that if any of
* those messages was going to alter the MessageProtocol serialization
* behavior in a way that affects later messages then problems occur
* when those messages are all in one block.
*
* @author Paul Speed
*/
public class GreedyMessageBuffer implements MessageBuffer {
private MessageProtocol protocol;
private final LinkedList<Message> messages = new LinkedList<Message>();
private ByteBuffer current;
private int size;
private Byte carry;
public GreedyMessageBuffer( MessageProtocol protocol ) {
this.protocol = protocol;
}
/**
* Returns the next message in the buffer or null if there are no more
* messages in the buffer.
*/
public Message pollMessage() {
if( messages.isEmpty() ) {
return null;
}
return messages.removeFirst();
}
/**
* Returns true if there is a message waiting in the buffer.
*/
public boolean hasMessages() {
return !messages.isEmpty();
}
/**
* Adds byte data to the message buffer. Returns true if there is
* a message waiting after this call.
*/
public boolean addBytes( ByteBuffer buffer ) {
// push the data from the buffer into as
// many messages as we can
while( buffer.remaining() > 0 ) {
if( current == null ) {
// If we have a left over carry then we need to
// do manual processing to get the short value
if( carry != null ) {
byte high = carry;
byte low = buffer.get();
size = (high & 0xff) << 8 | (low & 0xff);
carry = null;
}
else if( buffer.remaining() < 2 ) {
// It's possible that the supplied buffer only has one
// byte in it... and in that case we will get an underflow
// when attempting to read the short below.
// It has to be 1 or we'd never get here... but one
// isn't enough so we stash it away.
carry = buffer.get();
break;
} else {
// We are not currently reading an object so
// grab the size.
// Note: this is somewhat limiting... int would
// be better.
size = buffer.getShort();
}
// Allocate the buffer into which we'll feed the
// data as we get it
current = ByteBuffer.allocate(size);
}
if( current.remaining() <= buffer.remaining() ) {
// We have at least one complete object so
// copy what we can into current, create a message,
// and then continue pulling from buffer.
// Artificially set the limit so we don't overflow
int extra = buffer.remaining() - current.remaining();
buffer.limit(buffer.position() + current.remaining());
// Now copy the data
current.put(buffer);
current.flip();
// Now set the limit back to a good value
buffer.limit(buffer.position() + extra);
messages.add(protocol.toMessage(current));
current = null;
} else {
// Not yet a complete object so just copy what we have
current.put(buffer);
}
}
return hasMessages();
}
}

@ -0,0 +1,102 @@
/*
* Copyright (c) 2009-2019 jMonkeyEngine
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* * Neither the name of 'jMonkeyEngine' nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
* NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package com.jme3.network.base.protocol;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import com.jme3.network.Message;
import com.jme3.network.base.MessageBuffer;
import com.jme3.network.base.MessageProtocol;
import com.jme3.network.serializing.Serializer;
/**
* Implements a MessageProtocol providing message serializer/deserialization
* based on the built-in Serializer code.
*
* <p>The protocol is based on a simple length + data format
* where two bytes represent the (short) length of the data
* and the rest is the raw data for the Serializers class.</p>
*
* @version $Revision$
* @author Paul Speed
*/
public class SerializerMessageProtocol implements MessageProtocol {
public SerializerMessageProtocol() {
}
/**
* Converts a message to a ByteBuffer using the com.jme3.network.serializing.Serializer
* and the (short length) + data protocol. If target is null
* then a 32k byte buffer will be created and filled.
*/
public ByteBuffer toByteBuffer( Message message, ByteBuffer target ) {
// Could let the caller pass their own in
ByteBuffer buffer = target == null ? ByteBuffer.allocate(32767 + 2) : target;
try {
buffer.position(2);
Serializer.writeClassAndObject(buffer, message);
buffer.flip();
short dataLength = (short)(buffer.remaining() - 2);
buffer.putShort(dataLength);
buffer.position(0);
return buffer;
} catch( IOException e ) {
throw new RuntimeException("Error serializing message", e);
}
}
/**
* Creates and returns a message from the properly sized byte buffer
* using com.jme3.network.serializing.Serializer.
*/
public Message toMessage( ByteBuffer bytes ) {
try {
return (Message)Serializer.readClassAndObject(bytes);
} catch( IOException e ) {
throw new RuntimeException("Error deserializing object, class ID:" + bytes.getShort(0), e);
}
}
public MessageBuffer createBuffer() {
return new GreedyMessageBuffer(this);
}
}
Loading…
Cancel
Save