From c4177bbb90dbb204cba877cac5fc12d827c0da11 Mon Sep 17 00:00:00 2001 From: Clearvive <143773256+Clearvive@users.noreply.github.com> Date: Mon, 20 Nov 2023 11:51:25 +0800 Subject: [PATCH] [#575] feat(jdbc): Support for DataSource and schema operations in JDBC catalog. (#703) ### What changes were proposed in this pull request? We need to add management functionalities for data sources and operations on schemas in jdbc-common. ### Why are the changes needed? Fix: #575 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT --------- Co-authored-by: Clearvive --- catalogs/catalog-jdbc-common/build.gradle.kts | 8 +- .../gravitino/catalog/jdbc/JdbcCatalog.java | 23 ++++- .../catalog/jdbc/JdbcCatalogOperations.java | 97 ++++++++++++++++-- .../jdbc/JdbcCatalogPropertiesMetadata.java | 39 +++++++- .../catalog/jdbc/config/JdbcConfig.java | 75 ++++++++++++++ .../converter/JdbcExceptionConverter.java | 25 +++++ .../jdbc/converter/JdbcTypeConverter.java | 26 +++++ .../jdbc/operation/DatabaseOperation.java | 49 +++++++++ .../operation/JdbcDatabaseOperations.java | 69 +++++++++++++ .../catalog/jdbc/utils/DataSourceUtils.java | 74 ++++++++++++++ .../jdbc/utils/JdbcConnectorUtils.java | 29 ++++++ .../catalog/jdbc/config/TestJdbcConfig.java | 22 +++++ .../converter/SqliteExceptionConverter.java | 23 +++++ .../operation/SqliteDatabaseOperations.java | 91 +++++++++++++++++ .../operation/TestJdbcDatabaseOperations.java | 99 +++++++++++++++++++ .../jdbc/utils/TestDataSourceUtils.java | 30 ++++++ .../java/com/datastrato/gravitino/Config.java | 4 + .../datastrato/gravitino/utils/MapUtils.java | 4 + gradle/libs.versions.toml | 2 + 19 files changed, 778 insertions(+), 11 deletions(-) create mode 100644 catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/config/JdbcConfig.java create mode 100644 catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/converter/JdbcExceptionConverter.java create mode 
100644 catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/converter/JdbcTypeConverter.java create mode 100644 catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/DatabaseOperation.java create mode 100644 catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcDatabaseOperations.java create mode 100644 catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/utils/DataSourceUtils.java create mode 100644 catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/utils/JdbcConnectorUtils.java create mode 100644 catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/config/TestJdbcConfig.java create mode 100644 catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/converter/SqliteExceptionConverter.java create mode 100644 catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/SqliteDatabaseOperations.java create mode 100644 catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/TestJdbcDatabaseOperations.java create mode 100644 catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/utils/TestDataSourceUtils.java diff --git a/catalogs/catalog-jdbc-common/build.gradle.kts b/catalogs/catalog-jdbc-common/build.gradle.kts index 64b80026f0e..b10623396eb 100644 --- a/catalogs/catalog-jdbc-common/build.gradle.kts +++ b/catalogs/catalog-jdbc-common/build.gradle.kts @@ -23,6 +23,7 @@ dependencies { implementation(libs.bundles.log4j) implementation(libs.commons.lang3) implementation(libs.commons.collections4) + implementation(libs.commons.dbcp2) implementation(libs.substrait.java.core) { exclude("com.fasterxml.jackson.core") exclude("com.fasterxml.jackson.datatype") @@ -32,6 +33,11 @@ dependencies { exclude("org.slf4j") } + testImplementation(libs.commons.io) + 
testImplementation(libs.sqlite.jdbc) + testImplementation(libs.junit.jupiter.api) + testImplementation(libs.junit.jupiter.params) + testRuntimeOnly(libs.junit.jupiter.engine) compileOnly(libs.lombok) annotationProcessor(libs.lombok) -} \ No newline at end of file +} diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalog.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalog.java index 14024fca0fb..577929caed1 100644 --- a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalog.java +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalog.java @@ -6,6 +6,9 @@ import com.datastrato.gravitino.catalog.BaseCatalog; import com.datastrato.gravitino.catalog.CatalogOperations; +import com.datastrato.gravitino.catalog.jdbc.converter.JdbcExceptionConverter; +import com.datastrato.gravitino.catalog.jdbc.converter.JdbcTypeConverter; +import com.datastrato.gravitino.catalog.jdbc.operation.JdbcDatabaseOperations; import com.datastrato.gravitino.rel.SupportsSchemas; import com.datastrato.gravitino.rel.TableCatalog; import java.util.Map; @@ -21,7 +24,12 @@ public abstract class JdbcCatalog extends BaseCatalog { */ @Override protected CatalogOperations newOps(Map config) { - JdbcCatalogOperations ops = new JdbcCatalogOperations(entity()); + JdbcCatalogOperations ops = + new JdbcCatalogOperations( + entity(), + createExceptionConverter(), + createJdbcTypeConverter(), + createJdbcDatabaseOperations()); ops.initialize(config); return ops; } @@ -37,4 +45,17 @@ public SupportsSchemas asSchemas() { public TableCatalog asTableCatalog() { return (JdbcCatalogOperations) ops(); } + + /** @return The {@link JdbcExceptionConverter} to be used by the catalog. 
*/ + protected JdbcExceptionConverter createExceptionConverter() { + return new JdbcExceptionConverter() {}; + } + + /** @return The {@link JdbcTypeConverter} to be used by the catalog. */ + protected abstract JdbcTypeConverter createJdbcTypeConverter(); + + /** + * @return The {@link JdbcDatabaseOperations} to be used by the catalog to manage databases in the + */ + protected abstract JdbcDatabaseOperations createJdbcDatabaseOperations(); } diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalogOperations.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalogOperations.java index bc7fb06032d..220a89f4449 100644 --- a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalogOperations.java +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalogOperations.java @@ -4,16 +4,25 @@ */ package com.datastrato.gravitino.catalog.jdbc; +import static com.datastrato.gravitino.catalog.BaseCatalog.CATALOG_BYPASS_PREFIX; + import com.datastrato.gravitino.NameIdentifier; import com.datastrato.gravitino.Namespace; +import com.datastrato.gravitino.StringIdentifier; import com.datastrato.gravitino.catalog.CatalogOperations; import com.datastrato.gravitino.catalog.PropertiesMetadata; +import com.datastrato.gravitino.catalog.jdbc.config.JdbcConfig; +import com.datastrato.gravitino.catalog.jdbc.converter.JdbcExceptionConverter; +import com.datastrato.gravitino.catalog.jdbc.converter.JdbcTypeConverter; +import com.datastrato.gravitino.catalog.jdbc.operation.JdbcDatabaseOperations; +import com.datastrato.gravitino.catalog.jdbc.utils.DataSourceUtils; import com.datastrato.gravitino.exceptions.NoSuchCatalogException; import com.datastrato.gravitino.exceptions.NoSuchSchemaException; import com.datastrato.gravitino.exceptions.NoSuchTableException; import com.datastrato.gravitino.exceptions.NonEmptySchemaException; import 
com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException; import com.datastrato.gravitino.exceptions.TableAlreadyExistsException; +import com.datastrato.gravitino.meta.AuditInfo; import com.datastrato.gravitino.meta.CatalogEntity; import com.datastrato.gravitino.rel.Column; import com.datastrato.gravitino.rel.SchemaChange; @@ -24,7 +33,15 @@ import com.datastrato.gravitino.rel.expressions.distributions.Distribution; import com.datastrato.gravitino.rel.expressions.sorts.SortOrder; import com.datastrato.gravitino.rel.expressions.transforms.Transform; +import com.datastrato.gravitino.utils.MapUtils; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import javax.sql.DataSource; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,13 +58,31 @@ public class JdbcCatalogOperations implements CatalogOperations, SupportsSchemas private final CatalogEntity entity; + private final JdbcExceptionConverter exceptionConverter; + + private final JdbcTypeConverter jdbcTypeConverter; + + private final JdbcDatabaseOperations jdbcDatabaseOperations; + + private DataSource dataSource; + /** * Constructs a new instance of JdbcCatalogOperations. * * @param entity The catalog entity associated with this operations instance. + * @param exceptionConverter The exception converter to be used by the operations. + * @param jdbcTypeConverter The type converter to be used by the operations. + * @param jdbcDatabaseOperations The database operations to be used by the operations. 
*/ - public JdbcCatalogOperations(CatalogEntity entity) { + public JdbcCatalogOperations( + CatalogEntity entity, + JdbcExceptionConverter exceptionConverter, + JdbcTypeConverter jdbcTypeConverter, + JdbcDatabaseOperations jdbcDatabaseOperations) { this.entity = entity; + this.exceptionConverter = exceptionConverter; + this.jdbcTypeConverter = jdbcTypeConverter; + this.jdbcDatabaseOperations = jdbcDatabaseOperations; } /** @@ -58,6 +93,12 @@ public JdbcCatalogOperations(CatalogEntity entity) { */ @Override public void initialize(Map conf) throws RuntimeException { + // Key format like gravitino.bypass.a.b + Map resultConf = + Maps.newHashMap(MapUtils.getPrefixMap(conf, CATALOG_BYPASS_PREFIX)); + JdbcConfig jdbcConfig = new JdbcConfig(resultConf); + this.dataSource = DataSourceUtils.createDataSource(jdbcConfig); + this.jdbcDatabaseOperations.initialize(dataSource, exceptionConverter); this.jdbcCatalogPropertiesMetadata = new JdbcCatalogPropertiesMetadata(); this.jdbcTablePropertiesMetadata = new JdbcTablePropertiesMetadata(); this.jdbcSchemaPropertiesMetadata = new JdbcSchemaPropertiesMetadata(); @@ -65,7 +106,9 @@ public void initialize(Map conf) throws RuntimeException { /** Closes the Jdbc catalog and releases the associated client pool. */ @Override - public void close() {} + public void close() { + DataSourceUtils.closeDataSource(dataSource); + } /** * Lists the schemas under the given namespace. 
@@ -76,7 +119,10 @@ public void close() {} */ @Override public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException { - throw new UnsupportedOperationException(); + List schemaNames = jdbcDatabaseOperations.list(); + return schemaNames.stream() + .map(db -> NameIdentifier.of(namespace, db)) + .toArray(NameIdentifier[]::new); } /** @@ -93,7 +139,27 @@ public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogExc public JdbcSchema createSchema( NameIdentifier ident, String comment, Map properties) throws NoSuchCatalogException, SchemaAlreadyExistsException { - throw new UnsupportedOperationException(); + StringIdentifier identifier = + Preconditions.checkNotNull( + StringIdentifier.fromProperties(properties), + "The gravitino id attribute does not exist in properties"); + String notAllowedKey = + properties.keySet().stream() + .filter(s -> !StringUtils.equals(s, StringIdentifier.ID_KEY)) + .collect(Collectors.joining(",")); + if (StringUtils.isNotEmpty(notAllowedKey)) { + LOG.warn("The properties [{}] are not allowed to be set in the jdbc schema", notAllowedKey); + } + HashMap resultProperties = Maps.newHashMap(properties); + resultProperties.remove(StringIdentifier.ID_KEY); + jdbcDatabaseOperations.create( + ident.name(), StringIdentifier.addToComment(identifier, comment), resultProperties); + return new JdbcSchema.Builder() + .withName(ident.name()) + .withProperties(resultProperties) + .withComment(comment) + .withAuditInfo(AuditInfo.EMPTY) + .build(); } /** @@ -105,7 +171,23 @@ public JdbcSchema createSchema( */ @Override public JdbcSchema loadSchema(NameIdentifier ident) throws NoSuchSchemaException { - throw new UnsupportedOperationException(); + JdbcSchema load = jdbcDatabaseOperations.load(ident.name()); + String comment = load.comment(); + StringIdentifier id = StringIdentifier.fromComment(comment); + if (id == null) { + LOG.warn("The comment {} does not contain gravitino id attribute", comment); + return load; + 
} else { + Map properties = + load.properties() == null ? Maps.newHashMap() : Maps.newHashMap(load.properties()); + StringIdentifier.addToProperties(id, properties); + return new JdbcSchema.Builder() + .withAuditInfo(load.auditInfo()) + .withName(load.name()) + .withComment(load.comment()) + .withProperties(properties) + .build(); + } } /** @@ -119,7 +201,7 @@ public JdbcSchema loadSchema(NameIdentifier ident) throws NoSuchSchemaException @Override public JdbcSchema alterSchema(NameIdentifier ident, SchemaChange... changes) throws NoSuchSchemaException { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("jdbc-catalog does not support alter the schema"); } /** @@ -132,7 +214,8 @@ public JdbcSchema alterSchema(NameIdentifier ident, SchemaChange... changes) */ @Override public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException { - throw new UnsupportedOperationException(); + jdbcDatabaseOperations.delete(ident.name(), cascade); + return true; } /** diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalogPropertiesMetadata.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalogPropertiesMetadata.java index 36a40690f58..7e835cf06a2 100644 --- a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalogPropertiesMetadata.java +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalogPropertiesMetadata.java @@ -4,15 +4,50 @@ */ package com.datastrato.gravitino.catalog.jdbc; +import static com.datastrato.gravitino.catalog.PropertyEntry.integerPropertyEntry; +import static com.datastrato.gravitino.catalog.PropertyEntry.stringReservedPropertyEntry; + import com.datastrato.gravitino.catalog.BaseCatalogPropertiesMetadata; import com.datastrato.gravitino.catalog.PropertyEntry; -import java.util.Collections; +import 
com.datastrato.gravitino.catalog.jdbc.config.JdbcConfig; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; +import java.util.List; import java.util.Map; public class JdbcCatalogPropertiesMetadata extends BaseCatalogPropertiesMetadata { + private static final Map> PROPERTIES_METADATA; + + static { + List> propertyEntries = + ImmutableList.of( + stringReservedPropertyEntry( + JdbcConfig.JDBC_URL.getKey(), JdbcConfig.JDBC_URL.getDoc(), true), + stringReservedPropertyEntry( + JdbcConfig.USERNAME.getKey(), JdbcConfig.USERNAME.getDoc(), true), + stringReservedPropertyEntry( + JdbcConfig.PASSWORD.getKey(), JdbcConfig.PASSWORD.getDoc(), true), + integerPropertyEntry( + JdbcConfig.POOL_MIN_SIZE.getKey(), + JdbcConfig.POOL_MIN_SIZE.getDoc(), + false, + false, + JdbcConfig.POOL_MIN_SIZE.getDefaultValue(), + true, + true), + integerPropertyEntry( + JdbcConfig.POOL_MAX_SIZE.getKey(), + JdbcConfig.POOL_MAX_SIZE.getDoc(), + false, + false, + JdbcConfig.POOL_MAX_SIZE.getDefaultValue(), + true, + true)); + PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, PropertyEntry::getName); + } @Override protected Map> specificPropertyEntries() { - return Collections.emptyMap(); + return PROPERTIES_METADATA; } } diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/config/JdbcConfig.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/config/JdbcConfig.java new file mode 100644 index 00000000000..409ea66bfde --- /dev/null +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/config/JdbcConfig.java @@ -0,0 +1,75 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. 
+ */ + +package com.datastrato.gravitino.catalog.jdbc.config; + +import com.datastrato.gravitino.Config; +import com.datastrato.gravitino.config.ConfigBuilder; +import com.datastrato.gravitino.config.ConfigEntry; +import java.util.Map; + +public class JdbcConfig extends Config { + + public static final ConfigEntry JDBC_URL = + new ConfigBuilder("jdbc-url") + .doc("The url of the Jdbc connection") + .version("0.3.0") + .stringConf() + .createWithDefault(null); + + public static final ConfigEntry USERNAME = + new ConfigBuilder("jdbc-user") + .doc("The username of the Jdbc connection") + .version("0.3.0") + .stringConf() + .createWithDefault(null); + + public static final ConfigEntry PASSWORD = + new ConfigBuilder("jdbc-password") + .doc("The password of the Jdbc connection") + .version("0.3.0") + .stringConf() + .createWithDefault(null); + + public static final ConfigEntry POOL_MIN_SIZE = + new ConfigBuilder("jdbc.pool.min-size") + .doc("The minimum number of connections in the pool") + .version("0.3.0") + .intConf() + .createWithDefault(2); + + public static final ConfigEntry POOL_MAX_SIZE = + new ConfigBuilder("jdbc.pool.max-size") + .doc("The maximum number of connections in the pool") + .version("0.3.0") + .intConf() + .createWithDefault(10); + + public String getJdbcUrl() { + return get(JDBC_URL); + } + + public String getUsername() { + return get(USERNAME); + } + + public String getPassword() { + return get(PASSWORD); + } + + public int getPoolMinSize() { + return get(POOL_MIN_SIZE); + } + + public int getPoolMaxSize() { + return get(POOL_MAX_SIZE); + } + + public JdbcConfig(Map properties) { + super(false); + loadFromMap(properties, k -> true); + assert null != getJdbcUrl(); + } +} diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/converter/JdbcExceptionConverter.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/converter/JdbcExceptionConverter.java new file mode 100644 index 
00000000000..d89f1b23bbd --- /dev/null +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/converter/JdbcExceptionConverter.java @@ -0,0 +1,25 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.jdbc.converter; + +import com.datastrato.gravitino.exceptions.GravitinoRuntimeException; +import java.sql.SQLException; + +/** Interface for converter JDBC exceptions to Gravitino exceptions. */ +public abstract class JdbcExceptionConverter { + + /** + * Convert JDBC exception to GravitinoException. + * + * @param sqlException The sql exception to map + * @return A best attempt at a corresponding connector exception or generic with the SQLException + * as the cause + */ + public GravitinoRuntimeException toGravitinoException(final SQLException sqlException) { + // TODO we need to transform specific SQL exceptions into our own exceptions, such as + // SchemaAlreadyExistsException + return new GravitinoRuntimeException(sqlException.getMessage(), sqlException); + } +} diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/converter/JdbcTypeConverter.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/converter/JdbcTypeConverter.java new file mode 100644 index 00000000000..2b378cfc9ea --- /dev/null +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/converter/JdbcTypeConverter.java @@ -0,0 +1,26 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. 
+ */ +package com.datastrato.gravitino.catalog.jdbc.converter; + +import io.substrait.type.Type; + +public abstract class JdbcTypeConverter { + + /** + * Convert from JDBC type to Gravitino type + * + * @param type The JDBC type, given as a string. + * @return The corresponding Gravitino type. + */ + abstract Type toGravitinoType(String type); + + /** + * Convert from Gravitino type to JDBC type + * + * @param type The Gravitino type to convert. + * @return The corresponding JDBC type, as a string. + */ + abstract String fromGravitinoType(Type type); +} diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/DatabaseOperation.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/DatabaseOperation.java new file mode 100644 index 00000000000..61683c37c6c --- /dev/null +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/DatabaseOperation.java @@ -0,0 +1,49 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.catalog.jdbc.operation; + +import com.datastrato.gravitino.catalog.jdbc.JdbcSchema; +import com.datastrato.gravitino.catalog.jdbc.converter.JdbcExceptionConverter; +import com.datastrato.gravitino.exceptions.NoSuchSchemaException; +import java.util.List; +import java.util.Map; +import javax.sql.DataSource; + +public interface DatabaseOperation { + + /** + * Initializes the database operations. + * + * @param dataSource The data source to use for the operations. + * @param exceptionMapper The exception mapper to use for the operations. + * @throws RuntimeException + */ + void initialize(final DataSource dataSource, final JdbcExceptionConverter exceptionMapper) + throws RuntimeException; + + /** + * Creates a database with the given name and comment. + * + * @param databaseName The name of the database to create. + * @param comment The comment of the database to create. 
+ */ + void create(String databaseName, String comment, Map properties); + + /** + * @param databaseName The name of the database to drop. + * @param cascade If set to true, drops all the tables in the database as well. + */ + void delete(String databaseName, boolean cascade); + + /** @return The list of database names. */ + List list(); + + /** + * @param databaseName The name of the database to load. + * @return An information object of the JDBC database. + */ + JdbcSchema load(String databaseName) throws NoSuchSchemaException; +} diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcDatabaseOperations.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcDatabaseOperations.java new file mode 100644 index 00000000000..e15084ee307 --- /dev/null +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcDatabaseOperations.java @@ -0,0 +1,69 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.catalog.jdbc.operation; + +import com.datastrato.gravitino.catalog.jdbc.converter.JdbcExceptionConverter; +import com.datastrato.gravitino.catalog.jdbc.utils.JdbcConnectorUtils; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Map; +import javax.sql.DataSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Operations for managing databases in a JDBC data store. 
*/ +public abstract class JdbcDatabaseOperations implements DatabaseOperation { + + public static final Logger LOG = LoggerFactory.getLogger(JdbcDatabaseOperations.class); + + protected DataSource dataSource; + protected JdbcExceptionConverter exceptionMapper; + + @Override + public void initialize(final DataSource dataSource, final JdbcExceptionConverter exceptionMapper) + throws RuntimeException { + this.dataSource = dataSource; + this.exceptionMapper = exceptionMapper; + } + + @Override + public void create(String databaseName, String comment, Map properties) { + LOG.info("Beginning to create database {}", databaseName); + try (final Connection connection = this.dataSource.getConnection()) { + JdbcConnectorUtils.executeUpdate( + connection, generateCreateDatabaseSql(databaseName, comment, properties)); + LOG.info("Finished creating database {}", databaseName); + } catch (final SQLException se) { + throw this.exceptionMapper.toGravitinoException(se); + } + } + + @Override + public void delete(String databaseName, boolean cascade) { + LOG.info("Beginning to drop database {}", databaseName); + try (final Connection connection = this.dataSource.getConnection()) { + JdbcConnectorUtils.executeUpdate(connection, generateDropDatabaseSql(databaseName, cascade)); + LOG.info("Finished dropping database {}", databaseName); + } catch (final SQLException se) { + throw this.exceptionMapper.toGravitinoException(se); + } + } + + /** + * @param databaseName The name of the database to create. + * @param comment The comment of the database to create. + * @return the SQL statement to create a database with the given name and comment. + */ + abstract String generateCreateDatabaseSql( + String databaseName, String comment, Map properties); + + /** + * @param databaseName The name of the database. + * @param cascade cascade If set to true, drops all the tables in the schema as well. + * @return the SQL statement to drop a database with the given name. 
+ */ + abstract String generateDropDatabaseSql(String databaseName, boolean cascade); +} diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/utils/DataSourceUtils.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/utils/DataSourceUtils.java new file mode 100644 index 00000000000..e5744db2383 --- /dev/null +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/utils/DataSourceUtils.java @@ -0,0 +1,74 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.jdbc.utils; + +import com.datastrato.gravitino.catalog.jdbc.config.JdbcConfig; +import com.datastrato.gravitino.exceptions.GravitinoRuntimeException; +import java.sql.SQLException; +import java.util.Map; +import java.util.Properties; +import javax.sql.DataSource; +import org.apache.commons.dbcp2.BasicDataSource; +import org.apache.commons.dbcp2.BasicDataSourceFactory; + +/** + * Utility class for creating a {@link DataSource} from a {@link JdbcConfig}. It is mainly + * responsible for creating connection pool management of data sources and configuring some + * connection pools. The apache-dbcp2 connection pool is used here. + */ +public class DataSourceUtils { + + /** SQL statements for database connection pool testing. 
*/ + private static final String POOL_TEST_QUERY = "SELECT 1"; + + public static DataSource createDataSource(Map properties) { + return createDataSource(new JdbcConfig(properties)); + } + + public static DataSource createDataSource(JdbcConfig jdbcConfig) + throws GravitinoRuntimeException { + try { + return createDBCPDataSource(jdbcConfig); + } catch (Exception exception) { + throw new GravitinoRuntimeException("Error creating datasource", exception); + } + } + + private static DataSource createDBCPDataSource(JdbcConfig jdbcConfig) throws Exception { + BasicDataSource basicDataSource = + BasicDataSourceFactory.createDataSource(getProperties(jdbcConfig)); + basicDataSource.setUrl(jdbcConfig.getJdbcUrl()); + if (null != jdbcConfig.getUsername()) { + basicDataSource.setUsername(jdbcConfig.getUsername()); + } + if (null != jdbcConfig.getPassword()) { + basicDataSource.setPassword(jdbcConfig.getPassword()); + } + basicDataSource.setMaxTotal(jdbcConfig.getPoolMaxSize()); + basicDataSource.setMinIdle(jdbcConfig.getPoolMinSize()); + // Set each time a connection is taken out from the connection pool, a test statement will be + // executed to confirm whether the connection is valid. 
+ basicDataSource.setTestOnBorrow(true); + basicDataSource.setValidationQuery(POOL_TEST_QUERY); + return basicDataSource; + } + + private static Properties getProperties(JdbcConfig jdbcConfig) { + Properties properties = new Properties(); + properties.putAll(jdbcConfig.getAllConfig()); + return properties; + } + + public static void closeDataSource(DataSource dataSource) { + if (null != dataSource) { + try { + assert dataSource instanceof BasicDataSource; + ((BasicDataSource) dataSource).close(); + } catch (SQLException ignore) { + // no op + } + } + } +} diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/utils/JdbcConnectorUtils.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/utils/JdbcConnectorUtils.java new file mode 100644 index 00000000000..5ece0e3bf01 --- /dev/null +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/utils/JdbcConnectorUtils.java @@ -0,0 +1,29 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.jdbc.utils; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; + +public final class JdbcConnectorUtils { + + private JdbcConnectorUtils() {} + + /** + * Execute a SQL update statement against the given datasource. 
+ * + * @param connection The connection to attempt to execute an update against + * @param sql The sql to execute + * @return The number of rows updated or exception + * @throws SQLException on error during execution of the update to the underlying SQL data store + */ + public static int executeUpdate(final Connection connection, final String sql) + throws SQLException { + try (final Statement statement = connection.createStatement()) { + return statement.executeUpdate(sql); + } + } +} diff --git a/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/config/TestJdbcConfig.java b/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/config/TestJdbcConfig.java new file mode 100644 index 00000000000..2b3987f39be --- /dev/null +++ b/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/config/TestJdbcConfig.java @@ -0,0 +1,22 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. 
+ */ +package com.datastrato.gravitino.catalog.jdbc.config; + +import com.google.common.collect.Maps; +import java.util.HashMap; +import java.util.NoSuchElementException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestJdbcConfig { + + @Test + public void testCreateDataSourceConfig() { + HashMap properties = Maps.newHashMap(); + Assertions.assertThrows(NoSuchElementException.class, () -> new JdbcConfig(properties)); + properties.put(JdbcConfig.JDBC_URL.getKey(), "jdbc:sqlite::memory:"); + Assertions.assertDoesNotThrow(() -> new JdbcConfig(properties)); + } +} diff --git a/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/converter/SqliteExceptionConverter.java b/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/converter/SqliteExceptionConverter.java new file mode 100644 index 00000000000..76a92420da6 --- /dev/null +++ b/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/converter/SqliteExceptionConverter.java @@ -0,0 +1,23 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. 
+ */ +package com.datastrato.gravitino.catalog.jdbc.converter; + +import com.datastrato.gravitino.exceptions.GravitinoRuntimeException; +import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException; +import java.sql.SQLException; + +public class SqliteExceptionConverter extends JdbcExceptionConverter { + + public static final int SCHEMA_ALREADY_EXISTS_CODE = 1; + + @Override + public GravitinoRuntimeException toGravitinoException(SQLException sqlException) { + if (sqlException.getErrorCode() == SCHEMA_ALREADY_EXISTS_CODE) { + return new SchemaAlreadyExistsException(sqlException.getMessage(), sqlException); + } else { + return super.toGravitinoException(sqlException); + } + } +} diff --git a/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/SqliteDatabaseOperations.java b/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/SqliteDatabaseOperations.java new file mode 100644 index 00000000000..c389c132ee0 --- /dev/null +++ b/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/SqliteDatabaseOperations.java @@ -0,0 +1,91 @@ +/* + * Copyright 2023 Datastrato. + * This software is licensed under the Apache License version 2. 
+ */
+package com.datastrato.gravitino.catalog.jdbc.operation;
+
+import static com.datastrato.gravitino.catalog.jdbc.converter.SqliteExceptionConverter.SCHEMA_ALREADY_EXISTS_CODE;
+
+import com.datastrato.gravitino.catalog.jdbc.JdbcSchema;
+import com.datastrato.gravitino.catalog.jdbc.utils.JdbcConnectorUtils;
+import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
+
+public class SqliteDatabaseOperations extends JdbcDatabaseOperations {
+  private String dbPath;
+
+  public SqliteDatabaseOperations(String baseFileDir) {
+    this.dbPath = Preconditions.checkNotNull(baseFileDir);
+  }
+
+  @Override
+  public void create(String databaseName, String comment, Map<String, String> properties) {
+    try {
+      if (exist(databaseName)) {
+        throw new SQLException(
+            String.format("Database %s already exists", databaseName),
+            "CREATE DATABASE" + databaseName,
+            SCHEMA_ALREADY_EXISTS_CODE);
+      }
+      try (Connection connection =
+          DriverManager.getConnection("jdbc:sqlite:/" + dbPath + "/" + databaseName)) {
+        JdbcConnectorUtils.executeUpdate(
+            connection, "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)");
+      }
+    } catch (SQLException e) {
+      throw exceptionMapper.toGravitinoException(e);
+    }
+    Preconditions.checkArgument(exist(databaseName), "Database %s does not exist", databaseName);
+  }
+
+  public boolean exist(String databaseName) {
+    return new File(dbPath + "/" + databaseName).exists();
+  }
+
+  @Override
+  public List<String> list() {
+    File file = new File(dbPath);
+    Preconditions.checkArgument(file.exists(), "Database path %s does not exist", dbPath);
+    return Arrays.stream(Objects.requireNonNull(file.listFiles()))
+        .map(File::getName)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public JdbcSchema load(String databaseName) throws NoSuchSchemaException {
+    if (exist(databaseName)) {
+      return new JdbcSchema.Builder().withName(databaseName).build();
+    }
+    return null;
+  }
+
+  @Override
+  public void delete(String databaseName, boolean cascade) {
+    delete(databaseName);
+  }
+
+  public void delete(String databaseName) {
+    FileUtils.deleteQuietly(new File(dbPath + "/" + databaseName));
+  }
+
+  @Override
+  String generateCreateDatabaseSql(
+      String databaseName, String comment, Map<String, String> properties) {
+    return null;
+  }
+
+  @Override
+  String generateDropDatabaseSql(String databaseName, boolean cascade) {
+    return null;
+  }
+}
diff --git a/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/TestJdbcDatabaseOperations.java b/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/TestJdbcDatabaseOperations.java
new file mode 100644
index 00000000000..d2778b31d46
--- /dev/null
+++ b/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/TestJdbcDatabaseOperations.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2023 Datastrato.
+ * This software is licensed under the Apache License version 2.
+ */
+package com.datastrato.gravitino.catalog.jdbc.operation;
+
+import com.datastrato.gravitino.catalog.jdbc.config.JdbcConfig;
+import com.datastrato.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
+import com.datastrato.gravitino.catalog.jdbc.converter.SqliteExceptionConverter;
+import com.datastrato.gravitino.catalog.jdbc.utils.DataSourceUtils;
+import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException;
+import com.google.common.collect.Maps;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.List;
+import javax.sql.DataSource;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class TestJdbcDatabaseOperations {
+
+  private static DataSource DATA_SOURCE;
+  private static JdbcExceptionConverter EXCEPTION_MAPPER;
+
+  private static SqliteDatabaseOperations JDBC_DATABASE_OPERATIONS;
+
+  private static File BASE_FILE_DIR;
+
+  private static String FILE_PATH;
+  private static String JDBC_URL;
+
+  @BeforeAll
+  public static void startup() throws IOException {
+    BASE_FILE_DIR = Files.createTempDirectory("gravitino-jdbc").toFile();
+    FILE_PATH = BASE_FILE_DIR.getPath() + "/test";
+    JDBC_URL = "jdbc:sqlite:" + FILE_PATH;
+    FileUtils.createParentDirectories(new File(FILE_PATH));
+    createDataSource();
+    createExceptionMapper();
+    createJdbcDatabaseOperations();
+  }
+
+  @AfterAll
+  public static void stop() {
+    FileUtils.deleteQuietly(BASE_FILE_DIR);
+  }
+
+  private static void createExceptionMapper() {
+    EXCEPTION_MAPPER = new SqliteExceptionConverter();
+  }
+
+  private static void createDataSource() {
+    HashMap<String, String> properties = Maps.newHashMap();
+    properties.put(JdbcConfig.JDBC_URL.getKey(), JDBC_URL);
+    properties.put(JdbcConfig.USERNAME.getKey(), "test");
+    properties.put(JdbcConfig.PASSWORD.getKey(), "test");
+    DATA_SOURCE = DataSourceUtils.createDataSource(properties);
+  }
+
+  private static void createJdbcDatabaseOperations() {
+    JDBC_DATABASE_OPERATIONS = new SqliteDatabaseOperations(BASE_FILE_DIR.getPath());
+    JDBC_DATABASE_OPERATIONS.initialize(DATA_SOURCE, EXCEPTION_MAPPER);
+  }
+
+  @Test
+  public void testOperationDatabase() {
+    String database1 = "test";
+    String database2 = "test2";
+    // creat database
+    Assertions.assertDoesNotThrow(() -> JDBC_DATABASE_OPERATIONS.create(database1, null, null));
+    SchemaAlreadyExistsException illegalArgumentException =
+        Assertions.assertThrows(
+            SchemaAlreadyExistsException.class,
+            () -> JDBC_DATABASE_OPERATIONS.create(database1, null, null));
+    Assertions.assertTrue(
+        StringUtils.contains(illegalArgumentException.getMessage(), "already exists"));
+    Assertions.assertDoesNotThrow(() -> JDBC_DATABASE_OPERATIONS.create(database2, null, null));
+
+    // list database
+    List<String> listDatabases = JDBC_DATABASE_OPERATIONS.list();
+    Assertions.assertEquals(2, listDatabases.size());
+    Assertions.assertTrue(listDatabases.contains(database1));
+    Assertions.assertTrue(listDatabases.contains(database2));
+
+    // drop database
+    JDBC_DATABASE_OPERATIONS.delete(database1);
+    List<String> databases = JDBC_DATABASE_OPERATIONS.list();
+    Assertions.assertFalse(databases.contains(database1));
+    Assertions.assertNotNull(JDBC_DATABASE_OPERATIONS.load(database2));
+    JDBC_DATABASE_OPERATIONS.delete(database2);
+    Assertions.assertNull(JDBC_DATABASE_OPERATIONS.load(database2));
+  }
+}
diff --git a/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/utils/TestDataSourceUtils.java b/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/utils/TestDataSourceUtils.java
new file mode 100644
index 00000000000..4439033a722
--- /dev/null
+++ b/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/utils/TestDataSourceUtils.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2023 Datastrato.
+ * This software is licensed under the Apache License version 2.
+ */
+package com.datastrato.gravitino.catalog.jdbc.utils;
+
+import com.datastrato.gravitino.catalog.jdbc.config.JdbcConfig;
+import com.google.common.collect.Maps;
+import java.sql.SQLException;
+import java.util.HashMap;
+import javax.sql.DataSource;
+import org.apache.commons.dbcp2.BasicDataSource;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestDataSourceUtils {
+
+  @Test
+  public void testCreateDataSource() throws SQLException {
+    HashMap<String, String> properties = Maps.newHashMap();
+    properties.put(JdbcConfig.JDBC_URL.getKey(), "jdbc:sqlite::memory:");
+    properties.put(JdbcConfig.USERNAME.getKey(), "test");
+    properties.put(JdbcConfig.PASSWORD.getKey(), "test");
+
+    DataSource dataSource =
+        Assertions.assertDoesNotThrow(() -> DataSourceUtils.createDataSource(properties));
+    Assertions.assertTrue(dataSource instanceof org.apache.commons.dbcp2.BasicDataSource);
+    ((BasicDataSource) dataSource).close();
+  }
+}
diff --git a/common/src/main/java/com/datastrato/gravitino/Config.java b/common/src/main/java/com/datastrato/gravitino/Config.java
index 4ab1bcf5f8a..7c681f08f3f 100644
--- a/common/src/main/java/com/datastrato/gravitino/Config.java
+++ b/common/src/main/java/com/datastrato/gravitino/Config.java
@@ -132,6 +132,10 @@ public Map<String, String> getConfigsWithPrefix(String prefix) {
     return MapUtils.getPrefixMap(configMap, prefix);
   }
 
+  public Map<String, String> getAllConfig() {
+    return MapUtils.unmodifiableMap(configMap);
+  }
+
   /**
    * Sets the value of a configuration entry.
    *
diff --git a/common/src/main/java/com/datastrato/gravitino/utils/MapUtils.java b/common/src/main/java/com/datastrato/gravitino/utils/MapUtils.java
index 5cafb0ce28a..053118e98ce 100644
--- a/common/src/main/java/com/datastrato/gravitino/utils/MapUtils.java
+++ b/common/src/main/java/com/datastrato/gravitino/utils/MapUtils.java
@@ -22,4 +22,8 @@ public static Map<String, String> getPrefixMap(Map<String, String> m, String pre
 
     return Collections.unmodifiableMap(configs);
   }
+
+  public static Map<String, String> unmodifiableMap(Map<String, String> m) {
+    return Collections.unmodifiableMap(m);
+  }
 }
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 3e211c2a0ee..4fd4409670a 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -23,6 +23,7 @@ mockserver = "5.13.2"
 commons-lang3 = "3.12.0"
 commons-io = "2.11.0"
 commons-collections4 = "4.4"
+commons-dbcp2 = "2.9.0"
 caffeine = "2.9.3"
 rocksdbjni = "7.7.3"
 iceberg = '1.3.1'
@@ -100,6 +101,7 @@ scala-collection-compat = { group = "org.scala-lang.modules", name = "scala-col
 sqlite-jdbc = { group = "org.xerial", name = "sqlite-jdbc", version.ref = "sqlite-jdbc" }
 testng = { group = "org.testng", name = "testng", version.ref = "testng" }
 spark-hive = { group = "org.apache.spark", name = "spark-hive_2.13", version.ref = "spark" }
+commons-dbcp2 = { group = "org.apache.commons", name = "commons-dbcp2", version.ref = "commons-dbcp2" }
 testcontainers = { group = "org.testcontainers", name = "testcontainers", version.ref = "testcontainers" }
 testcontainers-junit-jupiter = { group = "org.testcontainers", name = "junit-jupiter", version.ref = "testcontainers" }
 trino-jdbc = { group = "io.trino", name = "trino-jdbc", version.ref = "trino-jdbc" }