Skip to content

Commit

Permalink
Use Knuth–Morris–Pratt algorithm for delimiter search in TextIO
Browse files Browse the repository at this point in the history
  • Loading branch information
baeminbo committed Sep 6, 2024
1 parent 4651fa0 commit 8e5a6cd
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 84 deletions.
129 changes: 46 additions & 83 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Bytes;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
Expand Down Expand Up @@ -139,7 +138,7 @@ static class TextBasedReader extends FileBasedReader<String> {
private boolean skipLineFeedAtStart; // skip an LF if at the start of the next buffer

// Finder for custom delimiter.
private @Nullable DelimiterFinder delimiterFinder;
private @Nullable KMPDelimiterFinder delimiterFinder;

private TextBasedReader(TextSource source, byte[] delimiter) {
this(source, delimiter, 0);
Expand All @@ -154,7 +153,7 @@ private TextBasedReader(TextSource source, byte[] delimiter, int skipHeaderLines
this.skipHeaderLines = skipHeaderLines;

if (delimiter != null) {
delimiterFinder = new DelimiterFinder(delimiter);
delimiterFinder = new KMPDelimiterFinder(delimiter);
}
}

Expand Down Expand Up @@ -498,106 +497,70 @@ public String toString(int offset, int length, Charset charset) {
}

/**
* A state machine to match the delimiter in a byte stream.
*
* <pre>{@code
* KMPDelimiterFinder finder = new KMPDelimiterFinder([65, 65, 66]); // "AAB"
* finder.feed(65); // false. "A"
* finder.feed(66); // false. "AB"
* finder.feed(65); // false. "ABA"
* finder.feed(65); // false. "ABAA"
* finder.feed(66); // true. "ABAAB"
* finder.feed(65); // false. "ABAABA"
*
* }</pre>
* @see <a
* href="https://en.wikipedia.org/wiki/Knuth%E2%80%93Morris%E2%80%93Pratt_algorithm">Knuth–Morris–Pratt
* algorithm</a>
*/
static class DelimiterFinder {
static class KMPDelimiterFinder {
private final byte[] delimiter;
private final int[] subsequences;
private final int[][] trans;
private final int[] table;
int k; // the current position in delimiter

int position;

public DelimiterFinder(byte[] delimiter) {
public KMPDelimiterFinder(byte[] delimiter) {
this.delimiter = delimiter;
subsequences = new int[delimiter.length];
trans = new int[delimiter.length][256];
this.table = new int[delimiter.length];
compile();
}

public boolean feed(byte b) {
position = trans[position][byteToIndex(b)];
if (position == delimiter.length) {
position = 0;
return true;
} else {
return false;
// Adapted from "Description of pseudocode for the search algorithm" in the Wikipedia article on
// the Knuth–Morris–Pratt algorithm; modified to consume one byte per call.
while (true) {
if (b == delimiter[k]) {
++k;
if (k == delimiter.length) {
// return when the first occurrence is found.
k = 0;
return true;
}
return false;
}

k = table[k];
if (k < 0) {
++k;
return false;
}
}
}

public void reset() {
position = 0;
}

public String describe() {
StringBuilder sb = new StringBuilder();
sb.append("delimiter:\n").append(Bytes.asList(delimiter)).append('\n');
sb.append("subsequences and trans:\n");
for (int i = 0; i < delimiter.length; ++i) {
sb.append(i).append(" (").append(subsequences[i]).append("): ");
for (int b = Byte.MIN_VALUE; b <= Byte.MAX_VALUE; ++b) {
int tran = trans[i][byteToIndex((byte) b)];
if (tran > 0) {
sb.append(b).append(" -> ").append(tran).append(", ");
}
}
sb.append("\n");
}
sb.append("current position: ").append(position).append("\n");
return sb.toString();
k = 0;
}

private void compile() {
// e.g. "AABAAC":
// 0 -> 0, "" -> ""
// 1 -> 0: "A" -> ""
// 2 -> 1: "AA" -> "A"
// 3 -> 0: "AAB" -> "B"
// 4 -> 1: "AABA" -> "A"
// 5 -> 2: "AABAA" -> "AA"

for (int i = 2; i < delimiter.length; i++) {
if (delimiter[i - 1] == delimiter[subsequences[i - 1]]) {
subsequences[i] = subsequences[i - 1] + 1;
} else if (delimiter[i - 1] == delimiter[0]) {
subsequences[i] = 1;
// the current position in table
int pos = 1;
// the zero-based index in delimiter of the next character of the current candidate substring
int cnd = 0;

table[0] = -1;

while (pos < delimiter.length) {
if (delimiter[pos] == delimiter[cnd]) {
table[pos] = table[cnd];
} else {
subsequences[i] = 0;
}
}
// e.g. "AABAAC":
// index (subsequence): trans
// 0 (0): "A" -> 1
// 1 (0): "A" -> 2
// 2 (1): "A" -> 2, "B" -> 3
// 3 (0): "A" -> 4
// 4 (1): "A" -> 5
// 5 (2): "A" -> 2, "B" -> 3, "C" -> 6
trans[0][byteToIndex(delimiter[0])] = 1;

for (int i = 1; i < delimiter.length; i++) {
for (int b = Byte.MIN_VALUE; b <= Byte.MAX_VALUE; b++) {
if (b == delimiter[i]) {
trans[i][byteToIndex((byte) b)] = i + 1;
} else {
trans[i][byteToIndex((byte) b)] = trans[subsequences[i]][byteToIndex((byte) b)];
table[pos] = cnd;
while (cnd >= 0 && delimiter[pos] != delimiter[cnd]) {
cnd = table[cnd];
}
}

++pos;
++cnd;
}
}

private int byteToIndex(byte b) {
return b + 128;
// We don't need the table entry at (pos + 1) from "Description of pseudocode for the
// table-building algorithm" in Wikipedia because we only check for the first occurrence.
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void testDelimiterFinder() {

List<String> split(String delimiter, String text) {
byte[] delimiterBytes = delimiter.getBytes(StandardCharsets.UTF_8);
TextSource.DelimiterFinder finder = new TextSource.DelimiterFinder(delimiterBytes);
TextSource.KMPDelimiterFinder finder = new TextSource.KMPDelimiterFinder(delimiterBytes);

byte[] textBytes = text.getBytes(StandardCharsets.UTF_8);

Expand Down

0 comments on commit 8e5a6cd

Please sign in to comment.