From 4b692656a5d6aa9f410cd2124883e8dd3b93ca99 Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Sun, 11 Dec 2022 08:41:07 +0100 Subject: [PATCH 1/6] Core: Allow configuring metrics reporter impl via Catalog property --- .../metrics/LoggingMetricsReporter.java | 5 +++ .../org/apache/iceberg/CatalogProperties.java | 1 + .../java/org/apache/iceberg/CatalogUtil.java | 39 ++++++++++++++++ .../iceberg/rest/RESTSessionCatalog.java | 12 +++-- .../org/apache/iceberg/TestCatalogUtil.java | 45 +++++++++++++++++++ 5 files changed, 98 insertions(+), 4 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/metrics/LoggingMetricsReporter.java b/api/src/main/java/org/apache/iceberg/metrics/LoggingMetricsReporter.java index 26e9a0b527f1..8d41f10e74fb 100644 --- a/api/src/main/java/org/apache/iceberg/metrics/LoggingMetricsReporter.java +++ b/api/src/main/java/org/apache/iceberg/metrics/LoggingMetricsReporter.java @@ -28,6 +28,11 @@ */ public class LoggingMetricsReporter implements MetricsReporter { private static final Logger LOG = LoggerFactory.getLogger(LoggingMetricsReporter.class); + private static final LoggingMetricsReporter INSTANCE = new LoggingMetricsReporter(); + + public static LoggingMetricsReporter instance() { + return INSTANCE; + } @Override public void report(MetricsReport report) { diff --git a/core/src/main/java/org/apache/iceberg/CatalogProperties.java b/core/src/main/java/org/apache/iceberg/CatalogProperties.java index 95fe6a074c0a..c2490ee3ea83 100644 --- a/core/src/main/java/org/apache/iceberg/CatalogProperties.java +++ b/core/src/main/java/org/apache/iceberg/CatalogProperties.java @@ -29,6 +29,7 @@ private CatalogProperties() {} public static final String WAREHOUSE_LOCATION = "warehouse"; public static final String TABLE_DEFAULT_PREFIX = "table-default."; public static final String TABLE_OVERRIDE_PREFIX = "table-override."; + public static final String METRICS_REPORTER_IMPL = "metrics-reporter-impl"; /** * Controls whether the catalog will cache table entries upon load. diff --git a/core/src/main/java/org/apache/iceberg/CatalogUtil.java b/core/src/main/java/org/apache/iceberg/CatalogUtil.java index c0c5078a3f34..ef4d17c24997 100644 --- a/core/src/main/java/org/apache/iceberg/CatalogUtil.java +++ b/core/src/main/java/org/apache/iceberg/CatalogUtil.java @@ -35,6 +35,7 @@ import org.apache.iceberg.hadoop.Configurable; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.SupportsBulkOperations; +import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; @@ -400,4 +401,42 @@ public static void configureHadoopConf(Object maybeConfigurable, Object conf) { setConf.invoke(conf); } + + /** + * Load a custom {@link MetricsReporter} implementation. + * + *

The implementation must have a no-arg constructor. + * + * @param impl full class name of a custom {@link MetricsReporter} implementation + * @return An initialized {@link MetricsReporter}. + * @throws IllegalArgumentException if class path not found or right constructor not found or the + * loaded class cannot be cast to the given interface type + */ + public static MetricsReporter loadMetricsReporter(String impl) { + LOG.info("Loading custom MetricsReporter implementation: {}", impl); + DynConstructors.Ctor ctor; + try { + ctor = + DynConstructors.builder(MetricsReporter.class) + .loader(CatalogUtil.class.getClassLoader()) + .impl(impl) + .buildChecked(); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException( + String.format("Cannot initialize MetricsReporter, missing no-arg constructor: %s", impl), + e); + } + + MetricsReporter reporter; + try { + reporter = ctor.newInstance(); + } catch (ClassCastException e) { + throw new IllegalArgumentException( + String.format( + "Cannot initialize MetricsReporter, %s does not implement MetricsReporter.", impl), + e); + } + + return reporter; + } } diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index 0b4edc9a84f0..766f1f91c60a 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -106,7 +106,7 @@ public class RESTSessionCatalog extends BaseSessionCatalog private ResourcePaths paths = null; private Object conf = null; private FileIO io = null; - private MetricsReporter reporter = null; + private MetricsReporter customMetricsReporter = null; // a lazy thread pool for token refresh private volatile ScheduledExecutorService refreshExecutor = null; @@ -177,7 +177,9 @@ public void initialize(String name, Map unresolved) { this.io = CatalogUtil.loadFileIO( ioImpl != null ? ioImpl : ResolvingFileIO.class.getName(), mergedProps, conf); - this.reporter = new LoggingMetricsReporter(); + String metricsReporterImpl = mergedProps.get(CatalogProperties.METRICS_REPORTER_IMPL); + this.customMetricsReporter = + null != metricsReporterImpl ? CatalogUtil.loadMetricsReporter(metricsReporterImpl) : null; super.initialize(name, mergedProps); } @@ -304,7 +306,9 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { new BaseTable( ops, fullTableName(loadedIdent), - report -> reportMetrics(tableIdentifier, report, session::headers)); + null != customMetricsReporter + ? customMetricsReporter + : report -> reportMetrics(tableIdentifier, report, session::headers)); if (metadataType != null) { return MetadataTableUtils.createMetadataTableInstance(table, metadataType); } @@ -316,7 +320,7 @@ private void reportMetrics( TableIdentifier tableIdentifier, MetricsReport report, Supplier> headers) { - reporter.report(report); + LoggingMetricsReporter.instance().report(report); try { client.post( paths.metrics(tableIdentifier), diff --git a/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java b/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java index a31cf51b685a..86e4a2c83729 100644 --- a/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java +++ b/core/src/test/java/org/apache/iceberg/TestCatalogUtil.java @@ -29,6 +29,8 @@ import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.metrics.MetricsReport; +import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.assertj.core.api.Assertions; import org.junit.Assert; @@ -187,6 +189,32 @@ public void buildCustomCatalog_withTypeSet() { () -> CatalogUtil.buildIcebergCatalog(name, options, hadoopConf)); } + @Test + public void loadCustomMetricsReporter_noArg() { + Map properties = Maps.newHashMap(); + properties.put("key", "val"); + + MetricsReporter metricsReporter = + CatalogUtil.loadMetricsReporter(TestMetricsReporterDefault.class.getName()); + Assertions.assertThat(metricsReporter).isInstanceOf(TestMetricsReporterDefault.class); + } + + @Test + public void loadCustomMetricsReporter_badArg() { + Assertions.assertThatThrownBy( + () -> CatalogUtil.loadMetricsReporter(TestMetricsReporterBadArg.class.getName())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("missing no-arg constructor"); + } + + @Test + public void loadCustomMetricsReporter_badClass() { + Assertions.assertThatThrownBy( + () -> CatalogUtil.loadMetricsReporter(TestFileIONotImpl.class.getName())) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("does not implement MetricsReporter"); + } + public static class TestCatalog extends BaseMetastoreCatalog { private String catalogName; @@ -399,4 +427,21 @@ public String getArg() { public static class TestFileIONotImpl { public TestFileIONotImpl() {} } + + public static class TestMetricsReporterBadArg implements MetricsReporter { + private final String arg; + + public TestMetricsReporterBadArg(String arg) { + this.arg = arg; + } + + @Override + public void report(MetricsReport report) {} + } + + public static class TestMetricsReporterDefault implements MetricsReporter { + + @Override + public void report(MetricsReport report) {} + } } From f1d60b59d8907c60dea6ffa31bce181132f54b37 Mon Sep 17 00:00:00 2001 From: zaikhan Date: Mon, 12 Dec 2022 15:58:03 +0530 Subject: [PATCH 2/6] configuration metrics reporter for metastore catalogs --- .../iceberg/metrics/MetricsReporter.java | 6 +++ .../apache/iceberg/BaseMetastoreCatalog.java | 10 ++++- .../java/org/apache/iceberg/BaseTable.java | 4 ++ .../apache/iceberg/hive/TestHiveCatalog.java | 37 +++++++++++++++++++ 4 files changed, 56 insertions(+), 1 deletion(-) diff --git a/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java b/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java index 45e3d16bff0b..adb3cebfcbc6 100644 --- a/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java +++ b/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java @@ -18,10 +18,16 @@ */ package org.apache.iceberg.metrics; +import java.util.Map; + /** This interface defines the basic API for reporting metrics for operations to a Table. */ @FunctionalInterface public interface MetricsReporter { + default void init(Map properties) { + // To allow configuring metric reporter. + } + /** * Indicates that a operation is done by reporting a {@link MetricsReport}. A {@link * MetricsReport} is usually directly derived from a {@link MetricsReport} instance. diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java index 16a5164f6f49..5f2e8e5adf9e 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java @@ -25,6 +25,8 @@ import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.metrics.LoggingMetricsReporter; +import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -51,7 +53,13 @@ public Table loadTable(TableIdentifier identifier) { } } else { - result = new BaseTable(ops, fullTableName(name(), identifier)); + MetricsReporter metricsReporter = + properties().containsKey(CatalogProperties.METRICS_REPORTER_IMPL) + ? CatalogUtil.loadMetricsReporter( + properties().get(CatalogProperties.METRICS_REPORTER_IMPL)) + : LoggingMetricsReporter.instance(); + metricsReporter.init(properties()); + result = new BaseTable(ops, fullTableName(name(), identifier), metricsReporter); } } else if (isValidMetadataIdentifier(identifier)) { diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java index 0b72027d5525..ea72daf20260 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseTable.java @@ -63,6 +63,10 @@ public String name() { return name; } + public MetricsReporter reporter() { + return reporter; + } + @Override public void refresh() { ops.refresh(); diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java index a9cae2b28b60..fd3be4847311 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.PrincipalType; import org.apache.iceberg.AssertHelpers; +import org.apache.iceberg.BaseTable; import org.apache.iceberg.CachingCatalog; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; @@ -69,6 +70,8 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.metrics.MetricsReport; +import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -198,6 +201,28 @@ public void testCreateTableTxnBuilder() throws Exception { } } + @Test + public void testLoadTableWithCustomMetricReporter() throws Exception { + Schema schema = getTestSchema(); + TableIdentifier tableIdent = TableIdentifier.of(DB_NAME, "tbl"); + String location = temp.newFolder("tbl").toString(); + + HiveCatalog catalog = new HiveCatalog(); + catalog.initialize( + "hive", ImmutableMap.of("metrics-reporter-impl", TestMetricsReporter.class.getName())); + try { + Transaction txn = + catalog.buildTable(tableIdent, schema).withLocation(location).createTransaction(); + txn.commitTransaction(); + BaseTable baseTable = (BaseTable) catalog.loadTable(tableIdent); + Assertions.assertInstanceOf(TestMetricsReporter.class, baseTable.reporter()); + TestMetricsReporter reporter = (TestMetricsReporter) baseTable.reporter(); + Assertions.assertTrue(reporter.initialized); + } finally { + catalog.dropTable(tableIdent); + } + } + @Test public void testReplaceTxnBuilder() throws Exception { Schema schema = getTestSchema(); @@ -1176,4 +1201,16 @@ public void testDatabaseLocationWithSlashInWarehouseDir() { Assert.assertEquals("s3://bucket/database.db", database.getLocationUri()); } + + public static final class TestMetricsReporter implements MetricsReporter { + boolean initialized = false; + + @Override + public void init(Map properties) { + initialized = true; + } + + @Override + public void report(MetricsReport report) {} + } } From ec0d162de31c73f4c25cee7450d3bfef9b4f03b8 Mon Sep 17 00:00:00 2001 From: zaikhan Date: Wed, 21 Dec 2022 09:38:12 +0530 Subject: [PATCH 3/6] review feedback --- .../iceberg/metrics/MetricsReporter.java | 6 --- .../apache/iceberg/BaseMetastoreCatalog.java | 1 - .../java/org/apache/iceberg/BaseTable.java | 4 -- .../iceberg/rest/RESTSessionCatalog.java | 4 +- .../apache/iceberg/hive/TestHiveCatalog.java | 37 ------------------- 5 files changed, 1 insertion(+), 51 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java b/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java index 27a6694115b2..5fae755bbe0e 100644 --- a/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java +++ b/api/src/main/java/org/apache/iceberg/metrics/MetricsReporter.java @@ -18,16 +18,10 @@ */ package org.apache.iceberg.metrics; -import java.util.Map; - /** This interface defines the basic API for reporting metrics for operations to a Table. */ @FunctionalInterface public interface MetricsReporter { - default void init(Map properties) { - // To allow configuring metric reporter. - } - /** * Indicates that an operation is done by reporting a {@link MetricsReport}. A {@link * MetricsReport} is usually directly derived from a {@link MetricsReport} instance. diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java index 5f2e8e5adf9e..351638bd33ab 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java @@ -58,7 +58,6 @@ public Table loadTable(TableIdentifier identifier) { ? CatalogUtil.loadMetricsReporter( properties().get(CatalogProperties.METRICS_REPORTER_IMPL)) : LoggingMetricsReporter.instance(); - metricsReporter.init(properties()); result = new BaseTable(ops, fullTableName(name(), identifier), metricsReporter); } diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java index 84a02bf425af..bfa9cb7ce519 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseTable.java @@ -63,10 +63,6 @@ public String name() { return name; } - public MetricsReporter reporter() { - return reporter; - } - @Override public void refresh() { ops.refresh(); diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java index 536577f58da4..8dd9aee95389 100644 --- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java +++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java @@ -312,9 +312,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) { new BaseTable( ops, fullTableName(loadedIdent), - null != customMetricsReporter - ? customMetricsReporter - : report -> reportMetrics(tableIdentifier, report, session::headers)); + report -> reportMetrics(tableIdentifier, report, session::headers)); if (metadataType != null) { return MetadataTableUtils.createMetadataTableInstance(table, metadataType); } diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java index fd3be4847311..a9cae2b28b60 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCatalog.java @@ -43,7 +43,6 @@ import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.PrincipalType; import org.apache.iceberg.AssertHelpers; -import org.apache.iceberg.BaseTable; import org.apache.iceberg.CachingCatalog; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; @@ -70,8 +69,6 @@ import org.apache.iceberg.exceptions.NamespaceNotEmptyException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; -import org.apache.iceberg.metrics.MetricsReport; -import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -201,28 +198,6 @@ public void testCreateTableTxnBuilder() throws Exception { } } - @Test - public void testLoadTableWithCustomMetricReporter() throws Exception { - Schema schema = getTestSchema(); - TableIdentifier tableIdent = TableIdentifier.of(DB_NAME, "tbl"); - String location = temp.newFolder("tbl").toString(); - - HiveCatalog catalog = new HiveCatalog(); - catalog.initialize( - "hive", ImmutableMap.of("metrics-reporter-impl", TestMetricsReporter.class.getName())); - try { - Transaction txn = - catalog.buildTable(tableIdent, schema).withLocation(location).createTransaction(); - txn.commitTransaction(); - BaseTable baseTable = (BaseTable) catalog.loadTable(tableIdent); - Assertions.assertInstanceOf(TestMetricsReporter.class, baseTable.reporter()); - TestMetricsReporter reporter = (TestMetricsReporter) baseTable.reporter(); - Assertions.assertTrue(reporter.initialized); - } finally { - catalog.dropTable(tableIdent); - } - } - @Test public void testReplaceTxnBuilder() throws Exception { Schema schema = getTestSchema(); @@ -1201,16 +1176,4 @@ public void testDatabaseLocationWithSlashInWarehouseDir() { Assert.assertEquals("s3://bucket/database.db", database.getLocationUri()); } - - public static final class TestMetricsReporter implements MetricsReporter { - boolean initialized = false; - - @Override - public void init(Map properties) { - initialized = true; - } - - @Override - public void report(MetricsReport report) {} - } } From bea52f7181cb69b9f067f37f38f7067c668d87c3 Mon Sep 17 00:00:00 2001 From: zaikhan Date: Wed, 21 Dec 2022 22:31:49 +0530 Subject: [PATCH 4/6] review feedback --- .../apache/iceberg/BaseMetastoreCatalog.java | 20 ++++++--- .../org/apache/iceberg/jdbc/JdbcCatalog.java | 7 ++- .../apache/iceberg/jdbc/TestJdbcCatalog.java | 45 +++++++++++++++++++ 3 files changed, 65 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java index 351638bd33ab..c9c38f599f34 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java @@ -38,6 +38,8 @@ public abstract class BaseMetastoreCatalog implements Catalog { private static final Logger LOG = LoggerFactory.getLogger(BaseMetastoreCatalog.class); + private MetricsReporter metricsReporter; + @Override public Table loadTable(TableIdentifier identifier) { Table result; @@ -53,12 +55,7 @@ public Table loadTable(TableIdentifier identifier) { } } else { - MetricsReporter metricsReporter = - properties().containsKey(CatalogProperties.METRICS_REPORTER_IMPL) - ? CatalogUtil.loadMetricsReporter( - properties().get(CatalogProperties.METRICS_REPORTER_IMPL)) - : LoggingMetricsReporter.instance(); - result = new BaseTable(ops, fullTableName(name(), identifier), metricsReporter); + result = new BaseTable(ops, fullTableName(name(), identifier), metricsReporter()); } } else if (isValidMetadataIdentifier(identifier)) { @@ -308,4 +305,15 @@ protected static String fullTableName(String catalogName, TableIdentifier identi return sb.toString(); } + + private synchronized MetricsReporter metricsReporter() { + if (metricsReporter == null) { + metricsReporter = + properties().containsKey(CatalogProperties.METRICS_REPORTER_IMPL) + ? CatalogUtil.loadMetricsReporter( + properties().get(CatalogProperties.METRICS_REPORTER_IMPL)) + : LoggingMetricsReporter.instance(); + } + return metricsReporter; + } } diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java index 58cb65f9180e..84de51dd0830 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java @@ -89,7 +89,7 @@ public void initialize(String name, Map properties) { "Cannot initialize JDBCCatalog because warehousePath must not be null or empty"); this.warehouseLocation = LocationUtil.stripTrailingSlash(inputWarehouseLocation); - this.catalogProperties = properties; + this.catalogProperties = ImmutableMap.copyOf(properties); if (name != null) { this.catalogName = name; @@ -464,6 +464,11 @@ public boolean namespaceExists(Namespace namespace) { return JdbcUtil.namespaceExists(catalogName, connections, namespace); } + @Override + protected Map properties() { + return catalogProperties == null ? ImmutableMap.of() : catalogProperties; + } + private int execute(String sql, String... args) { return execute(err -> {}, sql, args); } diff --git a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java index 1b41433dff07..aa9dc2ad302e 100644 --- a/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java +++ b/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; @@ -41,6 +42,8 @@ import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileScanTask; import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -56,6 +59,9 @@ import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.hadoop.Util; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.metrics.MetricsReport; +import org.apache.iceberg.metrics.MetricsReporter; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -795,4 +801,43 @@ public void testRegisterExistingTable() { .hasMessage("Table already exists: a.t1"); Assertions.assertThat(catalog.dropTable(identifier)).isTrue(); } + + @Test + public void testCatalogWithCustomMetricsReporter() throws IOException { + JdbcCatalog catalogWithCustomReporter = + initCatalog( + "test_jdbc_catalog_with_custom_reporter", + ImmutableMap.of( + CatalogProperties.METRICS_REPORTER_IMPL, CustomMetricsReporter.class.getName())); + try { + catalogWithCustomReporter.buildTable(TABLE, SCHEMA).create(); + Table table = catalogWithCustomReporter.loadTable(TABLE); + table + .newFastAppend() + .appendFile( + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath(FileFormat.PARQUET.addExtension(UUID.randomUUID().toString())) + .withFileSizeInBytes(10) + .withRecordCount(2) + .build()) + .commit(); + try (CloseableIterable tasks = table.newScan().planFiles()) { + Assertions.assertThat(tasks.iterator()).hasNext(); + } + } finally { + catalogWithCustomReporter.dropTable(TABLE); + } + // counter of custom metrics reporter should have been increased + // 1x for commit metrics / 1x for scan metrics + Assertions.assertThat(CustomMetricsReporter.COUNTER.get()).isEqualTo(2); + } + + public static class CustomMetricsReporter implements MetricsReporter { + static final AtomicInteger COUNTER = new AtomicInteger(0); + + @Override + public void report(MetricsReport report) { + COUNTER.incrementAndGet(); + } + } } From 1f5e9e65cdc45456749ddc5b6ba804cc4e56b580 Mon Sep 17 00:00:00 2001 From: zaikhan Date: Thu, 22 Dec 2022 08:41:18 +0530 Subject: [PATCH 5/6] review feedback --- .../src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java index c9c38f599f34..742a332deccc 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java @@ -306,7 +306,7 @@ protected static String fullTableName(String catalogName, TableIdentifier identi return sb.toString(); } - private synchronized MetricsReporter metricsReporter() { + private MetricsReporter metricsReporter() { if (metricsReporter == null) { metricsReporter = properties().containsKey(CatalogProperties.METRICS_REPORTER_IMPL) @@ -314,6 +314,7 @@ private synchronized MetricsReporter metricsReporter() { properties().get(CatalogProperties.METRICS_REPORTER_IMPL)) : LoggingMetricsReporter.instance(); } + return metricsReporter; } } From 8eb8d93446e4ad75d6dcc2f6dbae55e00b17caff Mon Sep 17 00:00:00 2001 From: zaikhan Date: Mon, 6 Feb 2023 15:51:47 +0530 Subject: [PATCH 6/6] rebased master --- core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java index fa1364574186..8ecbfa5373fd 100644 --- a/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java +++ b/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java @@ -464,11 +464,6 @@ public boolean namespaceExists(Namespace namespace) { return JdbcUtil.namespaceExists(catalogName, connections, namespace); } - @Override - protected Map properties() { - return catalogProperties == null ? ImmutableMap.of() : catalogProperties; - } - private int execute(String sql, String... args) { return execute(err -> {}, sql, args); }