@@ -217,7 +217,8 @@ public Transaction createTransaction() {
tableProperties.putAll(tableOverrideProperties());
TableMetadata metadata =
TableMetadata.newTableMetadata(schema, spec, sortOrder, baseLocation, tableProperties);
return Transactions.createTableTransaction(identifier.toString(), ops, metadata);
return Transactions.createTableTransaction(
identifier.toString(), ops, metadata, metricsReporter());
}

@Override
@@ -249,9 +250,11 @@ private Transaction newReplaceTableTransaction(boolean orCreate) {
}

if (orCreate) {
return Transactions.createOrReplaceTableTransaction(identifier.toString(), ops, metadata);
return Transactions.createOrReplaceTableTransaction(
identifier.toString(), ops, metadata, metricsReporter());
} else {
return Transactions.replaceTableTransaction(identifier.toString(), ops, metadata);
return Transactions.replaceTableTransaction(
identifier.toString(), ops, metadata, metricsReporter());
}
}

core/src/main/java/org/apache/iceberg/Transactions.java (6 additions & 0 deletions)
@@ -30,6 +30,12 @@ public static Transaction createOrReplaceTableTransaction(
return new BaseTransaction(tableName, ops, TransactionType.CREATE_OR_REPLACE_TABLE, start);
}

public static Transaction createOrReplaceTableTransaction(
String tableName, TableOperations ops, TableMetadata start, MetricsReporter reporter) {
return new BaseTransaction(
tableName, ops, TransactionType.CREATE_OR_REPLACE_TABLE, start, reporter);
}

public static Transaction replaceTableTransaction(
String tableName, TableOperations ops, TableMetadata start) {
return new BaseTransaction(tableName, ops, TransactionType.REPLACE_TABLE, start);
@@ -70,6 +70,7 @@ public class InMemoryCatalog extends BaseMetastoreViewCatalog
private String catalogName;
private String warehouseLocation;
private CloseableGroup closeableGroup;
private Map<String, String> catalogProperties;

public InMemoryCatalog() {
this.namespaces = Maps.newConcurrentMap();
@@ -85,6 +86,7 @@ public String name() {
@Override
public void initialize(String name, Map<String, String> properties) {
this.catalogName = name != null ? name : InMemoryCatalog.class.getSimpleName();
this.catalogProperties = ImmutableMap.copyOf(properties);

String warehouse = properties.getOrDefault(CatalogProperties.WAREHOUSE_LOCATION, "");
this.warehouseLocation = warehouse.replaceAll("/*$", "");
@@ -368,6 +370,11 @@ public void renameView(TableIdentifier from, TableIdentifier to) {
}
}

@Override
protected Map<String, String> properties() {
return catalogProperties == null ? ImmutableMap.of() : catalogProperties;
[Comment, Contributor Author] this is needed so that the metrics reporter is properly initialized
[Reply, Contributor] Thanks for fixing this!
}

private class InMemoryTableOperations extends BaseMetastoreTableOperations {
private final FileIO fileIO;
private final TableIdentifier tableIdentifier;
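The review comment in the hunk above is the key to the InMemoryCatalog change: the base catalog looks up its MetricsReporter from the catalog configuration, so a catalog that returns an empty map from properties() would never see CatalogProperties.METRICS_REPORTER_IMPL. The sketch below illustrates that resolution step, assuming reflective instantiation with a no-op fallback; MetricsReporterResolver is a made-up name, not Iceberg's actual helper.

import java.util.Map;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.metrics.MetricsReporter;

class MetricsReporterResolver {
  // Sketch only: resolve a reporter from catalog properties, defaulting to a no-op reporter.
  static MetricsReporter resolve(Map<String, String> catalogProperties) {
    String impl = catalogProperties.get(CatalogProperties.METRICS_REPORTER_IMPL);
    if (impl == null) {
      return report -> { /* no-op fallback for this sketch */ };
    }

    try {
      return (MetricsReporter) Class.forName(impl).getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot instantiate metrics reporter: " + impl, e);
    }
  }
}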
core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java (90 additions & 0 deletions)
@@ -30,11 +30,14 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.FilesTable;
import org.apache.iceberg.HasTableOperations;
@@ -56,6 +59,10 @@
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.metrics.CommitReport;
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.metrics.ScanReport;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
@@ -144,6 +151,8 @@ public abstract class CatalogTests<C extends Catalog & SupportsNamespaces> {

protected abstract C catalog();

protected abstract C initCatalog(String catalogName, Map<String, String> additionalProperties);

protected boolean supportsNamespaceProperties() {
return true;
}
@@ -2695,6 +2704,87 @@ public void testRegisterExistingTable() {
assertThat(catalog.dropTable(identifier)).isTrue();
}

@Test
public void testCatalogWithCustomMetricsReporter() throws IOException {
C catalogWithCustomReporter =
initCatalog(
"catalog_with_custom_reporter",
ImmutableMap.of(
CatalogProperties.METRICS_REPORTER_IMPL, CustomMetricsReporter.class.getName()));

if (requiresNamespaceCreate()) {
catalogWithCustomReporter.createNamespace(TABLE.namespace());
}

catalogWithCustomReporter.buildTable(TABLE, SCHEMA).create();

Table table = catalogWithCustomReporter.loadTable(TABLE);
DataFile dataFile =
DataFiles.builder(PartitionSpec.unpartitioned())
.withPath(FileFormat.PARQUET.addExtension(UUID.randomUUID().toString()))
.withFileSizeInBytes(10)
.withRecordCount(2)
.build();

// append file through FastAppend and check and reset counter
table.newFastAppend().appendFile(dataFile).commit();
assertThat(CustomMetricsReporter.COMMIT_COUNTER.get()).isEqualTo(1);
CustomMetricsReporter.COMMIT_COUNTER.set(0);

TableIdentifier identifier = TableIdentifier.of(NS, "custom_metrics_reporter_table");
// append file through createTransaction() and check and reset counter
catalogWithCustomReporter
.buildTable(identifier, SCHEMA)
.createTransaction()
.newFastAppend()
.appendFile(dataFile)
.commit();
assertThat(CustomMetricsReporter.COMMIT_COUNTER.get()).isEqualTo(1);
CustomMetricsReporter.COMMIT_COUNTER.set(0);

// append file through createOrReplaceTransaction() and check and reset counter
catalogWithCustomReporter
.buildTable(identifier, SCHEMA)
.createOrReplaceTransaction()
.newFastAppend()
.appendFile(dataFile)
.commit();
assertThat(CustomMetricsReporter.COMMIT_COUNTER.get()).isEqualTo(1);
CustomMetricsReporter.COMMIT_COUNTER.set(0);

// append file through replaceTransaction() and check and reset counter
catalogWithCustomReporter
.buildTable(TABLE, SCHEMA)
.replaceTransaction()
.newFastAppend()
.appendFile(dataFile)
.commit();
assertThat(CustomMetricsReporter.COMMIT_COUNTER.get()).isEqualTo(1);
CustomMetricsReporter.COMMIT_COUNTER.set(0);

try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
assertThat(tasks.iterator()).hasNext();
}

assertThat(CustomMetricsReporter.SCAN_COUNTER.get()).isEqualTo(1);
// reset counter in case subclasses run this test multiple times
CustomMetricsReporter.SCAN_COUNTER.set(0);
}

public static class CustomMetricsReporter implements MetricsReporter {
static final AtomicInteger SCAN_COUNTER = new AtomicInteger(0);
static final AtomicInteger COMMIT_COUNTER = new AtomicInteger(0);

@Override
public void report(MetricsReport report) {
if (report instanceof ScanReport) {
SCAN_COUNTER.incrementAndGet();
} else if (report instanceof CommitReport) {
COMMIT_COUNTER.incrementAndGet();
}
}
}

private static void assertEmpty(String context, Catalog catalog, Namespace ns) {
try {
assertThat(catalog.listTables(ns)).as(context).isEmpty();
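Outside the test harness, wiring a custom reporter follows the same pattern the new test exercises: pass CatalogProperties.METRICS_REPORTER_IMPL when initializing the catalog. A minimal sketch; the catalog name, warehouse path, and reporter class name below are placeholders rather than anything defined in this PR.

import java.util.Map;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.inmemory.InMemoryCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class CustomReporterWiring {
  public static void main(String[] args) {
    // Substitute a real MetricsReporter implementation with a no-arg constructor.
    Map<String, String> props =
        ImmutableMap.of(
            CatalogProperties.WAREHOUSE_LOCATION, "/tmp/warehouse",
            CatalogProperties.METRICS_REPORTER_IMPL, "com.example.MyMetricsReporter");

    InMemoryCatalog catalog = new InMemoryCatalog();
    catalog.initialize("demo", props);
    // Scans and commits on tables loaded through this catalog are now reported
    // to the configured reporter.
  }
}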
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.inmemory;

import java.util.Map;
import org.apache.iceberg.catalog.CatalogTests;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.BeforeEach;
@@ -27,15 +28,22 @@ public class TestInMemoryCatalog extends CatalogTests<InMemoryCatalog> {

@BeforeEach
public void before() {
this.catalog = new InMemoryCatalog();
this.catalog.initialize("in-memory-catalog", ImmutableMap.of());
this.catalog = initCatalog("in-memory-catalog", ImmutableMap.of());
}

@Override
protected InMemoryCatalog catalog() {
return catalog;
}

@Override
protected InMemoryCatalog initCatalog(
String catalogName, Map<String, String> additionalProperties) {
InMemoryCatalog cat = new InMemoryCatalog();
cat.initialize(catalogName, additionalProperties);
return cat;
}

@Override
protected boolean requiresNamespaceCreate() {
return true;
core/src/test/java/org/apache/iceberg/jdbc/TestJdbcCatalog.java (3 additions & 47 deletions)
@@ -39,7 +39,6 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
@@ -50,8 +49,6 @@
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
@@ -68,9 +65,6 @@
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -139,7 +133,8 @@ public void setupTable() throws Exception {
catalog = initCatalog("test_jdbc_catalog", Maps.newHashMap());
}

private JdbcCatalog initCatalog(String catalogName, Map<String, String> props) {
@Override
protected JdbcCatalog initCatalog(String catalogName, Map<String, String> additionalProperties) {
Map<String, String> properties = Maps.newHashMap();
properties.put(
CatalogProperties.URI,
@@ -150,7 +145,7 @@ private JdbcCatalog initCatalog(String catalogName, Map<String, String> props) {
warehouseLocation = this.tableDir.toAbsolutePath().toString();
properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);
properties.put("type", "jdbc");
properties.putAll(props);
properties.putAll(additionalProperties);

return (JdbcCatalog) CatalogUtil.buildIcebergCatalog(catalogName, properties, conf);
}
@@ -1059,36 +1054,6 @@ public void testConversions() {
assertThat(JdbcUtil.stringToNamespace(nsString)).isEqualTo(ns);
}

@Test
public void testCatalogWithCustomMetricsReporter() throws IOException {
JdbcCatalog catalogWithCustomReporter =
initCatalog(
"test_jdbc_catalog_with_custom_reporter",
ImmutableMap.of(
CatalogProperties.METRICS_REPORTER_IMPL, CustomMetricsReporter.class.getName()));
try {
catalogWithCustomReporter.buildTable(TABLE, SCHEMA).create();
Table table = catalogWithCustomReporter.loadTable(TABLE);
table
.newFastAppend()
.appendFile(
DataFiles.builder(PartitionSpec.unpartitioned())
.withPath(FileFormat.PARQUET.addExtension(UUID.randomUUID().toString()))
.withFileSizeInBytes(10)
.withRecordCount(2)
.build())
.commit();
try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
assertThat(tasks.iterator()).hasNext();
}
} finally {
catalogWithCustomReporter.dropTable(TABLE);
}
// counter of custom metrics reporter should have been increased
// 1x for commit metrics / 1x for scan metrics
assertThat(CustomMetricsReporter.COUNTER.get()).isEqualTo(2);
}

@Test
public void testCommitExceptionWithoutMessage() {
TableIdentifier tableIdent = TableIdentifier.of("db", "tbl");
@@ -1129,15 +1094,6 @@ public void testCommitExceptionWithMessage() {
}
}

public static class CustomMetricsReporter implements MetricsReporter {
static final AtomicInteger COUNTER = new AtomicInteger(0);

@Override
public void report(MetricsReport report) {
COUNTER.incrementAndGet();
}
}

private String createMetadataLocationViaJdbcCatalog(TableIdentifier identifier)
throws SQLException {
// temporary connection just to actually create a concrete metadata location
@@ -23,6 +23,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.CatalogTests;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
@@ -38,6 +39,24 @@ protected JdbcCatalog catalog() {
return catalog;
}

@Override
protected JdbcCatalog initCatalog(String catalogName, Map<String, String> additionalProperties) {
Map<String, String> properties = Maps.newHashMap();
properties.put(
CatalogProperties.URI,
"jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", ""));
properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user");
properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password");
properties.put(CatalogProperties.WAREHOUSE_LOCATION, tableDir.toAbsolutePath().toString());
properties.put(JdbcUtil.SCHEMA_VERSION_PROPERTY, JdbcUtil.SchemaVersion.V1.name());
properties.putAll(additionalProperties);

JdbcCatalog cat = new JdbcCatalog();
cat.setConf(new Configuration());
cat.initialize(catalogName, properties);
return cat;
}

@Override
protected boolean supportsNamespaceProperties() {
return true;
@@ -50,17 +69,6 @@ protected boolean supportsNestedNamespaces() {

@BeforeEach
public void setupCatalog() {
Map<String, String> properties = Maps.newHashMap();
properties.put(
CatalogProperties.URI,
"jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", ""));
properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user");
properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password");
properties.put(CatalogProperties.WAREHOUSE_LOCATION, tableDir.toAbsolutePath().toString());
properties.put(JdbcUtil.SCHEMA_VERSION_PROPERTY, JdbcUtil.SchemaVersion.V1.name());

catalog = new JdbcCatalog();
catalog.setConf(new Configuration());
catalog.initialize("testCatalog", properties);
this.catalog = initCatalog("testCatalog", ImmutableMap.of());
}
}