Skip to content

Commit

Permalink
[Feature][Catalog] Add InMemoryCatalog for test and add new getCatalo…
Browse files Browse the repository at this point in the history
…gTableFromConfig method (apache#5485)
  • Loading branch information
Hisoka-X authored Sep 19, 2023
1 parent c230709 commit cae66b6
Show file tree
Hide file tree
Showing 9 changed files with 484 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ default List<CatalogTable> getTables(ReadonlyConfig config) throws CatalogExcept
// Get the list of specified tables
List<String> tableNames = config.get(CatalogOptions.TABLE_NAMES);
List<CatalogTable> catalogTables = new ArrayList<>();
if (tableNames != null && tableNames.size() >= 1) {
if (tableNames != null && !tableNames.isEmpty()) {
for (String tableName : tableNames) {
TablePath tablePath = TablePath.of(tableName);
if (this.tableExists(tablePath)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@

package org.apache.seatunnel.api.table.catalog;

import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.util.OptionRule;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -56,11 +54,4 @@ public interface CatalogOptions {
.withDescription(
"The table names RegEx of the database to capture."
+ "The table name needs to include the database name, for example: database_.*\\.table_.*");

OptionRule.Builder BASE_RULE =
OptionRule.builder()
.optional(CommonOptions.FACTORY_ID)
.optional(NAME)
.optional(DATABASE_PATTERN)
.exclusive(TABLE_PATTERN, TABLE_NAMES);
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.common.utils.SeaTunnelException;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -92,6 +93,8 @@ public static CatalogTable getCatalogTable(String tableName, SeaTunnelRowType ro
"It is converted from RowType and only has column information.");
}

// TODO remove this method after https://github.com/apache/seatunnel/issues/5483 done.
@Deprecated
public static List<CatalogTable> getCatalogTables(Config config, ClassLoader classLoader) {
ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(config);
Map<String, String> catalogOptions =
Expand Down Expand Up @@ -132,6 +135,65 @@ public static List<CatalogTable> getCatalogTables(Config config, ClassLoader cla
.orElse(Collections.emptyList());
}

/**
* Get catalog table from config, if schema is specified, return a catalog table with specified
* schema, otherwise, return a catalog table with schema from catalog.
*
* @deprecated DO NOT invoke it in any new TableSourceFactory/TableSinkFactory, please directly
* use TableSourceFactory/TableSinkFactory instance to get CatalogTable. We just use it to
* transition the old CatalogTable creation logic. Details please <a
* href="https://cwiki.apache.org/confluence/display/SEATUNNEL/STIP5-Refactor+Catalog+and+CatalogTable">check
* </a>
*/
@Deprecated
public static List<CatalogTable> getCatalogTablesFromConfig(
Config config, ClassLoader classLoader) {
ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(config);

// We use plugin_name as factoryId, so MySQL-CDC should be MySQL
String factoryId = readonlyConfig.get(CommonOptions.PLUGIN_NAME).replace("-CDC", "");
// Highest priority: specified schema
Map<String, String> schemaMap = readonlyConfig.get(CatalogTableUtil.SCHEMA);
if (schemaMap != null) {
if (schemaMap.isEmpty()) {
throw new SeaTunnelException("Schema config can not be empty");
}
CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(config).getCatalogTable();
return Collections.singletonList(catalogTable);
}

Optional<Catalog> optionalCatalog =
FactoryUtil.createOptionalCatalog(
factoryId, readonlyConfig, classLoader, factoryId);
return optionalCatalog
.map(
c -> {
long startTime = System.currentTimeMillis();
try (Catalog catalog = c) {
catalog.open();
List<CatalogTable> catalogTables =
catalog.getTables(readonlyConfig);
log.info(
String.format(
"Get catalog tables, cost time: %d",
System.currentTimeMillis() - startTime));
if (catalogTables.isEmpty()) {
throw new SeaTunnelException(
String.format(
"Can not find catalog table with factoryId [%s]",
factoryId));
}
return catalogTables;
}
})
.orElseThrow(
() ->
new SeaTunnelException(
String.format(
"Can not find catalog with factoryId [%s]",
factoryId)));
}

public static CatalogTableUtil buildWithConfig(Config config) {
CheckResult checkResult = CheckConfigUtil.checkAllExists(config, "schema");
if (!checkResult.isSuccess()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ public static TableIdentifier of(String catalogName, String databaseName, String
return new TableIdentifier(catalogName, databaseName, null, tableName);
}

public static TableIdentifier of(String catalogName, TablePath tablePath) {
return new TableIdentifier(
catalogName,
tablePath.getDatabaseName(),
tablePath.getSchemaName(),
tablePath.getTableName());
}

public static TableIdentifier of(
String catalogName, String databaseName, String schemaName, String tableName) {
return new TableIdentifier(catalogName, databaseName, schemaName, tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;

import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
Expand All @@ -28,6 +28,7 @@
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.common.utils.SeaTunnelException;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand All @@ -37,17 +38,17 @@
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

import static org.apache.seatunnel.api.table.catalog.CatalogOptions.TABLE_NAMES;
import static org.apache.seatunnel.common.constants.CollectionConstants.PLUGIN_NAME;

public class CatalogTableUtilTest {
@Test
public void testSimpleSchemaParse() throws FileNotFoundException, URISyntaxException {
String path = getTestConfigFile("/conf/simple.schema.conf");
Config config =
ConfigFactory.parseFile(new File(path))
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
.resolveWith(
ConfigFactory.systemProperties(),
ConfigResolveOptions.defaults().setAllowUnresolved(true));
Config config = ConfigFactory.parseFile(new File(path));
SeaTunnelRowType seaTunnelRowType =
CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
Assertions.assertNotNull(seaTunnelRowType);
Expand All @@ -61,12 +62,7 @@ public void testSimpleSchemaParse() throws FileNotFoundException, URISyntaxExcep
@Test
public void testComplexSchemaParse() throws FileNotFoundException, URISyntaxException {
String path = getTestConfigFile("/conf/complex.schema.conf");
Config config =
ConfigFactory.parseFile(new File(path))
.resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
.resolveWith(
ConfigFactory.systemProperties(),
ConfigResolveOptions.defaults().setAllowUnresolved(true));
Config config = ConfigFactory.parseFile(new File(path));
SeaTunnelRowType seaTunnelRowType =
CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
Assertions.assertNotNull(seaTunnelRowType);
Expand All @@ -88,6 +84,41 @@ public void testComplexSchemaParse() throws FileNotFoundException, URISyntaxExce
"row", nestedRowFieldType.getFieldName(nestedRowFieldType.indexOf("row")));
}

@Test
public void testCatalogUtilGetCatalogTable() throws FileNotFoundException, URISyntaxException {
String path = getTestConfigFile("/conf/getCatalogTable.conf");
Config config = ConfigFactory.parseFile(new File(path));
Config source = config.getConfigList("source").get(0);
List<CatalogTable> catalogTables =
CatalogTableUtil.getCatalogTablesFromConfig(
source, Thread.currentThread().getContextClassLoader());
Assertions.assertEquals(2, catalogTables.size());
Assertions.assertEquals(
TableIdentifier.of("InMemory", TablePath.of("st.public.table1")),
catalogTables.get(0).getTableId());
Assertions.assertEquals(
TableIdentifier.of("InMemory", TablePath.of("st.public.table2")),
catalogTables.get(1).getTableId());
// test empty tables
Config emptyTableSource =
source.withValue(
TABLE_NAMES.key(), ConfigValueFactory.fromIterable(new ArrayList<>()));
Assertions.assertThrows(
SeaTunnelException.class,
() ->
CatalogTableUtil.getCatalogTablesFromConfig(
emptyTableSource, Thread.currentThread().getContextClassLoader()));
// test unknown catalog
Config cannotFindCatalogSource =
source.withValue(PLUGIN_NAME, ConfigValueFactory.fromAnyRef("unknownCatalog"));
Assertions.assertThrows(
SeaTunnelException.class,
() ->
CatalogTableUtil.getCatalogTablesFromConfig(
cannotFindCatalogSource,
Thread.currentThread().getContextClassLoader()));
}

public static String getTestConfigFile(String configFile)
throws FileNotFoundException, URISyntaxException {
URL resource = CatalogTableUtilTest.class.getResource(configFile);
Expand Down
Loading

0 comments on commit cae66b6

Please sign in to comment.