Skip to content

Commit

Permalink
apacheGH-2402: Improve RDFPatchReaderBinary EOF check
Browse files Browse the repository at this point in the history
Previously the RDFPatchReaderBinary simply exited its read loop when it
detected an EOF error as Thrift doesn't have a clean way of apriori
detecting whether we've reached the EOF.  The downside of this approach
was that it meant the reader would silently ignore some genuinely
malformed inputs if they happened to be the right bytes for Thrift to
think it had encountered a field it knew about.

To address this a new method is introduced that inspects the
StackTraceElement's associated with the Thrift EOF exception to detect a
couple of cases where the input is clearly malformed and throw an
appropriate error.

Test cases around malformed patch inputs are also expanded to further
test this capability.
  • Loading branch information
rvesse committed Apr 25, 2024
1 parent a89b739 commit b028b3e
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.apache.jena.datatypes.RDFDatatype;
import org.apache.jena.graph.Node;
import org.apache.jena.graph.NodeFactory;
Expand All @@ -31,14 +32,21 @@
import org.apache.jena.riot.thrift.TRDF;
import org.apache.jena.riot.thrift.wire.*;
import org.apache.thrift.TException;
import org.apache.thrift.TUnion;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolUtil;
import org.apache.thrift.transport.TTransportException;

/**
* Read a binary patch.
* @see PatchProcessor
*/
public class RDFPatchReaderBinary implements PatchProcessor {
private static final RDF_Patch_Row EMPTY_ROW = new RDF_Patch_Row();
private static final String TPROTOCOL_UTIL = TProtocolUtil.class.getCanonicalName();
private static final String TUNION = TUnion.class.getCanonicalName();
private static final String SKIP = "skip";
private static final String READ = "read";
private final InputStream input;

private RDFPatchReaderBinary(InputStream input) {
Expand Down Expand Up @@ -88,8 +96,12 @@ private static PatchHeader readHeader(TProtocol protocol) {
row.clear();
try { row.read(protocol) ; }
catch (TTransportException e) {
if ( e.getType() == TTransportException.END_OF_FILE )
if ( e.getType() == TTransportException.END_OF_FILE) {
if (eofDueToMalformedInput(e)) {
throw new PatchException("Thrift exception", e);
}
break;
}
throw new PatchException("Thrift exception", e);
}
catch (TException e) {
Expand Down Expand Up @@ -120,8 +132,11 @@ public static void read(TProtocol protocol, RDFChanges changes) {
row.clear();
try { row.read(protocol) ; }
catch (TTransportException e) {
if ( e.getType() == TTransportException.END_OF_FILE ) {
if ( e.getType() == TTransportException.END_OF_FILE) {
changes.finish();
if (eofDueToMalformedInput(e)) {
throw new PatchException("Thrift exception", e);
}
return;
}
throw new PatchException("Thrift exception", e);
Expand All @@ -134,6 +149,37 @@ public static void read(TProtocol protocol, RDFChanges changes) {
}
}

/**
* Makes the best effort to detect whether we've hit an EOF exception due to genuine EOF versus malformed input
* <p>
* This is somewhat brittle as the Thrift exception doesn't tell us directly where the read was when the error
* happened, rather we have to go through the stack trace and detect where we were. This looks for two clear
* indicators of malformed input. Firstly a {@link TProtocolUtil#skip(TProtocol, byte)} call as those happen when
* encountering a valid field ID with a mismatched type ID. Secondly for recursive calls into
* {@link TUnion#read(TProtocol)} as that indicates we were partway through reading a valid data structure when we
* encountered the EOF.
* </p>
* @param e EOF Exception
* @return True if EOF due to malformed input, false if due to genuine EOF.
*/
private static boolean eofDueToMalformedInput(TTransportException e) {
int unionReadCount = 0;

for (StackTraceElement element : e.getStackTrace()) {
// Firstly TProtocolUtil.skip() is invoked when Thrift encounters a valid field ID but a wrong type ID, if
// while attempting to skip over the malformed field we hit an EOF then this is definitely malformed input
if (element.getClassName().equals(TPROTOCOL_UTIL) && StringUtils.equals(element.getMethodName(), SKIP)) {
return true;
} else if (element.getClassName().equals(TUNION) && StringUtils.equals(element.getMethodName(), READ)) {
unionReadCount++;
}
}
// Secondly TUnion.read() is invoked for reading union types, since the top level type in the schema is a
// union type then we always expect to see this invoked once. However, if invoked more than once this means
// we started reading a valid data item and then hit an unexpected EOF e.g. due to truncated/corrupted data
return unionReadCount > 1 ? true : false;
}

private static void dispatch(RDF_Patch_Row row, RDFChanges changes) {
if ( row.isSetHeader() ) {
Patch_Header h = row.getHeader();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,5 +111,9 @@ public long getCountTxnAbort() {
public long getCountSegment() {
return countSegment;
}

public boolean isEmpty() {
return (this.countStart + this.countFinish + this.countHeader + this.countAddData + this.countDeleteData + this.countAddPrefix + this.countDeletePrefix + this.countTxnBegin + this.countTxnCommit + this.countTxnAbort + this.countSegment) == 0;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,21 @@

package org.apache.jena.rdfpatch;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

import org.apache.jena.graph.Node;
import org.apache.jena.graph.NodeFactory;
import org.apache.jena.rdfpatch.changes.PatchSummary;
import org.apache.jena.rdfpatch.changes.RDFChangesCollector;
import org.apache.jena.sparql.sse.SSE;
import org.junit.Assert;
import org.junit.Test;

public abstract class AbstractTestPatchIO {
Expand Down Expand Up @@ -124,4 +128,31 @@ private void write_read(Consumer<RDFChanges> action) {
changes.txnCommit();
});
}

@Test(expected = RuntimeException.class) public void write_read_trailing_junk_01() {
RDFPatch patch1 = makePatch(changes-> { changes.txnBegin(); changes.add(g1, s1, p1, o1); changes.txnCommit(); });
byte[] data = write(patch1);
// Add some junk null bytes to the end of the data and try to read it back in
byte[] withTrailer = new byte[data.length + 10];
System.arraycopy(data, 0, withTrailer, 0, data.length);
read(withTrailer);
}

@Test(expected = RuntimeException.class) public void write_read_trailing_junk_02() {
RDFPatch patch1 = makePatch(changes-> { changes.txnBegin(); changes.add(g1, s1, p1, o1); changes.txnCommit(); });
byte[] data = write(patch1);
// Add some junk bytes to the end of the data and try to read it back in
byte[] withTrailer = new byte[data.length + 10];
System.arraycopy(data, 0, withTrailer, 0, data.length);
System.arraycopy("junk".getBytes(StandardCharsets.UTF_8), 0, withTrailer, data.length, 4);
read(withTrailer);
}

@Test
public void read_empty() {
byte[] empty = new byte[0];
RDFPatch patch = this.read(empty);
PatchSummary summary = RDFPatchOps.summary(patch);
assertTrue(summary.isEmpty());
}
}

0 comments on commit b028b3e

Please sign in to comment.