Skip to content

Commit

Permalink
removed SBE dependency and replaced with AtomicBuffer usage.
Browse files Browse the repository at this point in the history
  • Loading branch information
tmontgomery committed Mar 13, 2014
1 parent e8740ef commit 2b5779d
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 52 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ Build
You require the following to build Aeron:

* Latest stable [Oracle JDK 8](http://www.oracle.com/technetwork/java/)
* Latest [Simple Binary Encoding (SBE)] (https://github.com/real-logic/simple-binary-encoding) installed in local maven repo

### Gradle Build

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import uk.co.real_logic.aeron.util.DataHeaderFlyweight;
import uk.co.real_logic.aeron.util.HeaderFlyweight;
import uk.co.real_logic.sbe.codec.java.DirectBuffer;
import uk.co.real_logic.aeron.util.concurrent.AtomicBuffer;

import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
Expand All @@ -37,7 +37,7 @@ public final class UdpTransport implements ReadHandler, AutoCloseable
public static final int READ_BYTE_BUFFER_SZ = 4096; // TODO: this needs to be configured in some way

private final ByteBuffer readByteBuffer;
private final DirectBuffer readBuffer;
private final AtomicBuffer readBuffer;
private final DatagramChannel channel;
private final HeaderFlyweight header;
private final DataHeaderFlyweight dataHeader;
Expand All @@ -48,7 +48,7 @@ public final class UdpTransport implements ReadHandler, AutoCloseable
public UdpTransport(final FrameHandler frameHandler, final InetSocketAddress local, final EventLoop eventLoop) throws Exception
{
this.readByteBuffer = ByteBuffer.allocateDirect(READ_BYTE_BUFFER_SZ);
this.readBuffer = new DirectBuffer(this.readByteBuffer);
this.readBuffer = new AtomicBuffer(this.readByteBuffer);
this.channel = DatagramChannel.open();
this.header = new HeaderFlyweight();
this.dataHeader = new DataHeaderFlyweight();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.junit.Test;
import uk.co.real_logic.aeron.util.DataHeaderFlyweight;
import uk.co.real_logic.aeron.util.HeaderFlyweight;
import uk.co.real_logic.sbe.codec.java.DirectBuffer;
import uk.co.real_logic.aeron.util.concurrent.AtomicBuffer;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
Expand All @@ -38,7 +38,7 @@ public class EventLoopTest
//private static final String RCV_UDP_URI = "udp://localhost:40123@localhost:40124";

private final ByteBuffer buffer = ByteBuffer.allocateDirect(256);
private final DirectBuffer directBuffer = new DirectBuffer(buffer);
private final AtomicBuffer atomicBuffer = new AtomicBuffer(buffer);
private final DataHeaderFlyweight encodeDataHeader = new DataHeaderFlyweight();
//private final InetSocketAddress srcRemoteAddr = new InetSocketAddress("localhost", RCV_PORT);
private final InetSocketAddress rcvRemoteAddr = new InetSocketAddress("localhost", SRC_PORT);
Expand Down Expand Up @@ -86,7 +86,7 @@ public void onControlFrame(final HeaderFlyweight header, final InetSocketAddress

final SrcFrameHandler src = new SrcFrameHandler(UdpDestination.parse(SRC_UDP_URI), eventLoop);

encodeDataHeader.reset(directBuffer, 0)
encodeDataHeader.reset(atomicBuffer, 0)
.version((byte)HeaderFlyweight.CURRENT_VERSION)
.headerType((short)HeaderFlyweight.HDR_TYPE_DATA)
.frameLength(20)
Expand Down Expand Up @@ -135,7 +135,7 @@ public void onControlFrame(final HeaderFlyweight header, final InetSocketAddress

final SrcFrameHandler src = new SrcFrameHandler(UdpDestination.parse(SRC_UDP_URI), eventLoop);

encodeDataHeader.reset(directBuffer, 0)
encodeDataHeader.reset(atomicBuffer, 0)
.version((byte)HeaderFlyweight.CURRENT_VERSION)
.headerType((short)HeaderFlyweight.HDR_TYPE_CONN)
.frameLength(8)
Expand Down Expand Up @@ -185,14 +185,14 @@ public void onControlFrame(final HeaderFlyweight header, final InetSocketAddress

final SrcFrameHandler src = new SrcFrameHandler(UdpDestination.parse(SRC_UDP_URI), eventLoop);

encodeDataHeader.reset(directBuffer, 0)
encodeDataHeader.reset(atomicBuffer, 0)
.version((byte) HeaderFlyweight.CURRENT_VERSION)
.headerType((short) HeaderFlyweight.HDR_TYPE_DATA)
.frameLength(20)
.sessionId(SESSION_ID);
encodeDataHeader.channelId(CHANNEL_ID)
.termId(TERM_ID);
encodeDataHeader.reset(directBuffer, 20)
encodeDataHeader.reset(atomicBuffer, 20)
.version((byte)HeaderFlyweight.CURRENT_VERSION)
.headerType((short)HeaderFlyweight.HDR_TYPE_DATA)
.frameLength(20)
Expand Down Expand Up @@ -242,7 +242,7 @@ public void onControlFrame(final HeaderFlyweight header, final InetSocketAddress

final RcvFrameHandler rcv = new RcvFrameHandler(rcvLocalAddr, eventLoop);

encodeDataHeader.reset(directBuffer, 0)
encodeDataHeader.reset(atomicBuffer, 0)
.version((byte) HeaderFlyweight.CURRENT_VERSION)
.headerType((short) HeaderFlyweight.HDR_TYPE_CONN)
.frameLength(8)
Expand Down
4 changes: 0 additions & 4 deletions aeron-util/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@
* limitations under the License.
*/

dependencies {
compile 'uk.co.real_logic:sbe:0.1-SNAPSHOT'
}

compileJava {
// Suppress warnings about using Unsafe
options.compilerArgs << '-XDignore.symbol.file'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package uk.co.real_logic.aeron.util;

import uk.co.real_logic.sbe.codec.java.CodecUtil;

import java.nio.ByteOrder;

/**
Expand Down Expand Up @@ -53,7 +51,7 @@ public class DataHeaderFlyweight extends HeaderFlyweight
*/
public long channelId()
{
return CodecUtil.uint32Get(directBuffer, offset + CHANNEL_ID_FIELD_OFFSET, ByteOrder.LITTLE_ENDIAN);
return uint32Get(atomicBuffer, offset + CHANNEL_ID_FIELD_OFFSET, ByteOrder.LITTLE_ENDIAN);
}

/**
Expand All @@ -64,7 +62,7 @@ public long channelId()
*/
public DataHeaderFlyweight channelId(final long channelId)
{
CodecUtil.uint32Put(directBuffer, offset + CHANNEL_ID_FIELD_OFFSET, channelId, ByteOrder.LITTLE_ENDIAN);
uint32Put(atomicBuffer, offset + CHANNEL_ID_FIELD_OFFSET, channelId, ByteOrder.LITTLE_ENDIAN);
return this;
}

Expand All @@ -75,7 +73,7 @@ public DataHeaderFlyweight channelId(final long channelId)
*/
public long termId()
{
return CodecUtil.uint32Get(directBuffer, offset + TERM_ID_FIELD_OFFSET, ByteOrder.LITTLE_ENDIAN);
return uint32Get(atomicBuffer, offset + TERM_ID_FIELD_OFFSET, ByteOrder.LITTLE_ENDIAN);
}

/**
Expand All @@ -86,7 +84,7 @@ public long termId()
*/
public DataHeaderFlyweight termId(final long termId)
{
CodecUtil.uint32Put(directBuffer, offset + TERM_ID_FIELD_OFFSET, termId, ByteOrder.LITTLE_ENDIAN);
uint32Put(atomicBuffer, offset + TERM_ID_FIELD_OFFSET, termId, ByteOrder.LITTLE_ENDIAN);
return this;
}

Expand All @@ -97,7 +95,7 @@ public DataHeaderFlyweight termId(final long termId)
*/
public long sequenceNumber()
{
return CodecUtil.uint32Get(directBuffer, offset + SEQUENCE_NUMBER_FIELD_OFFSET, ByteOrder.LITTLE_ENDIAN);
return uint32Get(atomicBuffer, offset + SEQUENCE_NUMBER_FIELD_OFFSET, ByteOrder.LITTLE_ENDIAN);
}

/**
Expand All @@ -108,7 +106,7 @@ public long sequenceNumber()
*/
public DataHeaderFlyweight sequenceNumber(final long sequenceNumber)
{
CodecUtil.uint32Put(directBuffer, offset + SEQUENCE_NUMBER_FIELD_OFFSET, sequenceNumber, ByteOrder.LITTLE_ENDIAN);
uint32Put(atomicBuffer, offset + SEQUENCE_NUMBER_FIELD_OFFSET, sequenceNumber, ByteOrder.LITTLE_ENDIAN);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@
*/
package uk.co.real_logic.aeron.util;

import uk.co.real_logic.sbe.codec.java.CodecUtil;
import uk.co.real_logic.sbe.codec.java.DirectBuffer;
import uk.co.real_logic.aeron.util.concurrent.AtomicBuffer;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
Expand Down Expand Up @@ -59,12 +58,12 @@ public class HeaderFlyweight
/** default version */
public static final int CURRENT_VERSION = 0x0;

private static final int VERS_FIELD_OFFSET = 0;
private static final int TYPE_FIELD_OFFSET = 1;
private static final int FRAME_LENGTH_FIELD_OFFSET = 2;
private static final int SESSION_ID_FIELD_OFFSET = 4;
public static final int VERS_FIELD_OFFSET = 0;
public static final int TYPE_FIELD_OFFSET = 1;
public static final int FRAME_LENGTH_FIELD_OFFSET = 2;
public static final int SESSION_ID_FIELD_OFFSET = 4;

protected DirectBuffer directBuffer;
protected AtomicBuffer atomicBuffer;
protected int offset;

public HeaderFlyweight()
Expand All @@ -73,25 +72,55 @@ public HeaderFlyweight()

public HeaderFlyweight reset(final ByteBuffer buffer, final int offset)
{
this.directBuffer = new DirectBuffer(buffer);
this.atomicBuffer = new AtomicBuffer(buffer);
this.offset = offset;
return this;
}

public HeaderFlyweight reset(final DirectBuffer buffer, final int offset)
public HeaderFlyweight reset(final AtomicBuffer buffer, final int offset)
{
this.directBuffer = buffer;
this.atomicBuffer = buffer;
this.offset = offset;
return this;
}

public static short uint8Get(final AtomicBuffer buffer, final int offset)
{
return (short)(buffer.getByte(offset) & 0xFF);
}

public static void uint8Put(final AtomicBuffer buffer, final int offset, final short value)
{
buffer.putByte(offset, (byte)value);
}

public static int uint16Get(final AtomicBuffer buffer, final int offset, final ByteOrder byteOrder)
{
return (int)(buffer.getShort(offset, byteOrder) & 0xFFFF);
}

public static void uint16Put(final AtomicBuffer buffer, final int offset, final int value, final ByteOrder byteOrder)
{
buffer.putShort(offset, (short)value, byteOrder);
}

public static long uint32Get(final AtomicBuffer buffer, final int offset, final ByteOrder byteOrder)
{
return (long)(buffer.getInt(offset, byteOrder) & 0xFFFFFFFFL);
}

public static void uint32Put(final AtomicBuffer buffer, final int offset, final long value, final ByteOrder byteOrder)
{
buffer.putInt(offset, (int)value, byteOrder);
}

/**
* return version field value
* @return ver field value
*/
public byte version()
{
final int versAndFlags = CodecUtil.uint8Get(directBuffer, offset + VERS_FIELD_OFFSET);
final int versAndFlags = uint8Get(atomicBuffer, offset + VERS_FIELD_OFFSET);

return (byte)(versAndFlags >> 4);
}
Expand All @@ -103,7 +132,7 @@ public byte version()
*/
public HeaderFlyweight version(final byte ver)
{
CodecUtil.uint8Put(directBuffer, offset + VERS_FIELD_OFFSET, (short)(ver << 4));
uint8Put(atomicBuffer, offset + VERS_FIELD_OFFSET, (byte)(ver << 4));
return this;
}

Expand All @@ -113,7 +142,7 @@ public HeaderFlyweight version(final byte ver)
*/
public short headerType()
{
return CodecUtil.uint8Get(directBuffer, offset + TYPE_FIELD_OFFSET);
return uint8Get(atomicBuffer, offset + TYPE_FIELD_OFFSET);
}

/**
Expand All @@ -123,7 +152,7 @@ public short headerType()
*/
public HeaderFlyweight headerType(final short type)
{
CodecUtil.uint8Put(directBuffer, offset + TYPE_FIELD_OFFSET, type);
uint8Put(atomicBuffer, offset + TYPE_FIELD_OFFSET, (byte)type);
return this;
}

Expand All @@ -133,7 +162,7 @@ public HeaderFlyweight headerType(final short type)
*/
public int frameLength()
{
return CodecUtil.uint16Get(directBuffer, offset + FRAME_LENGTH_FIELD_OFFSET, ByteOrder.LITTLE_ENDIAN);
return uint16Get(atomicBuffer, offset + FRAME_LENGTH_FIELD_OFFSET, ByteOrder.LITTLE_ENDIAN);
}

/**
Expand All @@ -143,7 +172,7 @@ public int frameLength()
*/
public HeaderFlyweight frameLength(final int length)
{
CodecUtil.uint16Put(directBuffer, offset + FRAME_LENGTH_FIELD_OFFSET, length, ByteOrder.LITTLE_ENDIAN);
uint16Put(atomicBuffer, offset + FRAME_LENGTH_FIELD_OFFSET, (short)length, ByteOrder.LITTLE_ENDIAN);
return this;
}

Expand All @@ -153,7 +182,7 @@ public HeaderFlyweight frameLength(final int length)
*/
public long sessionId()
{
return CodecUtil.uint32Get(directBuffer, offset + SESSION_ID_FIELD_OFFSET, ByteOrder.LITTLE_ENDIAN);
return uint32Get(atomicBuffer, offset + SESSION_ID_FIELD_OFFSET, ByteOrder.LITTLE_ENDIAN);
}

/**
Expand All @@ -163,7 +192,7 @@ public long sessionId()
*/
public HeaderFlyweight sessionId(final long sessionId)
{
CodecUtil.uint32Put(directBuffer, offset + SESSION_ID_FIELD_OFFSET, sessionId, ByteOrder.LITTLE_ENDIAN);
uint32Put(atomicBuffer, offset + SESSION_ID_FIELD_OFFSET, (int)sessionId, ByteOrder.LITTLE_ENDIAN);
return this;
}
}
Loading

0 comments on commit 2b5779d

Please sign in to comment.