Skip to content

Commit

Permalink
[apache#2541] feat(spark-connector): support DDL, read and write oper…
Browse files Browse the repository at this point in the history
…ations to Iceberg catalog
  • Loading branch information
caican00 committed Mar 15, 2024
1 parent ffb80e7 commit b18810d
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package com.datastrato.gravitino.spark.connector;

import com.datastrato.gravitino.spark.connector.hive.HiveAdaptor;
import com.datastrato.gravitino.spark.connector.iceberg.IcebergAdaptor;
import java.util.Locale;

/**
Expand All @@ -17,6 +18,8 @@ public static GravitinoCatalogAdaptor createGravitinoAdaptor(String provider) {
switch (provider.toLowerCase(Locale.ROOT)) {
case "hive":
return new HiveAdaptor();
case "iceberg":
return new IcebergAdaptor();
default:
throw new RuntimeException(String.format("Provider:%s is not supported yet", provider));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.datastrato.gravitino.spark.connector.iceberg;

import com.datastrato.gravitino.rel.Table;
import com.datastrato.gravitino.spark.connector.GravitinoCatalogAdaptor;
import com.datastrato.gravitino.spark.connector.GravitinoSparkConfig;
import com.datastrato.gravitino.spark.connector.PropertiesConverter;
import com.datastrato.gravitino.spark.connector.table.SparkBaseTable;
import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/** IcebergAdaptor provides specific operations for Iceberg Catalog to adapt to GravitinoCatalog. */
public class IcebergAdaptor implements GravitinoCatalogAdaptor {

  /**
   * Returns the converter that maps table properties between Spark and Gravitino for Iceberg
   * tables.
   */
  @Override
  public PropertiesConverter getPropertiesConverter() {
    return new IcebergPropertiesConverter();
  }

  /**
   * Wraps a Gravitino table as a Spark-facing Iceberg table.
   *
   * @param identifier the Spark identifier of the table
   * @param gravitinoTable the underlying Gravitino table metadata
   * @param sparkCatalog the backing Spark catalog used to serve reads and writes
   * @param propertiesConverter converter for table properties between Spark and Gravitino
   * @return a {@link SparkBaseTable} backed by the Iceberg catalog
   */
  @Override
  public SparkBaseTable createSparkTable(
      Identifier identifier,
      Table gravitinoTable,
      TableCatalog sparkCatalog,
      PropertiesConverter propertiesConverter) {
    return new SparkIcebergTable(identifier, gravitinoTable, sparkCatalog, propertiesConverter);
  }

  /**
   * Creates and initializes an Iceberg {@link SparkCatalog}.
   *
   * <p>The Hive metastore URI is read from the Gravitino catalog properties and injected into the
   * Spark catalog options before initialization.
   *
   * @param name catalog name registered in Spark
   * @param options Spark session options for the catalog
   * @param properties Gravitino catalog properties; must contain the Hive metastore URI
   * @return an initialized Iceberg TableCatalog
   * @throws IllegalArgumentException if properties is null or the metastore URI is missing/blank
   */
  @Override
  public TableCatalog createAndInitSparkCatalog(
      String name, CaseInsensitiveStringMap options, Map<String, String> properties) {
    Preconditions.checkArgument(
        properties != null, "Iceberg Catalog properties should not be null");
    String metastoreUri = properties.get(GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI);
    Preconditions.checkArgument(
        StringUtils.isNotBlank(metastoreUri),
        "Couldn't get "
            + GravitinoSparkConfig.GRAVITINO_HIVE_METASTORE_URI
            + " from iceberg catalog properties");

    TableCatalog icebergCatalog = new SparkCatalog();
    // Program to the Map interface; copy the read-only options so the metastore URI can be added.
    Map<String, String> all = new HashMap<>(options);
    all.put(GravitinoSparkConfig.SPARK_HIVE_METASTORE_URI, metastoreUri);
    icebergCatalog.initialize(name, new CaseInsensitiveStringMap(all));

    return icebergCatalog;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.datastrato.gravitino.spark.connector.iceberg;

import com.datastrato.gravitino.spark.connector.PropertiesConverter;
import java.util.HashMap;
import java.util.Map;

/** Transform iceberg catalog properties between Spark and Gravitino. */
public class IcebergPropertiesConverter implements PropertiesConverter {

  /**
   * Produces a mutable copy of the Spark-side table properties for Gravitino.
   *
   * @param properties table properties supplied by Spark
   * @return a new mutable map containing the same entries
   */
  @Override
  public Map<String, String> toGravitinoTableProperties(Map<String, String> properties) {
    Map<String, String> gravitinoProperties = new HashMap<>();
    gravitinoProperties.putAll(properties);
    return gravitinoProperties;
  }

  /**
   * Produces a mutable copy of the Gravitino-side table properties for Spark.
   *
   * @param properties table properties supplied by Gravitino
   * @return a new mutable map containing the same entries
   */
  @Override
  public Map<String, String> toSparkTableProperties(Map<String, String> properties) {
    Map<String, String> sparkProperties = new HashMap<>();
    sparkProperties.putAll(properties);
    return sparkProperties;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.datastrato.gravitino.spark.connector.iceberg;

import com.datastrato.gravitino.rel.Table;
import com.datastrato.gravitino.spark.connector.PropertiesConverter;
import com.datastrato.gravitino.spark.connector.table.SparkBaseTable;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;

/**
 * Spark-facing representation of a Gravitino Iceberg table. All behavior is delegated to {@link
 * SparkBaseTable}; this subclass exists so the Iceberg adaptor can instantiate a distinct table
 * type for the Iceberg catalog.
 */
public class SparkIcebergTable extends SparkBaseTable {

/**
 * Creates a Spark Iceberg table wrapper.
 *
 * @param identifier the Spark identifier of the table
 * @param gravitinoTable the underlying Gravitino table metadata
 * @param sparkCatalog the backing Spark catalog that serves reads and writes
 * @param propertiesConverter converter for table properties between Spark and Gravitino
 */
public SparkIcebergTable(
Identifier identifier,
Table gravitinoTable,
TableCatalog sparkCatalog,
PropertiesConverter propertiesConverter) {
super(identifier, gravitinoTable, sparkCatalog, propertiesConverter);
}
}

0 comments on commit b18810d

Please sign in to comment.