[#3292] fix(spark-connector): passing Gravitino catalog properties to spark connector #3270

Merged · 5 commits · May 13, 2024
3 changes: 3 additions & 0 deletions docs/apache-hive-catalog.md
@@ -42,6 +42,9 @@ The Hive catalog supports creating, updating, and deleting databases and tables

When you use Gravitino with Trino, you can pass Trino Hive connector configuration using the prefix `trino.bypass.`. For example, use `trino.bypass.hive.config.resources` to pass `hive.config.resources` to the Gravitino Hive catalog in the Trino runtime.

When you use Gravitino with Spark, you can pass Spark Hive connector configuration using the prefix `spark.bypass.`. For example, use `spark.bypass.hive.exec.dynamic.partition.mode` to pass `hive.exec.dynamic.partition.mode` to the Spark Hive connector in the Spark runtime.


### Catalog operations

Refer to [Manage Relational Metadata Using Gravitino](./manage-relational-metadata-using-gravitino.md#catalog-operations) for more details.
3 changes: 3 additions & 0 deletions docs/lakehouse-iceberg-catalog.md
@@ -43,6 +43,9 @@ Any properties not defined by Gravitino with `gravitino.bypass.` prefix will pass

When you use Gravitino with Trino, you can pass Trino Iceberg connector configuration using the prefix `trino.bypass.`. For example, use `trino.bypass.iceberg.table-statistics-enabled` to pass `iceberg.table-statistics-enabled` to the Gravitino Iceberg catalog in the Trino runtime.

When you use Gravitino with Spark, you can pass Spark Iceberg connector configuration using the prefix `spark.bypass.`. For example, use `spark.bypass.io-impl` to pass `io-impl` to the Spark Iceberg connector in the Spark runtime.


#### JDBC catalog

If you are using JDBC catalog, you must provide `jdbc-user`, `jdbc-password` and `jdbc-driver` to catalog properties.
16 changes: 16 additions & 0 deletions docs/spark-connector/spark-catalog-hive.md
@@ -51,3 +51,19 @@ INSERT OVERWRITE TABLE employees PARTITION(department='Marketing') VALUES (3, 'M

SELECT * FROM employees WHERE department = 'Engineering';
```


## Catalog properties

The Gravitino Spark connector transforms the following property names, defined in catalog properties, into Spark Hive connector configuration.

| Property name in Gravitino catalog properties | Spark Hive connector configuration | Description | Since Version |
|-----------------------------------------------|------------------------------------|----------------------------|---------------|
| `metastore.uris` | `hive.metastore.uris` | Hive metastore uri address | 0.5.0 |

Gravitino catalog property names with the prefix `spark.bypass.` are passed to the Spark Hive connector. For example, use `spark.bypass.hive.exec.dynamic.partition.mode` to pass `hive.exec.dynamic.partition.mode` to the Spark Hive connector.


:::caution
When using the `spark-sql` shell client, you must explicitly set the `spark.bypass.spark.sql.hive.metastore.jars` in the Gravitino Hive catalog properties. Replace the default `builtin` value with the appropriate setting for your setup.
:::
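
As a rough illustration of the mapping above, the sketch below runs a Gravitino Hive catalog property map through the `HivePropertiesConverter` introduced in this PR. The metastore URI and the bypassed setting are hypothetical values; the printed keys are what the table and the `spark.bypass.` rule predict.

```java
import com.datastrato.gravitino.spark.connector.hive.HivePropertiesConverter;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class HiveCatalogPropertiesExample {
  public static void main(String[] args) {
    // Hypothetical Gravitino Hive catalog properties.
    Map<String, String> catalogProperties =
        ImmutableMap.of(
            "metastore.uris", "thrift://hive-metastore:9083",
            "spark.bypass.hive.exec.dynamic.partition.mode", "nonstrict");

    // No extra application-provided Spark options in this example.
    CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(ImmutableMap.of());

    // Expected result per the table and prefix rule above:
    //   hive.metastore.uris -> thrift://hive-metastore:9083
    //   hive.exec.dynamic.partition.mode -> nonstrict
    Map<String, String> sparkProperties =
        HivePropertiesConverter.getInstance().toSparkCatalogProperties(options, catalogProperties);
    sparkProperties.forEach((k, v) -> System.out.println(k + " = " + v));
  }
}
```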
14 changes: 14 additions & 0 deletions docs/spark-connector/spark-catalog-iceberg.md
@@ -58,3 +58,17 @@ VALUES

SELECT * FROM employee WHERE date(hire_date) = '2021-01-01'
```

## Catalog properties

The Gravitino Spark connector transforms the following property names, defined in catalog properties, into Spark Iceberg connector configuration.

| Gravitino catalog property name | Spark Iceberg connector configuration | Description | Since Version |
|---------------------------------|---------------------------------------|---------------------------|---------------|
| `catalog-backend` | `type` | Catalog backend type | 0.5.0 |
| `uri` | `uri` | Catalog backend uri | 0.5.0 |
| `warehouse` | `warehouse` | Catalog backend warehouse | 0.5.0 |
| `jdbc-user` | `jdbc.user` | JDBC user name | 0.5.0 |
| `jdbc-password` | `jdbc.password` | JDBC password | 0.5.0 |

Gravitino catalog property names with the prefix `spark.bypass.` are passed to the Spark Iceberg connector. For example, use `spark.bypass.io-impl` to pass `io-impl` to the Spark Iceberg connector.
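
A similar sketch for the Iceberg catalog, assuming `IcebergPropertiesConverter` lives in the connector's `iceberg` package (its implementation is not shown in this diff) and using illustrative backend values; per the table above, `catalog-backend` should surface as `type`, and the `spark.bypass.` prefix should be stripped from `io-impl`.

```java
import com.datastrato.gravitino.spark.connector.iceberg.IcebergPropertiesConverter;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class IcebergCatalogPropertiesExample {
  public static void main(String[] args) {
    // Hypothetical Gravitino Iceberg catalog properties (Hive metastore backend).
    Map<String, String> catalogProperties =
        ImmutableMap.of(
            "catalog-backend", "hive",
            "uri", "thrift://hive-metastore:9083",
            "warehouse", "hdfs://namenode:8020/warehouse",
            "spark.bypass.io-impl", "org.apache.iceberg.aws.s3.S3FileIO");

    // Expected keys per the table above: "type", "uri", "warehouse", and "io-impl".
    Map<String, String> sparkProperties =
        IcebergPropertiesConverter.getInstance()
            .toSparkCatalogProperties(
                new CaseInsensitiveStringMap(ImmutableMap.of()), catalogProperties);
    sparkProperties.forEach((k, v) -> System.out.println(k + " = " + v));
  }
}
```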
@@ -5,11 +5,74 @@

package com.datastrato.gravitino.spark.connector;

import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/** Transform table properties between Gravitino and Spark. */
/** Interface for transforming properties between Gravitino and Spark. */
public interface PropertiesConverter {
@VisibleForTesting String SPARK_PROPERTY_PREFIX = "spark.bypass.";

/**
* Converts application-provided options and Gravitino catalog properties to Spark connector
* properties.
*
* <p>It provides the common implementation, including extracting properties with the
* "spark.bypass." prefix and merging the user-provided options with the transformed properties.
*
* @param options Case-insensitive properties map provided by application configuration.
* @param properties Gravitino catalog properties.
* @return properties for the Spark connector.
*/
default Map<String, String> toSparkCatalogProperties(
CaseInsensitiveStringMap options, Map<String, String> properties) {
Map<String, String> all = new HashMap<>();
if (properties != null) {
properties.forEach(
(k, v) -> {
if (k.startsWith(SPARK_PROPERTY_PREFIX)) {
String newKey = k.substring(SPARK_PROPERTY_PREFIX.length());
all.put(newKey, v);
}
});
}

Map<String, String> transformedProperties = toSparkCatalogProperties(properties);
if (transformedProperties != null) {
all.putAll(transformedProperties);
}

if (options != null) {
all.putAll(options);
}
return all;
}

/**
* Transforms Gravitino catalog properties to Spark connector properties.
*
* <p>This interface focuses on the catalog-specific transform logic; the common logic is
* implemented in {@code toSparkCatalogProperties}.
*
* @param properties Gravitino catalog properties.
* @return properties for the Spark connector.
*/
Map<String, String> toSparkCatalogProperties(Map<String, String> properties);

/**
* Converts Spark table properties to Gravitino table properties.
*
* @param properties Spark table properties.
* @return Gravitino table properties.
*/
Map<String, String> toGravitinoTableProperties(Map<String, String> properties);

/**
* Converts Gravitino table properties to Spark table properties.
*
* @param properties Gravitino table properties.
* @return Spark table properties.
*/
Map<String, String> toSparkTableProperties(Map<String, String> properties);
}
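
To make the merge order of the default method concrete, here is a hypothetical converter (not part of the PR, names purely illustrative) that implements only the catalog-specific hook. Bypassed properties are overwritten first by the transform result and then by the user-provided options, so the options take precedence.

```java
package com.datastrato.gravitino.spark.connector;

import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/** Hypothetical converter used only to illustrate how the default merge behaves. */
public class ExamplePropertiesConverter implements PropertiesConverter {

  @Override
  public Map<String, String> toSparkCatalogProperties(Map<String, String> properties) {
    // Catalog-specific transform: rename one Gravitino key to the Spark connector key.
    Map<String, String> transformed = new HashMap<>();
    if (properties != null && properties.containsKey("uri")) {
      transformed.put("connector.uri", properties.get("uri"));
    }
    return transformed;
  }

  @Override
  public Map<String, String> toGravitinoTableProperties(Map<String, String> properties) {
    return new HashMap<>(properties);
  }

  @Override
  public Map<String, String> toSparkTableProperties(Map<String, String> properties) {
    return new HashMap<>(properties);
  }

  public static void main(String[] args) {
    Map<String, String> catalogProperties =
        ImmutableMap.of(
            "uri", "thrift://example:9083",
            "spark.bypass.some.engine.flag", "true");
    CaseInsensitiveStringMap options =
        new CaseInsensitiveStringMap(ImmutableMap.of("some.engine.flag", "false"));

    // The default method strips the "spark.bypass." prefix, overlays the transform result,
    // then overlays the user-provided options, so the expected output is:
    //   connector.uri = thrift://example:9083
    //   some.engine.flag = false
    Map<String, String> sparkProperties =
        new ExamplePropertiesConverter().toSparkCatalogProperties(options, catalogProperties);
    sparkProperties.forEach((k, v) -> System.out.println(k + " = " + v));
  }
}
```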
@@ -6,15 +6,11 @@
package com.datastrato.gravitino.spark.connector.hive;

import com.datastrato.gravitino.rel.Table;
import com.datastrato.gravitino.spark.connector.GravitinoSparkConfig;
import com.datastrato.gravitino.spark.connector.PropertiesConverter;
import com.datastrato.gravitino.spark.connector.SparkTransformConverter;
import com.datastrato.gravitino.spark.connector.catalog.BaseCatalog;
import com.datastrato.gravitino.spark.connector.table.SparkBaseTable;
import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
@@ -25,17 +21,9 @@ public class GravitinoHiveCatalog extends BaseCatalog {
@Override
protected TableCatalog createAndInitSparkCatalog(
String name, CaseInsensitiveStringMap options, Map<String, String> properties) {
Preconditions.checkArgument(properties != null, "Hive Catalog properties should not be null");
String metastoreUri = properties.get(GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI);
Preconditions.checkArgument(
StringUtils.isNotBlank(metastoreUri),
"Couldn't get "
+ GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI
+ " from hive catalog properties");

TableCatalog hiveCatalog = new HiveTableCatalog();
HashMap<String, String> all = new HashMap<>(options);
all.put(GravitinoSparkConfig.SPARK_HIVE_METASTORE_URI, metastoreUri);
Map<String, String> all =
getPropertiesConverter().toSparkCatalogProperties(options, properties);
hiveCatalog.initialize(name, new CaseInsensitiveStringMap(all));

return hiveCatalog;
@@ -54,7 +42,7 @@ protected SparkBaseTable createSparkTable(

@Override
protected PropertiesConverter getPropertiesConverter() {
return new HivePropertiesConverter();
return HivePropertiesConverter.getInstance();
}

@Override
@@ -6,18 +6,31 @@
package com.datastrato.gravitino.spark.connector.hive;

import com.datastrato.gravitino.catalog.hive.HiveTablePropertiesMetadata;
import com.datastrato.gravitino.spark.connector.GravitinoSparkConfig;
import com.datastrato.gravitino.spark.connector.PropertiesConverter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.ws.rs.NotSupportedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.sql.connector.catalog.TableCatalog;

/** Transform hive catalog properties between Spark and Gravitino. */
public class HivePropertiesConverter implements PropertiesConverter {
public static class HivePropertiesConverterHolder {
private static final HivePropertiesConverter INSTANCE = new HivePropertiesConverter();
}

private HivePropertiesConverter() {}

public static HivePropertiesConverter getInstance() {
return HivePropertiesConverterHolder.INSTANCE;
}

// Transform Spark hive file format to Gravitino hive file format
static final Map<String, String> fileFormatMap =
@@ -48,6 +61,20 @@ public class HivePropertiesConverter implements PropertiesConverter {
HivePropertiesConstants.GRAVITINO_HIVE_TABLE_LOCATION,
HivePropertiesConstants.SPARK_HIVE_LOCATION);

@Override
public Map<String, String> toSparkCatalogProperties(Map<String, String> properties) {
Preconditions.checkArgument(properties != null, "Hive Catalog properties should not be null");
String metastoreUri = properties.get(GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI);
Preconditions.checkArgument(
StringUtils.isNotBlank(metastoreUri),
"Couldn't get "
+ GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI
+ " from hive catalog properties");
HashMap<String, String> all = new HashMap<>();
all.put(GravitinoSparkConfig.SPARK_HIVE_METASTORE_URI, metastoreUri);
return all;
}

/**
* CREATE TABLE xxx STORED AS PARQUET will save "hive.stored-as" = "PARQUET" in property.
*
@@ -10,11 +10,7 @@
import com.datastrato.gravitino.spark.connector.SparkTransformConverter;
import com.datastrato.gravitino.spark.connector.catalog.BaseCatalog;
import com.datastrato.gravitino.spark.connector.table.SparkBaseTable;
import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
@@ -36,32 +32,10 @@ public class GravitinoIcebergCatalog extends BaseCatalog implements FunctionCata
@Override
protected TableCatalog createAndInitSparkCatalog(
String name, CaseInsensitiveStringMap options, Map<String, String> properties) {
Preconditions.checkArgument(
properties != null, "Iceberg Catalog properties should not be null");

String catalogBackend =
properties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND);
Preconditions.checkArgument(
StringUtils.isNotBlank(catalogBackend), "Iceberg Catalog backend should not be empty.");

HashMap<String, String> all = new HashMap<>(options);

switch (catalogBackend.toLowerCase(Locale.ENGLISH)) {
case IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE:
initHiveProperties(catalogBackend, properties, all);
break;
case IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_JDBC:
initJdbcProperties(catalogBackend, properties, all);
break;
default:
// SparkCatalog does not support Memory type catalog
throw new IllegalArgumentException(
"Unsupported Iceberg Catalog backend: " + catalogBackend);
}

Map<String, String> all =
getPropertiesConverter().toSparkCatalogProperties(options, properties);
TableCatalog icebergCatalog = new SparkCatalog();
icebergCatalog.initialize(name, new CaseInsensitiveStringMap(all));

return icebergCatalog;
}

@@ -78,7 +52,7 @@ protected SparkBaseTable createSparkTable(

@Override
protected PropertiesConverter getPropertiesConverter() {
return new IcebergPropertiesConverter();
return IcebergPropertiesConverter.getInstance();
}

@Override
@@ -95,79 +69,4 @@ public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceExce
public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
return ((SparkCatalog) sparkCatalog).loadFunction(ident);
}

private void initHiveProperties(
String catalogBackend,
Map<String, String> gravitinoProperties,
HashMap<String, String> icebergProperties) {
String metastoreUri =
gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI);
Preconditions.checkArgument(
StringUtils.isNotBlank(metastoreUri),
"Couldn't get "
+ IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI
+ " from Iceberg Catalog properties");
String hiveWarehouse =
gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE);
Preconditions.checkArgument(
StringUtils.isNotBlank(hiveWarehouse),
"Couldn't get "
+ IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE
+ " from Iceberg Catalog properties");
icebergProperties.put(
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_TYPE,
catalogBackend.toLowerCase(Locale.ENGLISH));
icebergProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, metastoreUri);
icebergProperties.put(
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, hiveWarehouse);
}

private void initJdbcProperties(
String catalogBackend,
Map<String, String> gravitinoProperties,
HashMap<String, String> icebergProperties) {
String jdbcUri =
gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI);
Preconditions.checkArgument(
StringUtils.isNotBlank(jdbcUri),
"Couldn't get "
+ IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI
+ " from Iceberg Catalog properties");
String jdbcWarehouse =
gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE);
Preconditions.checkArgument(
StringUtils.isNotBlank(jdbcWarehouse),
"Couldn't get "
+ IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE
+ " from Iceberg Catalog properties");
String jdbcUser = gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_JDBC_USER);
Preconditions.checkArgument(
StringUtils.isNotBlank(jdbcUser),
"Couldn't get "
+ IcebergPropertiesConstants.GRAVITINO_JDBC_USER
+ " from Iceberg Catalog properties");
String jdbcPassword =
gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_JDBC_PASSWORD);
Preconditions.checkArgument(
StringUtils.isNotBlank(jdbcPassword),
"Couldn't get "
+ IcebergPropertiesConstants.GRAVITINO_JDBC_PASSWORD
+ " from Iceberg Catalog properties");
String jdbcDriver =
gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_JDBC_DRIVER);
Preconditions.checkArgument(
StringUtils.isNotBlank(jdbcDriver),
"Couldn't get "
+ IcebergPropertiesConstants.GRAVITINO_ICEBERG_JDBC_DRIVER
+ " from Iceberg Catalog properties");
icebergProperties.put(
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_TYPE,
catalogBackend.toLowerCase(Locale.ROOT));
icebergProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, jdbcUri);
icebergProperties.put(
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, jdbcWarehouse);
icebergProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_JDBC_USER, jdbcUser);
icebergProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_JDBC_PASSWORD, jdbcPassword);
icebergProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_JDBC_DRIVER, jdbcDriver);
}
}