Skip to content

Commit

Permalink
[HotFix] Re-add table-filter to Server ExternalCatalog (apache#2310)
Browse files Browse the repository at this point in the history
* re add table filter

* implement in external catalog

* add ut case

* fix comment

* fix comment

* fix comment

* fix ut

* fix update properties

* roll back the engine side's filter

* resolve conflicts

* add ut

---------

Co-authored-by: baiyangtx <xiangnebula@163.com>
Co-authored-by: ZhouJinsong <zhoujinsong0505@163.com>
  • Loading branch information
3 people authored and ShawHee committed Dec 29, 2023
1 parent e2e16f8 commit fc4120e
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.netease.arctic.UnifiedCatalog;
import com.netease.arctic.ams.api.CatalogMeta;
import com.netease.arctic.ams.api.TableFormat;
import com.netease.arctic.ams.api.properties.CatalogMetaProperties;
import com.netease.arctic.server.persistence.mapper.TableMetaMapper;
import com.netease.arctic.server.table.ServerTableIdentifier;
import com.netease.arctic.table.TableMetaStore;
Expand All @@ -15,19 +16,22 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class ExternalCatalog extends ServerCatalog {

UnifiedCatalog unifiedCatalog;
TableMetaStore tableMetaStore;
private Pattern tableFilterPattern;

protected ExternalCatalog(CatalogMeta metadata) {
super(metadata);
this.tableMetaStore = CatalogUtil.buildMetaStore(metadata);
this.unifiedCatalog =
this.tableMetaStore.doAs(
() -> new CommonUnifiedCatalog(this::getMetadata, Maps.newHashMap()));
updateTableFilter(metadata);
}

public void syncTable(String database, String tableName, TableFormat format) {
Expand All @@ -54,6 +58,7 @@ public void updateMetadata(CatalogMeta metadata) {
super.updateMetadata(metadata);
this.tableMetaStore = CatalogUtil.buildMetaStore(metadata);
this.unifiedCatalog.refresh();
updateTableFilter(metadata);
}

@Override
Expand Down Expand Up @@ -83,14 +88,37 @@ public List<TableIDWithFormat> listTables() {

@Override
public List<TableIDWithFormat> listTables(String database) {
return doAs(() -> new ArrayList<>(unifiedCatalog.listTables(database)));
return doAs(
() ->
new ArrayList<>(
unifiedCatalog.listTables(database).stream()
.filter(
tableIDWithFormat ->
tableFilterPattern == null
|| tableFilterPattern
.matcher(
(database
+ "."
+ tableIDWithFormat.getIdentifier().getTableName()))
.matches())
.collect(Collectors.toList())));
}

@Override
public AmoroTable<?> loadTable(String database, String tableName) {
return doAs(() -> unifiedCatalog.loadTable(database, tableName));
}

private void updateTableFilter(CatalogMeta metadata) {
String tableFilter =
metadata.getCatalogProperties().get(CatalogMetaProperties.KEY_TABLE_FILTER);
if (tableFilter != null) {
tableFilterPattern = Pattern.compile(tableFilter);
} else {
tableFilterPattern = null;
}
}

private <T> T doAs(Callable<T> callable) {
return tableMetaStore.doAs(callable);
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@

package com.netease.arctic.server.catalog;

import com.netease.arctic.ams.api.CatalogMeta;
import com.netease.arctic.ams.api.properties.CatalogMetaProperties;
import com.netease.arctic.formats.AmoroCatalogTestHelper;
import com.netease.arctic.formats.IcebergHadoopCatalogTestHelper;
import com.netease.arctic.formats.MixedIcebergHadoopCatalogTestHelper;
import com.netease.arctic.formats.PaimonHadoopCatalogTestHelper;
import com.netease.arctic.hive.formats.IcebergHiveCatalogTestHelper;
import com.netease.arctic.hive.formats.MixedIcebergHiveCatalogTestHelper;
import com.netease.arctic.hive.formats.PaimonHiveCatalogTestHelper;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -46,7 +51,9 @@ public static Object[] parameters() {
PaimonHadoopCatalogTestHelper.defaultHelper(),
PaimonHiveCatalogTestHelper.defaultHelper(),
IcebergHadoopCatalogTestHelper.defaultHelper(),
IcebergHiveCatalogTestHelper.defaultHelper()
IcebergHiveCatalogTestHelper.defaultHelper(),
MixedIcebergHadoopCatalogTestHelper.defaultHelper(),
MixedIcebergHiveCatalogTestHelper.defaultHelper()
};
}

Expand All @@ -58,33 +65,76 @@ public void setUp() throws Exception {

@Test
public void listDatabases() {
Assert.assertTrue(getExternalCatalog().listDatabases().contains(testDatabaseName));
Assert.assertTrue(getServerCatalog().listDatabases().contains(testDatabaseName));
}

@Test
public void dataBaseExists() {
Assert.assertTrue(getExternalCatalog().exist(testDatabaseName));
Assert.assertTrue(getServerCatalog().exist(testDatabaseName));
}

@Test
public void tableExists() {
Assert.assertTrue(getExternalCatalog().exist(testDatabaseName, testTableName));
Assert.assertTrue(getServerCatalog().exist(testDatabaseName, testTableName));
}

@Test
public void listTables() {
Assert.assertEquals(1, getExternalCatalog().listTables(testDatabaseName).size());
Assert.assertEquals(1, getServerCatalog().listTables(testDatabaseName).size());
Assert.assertEquals(
testTableName,
getExternalCatalog().listTables(testDatabaseName).get(0).getIdentifier().getTableName());
getServerCatalog().listTables(testDatabaseName).get(0).getIdentifier().getTableName());
}

@Test
public void listTablesWithTableFilter() throws Exception {
// Table filter only affects ExternalCatalog
Assume.assumeTrue(getServerCatalog() instanceof ExternalCatalog);
String dbWithFilter = "db_with_filter";
String tableWithFilter1 = "test_table1";
String tableWithFilter2 = "test_table2";
getAmoroCatalog().createDatabase(dbWithFilter);
getAmoroCatalogTestHelper().createTable(dbWithFilter, tableWithFilter1);
getAmoroCatalogTestHelper().createTable(dbWithFilter, tableWithFilter2);
// without table filter
Assert.assertEquals(2, getServerCatalog().listTables(dbWithFilter).size());

CatalogMeta metadata = getServerCatalog().getMetadata();
metadata
.getCatalogProperties()
.put(CatalogMetaProperties.KEY_TABLE_FILTER, dbWithFilter + "." + tableWithFilter1);
getServerCatalog().updateMetadata(metadata);
Assert.assertEquals(1, getServerCatalog().listTables(dbWithFilter).size());
Assert.assertEquals(
tableWithFilter1,
getServerCatalog().listTables(dbWithFilter).get(0).getIdentifier().getTableName());

CatalogMeta metadata2 = getServerCatalog().getMetadata();
metadata
.getCatalogProperties()
.put(CatalogMetaProperties.KEY_TABLE_FILTER, dbWithFilter + "\\." + ".+");
getServerCatalog().updateMetadata(metadata2);
Assert.assertEquals(2, getServerCatalog().listTables(dbWithFilter).size());

CatalogMeta metadata3 = getServerCatalog().getMetadata();
metadata
.getCatalogProperties()
.put(CatalogMetaProperties.KEY_TABLE_FILTER, testDatabaseName + "\\." + ".+");
getServerCatalog().updateMetadata(metadata3);
Assert.assertEquals(1, getServerCatalog().listTables(testDatabaseName).size());
Assert.assertTrue(getServerCatalog().listTables(dbWithFilter).isEmpty());

CatalogMeta metadata4 = getServerCatalog().getMetadata();
metadata.getCatalogProperties().remove(CatalogMetaProperties.KEY_TABLE_FILTER);
getServerCatalog().updateMetadata(metadata4);
}

@Test
public void loadTable() {
Assert.assertNotNull(getExternalCatalog().loadTable(testDatabaseName, testTableName));
Assert.assertNotNull(getServerCatalog().loadTable(testDatabaseName, testTableName));
}

private ServerCatalog getExternalCatalog() {
private ServerCatalog getServerCatalog() {
return tableService().getServerCatalog(getAmoroCatalogTestHelper().catalogName());
}
}

0 comments on commit fc4120e

Please sign in to comment.