Skip to content

Commit

Permalink
HBASE-28459 HFileOutputFormat2 ClassCastException with s3 magic commi…
Browse files Browse the repository at this point in the history
…tter (#5858)

Co-authored-by: Sravi Kommineni <skommineni@hubspot.com>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
(cherry picked from commit b7def4f)
  • Loading branch information
ksravista authored and Apache9 committed May 6, 2024
1 parent 82fe5e6 commit 8b9cafc
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
Expand All @@ -85,7 +86,6 @@
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -193,11 +193,15 @@ protected static byte[] getTableNameSuffixedWithFamily(byte[] tableName, byte[]
return combineTableNameSuffix(tableName, family);
}

protected static Path getWorkPath(final OutputCommitter committer) {
return (Path) ReflectionUtils.invokeMethod(committer, "getWorkPath");
}

static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(
final TaskAttemptContext context, final OutputCommitter committer) throws IOException {

// Get the path of the temporary output file
final Path outputDir = ((FileOutputCommitter) committer).getWorkPath();
final Path outputDir = getWorkPath(committer);
final Configuration conf = context.getConfiguration();
final boolean writeMultipleTables =
conf.getBoolean(MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,12 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.ClassRule;
import org.junit.Ignore;
Expand Down Expand Up @@ -1583,6 +1586,59 @@ public void testMRIncrementalLoadWithLocalityMultiCluster() throws Exception {
}
}

@Test
public void itGetsWorkPathHadoop2() throws Exception {
Configuration conf = new Configuration(this.util.getConfiguration());
Job job = new Job(conf);
FileOutputCommitter committer =
new FileOutputCommitter(new Path("/test"), createTestTaskAttemptContext(job));
assertEquals(committer.getWorkPath(), HFileOutputFormat2.getWorkPath(committer));
}

@Test
public void itGetsWorkPathHadoo3() {
Hadoop3TestOutputCommitter committer = new Hadoop3TestOutputCommitter(new Path("/test"));
assertEquals(committer.getWorkPath(), HFileOutputFormat2.getWorkPath(committer));
}

static class Hadoop3TestOutputCommitter extends OutputCommitter {

Path path;

Hadoop3TestOutputCommitter(Path path) {
this.path = path;
}

public Path getWorkPath() {
return path;
}

@Override
public void setupJob(JobContext jobContext) throws IOException {

}

@Override
public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {

}

@Override
public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
return false;
}

@Override
public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {

}

@Override
public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {

}
}

private static class ConfigurationCaptorConnection implements Connection {
private static final String UUID_KEY = "ConfigurationCaptorConnection.uuid";

Expand Down

0 comments on commit 8b9cafc

Please sign in to comment.