Skip to content

Commit

Permalink
Change relpframe close (#89)
Browse files Browse the repository at this point in the history
* relp-frame-close
  • Loading branch information
kortemik authored Mar 7, 2024
1 parent 0805648 commit 147b775
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 18 deletions.
25 changes: 11 additions & 14 deletions src/main/java/com/teragrep/rlp_03/context/RelpReadImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -221,23 +221,20 @@ private void processFrame(RelpFrameLeaseful relpFrame) {
LOGGER.debug("close requested, not submitting next read runnable");
}

RelpFrameAccess relpFrameAccess = new RelpFrameAccess(relpFrame);
FrameContext frameContext = new FrameContext(connectionContext, relpFrameAccess);
FrameProcessor frameProcessor = frameProcessorPool.take(); // FIXME should this be locked to ensure visibility
try (RelpFrameAccess relpFrameAccess = new RelpFrameAccess(relpFrame)) {
FrameContext frameContext = new FrameContext(connectionContext, relpFrameAccess);
FrameProcessor frameProcessor = frameProcessorPool.take(); // FIXME should this be locked to ensure visibility

if (!frameProcessor.isStub()) {
frameProcessor.accept(frameContext); // this thread goes there
frameProcessorPool.offer(frameProcessor);
} else {
// TODO should this be IllegalState or should it just '0 serverclose 0' ?
LOGGER.warn("FrameProcessorPool closing, rejecting frame and closing connection for PeerAddress <{}> PeerPort <{}>", connectionContext.socket().getTransportInfo().getPeerAddress(), connectionContext.socket().getTransportInfo().getPeerPort());
connectionContext.close();
if (!frameProcessor.isStub()) {
frameProcessor.accept(frameContext); // this thread goes there
frameProcessorPool.offer(frameProcessor);
} else {
// TODO should this be IllegalState or should it just '0 serverclose 0' ?
LOGGER.warn("FrameProcessorPool closing, rejecting frame and closing connection for PeerAddress <{}> PeerPort <{}>", connectionContext.socket().getTransportInfo().getPeerAddress(), connectionContext.socket().getTransportInfo().getPeerPort());
connectionContext.close();
}
}

// TODO make relpFrame declare close() -> all envelopes close sub-envelope, when outer is closed
relpFrameAccess.close();
relpFrame.close();

LOGGER.debug("processed txFrame. End of thread's processing.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@

import com.teragrep.rlp_03.context.frame.fragment.Fragment;

public interface RelpFrame {
public interface RelpFrame extends AutoCloseable {
Fragment txn();

Fragment command();
Expand All @@ -60,4 +60,7 @@ public interface RelpFrame {
Fragment endOfTransfer();

boolean isStub();

@Override
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,19 @@
import com.teragrep.rlp_03.context.frame.fragment.Fragment;
import com.teragrep.rlp_03.context.frame.fragment.FragmentAccess;

public class RelpFrameAccess implements RelpFrame, AutoCloseable {
public class RelpFrameAccess implements RelpFrame {

private final RelpFrame relpFrame;
private final Fragment txn;
private final Fragment command;
private final Fragment payloadLength;
private final Fragment payload;
private final Fragment endOfTransfer;
private final boolean isStub;

private final Access access;

public RelpFrameAccess(RelpFrame relpFrame) {
this.relpFrame = relpFrame;
this.access = new Access();
this.txn = new FragmentAccess(relpFrame.txn(), access);
this.command = new FragmentAccess(relpFrame.command(), access);
Expand Down Expand Up @@ -115,5 +116,6 @@ public String toString() {
@Override
public void close() {
access.terminate();
relpFrame.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,9 @@ public String toString() {
", endOfTransfer=" + endOfTransfer +
'}';
}

@Override
public void close() {
// no-op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
import java.util.LinkedList;
import java.util.List;

public class RelpFrameLeaseful implements RelpFrame, AutoCloseable {
public class RelpFrameLeaseful implements RelpFrame {

private static final Logger LOGGER = LoggerFactory.getLogger(RelpFrameLeaseful.class);

Expand Down Expand Up @@ -114,6 +114,7 @@ public void close() {
}
bufferLeasePool.offer(bufferLease);
}
relpFrame.close();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,9 @@ public Fragment endOfTransfer() {
public boolean isStub() {
return true;
}

@Override
public void close() {
throw new IllegalStateException("RelpFrameStub does not allow this method");
}
}

0 comments on commit 147b775

Please sign in to comment.