Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move FragmentClock out from FragmentImpl #187

Merged
merged 2 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 44 additions & 42 deletions src/main/java/com/teragrep/rlp_03/frame/FrameClock.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,78 +46,80 @@
package com.teragrep.rlp_03.frame;

import com.teragrep.rlp_03.frame.fragment.Fragment;
import com.teragrep.rlp_03.frame.fragment.FragmentImpl;
import com.teragrep.rlp_03.frame.fragment.FragmentStub;
import com.teragrep.rlp_03.frame.function.*;
import com.teragrep.rlp_03.frame.fragment.clocks.*;

import java.nio.ByteBuffer;

public class FrameClock {

private final RelpFrame relpFrameStub;
private static final RelpFrameStub relpFrameStub = new RelpFrameStub();
private static final FragmentStub fragmentStub = new FragmentStub();

private final TransactionClock transactionClock;
private final CommandClock commandClock;
private final PayloadLengthClock payloadLengthClock;
private final PayloadClock payloadClock;
private final EndOfTransferClock endOfTransferClock;

private Fragment txn;
private Fragment command;
private Fragment payloadLength;
private int cachedPayloadLength;
private Fragment payload;
private Fragment endOfTransfer;

public FrameClock() {
this.relpFrameStub = new RelpFrameStub();
clear();
this.transactionClock = new TransactionClock();
this.commandClock = new CommandClock();
this.payloadLengthClock = new PayloadLengthClock();
this.payloadClock = new PayloadClock();
this.endOfTransferClock = new EndOfTransferClock();

reset();
}

private void clear() {
this.txn = new FragmentImpl(new TransactionFunction());
this.command = new FragmentImpl(new CommandFunction());
this.payloadLength = new FragmentImpl(new PayloadLengthFunction());
this.payload = new FragmentStub();
this.endOfTransfer = new FragmentImpl(new EndOfTransferFunction());
private void reset() {
txn = fragmentStub;
command = fragmentStub;
payloadLength = fragmentStub;
cachedPayloadLength = Integer.MIN_VALUE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intentionally at -2^31?

payload = fragmentStub;
endOfTransfer = fragmentStub;
}

public synchronized RelpFrame submit(ByteBuffer input) {
boolean ready = false;
RelpFrame relpFrame = relpFrameStub;

while (input.hasRemaining() && !ready) {

if (!txn.isComplete()) {
txn.accept(input);
while (input.hasRemaining()) {
if (txn.isStub()) {
txn = transactionClock.submit(input);
}
else if (!command.isComplete()) {
command.accept(input);
else if (command.isStub()) {
command = commandClock.submit(input);
}
else if (!payloadLength.isComplete()) {
payloadLength.accept(input);

if (payloadLength.isComplete()) {
// PayloadFunction depends on payload length and needs to by dynamically created
int payloadSize = payloadLength.toInt();
payload = new FragmentImpl(new PayloadFunction(payloadSize));
else if (payloadLength.isStub()) {
payloadLength = payloadLengthClock.submit(input);
if (!payloadLength.isStub()) {
cachedPayloadLength = payloadLength.toInt();
}
}
else if (!payload.isComplete()) {
payload.accept(input);
else if (payload.isStub()) {
payload = payloadClock.submit(input, cachedPayloadLength);
}
else if (!endOfTransfer.isComplete()) {
endOfTransfer.accept(input);

if (endOfTransfer.isComplete()) {
// all complete
ready = true;
else if (endOfTransfer.isStub()) {
endOfTransfer = endOfTransferClock.submit(input);
if (!endOfTransfer.isStub()) {
relpFrame = new RelpFrameImpl(txn, command, payloadLength, payload, endOfTransfer);
reset();
break;
}
}
else {
throw new IllegalStateException("submit not allowed on a complete frame");
throw new IllegalStateException("FrameClock not in phase");
}
}

if (ready) {
RelpFrame relpFrame = new RelpFrameImpl(txn, command, payloadLength, payload, endOfTransfer);
clear();
return relpFrame;
}
else {
return relpFrameStub;
}
return relpFrame;
}
}
1 change: 0 additions & 1 deletion src/main/java/com/teragrep/rlp_03/frame/RelpFrameImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
package com.teragrep.rlp_03.frame;

import com.teragrep.rlp_03.frame.fragment.Fragment;
import com.teragrep.rlp_03.frame.function.*;

// TODO Design how to use Access properly if RelpFrames are also poolable
public class RelpFrameImpl implements RelpFrame {
Expand Down
10 changes: 1 addition & 9 deletions src/main/java/com/teragrep/rlp_03/frame/fragment/Fragment.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,10 @@
*/
package com.teragrep.rlp_03.frame.fragment;

import java.nio.ByteBuffer;
import java.util.function.Consumer;

public interface Fragment extends Consumer<ByteBuffer> {

@Override
void accept(ByteBuffer input);
public interface Fragment {

boolean isStub();

boolean isComplete();

byte[] toBytes();

String toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@
import com.teragrep.rlp_03.frame.access.Access;
import com.teragrep.rlp_03.frame.access.Lease;

import java.nio.ByteBuffer;

public class FragmentAccess implements Fragment {

private final Fragment fragment;
Expand All @@ -60,21 +58,11 @@ public FragmentAccess(Fragment fragment, Access access) {
this.access = access;
}

@Override
public void accept(ByteBuffer input) {
fragment.accept(input);
}

@Override
public boolean isStub() {
return fragment.isStub();
}

@Override
public boolean isComplete() {
return fragment.isComplete();
}

@Override
public byte[] toBytes() {
try (Lease ignored = access.get()) {
Expand Down
46 changes: 4 additions & 42 deletions src/main/java/com/teragrep/rlp_03/frame/fragment/FragmentImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,52 +48,23 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.List;

public class FragmentImpl implements Fragment {

private final LinkedList<ByteBuffer> bufferSliceList;
private final List<ByteBuffer> bufferSliceList;

// BiFunction is the parser function that takes: input, storageList, return value
private final BiFunction<ByteBuffer, LinkedList<ByteBuffer>, Boolean> parseRule;

private final AtomicBoolean isComplete;

public FragmentImpl(BiFunction<ByteBuffer, LinkedList<ByteBuffer>, Boolean> parseRule) {
this.bufferSliceList = new LinkedList<>();
this.parseRule = parseRule;

this.isComplete = new AtomicBoolean();
}

@Override
public void accept(ByteBuffer input) {
if (isComplete.get()) {
throw new IllegalStateException("Fragment is complete, can not accept more.");
}

if (parseRule.apply(input, bufferSliceList)) { // TODO change to buffers and scatter gather pattern?
isComplete.set(true);
}
public FragmentImpl(List<ByteBuffer> byteBufferList) {
this.bufferSliceList = byteBufferList;
}

@Override
public boolean isStub() {
return false;
}

@Override
public boolean isComplete() {
return isComplete.get();
}

@Override
public byte[] toBytes() {
if (!isComplete.get()) {
throw new IllegalStateException("Fragment incomplete!");
}

int totalBytes = 0;
for (ByteBuffer slice : bufferSliceList) {
totalBytes = totalBytes + slice.remaining();
Expand Down Expand Up @@ -124,9 +95,6 @@ public int toInt() {

@Override
public FragmentWrite toFragmentWrite() {
if (!isComplete.get()) {
throw new IllegalStateException("Fragment incomplete!");
}
LinkedList<ByteBuffer> bufferCopies = new LinkedList<>();
for (ByteBuffer buffer : bufferSliceList) {
bufferCopies.add(buffer.asReadOnlyBuffer());
Expand All @@ -136,9 +104,6 @@ public FragmentWrite toFragmentWrite() {

@Override
public FragmentByteStream toFragmentByteStream() {
if (!isComplete.get()) {
throw new IllegalStateException("Fragment incomplete!");
}
LinkedList<ByteBuffer> bufferCopies = new LinkedList<>();
for (ByteBuffer buffer : bufferSliceList) {
bufferCopies.add(buffer.asReadOnlyBuffer());
Expand All @@ -148,9 +113,6 @@ public FragmentByteStream toFragmentByteStream() {

@Override
public long size() {
if (!isComplete.get()) {
throw new IllegalStateException("Fragment incomplete!");
}
long currentLength = 0;
for (ByteBuffer slice : bufferSliceList) {
currentLength = currentLength + ((ByteBuffer) slice).limit();
Expand Down
24 changes: 6 additions & 18 deletions src/main/java/com/teragrep/rlp_03/frame/fragment/FragmentStub.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,53 +45,41 @@
*/
package com.teragrep.rlp_03.frame.fragment;

import java.nio.ByteBuffer;

public class FragmentStub implements Fragment {

@Override
public void accept(ByteBuffer input) {
throw new IllegalStateException("FragmentStub can not accept");
}

@Override
public boolean isStub() {
return true;
}

@Override
public boolean isComplete() {
throw new IllegalStateException("FragmentStub can not be complete");
}

@Override
public byte[] toBytes() {
return new byte[0];
throw new IllegalStateException("FragmentStub does not implement toBytes");
}

@Override
public String toString() {
throw new IllegalStateException("FragmentStub can not resolve toString");
throw new IllegalStateException("FragmentStub does not implement toString");
}

@Override
public int toInt() {
throw new IllegalStateException("FragmentStub can not resolve toInt");
throw new IllegalStateException("FragmentStub does not implement toInt");
}

@Override
public FragmentWrite toFragmentWrite() {
throw new IllegalStateException("FragmentStub can not resolve toFragmentWrite");
throw new IllegalStateException("FragmentStub does not implement toFragmentWrite");
}

@Override
public FragmentByteStream toFragmentByteStream() {
throw new IllegalStateException("FragmentStub can not resolve toFragmentByteStream");
throw new IllegalStateException("FragmentStub does not implement toFragmentByteStream");
}

@Override
public long size() {
throw new IllegalStateException("FragmentStub can not resolve size");
throw new IllegalStateException("FragmentStub does not implement size");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,41 +43,55 @@
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/
package com.teragrep.rlp_03.frame.function;
package com.teragrep.rlp_03.frame.fragment.clocks;

import com.teragrep.rlp_03.frame.fragment.Fragment;
import com.teragrep.rlp_03.frame.fragment.FragmentImpl;
import com.teragrep.rlp_03.frame.fragment.FragmentStub;

import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.function.BiFunction;

public class CommandFunction implements BiFunction<ByteBuffer, LinkedList<ByteBuffer>, Boolean> {
public class CommandClock {

private static final int maximumStringLength = 32 + 1; // space
private static final FragmentStub fragmentStub = new FragmentStub();
private final LinkedList<ByteBuffer> bufferSliceList;

public CommandFunction() {
private static final int maximumStringLength = 32 + 1; // space
StrongestNumber9 marked this conversation as resolved.
Show resolved Hide resolved

public CommandClock() {
this.bufferSliceList = new LinkedList<>();
}

@Override
public Boolean apply(ByteBuffer input, LinkedList<ByteBuffer> bufferSliceList) {
public Fragment submit(ByteBuffer input) {

ByteBuffer slice = input.slice();
int bytesRead = 0;
boolean rv = false;
boolean complete = false;
while (input.hasRemaining()) {
byte b = input.get();
bytesRead++;
checkOverSize(bytesRead, bufferSliceList);
if (b == ' ') {
// remove ' ' from the input as it's complete
((ByteBuffer) slice).limit(bytesRead - 1);
rv = true;
complete = true;
break;
}
}

bufferSliceList.add(slice);

return rv;
Fragment fragment;
if (complete) {
fragment = new FragmentImpl(new LinkedList<>(bufferSliceList));
bufferSliceList.clear();
}
else {
fragment = fragmentStub;
}

return fragment;
}

private void checkOverSize(int bytesRead, LinkedList<ByteBuffer> bufferSliceList) {
Expand Down
Loading