# [#3370] feat(flink-connector): support schema operations (#3582)
### What changes were proposed in this pull request?

- Support schema operations in the Flink connector.

### Why are the changes needed?


- Fix: #3370 

### Does this PR introduce _any_ user-facing change?

- No.

### How was this patch tested?

- Added UTs and ITs.
coolderli authored Jun 5, 2024
1 parent b902fd4 commit 23b794e
Showing 9 changed files with 425 additions and 27 deletions.
flink-connector/build.gradle.kts (13 additions, 0 deletions)
@@ -73,6 +73,15 @@ dependencies {
   testImplementation(libs.hadoop2.common) {
     exclude("*")
   }
+  testImplementation(libs.hadoop2.hdfs) {
+    exclude("com.sun.jersey")
+    exclude("commons-cli", "commons-cli")
+    exclude("commons-io", "commons-io")
+    exclude("commons-codec", "commons-codec")
+    exclude("commons-logging", "commons-logging")
+    exclude("javax.servlet", "servlet-api")
+    exclude("org.mortbay.jetty")
+  }
   testImplementation(libs.hadoop2.mapreduce.client.core) {
     exclude("*")
   }
@@ -125,6 +134,10 @@ tasks.test {
   } else {
     dependsOn(tasks.jar)

+    doFirst {
+      environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE", "datastrato/gravitino-ci-hive:0.1.12")
+    }
+
     val init = project.extra.get("initIntegrationTest") as (Test) -> Unit
     init(this)
   }
flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/PropertiesConverter.java
@@ -21,7 +21,7 @@ public interface PropertiesConverter {
    * Gravitino properties.
    *
    * @param flinkConf The configuration provided by Flink.
-   * @return properties for the Gravitino connector.
+   * @return properties for the Gravitino catalog.
    */
   default Map<String, String> toGravitinoCatalogProperties(Configuration flinkConf) {
     return flinkConf.toMap();
@@ -36,4 +36,24 @@ default Map<String, String> toGravitinoCatalogProperties(Configuration flinkConf)
   default Map<String, String> toFlinkCatalogProperties(Map<String, String> gravitinoProperties) {
     return gravitinoProperties;
   }
+
+  /**
+   * Converts Flink connector schema properties to Gravitino schema properties.
+   *
+   * @param flinkProperties The schema properties provided by Flink.
+   * @return The schema properties for Gravitino.
+   */
+  default Map<String, String> toGravitinoSchemaProperties(Map<String, String> flinkProperties) {
+    return flinkProperties;
+  }
+
+  /**
+   * Converts Gravitino schema properties to Flink connector schema properties.
+   *
+   * @param gravitinoProperties The schema properties provided by Gravitino.
+   * @return The schema properties for the Flink connector.
+   */
+  default Map<String, String> toFlinkSchemaProperties(Map<String, String> gravitinoProperties) {
+    return gravitinoProperties;
+  }
 }
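Both new hooks are default methods, so an implementation overrides only what it needs. A minimal sketch of a custom converter; the class name and `custom.` prefix are hypothetical and not part of this PR:

```java
import com.datastrato.gravitino.flink.connector.PropertiesConverter;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Hypothetical converter that namespaces schema properties with a custom
 * prefix on the way into Gravitino and strips it on the way back out.
 */
public class PrefixingPropertiesConverter implements PropertiesConverter {

  private static final String PREFIX = "custom.";

  @Override
  public Map<String, String> toGravitinoSchemaProperties(Map<String, String> flinkProperties) {
    // Add the prefix to every key before handing the map to Gravitino.
    return flinkProperties.entrySet().stream()
        .collect(Collectors.toMap(e -> PREFIX + e.getKey(), Map.Entry::getValue));
  }

  @Override
  public Map<String, String> toFlinkSchemaProperties(Map<String, String> gravitinoProperties) {
    // Keep only prefixed keys, stripping the prefix for Flink.
    return gravitinoProperties.entrySet().stream()
        .filter(e -> e.getKey().startsWith(PREFIX))
        .collect(
            Collectors.toMap(e -> e.getKey().substring(PREFIX.length()), Map.Entry::getValue));
  }
}
```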
flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/catalog/BaseCatalog.java
@@ -5,10 +5,25 @@

 package com.datastrato.gravitino.flink.connector.catalog;

+import com.datastrato.gravitino.Catalog;
+import com.datastrato.gravitino.Schema;
+import com.datastrato.gravitino.SchemaChange;
+import com.datastrato.gravitino.exceptions.NoSuchCatalogException;
+import com.datastrato.gravitino.exceptions.NoSuchSchemaException;
+import com.datastrato.gravitino.exceptions.NonEmptySchemaException;
+import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException;
+import com.datastrato.gravitino.flink.connector.PropertiesConverter;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
+import org.apache.commons.compress.utils.Lists;
 import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
 import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogPartition;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
@@ -35,8 +50,11 @@
  * org.apache.flink.table.catalog.Catalog} interface.
  */
 public abstract class BaseCatalog extends AbstractCatalog {
+  private final PropertiesConverter propertiesConverter;
+
   protected BaseCatalog(String catalogName, String defaultDatabase) {
     super(catalogName, defaultDatabase);
+    this.propertiesConverter = getPropertiesConverter();
   }

   @Override
@@ -47,35 +65,73 @@ public void close() throws CatalogException {}

   @Override
   public List<String> listDatabases() throws CatalogException {
-    throw new UnsupportedOperationException();
+    return Arrays.asList(catalog().asSchemas().listSchemas());
   }

   @Override
-  public CatalogDatabase getDatabase(String s) throws DatabaseNotExistException, CatalogException {
-    throw new UnsupportedOperationException();
+  public CatalogDatabase getDatabase(String databaseName)
+      throws DatabaseNotExistException, CatalogException {
+    try {
+      Schema schema = catalog().asSchemas().loadSchema(databaseName);
+      Map<String, String> properties =
+          propertiesConverter.toFlinkSchemaProperties(schema.properties());
+      return new CatalogDatabaseImpl(properties, schema.comment());
+    } catch (NoSuchSchemaException e) {
+      throw new DatabaseNotExistException(getName(), databaseName);
+    }
   }

   @Override
-  public boolean databaseExists(String s) throws CatalogException {
-    throw new UnsupportedOperationException();
+  public boolean databaseExists(String databaseName) throws CatalogException {
+    return catalog().asSchemas().schemaExists(databaseName);
   }

   @Override
-  public void createDatabase(String s, CatalogDatabase catalogDatabase, boolean b)
+  public void createDatabase(
+      String databaseName, CatalogDatabase catalogDatabase, boolean ignoreIfExists)
       throws DatabaseAlreadyExistException, CatalogException {
-    throw new UnsupportedOperationException();
+    try {
+      Map<String, String> properties =
+          propertiesConverter.toGravitinoSchemaProperties(catalogDatabase.getProperties());
+      catalog().asSchemas().createSchema(databaseName, catalogDatabase.getComment(), properties);
+    } catch (SchemaAlreadyExistsException e) {
+      if (!ignoreIfExists) {
+        throw new DatabaseAlreadyExistException(getName(), databaseName);
+      }
+    } catch (NoSuchCatalogException e) {
+      throw new CatalogException(e);
+    }
   }

   @Override
-  public void dropDatabase(String s, boolean b, boolean b1)
+  public void dropDatabase(String databaseName, boolean ignoreIfNotExists, boolean cascade)
       throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
-    throw new UnsupportedOperationException();
+    try {
+      boolean dropped = catalog().asSchemas().dropSchema(databaseName, cascade);
+      if (!dropped && !ignoreIfNotExists) {
+        throw new DatabaseNotExistException(getName(), databaseName);
+      }
+    } catch (NonEmptySchemaException e) {
+      throw new DatabaseNotEmptyException(getName(), databaseName);
+    } catch (NoSuchCatalogException e) {
+      throw new CatalogException(e);
+    }
   }

   @Override
-  public void alterDatabase(String s, CatalogDatabase catalogDatabase, boolean b)
+  public void alterDatabase(
+      String databaseName, CatalogDatabase catalogDatabase, boolean ignoreIfNotExists)
       throws DatabaseNotExistException, CatalogException {
-    throw new UnsupportedOperationException();
+    try {
+      SchemaChange[] schemaChanges = getSchemaChange(getDatabase(databaseName), catalogDatabase);
+      catalog().asSchemas().alterSchema(databaseName, schemaChanges);
+    } catch (NoSuchSchemaException e) {
+      if (!ignoreIfNotExists) {
+        throw new DatabaseNotExistException(getName(), databaseName);
+      }
+    } catch (NoSuchCatalogException e) {
+      throw new CatalogException(e);
+    }
   }

   @Override
@@ -278,4 +334,33 @@ public void alterPartitionColumnStatistics(
       throws PartitionNotExistException, CatalogException {
     throw new UnsupportedOperationException();
   }
+
+  protected abstract PropertiesConverter getPropertiesConverter();
+
+  @VisibleForTesting
+  static SchemaChange[] getSchemaChange(CatalogDatabase current, CatalogDatabase updated) {
+    Map<String, String> currentProperties = current.getProperties();
+    Map<String, String> updatedProperties = updated.getProperties();
+
+    List<SchemaChange> schemaChanges = Lists.newArrayList();
+    MapDifference<String, String> difference =
+        Maps.difference(currentProperties, updatedProperties);
+    difference
+        .entriesOnlyOnLeft()
+        .forEach((key, value) -> schemaChanges.add(SchemaChange.removeProperty(key)));
+    difference
+        .entriesOnlyOnRight()
+        .forEach((key, value) -> schemaChanges.add(SchemaChange.setProperty(key, value)));
+    difference
+        .entriesDiffering()
+        .forEach(
+            (key, value) -> {
+              schemaChanges.add(SchemaChange.setProperty(key, value.rightValue()));
+            });
+    return schemaChanges.toArray(new SchemaChange[0]);
+  }
+
+  private Catalog catalog() {
+    return GravitinoCatalogManager.get().getGravitinoCatalogInfo(getName());
+  }
 }
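With the operations above in place, a database round trip can be driven directly through Flink's `Catalog` API. A minimal sketch, assuming an already-constructed instance of a concrete `BaseCatalog` subclass (names and properties are illustrative):

```java
import java.util.Collections;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;

public class DatabaseRoundTrip {

  // "catalog" stands in for any concrete BaseCatalog subclass; how it is
  // constructed and registered is outside the scope of this sketch.
  static void roundTrip(Catalog catalog) throws Exception {
    CatalogDatabase db =
        new CatalogDatabaseImpl(Collections.singletonMap("owner", "etl"), "demo database");

    // These map to Gravitino createSchema / schemaExists / loadSchema / dropSchema.
    catalog.createDatabase("demo_db", db, /* ignoreIfExists= */ true);
    boolean exists = catalog.databaseExists("demo_db");
    CatalogDatabase loaded = catalog.getDatabase("demo_db");
    catalog.dropDatabase("demo_db", /* ignoreIfNotExists= */ true, /* cascade= */ false);

    System.out.println(exists + " / " + loaded.getComment());
  }
}
```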
flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/GravitinoHiveCatalog.java
@@ -4,6 +4,7 @@
  */
 package com.datastrato.gravitino.flink.connector.hive;

+import com.datastrato.gravitino.flink.connector.PropertiesConverter;
 import com.datastrato.gravitino.flink.connector.catalog.BaseCatalog;
 import java.util.Optional;
 import javax.annotation.Nullable;
@@ -36,4 +37,9 @@ public HiveConf getHiveConf() {
   public Optional<Factory> getFactory() {
     return hiveCatalog.getFactory();
   }
+
+  @Override
+  protected PropertiesConverter getPropertiesConverter() {
+    return HivePropertiesConverter.INSTANCE;
+  }
 }
flink-connector/src/main/java/com/datastrato/gravitino/flink/connector/hive/HivePropertiesConverter.java
@@ -5,8 +5,7 @@

 package com.datastrato.gravitino.flink.connector.hive;

-import static com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta.METASTORE_URIS;
-
+import com.datastrato.gravitino.catalog.hive.HiveCatalogPropertiesMeta;
 import com.datastrato.gravitino.flink.connector.PropertiesConverter;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
@@ -21,19 +20,21 @@ private HivePropertiesConverter() {}

   public static final HivePropertiesConverter INSTANCE = new HivePropertiesConverter();

-  private static final Map<String, String> HIVE_CONFIG_TO_GRAVITINO =
-      ImmutableMap.of(HiveConf.ConfVars.METASTOREURIS.varname, METASTORE_URIS);
+  private static final Map<String, String> HIVE_CATALOG_CONFIG_TO_GRAVITINO =
+      ImmutableMap.of(
+          HiveConf.ConfVars.METASTOREURIS.varname, HiveCatalogPropertiesMeta.METASTORE_URIS);
   private static final Map<String, String> GRAVITINO_CONFIG_TO_HIVE =
-      ImmutableMap.of(METASTORE_URIS, HiveConf.ConfVars.METASTOREURIS.varname);
+      ImmutableMap.of(
+          HiveCatalogPropertiesMeta.METASTORE_URIS, HiveConf.ConfVars.METASTOREURIS.varname);

   @Override
   public Map<String, String> toGravitinoCatalogProperties(Configuration flinkConf) {
     Map<String, String> gravitinoProperties = Maps.newHashMap();

     for (Map.Entry<String, String> entry : flinkConf.toMap().entrySet()) {
-      String hiveKey = HIVE_CONFIG_TO_GRAVITINO.get(entry.getKey());
-      if (hiveKey != null) {
-        gravitinoProperties.put(hiveKey, entry.getValue());
+      String gravitinoKey = HIVE_CATALOG_CONFIG_TO_GRAVITINO.get(entry.getKey());
+      if (gravitinoKey != null) {
+        gravitinoProperties.put(gravitinoKey, entry.getValue());
       } else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) {
         gravitinoProperties.put(FLINK_PROPERTY_PREFIX + entry.getKey(), entry.getValue());
       } else {
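For illustration, the renamed mapping can be exercised by feeding a Flink `Configuration` through the converter. A minimal sketch, assuming it lives in the same package as `HivePropertiesConverter`; keys and values are illustrative, and the exact output keys depend on `FLINK_PROPERTY_PREFIX` and `HiveCatalogPropertiesMeta.METASTORE_URIS`:

```java
import java.util.Map;
import org.apache.flink.configuration.Configuration;

public class HiveConversionSketch {
  public static void main(String[] args) {
    Configuration flinkConf = new Configuration();
    // The Hive metastore key is remapped to Gravitino's catalog property name...
    flinkConf.setString("hive.metastore.uris", "thrift://metastore:9083");
    // ...while unrecognized keys are carried over under FLINK_PROPERTY_PREFIX.
    flinkConf.setString("some.flink.option", "value");

    Map<String, String> gravitinoProps =
        HivePropertiesConverter.INSTANCE.toGravitinoCatalogProperties(flinkConf);
    gravitinoProps.forEach((k, v) -> System.out.println(k + " = " + v));
  }
}
```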
flink-connector/src/test/java/com/datastrato/gravitino/flink/connector/catalog/TestBaseCatalog.java (new file)
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2024 Datastrato Pvt Ltd.
+ * This software is licensed under the Apache License version 2.
+ */
+package com.datastrato.gravitino.flink.connector.catalog;
+
+import com.datastrato.gravitino.SchemaChange;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestBaseCatalog {
+
+  @Test
+  public void testHiveSchemaChanges() {
+    Map<String, String> currentProperties = ImmutableMap.of("key", "value", "key2", "value2");
+    CatalogDatabase current = new CatalogDatabaseImpl(currentProperties, null);
+
+    Map<String, String> newProperties = ImmutableMap.of("key2", "new-value2", "key3", "value3");
+    CatalogDatabase updated = new CatalogDatabaseImpl(newProperties, null);
+
+    SchemaChange[] schemaChange = BaseCatalog.getSchemaChange(current, updated);
+    Assertions.assertEquals(3, schemaChange.length);
+    Assertions.assertInstanceOf(SchemaChange.RemoveProperty.class, schemaChange[0]);
+    Assertions.assertEquals("key", ((SchemaChange.RemoveProperty) schemaChange[0]).getProperty());
+
+    Assertions.assertInstanceOf(SchemaChange.SetProperty.class, schemaChange[1]);
+    Assertions.assertEquals("key3", ((SchemaChange.SetProperty) schemaChange[1]).getProperty());
+    Assertions.assertEquals("value3", ((SchemaChange.SetProperty) schemaChange[1]).getValue());
+
+    Assertions.assertInstanceOf(SchemaChange.SetProperty.class, schemaChange[2]);
+    Assertions.assertEquals("key2", ((SchemaChange.SetProperty) schemaChange[2]).getProperty());
+    Assertions.assertEquals("new-value2", ((SchemaChange.SetProperty) schemaChange[2]).getValue());
+  }
+}
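Taken together, these changes wire Flink's database DDL to Gravitino schema calls. A minimal end-to-end sketch, assuming a Gravitino-backed catalog has already been registered under the illustrative name `hive_catalog`:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SchemaOperationsSketch {
  public static void main(String[] args) {
    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

    // Assumes a Gravitino-backed catalog has been registered as "hive_catalog".
    tableEnv.useCatalog("hive_catalog");

    // Each statement is served by the new BaseCatalog schema operations:
    // createDatabase, listDatabases, alterDatabase, and dropDatabase.
    tableEnv.executeSql(
        "CREATE DATABASE IF NOT EXISTS demo_db COMMENT 'demo' WITH ('k1' = 'v1')");
    tableEnv.executeSql("SHOW DATABASES").print();
    tableEnv.executeSql("ALTER DATABASE demo_db SET ('k1' = 'v2')");
    tableEnv.executeSql("DROP DATABASE IF EXISTS demo_db");
  }
}
```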