diff --git a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java index e68f4c5ad2ec..f0788361aae8 100644 --- a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java +++ b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java @@ -27,6 +27,9 @@ import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.encryption.EncryptionManager; +/** + * Factory responsible for generating unique but recognizable data file names. + */ public class OutputFileFactory { private final PartitionSpec spec; private final FileFormat format; @@ -38,11 +41,38 @@ public class OutputFileFactory { // The purpose of this uuid is to be able to know from two paths that they were written by the same operation. // That's useful, for example, if a Spark job dies and leaves files in the file system, you can identify them all // with a recursive listing and grep. - private final String uuid = UUID.randomUUID().toString(); + private final String operationId; private final AtomicInteger fileCount = new AtomicInteger(0); + /** + * Constructor where a generated UUID is used as the operationId to ensure uniqueness. + * @param spec Partition specification used by the location provider + * @param format File format used for the extension + * @param locations Location provider used for generating locations + * @param io FileIO to store the files + * @param encryptionManager Encryption manager used for encrypting the files + * @param partitionId First part of the file name + * @param taskId Second part of the file name + */ public OutputFileFactory(PartitionSpec spec, FileFormat format, LocationProvider locations, FileIO io, EncryptionManager encryptionManager, int partitionId, long taskId) { + this(spec, format, locations, io, encryptionManager, partitionId, taskId, UUID.randomUUID().toString()); + } + + /** + * Constructor with specific operationId. The [partitionId, taskId, operationId] triplet has to be unique across JVM + * instances otherwise the same file name could be generated by different instances of the OutputFileFactory. 
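+ * <p>A minimal usage sketch (illustrative only; {@code queryId} and {@code jobId} are hypothetical variables,
+ * mirroring how a caller can derive a unique operation id):
+ * <pre>
+ *   OutputFileFactory fileFactory = new OutputFileFactory(spec, FileFormat.PARQUET, locations, io,
+ *       encryptionManager, partitionId, taskId, queryId + "-" + jobId);
+ * </pre>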
+ * @param spec Partition specification used by the location provider + * @param format File format used for the extension + * @param locations Location provider used for generating locations + * @param io FileIO to store the files + * @param encryptionManager Encryption manager used for encrypting the files + * @param partitionId First part of the file name + * @param taskId Second part of the file name + * @param operationId Third part of the file name + */ + public OutputFileFactory(PartitionSpec spec, FileFormat format, LocationProvider locations, FileIO io, + EncryptionManager encryptionManager, int partitionId, long taskId, String operationId) { this.spec = spec; this.format = format; this.locations = locations; @@ -50,11 +80,12 @@ public OutputFileFactory(PartitionSpec spec, FileFormat format, LocationProvider this.encryptionManager = encryptionManager; this.partitionId = partitionId; this.taskId = taskId; + this.operationId = operationId; } private String generateFilename() { return format.addExtension( - String.format("%05d-%d-%s-%05d", partitionId, taskId, uuid, fileCount.incrementAndGet())); + String.format("%05d-%d-%s-%05d", partitionId, taskId, operationId, fileCount.incrementAndGet())); } /** diff --git a/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java b/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java index 576c2ed62463..40e3106f7dad 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java +++ b/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java @@ -51,6 +51,12 @@ private InputFormatConfig() { public static final String CATALOG_LOADER_CLASS = "iceberg.mr.catalog.loader.class"; public static final String SELECTED_COLUMNS = "iceberg.mr.selected.columns"; public static final String EXTERNAL_TABLE_PURGE = "external.table.purge"; + public static final String FILE_IO = "iceberg.mr.file.io"; + public static final String LOCATION_PROVIDER = "iceberg.mr.location.provider"; + public static final String ENCRYPTION_MANAGER = "iceberg.mr.encription.manager"; + + public static final String COMMIT_THREAD_POOL_SIZE = "iceberg.mr.commit.thread.pool.size"; + public static final int COMMIT_THREAD_POOL_SIZE_DEFAULT = 10; public static final String CATALOG_NAME = "iceberg.catalog"; public static final String HADOOP_CATALOG = "hadoop.catalog"; diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java new file mode 100644 index 000000000000..7c34f5ad8feb --- /dev/null +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.mr.hive; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.OutputCommitter; +import org.apache.hadoop.mapred.TaskAttemptContext; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.hadoop.Util; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.mr.Catalogs; +import org.apache.iceberg.mr.InputFormatConfig; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An Iceberg table committer for adding data files to the Iceberg tables. + * Currently independent of the Hive ACID transactions. + */ +public final class HiveIcebergOutputCommitter extends OutputCommitter { + private static final String FOR_COMMIT_EXTENSION = ".forCommit"; + + private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergOutputCommitter.class); + + @Override + public void setupJob(JobContext jobContext) { + // do nothing. + } + + @Override + public void setupTask(TaskAttemptContext taskAttemptContext) { + // do nothing. + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext context) { + // We need to commit if this is the last phase of a MapReduce process + return TaskType.REDUCE.equals(context.getTaskAttemptID().getTaskID().getTaskType()) || + context.getJobConf().getNumReduceTasks() == 0; + } + + /** + * Collects the generated data files and creates a commit file storing the data file list. + * @param context The job context + * @throws IOException Thrown if there is an error writing the commit file + */ + @Override + public void commitTask(TaskAttemptContext context) throws IOException { + TaskAttemptID attemptID = context.getTaskAttemptID(); + String fileForCommitLocation = generateFileForCommitLocation(context.getJobConf(), + attemptID.getJobID(), attemptID.getTaskID().getId()); + HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(attemptID); + + DataFile[] closedFiles; + if (writer != null) { + closedFiles = writer.dataFiles(); + } else { + closedFiles = new DataFile[0]; + } + + // Creating the file containing the data files generated by this task + createFileForCommit(closedFiles, fileForCommitLocation, HiveIcebergStorageHandler.io(context.getJobConf())); + } + + /** + * Removes files generated by this task. 
+ * @param context The task context + * @throws IOException Thrown if there is an error closing the writer + */ + @Override + public void abortTask(TaskAttemptContext context) throws IOException { + // Clean up writer data from the local store + HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(context.getTaskAttemptID()); + + // Remove files if it was not done already + writer.close(true); + } + + /** + * Reads the commit files stored in the temp directory and collects the generated committed data files. + * Appends the data files to the table. At the end removes the temporary directory. + * @param jobContext The job context + * @throws IOException if there is a failure deleting the files + */ + @Override + public void commitJob(JobContext jobContext) throws IOException { + JobConf conf = jobContext.getJobConf(); + Table table = Catalogs.loadTable(conf); + + long startTime = System.currentTimeMillis(); + LOG.info("Committing job has started for table: {}, using location: {}", table, + generateJobLocation(conf, jobContext.getJobID())); + + FileIO io = HiveIcebergStorageHandler.io(jobContext.getJobConf()); + List dataFiles = dataFiles(jobContext, io, true); + + if (dataFiles.size() > 0) { + // Appending data files to the table + AppendFiles append = table.newAppend(); + dataFiles.forEach(append::appendFile); + append.commit(); + LOG.info("Commit took {} ms for table: {} with {} file(s)", System.currentTimeMillis() - startTime, table, + dataFiles.size()); + LOG.debug("Added files {}", dataFiles); + } else { + LOG.info("Commit took {} ms for table: {} with no new files", System.currentTimeMillis() - startTime, table); + } + + cleanup(jobContext); + } + + /** + * Removes the generated data files, if there is a commit file already generated for them. + * The cleanup at the end removes the temporary directory as well. + * @param jobContext The job context + * @param status The status of the job + * @throws IOException if there is a failure deleting the files + */ + @Override + public void abortJob(JobContext jobContext, int status) throws IOException { + String location = generateJobLocation(jobContext.getJobConf(), jobContext.getJobID()); + LOG.info("Job {} is aborted. Cleaning job location {}", jobContext.getJobID(), location); + + FileIO io = HiveIcebergStorageHandler.io(jobContext.getJobConf()); + List dataFiles = dataFiles(jobContext, io, false); + + // Check if we have files already committed and remove data files if there are any + if (dataFiles.size() > 0) { + Tasks.foreach(dataFiles) + .retry(3) + .suppressFailureWhenFinished() + .onFailure((file, exc) -> LOG.debug("Failed on to remove data file {} on abort job", file.path(), exc)) + .run(file -> io.deleteFile(file.path().toString())); + } + + cleanup(jobContext); + } + + /** + * Cleans up the jobs temporary location. + * @param jobContext The job context + * @throws IOException if there is a failure deleting the files + */ + private void cleanup(JobContext jobContext) throws IOException { + String location = generateJobLocation(jobContext.getJobConf(), jobContext.getJobID()); + LOG.info("Cleaning for job: {} on location: {}", jobContext.getJobID(), location); + + // Remove the job's temp directory recursively. + // Intentionally used foreach on a single item. Using the Tasks API here only for the retry capability. 
+ Tasks.foreach(location) + .retry(3) + .suppressFailureWhenFinished() + .onFailure((file, exc) -> LOG.debug("Failed on to remove directory {} on cleanup job", file, exc)) + .run(file -> { + Path toDelete = new Path(file); + FileSystem fs = Util.getFs(toDelete, jobContext.getJobConf()); + fs.delete(toDelete, true); + }, IOException.class); + } + + /** + * Get the data committed data files for this job. + * @param jobContext The job context + * @param io The FileIO used for reading a files generated for commit + * @param throwOnFailure If true then it throws an exception on failure + * @return The list of the committed data files + */ + private static List dataFiles(JobContext jobContext, FileIO io, boolean throwOnFailure) { + JobConf conf = jobContext.getJobConf(); + // If there are reducers, then every reducer will generate a result file. + // If this is a map only task, then every mapper will generate a result file. + int expectedFiles = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks(); + + ExecutorService executor = null; + try { + // Creating executor service for parallel handling of file reads + executor = Executors.newFixedThreadPool( + conf.getInt(InputFormatConfig.COMMIT_THREAD_POOL_SIZE, InputFormatConfig.COMMIT_THREAD_POOL_SIZE_DEFAULT), + new ThreadFactoryBuilder() + .setDaemon(true) + .setPriority(Thread.NORM_PRIORITY) + .setNameFormat("iceberg-commit-pool-%d") + .build()); + + List dataFiles = Collections.synchronizedList(new ArrayList<>()); + + // Reading the committed files. The assumption here is that the taskIds are generated in sequential order + // starting from 0. + Tasks.range(expectedFiles) + .throwFailureWhenFinished(throwOnFailure) + .executeWith(executor) + .retry(3) + .run(taskId -> { + String taskFileName = generateFileForCommitLocation(conf, jobContext.getJobID(), taskId); + dataFiles.addAll(Arrays.asList(readFileForCommit(taskFileName, io))); + }); + + return dataFiles; + } finally { + if (executor != null) { + executor.shutdown(); + } + } + } + + /** + * Generates the job temp location based on the job configuration. + * Currently it uses QUERY_LOCATION-jobId. + * @param conf The job's configuration + * @param jobId The JobID for the task + * @return The file to store the results + */ + @VisibleForTesting + static String generateJobLocation(Configuration conf, JobID jobId) { + String tableLocation = conf.get(InputFormatConfig.TABLE_LOCATION); + String queryId = conf.get(HiveConf.ConfVars.HIVEQUERYID.varname); + return tableLocation + "/temp/" + queryId + "-" + jobId; + } + + /** + * Generates file location based on the task configuration and a specific task id. + * This file will be used to store the data required to generate the Iceberg commit. + * Currently it uses QUERY_LOCATION-jobId/task-[0..numTasks).forCommit. 
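+ * <p>For example (illustrative only, all names below are hypothetical), a table located at {@code /warehouse/db/tbl}
+ * written by query {@code query1} in job {@code job_1_0001} would use commit files such as:
+ * <pre>
+ *   /warehouse/db/tbl/temp/query1-job_1_0001/task-0.forCommit
+ * </pre>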
+ * @param conf The job's configuration + * @param jobId The jobId for the task + * @param taskId The taskId for the commit file + * @return The file to store the results + */ + private static String generateFileForCommitLocation(Configuration conf, JobID jobId, int taskId) { + return generateJobLocation(conf, jobId) + "/task-" + taskId + FOR_COMMIT_EXTENSION; + } + + private static void createFileForCommit(DataFile[] closedFiles, String location, FileIO io) + throws IOException { + + OutputFile fileForCommit = io.newOutputFile(location); + try (ObjectOutputStream oos = new ObjectOutputStream(fileForCommit.createOrOverwrite())) { + oos.writeObject(closedFiles); + } + LOG.debug("Iceberg committed file is created {}", fileForCommit); + } + + private static DataFile[] readFileForCommit(String fileForCommitLocation, FileIO io) { + try (ObjectInputStream ois = new ObjectInputStream(io.newInputFile(fileForCommitLocation).newStream())) { + return (DataFile[]) ois.readObject(); + } catch (ClassNotFoundException | IOException e) { + throw new NotFoundException("Can not read or parse committed file: %s", fileForCommitLocation); + } + } +} diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java new file mode 100644 index 000000000000..7d94e214938d --- /dev/null +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergRecordWriter.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.mr.hive; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.PartitionedFanoutWriter; +import org.apache.iceberg.mr.mapred.Container; +import org.apache.iceberg.util.Tasks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class HiveIcebergRecordWriter extends PartitionedFanoutWriter + implements FileSinkOperator.RecordWriter, org.apache.hadoop.mapred.RecordWriter> { + private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergRecordWriter.class); + + // The current key is reused at every write to avoid unnecessary object creation + private final PartitionKey currentKey; + private final FileIO io; + + // map to store the active writers + // Stored in concurrent map, since some executor engines can share containers + private static final Map writers = new ConcurrentHashMap<>(); + + static HiveIcebergRecordWriter removeWriter(TaskAttemptID taskAttemptID) { + return writers.remove(taskAttemptID); + } + + HiveIcebergRecordWriter(Schema schema, PartitionSpec spec, FileFormat format, + FileAppenderFactory appenderFactory, OutputFileFactory fileFactory, FileIO io, long targetFileSize, + TaskAttemptID taskAttemptID) { + super(spec, format, appenderFactory, fileFactory, io, targetFileSize); + this.io = io; + this.currentKey = new PartitionKey(spec, schema); + writers.put(taskAttemptID, this); + } + + @Override + protected PartitionKey partition(Record row) { + currentKey.partition(row); + return currentKey; + } + + @Override + public void write(Writable row) throws IOException { + super.write(((Container) row).get()); + } + + @Override + public void write(NullWritable key, Container value) throws IOException { + write(value); + } + + @Override + public void close(boolean abort) throws IOException { + DataFile[] dataFiles = super.dataFiles(); + + // If abort then remove the unnecessary files + if (abort) { + Tasks.foreach(dataFiles) + .retry(3) + .suppressFailureWhenFinished() + .onFailure((file, exception) -> LOG.debug("Failed on to remove file {} on abort", file, exception)) + .run(dataFile -> io.deleteFile(dataFile.path().toString())); + } + + LOG.info("IcebergRecordWriter is closed with abort={}. 
Created {} files", abort, dataFiles.length); + } + + @Override + public void close(Reporter reporter) throws IOException { + close(false); + } +} diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index 07dc958cf018..ec4c9a75fb53 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -23,7 +23,6 @@ import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.HiveMetaHook; -import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -35,10 +34,18 @@ import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; import org.apache.iceberg.Table; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.LocationProvider; import org.apache.iceberg.mr.Catalogs; import org.apache.iceberg.mr.InputFormatConfig; +import org.apache.iceberg.mr.SerializationUtil; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, HiveStorageHandler { @@ -65,7 +72,7 @@ public HiveMetaHook getMetaHook() { } @Override - public HiveAuthorizationProvider getAuthorizationProvider() throws HiveException { + public HiveAuthorizationProvider getAuthorizationProvider() { return null; } @@ -133,4 +140,74 @@ public DecomposedPredicate decomposePredicate(JobConf jobConf, Deserializer dese predicate.pushedPredicate = (ExprNodeGenericFuncDesc) exprNodeDesc; return predicate; } + + /** + * Returns the Table FileIO serialized to the configuration. + * @param config The configuration used to get the data from + * @return The Table FileIO object + */ + public static FileIO io(Configuration config) { + return SerializationUtil.deserializeFromBase64(config.get(InputFormatConfig.FILE_IO)); + } + + /** + * Returns the Table LocationProvider serialized to the configuration. + * @param config The configuration used to get the data from + * @return The Table LocationProvider object + */ + public static LocationProvider location(Configuration config) { + return SerializationUtil.deserializeFromBase64(config.get(InputFormatConfig.LOCATION_PROVIDER)); + } + + /** + * Returns the Table EncryptionManager serialized to the configuration. + * @param config The configuration used to get the data from + * @return The Table EncryptionManager object + */ + public static EncryptionManager encryption(Configuration config) { + return SerializationUtil.deserializeFromBase64(config.get(InputFormatConfig.ENCRYPTION_MANAGER)); + } + + /** + * Returns the Table Schema serialized to the configuration. 
+ * @param config The configuration used to get the data from + * @return The Table Schema object + */ + public static Schema schema(Configuration config) { + return SchemaParser.fromJson(config.get(InputFormatConfig.TABLE_SCHEMA)); + } + + /** + * Returns the Table PartitionSpec serialized to the configuration. + * @param config The configuration used to get the data from + * @return The Table PartitionSpec object + */ + public static PartitionSpec spec(Configuration config) { + return PartitionSpecParser.fromJson(schema(config), config.get(InputFormatConfig.PARTITION_SPEC)); + } + + /** + * Stores the serializable table data in the configuration. + * Currently the following is handled: + *
    + *
+ * <ul>
+ * <li>Location</li>
+ * <li>Schema</li>
+ * <li>Partition specification</li>
+ * <li>FileIO for handling table files</li>
+ * <li>Location provider used for file generation</li>
+ * <li>Encryption manager for encryption handling</li>
+ * </ul>
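+ * <p>The getters above read these values back on the task side, for example (sketch; {@code jobConf} is any
+ * Configuration carrying the serialized table data):
+ * <pre>
+ *   FileIO io = HiveIcebergStorageHandler.io(jobConf);
+ *   Schema schema = HiveIcebergStorageHandler.schema(jobConf);
+ *   PartitionSpec spec = HiveIcebergStorageHandler.spec(jobConf);
+ * </pre>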
+ * @param config The target configuration to store to + * @param table The table which we want to store to the configuration + */ + @VisibleForTesting + static void put(Configuration config, Table table) { + config.set(InputFormatConfig.TABLE_LOCATION, table.location()); + config.set(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(table.schema())); + config.set(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(table.spec())); + + config.set(InputFormatConfig.FILE_IO, SerializationUtil.serializeToBase64(table.io())); + config.set(InputFormatConfig.LOCATION_PROVIDER, SerializationUtil.serializeToBase64(table.locationProvider())); + config.set(InputFormatConfig.ENCRYPTION_MANAGER, SerializationUtil.serializeToBase64(table.encryption())); + } } diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java index b04ef10d4e49..e109984e4dd7 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java @@ -19,16 +19,25 @@ package org.apache.iceberg.mr.hive; +import java.io.File; +import java.io.IOException; import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.sql.Timestamp; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.OffsetDateTime; import java.time.ZoneOffset; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Comparator; import java.util.List; import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; @@ -43,9 +52,13 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobID; import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.IcebergGenerics; import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergBinaryObjectInspector; import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergDecimalObjectInspector; import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector; @@ -100,6 +113,11 @@ private HiveIcebergTestUtils() { // Empty constructor for the utility class } + /** + * Generates a test record where every field has a value. + * @param uuidAsByte As per #1881 Parquet needs byte[] value for UUID, other file formats need UUID object + * @return Record with every field set + */ public static Record getTestRecord(boolean uuidAsByte) { Record record = GenericRecord.create(HiveIcebergTestUtils.FULL_SCHEMA); record.set(0, true); @@ -127,6 +145,10 @@ public static Record getTestRecord(boolean uuidAsByte) { return record; } + /** + * Record with every field set to null. + * @return Empty record + */ public static Record getNullTestRecord() { Record record = GenericRecord.create(HiveIcebergTestUtils.FULL_SCHEMA); @@ -137,13 +159,12 @@ public static Record getNullTestRecord() { return record; } + /** + * Hive values for the test record. 
+ * @param record The original Iceberg record + * @return The Hive 'record' containing the same values + */ public static List valuesForTestRecord(Record record) { -// ByteBuffer byteBuffer = record.get(11, ByteBuffer.class); -// byte[] bytes = new byte[byteBuffer.remaining()]; -// byteBuffer.mark(); -// byteBuffer.get(bytes); -// byteBuffer.reset(); - return Arrays.asList( new BooleanWritable(Boolean.TRUE), new IntWritable(record.get(1, Integer.class)), @@ -163,19 +184,66 @@ public static List valuesForTestRecord(Record record) { ); } + /** + * Check if 2 Iceberg records are the same or not. Compares OffsetDateTimes only by the Intant they represent. + * @param expected The expected record + * @param actual The actual record + */ public static void assertEquals(Record expected, Record actual) { for (int i = 0; i < expected.size(); ++i) { if (expected.get(i) instanceof OffsetDateTime) { // For OffsetDateTime we just compare the actual instant Assert.assertEquals(((OffsetDateTime) expected.get(i)).toInstant(), ((OffsetDateTime) actual.get(i)).toInstant()); + } else if (expected.get(i) instanceof byte[]) { + Assert.assertArrayEquals((byte[]) expected.get(i), (byte[]) actual.get(i)); } else { - if (expected.get(i) instanceof byte[]) { - Assert.assertArrayEquals((byte[]) expected.get(i), (byte[]) actual.get(i)); - } else { - Assert.assertEquals(expected.get(i), actual.get(i)); - } + Assert.assertEquals(expected.get(i), actual.get(i)); } } } + + /** + * Validates whether the table contains the expected records. The results should be sorted by a unique key so we do + * not end up with flaky tests. + * @param table The table we should read the records from + * @param expected The expected list of Records (The list will be sorted) + * @param sortBy The column position by which we will sort + * @throws IOException Exceptions when reading the table data + */ + public static void validateData(Table table, List expected, int sortBy) throws IOException { + // Refresh the table, so we get the new data as well + table.refresh(); + List records = new ArrayList<>(expected.size()); + try (CloseableIterable iterable = IcebergGenerics.read(table).build()) { + iterable.forEach(records::add); + } + + // Sort based on the specified column + expected.sort(Comparator.comparingLong(record -> (Long) record.get(sortBy))); + records.sort(Comparator.comparingLong(record -> (Long) record.get(sortBy))); + + Assert.assertEquals(expected.size(), records.size()); + for (int i = 0; i < expected.size(); ++i) { + assertEquals(expected.get(i), records.get(i)); + } + } + + /** + * Validates the number of files under a {@link Table} generated by a specific queryId and jobId. + * Validates that the commit files are removed. 
+ * @param table The table we are checking + * @param conf The configuration used for generating the job location + * @param jobId The jobId which generated the files + * @param dataFileNum The expected number of data files (TABLE_LOCATION/data/*) + */ + public static void validateFiles(Table table, Configuration conf, JobID jobId, int dataFileNum) throws IOException { + List dataFiles = Files.walk(Paths.get(table.location() + "/data")) + .filter(Files::isRegularFile) + .filter(path -> !path.getFileName().toString().startsWith(".")) + .collect(Collectors.toList()); + + Assert.assertEquals(dataFileNum, dataFiles.size()); + Assert.assertFalse(new File(HiveIcebergOutputCommitter.generateJobLocation(conf, jobId)).exists()); + } } diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java new file mode 100644 index 000000000000..68b2a00f6598 --- /dev/null +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.mr.hive; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContextImpl; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.TaskAttemptContextImpl; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.LocationProvider; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.mr.TestHelper; +import org.apache.iceberg.mr.mapred.Container; +import org.apache.iceberg.types.Types; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.apache.iceberg.types.Types.NestedField.required; + +public class TestHiveIcebergOutputCommitter { + private static final long TARGET_FILE_SIZE = 128 * 1024 * 1024; + private static final int RECORD_NUM = 5; + private static final String QUERY_ID = "query_id"; + private static final JobID JOB_ID = new JobID("test", 0); + private static final TaskAttemptID MAP_TASK_ID = + new TaskAttemptID(JOB_ID.getJtIdentifier(), JOB_ID.getId(), TaskType.MAP, 0, 0); + private static final TaskAttemptID REDUCE_TASK_ID = + new TaskAttemptID(JOB_ID.getJtIdentifier(), JOB_ID.getId(), TaskType.REDUCE, 0, 0); + + private static final Schema CUSTOMER_SCHEMA = new Schema( + required(1, "customer_id", Types.LongType.get()), + required(2, "first_name", Types.StringType.get()) + ); + + private static final PartitionSpec PARTITIONED_SPEC = + PartitionSpec.builderFor(CUSTOMER_SCHEMA).bucket("customer_id", 3).build(); + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Test + public void testNeedsTaskCommit() { + HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter(); + + JobConf mapOnlyJobConf = new JobConf(); + mapOnlyJobConf.setNumMapTasks(10); + mapOnlyJobConf.setNumReduceTasks(0); + + // Map only job should commit map tasks + Assert.assertTrue(committer.needsTaskCommit(new TaskAttemptContextImpl(mapOnlyJobConf, MAP_TASK_ID))); + + JobConf mapReduceJobConf = new JobConf(); + mapReduceJobConf.setNumMapTasks(10); + mapReduceJobConf.setNumReduceTasks(10); + + // MapReduce job should not commit map tasks, but should commit reduce tasks + Assert.assertFalse(committer.needsTaskCommit(new TaskAttemptContextImpl(mapReduceJobConf, MAP_TASK_ID))); + Assert.assertTrue(committer.needsTaskCommit(new TaskAttemptContextImpl(mapReduceJobConf, REDUCE_TASK_ID))); + } + + @Test + public void testSuccessfulUnpartitionedWrite() throws IOException { + HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter(); + Table table = table(temp.getRoot().getPath(), false); + JobConf conf = jobConf(table, 1); + List expected = writeRecords(1, 0, true, false, conf); + committer.commitJob(new JobContextImpl(conf, JOB_ID)); + + HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 1); + HiveIcebergTestUtils.validateData(table, expected, 
0); + } + + @Test + public void testSuccessfulPartitionedWrite() throws IOException { + HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter(); + Table table = table(temp.getRoot().getPath(), true); + JobConf conf = jobConf(table, 1); + List expected = writeRecords(1, 0, true, false, conf); + committer.commitJob(new JobContextImpl(conf, JOB_ID)); + + HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 3); + HiveIcebergTestUtils.validateData(table, expected, 0); + } + + @Test + public void testSuccessfulMultipleTasksUnpartitionedWrite() throws IOException { + HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter(); + Table table = table(temp.getRoot().getPath(), false); + JobConf conf = jobConf(table, 2); + List expected = writeRecords(2, 0, true, false, conf); + committer.commitJob(new JobContextImpl(conf, JOB_ID)); + + HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 2); + HiveIcebergTestUtils.validateData(table, expected, 0); + } + + @Test + public void testSuccessfulMultipleTasksPartitionedWrite() throws IOException { + HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter(); + Table table = table(temp.getRoot().getPath(), true); + JobConf conf = jobConf(table, 2); + List expected = writeRecords(2, 0, true, false, conf); + committer.commitJob(new JobContextImpl(conf, JOB_ID)); + + HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 6); + HiveIcebergTestUtils.validateData(table, expected, 0); + } + + @Test + public void testRetryTask() throws IOException { + HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter(); + Table table = table(temp.getRoot().getPath(), false); + JobConf conf = jobConf(table, 2); + + // Write records and abort the tasks + writeRecords(2, 0, false, true, conf); + HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 0); + HiveIcebergTestUtils.validateData(table, Collections.emptyList(), 0); + + // Write records but do not abort the tasks + // The data files remain since we can not identify them but should not be read + writeRecords(2, 1, false, false, conf); + HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 2); + HiveIcebergTestUtils.validateData(table, Collections.emptyList(), 0); + + // Write and commit the records + List expected = writeRecords(2, 2, true, false, conf); + committer.commitJob(new JobContextImpl(conf, JOB_ID)); + HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 4); + HiveIcebergTestUtils.validateData(table, expected, 0); + } + + @Test + public void testAbortJob() throws IOException { + HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter(); + Table table = table(temp.getRoot().getPath(), false); + JobConf conf = jobConf(table, 1); + writeRecords(1, 0, true, false, conf); + committer.abortJob(new JobContextImpl(conf, JOB_ID), JobStatus.State.FAILED); + + HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 0); + HiveIcebergTestUtils.validateData(table, Collections.emptyList(), 0); + } + + private Table table(String location, boolean partitioned) { + HadoopTables tables = new HadoopTables(); + return tables.create(CUSTOMER_SCHEMA, partitioned ? 
PARTITIONED_SPEC : PartitionSpec.unpartitioned(), location); + } + + private JobConf jobConf(Table table, int taskNum) { + JobConf conf = new JobConf(); + conf.setNumMapTasks(taskNum); + conf.setNumReduceTasks(0); + conf.set(HiveConf.ConfVars.HIVEQUERYID.varname, QUERY_ID); + + HiveIcebergStorageHandler.put(conf, table); + return conf; + } + + /** + * Write random records to the given table using separate {@link HiveIcebergOutputCommitter} and + * a separate {@link HiveIcebergRecordWriter} for every task. + * @param taskNum The number of tasks in the job handled by the committer + * @param attemptNum The id used for attempt number generation + * @param commitTasks If true the tasks will be committed + * @param abortTasks If true the tasks will be aborted - needed so we can simulate no commit/no abort + * situation + * @param conf The job configuration + * @return The random generated records which were appended to the table + * @throws IOException Propagating {@link HiveIcebergRecordWriter} exceptions + */ + private List writeRecords(int taskNum, int attemptNum, boolean commitTasks, boolean abortTasks, + JobConf conf) throws IOException { + List expected = new ArrayList<>(RECORD_NUM * taskNum); + + FileIO io = HiveIcebergStorageHandler.io(conf); + LocationProvider location = HiveIcebergStorageHandler.location(conf); + EncryptionManager encryption = HiveIcebergStorageHandler.encryption(conf); + Schema schema = HiveIcebergStorageHandler.schema(conf); + PartitionSpec spec = HiveIcebergStorageHandler.spec(conf); + + for (int i = 0; i < taskNum; ++i) { + List records = TestHelper.generateRandomRecords(schema, RECORD_NUM, i + attemptNum); + TaskAttemptID taskId = new TaskAttemptID(JOB_ID.getJtIdentifier(), JOB_ID.getId(), TaskType.MAP, i, attemptNum); + OutputFileFactory outputFileFactory = + new OutputFileFactory(spec, FileFormat.PARQUET, location, io, encryption, taskId.getTaskID().getId(), + attemptNum, QUERY_ID + "-" + JOB_ID); + HiveIcebergRecordWriter testWriter = new HiveIcebergRecordWriter(schema, spec, FileFormat.PARQUET, + new GenericAppenderFactory(schema), outputFileFactory, io, TARGET_FILE_SIZE, taskId); + + Container container = new Container<>(); + + for (Record record : records) { + container.set(record); + testWriter.write(container); + } + + testWriter.close(false); + if (commitTasks) { + new HiveIcebergOutputCommitter().commitTask(new TaskAttemptContextImpl(conf, taskId)); + expected.addAll(records); + } else if (abortTasks) { + new HiveIcebergOutputCommitter().abortTask(new TaskAttemptContextImpl(conf, taskId)); + } + } + + return expected; + } +}