From 2f310b0b4ab92398907c5f6f6b2c84378cf9d263 Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Wed, 2 Dec 2020 15:42:27 +0100 Subject: [PATCH 1/9] Hive: OutputCommitter implementation for Hive writes --- .../apache/iceberg/mr/InputFormatConfig.java | 3 + .../iceberg/mr/hive/ClosedFileData.java | 83 +++++++ .../mr/hive/HiveIcebergOutputCommitter.java | 217 ++++++++++++++++ .../mr/hive/HiveIcebergRecordWriter.java | 178 +++++++++++++ .../iceberg/mr/hive/LocationHelper.java | 92 +++++++ .../iceberg/mr/hive/HiveIcebergTestUtils.java | 84 ++++++- .../hive/TestHiveIcebergOutputCommitter.java | 233 ++++++++++++++++++ 7 files changed, 879 insertions(+), 11 deletions(-) create mode 100644 mr/src/main/java/org/apache/iceberg/mr/hive/ClosedFileData.java create mode 100644 mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java create mode 100644 mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java create mode 100644 mr/src/main/java/org/apache/iceberg/mr/hive/LocationHelper.java create mode 100644 mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java diff --git a/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java b/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java index 576c2ed62463..5ddbf88f4508 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java +++ b/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java @@ -52,6 +52,9 @@ private InputFormatConfig() { public static final String SELECTED_COLUMNS = "iceberg.mr.selected.columns"; public static final String EXTERNAL_TABLE_PURGE = "external.table.purge"; + public static final String COMMIT_THREAD_POOL_SIZE = "iceberg.mr.commit.thread.pool.size"; + public static final int COMMIT_THREAD_POOL_SIZE_DEFAULT = 10; + public static final String CATALOG_NAME = "iceberg.catalog"; public static final String HADOOP_CATALOG = "hadoop.catalog"; public static final String HADOOP_TABLES = "hadoop.tables"; diff --git 
a/mr/src/main/java/org/apache/iceberg/mr/hive/ClosedFileData.java b/mr/src/main/java/org/apache/iceberg/mr/hive/ClosedFileData.java new file mode 100644 index 000000000000..1c98c4d0fb77 --- /dev/null +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/ClosedFileData.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.hive; + +import java.io.Serializable; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; + +/** + * Class for storing the data file properties which are needed for an Iceberg commit. 
+ * + */ +final class ClosedFileData implements Serializable { + private final PartitionKey partitionKey; + private final String fileName; + private final FileFormat fileFormat; + private final Long length; + private final Metrics metrics; + + ClosedFileData(PartitionKey partitionKey, String fileName, FileFormat fileFormat, Long length, Metrics metrics) { + this.partitionKey = partitionKey; + this.fileName = fileName; + this.fileFormat = fileFormat; + this.length = length; + this.metrics = metrics; + } + + PartitionKey partitionKey() { + return partitionKey; + } + + String fileName() { + return fileName; + } + + FileFormat fileFormat() { + return fileFormat; + } + + Long length() { + return length; + } + + Metrics metrics() { + return metrics; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("partitionKey", partitionKey) + .add("fileName", fileName) + .add("fileFormat", fileFormat) + .add("length", length) + .add("metrics", metrics) + .toString(); + } +} diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java new file mode 100644 index 000000000000..71b6ab3c8d20 --- /dev/null +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.hive; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.OutputCommitter; +import org.apache.hadoop.mapred.TaskAttemptContext; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.Table; +import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.exceptions.RuntimeIOException; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.hadoop.Util; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.mr.Catalogs; +import org.apache.iceberg.mr.InputFormatConfig; +import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An Iceberg table committer for adding data files to the Iceberg tables. + * Currently independent of the Hive ACID transactions. 
+ */ +public final class HiveIcebergOutputCommitter extends OutputCommitter { + private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergOutputCommitter.class); + + @Override + public void setupJob(JobContext jobContext) { + // do nothing. + } + + @Override + public void setupTask(TaskAttemptContext taskAttemptContext) { + // do nothing. + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext context) { + // We need to commit if this is the last phase of a MapReduce process + return TaskType.REDUCE.equals(context.getTaskAttemptID().getTaskID().getTaskType()) || + context.getJobConf().getNumReduceTasks() == 0; + } + + @Override + public void commitTask(TaskAttemptContext context) throws IOException { + TaskAttemptID attemptID = context.getTaskAttemptID(); + String commitFileLocation = LocationHelper.generateToCommitFileLocation(context.getJobConf(), attemptID); + HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(attemptID); + + Set closedFiles = Collections.emptySet(); + if (writer != null) { + closedFiles = writer.closedFileData(); + } + + // Create the committed file for the task + createToCommitFile(closedFiles, commitFileLocation, new HadoopFileIO(context.getJobConf())); + } + + @Override + public void abortTask(TaskAttemptContext context) throws IOException { + // Clean up writer data from the local store + HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(context.getTaskAttemptID()); + + // Remove files if it was not done already + writer.close(true); + } + + @Override + public void commitJob(JobContext jobContext) throws IOException { + JobConf conf = jobContext.getJobConf(); + // If there are reducers, then every reducer will generate a result file. + // If this is a map only task, then every mapper will generate a result file. + int expectedFiles = conf.getNumReduceTasks() != 0 ? 
conf.getNumReduceTasks() : conf.getNumMapTasks(); + Table table = Catalogs.loadTable(conf); + + long startTime = System.currentTimeMillis(); + LOG.info("Committing job is started for {} using {} expecting {} file(s)", table, + LocationHelper.generateJobLocation(conf, jobContext.getJobID()), expectedFiles); + + ExecutorService executor = null; + try { + // Creating executor service for parallel handling of file reads + executor = Executors.newFixedThreadPool( + conf.getInt(InputFormatConfig.COMMIT_THREAD_POOL_SIZE, InputFormatConfig.COMMIT_THREAD_POOL_SIZE_DEFAULT), + new ThreadFactoryBuilder() + .setDaemon(true) + .setPriority(Thread.NORM_PRIORITY) + .setNameFormat("iceberg-commit-pool-%d") + .build()); + + Set dataFiles = new ConcurrentHashMap<>().newKeySet(); + + // Reading the committed files. The assumption here is that the taskIds are generated in sequential order + // starting from 0. + Tasks.range(expectedFiles) + .executeWith(executor) + .retry(3) + .run(taskId -> { + String taskFileName = LocationHelper.generateToCommitFileLocation(conf, jobContext.getJobID(), taskId); + Set closedFiles = readToCommitFile(taskFileName, table.io()); + + // If the data is not empty add to the table + if (!closedFiles.isEmpty()) { + closedFiles.forEach(file -> { + DataFiles.Builder builder = DataFiles.builder(table.spec()) + .withPath(file.fileName()) + .withFormat(file.fileFormat()) + .withFileSizeInBytes(file.length()) + .withPartition(file.partitionKey()) + .withMetrics(file.metrics()); + dataFiles.add(builder.build()); + }); + } + }); + + if (dataFiles.size() > 0) { + // Appending data files to the table + AppendFiles append = table.newAppend(); + Set addedFiles = new HashSet<>(dataFiles.size()); + dataFiles.forEach(dataFile -> { + append.appendFile(dataFile); + addedFiles.add(dataFile.path().toString()); + }); + append.commit(); + LOG.info("Commit for Iceberg write taken {} ms for {} with file(s) {}", + System.currentTimeMillis() - startTime, table, addedFiles); + } else 
{ + LOG.info("Commit for Iceberg write taken {} ms for {} with no new files", + System.currentTimeMillis() - startTime, table); + } + + // Calling super to cleanupJob if something more is needed + cleanupJob(jobContext); + + } finally { + if (executor != null) { + executor.shutdown(); + } + } + } + + @Override + public void abortJob(JobContext jobContext, int status) throws IOException { + String location = LocationHelper.generateJobLocation(jobContext.getJobConf(), jobContext.getJobID()); + LOG.info("Job {} is aborted. Cleaning job location {}", jobContext.getJobID(), location); + + // Remove the result directory for the failed job + Tasks.foreach(location) + .retry(3) + .suppressFailureWhenFinished() + .onFailure((file, exc) -> LOG.debug("Failed on to remove directory {} on abort job", file, exc)) + .run(file -> { + Path toDelete = new Path(file); + FileSystem fs = Util.getFs(toDelete, jobContext.getJobConf()); + try { + fs.delete(toDelete, true /* recursive */); + } catch (IOException e) { + throw new RuntimeIOException(e, "Failed to delete job directory: %s", file); + } + }); + cleanupJob(jobContext); + } + + private static void createToCommitFile(Set closedFiles, String location, FileIO io) + throws IOException { + + OutputFile commitFile = io.newOutputFile(location); + ObjectOutputStream oos = new ObjectOutputStream(commitFile.createOrOverwrite()); + oos.writeObject(closedFiles); + oos.close(); + LOG.debug("Iceberg committed file is created {}", commitFile); + } + + private static Set readToCommitFile(String toCommitFileLocation, FileIO io) { + try (ObjectInputStream ois = new ObjectInputStream(io.newInputFile(toCommitFileLocation).newStream())) { + return (Set) ois.readObject(); + } catch (ClassNotFoundException | IOException e) { + throw new NotFoundException("Can not read or parse committed file: %s", toCommitFileLocation); + } + } +} diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java 
b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java new file mode 100644 index 000000000000..60ec8aafe9d9 --- /dev/null +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.mr.hive; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.hadoop.HadoopFileIO; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.mr.mapred.Container; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class HiveIcebergRecordWriter extends org.apache.hadoop.mapreduce.RecordWriter + implements FileSinkOperator.RecordWriter, org.apache.hadoop.mapred.RecordWriter { + private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergRecordWriter.class); + + // map to store the active writers + // Stored in concurrent map, since some executor engines can share containers + private static Map writers = new ConcurrentHashMap<>(); + + private final FileIO io; + private final String location; + private final FileFormat fileFormat; + private final GenericAppenderFactory appenderFactory; + // The current key is reused at every write to avoid unnecessary object creation + private final PartitionKey currentKey; + // Data for every partition is written to a different appender + // This map 
stores the open appenders for the given partition key + private final Map openAppenders = new HashMap<>(); + // When the appenders are closed the file data needed for the Iceberg commit is stored and accessible through + // this map + private final Map closedFileData = new HashMap<>(); + + HiveIcebergRecordWriter(TaskAttemptID taskAttemptID, Configuration conf, String location, FileFormat fileFormat, + Schema schema, PartitionSpec spec) { + this.io = new HadoopFileIO(conf); + this.location = location; + this.fileFormat = fileFormat; + this.appenderFactory = new GenericAppenderFactory(schema); + this.currentKey = new PartitionKey(spec, schema); + writers.put(taskAttemptID, this); + LOG.info("IcebergRecordWriter is created in {} with {}", location, fileFormat); + } + + @Override + public void write(Writable row) { + Preconditions.checkArgument(row instanceof Container); + + Record record = ((Container) row).get(); + + currentKey.partition(record); + + AppenderWrapper currentAppender = openAppenders.get(currentKey); + if (currentAppender == null) { + currentAppender = getAppender(); + openAppenders.put(currentKey.copy(), currentAppender); + } + + currentAppender.appender.add(record); + } + + @Override + public void write(NullWritable key, Container value) { + write(value); + } + + @Override + public void close(boolean abort) throws IOException { + // Close the open appenders and store the closed file data + for (PartitionKey key : openAppenders.keySet()) { + AppenderWrapper wrapper = openAppenders.get(key); + wrapper.close(); + closedFileData.put(key, + new ClosedFileData(key, wrapper.location, fileFormat, wrapper.length(), wrapper.metrics())); + } + + openAppenders.clear(); + + // If abort then remove the unnecessary files + if (abort) { + Tasks.foreach(closedFileData.values().stream().map(ClosedFileData::fileName).iterator()) + .retry(3) + .suppressFailureWhenFinished() + .onFailure((file, exception) -> LOG.debug("Failed on to remove file {} on abort", file, 
exception)) + .run(io::deleteFile); + } + LOG.info("IcebergRecordWriter is closed. Created {} files", closedFileData); + } + + @Override + public void close(org.apache.hadoop.mapreduce.TaskAttemptContext context) throws IOException { + close(false); + } + + @Override + public void close(Reporter reporter) throws IOException { + close(false); + } + + public Set closedFileData() { + return new HashSet<>(closedFileData.values()); + } + + static HiveIcebergRecordWriter removeWriter(TaskAttemptID taskAttemptID) { + return writers.remove(taskAttemptID); + } + + private AppenderWrapper getAppender() { + String dataFileLocation = location + "." + UUID.randomUUID(); + OutputFile dataFile = io.newOutputFile(dataFileLocation); + + FileAppender appender = appenderFactory.newAppender(dataFile, fileFormat); + + LOG.info("New AppenderWrapper is created for {} with {}", dataFileLocation, fileFormat); + return new AppenderWrapper(appender, dataFileLocation); + } + + private static final class AppenderWrapper { + private final FileAppender appender; + private final String location; + + AppenderWrapper(FileAppender appender, String location) { + this.appender = appender; + this.location = location; + } + + public long length() { + return appender.length(); + } + + public Metrics metrics() { + return appender.metrics(); + } + + public void close() throws IOException { + appender.close(); + } + } +} diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/LocationHelper.java b/mr/src/main/java/org/apache/iceberg/mr/hive/LocationHelper.java new file mode 100644 index 000000000000..4fd32ee86fec --- /dev/null +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/LocationHelper.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.hive; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.iceberg.mr.InputFormatConfig; + +class LocationHelper { + private static final String TO_COMMIT_EXTENSION = ".toCommit"; + + private LocationHelper() { + } + + /** + * Generates query directory location based on the configuration. + * Currently it uses tableLocation/queryId + * @param conf The job's configuration + * @return The directory to store the query result files + */ + static String generateQueryLocation(Configuration conf) { + String tableLocation = conf.get(InputFormatConfig.TABLE_LOCATION); + String queryId = conf.get(HiveConf.ConfVars.HIVEQUERYID.varname); + return tableLocation + "/" + queryId; + } + + /** + * Generates the job temp location based on the job configuration. + * Currently it uses QUERY_LOCATION/jobId. + * @param conf The job's configuration + * @param jobId The JobID for the task + * @return The file to store the results + */ + static String generateJobLocation(Configuration conf, JobID jobId) { + return generateQueryLocation(conf) + "/" + jobId; + } + + /** + * Generates datafile location based on the task configuration. + * Currently it uses QUERY_LOCATION/jobId/taskAttemptId. 
+ * @param conf The job's configuration + * @param taskAttemptId The TaskAttemptID for the task + * @return The file to store the results + */ + static String generateDataFileLocation(Configuration conf, TaskAttemptID taskAttemptId) { + return generateJobLocation(conf, taskAttemptId.getJobID()) + "/" + taskAttemptId.toString(); + } + + /** + * Generates file location based on the task configuration and a specific task id. + * This file will be used to store the data required to generate the Iceberg commit. + * Currently it uses QUERY_LOCATION/jobId/task-[0..numTasks].toCommit. + * @param conf The job's configuration + * @param jobId The jobId for the task + * @param taskId The taskId for the commit file + * @return The file to store the results + */ + static String generateToCommitFileLocation(Configuration conf, JobID jobId, int taskId) { + return generateJobLocation(conf, jobId) + "/task-" + taskId + TO_COMMIT_EXTENSION; + } + + /** + * Generates file location based on the task configuration. + * This file will be used to store the data required to generate the Iceberg commit. + * Currently it uses QUERY_LOCATION/jobId/task-[0..numTasks].toCommit. 
+ * @param conf The job's configuration + * @param taskAttemptId The TaskAttemptID for the task + * @return The file to store the results + */ + static String generateToCommitFileLocation(Configuration conf, TaskAttemptID taskAttemptId) { + return generateToCommitFileLocation(conf, taskAttemptId.getJobID(), taskAttemptId.getTaskID().getId()); + } +} diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java index b04ef10d4e49..a8685f7bd45e 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java @@ -19,6 +19,8 @@ package org.apache.iceberg.mr.hive; +import java.io.File; +import java.io.IOException; import java.math.BigDecimal; import java.nio.ByteBuffer; import java.sql.Timestamp; @@ -27,8 +29,10 @@ import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.Arrays; +import java.util.Comparator; import java.util.List; import java.util.UUID; +import org.apache.commons.compress.utils.Lists; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; @@ -43,9 +47,13 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobID; import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.IcebergGenerics; import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergBinaryObjectInspector; import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergDecimalObjectInspector; import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector; @@ -100,6 +108,11 @@ private 
HiveIcebergTestUtils() { // Empty constructor for the utility class } + /** + * Generates a test record where every field has a value. + * @param uuidAsByte As per #1881 Parquet needs byte[] value for UUID, other file formats need UUID object + * @return Record with every field set + */ public static Record getTestRecord(boolean uuidAsByte) { Record record = GenericRecord.create(HiveIcebergTestUtils.FULL_SCHEMA); record.set(0, true); @@ -127,6 +140,10 @@ public static Record getTestRecord(boolean uuidAsByte) { return record; } + /** + * Record with every field set to null. + * @return Empty record + */ public static Record getNullTestRecord() { Record record = GenericRecord.create(HiveIcebergTestUtils.FULL_SCHEMA); @@ -137,13 +154,12 @@ public static Record getNullTestRecord() { return record; } + /** + * Hive values for the test record. + * @param record The original Iceberg record + * @return The Hive 'record' containing the same values + */ public static List valuesForTestRecord(Record record) { -// ByteBuffer byteBuffer = record.get(11, ByteBuffer.class); -// byte[] bytes = new byte[byteBuffer.remaining()]; -// byteBuffer.mark(); -// byteBuffer.get(bytes); -// byteBuffer.reset(); - return Arrays.asList( new BooleanWritable(Boolean.TRUE), new IntWritable(record.get(1, Integer.class)), @@ -163,19 +179,65 @@ public static List valuesForTestRecord(Record record) { ); } + /** + * Check if 2 Iceberg records are the same or not. Compares OffsetDateTimes only by the Instant they represent. 
+ * @param expected The expected record + * @param actual The actual record + */ public static void assertEquals(Record expected, Record actual) { for (int i = 0; i < expected.size(); ++i) { if (expected.get(i) instanceof OffsetDateTime) { // For OffsetDateTime we just compare the actual instant Assert.assertEquals(((OffsetDateTime) expected.get(i)).toInstant(), ((OffsetDateTime) actual.get(i)).toInstant()); + } else if (expected.get(i) instanceof byte[]) { + Assert.assertArrayEquals((byte[]) expected.get(i), (byte[]) actual.get(i)); } else { - if (expected.get(i) instanceof byte[]) { - Assert.assertArrayEquals((byte[]) expected.get(i), (byte[]) actual.get(i)); - } else { - Assert.assertEquals(expected.get(i), actual.get(i)); - } + Assert.assertEquals(expected.get(i), actual.get(i)); } } } + + /** + * Validates whether the table contains the expected records. The results should be sorted by a unique key so we do + * not end up with flaky tests. + * @param table The table we should read the records from + * @param expected The expected list of Records (The list will be sorted) + * @param sortBy The column position by which we will sort + * @throws IOException Exceptions when reading the table data + */ + public static void validateData(Table table, List expected, int sortBy) throws IOException { + // Refresh the table, so we get the new data as well + table.refresh(); + List records = Lists.newArrayList(); + try (CloseableIterable iterable = IcebergGenerics.read(table).build()) { + iterable.forEach(records::add); + } + + // Sort based on the specified column + expected.sort(Comparator.comparingLong(record -> (Long) record.get(sortBy))); + records.sort(Comparator.comparingLong(record -> (Long) record.get(sortBy))); + + Assert.assertEquals(expected.size(), records.size()); + for (int i = 0; i < expected.size(); ++i) { + assertEquals(expected.get(i), records.get(i)); + } + } + + /** + * Validates the number of files under a {@link Table} generated by a specific queryId 
and jobId. + * @param table The table we are checking + * @param queryId The queryId which generated the files + * @param jobId The jobId which generated the files + * @param dataFileNum The expected number of data files (TABLE_LOCATION/queryId/jobId/* which are not commit files) + * @param commitFileNum The expected number of commit files (TABLE_LOCATION/queryId/jobId/task-***) + */ + public static void validateFiles(Table table, String queryId, JobID jobId, int dataFileNum, int commitFileNum) { + String location = table.location() + "/" + queryId + "/" + jobId; + String[] dataFiles = new File(location).list((dir, name) -> !name.startsWith(".") && !name.startsWith("task-")); + Assert.assertEquals(dataFileNum, dataFiles == null ? 0 : dataFiles.length); + + String[] commitFiles = new File(location).list((dir, name) -> name.startsWith("task-")); + Assert.assertEquals(commitFileNum, commitFiles == null ? 0 : commitFiles.length); + } } diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java new file mode 100644 index 000000000000..b2738947385d --- /dev/null +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.hive; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContextImpl; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.TaskAttemptContextImpl; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.mr.InputFormatConfig; +import org.apache.iceberg.mr.TestHelper; +import org.apache.iceberg.mr.mapred.Container; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.apache.iceberg.types.Types.NestedField.required; + +public class TestHiveIcebergOutputCommitter { + private static final int RECORD_NUM = 5; + private static final String QUERY_ID = "query_id"; + private static final JobID JOB_ID = new JobID("test", 0); + private static final TaskAttemptID MAP_TASK_ID = + new TaskAttemptID(JOB_ID.getJtIdentifier(), JOB_ID.getId(), TaskType.MAP, 0, 0); + private static final TaskAttemptID REDUCE_TASK_ID = 
+ new TaskAttemptID(JOB_ID.getJtIdentifier(), JOB_ID.getId(), TaskType.REDUCE, 0, 0); + + private static final Schema CUSTOMER_SCHEMA = new Schema( + required(1, "customer_id", Types.LongType.get()), + required(2, "first_name", Types.StringType.get()) + ); + + private static final PartitionSpec PARTITIONED_SPEC = + PartitionSpec.builderFor(CUSTOMER_SCHEMA).bucket("customer_id", 3).build(); + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Test + public void testNeedsTaskCommit() { + HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter(); + + JobConf mapOnlyJobConf = new JobConf(); + mapOnlyJobConf.setNumMapTasks(10); + mapOnlyJobConf.setNumReduceTasks(0); + + // Map only job should commit map tasks + Assert.assertTrue(committer.needsTaskCommit(new TaskAttemptContextImpl(mapOnlyJobConf, MAP_TASK_ID))); + + JobConf mapReduceJobConf = new JobConf(); + mapReduceJobConf.setNumMapTasks(10); + mapReduceJobConf.setNumReduceTasks(10); + + // MapReduce job should not commit map tasks, but should commit reduce tasks + Assert.assertFalse(committer.needsTaskCommit(new TaskAttemptContextImpl(mapReduceJobConf, MAP_TASK_ID))); + Assert.assertTrue(committer.needsTaskCommit(new TaskAttemptContextImpl(mapReduceJobConf, REDUCE_TASK_ID))); + } + + @Test + public void testSuccessfulUnpartitionedWrite() throws IOException { + HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter(); + Table table = table(temp.getRoot().getPath(), false); + JobConf conf = jobConf(table, 1); + List expected = writeRecords(table, 1, 0, true, false, conf); + committer.commitJob(new JobContextImpl(conf, JOB_ID)); + + HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 1, 1); + HiveIcebergTestUtils.validateData(table, expected, 0); + } + + @Test + public void testSuccessfulPartitionedWrite() throws IOException { + HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter(); + Table table = table(temp.getRoot().getPath(), true); + JobConf 
conf = jobConf(table, 1); + List expected = writeRecords(table, 1, 0, true, false, conf); + committer.commitJob(new JobContextImpl(conf, JOB_ID)); + + HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 3, 1); + HiveIcebergTestUtils.validateData(table, expected, 0); + } + + @Test + public void testSuccessfulMultipleTasksUnpartitionedWrite() throws IOException { + HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter(); + Table table = table(temp.getRoot().getPath(), false); + JobConf conf = jobConf(table, 2); + List expected = writeRecords(table, 2, 0, true, false, conf); + committer.commitJob(new JobContextImpl(conf, JOB_ID)); + + HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 2, 2); + HiveIcebergTestUtils.validateData(table, expected, 0); + } + + @Test + public void testSuccessfulMultipleTasksPartitionedWrite() throws IOException { + HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter(); + Table table = table(temp.getRoot().getPath(), true); + JobConf conf = jobConf(table, 2); + List expected = writeRecords(table, 2, 0, true, false, conf); + committer.commitJob(new JobContextImpl(conf, JOB_ID)); + + HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 6, 2); + HiveIcebergTestUtils.validateData(table, expected, 0); + } + + @Test + public void testRetryTask() throws IOException { + HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter(); + Table table = table(temp.getRoot().getPath(), false); + JobConf conf = jobConf(table, 2); + + // Write records and abort the tasks + writeRecords(table, 2, 0, false, true, conf); + HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 0, 0); + HiveIcebergTestUtils.validateData(table, Collections.emptyList(), 0); + + // Write records but do not abort the tasks + // The data files remain since we can not identify them but should not be read + writeRecords(table, 2, 1, false, false, conf); + HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 
2, 0); + HiveIcebergTestUtils.validateData(table, Collections.emptyList(), 0); + + // Write and commit the records + List expected = writeRecords(table, 2, 2, true, false, conf); + committer.commitJob(new JobContextImpl(conf, JOB_ID)); + HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 4, 2); + HiveIcebergTestUtils.validateData(table, expected, 0); + } + + @Test + public void testAbortJob() throws IOException { + HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter(); + Table table = table(temp.getRoot().getPath(), false); + JobConf conf = jobConf(table, 1); + writeRecords(table, 1, 0, true, false, conf); + committer.abortJob(new JobContextImpl(conf, JOB_ID), JobStatus.State.FAILED); + + HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 0, 0); + HiveIcebergTestUtils.validateData(table, Collections.emptyList(), 0); + } + + private Table table(String location, boolean partitioned) { + HadoopTables tables = new HadoopTables(); + return tables.create(CUSTOMER_SCHEMA, partitioned ? PARTITIONED_SPEC : PartitionSpec.unpartitioned(), location); + } + + private JobConf jobConf(Table table, int taskNum) { + JobConf conf = new JobConf(); + conf.setNumMapTasks(taskNum); + conf.setNumReduceTasks(0); + conf.set(InputFormatConfig.TABLE_LOCATION, table.location()); + conf.set(HiveConf.ConfVars.HIVEQUERYID.varname, QUERY_ID); + + return conf; + } + + /** + * Write random records to the given table using separate {@link HiveIcebergOutputCommitter} and + * a separate {@link HiveIcebergRecordWriter} for every task. 
+ * @param table The target table to write to + * @param taskNum The number of tasks in the job handled by the committer + * @param attemptNum The id used for attempt number generation + * @param commitTasks If true the tasks will be committed + * @param abortTasks If true the tasks will be aborted - needed so we can simulate no commit/no abort + * situation + * @param conf The job configuration + * @return The random generated records which were appended to the table + * @throws IOException Propagating {@link HiveIcebergRecordWriter} exceptions + */ + private List writeRecords(Table table, int taskNum, int attemptNum, boolean commitTasks, boolean abortTasks, + JobConf conf) throws IOException { + List expected = new ArrayList<>(RECORD_NUM * taskNum); + + for (int i = 0; i < taskNum; ++i) { + List records = TestHelper.generateRandomRecords(table.schema(), RECORD_NUM, i + attemptNum); + TaskAttemptID taskId = new TaskAttemptID(JOB_ID.getJtIdentifier(), JOB_ID.getId(), TaskType.MAP, i, attemptNum); + String location = table.location() + "/" + QUERY_ID + "/" + JOB_ID + "/" + taskId; + HiveIcebergRecordWriter testWriter = new HiveIcebergRecordWriter(taskId, new Configuration(), location, + FileFormat.PARQUET, table.schema(), table.spec()); + + Container container = new Container<>(); + + records.forEach(record -> { + container.set(record); + testWriter.write(container); + }); + + testWriter.close(false); + if (commitTasks) { + new HiveIcebergOutputCommitter().commitTask(new TaskAttemptContextImpl(conf, taskId)); + expected.addAll(records); + } else if (abortTasks) { + new HiveIcebergOutputCommitter().abortTask(new TaskAttemptContextImpl(conf, taskId)); + } + } + + return expected; + } +} From fa43aef90c9bd95ff4dbf5859e90acd9b7e052e2 Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Thu, 3 Dec 2020 19:04:16 +0100 Subject: [PATCH 2/9] Addressing Marton's comments --- .../iceberg/mr/hive/ClosedFileData.java | 18 ++++---- .../mr/hive/HiveIcebergOutputCommitter.java | 42 
+++++++++---------- .../mr/hive/HiveIcebergRecordWriter.java | 2 +- .../iceberg/mr/hive/LocationHelper.java | 14 +++---- 4 files changed, 40 insertions(+), 36 deletions(-) diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/ClosedFileData.java b/mr/src/main/java/org/apache/iceberg/mr/hive/ClosedFileData.java index 1c98c4d0fb77..85d2978d73e6 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/ClosedFileData.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/ClosedFileData.java @@ -26,7 +26,7 @@ import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; /** - * Class for storing the data file properties which are needed for an Icebreg commit. + * Class for storing the data file properties which are needed for an Iceberg commit. *
    *
  • Partition key *
  • File name @@ -39,14 +39,14 @@ final class ClosedFileData implements Serializable { private final PartitionKey partitionKey; private final String fileName; private final FileFormat fileFormat; - private final Long length; + private final Long fileSize; private final Metrics metrics; - ClosedFileData(PartitionKey partitionKey, String fileName, FileFormat fileFormat, Long length, Metrics metrics) { + ClosedFileData(PartitionKey partitionKey, String fileName, FileFormat fileFormat, Long fileSize, Metrics metrics) { this.partitionKey = partitionKey; this.fileName = fileName; this.fileFormat = fileFormat; - this.length = length; + this.fileSize = fileSize; this.metrics = metrics; } @@ -62,8 +62,12 @@ FileFormat fileFormat() { return fileFormat; } - Long length() { - return length; + /** + * File size in bytes. + * @return The size + */ + Long fileSize() { + return fileSize; } Metrics metrics() { @@ -76,7 +80,7 @@ public String toString() { .add("partitionKey", partitionKey) .add("fileName", fileName) .add("fileFormat", fileFormat) - .add("length", length) + .add("fileSize", fileSize) .add("metrics", metrics) .toString(); } diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java index 71b6ab3c8d20..1af790db7d64 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java @@ -80,7 +80,7 @@ public boolean needsTaskCommit(TaskAttemptContext context) { @Override public void commitTask(TaskAttemptContext context) throws IOException { TaskAttemptID attemptID = context.getTaskAttemptID(); - String commitFileLocation = LocationHelper.generateToCommitFileLocation(context.getJobConf(), attemptID); + String fileForCommitLocation = LocationHelper.generateFileForCommitLocation(context.getJobConf(), attemptID); HiveIcebergRecordWriter writer = 
HiveIcebergRecordWriter.removeWriter(attemptID); Set closedFiles = Collections.emptySet(); @@ -88,8 +88,8 @@ public void commitTask(TaskAttemptContext context) throws IOException { closedFiles = writer.closedFileData(); } - // Create the committed file for the task - createToCommitFile(closedFiles, commitFileLocation, new HadoopFileIO(context.getJobConf())); + // Creating the file containing the descriptor(s) for the file(s) written by this task + createFileForCommit(closedFiles, fileForCommitLocation, new HadoopFileIO(context.getJobConf())); } @Override @@ -110,7 +110,7 @@ public void commitJob(JobContext jobContext) throws IOException { Table table = Catalogs.loadTable(conf); long startTime = System.currentTimeMillis(); - LOG.info("Committing job is started for {} using {} expecting {} file(s)", table, + LOG.info("Committing job has started for table: {}, using location: {}, expecting {} file(s).", table, LocationHelper.generateJobLocation(conf, jobContext.getJobID()), expectedFiles); ExecutorService executor = null; @@ -132,8 +132,8 @@ public void commitJob(JobContext jobContext) throws IOException { .executeWith(executor) .retry(3) .run(taskId -> { - String taskFileName = LocationHelper.generateToCommitFileLocation(conf, jobContext.getJobID(), taskId); - Set closedFiles = readToCommitFile(taskFileName, table.io()); + String taskFileName = LocationHelper.generateFileForCommitLocation(conf, jobContext.getJobID(), taskId); + Set closedFiles = readFileForCommit(taskFileName, table.io()); // If the data is not empty add to the table if (!closedFiles.isEmpty()) { @@ -141,7 +141,7 @@ public void commitJob(JobContext jobContext) throws IOException { DataFiles.Builder builder = DataFiles.builder(table.spec()) .withPath(file.fileName()) .withFormat(file.fileFormat()) - .withFileSizeInBytes(file.length()) + .withFileSizeInBytes(file.fileSize()) .withPartition(file.partitionKey()) .withMetrics(file.metrics()); dataFiles.add(builder.build()); @@ -158,11 +158,10 @@ public 
void commitJob(JobContext jobContext) throws IOException { addedFiles.add(dataFile.path().toString()); }); append.commit(); - LOG.info("Commit for Iceberg write taken {} ms for {} with file(s) {}", - System.currentTimeMillis() - startTime, table, addedFiles); + LOG.info("Commit took {} ms for table: {} with file(s): {}", System.currentTimeMillis() - startTime, table, + addedFiles); } else { - LOG.info("Commit for Iceberg write taken {} ms for {} with no new files", - System.currentTimeMillis() - startTime, table); + LOG.info("Commit took {} ms for table: {} with no new files", System.currentTimeMillis() - startTime, table); } // Calling super to cleanupJob if something more is needed @@ -180,7 +179,8 @@ public void abortJob(JobContext jobContext, int status) throws IOException { String location = LocationHelper.generateJobLocation(jobContext.getJobConf(), jobContext.getJobID()); LOG.info("Job {} is aborted. Cleaning job location {}", jobContext.getJobID(), location); - // Remove the result directory for the failed job + // Remove the result directory for the failed job. + // Intentionally used foreach on a single item. Using the Tasks API here only for the retry capability. 
Tasks.foreach(location) .retry(3) .suppressFailureWhenFinished() @@ -197,21 +197,21 @@ public void abortJob(JobContext jobContext, int status) throws IOException { cleanupJob(jobContext); } - private static void createToCommitFile(Set closedFiles, String location, FileIO io) + private static void createFileForCommit(Set closedFiles, String location, FileIO io) throws IOException { - OutputFile commitFile = io.newOutputFile(location); - ObjectOutputStream oos = new ObjectOutputStream(commitFile.createOrOverwrite()); - oos.writeObject(closedFiles); - oos.close(); - LOG.debug("Iceberg committed file is created {}", commitFile); + OutputFile fileForCommit = io.newOutputFile(location); + try (ObjectOutputStream oos = new ObjectOutputStream(fileForCommit.createOrOverwrite())) { + oos.writeObject(closedFiles); + } + LOG.debug("Iceberg committed file is created {}", fileForCommit); } - private static Set readToCommitFile(String toCommitFileLocation, FileIO io) { - try (ObjectInputStream ois = new ObjectInputStream(io.newInputFile(toCommitFileLocation).newStream())) { + private static Set readFileForCommit(String fileForCommitLocation, FileIO io) { + try (ObjectInputStream ois = new ObjectInputStream(io.newInputFile(fileForCommitLocation).newStream())) { return (Set) ois.readObject(); } catch (ClassNotFoundException | IOException e) { - throw new NotFoundException("Can not read or parse committed file: %s", toCommitFileLocation); + throw new NotFoundException("Can not read or parse committed file: %s", fileForCommitLocation); } } } diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java index 60ec8aafe9d9..4bb54f913a6c 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java @@ -123,7 +123,7 @@ public void close(boolean abort) throws IOException { .onFailure((file, 
exception) -> LOG.debug("Failed on to remove file {} on abort", file, exception)) .run(io::deleteFile); } - LOG.info("IcebergRecordWriter is closed. Created {} files", closedFileData); + LOG.info("IcebergRecordWriter is closed. Created {} files", closedFileData.values()); } @Override diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/LocationHelper.java b/mr/src/main/java/org/apache/iceberg/mr/hive/LocationHelper.java index 4fd32ee86fec..ed3ef23ac024 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/LocationHelper.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/LocationHelper.java @@ -26,7 +26,7 @@ import org.apache.iceberg.mr.InputFormatConfig; class LocationHelper { - private static final String TO_COMMIT_EXTENSION = ".toCommit"; + private static final String FOR_COMMIT_EXTENSION = ".forCommit"; private LocationHelper() { } @@ -68,25 +68,25 @@ static String generateDataFileLocation(Configuration conf, TaskAttemptID taskAtt /** * Generates file location based on the task configuration and a specific task id. * This file will be used to store the data required to generate the Iceberg commit. - * Currently it uses QUERY_LOCATION/jobId/task-[0..numTasks].toCommit. + * Currently it uses QUERY_LOCATION/jobId/task-[0..numTasks).forCommit. * @param conf The job's configuration * @param jobId The jobId for the task * @param taskId The taskId for the commit file * @return The file to store the results */ - static String generateToCommitFileLocation(Configuration conf, JobID jobId, int taskId) { - return generateJobLocation(conf, jobId) + "/task-" + taskId + TO_COMMIT_EXTENSION; + static String generateFileForCommitLocation(Configuration conf, JobID jobId, int taskId) { + return generateJobLocation(conf, jobId) + "/task-" + taskId + FOR_COMMIT_EXTENSION; } /** * Generates file location location based on the task configuration. * This file will be used to store the data required to generate the Iceberg commit. 
- * Currently it uses QUERY_LOCATION/jobId/task-[0..numTasks].committed. + * Currently it uses QUERY_LOCATION/jobId/task-[0..numTasks).forCommit. * @param conf The job's configuration * @param taskAttemptId The TaskAttemptID for the task * @return The file to store the results */ - static String generateToCommitFileLocation(Configuration conf, TaskAttemptID taskAttemptId) { - return generateToCommitFileLocation(conf, taskAttemptId.getJobID(), taskAttemptId.getTaskID().getId()); + static String generateFileForCommitLocation(Configuration conf, TaskAttemptID taskAttemptId) { + return generateFileForCommitLocation(conf, taskAttemptId.getJobID(), taskAttemptId.getTaskID().getId()); } } From a12fd5ec8a1948ef3d099d66e6049e154179fc02 Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Thu, 3 Dec 2020 19:23:04 +0100 Subject: [PATCH 3/9] Removed the Precondition since it does not have an added value anyway --- .../org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java index 4bb54f913a6c..badf7805c403 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java @@ -44,7 +44,6 @@ import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.mr.mapred.Container; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.util.Tasks; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,10 +82,9 @@ class HiveIcebergRecordWriter extends org.apache.hadoop.mapreduce.RecordWriter) row).get(); + // Update the current key with the record, so we do not create a new object for every record currentKey.partition(record); AppenderWrapper currentAppender = openAppenders.get(currentKey); 
From 97c1016bde16cd30b676d795546f0d3cf473ca42 Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Mon, 7 Dec 2020 14:55:04 +0100 Subject: [PATCH 4/9] Moved to PartitionedFanoutWriter Addressed other review comments as well. --- .../apache/iceberg/io/OutputFileFactory.java | 14 +- .../.task-0.forCommit.crc | Bin 0 -> 28 bytes .../.task-1.forCommit.crc | Bin 0 -> 28 bytes .../query_id-job_test_0000/task-0.forCommit | Bin 0 -> 2230 bytes .../query_id-job_test_0000/task-1.forCommit | Bin 0 -> 2212 bytes .../apache/iceberg/mr/InputFormatConfig.java | 3 + .../iceberg/mr/hive/ClosedFileData.java | 87 -------- .../mr/hive/HiveIcebergOutputCommitter.java | 202 +++++++++++------- .../mr/hive/HiveIcebergRecordWriter.java | 132 +++--------- .../mr/hive/HiveIcebergStorageHandler.java | 105 ++++++++- .../mr/hive/HiveOutputFileFactory.java | 50 +++++ .../iceberg/mr/hive/LocationHelper.java | 44 +--- .../iceberg/mr/hive/HiveIcebergTestUtils.java | 23 +- .../hive/TestHiveIcebergOutputCommitter.java | 64 +++--- 14 files changed, 383 insertions(+), 341 deletions(-) create mode 100644 mr/null/query_id-job_test_0000/.task-0.forCommit.crc create mode 100644 mr/null/query_id-job_test_0000/.task-1.forCommit.crc create mode 100644 mr/null/query_id-job_test_0000/task-0.forCommit create mode 100644 mr/null/query_id-job_test_0000/task-1.forCommit delete mode 100644 mr/src/main/java/org/apache/iceberg/mr/hive/ClosedFileData.java create mode 100644 mr/src/main/java/org/apache/iceberg/mr/hive/HiveOutputFileFactory.java diff --git a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java index e68f4c5ad2ec..acbb152a2209 100644 --- a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java +++ b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java @@ -52,7 +52,7 @@ public OutputFileFactory(PartitionSpec spec, FileFormat format, LocationProvider this.taskId = taskId; } - private String generateFilename() { + 
protected String generateFilename() { return format.addExtension( String.format("%05d-%d-%s-%05d", partitionId, taskId, uuid, fileCount.incrementAndGet())); } @@ -73,4 +73,16 @@ public EncryptedOutputFile newOutputFile(PartitionKey key) { OutputFile rawOutputFile = io.newOutputFile(newDataLocation); return encryptionManager.encrypt(rawOutputFile); } + + protected FileFormat format() { + return format; + } + + protected String uuid() { + return uuid; + } + + protected int nextCount() { + return fileCount.incrementAndGet(); + } } diff --git a/mr/null/query_id-job_test_0000/.task-0.forCommit.crc b/mr/null/query_id-job_test_0000/.task-0.forCommit.crc new file mode 100644 index 0000000000000000000000000000000000000000..56a92676b1d6fef4078340b04f251da2898cfef9 GIT binary patch literal 28 kcmYc;N@ieSU}6Z(uwvaG_e}Qlr}|l0Zmk%^@C0E7Sw9RL6T literal 0 HcmV?d00001 diff --git a/mr/null/query_id-job_test_0000/.task-1.forCommit.crc b/mr/null/query_id-job_test_0000/.task-1.forCommit.crc new file mode 100644 index 0000000000000000000000000000000000000000..3534f9d6ff04a814e37945a9e42cf43bb9169033 GIT binary patch literal 28 kcmYc;N@ieSU}6Z(uwp%3pngqB^z~Hv2Prm-(ia^70Cju{FaQ7m literal 0 HcmV?d00001 diff --git a/mr/null/query_id-job_test_0000/task-0.forCommit b/mr/null/query_id-job_test_0000/task-0.forCommit new file mode 100644 index 0000000000000000000000000000000000000000..c9123cb62318fb6a43528d52c9b7075b33a2b388 GIT binary patch literal 2230 zcmaJ@No*Tc7=Ce_#E_)fpeRCt$PI+5wZ~2z+ew5%Oq&lOS4B#+loh)|26hmFCJ=+5o)I}Xw_O*+y*iD%bYRErcj zv!bNKpeE1qtggikCP$SRX_a-=&TA-TancdX(N!whJgZ7G)EJME7FpLg(>N#nn3+_{ zOe$w4q0i?_65~nI%2sTuW_dBHfH6lh3idIEIaA4MROO_zWM8FPMvCcL1{}ChV4m_N zPP%Kjac*V>zd5a&c?$lYyr2ImApq{X(x#rDH}xEoF~xb^S|JxmGsLybiZhH;1^R$- zMIbI!Ut?xOw>3pbIb-C2MzzyawILx};}vR$W~=Ica(lIs*d$Ke^`e`vR2yX=*LzBd zqQN*hQcsBk%aTyQ8h|R;GRJq&ZZ(;#n~I1I6KhG5Hp_r=&!y8Aw-6jczZ-p3HhpU z0j9Ig7 z%ghrR8VP#>!(rJ&gXy4$1yaMIRLD2vmq83JzZbx`VlxgY_T4AN(vZ|9fmQ3}q-B0` 
z;f2NH3B!QSMe1nBVH3enuhW#uaE(OlbefrfkO8=>Ofhh#UMPMj-I(1`u1`2%&WIBR zPqYDsuvxSnP}-PGMhXTjOIQ#yUz~e-^^ea!u5=v!lLojQz(%=K|MWhZ$^0Cu@`YMt z^Uh;;n_Y=p6|(D>Qzsg)(y^O1W;F-~IwSJw$+L?y(P(@@w=Bye!ssaDl~nOc3cDCJ zKt@YFyD&tMbPyj$3s9_jrLOGZ0snW?>vxX~{_r}6Xp;Uqh26CswkH;PzS-(~+d-UB z;#`#`MCI2=BCiZoYg(xGZ)D_${fWh<9&oVn6z z9abx4#7%IxPNZTbdH1fQ*o(o>6=(b3FpALKbe5^O^*!PQ+a+HbYYie=$f)FL?^#{ z6vF)ix`*H!_~jWuQ%@nK3TxLVf2PKJH~wv}QaXwSkCvG=7XvTJCuQa32f`T*j^RU~ Hc&zY0m5-+Q literal 0 HcmV?d00001 diff --git a/mr/null/query_id-job_test_0000/task-1.forCommit b/mr/null/query_id-job_test_0000/task-1.forCommit new file mode 100644 index 0000000000000000000000000000000000000000..f3f6b338bb2d13ae9c22888b5bc726078acb6361 GIT binary patch literal 2212 zcmaJ@OKcle6n$}=#Ned)m7)p-A~z7K)}A;`Y$p*4aoSL)&Zmu2B3aNp&pbQhjAte@ zZxTDuN(ezEB1EOaf(3#_*+oJ@>H@?DSfQdyNNo5>R23`KD2psw2@u@-{Hc?;9cjih z_y0NfK5uT52FD`LCgX;c2~d;DODv$uEXCmSH05+c)!4`<*>|?OZiTj7gyc;^NS$pF zUk!yw-~ zX2(dYNiD8&)zIfmCdZUG@ydqgIcbe&=_-{Ru4>X0HAmv4Nj7xObk0dPW+oLg zlS-LM=ySX%F_I)+w&GAt<%O66#$3r5*vA-VEJf9+#z|YzzDD(o6gTt?IPjprEagj_ zbX0NYxTOkyW7@EC6#PH;DE|{e0Nl5yEh9H;8CfP{inE5jLavZTh-;Y@rWmIR^a10F zLR^}$#;mB}=!%eX$H^g`>KCczKthhr%hXog(X@l)mP#eDNu2m&p9*!KYTJ#d!VOVImb7_^~5znp41GgOi-B{R*XEsmn>#4 z8JdzeM_)z*qVOGc8&u-j*>uSVB>SClpL<2x>6HbNPw3CBL zd3W=pF7JnTHmb+t&_f}Cn2kTL{rJJLKfuyt*%B6`1H*vpqUc*Yyt%~NVL>J2QvM1| zXN}3JLU2hGZ9#NZj?d^$u9|6b;yyi=7&GBzLt3Sll!j_B%a$VRQh(@D%1jLng|%>K zS$1a?o zKa(&`=v=gh_8e>?80vMJ(ipChsFO}J3lP!+cZDei&NA|akEPqwJIbw57t9%P!{CWF zzz{ZywgXD*lgVh_gk=c}V(RO~7gqoJ^7C@X;g{6G?E*H+o%{FbQ<=b-m{z?ig{~uzsOh#r@QotlEP!DcoiU zb_WL49EbTAYjO?_n;b=&r#RVArIM9^7_X9*Yqu9ZZL^PS<+9=AAFmOqSV4jPD=7YI z=qttD@(+w2bTh3o4Y#~soOt`s1!1AI#HQh7Vaf!`k3eZ&e5}KsqBQ!>CSHHZ`DLw6 zL<<=eJ?%eRt7EP9RR8`rU+mtzx=_iLlleD%DD4~e6*sBxq)(?g{NB@=64{0`9C-)B z1%YDrzTxC+m$B@jJ-ER?0 - *
  • Partition key - *
  • File name - *
  • File format - *
  • File size - *
  • Metrics - *
- */ -final class ClosedFileData implements Serializable { - private final PartitionKey partitionKey; - private final String fileName; - private final FileFormat fileFormat; - private final Long fileSize; - private final Metrics metrics; - - ClosedFileData(PartitionKey partitionKey, String fileName, FileFormat fileFormat, Long fileSize, Metrics metrics) { - this.partitionKey = partitionKey; - this.fileName = fileName; - this.fileFormat = fileFormat; - this.fileSize = fileSize; - this.metrics = metrics; - } - - PartitionKey partitionKey() { - return partitionKey; - } - - String fileName() { - return fileName; - } - - FileFormat fileFormat() { - return fileFormat; - } - - /** - * File size in bytes. - * @return The size - */ - Long fileSize() { - return fileSize; - } - - Metrics metrics() { - return metrics; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("partitionKey", partitionKey) - .add("fileName", fileName) - .add("fileFormat", fileFormat) - .add("fileSize", fileSize) - .add("metrics", metrics) - .toString(); - } -} diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java index 1af790db7d64..edb4cab3ca0c 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java @@ -22,12 +22,14 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import 
org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; @@ -38,11 +40,8 @@ import org.apache.hadoop.mapreduce.TaskType; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; -import org.apache.iceberg.DataFiles; import org.apache.iceberg.Table; import org.apache.iceberg.exceptions.NotFoundException; -import org.apache.iceberg.exceptions.RuntimeIOException; -import org.apache.iceberg.hadoop.HadoopFileIO; import org.apache.iceberg.hadoop.Util; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; @@ -59,6 +58,7 @@ */ public final class HiveIcebergOutputCommitter extends OutputCommitter { private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergOutputCommitter.class); + private FileIO io; @Override public void setupJob(JobContext jobContext) { @@ -77,21 +77,34 @@ public boolean needsTaskCommit(TaskAttemptContext context) { context.getJobConf().getNumReduceTasks() == 0; } + /** + * Collects the generated data files and creates a commit file storing the data file list. 
+ * @param context The job context + * @throws IOException Thrown if there is an error writing the commit file + */ @Override public void commitTask(TaskAttemptContext context) throws IOException { TaskAttemptID attemptID = context.getTaskAttemptID(); - String fileForCommitLocation = LocationHelper.generateFileForCommitLocation(context.getJobConf(), attemptID); + String fileForCommitLocation = LocationHelper.generateFileForCommitLocation(context.getJobConf(), + attemptID.getJobID(), attemptID.getTaskID().getId()); HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(attemptID); - Set closedFiles = Collections.emptySet(); + DataFile[] closedFiles; if (writer != null) { - closedFiles = writer.closedFileData(); + closedFiles = writer.complete(); + } else { + closedFiles = new DataFile[0]; } - // Creating the file containing the descriptor(s) for the file(s) written by this task - createFileForCommit(closedFiles, fileForCommitLocation, new HadoopFileIO(context.getJobConf())); + // Creating the file containing the data files generated by this task + createFileForCommit(closedFiles, fileForCommitLocation, io(context.getJobConf())); } + /** + * Removes files generated by this task. + * @param context The task context + * @throws IOException Thrown if there is an error closing the writer + */ @Override public void abortTask(TaskAttemptContext context) throws IOException { // Clean up writer data from the local store @@ -101,17 +114,99 @@ public void abortTask(TaskAttemptContext context) throws IOException { writer.close(true); } + /** + * Reads the commit files stored in the temp directory and collects the generated committed data files. + * Appends the data files to the table. At the end removes the temporary directory. 
+ * @param jobContext The job context + */ @Override - public void commitJob(JobContext jobContext) throws IOException { + public void commitJob(JobContext jobContext) { JobConf conf = jobContext.getJobConf(); - // If there are reducers, then every reducer will generate a result file. - // If this is a map only task, then every mapper will generate a result file. - int expectedFiles = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks(); Table table = Catalogs.loadTable(conf); long startTime = System.currentTimeMillis(); - LOG.info("Committing job has started for table: {}, using location: {}, expecting {} file(s).", table, - LocationHelper.generateJobLocation(conf, jobContext.getJobID()), expectedFiles); + LOG.info("Committing job has started for table: {}, using location: {}", table, + LocationHelper.generateJobLocation(conf, jobContext.getJobID())); + + List dataFiles = dataFiles(jobContext, io(jobContext.getJobConf()), true); + + if (dataFiles.size() > 0) { + // Appending data files to the table + AppendFiles append = table.newAppend(); + dataFiles.forEach(append::appendFile); + append.commit(); + LOG.info("Commit took {} ms for table: {} with {} file(s)", System.currentTimeMillis() - startTime, table, + dataFiles.size()); + LOG.debug("Added files {}", dataFiles); + } else { + LOG.info("Commit took {} ms for table: {} with no new files", System.currentTimeMillis() - startTime, table); + } + + cleanup(jobContext); + } + + /** + * Removes the generated data files, if there is a commit file already generated for them. + * The cleanup at the end removes the temporary directory as well. + * @param jobContext The job context + * @param status The status of the job + */ + @Override + public void abortJob(JobContext jobContext, int status) { + String location = LocationHelper.generateJobLocation(jobContext.getJobConf(), jobContext.getJobID()); + LOG.info("Job {} is aborted. 
Cleaning job location {}", jobContext.getJobID(), location); + + List dataFiles = dataFiles(jobContext, io(jobContext.getJobConf()), false); + + // Check if we have files already committed and remove data files if there are any + if (dataFiles.size() > 0) { + Tasks.foreach(dataFiles) + .retry(3) + .suppressFailureWhenFinished() + .onFailure((file, exc) -> LOG.debug("Failed on to remove data file {} on abort job", file.path(), exc)) + .run(file -> io(jobContext.getJobConf()).deleteFile(file.path().toString())); + } + + cleanup(jobContext); + } + + /** + * Cleans up the job's temporary location. + * @param jobContext The job context + */ + private void cleanup(JobContext jobContext) { + String location = LocationHelper.generateJobLocation(jobContext.getJobConf(), jobContext.getJobID()); + LOG.info("Cleaning for job: {} on location: {}", jobContext.getJobID(), location); + + // Remove the job's temp directory recursively. + // Intentionally used foreach on a single item. Using the Tasks API here only for the retry capability. + Tasks.foreach(location) + .retry(3) + .suppressFailureWhenFinished() + .onFailure((file, exc) -> LOG.debug("Failed on to remove directory {} on cleanup job", file, exc)) + .run(file -> { + Path toDelete = new Path(file); + FileSystem fs = Util.getFs(toDelete, jobContext.getJobConf()); + try { + fs.delete(toDelete, true); + } catch (IOException e) { + throw new UncheckedIOException(String.format("Failed to delete job directory: %s", file), e); + } + }); + } + + /** + * Get the committed data files for this job. 
+ * @param jobContext The job context + * @param io The FileIO used for reading the files generated for commit + * @param throwOnFailure If true then it throws an exception on failure + * @return The list of the committed data files + */ + private static List dataFiles(JobContext jobContext, FileIO io, boolean throwOnFailure) { + JobConf conf = jobContext.getJobConf(); + // If there are reducers, then every reducer will generate a result file. + // If this is a map only task, then every mapper will generate a result file. + int expectedFiles = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks(); ExecutorService executor = null; try { @@ -124,49 +219,20 @@ public void commitJob(JobContext jobContext) throws IOException { .setNameFormat("iceberg-commit-pool-%d") .build()); - Set dataFiles = new ConcurrentHashMap<>().newKeySet(); + List dataFiles = Collections.synchronizedList(new ArrayList<>()); // Reading the committed files. The assumption here is that the taskIds are generated in sequential order // starting from 0. 
Tasks.range(expectedFiles) + .throwFailureWhenFinished(throwOnFailure) .executeWith(executor) .retry(3) .run(taskId -> { String taskFileName = LocationHelper.generateFileForCommitLocation(conf, jobContext.getJobID(), taskId); - Set closedFiles = readFileForCommit(taskFileName, table.io()); - - // If the data is not empty add to the table - if (!closedFiles.isEmpty()) { - closedFiles.forEach(file -> { - DataFiles.Builder builder = DataFiles.builder(table.spec()) - .withPath(file.fileName()) - .withFormat(file.fileFormat()) - .withFileSizeInBytes(file.fileSize()) - .withPartition(file.partitionKey()) - .withMetrics(file.metrics()); - dataFiles.add(builder.build()); - }); - } + dataFiles.addAll(Arrays.asList(readFileForCommit(taskFileName, io))); }); - if (dataFiles.size() > 0) { - // Appending data files to the table - AppendFiles append = table.newAppend(); - Set addedFiles = new HashSet<>(dataFiles.size()); - dataFiles.forEach(dataFile -> { - append.appendFile(dataFile); - addedFiles.add(dataFile.path().toString()); - }); - append.commit(); - LOG.info("Commit took {} ms for table: {} with file(s): {}", System.currentTimeMillis() - startTime, table, - addedFiles); - } else { - LOG.info("Commit took {} ms for table: {} with no new files", System.currentTimeMillis() - startTime, table); - } - - // Calling super to cleanupJob if something more is needed - cleanupJob(jobContext); - + return dataFiles; } finally { if (executor != null) { executor.shutdown(); @@ -174,30 +240,7 @@ public void commitJob(JobContext jobContext) throws IOException { } } - @Override - public void abortJob(JobContext jobContext, int status) throws IOException { - String location = LocationHelper.generateJobLocation(jobContext.getJobConf(), jobContext.getJobID()); - LOG.info("Job {} is aborted. Cleaning job location {}", jobContext.getJobID(), location); - - // Remove the result directory for the failed job. - // Intentionally used foreach on a single item. 
Using the Tasks API here only for the retry capability. - Tasks.foreach(location) - .retry(3) - .suppressFailureWhenFinished() - .onFailure((file, exc) -> LOG.debug("Failed on to remove directory {} on abort job", file, exc)) - .run(file -> { - Path toDelete = new Path(file); - FileSystem fs = Util.getFs(toDelete, jobContext.getJobConf()); - try { - fs.delete(toDelete, true /* recursive */); - } catch (IOException e) { - throw new RuntimeIOException(e, "Failed to delete job directory: %s", file); - } - }); - cleanupJob(jobContext); - } - - private static void createFileForCommit(Set closedFiles, String location, FileIO io) + private static void createFileForCommit(DataFile[] closedFiles, String location, FileIO io) throws IOException { OutputFile fileForCommit = io.newOutputFile(location); @@ -207,11 +250,18 @@ private static void createFileForCommit(Set closedFiles, String LOG.debug("Iceberg committed file is created {}", fileForCommit); } - private static Set readFileForCommit(String fileForCommitLocation, FileIO io) { + private static DataFile[] readFileForCommit(String fileForCommitLocation, FileIO io) { try (ObjectInputStream ois = new ObjectInputStream(io.newInputFile(fileForCommitLocation).newStream())) { - return (Set) ois.readObject(); + return (DataFile[]) ois.readObject(); } catch (ClassNotFoundException | IOException e) { throw new NotFoundException("Can not read or parse committed file: %s", fileForCommitLocation); } } + + private FileIO io(Configuration conf) { + if (io == null) { + io = HiveIcebergStorageHandler.io(conf); + } + return io; + } } diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java index badf7805c403..324c120bd0ba 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java @@ -20,157 +20,87 @@ package 
org.apache.iceberg.mr.hive; import java.io.IOException; -import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; -import org.apache.iceberg.Metrics; import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; -import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.Record; -import org.apache.iceberg.hadoop.HadoopFileIO; -import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.PartitionedFanoutWriter; import org.apache.iceberg.mr.mapred.Container; import org.apache.iceberg.util.Tasks; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class HiveIcebergRecordWriter extends org.apache.hadoop.mapreduce.RecordWriter - implements FileSinkOperator.RecordWriter, org.apache.hadoop.mapred.RecordWriter { +class HiveIcebergRecordWriter extends PartitionedFanoutWriter + implements FileSinkOperator.RecordWriter, org.apache.hadoop.mapred.RecordWriter> { private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergRecordWriter.class); + // The current key is reused at every write to avoid unnecessary object creation + private final PartitionKey currentKey; + private final FileIO io; + // map to store the active writers // Stored in concurrent map, since some executor engines can share containers - private static Map writers = new 
ConcurrentHashMap<>(); + private static final Map writers = new ConcurrentHashMap<>(); - private final FileIO io; - private final String location; - private final FileFormat fileFormat; - private final GenericAppenderFactory appenderFactory; - // The current key is reused at every write to avoid unnecessary object creation - private final PartitionKey currentKey; - // Data for every partition is written to a different appender - // This map stores the open appenders for the given partition key - private final Map openAppenders = new HashMap<>(); - // When the appenders are closed the file data needed for the Iceberg commit is stored and accessible through - // this map - private final Map closedFileData = new HashMap<>(); + static HiveIcebergRecordWriter removeWriter(TaskAttemptID taskAttemptID) { + return writers.remove(taskAttemptID); + } - HiveIcebergRecordWriter(TaskAttemptID taskAttemptID, Configuration conf, String location, FileFormat fileFormat, - Schema schema, PartitionSpec spec) { - this.io = new HadoopFileIO(conf); - this.location = location; - this.fileFormat = fileFormat; - this.appenderFactory = new GenericAppenderFactory(schema); + HiveIcebergRecordWriter(Schema schema, PartitionSpec spec, FileFormat format, + FileAppenderFactory appenderFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize, + TaskAttemptID taskAttemptID) { + super(spec, format, appenderFactory, fileFactory, io, targetFileSize); + this.io = io; this.currentKey = new PartitionKey(spec, schema); writers.put(taskAttemptID, this); - LOG.info("IcebergRecordWriter is created in {} with {}", location, fileFormat); } @Override - public void write(Writable row) { - Record record = ((Container) row).get(); - - // Update the current key with the record, so we do not create a new object for every record - currentKey.partition(record); - - AppenderWrapper currentAppender = openAppenders.get(currentKey); - if (currentAppender == null) { - currentAppender = getAppender(); - 
openAppenders.put(currentKey.copy(), currentAppender); - } + protected PartitionKey partition(Record row) { + currentKey.partition(row); + return currentKey; + } - currentAppender.appender.add(record); + @Override + public void write(Writable row) throws IOException { + super.write(((Container) row).get()); } @Override - public void write(NullWritable key, Container value) { + public void write(NullWritable key, Container value) throws IOException { write(value); } @Override public void close(boolean abort) throws IOException { - // Close the open appenders and store the closed file data - for (PartitionKey key : openAppenders.keySet()) { - AppenderWrapper wrapper = openAppenders.get(key); - wrapper.close(); - closedFileData.put(key, - new ClosedFileData(key, wrapper.location, fileFormat, wrapper.length(), wrapper.metrics())); - } - - openAppenders.clear(); + DataFile[] dataFiles = super.complete(); // If abort then remove the unnecessary files if (abort) { - Tasks.foreach(closedFileData.values().stream().map(ClosedFileData::fileName).iterator()) + Tasks.foreach(dataFiles) .retry(3) .suppressFailureWhenFinished() .onFailure((file, exception) -> LOG.debug("Failed on to remove file {} on abort", file, exception)) - .run(io::deleteFile); + .run(dataFile -> io.deleteFile(dataFile.path().toString())); } - LOG.info("IcebergRecordWriter is closed. Created {} files", closedFileData.values()); - } - @Override - public void close(org.apache.hadoop.mapreduce.TaskAttemptContext context) throws IOException { - close(false); + LOG.info("IcebergRecordWriter is closed with abort={}. 
Created {} files", abort, dataFiles.length); } @Override public void close(Reporter reporter) throws IOException { close(false); } - - public Set closedFileData() { - return new HashSet<>(closedFileData.values()); - } - - static HiveIcebergRecordWriter removeWriter(TaskAttemptID taskAttemptID) { - return writers.remove(taskAttemptID); - } - - private AppenderWrapper getAppender() { - String dataFileLocation = location + "." + UUID.randomUUID(); - OutputFile dataFile = io.newOutputFile(dataFileLocation); - - FileAppender appender = appenderFactory.newAppender(dataFile, fileFormat); - - LOG.info("New AppenderWrapper is created for {} with {}", dataFileLocation, fileFormat); - return new AppenderWrapper(appender, dataFileLocation); - } - - private static final class AppenderWrapper { - private final FileAppender appender; - private final String location; - - AppenderWrapper(FileAppender appender, String location) { - this.appender = appender; - this.location = location; - } - - public long length() { - return appender.length(); - } - - public Metrics metrics() { - return appender.metrics(); - } - - public void close() throws IOException { - appender.close(); - } - } } diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index 07dc958cf018..abe76424ae90 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -19,6 +19,13 @@ package org.apache.iceberg.mr.hive; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.util.Base64; import java.util.Map; import java.util.Properties; import org.apache.hadoop.conf.Configuration; @@ -35,10 +42,17 @@ import org.apache.hadoop.mapred.InputFormat; 
import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; import org.apache.iceberg.Table; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.mr.Catalogs; import org.apache.iceberg.mr.InputFormatConfig; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, HiveStorageHandler { @@ -65,7 +79,7 @@ public HiveMetaHook getMetaHook() { } @Override - public HiveAuthorizationProvider getAuthorizationProvider() throws HiveException { + public HiveAuthorizationProvider getAuthorizationProvider() { return null; } @@ -133,4 +147,93 @@ public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer dese predicate.pushedPredicate = (ExprNodeGenericFuncDesc) exprNodeDesc; return predicate; } + + /** + * Returns the Table FileIO serialized to the configuration. + * @param configuration The configuration used to get the data from + * @return The Table FileIO object + */ + public static FileIO io(Configuration configuration) { + return (FileIO) get(configuration, InputFormatConfig.FILE_IO); + } + + /** + * Returns the Table LocationProvider serialized to the configuration. + * @param configuration The configuration used to get the data from + * @return The Table LocationProvider object + */ + public static LocationProvider location(Configuration configuration) { + return (LocationProvider) get(configuration, InputFormatConfig.LOCATION_PROVIDER); + } + + /** + * Returns the Table EncryptionManager serialized to the configuration. 
+ * @param configuration The configuration used to get the data from + * @return The Table EncryptionManager object + */ + public static EncryptionManager encryption(Configuration configuration) { + return (EncryptionManager) get(configuration, InputFormatConfig.ENCRYPTION_MANAGER); + } + + /** + * Returns the Table Schema serialized to the configuration. + * @param configuration The configuration used to get the data from + * @return The Table Schema object + */ + public static Schema schema(Configuration configuration) { + return SchemaParser.fromJson(configuration.get(InputFormatConfig.TABLE_SCHEMA)); + } + + /** + * Returns the Table PartitionSpec serialized to the configuration. + * @param configuration The configuration used to get the data from + * @return The Table PartitionSpec object + */ + public static PartitionSpec spec(Configuration configuration) { + return PartitionSpecParser.fromJson(schema(configuration), configuration.get(InputFormatConfig.PARTITION_SPEC)); + } + + /** + * Stores the serializable table data in the configuration. + * Currently the following is handled: + *
    + *
  • - Location
  • + *
  • - Schema
  • + *
  • - Partition specification
  • + *
  • - FileIO for handling table files
  • + *
  • - Location provider used for file generation
  • + *
  • - Encryption manager for encryption handling
  • + *
+ * @param configuration The target configuration to store to + * @param table The table which we want to store to the configuration + */ + @VisibleForTesting + static void put(Configuration configuration, Table table) { + configuration.set(InputFormatConfig.TABLE_LOCATION, table.location()); + configuration.set(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(table.schema())); + configuration.set(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(table.spec())); + + put(configuration, InputFormatConfig.FILE_IO, table.io()); + put(configuration, InputFormatConfig.LOCATION_PROVIDER, table.locationProvider()); + put(configuration, InputFormatConfig.ENCRYPTION_MANAGER, table.encryption()); + } + + private static void put(Configuration configuration, String key, Serializable object) { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos)) { + oos.writeObject(object); + configuration.set(key, Base64.getEncoder().encodeToString(baos.toByteArray())); + } catch (IOException ioe) { + throw new RuntimeException(String.format("Error serializing %s to configuration", object), ioe); + } + } + + private static Object get(Configuration configuration, String key) { + byte [] data = Base64.getDecoder().decode(configuration.get(key)); + try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) { + return ois.readObject(); + } catch (Exception e) { + throw new RuntimeException(String.format("Error reading %s from configuration", key), e); + } + } } diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveOutputFileFactory.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveOutputFileFactory.java new file mode 100644 index 000000000000..752e96e9b5c7 --- /dev/null +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveOutputFileFactory.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.hive; + +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.LocationProvider; +import org.apache.iceberg.io.OutputFileFactory; + +public class HiveOutputFileFactory extends OutputFileFactory { + private final TaskAttemptID taskAttemptID; + + public HiveOutputFileFactory(PartitionSpec spec, FileFormat fileFormat, LocationProvider locationProvider, FileIO io, + EncryptionManager encryptionManager, TaskAttemptID taskAttemptID) { + super(spec, fileFormat, locationProvider, io, encryptionManager, 0, 0); + this.taskAttemptID = taskAttemptID; + } + + /** + * Override the filename generation so it contains jobId, taskId, taskAttemptId. Kept the UUID and the fileCount so + * the filenames are similar for other writers. 
+ * @return The generated file name + */ + @Override + protected String generateFilename() { + return format().addExtension( + String.format("%05d-%d-%d-%s-%05d", taskAttemptID.getJobID().getId(), taskAttemptID.getTaskID().getId(), + taskAttemptID.getId(), uuid(), nextCount())); + } +} diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/LocationHelper.java b/mr/src/main/java/org/apache/iceberg/mr/hive/LocationHelper.java index ed3ef23ac024..bf91ef03d6f9 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/LocationHelper.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/LocationHelper.java @@ -21,7 +21,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.mapred.TaskAttemptID; import org.apache.hadoop.mapreduce.JobID; import org.apache.iceberg.mr.InputFormatConfig; @@ -31,44 +30,23 @@ class LocationHelper { private LocationHelper() { } - /** - * Generates query directory location based on the configuration. - * Currently it uses tableLocation/queryId - * @param conf The job's configuration - * @return The directory to store the query result files - */ - static String generateQueryLocation(Configuration conf) { - String tableLocation = conf.get(InputFormatConfig.TABLE_LOCATION); - String queryId = conf.get(HiveConf.ConfVars.HIVEQUERYID.varname); - return tableLocation + "/" + queryId; - } - /** * Generates the job temp location based on the job configuration. - * Currently it uses QUERY_LOCATION/jobId. + * Currently it uses QUERY_LOCATION-jobId. * @param conf The job's configuration * @param jobId The JobID for the task * @return The file to store the results */ static String generateJobLocation(Configuration conf, JobID jobId) { - return generateQueryLocation(conf) + "/" + jobId; - } - - /** - * Generates datafile location based on the task configuration. - * Currently it uses QUERY_LOCATION/jobId/taskAttemptId. 
- * @param conf The job's configuration - * @param taskAttemptId The TaskAttemptID for the task - * @return The file to store the results - */ - static String generateDataFileLocation(Configuration conf, TaskAttemptID taskAttemptId) { - return generateJobLocation(conf, taskAttemptId.getJobID()) + "/" + taskAttemptId.toString(); + String tableLocation = conf.get(InputFormatConfig.TABLE_LOCATION); + String queryId = conf.get(HiveConf.ConfVars.HIVEQUERYID.varname); + return tableLocation + "/temp/" + queryId + "-" + jobId; } /** * Generates file location based on the task configuration and a specific task id. * This file will be used to store the data required to generate the Iceberg commit. - * Currently it uses QUERY_LOCATION/jobId/task-[0..numTasks).forCommit. + * Currently it uses QUERY_LOCATION-jobId/task-[0..numTasks).forCommit. * @param conf The job's configuration * @param jobId The jobId for the task * @param taskId The taskId for the commit file @@ -77,16 +55,4 @@ static String generateDataFileLocation(Configuration conf, TaskAttemptID taskAtt static String generateFileForCommitLocation(Configuration conf, JobID jobId, int taskId) { return generateJobLocation(conf, jobId) + "/task-" + taskId + FOR_COMMIT_EXTENSION; } - - /** - * Generates file location location based on the task configuration. - * This file will be used to store the data required to generate the Iceberg commit. - * Currently it uses QUERY_LOCATION/jobId/task-[0..numTasks).forCommit. 
- * @param conf The job's configuration - * @param taskAttemptId The TaskAttemptID for the task - * @return The file to store the results - */ - static String generateFileForCommitLocation(Configuration conf, TaskAttemptID taskAttemptId) { - return generateFileForCommitLocation(conf, taskAttemptId.getJobID(), taskAttemptId.getTaskID().getId()); - } } diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java index a8685f7bd45e..91747d8c008a 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java @@ -26,12 +26,16 @@ import java.sql.Timestamp; import java.time.LocalDate; import java.time.LocalDateTime; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.UUID; +import java.util.stream.Collectors; import org.apache.commons.compress.utils.Lists; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.serde2.io.DateWritable; @@ -226,18 +230,19 @@ public static void validateData(Table table, List expected, int sortBy) /** * Validates the number of files under a {@link Table} generated by a specific queryId and jobId. + * Validates that the commit files are removed. 
* @param table The table we are checking * @param queryId The queryId which generated the files * @param jobId The jobId which generated the files - * @param dataFileNum The expected number of data files (TABLE_LOCATION/queryId/jobId/* which are not commit files) - * @param commitFileNum The expected number of commit files (TABLE_LOCATION/queryId/jobId/task-***) + * @param dataFileNum The expected number of data files (TABLE_LOCATION/data/*) */ - public static void validateFiles(Table table, String queryId, JobID jobId, int dataFileNum, int commitFileNum) { - String location = table.location() + "/" + queryId + "/" + jobId; - String[] dataFiles = new File(location).list((dir, name) -> !name.startsWith(".") && !name.startsWith("task-")); - Assert.assertEquals(dataFileNum, dataFiles == null ? 0 : dataFiles.length); - - String[] commitFiles = new File(location).list((dir, name) -> name.startsWith("task-")); - Assert.assertEquals(commitFileNum, commitFiles == null ? 0 : commitFiles.length); + public static void validateFiles(Table table, String queryId, JobID jobId, int dataFileNum) throws IOException { + List dataFiles = Files.walk(Paths.get(table.location() + "/data")) + .filter(Files::isRegularFile) + .filter(path -> !path.getFileName().toString().startsWith(".")) + .collect(Collectors.toList()); + Assert.assertEquals(dataFileNum, dataFiles.size()); + + Assert.assertFalse(new File(table.location() + "/" + queryId + "-" + jobId).exists()); } } diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java index b2738947385d..60c699498e47 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobContextImpl; @@ -36,9 +35,13 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.Record; +import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.hadoop.HadoopTables; -import org.apache.iceberg.mr.InputFormatConfig; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.LocationProvider; +import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.mr.TestHelper; import org.apache.iceberg.mr.mapred.Container; import org.apache.iceberg.types.Types; @@ -50,6 +53,7 @@ import static org.apache.iceberg.types.Types.NestedField.required; public class TestHiveIcebergOutputCommitter { + private static final long TARGET_FILE_SIZE = 128 * 1024 * 1024; private static final int RECORD_NUM = 5; private static final String QUERY_ID = "query_id"; private static final JobID JOB_ID = new JobID("test", 0); @@ -94,10 +98,10 @@ public void testSuccessfulUnpartitionedWrite() throws IOException { HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter(); Table table = table(temp.getRoot().getPath(), false); JobConf conf = jobConf(table, 1); - List expected = writeRecords(table, 1, 0, true, false, conf); + List expected = writeRecords(1, 0, true, false, conf); committer.commitJob(new JobContextImpl(conf, JOB_ID)); - HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 1, 1); + HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 1); HiveIcebergTestUtils.validateData(table, expected, 0); } @@ -106,10 +110,10 @@ public void testSuccessfulPartitionedWrite() throws IOException { HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter(); Table table = table(temp.getRoot().getPath(), true); JobConf conf = jobConf(table, 1); - List expected 
= writeRecords(table, 1, 0, true, false, conf); + List expected = writeRecords(1, 0, true, false, conf); committer.commitJob(new JobContextImpl(conf, JOB_ID)); - HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 3, 1); + HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 3); HiveIcebergTestUtils.validateData(table, expected, 0); } @@ -118,10 +122,10 @@ public void testSuccessfulMultipleTasksUnpartitionedWrite() throws IOException { HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter(); Table table = table(temp.getRoot().getPath(), false); JobConf conf = jobConf(table, 2); - List expected = writeRecords(table, 2, 0, true, false, conf); + List expected = writeRecords(2, 0, true, false, conf); committer.commitJob(new JobContextImpl(conf, JOB_ID)); - HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 2, 2); + HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 2); HiveIcebergTestUtils.validateData(table, expected, 0); } @@ -130,10 +134,10 @@ public void testSuccessfulMultipleTasksPartitionedWrite() throws IOException { HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter(); Table table = table(temp.getRoot().getPath(), true); JobConf conf = jobConf(table, 2); - List expected = writeRecords(table, 2, 0, true, false, conf); + List expected = writeRecords(2, 0, true, false, conf); committer.commitJob(new JobContextImpl(conf, JOB_ID)); - HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 6, 2); + HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 6); HiveIcebergTestUtils.validateData(table, expected, 0); } @@ -144,20 +148,20 @@ public void testRetryTask() throws IOException { JobConf conf = jobConf(table, 2); // Write records and abort the tasks - writeRecords(table, 2, 0, false, true, conf); - HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 0, 0); + writeRecords(2, 0, false, true, conf); + HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 0); 
HiveIcebergTestUtils.validateData(table, Collections.emptyList(), 0); // Write records but do not abort the tasks // The data files remain since we can not identify them but should not be read - writeRecords(table, 2, 1, false, false, conf); - HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 2, 0); + writeRecords(2, 1, false, false, conf); + HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 2); HiveIcebergTestUtils.validateData(table, Collections.emptyList(), 0); // Write and commit the records - List expected = writeRecords(table, 2, 2, true, false, conf); + List expected = writeRecords(2, 2, true, false, conf); committer.commitJob(new JobContextImpl(conf, JOB_ID)); - HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 4, 2); + HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 4); HiveIcebergTestUtils.validateData(table, expected, 0); } @@ -166,10 +170,10 @@ public void testAbortJob() throws IOException { HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter(); Table table = table(temp.getRoot().getPath(), false); JobConf conf = jobConf(table, 1); - writeRecords(table, 1, 0, true, false, conf); + writeRecords(1, 0, true, false, conf); committer.abortJob(new JobContextImpl(conf, JOB_ID), JobStatus.State.FAILED); - HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 0, 0); + HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 0); HiveIcebergTestUtils.validateData(table, Collections.emptyList(), 0); } @@ -182,16 +186,15 @@ private JobConf jobConf(Table table, int taskNum) { JobConf conf = new JobConf(); conf.setNumMapTasks(taskNum); conf.setNumReduceTasks(0); - conf.set(InputFormatConfig.TABLE_LOCATION, table.location()); conf.set(HiveConf.ConfVars.HIVEQUERYID.varname, QUERY_ID); + HiveIcebergStorageHandler.put(conf, table); return conf; } /** * Write random records to the given table using separate {@link HiveIcebergOutputCommitter} and * a separate {@link HiveIcebergRecordWriter} for every task. 
- * @param table The target table to write to * @param taskNum The number of tasks in the job handled by the committer * @param attemptNum The id used for attempt number generation * @param commitTasks If true the tasks will be committed @@ -201,23 +204,30 @@ private JobConf jobConf(Table table, int taskNum) { * @return The random generated records which were appended to the table * @throws IOException Propagating {@link HiveIcebergRecordWriter} exceptions */ - private List writeRecords(Table table, int taskNum, int attemptNum, boolean commitTasks, boolean abortTasks, + private List writeRecords(int taskNum, int attemptNum, boolean commitTasks, boolean abortTasks, JobConf conf) throws IOException { List expected = new ArrayList<>(RECORD_NUM * taskNum); + FileIO io = HiveIcebergStorageHandler.io(conf); + LocationProvider location = HiveIcebergStorageHandler.location(conf); + EncryptionManager encryption = HiveIcebergStorageHandler.encryption(conf); + Schema schema = HiveIcebergStorageHandler.schema(conf); + PartitionSpec spec = HiveIcebergStorageHandler.spec(conf); + for (int i = 0; i < taskNum; ++i) { - List records = TestHelper.generateRandomRecords(table.schema(), RECORD_NUM, i + attemptNum); + List records = TestHelper.generateRandomRecords(schema, RECORD_NUM, i + attemptNum); TaskAttemptID taskId = new TaskAttemptID(JOB_ID.getJtIdentifier(), JOB_ID.getId(), TaskType.MAP, i, attemptNum); - String location = table.location() + "/" + QUERY_ID + "/" + JOB_ID + "/" + taskId; - HiveIcebergRecordWriter testWriter = new HiveIcebergRecordWriter(taskId, new Configuration(), location, - FileFormat.PARQUET, table.schema(), table.spec()); + OutputFileFactory outputFileFactory = + new HiveOutputFileFactory(spec, FileFormat.PARQUET, location, io, encryption, taskId); + HiveIcebergRecordWriter testWriter = new HiveIcebergRecordWriter(schema, spec, FileFormat.PARQUET, + new GenericAppenderFactory(schema), outputFileFactory, io, TARGET_FILE_SIZE, taskId); Container container = 
new Container<>(); - records.forEach(record -> { + for (Record record : records) { container.set(record); testWriter.write(container); - }); + } testWriter.close(false); if (commitTasks) { From 7ab921c9e2382b5dae1e5cbafb66b77d11c7ea80 Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Mon, 7 Dec 2020 15:33:57 +0100 Subject: [PATCH 5/9] Checkstyle --- .../org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index abe76424ae90..0cec4d8456ac 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -30,7 +30,6 @@ import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.HiveMetaHook; -import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; From 55b9223902bb1140a5be63dcc6044f2693fc2e1e Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Mon, 7 Dec 2020 20:47:37 +0100 Subject: [PATCH 6/9] Removing crc files --- .../query_id-job_test_0000/.task-0.forCommit.crc | Bin 28 -> 0 bytes .../query_id-job_test_0000/.task-1.forCommit.crc | Bin 28 -> 0 bytes mr/null/query_id-job_test_0000/task-0.forCommit | Bin 2230 -> 0 bytes mr/null/query_id-job_test_0000/task-1.forCommit | Bin 2212 -> 0 bytes 4 files changed, 0 insertions(+), 0 deletions(-) delete mode 100644 mr/null/query_id-job_test_0000/.task-0.forCommit.crc delete mode 100644 mr/null/query_id-job_test_0000/.task-1.forCommit.crc delete mode 100644 mr/null/query_id-job_test_0000/task-0.forCommit delete mode 100644 mr/null/query_id-job_test_0000/task-1.forCommit diff --git 
a/mr/null/query_id-job_test_0000/.task-0.forCommit.crc b/mr/null/query_id-job_test_0000/.task-0.forCommit.crc deleted file mode 100644 index 56a92676b1d6fef4078340b04f251da2898cfef9..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 28 kcmYc;N@ieSU}6Z(uwvaG_e}Qlr}|l0Zmk%^@C0E7Sw9RL6T diff --git a/mr/null/query_id-job_test_0000/.task-1.forCommit.crc b/mr/null/query_id-job_test_0000/.task-1.forCommit.crc deleted file mode 100644 index 3534f9d6ff04a814e37945a9e42cf43bb9169033..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 28 kcmYc;N@ieSU}6Z(uwp%3pngqB^z~Hv2Prm-(ia^70Cju{FaQ7m diff --git a/mr/null/query_id-job_test_0000/task-0.forCommit b/mr/null/query_id-job_test_0000/task-0.forCommit deleted file mode 100644 index c9123cb62318fb6a43528d52c9b7075b33a2b388..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 2230 zcmaJ@No*Tc7=Ce_#E_)fpeRCt$PI+5wZ~2z+ew5%Oq&lOS4B#+loh)|26hmFCJ=+5o)I}Xw_O*+y*iD%bYRErcj zv!bNKpeE1qtggikCP$SRX_a-=&TA-TancdX(N!whJgZ7G)EJME7FpLg(>N#nn3+_{ zOe$w4q0i?_65~nI%2sTuW_dBHfH6lh3idIEIaA4MROO_zWM8FPMvCcL1{}ChV4m_N zPP%Kjac*V>zd5a&c?$lYyr2ImApq{X(x#rDH}xEoF~xb^S|JxmGsLybiZhH;1^R$- zMIbI!Ut?xOw>3pbIb-C2MzzyawILx};}vR$W~=Ica(lIs*d$Ke^`e`vR2yX=*LzBd zqQN*hQcsBk%aTyQ8h|R;GRJq&ZZ(;#n~I1I6KhG5Hp_r=&!y8Aw-6jczZ-p3HhpU z0j9Ig7 z%ghrR8VP#>!(rJ&gXy4$1yaMIRLD2vmq83JzZbx`VlxgY_T4AN(vZ|9fmQ3}q-B0` z;f2NH3B!QSMe1nBVH3enuhW#uaE(OlbefrfkO8=>Ofhh#UMPMj-I(1`u1`2%&WIBR zPqYDsuvxSnP}-PGMhXTjOIQ#yUz~e-^^ea!u5=v!lLojQz(%=K|MWhZ$^0Cu@`YMt z^Uh;;n_Y=p6|(D>Qzsg)(y^O1W;F-~IwSJw$+L?y(P(@@w=Bye!ssaDl~nOc3cDCJ zKt@YFyD&tMbPyj$3s9_jrLOGZ0snW?>vxX~{_r}6Xp;Uqh26CswkH;PzS-(~+d-UB z;#`#`MCI2=BCiZoYg(xGZ)D_${fWh<9&oVn6z z9abx4#7%IxPNZTbdH1fQ*o(o>6=(b3FpALKbe5^O^*!PQ+a+HbYYie=$f)FL?^#{ z6vF)ix`*H!_~jWuQ%@nK3TxLVf2PKJH~wv}QaXwSkCvG=7XvTJCuQa32f`T*j^RU~ Hc&zY0m5-+Q diff --git a/mr/null/query_id-job_test_0000/task-1.forCommit b/mr/null/query_id-job_test_0000/task-1.forCommit 
deleted file mode 100644 index f3f6b338bb2d13ae9c22888b5bc726078acb6361..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 2212 zcmaJ@OKcle6n$}=#Ned)m7)p-A~z7K)}A;`Y$p*4aoSL)&Zmu2B3aNp&pbQhjAte@ zZxTDuN(ezEB1EOaf(3#_*+oJ@>H@?DSfQdyNNo5>R23`KD2psw2@u@-{Hc?;9cjih z_y0NfK5uT52FD`LCgX;c2~d;DODv$uEXCmSH05+c)!4`<*>|?OZiTj7gyc;^NS$pF zUk!yw-~ zX2(dYNiD8&)zIfmCdZUG@ydqgIcbe&=_-{Ru4>X0HAmv4Nj7xObk0dPW+oLg zlS-LM=ySX%F_I)+w&GAt<%O66#$3r5*vA-VEJf9+#z|YzzDD(o6gTt?IPjprEagj_ zbX0NYxTOkyW7@EC6#PH;DE|{e0Nl5yEh9H;8CfP{inE5jLavZTh-;Y@rWmIR^a10F zLR^}$#;mB}=!%eX$H^g`>KCczKthhr%hXog(X@l)mP#eDNu2m&p9*!KYTJ#d!VOVImb7_^~5znp41GgOi-B{R*XEsmn>#4 z8JdzeM_)z*qVOGc8&u-j*>uSVB>SClpL<2x>6HbNPw3CBL zd3W=pF7JnTHmb+t&_f}Cn2kTL{rJJLKfuyt*%B6`1H*vpqUc*Yyt%~NVL>J2QvM1| zXN}3JLU2hGZ9#NZj?d^$u9|6b;yyi=7&GBzLt3Sll!j_B%a$VRQh(@D%1jLng|%>K zS$1a?o zKa(&`=v=gh_8e>?80vMJ(ipChsFO}J3lP!+cZDei&NA|akEPqwJIbw57t9%P!{CWF zzz{ZywgXD*lgVh_gk=c}V(RO~7gqoJ^7C@X;g{6G?E*H+o%{FbQ<=b-m{z?ig{~uzsOh#r@QotlEP!DcoiU zb_WL49EbTAYjO?_n;b=&r#RVArIM9^7_X9*Yqu9ZZL^PS<+9=AAFmOqSV4jPD=7YI z=qttD@(+w2bTh3o4Y#~soOt`s1!1AI#HQh7Vaf!`k3eZ&e5}KsqBQ!>CSHHZ`DLw6 zL<<=eJ?%eRt7EP9RR8`rU+mtzx=_iLlleD%DD4~e6*sBxq)(?g{NB@=64{0`9C-)B z1%YDrzTxC+m$B@jJ-ER?0 Date: Mon, 7 Dec 2020 21:38:21 +0100 Subject: [PATCH 7/9] Rebase and some minor changes --- .../mr/hive/HiveIcebergOutputCommitter.java | 39 ++++------ .../mr/hive/HiveIcebergRecordWriter.java | 2 +- .../mr/hive/HiveIcebergStorageHandler.java | 75 +++++++------------ 3 files changed, 41 insertions(+), 75 deletions(-) diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java index edb4cab3ca0c..8022014c26c7 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java @@ -22,14 +22,12 @@ import java.io.IOException; import 
java.io.ObjectInputStream; import java.io.ObjectOutputStream; -import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; @@ -58,7 +56,6 @@ */ public final class HiveIcebergOutputCommitter extends OutputCommitter { private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergOutputCommitter.class); - private FileIO io; @Override public void setupJob(JobContext jobContext) { @@ -91,13 +88,13 @@ public void commitTask(TaskAttemptContext context) throws IOException { DataFile[] closedFiles; if (writer != null) { - closedFiles = writer.complete(); + closedFiles = writer.dataFiles(); } else { closedFiles = new DataFile[0]; } // Creating the file containing the data files generated by this task - createFileForCommit(closedFiles, fileForCommitLocation, io(context.getJobConf())); + createFileForCommit(closedFiles, fileForCommitLocation, HiveIcebergStorageHandler.io(context.getJobConf())); } /** @@ -118,9 +115,10 @@ public void abortTask(TaskAttemptContext context) throws IOException { * Reads the commit files stored in the temp directory and collects the generated committed data files. * Appends the data files to the table. At the end removes the temporary directory. 
* @param jobContext The job context + * @throws IOException if there is a failure deleting the files */ @Override - public void commitJob(JobContext jobContext) { + public void commitJob(JobContext jobContext) throws IOException { JobConf conf = jobContext.getJobConf(); Table table = Catalogs.loadTable(conf); @@ -128,7 +126,8 @@ public void commitJob(JobContext jobContext) { LOG.info("Committing job has started for table: {}, using location: {}", table, LocationHelper.generateJobLocation(conf, jobContext.getJobID())); - List dataFiles = dataFiles(jobContext, io(jobContext.getJobConf()), true); + FileIO io = HiveIcebergStorageHandler.io(jobContext.getJobConf()); + List dataFiles = dataFiles(jobContext, io, true); if (dataFiles.size() > 0) { // Appending data files to the table @@ -150,13 +149,15 @@ public void commitJob(JobContext jobContext) { * The cleanup at the end removes the temporary directory as well. * @param jobContext The job context * @param status The status of the job + * @throws IOException if there is a failure deleting the files */ @Override - public void abortJob(JobContext jobContext, int status) { + public void abortJob(JobContext jobContext, int status) throws IOException { String location = LocationHelper.generateJobLocation(jobContext.getJobConf(), jobContext.getJobID()); LOG.info("Job {} is aborted. 
Cleaning job location {}", jobContext.getJobID(), location); - List dataFiles = dataFiles(jobContext, io(jobContext.getJobConf()), false); + FileIO io = HiveIcebergStorageHandler.io(jobContext.getJobConf()); + List dataFiles = dataFiles(jobContext, io, false); // Check if we have files already committed and remove data files if there are any if (dataFiles.size() > 0) { @@ -164,7 +165,7 @@ public void abortJob(JobContext jobContext, int status) { .retry(3) .suppressFailureWhenFinished() .onFailure((file, exc) -> LOG.debug("Failed on to remove data file {} on abort job", file.path(), exc)) - .run(file -> io(jobContext.getJobConf()).deleteFile(file.path().toString())); + .run(file -> io.deleteFile(file.path().toString())); } cleanup(jobContext); @@ -173,8 +174,9 @@ public void abortJob(JobContext jobContext, int status) { /** * Cleans up the jobs temporary location. * @param jobContext The job context + * @throws IOException if there is a failure deleting the files */ - private void cleanup(JobContext jobContext) { + private void cleanup(JobContext jobContext) throws IOException { String location = LocationHelper.generateJobLocation(jobContext.getJobConf(), jobContext.getJobID()); LOG.info("Cleaning for job: {} on location: {}", jobContext.getJobID(), location); @@ -187,12 +189,8 @@ private void cleanup(JobContext jobContext) { .run(file -> { Path toDelete = new Path(file); FileSystem fs = Util.getFs(toDelete, jobContext.getJobConf()); - try { - fs.delete(toDelete, true); - } catch (IOException e) { - throw new UncheckedIOException(String.format("Failed to delete job directory: %s", file), e); - } - }); + fs.delete(toDelete, true); + }, IOException.class); } /** @@ -257,11 +255,4 @@ private static DataFile[] readFileForCommit(String fileForCommitLocation, FileIO throw new NotFoundException("Can not read or parse committed file: %s", fileForCommitLocation); } } - - private FileIO io(Configuration conf) { - if (io == null) { - io = HiveIcebergStorageHandler.io(conf); - 
} - return io; - } } diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java index 324c120bd0ba..7d94e214938d 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java @@ -85,7 +85,7 @@ public void write(NullWritable key, Container value) throws IOException { @Override public void close(boolean abort) throws IOException { - DataFile[] dataFiles = super.complete(); + DataFile[] dataFiles = super.dataFiles(); // If abort then remove the unnecessary files if (abort) { diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index 0cec4d8456ac..ec4c9a75fb53 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -19,13 +19,6 @@ package org.apache.iceberg.mr.hive; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; -import java.util.Base64; import java.util.Map; import java.util.Properties; import org.apache.hadoop.conf.Configuration; @@ -51,6 +44,7 @@ import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.mr.Catalogs; import org.apache.iceberg.mr.InputFormatConfig; +import org.apache.iceberg.mr.SerializationUtil; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, HiveStorageHandler { @@ -149,47 +143,47 @@ public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer dese /** * Returns the Table FileIO serialized to the configuration. 
- * @param configuration The configuration used to get the data from + * @param config The configuration used to get the data from * @return The Table FileIO object */ - public static FileIO io(Configuration configuration) { - return (FileIO) get(configuration, InputFormatConfig.FILE_IO); + public static FileIO io(Configuration config) { + return SerializationUtil.deserializeFromBase64(config.get(InputFormatConfig.FILE_IO)); } /** * Returns the Table LocationProvider serialized to the configuration. - * @param configuration The configuration used to get the data from + * @param config The configuration used to get the data from * @return The Table LocationProvider object */ - public static LocationProvider location(Configuration configuration) { - return (LocationProvider) get(configuration, InputFormatConfig.LOCATION_PROVIDER); + public static LocationProvider location(Configuration config) { + return SerializationUtil.deserializeFromBase64(config.get(InputFormatConfig.LOCATION_PROVIDER)); } /** * Returns the Table EncryptionManager serialized to the configuration. - * @param configuration The configuration used to get the data from + * @param config The configuration used to get the data from * @return The Table EncryptionManager object */ - public static EncryptionManager encryption(Configuration configuration) { - return (EncryptionManager) get(configuration, InputFormatConfig.ENCRYPTION_MANAGER); + public static EncryptionManager encryption(Configuration config) { + return SerializationUtil.deserializeFromBase64(config.get(InputFormatConfig.ENCRYPTION_MANAGER)); } /** * Returns the Table Schema serialized to the configuration. 
- * @param configuration The configuration used to get the data from + * @param config The configuration used to get the data from * @return The Table Schema object */ - public static Schema schema(Configuration configuration) { - return SchemaParser.fromJson(configuration.get(InputFormatConfig.TABLE_SCHEMA)); + public static Schema schema(Configuration config) { + return SchemaParser.fromJson(config.get(InputFormatConfig.TABLE_SCHEMA)); } /** * Returns the Table PartitionSpec serialized to the configuration. - * @param configuration The configuration used to get the data from + * @param config The configuration used to get the data from * @return The Table PartitionSpec object */ - public static PartitionSpec spec(Configuration configuration) { - return PartitionSpecParser.fromJson(schema(configuration), configuration.get(InputFormatConfig.PARTITION_SPEC)); + public static PartitionSpec spec(Configuration config) { + return PartitionSpecParser.fromJson(schema(config), config.get(InputFormatConfig.PARTITION_SPEC)); } /** @@ -203,36 +197,17 @@ public static PartitionSpec spec(Configuration configuration) { *
  • - Location provider used for file generation
  • *
  • - Encryption manager for encryption handling
  • * - * @param configuration The target configuration to store to + * @param config The target configuration to store to * @param table The table which we want to store to the configuration */ @VisibleForTesting - static void put(Configuration configuration, Table table) { - configuration.set(InputFormatConfig.TABLE_LOCATION, table.location()); - configuration.set(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(table.schema())); - configuration.set(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(table.spec())); - - put(configuration, InputFormatConfig.FILE_IO, table.io()); - put(configuration, InputFormatConfig.LOCATION_PROVIDER, table.locationProvider()); - put(configuration, InputFormatConfig.ENCRYPTION_MANAGER, table.encryption()); - } - - private static void put(Configuration configuration, String key, Serializable object) { - try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos)) { - oos.writeObject(object); - configuration.set(key, Base64.getEncoder().encodeToString(baos.toByteArray())); - } catch (IOException ioe) { - throw new RuntimeException(String.format("Error serializing %s to configuration", object), ioe); - } - } - - private static Object get(Configuration configuration, String key) { - byte [] data = Base64.getDecoder().decode(configuration.get(key)); - try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) { - return ois.readObject(); - } catch (Exception e) { - throw new RuntimeException(String.format("Error reading %s from configuration", key), e); - } + static void put(Configuration config, Table table) { + config.set(InputFormatConfig.TABLE_LOCATION, table.location()); + config.set(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(table.schema())); + config.set(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(table.spec())); + + config.set(InputFormatConfig.FILE_IO, SerializationUtil.serializeToBase64(table.io())); + 
config.set(InputFormatConfig.LOCATION_PROVIDER, SerializationUtil.serializeToBase64(table.locationProvider())); + config.set(InputFormatConfig.ENCRYPTION_MANAGER, SerializationUtil.serializeToBase64(table.encryption())); } } From f1eddc4a3b8e5b883e8c2458ddb24fc36de0bccb Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Tue, 8 Dec 2020 08:50:45 +0100 Subject: [PATCH 8/9] Removed HiveOutputFileFactory - added new cosntructor to OutputFileFactory instead Removed LocationHelper - added the methods to HiveIcebergOutputCommitter instead since the methods are not used elsewhere anymore --- .../apache/iceberg/io/OutputFileFactory.java | 49 +++++++++++----- .../mr/hive/HiveIcebergOutputCommitter.java | 43 ++++++++++++-- .../mr/hive/HiveOutputFileFactory.java | 50 ---------------- .../iceberg/mr/hive/LocationHelper.java | 58 ------------------- .../iceberg/mr/hive/HiveIcebergTestUtils.java | 7 ++- .../hive/TestHiveIcebergOutputCommitter.java | 19 +++--- 6 files changed, 86 insertions(+), 140 deletions(-) delete mode 100644 mr/src/main/java/org/apache/iceberg/mr/hive/HiveOutputFileFactory.java delete mode 100644 mr/src/main/java/org/apache/iceberg/mr/hive/LocationHelper.java diff --git a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java index acbb152a2209..f0788361aae8 100644 --- a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java +++ b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java @@ -27,6 +27,9 @@ import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.encryption.EncryptionManager; +/** + * Factory responsible for generating unique but recognizable data file names. + */ public class OutputFileFactory { private final PartitionSpec spec; private final FileFormat format; @@ -38,11 +41,38 @@ public class OutputFileFactory { // The purpose of this uuid is to be able to know from two paths that they were written by the same operation. 
// That's useful, for example, if a Spark job dies and leaves files in the file system, you can identify them all // with a recursive listing and grep. - private final String uuid = UUID.randomUUID().toString(); + private final String operationId; private final AtomicInteger fileCount = new AtomicInteger(0); + /** + * Constructor where a generated UUID is used as the operationId to ensure uniqueness. + * @param spec Partition specification used by the location provider + * @param format File format used for the extension + * @param locations Location provider used for generating locations + * @param io FileIO to store the files + * @param encryptionManager Encryption manager used for encrypting the files + * @param partitionId First part of the file name + * @param taskId Second part of the file name + */ public OutputFileFactory(PartitionSpec spec, FileFormat format, LocationProvider locations, FileIO io, EncryptionManager encryptionManager, int partitionId, long taskId) { + this(spec, format, locations, io, encryptionManager, partitionId, taskId, UUID.randomUUID().toString()); + } + + /** + * Constructor with specific operationId. The [partitionId, taskId, operationId] triplet has to be unique across JVM + * instances otherwise the same file name could be generated by different instances of the OutputFileFactory. 
+ * @param spec Partition specification used by the location provider + * @param format File format used for the extension + * @param locations Location provider used for generating locations + * @param io FileIO to store the files + * @param encryptionManager Encryption manager used for encrypting the files + * @param partitionId First part of the file name + * @param taskId Second part of the file name + * @param operationId Third part of the file name + */ + public OutputFileFactory(PartitionSpec spec, FileFormat format, LocationProvider locations, FileIO io, + EncryptionManager encryptionManager, int partitionId, long taskId, String operationId) { this.spec = spec; this.format = format; this.locations = locations; @@ -50,11 +80,12 @@ public OutputFileFactory(PartitionSpec spec, FileFormat format, LocationProvider this.encryptionManager = encryptionManager; this.partitionId = partitionId; this.taskId = taskId; + this.operationId = operationId; } - protected String generateFilename() { + private String generateFilename() { return format.addExtension( - String.format("%05d-%d-%s-%05d", partitionId, taskId, uuid, fileCount.incrementAndGet())); + String.format("%05d-%d-%s-%05d", partitionId, taskId, operationId, fileCount.incrementAndGet())); } /** @@ -73,16 +104,4 @@ public EncryptedOutputFile newOutputFile(PartitionKey key) { OutputFile rawOutputFile = io.newOutputFile(newDataLocation); return encryptionManager.encrypt(rawOutputFile); } - - protected FileFormat format() { - return format; - } - - protected String uuid() { - return uuid; - } - - protected int nextCount() { - return fileCount.incrementAndGet(); - } } diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java index 8022014c26c7..7c34f5ad8feb 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java +++ 
b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java @@ -28,13 +28,16 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobContext; import org.apache.hadoop.mapred.OutputCommitter; import org.apache.hadoop.mapred.TaskAttemptContext; import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.TaskType; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; @@ -45,6 +48,7 @@ import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.mr.Catalogs; import org.apache.iceberg.mr.InputFormatConfig; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.iceberg.util.Tasks; import org.slf4j.Logger; @@ -55,6 +59,8 @@ * Currently independent of the Hive ACID transactions. 
*/ public final class HiveIcebergOutputCommitter extends OutputCommitter { + private static final String FOR_COMMIT_EXTENSION = ".forCommit"; + private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergOutputCommitter.class); @Override @@ -82,7 +88,7 @@ public boolean needsTaskCommit(TaskAttemptContext context) { @Override public void commitTask(TaskAttemptContext context) throws IOException { TaskAttemptID attemptID = context.getTaskAttemptID(); - String fileForCommitLocation = LocationHelper.generateFileForCommitLocation(context.getJobConf(), + String fileForCommitLocation = generateFileForCommitLocation(context.getJobConf(), attemptID.getJobID(), attemptID.getTaskID().getId()); HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(attemptID); @@ -124,7 +130,7 @@ public void commitJob(JobContext jobContext) throws IOException { long startTime = System.currentTimeMillis(); LOG.info("Committing job has started for table: {}, using location: {}", table, - LocationHelper.generateJobLocation(conf, jobContext.getJobID())); + generateJobLocation(conf, jobContext.getJobID())); FileIO io = HiveIcebergStorageHandler.io(jobContext.getJobConf()); List dataFiles = dataFiles(jobContext, io, true); @@ -153,7 +159,7 @@ public void commitJob(JobContext jobContext) throws IOException { */ @Override public void abortJob(JobContext jobContext, int status) throws IOException { - String location = LocationHelper.generateJobLocation(jobContext.getJobConf(), jobContext.getJobID()); + String location = generateJobLocation(jobContext.getJobConf(), jobContext.getJobID()); LOG.info("Job {} is aborted. 
Cleaning job location {}", jobContext.getJobID(), location); FileIO io = HiveIcebergStorageHandler.io(jobContext.getJobConf()); @@ -177,7 +183,7 @@ public void abortJob(JobContext jobContext, int status) throws IOException { * @throws IOException if there is a failure deleting the files */ private void cleanup(JobContext jobContext) throws IOException { - String location = LocationHelper.generateJobLocation(jobContext.getJobConf(), jobContext.getJobID()); + String location = generateJobLocation(jobContext.getJobConf(), jobContext.getJobID()); LOG.info("Cleaning for job: {} on location: {}", jobContext.getJobID(), location); // Remove the job's temp directory recursively. @@ -226,7 +232,7 @@ private static List dataFiles(JobContext jobContext, FileIO io, boolea .executeWith(executor) .retry(3) .run(taskId -> { - String taskFileName = LocationHelper.generateFileForCommitLocation(conf, jobContext.getJobID(), taskId); + String taskFileName = generateFileForCommitLocation(conf, jobContext.getJobID(), taskId); dataFiles.addAll(Arrays.asList(readFileForCommit(taskFileName, io))); }); @@ -238,6 +244,33 @@ private static List dataFiles(JobContext jobContext, FileIO io, boolea } } + /** + * Generates the job temp location based on the job configuration. + * Currently it uses QUERY_LOCATION-jobId. + * @param conf The job's configuration + * @param jobId The JobID for the task + * @return The file to store the results + */ + @VisibleForTesting + static String generateJobLocation(Configuration conf, JobID jobId) { + String tableLocation = conf.get(InputFormatConfig.TABLE_LOCATION); + String queryId = conf.get(HiveConf.ConfVars.HIVEQUERYID.varname); + return tableLocation + "/temp/" + queryId + "-" + jobId; + } + + /** + * Generates file location based on the task configuration and a specific task id. + * This file will be used to store the data required to generate the Iceberg commit. + * Currently it uses QUERY_LOCATION-jobId/task-[0..numTasks).forCommit. 
+ * @param conf The job's configuration + * @param jobId The jobId for the task + * @param taskId The taskId for the commit file + * @return The file to store the results + */ + private static String generateFileForCommitLocation(Configuration conf, JobID jobId, int taskId) { + return generateJobLocation(conf, jobId) + "/task-" + taskId + FOR_COMMIT_EXTENSION; + } + private static void createFileForCommit(DataFile[] closedFiles, String location, FileIO io) throws IOException { diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveOutputFileFactory.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveOutputFileFactory.java deleted file mode 100644 index 752e96e9b5c7..000000000000 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveOutputFileFactory.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.iceberg.mr.hive; - -import org.apache.hadoop.mapred.TaskAttemptID; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.encryption.EncryptionManager; -import org.apache.iceberg.io.FileIO; -import org.apache.iceberg.io.LocationProvider; -import org.apache.iceberg.io.OutputFileFactory; - -public class HiveOutputFileFactory extends OutputFileFactory { - private final TaskAttemptID taskAttemptID; - - public HiveOutputFileFactory(PartitionSpec spec, FileFormat fileFormat, LocationProvider locationProvider, FileIO io, - EncryptionManager encryptionManager, TaskAttemptID taskAttemptID) { - super(spec, fileFormat, locationProvider, io, encryptionManager, 0, 0); - this.taskAttemptID = taskAttemptID; - } - - /** - * Override the filename generation so it contains jobId, taskId, taskAttemptId. Kept the UUID and the fileCount so - * the filenames are similar for other writers. - * @return The generated file name - */ - @Override - protected String generateFilename() { - return format().addExtension( - String.format("%05d-%d-%d-%s-%05d", taskAttemptID.getJobID().getId(), taskAttemptID.getTaskID().getId(), - taskAttemptID.getId(), uuid(), nextCount())); - } -} diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/LocationHelper.java b/mr/src/main/java/org/apache/iceberg/mr/hive/LocationHelper.java deleted file mode 100644 index bf91ef03d6f9..000000000000 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/LocationHelper.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iceberg.mr.hive; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.mapreduce.JobID; -import org.apache.iceberg.mr.InputFormatConfig; - -class LocationHelper { - private static final String FOR_COMMIT_EXTENSION = ".forCommit"; - - private LocationHelper() { - } - - /** - * Generates the job temp location based on the job configuration. - * Currently it uses QUERY_LOCATION-jobId. - * @param conf The job's configuration - * @param jobId The JobID for the task - * @return The file to store the results - */ - static String generateJobLocation(Configuration conf, JobID jobId) { - String tableLocation = conf.get(InputFormatConfig.TABLE_LOCATION); - String queryId = conf.get(HiveConf.ConfVars.HIVEQUERYID.varname); - return tableLocation + "/temp/" + queryId + "-" + jobId; - } - - /** - * Generates file location based on the task configuration and a specific task id. - * This file will be used to store the data required to generate the Iceberg commit. - * Currently it uses QUERY_LOCATION-jobId/task-[0..numTasks).forCommit. 
- * @param conf The job's configuration - * @param jobId The jobId for the task - * @param taskId The taskId for the commit file - * @return The file to store the results - */ - static String generateFileForCommitLocation(Configuration conf, JobID jobId, int taskId) { - return generateJobLocation(conf, jobId) + "/task-" + taskId + FOR_COMMIT_EXTENSION; - } -} diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java index 91747d8c008a..37ef0760ddb9 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java @@ -51,6 +51,7 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobID; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -232,17 +233,17 @@ public static void validateData(Table table, List expected, int sortBy) * Validates the number of files under a {@link Table} generated by a specific queryId and jobId. * Validates that the commit files are removed. 
* @param table The table we are checking - * @param queryId The queryId which generated the files + * @param conf The configuration used for generating the job location * @param jobId The jobId which generated the files * @param dataFileNum The expected number of data files (TABLE_LOCATION/data/*) */ - public static void validateFiles(Table table, String queryId, JobID jobId, int dataFileNum) throws IOException { + public static void validateFiles(Table table, Configuration conf, JobID jobId, int dataFileNum) throws IOException { List dataFiles = Files.walk(Paths.get(table.location() + "/data")) .filter(Files::isRegularFile) .filter(path -> !path.getFileName().toString().startsWith(".")) .collect(Collectors.toList()); Assert.assertEquals(dataFileNum, dataFiles.size()); - Assert.assertFalse(new File(table.location() + "/" + queryId + "-" + jobId).exists()); + Assert.assertFalse(new File(HiveIcebergOutputCommitter.generateJobLocation(conf, jobId)).exists()); } } diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java index 60c699498e47..68b2a00f6598 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java @@ -101,7 +101,7 @@ public void testSuccessfulUnpartitionedWrite() throws IOException { List expected = writeRecords(1, 0, true, false, conf); committer.commitJob(new JobContextImpl(conf, JOB_ID)); - HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 1); + HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 1); HiveIcebergTestUtils.validateData(table, expected, 0); } @@ -113,7 +113,7 @@ public void testSuccessfulPartitionedWrite() throws IOException { List expected = writeRecords(1, 0, true, false, conf); committer.commitJob(new JobContextImpl(conf, JOB_ID)); - HiveIcebergTestUtils.validateFiles(table, QUERY_ID, 
JOB_ID, 3); + HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 3); HiveIcebergTestUtils.validateData(table, expected, 0); } @@ -125,7 +125,7 @@ public void testSuccessfulMultipleTasksUnpartitionedWrite() throws IOException { List expected = writeRecords(2, 0, true, false, conf); committer.commitJob(new JobContextImpl(conf, JOB_ID)); - HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 2); + HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 2); HiveIcebergTestUtils.validateData(table, expected, 0); } @@ -137,7 +137,7 @@ public void testSuccessfulMultipleTasksPartitionedWrite() throws IOException { List expected = writeRecords(2, 0, true, false, conf); committer.commitJob(new JobContextImpl(conf, JOB_ID)); - HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 6); + HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 6); HiveIcebergTestUtils.validateData(table, expected, 0); } @@ -149,19 +149,19 @@ public void testRetryTask() throws IOException { // Write records and abort the tasks writeRecords(2, 0, false, true, conf); - HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 0); + HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 0); HiveIcebergTestUtils.validateData(table, Collections.emptyList(), 0); // Write records but do not abort the tasks // The data files remain since we can not identify them but should not be read writeRecords(2, 1, false, false, conf); - HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 2); + HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 2); HiveIcebergTestUtils.validateData(table, Collections.emptyList(), 0); // Write and commit the records List expected = writeRecords(2, 2, true, false, conf); committer.commitJob(new JobContextImpl(conf, JOB_ID)); - HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 4); + HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 4); HiveIcebergTestUtils.validateData(table, expected, 0); } @@ -173,7 +173,7 @@ public void 
testAbortJob() throws IOException { writeRecords(1, 0, true, false, conf); committer.abortJob(new JobContextImpl(conf, JOB_ID), JobStatus.State.FAILED); - HiveIcebergTestUtils.validateFiles(table, QUERY_ID, JOB_ID, 0); + HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 0); HiveIcebergTestUtils.validateData(table, Collections.emptyList(), 0); } @@ -218,7 +218,8 @@ private List writeRecords(int taskNum, int attemptNum, boolean commitTas List records = TestHelper.generateRandomRecords(schema, RECORD_NUM, i + attemptNum); TaskAttemptID taskId = new TaskAttemptID(JOB_ID.getJtIdentifier(), JOB_ID.getId(), TaskType.MAP, i, attemptNum); OutputFileFactory outputFileFactory = - new HiveOutputFileFactory(spec, FileFormat.PARQUET, location, io, encryption, taskId); + new OutputFileFactory(spec, FileFormat.PARQUET, location, io, encryption, taskId.getTaskID().getId(), + attemptNum, QUERY_ID + "-" + JOB_ID); HiveIcebergRecordWriter testWriter = new HiveIcebergRecordWriter(schema, spec, FileFormat.PARQUET, new GenericAppenderFactory(schema), outputFileFactory, io, TARGET_FILE_SIZE, taskId); From 1741c3793842496b07e32db505270f5dc8b451fa Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Wed, 9 Dec 2020 08:47:22 +0100 Subject: [PATCH 9/9] Rebased --- .../iceberg/mr/hive/HiveIcebergTestUtils.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java index 37ef0760ddb9..e109984e4dd7 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java @@ -23,20 +23,21 @@ import java.io.IOException; import java.math.BigDecimal; import java.nio.ByteBuffer; -import java.sql.Timestamp; -import java.time.LocalDate; -import java.time.LocalDateTime; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; 
+import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; import java.time.OffsetDateTime; import java.time.ZoneOffset; +import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.UUID; import java.util.stream.Collectors; -import org.apache.commons.compress.utils.Lists; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; @@ -51,7 +52,6 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobID; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -214,7 +214,7 @@ public static void assertEquals(Record expected, Record actual) { public static void validateData(Table table, List expected, int sortBy) throws IOException { // Refresh the table, so we get the new data as well table.refresh(); - List records = Lists.newArrayList(); + List records = new ArrayList<>(expected.size()); try (CloseableIterable iterable = IcebergGenerics.read(table).build()) { iterable.forEach(records::add); } @@ -242,8 +242,8 @@ public static void validateFiles(Table table, Configuration conf, JobID jobId, i .filter(Files::isRegularFile) .filter(path -> !path.getFileName().toString().startsWith(".")) .collect(Collectors.toList()); - Assert.assertEquals(dataFileNum, dataFiles.size()); + Assert.assertEquals(dataFileNum, dataFiles.size()); Assert.assertFalse(new File(HiveIcebergOutputCommitter.generateJobLocation(conf, jobId)).exists()); } }