Skip to content

Commit

Permalink
refactor SyslogFrameProcessor a bit (#85)
Browse files Browse the repository at this point in the history
* refactor SyslogFrameProcessor a bit

* use map.getOrDefault
  • Loading branch information
kortemik authored Mar 7, 2024
1 parent 01a7aed commit a5c744e
Showing 1 changed file with 89 additions and 60 deletions.
149 changes: 89 additions & 60 deletions src/main/java/com/teragrep/rlp_03/SyslogFrameProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

package com.teragrep.rlp_03;


import com.teragrep.rlp_01.RelpCommand;
import com.teragrep.rlp_01.RelpFrameTX;
import com.teragrep.rlp_03.context.frame.RelpFrame;
Expand All @@ -54,7 +55,9 @@

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/**
Expand All @@ -65,70 +68,30 @@ public class SyslogFrameProcessor implements FrameProcessor, AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(SyslogFrameProcessor.class);

private final Consumer<FrameContext> cbFunction;
private final Map<String, Consumer<FrameContext>> relpCommandConsumerMap;

private final Consumer<FrameContext> relpEventServerClose;

public SyslogFrameProcessor(Consumer<FrameContext> cbFunction) {
this.cbFunction = cbFunction;

this.relpCommandConsumerMap = new HashMap<>();
this.relpCommandConsumerMap.put(RelpCommand.CLOSE, new RelpEventClose());
this.relpCommandConsumerMap.put(RelpCommand.OPEN, new RelpEventOpen());
this.relpCommandConsumerMap.put(RelpCommand.SYSLOG, new RelpEventSyslog());

this.relpEventServerClose = new RelpEventServerClose();
}

@Override
public void accept(FrameContext frameContext) {
// TODO add TxID checker that they increase monotonically

List<RelpFrameTX> txFrameList = new ArrayList<>(); // FIXME
switch (frameContext.relpFrame().command().toString()) {
case RelpCommand.ABORT:
// abort sends always serverclose
txFrameList.add(createResponse(frameContext.relpFrame(), RelpCommand.SERVER_CLOSE, ""));
break;

case RelpCommand.CLOSE:
// close is responded with rsp
txFrameList.add(createResponse(frameContext.relpFrame(), RelpCommand.RESPONSE, ""));


// closure is immediate!
txFrameList.add(createResponse(frameContext.relpFrame(), RelpCommand.SERVER_CLOSE, ""));
break;

case RelpCommand.OPEN:
String responseData = "200 OK\nrelp_version=0\n"
+ "relp_software=RLP-01,1.0.1,https://teragrep.com\n"
+ "commands=" + RelpCommand.SYSLOG + "\n";
txFrameList.add(createResponse(frameContext.relpFrame(), RelpCommand.RESPONSE, responseData));
break;

case RelpCommand.RESPONSE:
// client must not respond
txFrameList.add(createResponse(frameContext.relpFrame(), RelpCommand.SERVER_CLOSE, ""));
break;

case RelpCommand.SERVER_CLOSE:
// client must not send serverclose
txFrameList.add(createResponse(frameContext.relpFrame(), RelpCommand.SERVER_CLOSE, ""));
break;

case RelpCommand.SYSLOG:
if (frameContext.relpFrame().payload().size() > 0) {
try {
cbFunction.accept(frameContext);
txFrameList.add(createResponse(frameContext.relpFrame(), RelpCommand.RESPONSE, "200 OK"));
} catch (Exception e) {
LOGGER.error("EXCEPTION WHILE PROCESSING SYSLOG PAYLOAD", e);
txFrameList.add(createResponse(frameContext.relpFrame(),
RelpCommand.RESPONSE, "500 EXCEPTION WHILE PROCESSING SYSLOG PAYLOAD"));
}
} else {
txFrameList.add(createResponse(frameContext.relpFrame(), RelpCommand.RESPONSE, "500 NO PAYLOAD"));
String relpCommand = frameContext.relpFrame().command().toString();

}
break;
Consumer<FrameContext> commandConsumer = relpCommandConsumerMap.getOrDefault(relpCommand, relpEventServerClose);

default:
break;

}

frameContext.connectionContext().relpWrite().accept(txFrameList);
commandConsumer.accept(frameContext);
}

@Override
Expand All @@ -143,13 +106,79 @@ public boolean isStub() {
return false;
}

private RelpFrameTX createResponse(
RelpFrame rxFrame,
String command,
String response) {
RelpFrameTX txFrame = new RelpFrameTX(command, response.getBytes(StandardCharsets.UTF_8));
txFrame.setTransactionNumber(rxFrame.txn().toInt());
return txFrame;
private class RelpEventServerClose extends RelpEvent {
@Override
public void accept(FrameContext frameContext) {
List<RelpFrameTX> txFrameList = new ArrayList<>();

txFrameList.add(createResponse(frameContext.relpFrame(), RelpCommand.SERVER_CLOSE, ""));

frameContext.connectionContext().relpWrite().accept(txFrameList);
}
}

private class RelpEventClose extends RelpEvent {

@Override
public void accept(FrameContext frameContext) {
List<RelpFrameTX> txFrameList = new ArrayList<>();

txFrameList.add(createResponse(frameContext.relpFrame(), RelpCommand.RESPONSE, ""));
// closure is immediate!
txFrameList.add(createResponse(frameContext.relpFrame(), RelpCommand.SERVER_CLOSE, ""));

frameContext.connectionContext().relpWrite().accept(txFrameList);
}
}

private class RelpEventOpen extends RelpEvent {

private static final String responseData = "200 OK\nrelp_version=0\n"
+ "relp_software=RLP-01,1.0.1,https://teragrep.com\n"
+ "commands=" + RelpCommand.SYSLOG + "\n";

@Override
public void accept(FrameContext frameContext) {
List<RelpFrameTX> txFrameList = new ArrayList<>();

txFrameList.add(createResponse(frameContext.relpFrame(), RelpCommand.RESPONSE, responseData));

frameContext.connectionContext().relpWrite().accept(txFrameList);
}
}

private class RelpEventSyslog extends RelpEvent {

@Override
public void accept(FrameContext frameContext) {
List<RelpFrameTX> txFrameList = new ArrayList<>();

if (frameContext.relpFrame().payload().size() > 0) {
try {
cbFunction.accept(frameContext);
txFrameList.add(createResponse(frameContext.relpFrame(), RelpCommand.RESPONSE, "200 OK"));
} catch (Exception e) {
LOGGER.error("EXCEPTION WHILE PROCESSING SYSLOG PAYLOAD", e);
txFrameList.add(createResponse(frameContext.relpFrame(),
RelpCommand.RESPONSE, "500 EXCEPTION WHILE PROCESSING SYSLOG PAYLOAD"));
}
} else {
txFrameList.add(createResponse(frameContext.relpFrame(), RelpCommand.RESPONSE, "500 NO PAYLOAD"));

}
frameContext.connectionContext().relpWrite().accept(txFrameList);
}
}

private abstract class RelpEvent implements Consumer<FrameContext> {
protected RelpFrameTX createResponse(
RelpFrame rxFrame,
String command,
String response) {
RelpFrameTX txFrame = new RelpFrameTX(command, response.getBytes(StandardCharsets.UTF_8));
txFrame.setTransactionNumber(rxFrame.txn().toInt());
return txFrame;
}
}
}

0 comments on commit a5c744e

Please sign in to comment.