Skip to content

Commit 58a2f50

Browse files
author
Laszlo Pinter
committed
Address review comments.
1 parent 2c6943c commit 58a2f50

File tree

8 files changed

+78
-39
lines changed

8 files changed

+78
-39
lines changed

core/src/main/java/org/apache/iceberg/CatalogProperties.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,5 @@ private CatalogProperties() {
5353

5454
public static final String LOCK_TABLE = "lock.table";
5555

56+
public static final String CLASS = "class";
5657
}

hive-metastore/src/main/java/org/apache/iceberg/hive/HiveCatalogs.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121

2222
import com.github.benmanes.caffeine.cache.Cache;
2323
import com.github.benmanes.caffeine.cache.Caffeine;
24+
import java.util.Map;
2425
import org.apache.hadoop.conf.Configuration;
2526
import org.apache.hadoop.hive.conf.HiveConf;
27+
import org.apache.iceberg.CatalogUtil;
2628

2729
public final class HiveCatalogs {
2830

@@ -36,4 +38,11 @@ public static HiveCatalog loadCatalog(Configuration conf) {
3638
String metastoreUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
3739
return CATALOG_CACHE.get(metastoreUri, uri -> new HiveCatalog(conf));
3840
}
41+
42+
public static HiveCatalog loadCatalog(String catalogName, Map<String, String> properties, Configuration conf) {
43+
// metastore URI can be null in local mode
44+
String metastoreUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
45+
return CATALOG_CACHE.get(metastoreUri, uri -> (HiveCatalog) CatalogUtil.loadCatalog(HiveCatalog.class.getName(),
46+
catalogName, properties, conf));
47+
}
3948
}

mr/src/main/java/org/apache/iceberg/mr/Catalogs.java

Lines changed: 39 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import java.util.Properties;
2626
import java.util.Set;
2727
import org.apache.hadoop.conf.Configuration;
28+
import org.apache.iceberg.CatalogProperties;
29+
import org.apache.iceberg.CatalogUtil;
2830
import org.apache.iceberg.PartitionSpec;
2931
import org.apache.iceberg.PartitionSpecParser;
3032
import org.apache.iceberg.Schema;
@@ -56,6 +58,7 @@
5658
public final class Catalogs {
5759
private static final Logger LOG = LoggerFactory.getLogger(Catalogs.class);
5860

61+
public static final String DEFAULT_CATALOG = "default";
5962
public static final String HADOOP = "hadoop";
6063
public static final String HIVE = "hive";
6164
public static final String CUSTOM = "custom";
@@ -192,9 +195,9 @@ public static boolean dropTable(Configuration conf, Properties props) {
192195
public static boolean hiveCatalog(Configuration conf, Properties props) {
193196
String catalogName = props.getProperty(InputFormatConfig.TABLE_CATALOG);
194197
if (catalogName != null) {
195-
return HIVE.equals(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName)));
198+
return HIVE.equalsIgnoreCase(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName)));
196199
} else {
197-
if (HIVE.equals(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, "default")))) {
200+
if (HIVE.equalsIgnoreCase(conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, DEFAULT_CATALOG)))) {
198201
return true;
199202
} else {
200203
return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
@@ -205,49 +208,55 @@ public static boolean hiveCatalog(Configuration conf, Properties props) {
205208
@VisibleForTesting
206209
static Optional<Catalog> loadCatalog(Configuration conf, String catalogName) {
207210
String catalogType;
208-
String name = catalogName;
209-
if (name == null) {
210-
name = "default";
211-
}
211+
String name = catalogName == null ? DEFAULT_CATALOG : catalogName;
212212
catalogType = conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, name));
213213

214214
// keep both catalog configuration methods for seamless transition
215215
if (catalogType != null) {
216-
// new logic
217-
return loadCatalog(conf, catalogType, String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, name),
218-
String.format(InputFormatConfig.CATALOG_LOADER_CLASS_TEMPLATE, name));
216+
// new logic
217+
LOG.debug("Using catalog configuration from table properties.");
218+
return loadCatalog(conf, name, catalogType);
219219
} else {
220220
// old logic
221221
// use catalog {@link InputFormatConfig.CATALOG} stored in global hive config if table specific catalog
222222
// configuration or default catalog definition is missing
223+
LOG.debug("Using catalog configuration from global configuration.");
223224
catalogType = conf.get(InputFormatConfig.CATALOG);
224-
return loadCatalog(conf, catalogType, InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION,
225-
InputFormatConfig.CATALOG_LOADER_CLASS);
225+
return loadCatalog(conf, name, catalogType);
226226
}
227227
}
228228

229-
private static Optional<Catalog> loadCatalog(Configuration conf, String catalogType,
230-
String warehouseLocationConfigName, String loaderClassConfigName) {
229+
private static Optional<Catalog> loadCatalog(Configuration conf, String catalogName, String catalogType) {
231230
if (catalogType == null) {
232231
LOG.info("Catalog is not configured");
233232
return Optional.empty();
234233
}
235234

235+
Map<String, String> properties = getCatalogProperties(conf, catalogName);
236236
Catalog catalog;
237237
switch (catalogType.toLowerCase()) {
238238
case HADOOP:
239-
String warehouseLocation = conf.get(warehouseLocationConfigName);
240-
catalog = (warehouseLocation != null) ? new HadoopCatalog(conf, warehouseLocation) :
241-
new HadoopCatalog(conf);
239+
if (properties.containsKey(CatalogProperties.WAREHOUSE_LOCATION)) {
240+
catalog = CatalogUtil.loadCatalog(HadoopCatalog.class.getName(), catalogName,
241+
getCatalogProperties(conf, catalogName), conf);
242+
} else {
243+
String warehouseLocation = conf.get(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION);
244+
catalog = (warehouseLocation != null) ? new HadoopCatalog(conf, warehouseLocation) : new HadoopCatalog(conf);
245+
}
242246
LOG.info("Loaded Hadoop catalog {}", catalog);
243247
return Optional.of(catalog);
244248
case HIVE:
245-
catalog = HiveCatalogs.loadCatalog(conf);
249+
catalog = HiveCatalogs.loadCatalog(catalogName, properties, conf);
246250
LOG.info("Loaded Hive Metastore catalog {}", catalog);
247251
return Optional.of(catalog);
248252
case CUSTOM:
249-
String catalogLoaderClass = conf.get(loaderClassConfigName);
250-
if (catalogLoaderClass != null) {
253+
if (properties.containsKey(CatalogProperties.CLASS)) {
254+
String catalogLoaderClass = properties.get(CatalogProperties.CLASS);
255+
catalog = CatalogUtil.loadCatalog(catalogLoaderClass, catalogName, properties, conf);
256+
LOG.info("Loaded catalog {} using {}", catalog, catalogLoaderClass);
257+
return Optional.of(catalog);
258+
} else if (conf.get(InputFormatConfig.CATALOG_LOADER_CLASS) != null) {
259+
String catalogLoaderClass = conf.get(InputFormatConfig.CATALOG_LOADER_CLASS);
251260
CatalogLoader loader = (CatalogLoader) DynConstructors.builder(CatalogLoader.class)
252261
.impl(catalogLoaderClass)
253262
.build()
@@ -262,4 +271,15 @@ private static Optional<Catalog> loadCatalog(Configuration conf, String catalogT
262271
throw new NoSuchNamespaceException("Catalog %s is not supported.", catalogType);
263272
}
264273
}
274+
275+
private static Map<String, String> getCatalogProperties(Configuration conf, String catalogName) {
276+
Map<String, String> properties = new HashMap<>();
277+
conf.iterator().forEachRemaining(e -> {
278+
String keyPrefix = InputFormatConfig.CATALOG_CONFIG_PREFIX + catalogName;
279+
if (e.getKey().startsWith(keyPrefix)) {
280+
properties.put(e.getKey().substring(keyPrefix.length() + 1), e.getValue());
281+
}
282+
});
283+
return properties;
284+
}
265285
}

mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,10 @@ private InputFormatConfig() {
7575
public static final String SNAPSHOT_TABLE = "iceberg.snapshots.table";
7676
public static final String SNAPSHOT_TABLE_SUFFIX = "__snapshots";
7777

78-
public static final String CATALOG_TYPE_TEMPLATE = CATALOG_NAME + ".%s.type";
79-
public static final String CATALOG_WAREHOUSE_TEMPLATE = CATALOG_NAME + ".%s.warehouse";
80-
public static final String CATALOG_LOADER_CLASS_TEMPLATE = CATALOG_NAME + ".%s.loader.class";
78+
public static final String CATALOG_CONFIG_PREFIX = "iceberg.catalog.";
79+
public static final String CATALOG_TYPE_TEMPLATE = CATALOG_CONFIG_PREFIX + "%s.type";
80+
public static final String CATALOG_WAREHOUSE_TEMPLATE = CATALOG_CONFIG_PREFIX + "%s.warehouse";
81+
public static final String CATALOG_CLASS_TEMPLATE = CATALOG_CONFIG_PREFIX + "%s.class";
8182

8283
public enum InMemoryDataModel {
8384
PIG,

mr/src/test/java/org/apache/iceberg/mr/TestCatalogs.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -78,16 +78,17 @@ public void testLoadTableFromLocation() throws IOException {
7878
@Test
7979
public void testLoadTableFromCatalog() throws IOException {
8080
String defaultCatalogName = "default";
81-
conf.set("warehouse.location", temp.newFolder("hadoop", "warehouse").toString());
81+
String warehouseLocation = temp.newFolder("hadoop", "warehouse").toString();
82+
conf.set(String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, defaultCatalogName), warehouseLocation);
8283
conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, defaultCatalogName), Catalogs.CUSTOM);
83-
conf.setClass(String.format(InputFormatConfig.CATALOG_LOADER_CLASS_TEMPLATE, defaultCatalogName),
84-
CustomHadoopCatalogLoader.class, CatalogLoader.class);
84+
conf.set(String.format(InputFormatConfig.CATALOG_CLASS_TEMPLATE, defaultCatalogName),
85+
CustomHadoopCatalog.class.getName());
8586

8687
AssertHelpers.assertThrows(
8788
"Should complain about table identifier not set", IllegalArgumentException.class,
8889
"identifier not set", () -> Catalogs.loadTable(conf));
8990

90-
HadoopCatalog catalog = new CustomHadoopCatalog(conf);
91+
HadoopCatalog catalog = new CustomHadoopCatalog(conf, warehouseLocation);
9192
Table hadoopCatalogTable = catalog.createTable(TableIdentifier.of("table"), SCHEMA);
9293

9394
conf.set(InputFormatConfig.TABLE_IDENTIFIER, "table");
@@ -142,10 +143,11 @@ public void testCreateDropTableToLocation() throws IOException {
142143
public void testCreateDropTableToCatalog() throws IOException {
143144
TableIdentifier identifier = TableIdentifier.of("test", "table");
144145
String defaultCatalogName = "default";
146+
String warehouseLocation = temp.newFolder("hadoop", "warehouse").toString();
145147

146-
conf.set("warehouse.location", temp.newFolder("hadoop", "warehouse").toString());
147-
conf.setClass(String.format(InputFormatConfig.CATALOG_LOADER_CLASS_TEMPLATE, defaultCatalogName),
148-
CustomHadoopCatalogLoader.class, CatalogLoader.class);
148+
conf.set(String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, defaultCatalogName), warehouseLocation);
149+
conf.set(String.format(InputFormatConfig.CATALOG_CLASS_TEMPLATE, defaultCatalogName),
150+
CustomHadoopCatalog.class.getName());
149151
conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, defaultCatalogName), Catalogs.CUSTOM);
150152

151153
Properties missingSchema = new Properties();
@@ -168,7 +170,7 @@ public void testCreateDropTableToCatalog() throws IOException {
168170

169171
Catalogs.createTable(conf, properties);
170172

171-
HadoopCatalog catalog = new CustomHadoopCatalog(conf);
173+
HadoopCatalog catalog = new CustomHadoopCatalog(conf, warehouseLocation);
172174
Table table = catalog.loadTable(identifier);
173175

174176
Assert.assertEquals(SchemaParser.toJson(SCHEMA), SchemaParser.toJson(table.schema()));
@@ -241,7 +243,7 @@ public void testLoadCatalog() throws IOException {
241243

242244
Assert.assertTrue(hadoopCatalog.isPresent());
243245
Assert.assertTrue(hadoopCatalog.get() instanceof HadoopCatalog);
244-
Assert.assertEquals("HadoopCatalog{name=hadoop, location=/tmp/mylocation}", hadoopCatalog.get().toString());
246+
Assert.assertEquals("HadoopCatalog{name=barCatalog, location=/tmp/mylocation}", hadoopCatalog.get().toString());
245247

246248
// arbitrary catalog name with hive catalog type
247249
conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName), Catalogs.HIVE);
@@ -252,14 +254,14 @@ public void testLoadCatalog() throws IOException {
252254

253255
// arbitrary catalog name with custom catalog type without specific classloader
254256
conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName), Catalogs.CUSTOM);
257+
conf.unset(InputFormatConfig.CATALOG_LOADER_CLASS);
255258
customHadoopCatalog = Catalogs.loadCatalog(conf, catalogName);
256259

257260
Assert.assertFalse(customHadoopCatalog.isPresent());
258261

259262
// arbitrary catalog name with custom catalog type and provided classloader
260263
conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName), Catalogs.CUSTOM);
261-
conf.setClass(String.format(InputFormatConfig.CATALOG_LOADER_CLASS_TEMPLATE, catalogName),
262-
CustomHadoopCatalogLoader.class, CatalogLoader.class);
264+
conf.set(String.format(InputFormatConfig.CATALOG_CLASS_TEMPLATE, catalogName), CustomHadoopCatalog.class.getName());
263265
customHadoopCatalog = Catalogs.loadCatalog(conf, catalogName);
264266

265267
Assert.assertTrue(customHadoopCatalog.isPresent());
@@ -277,6 +279,10 @@ public static class CustomHadoopCatalog extends HadoopCatalog {
277279

278280
public static final String WAREHOUSE_LOCATION = "warehouse.location";
279281

282+
public CustomHadoopCatalog() {
283+
284+
}
285+
280286
public CustomHadoopCatalog(Configuration conf, String warehouseLocation) {
281287
super(conf, warehouseLocation);
282288
}

mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerTestUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.iceberg.FileFormat;
2727
import org.apache.iceberg.Schema;
2828
import org.apache.iceberg.data.Record;
29+
import org.apache.iceberg.mr.Catalogs;
2930
import org.apache.iceberg.mr.TestHelper;
3031
import org.apache.iceberg.types.Types;
3132
import org.apache.orc.OrcConf;
@@ -77,7 +78,7 @@ static TestHiveShell shell(Map<String, String> configs) {
7778

7879
static TestTables testTables(TestHiveShell shell, TestTables.TestTableType testTableType, TemporaryFolder temp)
7980
throws IOException {
80-
return testTables(shell, testTableType, temp, "default");
81+
return testTables(shell, testTableType, temp, Catalogs.DEFAULT_CATALOG);
8182
}
8283

8384
static TestTables testTables(TestHiveShell shell, TestTables.TestTableType testTableType, TemporaryFolder temp,

mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithMultipleCatalogs.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,13 @@ public class TestHiveIcebergStorageHandlerWithMultipleCatalogs {
6767
private TestTables testTables1;
6868
private TestTables testTables2;
6969

70-
@Parameterized.Parameters(name = "fileFormat={0}, fileFormat={1}, engine={2}, tableType1={3}, catalogName1={4}, " +
70+
@Parameterized.Parameters(name = "fileFormat1={0}, fileFormat2={1}, engine={2}, tableType1={3}, catalogName1={4}, " +
7171
"tableType2={5}, catalogName2={6}")
7272
public static Collection<Object[]> parameters() {
7373
Collection<Object[]> testParams = new ArrayList<>();
7474
String javaVersion = System.getProperty("java.specification.version");
7575

76-
// Run tests with every FileFormat for a two Catalogs
76+
// Run tests with PARQUET and ORC file formats for a two Catalogs
7777
for (String engine : EXECUTION_ENGINES) {
7878
// include Tez tests only for Java 8
7979
if (javaVersion.equals("1.8") || "mr".equals(engine)) {

mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -252,9 +252,10 @@ static class CustomCatalogTestTables extends TestTables {
252252
public Map<String, String> properties() {
253253
return ImmutableMap.of(
254254
String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalog), "custom",
255-
String.format(InputFormatConfig.CATALOG_LOADER_CLASS_TEMPLATE, catalog),
256-
TestCatalogs.CustomHadoopCatalogLoader.class.getName(),
257-
TestCatalogs.CustomHadoopCatalog.WAREHOUSE_LOCATION, warehouseLocation
255+
String.format(InputFormatConfig.CATALOG_CLASS_TEMPLATE, catalog),
256+
TestCatalogs.CustomHadoopCatalog.class.getName(),
257+
String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, catalog),
258+
warehouseLocation
258259
);
259260
}
260261

0 commit comments

Comments
 (0)