Skip to content

Commit

Permalink
move FragmentClock out from FragmentImpl (#187)
Browse files Browse the repository at this point in the history
* move FragmentClock out from FragmentImpl

* remove FragmentClock and create separate clocks for each fragment
  • Loading branch information
kortemik authored May 21, 2024
1 parent e9267a7 commit c3f22cf
Show file tree
Hide file tree
Showing 17 changed files with 327 additions and 267 deletions.
86 changes: 44 additions & 42 deletions src/main/java/com/teragrep/rlp_03/frame/FrameClock.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,78 +46,80 @@
package com.teragrep.rlp_03.frame;

import com.teragrep.rlp_03.frame.fragment.Fragment;
import com.teragrep.rlp_03.frame.fragment.FragmentImpl;
import com.teragrep.rlp_03.frame.fragment.FragmentStub;
import com.teragrep.rlp_03.frame.function.*;
import com.teragrep.rlp_03.frame.fragment.clocks.*;

import java.nio.ByteBuffer;

public class FrameClock {

private final RelpFrame relpFrameStub;
private static final RelpFrameStub relpFrameStub = new RelpFrameStub();
private static final FragmentStub fragmentStub = new FragmentStub();

private final TransactionClock transactionClock;
private final CommandClock commandClock;
private final PayloadLengthClock payloadLengthClock;
private final PayloadClock payloadClock;
private final EndOfTransferClock endOfTransferClock;

private Fragment txn;
private Fragment command;
private Fragment payloadLength;
private int cachedPayloadLength;
private Fragment payload;
private Fragment endOfTransfer;

public FrameClock() {
this.relpFrameStub = new RelpFrameStub();
clear();
this.transactionClock = new TransactionClock();
this.commandClock = new CommandClock();
this.payloadLengthClock = new PayloadLengthClock();
this.payloadClock = new PayloadClock();
this.endOfTransferClock = new EndOfTransferClock();

reset();
}

private void clear() {
this.txn = new FragmentImpl(new TransactionFunction());
this.command = new FragmentImpl(new CommandFunction());
this.payloadLength = new FragmentImpl(new PayloadLengthFunction());
this.payload = new FragmentStub();
this.endOfTransfer = new FragmentImpl(new EndOfTransferFunction());
private void reset() {
txn = fragmentStub;
command = fragmentStub;
payloadLength = fragmentStub;
cachedPayloadLength = Integer.MIN_VALUE;
payload = fragmentStub;
endOfTransfer = fragmentStub;
}

public synchronized RelpFrame submit(ByteBuffer input) {
boolean ready = false;
RelpFrame relpFrame = relpFrameStub;

while (input.hasRemaining() && !ready) {

if (!txn.isComplete()) {
txn.accept(input);
while (input.hasRemaining()) {
if (txn.isStub()) {
txn = transactionClock.submit(input);
}
else if (!command.isComplete()) {
command.accept(input);
else if (command.isStub()) {
command = commandClock.submit(input);
}
else if (!payloadLength.isComplete()) {
payloadLength.accept(input);

if (payloadLength.isComplete()) {
// PayloadFunction depends on payload length and needs to by dynamically created
int payloadSize = payloadLength.toInt();
payload = new FragmentImpl(new PayloadFunction(payloadSize));
else if (payloadLength.isStub()) {
payloadLength = payloadLengthClock.submit(input);
if (!payloadLength.isStub()) {
cachedPayloadLength = payloadLength.toInt();
}
}
else if (!payload.isComplete()) {
payload.accept(input);
else if (payload.isStub()) {
payload = payloadClock.submit(input, cachedPayloadLength);
}
else if (!endOfTransfer.isComplete()) {
endOfTransfer.accept(input);

if (endOfTransfer.isComplete()) {
// all complete
ready = true;
else if (endOfTransfer.isStub()) {
endOfTransfer = endOfTransferClock.submit(input);
if (!endOfTransfer.isStub()) {
relpFrame = new RelpFrameImpl(txn, command, payloadLength, payload, endOfTransfer);
reset();
break;
}
}
else {
throw new IllegalStateException("submit not allowed on a complete frame");
throw new IllegalStateException("FrameClock not in phase");
}
}

if (ready) {
RelpFrame relpFrame = new RelpFrameImpl(txn, command, payloadLength, payload, endOfTransfer);
clear();
return relpFrame;
}
else {
return relpFrameStub;
}
return relpFrame;
}
}
1 change: 0 additions & 1 deletion src/main/java/com/teragrep/rlp_03/frame/RelpFrameImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
package com.teragrep.rlp_03.frame;

import com.teragrep.rlp_03.frame.fragment.Fragment;
import com.teragrep.rlp_03.frame.function.*;

// TODO Design how to use Access properly if RelpFrames are also poolable
public class RelpFrameImpl implements RelpFrame {
Expand Down
10 changes: 1 addition & 9 deletions src/main/java/com/teragrep/rlp_03/frame/fragment/Fragment.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,10 @@
*/
package com.teragrep.rlp_03.frame.fragment;

import java.nio.ByteBuffer;
import java.util.function.Consumer;

public interface Fragment extends Consumer<ByteBuffer> {

@Override
void accept(ByteBuffer input);
public interface Fragment {

boolean isStub();

boolean isComplete();

byte[] toBytes();

String toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@
import com.teragrep.rlp_03.frame.access.Access;
import com.teragrep.rlp_03.frame.access.Lease;

import java.nio.ByteBuffer;

public class FragmentAccess implements Fragment {

private final Fragment fragment;
Expand All @@ -60,21 +58,11 @@ public FragmentAccess(Fragment fragment, Access access) {
this.access = access;
}

@Override
public void accept(ByteBuffer input) {
fragment.accept(input);
}

@Override
public boolean isStub() {
return fragment.isStub();
}

@Override
public boolean isComplete() {
return fragment.isComplete();
}

@Override
public byte[] toBytes() {
try (Lease ignored = access.get()) {
Expand Down
46 changes: 4 additions & 42 deletions src/main/java/com/teragrep/rlp_03/frame/fragment/FragmentImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,52 +48,23 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.List;

public class FragmentImpl implements Fragment {

private final LinkedList<ByteBuffer> bufferSliceList;
private final List<ByteBuffer> bufferSliceList;

// BiFunction is the parser function that takes: input, storageList, return value
private final BiFunction<ByteBuffer, LinkedList<ByteBuffer>, Boolean> parseRule;

private final AtomicBoolean isComplete;

public FragmentImpl(BiFunction<ByteBuffer, LinkedList<ByteBuffer>, Boolean> parseRule) {
this.bufferSliceList = new LinkedList<>();
this.parseRule = parseRule;

this.isComplete = new AtomicBoolean();
}

@Override
public void accept(ByteBuffer input) {
if (isComplete.get()) {
throw new IllegalStateException("Fragment is complete, can not accept more.");
}

if (parseRule.apply(input, bufferSliceList)) { // TODO change to buffers and scatter gather pattern?
isComplete.set(true);
}
public FragmentImpl(List<ByteBuffer> byteBufferList) {
this.bufferSliceList = byteBufferList;
}

@Override
public boolean isStub() {
return false;
}

@Override
public boolean isComplete() {
return isComplete.get();
}

@Override
public byte[] toBytes() {
if (!isComplete.get()) {
throw new IllegalStateException("Fragment incomplete!");
}

int totalBytes = 0;
for (ByteBuffer slice : bufferSliceList) {
totalBytes = totalBytes + slice.remaining();
Expand Down Expand Up @@ -124,9 +95,6 @@ public int toInt() {

@Override
public FragmentWrite toFragmentWrite() {
if (!isComplete.get()) {
throw new IllegalStateException("Fragment incomplete!");
}
LinkedList<ByteBuffer> bufferCopies = new LinkedList<>();
for (ByteBuffer buffer : bufferSliceList) {
bufferCopies.add(buffer.asReadOnlyBuffer());
Expand All @@ -136,9 +104,6 @@ public FragmentWrite toFragmentWrite() {

@Override
public FragmentByteStream toFragmentByteStream() {
if (!isComplete.get()) {
throw new IllegalStateException("Fragment incomplete!");
}
LinkedList<ByteBuffer> bufferCopies = new LinkedList<>();
for (ByteBuffer buffer : bufferSliceList) {
bufferCopies.add(buffer.asReadOnlyBuffer());
Expand All @@ -148,9 +113,6 @@ public FragmentByteStream toFragmentByteStream() {

@Override
public long size() {
if (!isComplete.get()) {
throw new IllegalStateException("Fragment incomplete!");
}
long currentLength = 0;
for (ByteBuffer slice : bufferSliceList) {
currentLength = currentLength + ((ByteBuffer) slice).limit();
Expand Down
24 changes: 6 additions & 18 deletions src/main/java/com/teragrep/rlp_03/frame/fragment/FragmentStub.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,53 +45,41 @@
*/
package com.teragrep.rlp_03.frame.fragment;

import java.nio.ByteBuffer;

public class FragmentStub implements Fragment {

@Override
public void accept(ByteBuffer input) {
throw new IllegalStateException("FragmentStub can not accept");
}

@Override
public boolean isStub() {
return true;
}

@Override
public boolean isComplete() {
throw new IllegalStateException("FragmentStub can not be complete");
}

@Override
public byte[] toBytes() {
return new byte[0];
throw new IllegalStateException("FragmentStub does not implement toBytes");
}

@Override
public String toString() {
throw new IllegalStateException("FragmentStub can not resolve toString");
throw new IllegalStateException("FragmentStub does not implement toString");
}

@Override
public int toInt() {
throw new IllegalStateException("FragmentStub can not resolve toInt");
throw new IllegalStateException("FragmentStub does not implement toInt");
}

@Override
public FragmentWrite toFragmentWrite() {
throw new IllegalStateException("FragmentStub can not resolve toFragmentWrite");
throw new IllegalStateException("FragmentStub does not implement toFragmentWrite");
}

@Override
public FragmentByteStream toFragmentByteStream() {
throw new IllegalStateException("FragmentStub can not resolve toFragmentByteStream");
throw new IllegalStateException("FragmentStub does not implement toFragmentByteStream");
}

@Override
public long size() {
throw new IllegalStateException("FragmentStub can not resolve size");
throw new IllegalStateException("FragmentStub does not implement size");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,41 +43,55 @@
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/
package com.teragrep.rlp_03.frame.function;
package com.teragrep.rlp_03.frame.fragment.clocks;

import com.teragrep.rlp_03.frame.fragment.Fragment;
import com.teragrep.rlp_03.frame.fragment.FragmentImpl;
import com.teragrep.rlp_03.frame.fragment.FragmentStub;

import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.function.BiFunction;

public class CommandFunction implements BiFunction<ByteBuffer, LinkedList<ByteBuffer>, Boolean> {
public class CommandClock {

private static final int maximumStringLength = 32 + 1; // space
private static final FragmentStub fragmentStub = new FragmentStub();
private final LinkedList<ByteBuffer> bufferSliceList;

public CommandFunction() {
private static final int maximumStringLength = 32 + 1; // space

public CommandClock() {
this.bufferSliceList = new LinkedList<>();
}

@Override
public Boolean apply(ByteBuffer input, LinkedList<ByteBuffer> bufferSliceList) {
public Fragment submit(ByteBuffer input) {

ByteBuffer slice = input.slice();
int bytesRead = 0;
boolean rv = false;
boolean complete = false;
while (input.hasRemaining()) {
byte b = input.get();
bytesRead++;
checkOverSize(bytesRead, bufferSliceList);
if (b == ' ') {
// remove ' ' from the input as it's complete
((ByteBuffer) slice).limit(bytesRead - 1);
rv = true;
complete = true;
break;
}
}

bufferSliceList.add(slice);

return rv;
Fragment fragment;
if (complete) {
fragment = new FragmentImpl(new LinkedList<>(bufferSliceList));
bufferSliceList.clear();
}
else {
fragment = fragmentStub;
}

return fragment;
}

private void checkOverSize(int bytesRead, LinkedList<ByteBuffer> bufferSliceList) {
Expand Down
Loading

0 comments on commit c3f22cf

Please sign in to comment.