
Commit f274a3c

Author: Laszlo Pinter
Save catalogName in jobConf
1 parent b73d751 commit f274a3c

File tree: 4 files changed, +32 −35 lines changed

mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java

Lines changed: 1 addition & 0 deletions
@@ -47,6 +47,7 @@ private InputFormatConfig() {
   public static final String TABLE_SCHEMA = "iceberg.mr.table.schema";
   public static final String PARTITION_SPEC = "iceberg.mr.table.partition.spec";
   public static final String SERIALIZED_TABLE_PREFIX = "iceberg.mr.serialized.table.";
+  public static final String TABLE_CATALOG_PREFIX = "iceberg.mr.table.catalog";
   public static final String LOCALITY = "iceberg.mr.locality";
   public static final String CATALOG = "iceberg.mr.catalog";
   public static final String HADOOP_CATALOG_WAREHOUSE_LOCATION = "iceberg.mr.catalog.hadoop.warehouse.location";
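Note that, unlike SERIALIZED_TABLE_PREFIX above, the new prefix has no trailing dot, so the per-table key is formed by directly concatenating the prefix and the table name. A minimal sketch of writing and reading such a key, with hypothetical table and catalog names (illustrative only, not part of the commit):

```java
import org.apache.hadoop.mapred.JobConf;
import org.apache.iceberg.mr.InputFormatConfig;

public class CatalogKeySketch {
  public static void main(String[] args) {
    JobConf jobConf = new JobConf();
    // Hypothetical names, for illustration only
    String tableName = "default.sample_table";
    String catalogName = "custom_catalog";

    // The per-table key is the prefix plus the table name
    jobConf.set(InputFormatConfig.TABLE_CATALOG_PREFIX + tableName, catalogName);

    // Reading it back yields the stored catalog name, or null if none was set
    System.out.println(jobConf.get(InputFormatConfig.TABLE_CATALOG_PREFIX + tableName));
  }
}
```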

mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java

Lines changed: 9 additions & 9 deletions
@@ -51,7 +51,6 @@
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.Tasks;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -94,7 +93,7 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
     TaskAttemptID attemptID = context.getTaskAttemptID();
     JobConf jobConf = context.getJobConf();
     Map<String, HiveIcebergRecordWriter> writers = HiveIcebergRecordWriter.getWriters(attemptID);
-    Collection<Pair<String, String>> outputs = HiveIcebergStorageHandler.outputTables(context.getJobConf());
+    Collection<String> outputs = HiveIcebergStorageHandler.outputTables(context.getJobConf());

     ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
     try {
@@ -105,8 +104,8 @@ public void commitTask(TaskAttemptContext originalContext) throws IOException {
           .throwFailureWhenFinished()
           .executeWith(tableExecutor)
           .run(output -> {
-            Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output.second());
-            HiveIcebergRecordWriter writer = writers.get(output.second());
+            Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output);
+            HiveIcebergRecordWriter writer = writers.get(output);
             DataFile[] closedFiles = writer != null ? writer.dataFiles() : new DataFile[0];
             String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
                 attemptID.getJobID(), attemptID.getTaskID().getId());
@@ -158,7 +157,7 @@ public void commitJob(JobContext originalContext) throws IOException {
     long startTime = System.currentTimeMillis();
     LOG.info("Committing job {} has started", jobContext.getJobID());

-    Collection<Pair<String, String>> outputs = HiveIcebergStorageHandler.outputTables(jobContext.getJobConf());
+    Collection<String> outputs = HiveIcebergStorageHandler.outputTables(jobContext.getJobConf());
     Collection<String> jobLocations = new ConcurrentLinkedQueue<>();

     ExecutorService fileExecutor = fileExecutor(jobConf);
@@ -170,9 +169,10 @@ public void commitJob(JobContext originalContext) throws IOException {
           .stopOnFailure()
           .executeWith(tableExecutor)
           .run(output -> {
-            Table table = HiveIcebergStorageHandler.table(jobConf, output.second());
+            Table table = HiveIcebergStorageHandler.table(jobConf, output);
+            String catalogName = HiveIcebergStorageHandler.catalogName(jobConf, output);
             jobLocations.add(generateJobLocation(table.location(), jobConf, jobContext.getJobID()));
-            commitTable(table.io(), fileExecutor, jobContext, output.second(), table.location(), output.first());
+            commitTable(table.io(), fileExecutor, jobContext, output, table.location(), catalogName);
           });
     } finally {
       fileExecutor.shutdown();
@@ -199,7 +199,7 @@ public void abortJob(JobContext originalContext, int status) throws IOException
     JobConf jobConf = jobContext.getJobConf();

     LOG.info("Job {} is aborted. Data file cleaning started", jobContext.getJobID());
-    Collection<Pair<String, String>> outputs = HiveIcebergStorageHandler.outputTables(jobContext.getJobConf());
+    Collection<String> outputs = HiveIcebergStorageHandler.outputTables(jobContext.getJobConf());
     Collection<String> jobLocations = new ConcurrentLinkedQueue<>();

     ExecutorService fileExecutor = fileExecutor(jobConf);
@@ -213,7 +213,7 @@ public void abortJob(JobContext originalContext, int status) throws IOException
           .run(output -> {
             LOG.info("Cleaning job for table {}", jobContext.getJobID(), output);

-            Table table = HiveIcebergStorageHandler.table(jobConf, output.second());
+            Table table = HiveIcebergStorageHandler.table(jobConf, output);
             jobLocations.add(generateJobLocation(table.location(), jobConf, jobContext.getJobID()));
             Collection<DataFile> dataFiles = dataFiles(fileExecutor, table.location(), jobContext, table.io(), false);
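Since outputTables() now returns plain table names instead of catalog/table Pair objects, the committer resolves the catalog only where it is needed. A minimal sketch of the per-table resolution performed inside the loops above, stripped of the Tasks/executor machinery (the class and method names here are illustrative assumptions):

```java
import java.util.Collection;
import org.apache.hadoop.mapred.JobConf;
import org.apache.iceberg.Table;
import org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;

class OutputResolutionSketch {
  // Sequential restatement of the per-table work the committer parallelizes
  static void resolveOutputs(JobConf jobConf) {
    Collection<String> outputs = HiveIcebergStorageHandler.outputTables(jobConf);
    for (String output : outputs) {
      // The table name keys both the serialized table and the catalog entry
      Table table = HiveIcebergStorageHandler.table(jobConf, output);
      // Null when the table was configured without an explicit catalog name
      String catalogName = HiveIcebergStorageHandler.catalogName(jobConf, output);
      // ... per-table commit or cleanup would go here ...
    }
  }
}
```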

mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java

Lines changed: 21 additions & 24 deletions
@@ -44,15 +44,12 @@
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Splitter;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.SerializationUtil;

 public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, HiveStorageHandler {
   private static final Splitter TABLE_NAME_SPLITTER = Splitter.on("..");
   private static final String TABLE_NAME_SEPARATOR = "..";
-  public static final String CATALOG_NAME_SEPARATOR = "::";

   static final String WRITE_KEY = "HiveIcebergStorageHandler_write";

@@ -116,19 +113,18 @@ public void configureInputJobCredentials(TableDesc tableDesc, Map<String, String
   public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
     if (tableDesc != null && tableDesc.getProperties() != null &&
         tableDesc.getProperties().get(WRITE_KEY) != null) {
-      Preconditions.checkArgument(!tableDesc.getTableName().contains(TABLE_NAME_SEPARATOR),
-          "Can not handle table " + tableDesc.getTableName() + ". Its name contains '" + TABLE_NAME_SEPARATOR + "'");
+      String tableName = tableDesc.getTableName();
+      Preconditions.checkArgument(!tableName.contains(TABLE_NAME_SEPARATOR),
+          "Can not handle table " + tableName + ". Its name contains '" + TABLE_NAME_SEPARATOR + "'");
       String tables = jobConf.get(InputFormatConfig.OUTPUT_TABLES);
+      tables = tables == null ? tableName : tables + TABLE_NAME_SEPARATOR + tableName;
+      jobConf.set("mapred.output.committer.class", HiveIcebergOutputCommitter.class.getName());
+      jobConf.set(InputFormatConfig.OUTPUT_TABLES, tables);
+
       String catalogName = tableDesc.getProperties().getProperty(InputFormatConfig.CATALOG_NAME);
       if (catalogName != null) {
-        String tableWithCatalogName = catalogName + CATALOG_NAME_SEPARATOR + tableDesc.getTableName();
-        tables = tables == null ? tableWithCatalogName : tables + TABLE_NAME_SEPARATOR + tableWithCatalogName;
-      } else {
-        tables = tables == null ? tableDesc.getTableName() : tables + TABLE_NAME_SEPARATOR + tableDesc.getTableName();
+        jobConf.set(InputFormatConfig.TABLE_CATALOG_PREFIX + tableName, catalogName);
       }
-
-      jobConf.set("mapred.output.committer.class", HiveIcebergOutputCommitter.class.getName());
-      jobConf.set(InputFormatConfig.OUTPUT_TABLES, tables);
     }
   }

@@ -174,19 +170,20 @@ public static Table table(Configuration config, String name) {
   /**
    * Returns the names of the output tables stored in the configuration.
    * @param config The configuration used to get the data from
-   * @return The collection of catalog name - table name pairs.
+   * @return The collection of the table names as returned by TableDesc.getTableName()
    */
-  public static Collection<Pair<String, String>> outputTables(Configuration config) {
-    Collection<String> tables = TABLE_NAME_SPLITTER.splitToList(config.get(InputFormatConfig.OUTPUT_TABLES));
-    Collection<Pair<String, String>> result = Lists.newArrayList();
-    tables.stream().map(t -> t.split(CATALOG_NAME_SEPARATOR, 2)).forEach(s -> {
-      if (s.length < 2) {
-        result.add(Pair.of(null, s[0]));
-      } else {
-        result.add(Pair.of(s[0], s[1]));
-      }
-    });
-    return result;
+  public static Collection<String> outputTables(Configuration config) {
+    return TABLE_NAME_SPLITTER.splitToList(config.get(InputFormatConfig.OUTPUT_TABLES));
+  }
+
+  /**
+   * Returns the catalog name serialized to the configuration.
+   * @param config The configuration used to get the data from
+   * @param name The name of the table we need as returned by TableDesc.getTableName()
+   * @return catalog name
+   */
+  public static String catalogName(Configuration config, String name) {
+    return config.get(InputFormatConfig.TABLE_CATALOG_PREFIX + name);
   }

   /**
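Taken together, the write path in configureJobConf() and these two read helpers form a simple round trip through the Hadoop Configuration. A minimal sketch, assuming two hypothetical output tables, one of which carries a custom catalog name (all names are illustrative, not from the commit):

```java
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;

public class CatalogRoundTripSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // What configureJobConf() would have stored for two tables
    // (the ".." separator joins table names in OUTPUT_TABLES)
    conf.set(InputFormatConfig.OUTPUT_TABLES, "default.tbl_a..default.tbl_b");
    conf.set(InputFormatConfig.TABLE_CATALOG_PREFIX + "default.tbl_b", "custom_catalog");

    Collection<String> outputs = HiveIcebergStorageHandler.outputTables(conf);
    for (String output : outputs) {
      // Prints: default.tbl_a -> null, then default.tbl_b -> custom_catalog
      System.out.println(output + " -> " + HiveIcebergStorageHandler.catalogName(conf, output));
    }
  }
}
```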

mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java

Lines changed: 1 addition & 2 deletions
@@ -229,8 +229,7 @@ private JobConf jobConf(Table table, int taskNum) {
     conf.setNumMapTasks(taskNum);
     conf.setNumReduceTasks(0);
     conf.set(HiveConf.ConfVars.HIVEQUERYID.varname, QUERY_ID);
-    conf.set(InputFormatConfig.OUTPUT_TABLES, table.properties().get(InputFormatConfig.CATALOG_NAME) +
-        HiveIcebergStorageHandler.CATALOG_NAME_SEPARATOR + table.name());
+    conf.set(InputFormatConfig.OUTPUT_TABLES, table.name());
     conf.set(InputFormatConfig.SERIALIZED_TABLE_PREFIX + table.name(), SerializationUtil.serializeToBase64(table));

     Map<String, String> propMap = Maps.newHashMap();
