@ -112,14 +112,19 @@ public class SelectorKernel extends AbstractKernel
}
}
}
}
public void broadcast ( Filter < ? super Endpoint > filter , ByteBuffer data , boolean reliable )
public void broadcast ( Filter < ? super Endpoint > filter , ByteBuffer data , boolean reliable ,
boolean copy )
{
{
if ( ! reliable )
if ( ! reliable )
throw new UnsupportedOperationException ( "Unreliable send not supported by this kernel." ) ;
throw new UnsupportedOperationException ( "Unreliable send not supported by this kernel." ) ;
// Copy the data just once
if ( copy )
byte [ ] copy = new byte [ data . remaining ( ) ] ;
{
System . arraycopy ( data . array ( ) , data . position ( ) , copy , 0 , data . remaining ( ) ) ;
// Copy the data just once
byte [ ] temp = new byte [ data . remaining ( ) ] ;
System . arraycopy ( data . array ( ) , data . position ( ) , temp , 0 , data . remaining ( ) ) ;
data = ByteBuffer . wrap ( temp ) ;
}
// Hand it to all of the endpoints that match our routing
// Hand it to all of the endpoints that match our routing
for ( NioEndpoint p : endpoints . values ( ) ) {
for ( NioEndpoint p : endpoints . values ( ) ) {
@ -127,8 +132,10 @@ public class SelectorKernel extends AbstractKernel
if ( filter ! = null & & ! filter . apply ( p ) )
if ( filter ! = null & & ! filter . apply ( p ) )
continue ;
continue ;
// Give it the data
// Give it the data... but let each endpoint track their
p . send ( data , false , false ) ;
// own completion over the shared array of bytes by
// duplicating it
p . send ( data . duplicate ( ) , false , false ) ;
}
}
// Wake up the selector so it can reinitialize its
// Wake up the selector so it can reinitialize its
@ -268,9 +275,10 @@ public class SelectorKernel extends AbstractKernel
// efficiently done as change requests... or simply
// efficiently done as change requests... or simply
// keeping a thread-safe set of endpoints with pending
// keeping a thread-safe set of endpoints with pending
// writes. For most cases, it shouldn't matter.
// writes. For most cases, it shouldn't matter.
for ( Map . Entry < NioEndpoint , SelectionKey > e : endpointKeys . entrySet ( ) ) {
for ( Map . Entry < NioEndpoint , SelectionKey > e : endpointKeys . entrySet ( ) ) {
if ( e . getKey ( ) . hasPending ( ) )
if ( e . getKey ( ) . hasPending ( ) ) {
e . getValue ( ) . interestOps ( SelectionKey . OP_WRITE ) ;
e . getValue ( ) . interestOps ( SelectionKey . OP_WRITE ) ;
}
}
}
}
}
@ -363,7 +371,7 @@ public class SelectorKernel extends AbstractKernel
}
}
c . write ( current ) ;
c . write ( current ) ;
// If we wrote all of that packet then we need to remove it
// If we wrote all of that packet then we need to remove it
if ( current . remaining ( ) = = 0 ) {
if ( current . remaining ( ) = = 0 ) {
p . removePending ( ) ;
p . removePending ( ) ;