From 7a27d4046323203926122f447009e36c1ae88c75 Mon Sep 17 00:00:00 2001
From: Keith Turner
Date: Sat, 1 Feb 2025 15:51:59 -0500
Subject: [PATCH] Offers new ways of computing bulk load plans (#4898)

Two new ways of computing bulk import load plans are offered in this change.
First, the RFile API was modified to support computing a LoadPlan as the RFile
is written. Second, a new LoadPlan.compute() method was added that creates a
LoadPlan from an existing RFile. In addition to these changes, methods were
added to LoadPlan that support serializing and deserializing load plans to/from
json.

Together these changes support computing load plans in a distributed manner.
For example, with a bulk import directory of N files the following use case is
now supported (see the sketch below).

1. For each file a task is spun up on a remote server that calls the new
   LoadPlan.compute() API to determine what tablets the file overlaps. Then the
   new LoadPlan.toJson() method is called to serialize the load plan and send
   it to a central place.

2. All the load plans from the remote servers are deserialized by calling the
   new LoadPlan.fromJson() method and merged into a single load plan that is
   used to do the bulk import.

Another use case these new APIs could support is running this new code in the
map reduce job that generates the bulk import data.

1. Each reducer, as it writes to an rfile, could also build a LoadPlan. The
   load plan can be obtained from the RFileWriter after closing it, serialized
   using LoadPlan.toJson(), and the result saved to a file. So after the map
   reduce job completes, each rfile would have a corresponding file containing
   its load plan.

2. Another process that runs after the map reduce job can read all of the load
   plan files and merge them using the new LoadPlan.fromJson() method. The
   merged LoadPlan can then be used to do the bulk import.

Both of these use cases avoid analyzing the files on the single machine doing
the bulk import. Bulk import v1 had this functionality and would ask random
tservers to do the file analysis, which could cause unexpected load on those
tservers. Bulk v1 would also interleave analyzing files and adding them to
tablets, which could lead to odd situations where analysis fails partway
through and a file is left partially imported to some tablets. Bulk v2 does all
analysis before any files are added to tablets; however, it lacks this
distributed analysis capability. These changes provide the building blocks for
bulk v2 to do the same kind of distributed analysis that bulk v1 did.
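As a rough illustration of the first use case (not itself part of this patch), the per-file
step and the central merge step could look like the sketch below. The AccumuloClient wiring,
the transport of the json strings between servers, and names like planJsons are assumptions
made for the example; LoadPlan.compute(), SplitResolver.from(), toJson(), fromJson(), and
Builder.addPlan() are the APIs added by this change, and importDirectory(...).plan(...) is
the existing bulk import entry point.

    import java.net.URI;
    import java.util.List;
    import java.util.SortedSet;
    import java.util.TreeSet;

    import org.apache.accumulo.core.client.AccumuloClient;
    import org.apache.accumulo.core.data.LoadPlan;
    import org.apache.hadoop.io.Text;

    class DistributedLoadPlanSketch {
      // Runs on a remote server for a single file in the bulk import directory.
      static String computePlanJson(AccumuloClient client, String table, URI rfile)
          throws Exception {
        SortedSet<Text> splits = new TreeSet<>(client.tableOperations().listSplits(table));
        LoadPlan plan = LoadPlan.compute(rfile, LoadPlan.SplitResolver.from(splits));
        return plan.toJson(); // send this string to a central place
      }

      // Runs in one central process once all of the per-file plans have been collected.
      static void mergeAndImport(AccumuloClient client, String table, String bulkDir,
          List<String> planJsons) throws Exception {
        var merged = LoadPlan.builder();
        for (String json : planJsons) {
          merged.addPlan(LoadPlan.fromJson(json));
        }
        client.tableOperations().importDirectory(bulkDir).to(table).plan(merged.build()).load();
      }
    }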
Co-authored-by: Daniel Roberts Co-authored-by: Christopher Tubbs --- .../org/apache/accumulo/core/Constants.java | 1 + .../core/client/rfile/LoadPlanCollector.java | 131 ++++++++++ .../accumulo/core/client/rfile/RFile.java | 10 + .../core/client/rfile/RFileWriter.java | 36 ++- .../core/client/rfile/RFileWriterBuilder.java | 35 ++- .../core/clientImpl/bulk/BulkImport.java | 39 ++- .../apache/accumulo/core/data/LoadPlan.java | 242 +++++++++++++++++- .../accumulo/core/file/FileOperations.java | 5 + .../core/client/rfile/RFileClientTest.java | 210 +++++++++++++++ .../accumulo/core/data/LoadPlanTest.java | 48 ++++ .../accumulo/core/file/rfile/RFileTest.java | 1 - .../accumulo/test/functional/BulkNewIT.java | 61 ++++- 12 files changed, 792 insertions(+), 27 deletions(-) create mode 100644 core/src/main/java/org/apache/accumulo/core/client/rfile/LoadPlanCollector.java diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java index c31e205585a..1caf157e90c 100644 --- a/core/src/main/java/org/apache/accumulo/core/Constants.java +++ b/core/src/main/java/org/apache/accumulo/core/Constants.java @@ -104,6 +104,7 @@ public class Constants { public static final String BULK_PREFIX = "b-"; public static final String BULK_RENAME_FILE = "renames.json"; public static final String BULK_LOAD_MAPPING = "loadmap.json"; + public static final String BULK_WORKING_PREFIX = "accumulo-bulk-"; public static final String CLONE_PREFIX = "c-"; public static final byte[] CLONE_PREFIX_BYTES = CLONE_PREFIX.getBytes(UTF_8); diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/LoadPlanCollector.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/LoadPlanCollector.java new file mode 100644 index 00000000000..511e3fed51c --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/LoadPlanCollector.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.core.client.rfile; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.LoadPlan; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.hadoop.io.Text; + +import com.google.common.base.Preconditions; + +class LoadPlanCollector { + + private final LoadPlan.SplitResolver splitResolver; + private boolean finished = false; + private Text lgFirstRow; + private Text lgLastRow; + private Text firstRow; + private Text lastRow; + private Set overlappingExtents; + private KeyExtent currentExtent; + private long appended = 0; + + LoadPlanCollector(LoadPlan.SplitResolver splitResolver) { + this.splitResolver = splitResolver; + this.overlappingExtents = new HashSet<>(); + } + + LoadPlanCollector() { + splitResolver = null; + this.overlappingExtents = null; + + } + + private void appendNoSplits(Key key) { + if (lgFirstRow == null) { + lgFirstRow = key.getRow(); + lgLastRow = lgFirstRow; + } else { + var row = key.getRow(); + lgLastRow = row; + } + } + + private static final TableId FAKE_ID = TableId.of("123"); + + private void appendSplits(Key key) { + var row = key.getRow(); + if (currentExtent == null || !currentExtent.contains(row)) { + var tableSplits = splitResolver.apply(row); + var extent = new KeyExtent(FAKE_ID, tableSplits.getEndRow(), tableSplits.getPrevRow()); + Preconditions.checkState(extent.contains(row), "%s does not contain %s", tableSplits, row); + if (currentExtent != null) { + overlappingExtents.add(currentExtent); + } + currentExtent = extent; + } + } + + public void append(Key key) { + if (splitResolver == null) { + appendNoSplits(key); + } else { + appendSplits(key); + } + appended++; + } + + public void startLocalityGroup() { + if (lgFirstRow != null) { + if (firstRow == null) { + firstRow = lgFirstRow; + lastRow = lgLastRow; + } else { + // take the minimum + firstRow = firstRow.compareTo(lgFirstRow) < 0 ? firstRow : lgFirstRow; + // take the maximum + lastRow = lastRow.compareTo(lgLastRow) > 0 ? 
lastRow : lgLastRow; + } + lgFirstRow = null; + lgLastRow = null; + } + } + + public LoadPlan getLoadPlan(String filename) { + Preconditions.checkState(finished, "Attempted to get load plan before closing"); + + if (appended == 0) { + return LoadPlan.builder().build(); + } + + if (splitResolver == null) { + return LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE, firstRow, lastRow) + .build(); + } else { + var builder = LoadPlan.builder(); + overlappingExtents.add(currentExtent); + for (var extent : overlappingExtents) { + builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, extent.prevEndRow(), + extent.endRow()); + } + return builder.build(); + } + } + + public void close() { + finished = true; + // compute the overall min and max rows + startLocalityGroup(); + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java index 3b6d10aade1..71c186f3eb6 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFile.java @@ -34,6 +34,7 @@ import org.apache.accumulo.core.client.summary.Summary.FileStatistics; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.LoadPlan; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.security.Authorizations; import org.apache.hadoop.fs.FileSystem; @@ -428,6 +429,15 @@ default WriterOptions withSummarizers(SummarizerConfiguration... summarizerConf) */ WriterOptions withVisibilityCacheSize(int maxSize); + /** + * @param splitResolver builds a {@link LoadPlan} using table split points provided by the given + * splitResolver. + * @return this + * @see RFileWriter#getLoadPlan(String) + * @since 2.1.4 + */ + WriterOptions withSplitResolver(LoadPlan.SplitResolver splitResolver); + /** * @return a new RfileWriter created with the options previously specified. 
*/ diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java index b4d6def4a23..ffa43e3bc47 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriter.java @@ -29,6 +29,7 @@ import org.apache.accumulo.core.data.ArrayByteSequence; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.LoadPlan; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.FileSKVWriter; import org.apache.accumulo.core.security.ColumnVisibility; @@ -92,12 +93,15 @@ public class RFileWriter implements AutoCloseable { private final FileSKVWriter writer; private final LRUMap validVisibilities; + + private final LoadPlanCollector loadPlanCollector; private boolean startedLG; private boolean startedDefaultLG; - RFileWriter(FileSKVWriter fileSKVWriter, int visCacheSize) { + RFileWriter(FileSKVWriter fileSKVWriter, int visCacheSize, LoadPlanCollector loadPlanCollector) { this.writer = fileSKVWriter; this.validVisibilities = new LRUMap<>(visCacheSize); + this.loadPlanCollector = loadPlanCollector; } private void _startNewLocalityGroup(String name, Set columnFamilies) @@ -106,6 +110,7 @@ private void _startNewLocalityGroup(String name, Set columnFamilie "Cannot start a locality group after starting the default locality group"); writer.startNewLocalityGroup(name, columnFamilies); startedLG = true; + loadPlanCollector.startLocalityGroup(); } /** @@ -175,6 +180,7 @@ public void startNewLocalityGroup(String name, String... families) throws IOExce public void startDefaultLocalityGroup() throws IOException { Preconditions.checkState(!startedDefaultLG); + loadPlanCollector.startLocalityGroup(); writer.startDefaultLocalityGroup(); startedDefaultLG = true; startedLG = true; @@ -204,6 +210,7 @@ public void append(Key key, Value val) throws IOException { validVisibilities.put(new ArrayByteSequence(Arrays.copyOf(cv, cv.length)), Boolean.TRUE); } writer.append(key, val); + loadPlanCollector.append(key); } /** @@ -249,6 +256,31 @@ public void append(Iterable> keyValues) throws IOException { @Override public void close() throws IOException { - writer.close(); + try { + writer.close(); + } finally { + loadPlanCollector.close(); + } + } + + /** + * If no split resolver was provided when the RFileWriter was built then this method will return a + * simple load plan of type {@link org.apache.accumulo.core.data.LoadPlan.RangeType#FILE} using + * the first and last row seen. If a splitResolver was provided then this will return a load plan + * of type {@link org.apache.accumulo.core.data.LoadPlan.RangeType#TABLE} that has the split + * ranges the rows written overlapped. + * + * @param filename This file name will be used in the load plan and it should match the name that + * will be used when bulk importing this file. Only a filename is needed, not a full path. + * @return load plan computed from the keys written to the rfile. 
+ * @see org.apache.accumulo.core.client.rfile.RFile.WriterOptions#withSplitResolver(LoadPlan.SplitResolver) + * @since 2.1.4 + * @throws IllegalStateException is attempting to get load plan before calling {@link #close()} + * @throws IllegalArgumentException is a full path is passed instead of a filename + */ + public LoadPlan getLoadPlan(String filename) { + Preconditions.checkArgument(!filename.contains("/"), + "Unexpected path %s seen instead of file name", filename); + return loadPlanCollector.getLoadPlan(filename); } } diff --git a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java index 6382d568b5f..5b12f4d73b3 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java +++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileWriterBuilder.java @@ -19,6 +19,7 @@ package org.apache.accumulo.core.client.rfile; import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; import java.io.IOException; import java.io.OutputStream; @@ -26,7 +27,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; -import java.util.Objects; import java.util.stream.Stream; import org.apache.accumulo.core.client.rfile.RFile.WriterFSOptions; @@ -37,6 +37,7 @@ import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.crypto.CryptoFactoryLoader; +import org.apache.accumulo.core.data.LoadPlan; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.metadata.ValidationUtil; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; @@ -72,6 +73,7 @@ OutputStream getOutputStream() { private int visCacheSize = 1000; private Map samplerProps = Collections.emptyMap(); private Map summarizerProps = Collections.emptyMap(); + private LoadPlan.SplitResolver splitResolver; private void checkDisjoint(Map props, Map derivedProps, String kind) { @@ -81,7 +83,7 @@ private void checkDisjoint(Map props, Map derivedP @Override public WriterOptions withSampler(SamplerConfiguration samplerConf) { - Objects.requireNonNull(samplerConf); + requireNonNull(samplerConf); Map tmp = new SamplerConfigurationImpl(samplerConf).toTablePropertiesMap(); checkDisjoint(tableConfig, tmp, "sampler"); this.samplerProps = tmp; @@ -106,6 +108,9 @@ public RFileWriter build() throws IOException { CryptoService cs = CryptoFactoryLoader.getServiceForClient(CryptoEnvironment.Scope.TABLE, tableConfig); + var loadPlanCollector = + splitResolver == null ? 
new LoadPlanCollector() : new LoadPlanCollector(splitResolver); + if (out.getOutputStream() != null) { FSDataOutputStream fsdo; if (out.getOutputStream() instanceof FSDataOutputStream) { @@ -116,17 +121,19 @@ public RFileWriter build() throws IOException { return new RFileWriter( fileops.newWriterBuilder().forOutputStream(".rf", fsdo, out.getConf(), cs) .withTableConfiguration(acuconf).withStartDisabled().build(), - visCacheSize); + visCacheSize, loadPlanCollector); } else { - return new RFileWriter(fileops.newWriterBuilder() - .forFile(out.path.toString(), out.getFileSystem(out.path), out.getConf(), cs) - .withTableConfiguration(acuconf).withStartDisabled().build(), visCacheSize); + return new RFileWriter( + fileops.newWriterBuilder() + .forFile(out.path.toString(), out.getFileSystem(out.path), out.getConf(), cs) + .withTableConfiguration(acuconf).withStartDisabled().build(), + visCacheSize, loadPlanCollector); } } @Override public WriterOptions withFileSystem(FileSystem fs) { - Objects.requireNonNull(fs); + requireNonNull(fs); out.fs = fs; return this; } @@ -140,14 +147,14 @@ public WriterFSOptions to(String filename) { @Override public WriterOptions to(OutputStream out) { - Objects.requireNonNull(out); + requireNonNull(out); this.out = new OutputArgs(out); return this; } @Override public WriterOptions withTableProperties(Iterable> tableConfig) { - Objects.requireNonNull(tableConfig); + requireNonNull(tableConfig); HashMap cfg = new HashMap<>(); for (Entry entry : tableConfig) { cfg.put(entry.getKey(), entry.getValue()); @@ -161,7 +168,7 @@ public WriterOptions withTableProperties(Iterable> tableCon @Override public WriterOptions withTableProperties(Map tableConfig) { - Objects.requireNonNull(tableConfig); + requireNonNull(tableConfig); return withTableProperties(tableConfig.entrySet()); } @@ -172,9 +179,15 @@ public WriterOptions withVisibilityCacheSize(int maxSize) { return this; } + @Override + public WriterOptions withSplitResolver(LoadPlan.SplitResolver splitResolver) { + this.splitResolver = requireNonNull(splitResolver); + return this; + } + @Override public WriterOptions withSummarizers(SummarizerConfiguration... summarizerConf) { - Objects.requireNonNull(summarizerConf); + requireNonNull(summarizerConf); Map tmp = SummarizerConfiguration.toTableProperties(summarizerConf); checkDisjoint(tableConfig, tmp, "summarizer"); this.summarizerProps = tmp; diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java index a85db74a86a..1c155cd5102 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/BulkImport.java @@ -48,6 +48,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.function.Function; import java.util.stream.Stream; import org.apache.accumulo.core.Constants; @@ -324,19 +325,25 @@ public interface KeyExtentCache { KeyExtent lookup(Text row); } - public static List findOverlappingTablets(KeyExtentCache extentCache, - FileSKVIterator reader) throws IOException { + /** + * Function that will find a row in a file being bulk imported that is >= the row passed to the + * function. If there is no row then it should return null. 
+ */ + public interface NextRowFunction { + Text apply(Text row) throws IOException; + } + + public static List findOverlappingTablets(Function rowToExtentResolver, + NextRowFunction nextRowFunction) throws IOException { List result = new ArrayList<>(); - Collection columnFamilies = Collections.emptyList(); Text row = new Text(); while (true) { - reader.seek(new Range(row, null), columnFamilies, false); - if (!reader.hasTop()) { + row = nextRowFunction.apply(row); + if (row == null) { break; } - row = reader.getTopKey().getRow(); - KeyExtent extent = extentCache.lookup(row); + KeyExtent extent = rowToExtentResolver.apply(row); result.add(extent); row = extent.endRow(); if (row != null) { @@ -356,13 +363,23 @@ private static Text nextRow(Text row) { } public static List findOverlappingTablets(ClientContext context, - KeyExtentCache extentCache, Path file, FileSystem fs, Cache fileLenCache, + KeyExtentCache keyExtentCache, Path file, FileSystem fs, Cache fileLenCache, CryptoService cs) throws IOException { try (FileSKVIterator reader = FileOperations.getInstance().newReaderBuilder() .forFile(file.toString(), fs, fs.getConf(), cs) .withTableConfiguration(context.getConfiguration()).withFileLenCache(fileLenCache) .seekToBeginning().build()) { - return findOverlappingTablets(extentCache, reader); + + Collection columnFamilies = Collections.emptyList(); + NextRowFunction nextRowFunction = row -> { + reader.seek(new Range(row, null), columnFamilies, false); + if (!reader.hasTop()) { + return null; + } + return reader.getTopKey().getRow(); + }; + + return findOverlappingTablets(keyExtentCache::lookup, nextRowFunction); } } @@ -517,8 +534,8 @@ public static List filterInvalid(FileStatus[] files) { continue; } - if (FileOperations.getBulkWorkingFiles().contains(fname)) { - log.debug("{} is an internal working file, ignoring.", fileStatus.getPath()); + if (FileOperations.isBulkWorkingFile(fname)) { + log.trace("{} is an internal working file, ignoring.", fileStatus.getPath()); continue; } diff --git a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java index aecd25f6637..c06fbb83c22 100644 --- a/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java +++ b/core/src/main/java/org/apache/accumulo/core/data/LoadPlan.java @@ -20,17 +20,31 @@ import static java.nio.charset.StandardCharsets.UTF_8; +import java.io.IOException; +import java.net.URI; import java.nio.file.Paths; import java.util.Arrays; +import java.util.Base64; import java.util.Collection; import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.SortedSet; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.accumulo.core.client.admin.TableOperations.ImportMappingOptions; +import org.apache.accumulo.core.client.rfile.RFile; +import org.apache.accumulo.core.clientImpl.bulk.BulkImport; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.primitives.UnsignedBytes; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -125,7 +139,7 @@ private Destination(String fileName, RangeType rangeType, byte[] startRow, byte[ "Start row is greater than or equal to end row : " + srs + " " + ers); } } else { - throw new RuntimeException(); + throw 
new IllegalStateException(); } } @@ -228,4 +242,230 @@ public LoadPlan build() { } }; } + + private static class JsonDestination { + String fileName; + String startRow; + String endRow; + RangeType rangeType; + + JsonDestination() {} + + JsonDestination(Destination destination) { + fileName = destination.getFileName(); + startRow = destination.getStartRow() == null ? null + : Base64.getUrlEncoder().encodeToString(destination.getStartRow()); + endRow = destination.getEndRow() == null ? null + : Base64.getUrlEncoder().encodeToString(destination.getEndRow()); + rangeType = destination.getRangeType(); + } + + Destination toDestination() { + return new Destination(fileName, rangeType, + startRow == null ? null : Base64.getUrlDecoder().decode(startRow), + endRow == null ? null : Base64.getUrlDecoder().decode(endRow)); + } + } + + private static final class JsonAll { + List destinations; + + JsonAll() {} + + JsonAll(List destinations) { + this.destinations = + destinations.stream().map(JsonDestination::new).collect(Collectors.toList()); + } + + } + + private static final Gson gson = new GsonBuilder().disableJdkUnsafe().serializeNulls().create(); + + /** + * Serializes the load plan to json that looks like the following. The values of startRow and + * endRow field are base64 encoded using {@link Base64#getUrlEncoder()}. + * + *
+   * {
+   *   "destinations": [
+   *     {
+   *       "fileName": "f1.rf",
+   *       "startRow": null,
+   *       "endRow": "MDAz",
+   *       "rangeType": "TABLE"
+   *     },
+   *     {
+   *       "fileName": "f2.rf",
+   *       "startRow": "MDA0",
+   *       "endRow": "MDA3",
+   *       "rangeType": "FILE"
+   *     },
+   *     {
+   *       "fileName": "f1.rf",
+   *       "startRow": "MDA1",
+   *       "endRow": "MDA2",
+   *       "rangeType": "TABLE"
+   *     },
+   *     {
+   *       "fileName": "f3.rf",
+   *       "startRow": "MDA4",
+   *       "endRow": null,
+   *       "rangeType": "TABLE"
+   *     }
+   *   ]
+   * }
+   * 
+ * + * @since 2.1.4 + */ + public String toJson() { + return gson.toJson(new JsonAll(destinations)); + } + + /** + * Deserializes json to a load plan. + * + * @param json produced by {@link #toJson()} + */ + public static LoadPlan fromJson(String json) { + var dests = gson.fromJson(json, JsonAll.class).destinations.stream() + .map(JsonDestination::toDestination).collect(Collectors.toUnmodifiableList()); + return new LoadPlan(dests); + } + + /** + * Represents two split points that exist in a table being bulk imported to. + * + * @since 2.1.4 + */ + public static class TableSplits { + private final Text prevRow; + private final Text endRow; + + public TableSplits(Text prevRow, Text endRow) { + Preconditions.checkArgument( + prevRow == null || endRow == null || prevRow.compareTo(endRow) < 0, "%s >= %s", prevRow, + endRow); + this.prevRow = prevRow; + this.endRow = endRow; + } + + public Text getPrevRow() { + return prevRow; + } + + public Text getEndRow() { + return endRow; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TableSplits that = (TableSplits) o; + return Objects.equals(prevRow, that.prevRow) && Objects.equals(endRow, that.endRow); + } + + @Override + public int hashCode() { + return Objects.hash(prevRow, endRow); + } + + @Override + public String toString() { + return "(" + prevRow + "," + endRow + "]"; + } + } + + /** + * A function that maps a row to two table split points that contain the row. These splits must + * exist in the table being bulk imported to. There is no requirement that the splits are + * contiguous. For example if a table has splits C,D,E,M and we ask for splits containing row H + * its ok to return D,M, but that could result in the file mapping to more actual tablets than + * needed. For a row that falls before or after all table splits, use null to represent -inf and + * +inf. For example if a table has splits C,D,E,M and row B is resolved it is ok to return + * null,C. If row Q is resolved for table splits C,D,E,M it would be ok to return M,null. For a + * table with zero splits, the resolver should return null,null for all rows. + * + * @since 2.1.4 + */ + public interface SplitResolver extends Function { + static SplitResolver from(SortedSet splits) { + return row -> { + var headSet = splits.headSet(row); + Text prevRow = headSet.isEmpty() ? null : headSet.last(); + var tailSet = splits.tailSet(row); + Text endRow = tailSet.isEmpty() ? null : tailSet.first(); + return new TableSplits(prevRow, endRow); + }; + } + + /** + * For a given row, R, this function should find two split points, S1 and S2, that exist in the + * table being bulk imported to, such that S1 < R <= S2. The closer S1 and S2 are to each + * other, the better. + */ + @Override + TableSplits apply(Text row); + } + + /** + * Computes a load plan for a given rfile. This will open the rfile and find every + * {@link TableSplits} that overlaps rows in the file and add those to the returned load plan. + * + * @since 2.1.4 + */ + public static LoadPlan compute(URI file, SplitResolver splitResolver) throws IOException { + return compute(file, Map.of(), splitResolver); + } + + // KeyExtent requires a tableId and this code needs to use KeyExtent functionality but does not + // have a tableId or care what the tableId is. So this fake id is used with KeyExtent. + private static final TableId FAKE_ID = TableId.of("999"); + + /** + * Computes a load plan for a given rfile. 
This will open the rfile and find every + * {@link TableSplits} that overlaps rows in the file and add those to the returned load plan. + * + * @param properties used when opening the rfile, see + * {@link org.apache.accumulo.core.client.rfile.RFile.ScannerOptions#withTableProperties(Map)} + * @since 2.1.4 + */ + public static LoadPlan compute(URI file, Map properties, + SplitResolver splitResolver) throws IOException { + try (var scanner = RFile.newScanner().from(file.toString()).withoutSystemIterators() + .withTableProperties(properties).withIndexCache(10_000_000).build()) { + BulkImport.NextRowFunction nextRowFunction = row -> { + scanner.setRange(new Range(row, null)); + var iter = scanner.iterator(); + if (iter.hasNext()) { + return iter.next().getKey().getRow(); + } else { + return null; + } + }; + + Function rowToExtentResolver = row -> { + var tabletRange = splitResolver.apply(row); + var extent = new KeyExtent(FAKE_ID, tabletRange.endRow, tabletRange.prevRow); + Preconditions.checkState(extent.contains(row), "%s does not contain %s", tabletRange, row); + return extent; + }; + + List overlapping = + BulkImport.findOverlappingTablets(rowToExtentResolver, nextRowFunction); + + Path path = new Path(file); + + var builder = builder(); + for (var extent : overlapping) { + builder.loadFileTo(path.getName(), RangeType.TABLE, extent.prevEndRow(), extent.endRow()); + } + return builder.build(); + } + } } diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java index db82b0d1493..f83d202b2f5 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java @@ -53,6 +53,11 @@ public abstract class FileOperations { Set.of(Constants.BULK_LOAD_MAPPING, Constants.BULK_RENAME_FILE, FileOutputCommitter.SUCCEEDED_FILE_NAME, HADOOP_JOBHISTORY_LOCATION); + public static boolean isBulkWorkingFile(String fileName) { + return fileName.startsWith(Constants.BULK_WORKING_PREFIX) + || bulkWorkingFiles.contains(fileName); + } + public static Set getValidExtensions() { return validExtensions; } diff --git a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java index 716803b3563..58e56abf0a6 100644 --- a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileClientTest.java @@ -28,6 +28,7 @@ import java.io.File; import java.io.IOException; import java.net.ConnectException; +import java.net.URI; import java.security.SecureRandom; import java.util.AbstractMap; import java.util.ArrayList; @@ -40,7 +41,11 @@ import java.util.Map; import java.util.Map.Entry; import java.util.SortedMap; +import java.util.SortedSet; import java.util.TreeMap; +import java.util.TreeSet; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; @@ -57,6 +62,8 @@ import org.apache.accumulo.core.data.ArrayByteSequence; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.LoadPlan; +import org.apache.accumulo.core.data.LoadPlanTest; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.FileOperations; @@ 
-881,4 +888,207 @@ public void testFileSystemFromUri() throws Exception { assertTrue(Arrays.stream(exception3.getStackTrace()) .anyMatch(ste -> ste.getClassName().contains(localFsClass))); } + + @Test + public void testLoadPlanEmpty() throws Exception { + LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); + + LoadPlan.SplitResolver splitResolver = + LoadPlan.SplitResolver.from(new TreeSet<>(List.of(new Text("m")))); + + for (boolean withSplits : List.of(true, false)) { + String testFile = createTmpTestFile(); + var builder = RFile.newWriter().to(testFile).withFileSystem(localFs); + if (withSplits) { + builder = builder.withSplitResolver(splitResolver); + } + var writer = builder.build(); + + // can not get load plan before closing file + assertThrows(IllegalStateException.class, + () -> writer.getLoadPlan(new Path(testFile).getName())); + + try (writer) { + writer.startDefaultLocalityGroup(); + assertThrows(IllegalStateException.class, + () -> writer.getLoadPlan(new Path(testFile).getName())); + } + var loadPlan = writer.getLoadPlan(new Path(testFile).getName()); + assertEquals(0, loadPlan.getDestinations().size()); + + loadPlan = LoadPlan.compute(new URI(testFile), splitResolver); + assertEquals(0, loadPlan.getDestinations().size()); + } + } + + @Test + public void testLoadPlanLocalityGroupsNoSplits() throws Exception { + LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); + + String testFile = createTmpTestFile(); + var writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build(); + try (writer) { + writer.startNewLocalityGroup("LG1", "F1"); + writer.append(new Key("001", "F1"), "V1"); + writer.append(new Key("005", "F1"), "V2"); + writer.startNewLocalityGroup("LG2", "F3"); + writer.append(new Key("003", "F3"), "V3"); + writer.append(new Key("004", "F3"), "V4"); + writer.startDefaultLocalityGroup(); + writer.append(new Key("007", "F4"), "V5"); + writer.append(new Key("009", "F4"), "V6"); + } + + var filename = new Path(testFile).getName(); + var loadPlan = writer.getLoadPlan(filename); + assertEquals(1, loadPlan.getDestinations().size()); + + // The minimum and maximum rows happend in different locality groups, the load plan should + // reflect this + var expectedLoadPlan = + LoadPlan.builder().loadFileTo(filename, LoadPlan.RangeType.FILE, "001", "009").build(); + assertEquals(expectedLoadPlan.toJson(), loadPlan.toJson()); + } + + @Test + public void testLoadPlanLocalityGroupsSplits() throws Exception { + LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); + + SortedSet splits = + Stream.of("001", "002", "003", "004", "005", "006", "007", "008", "009").map(Text::new) + .collect(Collectors.toCollection(TreeSet::new)); + var splitResolver = LoadPlan.SplitResolver.from(splits); + + String testFile = createTmpTestFile(); + var writer = RFile.newWriter().to(testFile).withFileSystem(localFs) + .withSplitResolver(splitResolver).build(); + try (writer) { + writer.startNewLocalityGroup("LG1", "F1"); + writer.append(new Key("001", "F1"), "V1"); + writer.append(new Key("005", "F1"), "V2"); + writer.startNewLocalityGroup("LG2", "F3"); + writer.append(new Key("003", "F3"), "V3"); + writer.append(new Key("005", "F3"), "V3"); + writer.append(new Key("007", "F3"), "V4"); + writer.startDefaultLocalityGroup(); + writer.append(new Key("007", "F4"), "V5"); + writer.append(new Key("009", "F4"), "V6"); + } + + var filename = new Path(testFile).getName(); + var loadPlan = writer.getLoadPlan(filename); + assertEquals(5, 
loadPlan.getDestinations().size()); + + var builder = LoadPlan.builder(); + builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, null, "001"); + builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, "004", "005"); + builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, "002", "003"); + builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, "006", "007"); + builder.loadFileTo(filename, LoadPlan.RangeType.TABLE, "008", "009"); + assertEquals(LoadPlanTest.toString(builder.build().getDestinations()), + LoadPlanTest.toString(loadPlan.getDestinations())); + + loadPlan = LoadPlan.compute(new URI(testFile), splitResolver); + assertEquals(LoadPlanTest.toString(builder.build().getDestinations()), + LoadPlanTest.toString(loadPlan.getDestinations())); + } + + @Test + public void testIncorrectSplitResolver() throws Exception { + // for some rows the returns table splits will not contain the row. This should cause an error. + LoadPlan.SplitResolver splitResolver = + row -> new LoadPlan.TableSplits(new Text("003"), new Text("005")); + + LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); + + String testFile = createTmpTestFile(); + var writer = RFile.newWriter().to(testFile).withFileSystem(localFs) + .withSplitResolver(splitResolver).build(); + try (writer) { + writer.startDefaultLocalityGroup(); + writer.append(new Key("004", "F4"), "V2"); + var e = assertThrows(IllegalStateException.class, + () -> writer.append(new Key("007", "F4"), "V2")); + assertTrue(e.getMessage().contains("(003,005]")); + assertTrue(e.getMessage().contains("007")); + } + + var testFile2 = createTmpTestFile(); + var writer2 = RFile.newWriter().to(testFile2).withFileSystem(localFs).build(); + try (writer2) { + writer2.startDefaultLocalityGroup(); + writer2.append(new Key("004", "F4"), "V2"); + writer2.append(new Key("007", "F4"), "V2"); + } + + var e = assertThrows(IllegalStateException.class, + () -> LoadPlan.compute(new URI(testFile), splitResolver)); + assertTrue(e.getMessage().contains("(003,005]")); + assertTrue(e.getMessage().contains("007")); + } + + @Test + public void testGetLoadPlanBeforeClose() throws Exception { + LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); + + String testFile = createTmpTestFile(); + var writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build(); + try (writer) { + var e = assertThrows(IllegalStateException.class, + () -> writer.getLoadPlan(new Path(testFile).getName())); + assertEquals("Attempted to get load plan before closing", e.getMessage()); + writer.startDefaultLocalityGroup(); + writer.append(new Key("004", "F4"), "V2"); + var e2 = assertThrows(IllegalStateException.class, + () -> writer.getLoadPlan(new Path(testFile).getName())); + assertEquals("Attempted to get load plan before closing", e2.getMessage()); + } + } + + @Test + public void testGetLoadPlanWithPath() throws Exception { + LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); + + String testFile = createTmpTestFile(); + var writer = RFile.newWriter().to(testFile).withFileSystem(localFs).build(); + writer.close(); + + var e = + assertThrows(IllegalArgumentException.class, () -> writer.getLoadPlan(testFile.toString())); + assertTrue(e.getMessage().contains("Unexpected path")); + assertEquals(0, writer.getLoadPlan(new Path(testFile).getName()).getDestinations().size()); + } + + @Test + public void testComputeLoadPlanWithPath() throws Exception { + LocalFileSystem localFs = FileSystem.getLocal(new Configuration()); + + SortedSet splits = + Stream.of("001", "002", 
"003", "004", "005", "006", "007", "008", "009").map(Text::new) + .collect(Collectors.toCollection(TreeSet::new)); + var splitResolver = LoadPlan.SplitResolver.from(splits); + + String testFile = createTmpTestFile(); + var writer = RFile.newWriter().to(testFile).withFileSystem(localFs) + .withSplitResolver(splitResolver).build(); + writer.startDefaultLocalityGroup(); + writer.append(new Key("001", "V4"), "test"); + writer.append(new Key("002", "V4"), "test"); + writer.append(new Key("003", "V4"), "test"); + writer.append(new Key("004", "V4"), "test"); + writer.close(); + + var e = assertThrows(IllegalArgumentException.class, () -> writer.getLoadPlan(testFile)); + assertTrue(e.getMessage().contains("Unexpected path")); + assertEquals(4, writer.getLoadPlan(new Path(testFile).getName()).getDestinations().size()); + assertEquals(4, LoadPlan.compute(new URI(testFile), splitResolver).getDestinations().size()); + + String hdfsHost = "127.0.0.5:8080"; + String fileUri = "hdfs://" + hdfsHost + "/bulk-xyx/file1.rf"; + URI uri = new URI(fileUri); + var err = assertThrows(RuntimeException.class, () -> LoadPlan.compute(uri, splitResolver)); + assertTrue(err.getMessage().contains("to " + hdfsHost + " failed on connection exception")); + assertTrue(Arrays.stream(err.getCause().getStackTrace()) + .anyMatch(ste -> ste.getClassName().contains(DistributedFileSystem.class.getName()))); + } } diff --git a/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java b/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java index bba30a555d3..18f2038163d 100644 --- a/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java +++ b/core/src/test/java/org/apache/accumulo/core/data/LoadPlanTest.java @@ -23,8 +23,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import java.util.Base64; +import java.util.Collection; import java.util.HashSet; import java.util.Set; +import java.util.stream.Collectors; import org.apache.accumulo.core.data.LoadPlan.Destination; import org.apache.accumulo.core.data.LoadPlan.RangeType; @@ -100,6 +103,47 @@ public void testTypes() { assertEquals(expected, actual); + var loadPlan2 = LoadPlan.fromJson(loadPlan.toJson()); + Set actual2 = + loadPlan2.getDestinations().stream().map(LoadPlanTest::toString).collect(toSet()); + assertEquals(expected, actual2); + } + + @Test + public void testJson() { + var loadPlan = LoadPlan.builder().build(); + assertEquals(0, loadPlan.getDestinations().size()); + assertEquals("{\"destinations\":[]}", loadPlan.toJson()); + + var builder = LoadPlan.builder(); + builder.loadFileTo("f1.rf", RangeType.TABLE, null, "003"); + builder.loadFileTo("f2.rf", RangeType.FILE, "004", "007"); + builder.loadFileTo("f1.rf", RangeType.TABLE, "005", "006"); + builder.loadFileTo("f3.rf", RangeType.TABLE, "008", null); + String json = builder.build().toJson(); + + String b64003 = Base64.getUrlEncoder().encodeToString("003".getBytes(UTF_8)); + String b64004 = Base64.getUrlEncoder().encodeToString("004".getBytes(UTF_8)); + String b64005 = Base64.getUrlEncoder().encodeToString("005".getBytes(UTF_8)); + String b64006 = Base64.getUrlEncoder().encodeToString("006".getBytes(UTF_8)); + String b64007 = Base64.getUrlEncoder().encodeToString("007".getBytes(UTF_8)); + String b64008 = Base64.getUrlEncoder().encodeToString("008".getBytes(UTF_8)); + + String expected = "{'destinations':[{'fileName':'f1.rf','startRow':null,'endRow':'" + b64003 + + 
"','rangeType':'TABLE'},{'fileName':'f2.rf','startRow':'" + b64004 + "','endRow':'" + + b64007 + "','rangeType':'FILE'},{'fileName':'f1.rf','startRow':'" + b64005 + + "','endRow':'" + b64006 + "','rangeType':'TABLE'},{'fileName':'f3.rf','startRow':'" + + b64008 + "','endRow':null,'rangeType':'TABLE'}]}"; + + assertEquals(expected.replace("'", "\""), json); + } + + @Test + public void testTableSplits() { + assertThrows(IllegalArgumentException.class, + () -> new LoadPlan.TableSplits(new Text("004"), new Text("004"))); + assertThrows(IllegalArgumentException.class, + () -> new LoadPlan.TableSplits(new Text("004"), new Text("003"))); } private static String toString(Destination d) { @@ -110,4 +154,8 @@ private static String toString(Destination d) { private static String toString(byte[] r) { return r == null ? null : new String(r, UTF_8); } + + public static Set toString(Collection destinations) { + return destinations.stream().map(d -> toString(d)).collect(Collectors.toSet()); + } } diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java index 33545639efd..4b0c533dd1d 100644 --- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java +++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java @@ -366,7 +366,6 @@ static String formatString(String prefix, int i) { public void test1() throws IOException { // test an empty file - TestRFile trf = new TestRFile(conf); trf.openWriter(); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java index a8aebd2e251..683461b8c3c 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java @@ -31,6 +31,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.math.BigInteger; +import java.net.URI; import java.nio.file.Files; import java.nio.file.Paths; import java.security.MessageDigest; @@ -88,6 +89,7 @@ import org.apache.accumulo.server.constraints.MetadataConstraints; import org.apache.accumulo.server.constraints.SystemEnvironment; import org.apache.accumulo.test.util.Wait; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -476,6 +478,63 @@ public void testBadLoadPlans() throws Exception { } } + @Test + public void testComputeLoadPlan() throws Exception { + + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + addSplits(c, tableName, "0333 0666 0999 1333 1666"); + + String dir = getDir("/testBulkFile-"); + + Map> hashes = new HashMap<>(); + String h1 = writeData(dir + "/f1.", aconf, 0, 333); + hashes.put("0333", new HashSet<>(List.of(h1))); + String h2 = writeData(dir + "/f2.", aconf, 0, 666); + hashes.get("0333").add(h2); + hashes.put("0666", new HashSet<>(List.of(h2))); + String h3 = writeData(dir + "/f3.", aconf, 334, 700); + hashes.get("0666").add(h3); + hashes.put("0999", new HashSet<>(List.of(h3))); + hashes.put("1333", Set.of()); + hashes.put("1666", Set.of()); + hashes.put("null", Set.of()); + + SortedSet splits = new TreeSet<>(c.tableOperations().listSplits(tableName)); + + for (String filename : List.of("f1.rf", "f2.rf", "f3.rf")) { + // The body of this loop simulates what each reducer would do + Path path = new Path(dir + "/" + filename); + + 
// compute the load plan for the rfile + URI file = path.toUri(); + String lpJson = LoadPlan.compute(file, LoadPlan.SplitResolver.from(splits)).toJson(); + + // save the load plan to a file + Path lpPath = new Path(path.getParent(), path.getName().replace(".rf", ".lp")); + try (var output = getCluster().getFileSystem().create(lpPath, false)) { + IOUtils.write(lpJson, output, UTF_8); + } + } + + // This simulates the code that would run after the map reduce job and bulk import the files + var builder = LoadPlan.builder(); + for (var status : getCluster().getFileSystem().listStatus(new Path(dir), + p -> p.getName().endsWith(".lp"))) { + try (var input = getCluster().getFileSystem().open(status.getPath())) { + String lpJson = IOUtils.toString(input, UTF_8); + builder.addPlan(LoadPlan.fromJson(lpJson)); + } + } + + LoadPlan lpAll = builder.build(); + + c.tableOperations().importDirectory(dir).to(tableName).plan(lpAll).load(); + + verifyData(c, tableName, 0, 700, false); + verifyMetadata(c, tableName, hashes); + } + } + @Test public void testEmptyDir() throws Exception { try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { @@ -621,7 +680,7 @@ private void verifyMetadata(AccumuloClient client, String tableName, String endRow = tablet.getEndRow() == null ? "null" : tablet.getEndRow().toString(); - assertEquals(expectedHashes.get(endRow), fileHashes); + assertEquals(expectedHashes.get(endRow), fileHashes, "endRow " + endRow); endRowsSeen.add(endRow); }
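The testComputeLoadPlan IT above simulates the map reduce use case by computing load plans
from already written rfiles. The writer-side variant described in the commit message, where
each reducer builds the LoadPlan while it writes, could look roughly like the sketch below.
The splits source, the value handling, and saving the json alongside the data file are
assumptions for the example; withSplitResolver() and getLoadPlan() are the methods this
change adds to the RFile writer API.

    import java.util.SortedSet;

    import org.apache.accumulo.core.client.rfile.RFile;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.LoadPlan;
    import org.apache.accumulo.core.data.Value;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;

    class ReducerSidePlanSketch {
      // Writes one rfile while tracking which table splits its rows overlap, then returns
      // the serialized LoadPlan, e.g. to be saved next to the data file for a later merge.
      static String writeFileAndPlan(FileSystem fs, String file, SortedSet<Text> tableSplits,
          Iterable<Key> sortedKeys) throws Exception {
        var writer = RFile.newWriter().to(file).withFileSystem(fs)
            .withSplitResolver(LoadPlan.SplitResolver.from(tableSplits)).build();
        try (writer) {
          writer.startDefaultLocalityGroup();
          for (Key key : sortedKeys) {
            writer.append(key, new Value()); // values elided in this sketch
          }
        }
        // getLoadPlan() is only valid after close() and expects a file name, not a full path.
        LoadPlan plan = writer.getLoadPlan(new Path(file).getName());
        return plan.toJson();
      }
    }

The merge step afterwards is the same as in testComputeLoadPlan: read each saved plan, call
LoadPlan.fromJson(), combine the results with LoadPlan.builder().addPlan(), and pass the
merged plan to importDirectory().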