35 changes: 33 additions & 2 deletions core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java
@@ -27,6 +27,9 @@
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionManager;

/**
* Factory responsible for generating unique but recognizable data file names.
*/
public class OutputFileFactory {
private final PartitionSpec spec;
private final FileFormat format;
@@ -38,23 +41,51 @@ public class OutputFileFactory {
// The purpose of this uuid is to be able to know from two paths that they were written by the same operation.
// That's useful, for example, if a Spark job dies and leaves files in the file system, you can identify them all
// with a recursive listing and grep.
private final String uuid = UUID.randomUUID().toString();
private final String operationId;
private final AtomicInteger fileCount = new AtomicInteger(0);

/**
* Constructor where a generated UUID is used as the operationId to ensure uniqueness.
* @param spec Partition specification used by the location provider
* @param format File format used for the extension
* @param locations Location provider used for generating locations
* @param io FileIO to store the files
* @param encryptionManager Encryption manager used for encrypting the files
* @param partitionId First part of the file name
* @param taskId Second part of the file name
*/
public OutputFileFactory(PartitionSpec spec, FileFormat format, LocationProvider locations, FileIO io,
EncryptionManager encryptionManager, int partitionId, long taskId) {
this(spec, format, locations, io, encryptionManager, partitionId, taskId, UUID.randomUUID().toString());
}

/**
* Constructor with a specific operationId. The [partitionId, taskId, operationId] triplet has to be unique across JVM
* instances; otherwise the same file name could be generated by different instances of the OutputFileFactory.
* @param spec Partition specification used by the location provider
* @param format File format used for the extension
* @param locations Location provider used for generating locations
* @param io FileIO to store the files
* @param encryptionManager Encryption manager used for encrypting the files
* @param partitionId First part of the file name
* @param taskId Second part of the file name
* @param operationId Third part of the file name
*/
public OutputFileFactory(PartitionSpec spec, FileFormat format, LocationProvider locations, FileIO io,
EncryptionManager encryptionManager, int partitionId, long taskId, String operationId) {
this.spec = spec;
this.format = format;
this.locations = locations;
this.io = io;
this.encryptionManager = encryptionManager;
this.partitionId = partitionId;
this.taskId = taskId;
this.operationId = operationId;
}

private String generateFilename() {
return format.addExtension(
String.format("%05d-%d-%s-%05d", partitionId, taskId, uuid, fileCount.incrementAndGet()));
String.format("%05d-%d-%s-%05d", partitionId, taskId, operationId, fileCount.incrementAndGet()));
}

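For context, a minimal, illustrative sketch of how an engine-side writer might use the new constructor. The Table handle, FileFormat.PARQUET, and the partitionId/taskId/queryId values are assumptions for the example, not part of this change:

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.OutputFileFactory;

class OutputFileFactoryExample {
  // Illustrative only: partitionId, taskId and queryId are assumed to be provided by the engine.
  static OutputFileFactory create(Table table, int partitionId, long taskId, String queryId) {
    // The [partitionId, taskId, operationId] triplet must be unique across JVM instances,
    // so an engine-provided query/operation id is passed as the third component of the file name.
    return new OutputFileFactory(
        table.spec(), FileFormat.PARQUET, table.locationProvider(), table.io(),
        table.encryption(), partitionId, taskId, queryId);
  }
}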
6 changes: 6 additions & 0 deletions mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
@@ -51,6 +51,12 @@ private InputFormatConfig() {
public static final String CATALOG_LOADER_CLASS = "iceberg.mr.catalog.loader.class";
public static final String SELECTED_COLUMNS = "iceberg.mr.selected.columns";
public static final String EXTERNAL_TABLE_PURGE = "external.table.purge";
public static final String FILE_IO = "iceberg.mr.file.io";
public static final String LOCATION_PROVIDER = "iceberg.mr.location.provider";
public static final String ENCRYPTION_MANAGER = "iceberg.mr.encryption.manager";

public static final String COMMIT_THREAD_POOL_SIZE = "iceberg.mr.commit.thread.pool.size";
public static final int COMMIT_THREAD_POOL_SIZE_DEFAULT = 10;

public static final String CATALOG_NAME = "iceberg.catalog";
public static final String HADOOP_CATALOG = "hadoop.catalog";
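As a quick, hedged illustration of how the new commit pool key is meant to be consumed (this mirrors the read done in HiveIcebergOutputCommitter below; the JobConf argument is assumed):

import org.apache.hadoop.mapred.JobConf;
import org.apache.iceberg.mr.InputFormatConfig;

class CommitPoolSizeExample {
  // Reads the commit thread pool size from the job configuration, falling back to the default of 10.
  static int commitPoolSize(JobConf conf) {
    return conf.getInt(
        InputFormatConfig.COMMIT_THREAD_POOL_SIZE,
        InputFormatConfig.COMMIT_THREAD_POOL_SIZE_DEFAULT);
  }
}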
291 changes: 291 additions & 0 deletions mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -0,0 +1,291 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* An Iceberg table committer for adding data files to Iceberg tables.
* Currently it is independent of the Hive ACID transactions.
*/
public final class HiveIcebergOutputCommitter extends OutputCommitter {
private static final String FOR_COMMIT_EXTENSION = ".forCommit";

private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergOutputCommitter.class);

@Override
public void setupJob(JobContext jobContext) {
// do nothing.
}

@Override
public void setupTask(TaskAttemptContext taskAttemptContext) {
// do nothing.
}

@Override
public boolean needsTaskCommit(TaskAttemptContext context) {
// We need to commit if this is the last phase of a MapReduce process
return TaskType.REDUCE.equals(context.getTaskAttemptID().getTaskID().getTaskType()) ||
context.getJobConf().getNumReduceTasks() == 0;
}

/**
* Collects the generated data files and creates a commit file storing the data file list.
* @param context The job context
* @throws IOException Thrown if there is an error writing the commit file
*/
@Override
public void commitTask(TaskAttemptContext context) throws IOException {
TaskAttemptID attemptID = context.getTaskAttemptID();
String fileForCommitLocation = generateFileForCommitLocation(context.getJobConf(),
attemptID.getJobID(), attemptID.getTaskID().getId());
HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(attemptID);

DataFile[] closedFiles;
if (writer != null) {
closedFiles = writer.dataFiles();
} else {
closedFiles = new DataFile[0];
}

// Creating the file containing the data files generated by this task
createFileForCommit(closedFiles, fileForCommitLocation, HiveIcebergStorageHandler.io(context.getJobConf()));
}

/**
* Removes files generated by this task.
* @param context The task context
* @throws IOException Thrown if there is an error closing the writer
*/
@Override
public void abortTask(TaskAttemptContext context) throws IOException {
// Clean up writer data from the local store
HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(context.getTaskAttemptID());

// Remove files if it has not been done already; the writer may be missing if the task wrote no rows
if (writer != null) {
writer.close(true);
}
}

/**
* Reads the commit files stored in the temp directory and collects the generated committed data files.
* Appends the data files to the table. At the end it removes the temporary directory.
* @param jobContext The job context
* @throws IOException if there is a failure deleting the files
*/
@Override
public void commitJob(JobContext jobContext) throws IOException {
JobConf conf = jobContext.getJobConf();
Table table = Catalogs.loadTable(conf);

long startTime = System.currentTimeMillis();
LOG.info("Committing job has started for table: {}, using location: {}", table,
generateJobLocation(conf, jobContext.getJobID()));

FileIO io = HiveIcebergStorageHandler.io(jobContext.getJobConf());
List<DataFile> dataFiles = dataFiles(jobContext, io, true);

if (dataFiles.size() > 0) {
// Appending data files to the table
AppendFiles append = table.newAppend();
dataFiles.forEach(append::appendFile);
append.commit();
LOG.info("Commit took {} ms for table: {} with {} file(s)", System.currentTimeMillis() - startTime, table,
dataFiles.size());
LOG.debug("Added files {}", dataFiles);
} else {
LOG.info("Commit took {} ms for table: {} with no new files", System.currentTimeMillis() - startTime, table);
}

cleanup(jobContext);
}

/**
* Removes the generated data files if a commit file has already been generated for them.
* The cleanup at the end removes the temporary directory as well.
* @param jobContext The job context
* @param status The status of the job
* @throws IOException if there is a failure deleting the files
*/
@Override
public void abortJob(JobContext jobContext, int status) throws IOException {
String location = generateJobLocation(jobContext.getJobConf(), jobContext.getJobID());
LOG.info("Job {} is aborted. Cleaning job location {}", jobContext.getJobID(), location);

FileIO io = HiveIcebergStorageHandler.io(jobContext.getJobConf());
List<DataFile> dataFiles = dataFiles(jobContext, io, false);

// Check if we have files already committed and remove data files if there are any
if (dataFiles.size() > 0) {
Tasks.foreach(dataFiles)
.retry(3)
.suppressFailureWhenFinished()
.onFailure((file, exc) -> LOG.debug("Failed on to remove data file {} on abort job", file.path(), exc))
.run(file -> io.deleteFile(file.path().toString()));
}

cleanup(jobContext);
}

/**
* Cleans up the job's temporary location.
* @param jobContext The job context
* @throws IOException if there is a failure deleting the files
*/
private void cleanup(JobContext jobContext) throws IOException {
String location = generateJobLocation(jobContext.getJobConf(), jobContext.getJobID());
LOG.info("Cleaning for job: {} on location: {}", jobContext.getJobID(), location);

// Remove the job's temp directory recursively.
// Intentionally used foreach on a single item. Using the Tasks API here only for the retry capability.
Tasks.foreach(location)
.retry(3)
.suppressFailureWhenFinished()
.onFailure((file, exc) -> LOG.debug("Failed on to remove directory {} on cleanup job", file, exc))
.run(file -> {
Path toDelete = new Path(file);
FileSystem fs = Util.getFs(toDelete, jobContext.getJobConf());
Review comment (Contributor):
Nit: I would prefer if cleanup happened by removing the expected task commit files one-by-one rather than deleting a directory, because it could use FileIO. I understand that this is intended to drop the folder as well for stores that track folders. Maybe a follow-up to add a deletePrefix to FileIO would fix it.

fs.delete(toDelete, true);
}, IOException.class);
}

/**
* Gets the committed data files for this job.
* @param jobContext The job context
* @param io The FileIO used for reading the files generated for commit
* @param throwOnFailure If <code>true</code> then an exception is thrown on failure
* @return The list of the committed data files
*/
private static List<DataFile> dataFiles(JobContext jobContext, FileIO io, boolean throwOnFailure) {
JobConf conf = jobContext.getJobConf();
// If there are reducers, then every reducer will generate a result file.
// If this is a map only task, then every mapper will generate a result file.
int expectedFiles = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks();

ExecutorService executor = null;
try {
// Creating executor service for parallel handling of file reads
executor = Executors.newFixedThreadPool(
conf.getInt(InputFormatConfig.COMMIT_THREAD_POOL_SIZE, InputFormatConfig.COMMIT_THREAD_POOL_SIZE_DEFAULT),
new ThreadFactoryBuilder()
.setDaemon(true)
.setPriority(Thread.NORM_PRIORITY)
.setNameFormat("iceberg-commit-pool-%d")
.build());

List<DataFile> dataFiles = Collections.synchronizedList(new ArrayList<>());

// Reading the committed files. The assumption here is that the taskIds are generated in sequential order
// starting from 0.
Tasks.range(expectedFiles)
Review comment (Contributor):
I also like to explicitly set the failure behavior. This should probably use throwFailureWhenFinished() and stopOnFailure() because this can't continue if any task fails. Not using stopOnFailure is for tasks like cleaning up files, where each task should at least be attempted.

Reply (Contributor Author):
Added throwFailureWhenFinished, but did not use stopOnFailure. This way I was able to reuse the code for both abort and commit. Since this only affects the exception handling, I think this is an acceptable compromise.

.throwFailureWhenFinished(throwOnFailure)
.executeWith(executor)
.retry(3)
.run(taskId -> {
String taskFileName = generateFileForCommitLocation(conf, jobContext.getJobID(), taskId);
dataFiles.addAll(Arrays.asList(readFileForCommit(taskFileName, io)));
});

return dataFiles;
} finally {
if (executor != null) {
executor.shutdown();
}
}
}

/**
* Generates the job temp location based on the job configuration.
* Currently it uses TABLE_LOCATION/temp/QUERY_ID-jobId.
* @param conf The job's configuration
* @param jobId The JobID of the job
* @return The temporary job location used to store the commit files
*/
@VisibleForTesting
static String generateJobLocation(Configuration conf, JobID jobId) {
String tableLocation = conf.get(InputFormatConfig.TABLE_LOCATION);
String queryId = conf.get(HiveConf.ConfVars.HIVEQUERYID.varname);
return tableLocation + "/temp/" + queryId + "-" + jobId;
}

/**
* Generates the file location based on the job configuration and a specific task id.
* This file will be used to store the data required to generate the Iceberg commit.
* Currently it uses TABLE_LOCATION/temp/QUERY_ID-jobId/task-[0..numTasks).forCommit.
* @param conf The job's configuration
* @param jobId The JobID for the task
* @param taskId The taskId for the commit file
* @return The file to store the results
*/
private static String generateFileForCommitLocation(Configuration conf, JobID jobId, int taskId) {
return generateJobLocation(conf, jobId) + "/task-" + taskId + FOR_COMMIT_EXTENSION;
}

private static void createFileForCommit(DataFile[] closedFiles, String location, FileIO io)
throws IOException {

OutputFile fileForCommit = io.newOutputFile(location);
try (ObjectOutputStream oos = new ObjectOutputStream(fileForCommit.createOrOverwrite())) {
oos.writeObject(closedFiles);
}
LOG.debug("Iceberg committed file is created {}", fileForCommit);
}

private static DataFile[] readFileForCommit(String fileForCommitLocation, FileIO io) {
try (ObjectInputStream ois = new ObjectInputStream(io.newInputFile(fileForCommitLocation).newStream())) {
return (DataFile[]) ois.readObject();
} catch (ClassNotFoundException | IOException e) {
throw new NotFoundException("Can not read or parse committed file: %s", fileForCommitLocation);
}
}
}
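To orient readers on how this committer could end up on a job, a hedged sketch of the wiring follows. In practice this setup is expected to come from the Hive storage handler; the tableLocation and queryId values here are placeholders, and only the property names are taken from the diff above:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.hive.HiveIcebergOutputCommitter;

class CommitterWiringExample {
  // Illustrative only: registers the committer and sets the properties it reads
  // (table location and Hive query id) on a mapred JobConf.
  static JobConf configure(String tableLocation, String queryId) {
    JobConf jobConf = new JobConf();
    jobConf.setOutputCommitter(HiveIcebergOutputCommitter.class);
    jobConf.set(InputFormatConfig.TABLE_LOCATION, tableLocation);
    jobConf.set(HiveConf.ConfVars.HIVEQUERYID.varname, queryId);
    return jobConf;
  }
}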