From 8e5a6cdbf119d5d6bed854336fe49d3bdf5301a8 Mon Sep 17 00:00:00 2001 From: Minbo Bae Date: Thu, 5 Sep 2024 19:50:01 -0700 Subject: [PATCH] =?UTF-8?q?Use=20Knuth=E2=80=93Morris=E2=80=93Pratt=20algo?= =?UTF-8?q?rithm=20for=20delimiter=20search=20in=20TextIO?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/apache/beam/sdk/io/TextSource.java | 129 +++++++----------- .../apache/beam/sdk/io/TextSourceTest.java | 2 +- 2 files changed, 47 insertions(+), 84 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java index c72532d09553..60d9b61bc247 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java @@ -36,7 +36,6 @@ import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Bytes; import org.checkerframework.checker.nullness.qual.Nullable; /** @@ -139,7 +138,7 @@ static class TextBasedReader extends FileBasedReader { private boolean skipLineFeedAtStart; // skip an LF if at the start of the next buffer // Finder for custom delimiter. - private @Nullable DelimiterFinder delimiterFinder; + private @Nullable KMPDelimiterFinder delimiterFinder; private TextBasedReader(TextSource source, byte[] delimiter) { this(source, delimiter, 0); @@ -154,7 +153,7 @@ private TextBasedReader(TextSource source, byte[] delimiter, int skipHeaderLines this.skipHeaderLines = skipHeaderLines; if (delimiter != null) { - delimiterFinder = new DelimiterFinder(delimiter); + delimiterFinder = new KMPDelimiterFinder(delimiter); } } @@ -498,106 +497,70 @@ public String toString(int offset, int length, Charset charset) { } /** - * A state machine to match the delimiter in a byte stream. - * - *
{@code
-   * DelimiterFinder finder = new DelimiterFinder([65, 65, 66]); // "AAB"
-   * finder.feed(65); // false. "A"
-   * finder.feed(66); // false. "AB"
-   * finder.feed(65); // false. "ABA"
-   * finder.feed(65); // false. "ABAA"
-   * finder.feed(66); // true.  "ABAAB"
-   * finder.feed(65); // false. "ABAABA"
-   *
-   * }
+ * @see Knuth–Morris–Pratt + * algorithm */ - static class DelimiterFinder { + static class KMPDelimiterFinder { private final byte[] delimiter; - private final int[] subsequences; - private final int[][] trans; + private final int[] table; + int k; // the current position in delimiter - int position; - - public DelimiterFinder(byte[] delimiter) { + public KMPDelimiterFinder(byte[] delimiter) { this.delimiter = delimiter; - subsequences = new int[delimiter.length]; - trans = new int[delimiter.length][256]; + this.table = new int[delimiter.length]; compile(); } public boolean feed(byte b) { - position = trans[position][byteToIndex(b)]; - if (position == delimiter.length) { - position = 0; - return true; - } else { - return false; + // Modified "Description of pseudocode for the search algorithm" in Wikipedia + while (true) { + if (b == delimiter[k]) { + ++k; + if (k == delimiter.length) { + // return when the first occurrence is found. + k = 0; + return true; + } + return false; + } + + k = table[k]; + if (k < 0) { + ++k; + return false; + } } } public void reset() { - position = 0; - } - - public String describe() { - StringBuilder sb = new StringBuilder(); - sb.append("delimiter:\n").append(Bytes.asList(delimiter)).append('\n'); - sb.append("subsequences and trans:\n"); - for (int i = 0; i < delimiter.length; ++i) { - sb.append(i).append(" (").append(subsequences[i]).append("): "); - for (int b = Byte.MIN_VALUE; b <= Byte.MAX_VALUE; ++b) { - int tran = trans[i][byteToIndex((byte) b)]; - if (tran > 0) { - sb.append(b).append(" -> ").append(tran).append(", "); - } - } - sb.append("\n"); - } - sb.append("current position: ").append(position).append("\n"); - return sb.toString(); + k = 0; } private void compile() { - // e.g. "AABAAC": - // 0 -> 0, "" -> "" - // 1 -> 0: "A" -> "" - // 2 -> 1: "AA" -> "A" - // 3 -> 0: "AAB" -> "B" - // 4 -> 1: "AABA" -> "A" - // 5 -> 2: "AABAA" -> "AA" - - for (int i = 2; i < delimiter.length; i++) { - if (delimiter[i - 1] == delimiter[subsequences[i - 1]]) { - subsequences[i] = subsequences[i - 1] + 1; - } else if (delimiter[i - 1] == delimiter[0]) { - subsequences[i] = 1; + // the current position in table + int pos = 1; + // the zero-based index in delimiter of the next character of the current candidate substring + int cnd = 0; + + table[0] = -1; + + while (pos < delimiter.length) { + if (delimiter[pos] == delimiter[cnd]) { + table[pos] = table[cnd]; } else { - subsequences[i] = 0; - } - } - // e.g. "AABAAC": - // index (subsequence): trans - // 0 (0): "A" -> 1 - // 1 (0): "A" -> 2 - // 2 (1): "A" -> 2, "B" -> 3 - // 3 (0): "A" -> 4 - // 4 (1): "A" -> 5 - // 5 (2): "A" -> 2, "B" -> 3, "C" -> 6 - trans[0][byteToIndex(delimiter[0])] = 1; - - for (int i = 1; i < delimiter.length; i++) { - for (int b = Byte.MIN_VALUE; b <= Byte.MAX_VALUE; b++) { - if (b == delimiter[i]) { - trans[i][byteToIndex((byte) b)] = i + 1; - } else { - trans[i][byteToIndex((byte) b)] = trans[subsequences[i]][byteToIndex((byte) b)]; + table[pos] = cnd; + while (cnd >= 0 && delimiter[pos] != delimiter[cnd]) { + cnd = table[cnd]; } } + + ++pos; + ++cnd; } - } - private int byteToIndex(byte b) { - return b + 128; + // We don't need the table entry at (pos + 1) in "Description of pseudocode for the + // table-building algorithm" in Wikipedia because we only checks the first occurrence. } } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextSourceTest.java index 9507a3f8bdef..4df4bec2b223 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextSourceTest.java @@ -119,7 +119,7 @@ public void testDelimiterFinder() { List split(String delimiter, String text) { byte[] delimiterBytes = delimiter.getBytes(StandardCharsets.UTF_8); - TextSource.DelimiterFinder finder = new TextSource.DelimiterFinder(delimiterBytes); + TextSource.KMPDelimiterFinder finder = new TextSource.KMPDelimiterFinder(delimiterBytes); byte[] textBytes = text.getBytes(StandardCharsets.UTF_8);