Skip to content

Commit

Permalink
Adds Line-Feed Terminate option (#2)
Browse files Browse the repository at this point in the history
* Adds Line-Feed Terminate option which allows to choose between LF terminated events and EOF terminated events

* Renames LFT to lineFeedTermination
  • Loading branch information
p000010u authored Dec 22, 2022
1 parent ad41386 commit b22764a
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 36 deletions.
90 changes: 54 additions & 36 deletions src/main/java/com/teragrep/rlo_06/RFC5424Parser.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,20 @@

public final class RFC5424Parser {
private InputStream inputStream;
private final boolean lineFeedTermination;
private Boolean EOF = false;
private final byte[] buffer = new byte[256*1024];
private final byte[] buffer = new byte[256 * 1024];
private int pointer = -1;
private int read = -1;

public RFC5424Parser(InputStream inputStream) {
this.inputStream = inputStream;
this.lineFeedTermination = true;
}

public RFC5424Parser(InputStream inputStream, boolean lineFeedTermination) {
this.inputStream = inputStream;
this.lineFeedTermination = lineFeedTermination;
}

public void setInputStream(InputStream inputStream) {
Expand Down Expand Up @@ -97,20 +104,20 @@ private void parsePriority(ParserResultset resultset) throws IOException {

b = this.readBuffer();
if (b >= 48 && b <= 57) { // first is always a number between 0..9
if(resultset.PRIORITY!= null)
if (resultset.PRIORITY != null)
resultset.PRIORITY.put(b);
} else {
throw new ParseException("PRIORITY number incorrect");
}

b = this.readBuffer();
if (b >= 48 && b <= 57) { // second may be a number between 0..9
if(resultset.PRIORITY!= null)
if (resultset.PRIORITY != null)
resultset.PRIORITY.put(b);

b = this.readBuffer();
if (b >= 48 && b <= 57) { // third may be a number
if(resultset.PRIORITY!= null)
if (resultset.PRIORITY != null)
resultset.PRIORITY.put(b);

b = this.readBuffer();
Expand Down Expand Up @@ -167,7 +174,7 @@ private byte parseSD(ParserResultset resultset) throws IOException {
return b;
}

while(b == 91) { // '[' sd exists
while (b == 91) { // '[' sd exists
// structured data, oh wow the performance hit

// parse the sdId
Expand All @@ -178,7 +185,7 @@ private byte parseSD(ParserResultset resultset) throws IOException {
Payload:'[ID_A@1]'
*/
b = this.readBuffer();
while(sdId_max_left > 0 && b != 32 && b != 93) { // ' ' nor ']'
while (sdId_max_left > 0 && b != 32 && b != 93) { // ' ' nor ']'
resultset.sdIdIterator.put(b);
sdId_max_left--;
b = this.readBuffer();
Expand All @@ -187,16 +194,14 @@ private byte parseSD(ParserResultset resultset) throws IOException {

if (b != 32 && b != 93) { // ' ' nor ']'
throw new ParseException("SP missing after SD_ID or SD_ID too long");
}
else if (b == 93) { // ']', sdId only here: Payload:'[ID_A@1]' or Payload:'[ID_A@1][ID_B@1]'
} else if (b == 93) { // ']', sdId only here: Payload:'[ID_A@1]' or Payload:'[ID_A@1][ID_B@1]'
// clean up sdIterator for the next one
resultset.sdIdIterator.flip();
resultset.sdIdIterator.clear();

// MSG may not exist, no \n either, Parsing may be complete. readBuffer sets this.returnAfter to false
// Total payload: '<14>1 2015-06-20T09:14:07.12345+00:00 host02 serverd DEA MSG-01 [ID_A@1]'
}
else { // ' ', sdElement must exist
} else { // ' ', sdElement must exist
// check if we are interested in this sdId at all or skip to next sdId block

if (resultset.sdSubscription.containsKey(resultset.sdIdIterator)) {
Expand All @@ -220,22 +225,22 @@ else if (b == 93) { // ']', sdId only here: Payload:'[ID_A@1]' or Payload:'[ID_A
}

// check if this is for us
if(resultset.sdSubscription.get(resultset.sdIdIterator).containsKey(resultset.sdElementIterator)) {
ByteBuffer elementValue = resultset.sdSubscription.get(resultset.sdIdIterator).get(resultset.sdElementIterator);
short sdElemVal_max_left = 8*1024;
if (resultset.sdSubscription.get(resultset.sdIdIterator).containsKey(resultset.sdElementIterator)) {
ByteBuffer elementValue = resultset.sdSubscription.get(resultset.sdIdIterator).get(resultset.sdElementIterator);
short sdElemVal_max_left = 8 * 1024;
b = this.readBuffer();

while (sdElemVal_max_left > 0 && b != 34) { // '"'
// escaped are special: \" \\ \] ...
if(b == 92) { // \
if (b == 92) { // \
// insert
elementValue.put(b);
sdElemVal_max_left--;
// read next
b = this.readBuffer();

// if it is a '"' then it must be taken care of, loop can do the rest
if(b == 34) {
if (b == 34) {
if (sdElemVal_max_left > 0) {
elementValue.put(b);
sdElemVal_max_left--;
Expand Down Expand Up @@ -325,19 +330,33 @@ private byte parseMSG(ParserResultset resultset, byte lastByte) throws IOExcepti

// first of anything is ' '
// first
if (
b != 32 // space is skipped as "-xyz" or "- xyz" or "]xyz" or "] xyz" may exist
) {
if (b != 32) { // space is skipped as "-xyz" or "- xyz" or "]xyz" or "] xyz" may exist
resultset.MSG.put(b);
msg_current_left--;
} else { // read next byte because this one is a space
b = this.readBuffer();
}

// this little while here is the steam roller of this parser
while ( (b = this.readBuffer()) != 10 && this.EOF == false) {
if (msg_current_left > 0) {
if(resultset.MSG != null)
resultset.MSG.put(b);
msg_current_left--;
// this little while here is the steamroller of this parser
if (this.lineFeedTermination) { // Line-feed termination active
while (b != 10 && !this.EOF) {
if (msg_current_left > 0) {
if (resultset.MSG != null) {
resultset.MSG.put(b);
}
msg_current_left--;
}
b = this.readBuffer();
}
} else { // Line-feed termination inactive, reading until EOF
while (!this.EOF) {
if (msg_current_left > 0) {
if (resultset.MSG != null) {
resultset.MSG.put(b);
}
msg_current_left--;
}
b = this.readBuffer();
}
}
return b;
Expand All @@ -347,15 +366,14 @@ private void parseVERSION(ParserResultset resultset) throws IOException {
byte b;
b = this.readBuffer();
if (b == 49) {
if(resultset.VERSION != null)
resultset.VERSION.put(b);
if (resultset.VERSION != null)
resultset.VERSION.put(b);

b = this.readBuffer();
if (b != 32) { // omit ' '
throw new ParseException("SP missing after VERSION");
}
}
else {
} else {
throw new ParseException("VERSION not 1");
}
}
Expand All @@ -365,7 +383,7 @@ private void parseTIMESTAMP(ParserResultset resultset) throws IOException {
short ts_max_left = 32;
b = this.readBuffer();
while (ts_max_left > 0 && b != 32) {
if(resultset.TIMESTAMP != null)
if (resultset.TIMESTAMP != null)
resultset.TIMESTAMP.put(b);
ts_max_left--;
b = this.readBuffer();
Expand All @@ -381,8 +399,8 @@ private void parseHOSTNAME(ParserResultset resultset) throws IOException {
short hostname_max_left = 255;
b = this.readBuffer();
while (hostname_max_left > 0 && b != 32) {
if(resultset.HOSTNAME != null)
resultset.HOSTNAME.put(b);
if (resultset.HOSTNAME != null)
resultset.HOSTNAME.put(b);
hostname_max_left--;
b = this.readBuffer();
}
Expand All @@ -397,7 +415,7 @@ private void parseAPPNAME(ParserResultset resultset) throws IOException {
short appname_max_left = 48;
b = this.readBuffer();
while (appname_max_left > 0 && b != 32) {
if(resultset.APPNAME != null)
if (resultset.APPNAME != null)
resultset.APPNAME.put(b);
appname_max_left--;
b = this.readBuffer();
Expand All @@ -413,7 +431,7 @@ private void parsePROCID(ParserResultset resultset) throws IOException {
short procid_max_left = 128;
b = this.readBuffer();
while (procid_max_left > 0 && b != 32) {
if(resultset.PROCID != null)
if (resultset.PROCID != null)
resultset.PROCID.put(b);
procid_max_left--;
b = this.readBuffer();
Expand All @@ -429,7 +447,7 @@ private void parseMSGID(ParserResultset resultset) throws IOException {
short msgid_max_left = 32;
b = this.readBuffer();
while (msgid_max_left > 0 && b != 32) {
if(resultset.MSGID!= null)
if (resultset.MSGID != null)
resultset.MSGID.put(b);
msgid_max_left--;
b = this.readBuffer();
Expand Down Expand Up @@ -572,8 +590,8 @@ public boolean next(ParserResultset resultset) throws IOException {
// fall through

if (b != 10) {
throw new ParseException("NL missing after MSG or MSG too long");
}
throw new ParseException("NL missing after MSG or MSG too long");
}
return true; // there was data, returning true
} // public void parseStream(InputStream inputStream) throws IOException {
}
49 changes: 49 additions & 0 deletions src/test/java/com/teragrep/rlo_06/tests/SyntaxTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,55 @@ void consecutiveMoSDTest() throws Exception {
ResultsetAsString strings1 = new ResultsetAsString(res);
strings1.setResultset(res);

// Message Finished
Assertions.assertEquals("", strings1.getPriority());
Assertions.assertEquals("", strings1.getVersion());
Assertions.assertEquals("", strings1.getTimestamp());
Assertions.assertEquals("", strings1.getHostname());
Assertions.assertEquals("", strings1.getAppname());
Assertions.assertEquals("", strings1.getProcid());
Assertions.assertEquals("", strings1.getMsgid());
Assertions.assertEquals("", strings1.getMsg());
}
@Test
void multipleNewlinesInMsg() throws Exception {
String SYSLOG_MESSAGE = "<14>1 2022-12-13T14:41:29.715Z test-stream 9627df7a-testi - - - Testing text.\ntest\ning.\n";

RFC5424ParserSubscription subscription = new RFC5424ParserSubscription();
subscription.subscribeAll();

RFC5424ParserSDSubscription sdSubscription = new RFC5424ParserSDSubscription();
sdSubscription.subscribeElement("event_id@48577","hostname");

ParserResultset res = new ParserResultset(subscription, sdSubscription);

RFC5424Parser parser = new RFC5424Parser(null, false);

int count = 2;
for (int i = 0; i < count; i++) {
parser.setInputStream(new ByteArrayInputStream( (SYSLOG_MESSAGE).getBytes()));

assertTrue(parser.next(res));
ResultsetAsString strings1 = new ResultsetAsString(res);

// Message 1
Assertions.assertEquals("14", strings1.getPriority());
Assertions.assertEquals("1", strings1.getVersion());
Assertions.assertEquals("2022-12-13T14:41:29.715Z", strings1.getTimestamp());
Assertions.assertEquals("test-stream", strings1.getHostname());
Assertions.assertEquals("9627df7a-testi", strings1.getAppname());
Assertions.assertEquals("-", strings1.getProcid());
Assertions.assertEquals("-", strings1.getMsgid());
Assertions.assertEquals("Testing text.\ntest\ning.\n", strings1.getMsg());

res.clear();
}

// finally empty
assertFalse(parser.next(res));
ResultsetAsString strings1 = new ResultsetAsString(res);
strings1.setResultset(res);

// Message Finished
Assertions.assertEquals("", strings1.getPriority());
Assertions.assertEquals("", strings1.getVersion());
Expand Down

0 comments on commit b22764a

Please sign in to comment.