[#575] feat(jdbc): Support for DataSource and schema operations in JDBC catalog. #703

Merged: 4 commits, Nov 20, 2023
8 changes: 7 additions & 1 deletion catalogs/catalog-jdbc-common/build.gradle.kts
@@ -23,6 +23,7 @@ dependencies {
implementation(libs.bundles.log4j)
implementation(libs.commons.lang3)
implementation(libs.commons.collections4)
implementation(libs.commons.dbcp2)
implementation(libs.substrait.java.core) {
exclude("com.fasterxml.jackson.core")
exclude("com.fasterxml.jackson.datatype")
@@ -32,6 +33,11 @@ dependencies {
exclude("org.slf4j")
}

testImplementation(libs.commons.io)
testImplementation(libs.sqlite.jdbc)
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.junit.jupiter.params)
testRuntimeOnly(libs.junit.jupiter.engine)
compileOnly(libs.lombok)
annotationProcessor(libs.lombok)
}
}
JdbcCatalog.java
@@ -6,6 +6,9 @@

import com.datastrato.gravitino.catalog.BaseCatalog;
import com.datastrato.gravitino.catalog.CatalogOperations;
import com.datastrato.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
import com.datastrato.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
import com.datastrato.gravitino.catalog.jdbc.operation.JdbcDatabaseOperations;
import com.datastrato.gravitino.rel.SupportsSchemas;
import com.datastrato.gravitino.rel.TableCatalog;
import java.util.Map;
@@ -21,7 +24,12 @@ public abstract class JdbcCatalog extends BaseCatalog<JdbcCatalog> {
*/
@Override
protected CatalogOperations newOps(Map<String, String> config) {
JdbcCatalogOperations ops = new JdbcCatalogOperations(entity());
JdbcCatalogOperations ops =
new JdbcCatalogOperations(
entity(),
createExceptionConverter(),
createJdbcTypeConverter(),
createJdbcDatabaseOperations());
ops.initialize(config);
return ops;
}
@@ -37,4 +45,17 @@ public SupportsSchemas asSchemas() {
public TableCatalog asTableCatalog() {
return (JdbcCatalogOperations) ops();
}

/** @return The {@link JdbcExceptionConverter} to be used by the catalog. */
protected JdbcExceptionConverter createExceptionConverter() {
return new JdbcExceptionConverter() {};
}

/** @return The {@link JdbcTypeConverter} to be used by the catalog. */
protected abstract JdbcTypeConverter createJdbcTypeConverter();

/**
* @return The {@link JdbcDatabaseOperations} to be used by the catalog to manage databases.
*/
protected abstract JdbcDatabaseOperations createJdbcDatabaseOperations();
}
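For orientation, here is a minimal sketch of how a concrete catalog could plug into the factory methods above. MyTypeConverter and MyDatabaseOperations are hypothetical placeholders, not classes from this PR, and a real subclass may need to implement further BaseCatalog methods that are not shown in this diff.

// Hypothetical subclass sketch: a dialect-specific catalog supplies its own
// type converter and database operations; overriding the exception converter
// is optional because JdbcCatalog provides a default.
public class MyJdbcCatalog extends JdbcCatalog {

  @Override
  protected JdbcTypeConverter createJdbcTypeConverter() {
    return new MyTypeConverter(); // hypothetical dialect-specific type converter
  }

  @Override
  protected JdbcDatabaseOperations createJdbcDatabaseOperations() {
    return new MyDatabaseOperations(); // hypothetical database operations
  }

  @Override
  protected JdbcExceptionConverter createExceptionConverter() {
    // Map dialect-specific SQL error codes to Gravitino exceptions here.
    return new JdbcExceptionConverter() {};
  }
}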
JdbcCatalogOperations.java
@@ -4,16 +4,25 @@
*/
package com.datastrato.gravitino.catalog.jdbc;

import static com.datastrato.gravitino.catalog.BaseCatalog.CATALOG_BYPASS_PREFIX;

import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.StringIdentifier;
import com.datastrato.gravitino.catalog.CatalogOperations;
import com.datastrato.gravitino.catalog.PropertiesMetadata;
import com.datastrato.gravitino.catalog.jdbc.config.JdbcConfig;
import com.datastrato.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
import com.datastrato.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
import com.datastrato.gravitino.catalog.jdbc.operation.JdbcDatabaseOperations;
import com.datastrato.gravitino.catalog.jdbc.utils.DataSourceUtils;
import com.datastrato.gravitino.exceptions.NoSuchCatalogException;
import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
import com.datastrato.gravitino.exceptions.NoSuchTableException;
import com.datastrato.gravitino.exceptions.NonEmptySchemaException;
import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException;
import com.datastrato.gravitino.exceptions.TableAlreadyExistsException;
import com.datastrato.gravitino.meta.AuditInfo;
import com.datastrato.gravitino.meta.CatalogEntity;
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.SchemaChange;
@@ -24,7 +33,15 @@
import com.datastrato.gravitino.rel.expressions.distributions.Distribution;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.utils.MapUtils;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -41,13 +58,31 @@ public class JdbcCatalogOperations implements CatalogOperations, SupportsSchemas

private final CatalogEntity entity;

private final JdbcExceptionConverter exceptionConverter;

private final JdbcTypeConverter jdbcTypeConverter;

private final JdbcDatabaseOperations jdbcDatabaseOperations;

private DataSource dataSource;

/**
* Constructs a new instance of JdbcCatalogOperations.
*
* @param entity The catalog entity associated with this operations instance.
* @param exceptionConverter The exception converter to be used by the operations.
* @param jdbcTypeConverter The type converter to be used by the operations.
* @param jdbcDatabaseOperations The database operations to be used by the operations.
*/
public JdbcCatalogOperations(CatalogEntity entity) {
public JdbcCatalogOperations(
CatalogEntity entity,
JdbcExceptionConverter exceptionConverter,
JdbcTypeConverter jdbcTypeConverter,
JdbcDatabaseOperations jdbcDatabaseOperations) {
this.entity = entity;
this.exceptionConverter = exceptionConverter;
this.jdbcTypeConverter = jdbcTypeConverter;
this.jdbcDatabaseOperations = jdbcDatabaseOperations;
}

/**
@@ -58,14 +93,22 @@ public JdbcCatalogOperations(CatalogEntity entity) {
*/
@Override
public void initialize(Map<String, String> conf) throws RuntimeException {
// Key format like gravitino.bypass.a.b
Map<String, String> resultConf =
Maps.newHashMap(MapUtils.getPrefixMap(conf, CATALOG_BYPASS_PREFIX));
JdbcConfig jdbcConfig = new JdbcConfig(resultConf);
this.dataSource = DataSourceUtils.createDataSource(jdbcConfig);
this.jdbcDatabaseOperations.initialize(dataSource, exceptionConverter);
this.jdbcCatalogPropertiesMetadata = new JdbcCatalogPropertiesMetadata();
this.jdbcTablePropertiesMetadata = new JdbcTablePropertiesMetadata();
this.jdbcSchemaPropertiesMetadata = new JdbcSchemaPropertiesMetadata();
}
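To make the key mapping in initialize() concrete, a small illustrative sketch follows. The literal prefix "gravitino.bypass." is assumed from the comment above, and the resulting keys match the entries declared in JdbcConfig further down.

// Illustrative only: catalog properties intended for the JDBC layer are written
// with the bypass prefix (assumed to be "gravitino.bypass."); getPrefixMap
// selects them and, judging by the keys JdbcConfig expects, strips the prefix.
Map<String, String> catalogConf = new HashMap<>();
catalogConf.put("gravitino.bypass.jdbc-url", "jdbc:sqlite:/tmp/example.db");
catalogConf.put("gravitino.bypass.jdbc-user", "admin");
catalogConf.put("gravitino.bypass.jdbc-password", "secret");
// After prefix handling the map becomes {"jdbc-url": ..., "jdbc-user": ..., "jdbc-password": ...},
// which is exactly what new JdbcConfig(resultConf) consumes above.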

/** Closes the Jdbc catalog and releases the associated client pool. */
@Override
public void close() {}
public void close() {
DataSourceUtils.closeDataSource(dataSource);
}

/**
* Lists the schemas under the given namespace.
@@ -76,7 +119,10 @@
*/
@Override
public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException {
throw new UnsupportedOperationException();
List<String> schemaNames = jdbcDatabaseOperations.list();
return schemaNames.stream()
.map(db -> NameIdentifier.of(namespace, db))
.toArray(NameIdentifier[]::new);
}

/**
@@ -93,7 +139,27 @@ public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogExc
public JdbcSchema createSchema(
NameIdentifier ident, String comment, Map<String, String> properties)
Contributor: I think we should add a warning log to clarify that we don't support setting properties for a JDBC schema, and that the properties are ignored.

Contributor (author): ok

Contributor: I suggest throwing an exception instead; the user may miss warning logs. (A sketch of that variant follows the createSchema method below.)

throws NoSuchCatalogException, SchemaAlreadyExistsException {
throw new UnsupportedOperationException();
StringIdentifier identifier =
Preconditions.checkNotNull(
StringIdentifier.fromProperties(properties),
"The gravitino id attribute does not exist in properties");
String notAllowedKey =
properties.keySet().stream()
.filter(s -> !StringUtils.equals(s, StringIdentifier.ID_KEY))
.collect(Collectors.joining(","));
if (StringUtils.isNotEmpty(notAllowedKey)) {
LOG.warn("The properties [{}] are not allowed to be set in the jdbc schema", notAllowedKey);
}
HashMap<String, String> resultProperties = Maps.newHashMap(properties);
resultProperties.remove(StringIdentifier.ID_KEY);
jdbcDatabaseOperations.create(
ident.name(), StringIdentifier.addToComment(identifier, comment), resultProperties);
return new JdbcSchema.Builder()
.withName(ident.name())
.withProperties(resultProperties)
.withComment(comment)
.withAuditInfo(AuditInfo.EMPTY)
.build();
}
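Following up on the review thread above, here is a minimal sketch of the stricter variant that rejects unsupported schema properties instead of logging and ignoring them; it is not what this PR implements, and the exception type is an illustrative choice.

// Sketch only: replacement for the warning block inside createSchema that
// fails fast on unsupported properties instead of ignoring them.
String notAllowedKey =
    properties.keySet().stream()
        .filter(s -> !StringUtils.equals(s, StringIdentifier.ID_KEY))
        .collect(Collectors.joining(","));
if (StringUtils.isNotEmpty(notAllowedKey)) {
  throw new IllegalArgumentException(
      "Properties are not supported for JDBC schemas: " + notAllowedKey);
}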

/**
@@ -105,7 +171,23 @@ public JdbcSchema createSchema(
*/
@Override
public JdbcSchema loadSchema(NameIdentifier ident) throws NoSuchSchemaException {
throw new UnsupportedOperationException();
JdbcSchema load = jdbcDatabaseOperations.load(ident.name());
String comment = load.comment();
StringIdentifier id = StringIdentifier.fromComment(comment);
if (id == null) {
LOG.warn("The comment {} does not contain gravitino id attribute", comment);
return load;
} else {
Map<String, String> properties =
load.properties() == null ? Maps.newHashMap() : Maps.newHashMap(load.properties());
StringIdentifier.addToProperties(id, properties);
return new JdbcSchema.Builder()
.withAuditInfo(load.auditInfo())
.withName(load.name())
.withComment(load.comment())
.withProperties(properties)
.build();
}
}

/**
@@ -119,7 +201,7 @@ public JdbcSchema loadSchema(NameIdentifier ident) throws NoSuchSchemaException
@Override
public JdbcSchema alterSchema(NameIdentifier ident, SchemaChange... changes)
throws NoSuchSchemaException {
throw new UnsupportedOperationException();
throw new UnsupportedOperationException("jdbc-catalog does not support alter the schema");
}

/**
@@ -132,7 +214,8 @@ public JdbcSchema alterSchema(NameIdentifier ident, SchemaChange... changes)
*/
@Override
public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException {
Contributor: The processing logic for cascade is missing.

Contributor (author): ok

Contributor: Not resolved.

Contributor: Could generateDropDatabaseSql delete the tables under the database?

Contributor (author): The cascade parameter is passed down to the underlying data source, such as MySQL or PostgreSQL, which decides whether the drop is allowed and performs its own checks.

Contributor (@FANNG1, Nov 17, 2023): We should keep the behavior consistent: if the underlying catalog or database doesn't support dropping a database with cascade, should Gravitino delete the corresponding tables explicitly or throw an exception? @jerryshao (One possible handling is sketched after this method.)

throw new UnsupportedOperationException();
jdbcDatabaseOperations.delete(ident.name(), cascade);
return true;
}
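One possible way to make the cascade semantics explicit, as raised in the review thread above; this is only a sketch, and both the listTables helper and the NonEmptySchemaException(String) constructor used here are assumptions that do not appear in this diff.

// Sketch only: make the non-cascade case fail inside Gravitino instead of
// relying on the underlying database to reject the drop.
// listTables(...) and the exception constructor are assumed for illustration.
@Override
public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException {
  if (!cascade && !listTables(ident.name()).isEmpty()) {
    throw new NonEmptySchemaException(
        "Schema " + ident.name() + " is not empty; use cascade to drop it");
  }
  jdbcDatabaseOperations.delete(ident.name(), cascade);
  return true;
}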

/**
JdbcCatalogPropertiesMetadata.java
@@ -4,15 +4,50 @@
*/
package com.datastrato.gravitino.catalog.jdbc;

import static com.datastrato.gravitino.catalog.PropertyEntry.integerPropertyEntry;
import static com.datastrato.gravitino.catalog.PropertyEntry.stringReservedPropertyEntry;

import com.datastrato.gravitino.catalog.BaseCatalogPropertiesMetadata;
import com.datastrato.gravitino.catalog.PropertyEntry;
import java.util.Collections;
import com.datastrato.gravitino.catalog.jdbc.config.JdbcConfig;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;

public class JdbcCatalogPropertiesMetadata extends BaseCatalogPropertiesMetadata {
private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;

static {
List<PropertyEntry<?>> propertyEntries =
ImmutableList.of(
stringReservedPropertyEntry(
JdbcConfig.JDBC_URL.getKey(), JdbcConfig.JDBC_URL.getDoc(), true),
stringReservedPropertyEntry(
JdbcConfig.USERNAME.getKey(), JdbcConfig.USERNAME.getDoc(), true),
stringReservedPropertyEntry(
JdbcConfig.PASSWORD.getKey(), JdbcConfig.PASSWORD.getDoc(), true),
integerPropertyEntry(
JdbcConfig.POOL_MIN_SIZE.getKey(),
JdbcConfig.POOL_MIN_SIZE.getDoc(),
false,
false,
JdbcConfig.POOL_MIN_SIZE.getDefaultValue(),
true,
true),
integerPropertyEntry(
JdbcConfig.POOL_MAX_SIZE.getKey(),
JdbcConfig.POOL_MAX_SIZE.getDoc(),
false,
false,
JdbcConfig.POOL_MAX_SIZE.getDefaultValue(),
true,
true));
PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, PropertyEntry::getName);
}

@Override
protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
return Collections.emptyMap();
return PROPERTIES_METADATA;
}
}
JdbcConfig.java
@@ -0,0 +1,75 @@
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.catalog.jdbc.config;

import com.datastrato.gravitino.Config;
import com.datastrato.gravitino.config.ConfigBuilder;
import com.datastrato.gravitino.config.ConfigEntry;
import java.util.Map;

public class JdbcConfig extends Config {

public static final ConfigEntry<String> JDBC_URL =
new ConfigBuilder("jdbc-url")
.doc("The url of the Jdbc connection")
.version("0.3.0")
.stringConf()
.createWithDefault(null);

public static final ConfigEntry<String> USERNAME =
new ConfigBuilder("jdbc-user")
.doc("The username of the Jdbc connection")
.version("0.3.0")
.stringConf()
.createWithDefault(null);

public static final ConfigEntry<String> PASSWORD =
new ConfigBuilder("jdbc-password")
.doc("The password of the Jdbc connection")
.version("0.3.0")
.stringConf()
.createWithDefault(null);

public static final ConfigEntry<Integer> POOL_MIN_SIZE =
new ConfigBuilder("jdbc.pool.min-size")
.doc("The minimum number of connections in the pool")
.version("0.3.0")
.intConf()
.createWithDefault(2);

public static final ConfigEntry<Integer> POOL_MAX_SIZE =
new ConfigBuilder("jdbc.pool.max-size")
.doc("The maximum number of connections in the pool")
.version("0.3.0")
.intConf()
.createWithDefault(10);

public String getJdbcUrl() {
return get(JDBC_URL);
}

public String getUsername() {
return get(USERNAME);
}

public String getPassword() {
return get(PASSWORD);
}

public int getPoolMinSize() {
return get(POOL_MIN_SIZE);
}

public int getPoolMaxSize() {
return get(POOL_MAX_SIZE);
}

public JdbcConfig(Map<String, String> properties) {
super(false);
loadFromMap(properties, k -> true);
assert null != getJdbcUrl();
}
}
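For reference, a small usage sketch that ties JdbcConfig to the DataSourceUtils calls used in JdbcCatalogOperations.initialize() and close() above; the connection values are illustrative.

// Illustrative usage of JdbcConfig with the DataSourceUtils helpers from this PR
// (assumes java.util.HashMap/Map and javax.sql.DataSource are imported).
Map<String, String> props = new HashMap<>();
props.put("jdbc-url", "jdbc:sqlite:/tmp/example.db"); // example value
props.put("jdbc-user", "admin");
props.put("jdbc-password", "secret");
props.put("jdbc.pool.min-size", "2");
props.put("jdbc.pool.max-size", "10");

JdbcConfig jdbcConfig = new JdbcConfig(props);
DataSource dataSource = DataSourceUtils.createDataSource(jdbcConfig);
try {
  // ... obtain connections from dataSource ...
} finally {
  DataSourceUtils.closeDataSource(dataSource);
}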