From b73f2d19ba18a6c25dfa5f066e87985da53c801d Mon Sep 17 00:00:00 2001 From: jinsilei Date: Tue, 16 May 2023 15:31:38 +0800 Subject: [PATCH 1/2] use ArcticSparkSessionCatalog in terminal --- .../server/terminal/SparkContextUtil.java | 17 +++++++++++++++-- .../arctic/server/terminal/TerminalSession.java | 13 +++++++++++++ .../server/terminal/TerminalSessionFactory.java | 5 +++++ .../server/terminal/kyuubi/KyuubiSession.java | 13 +++++++++++-- .../kyuubi/KyuubiTerminalSessionFactory.java | 3 +-- .../terminal/local/LocalSessionFactory.java | 3 +-- .../terminal/local/LocalTerminalSession.java | 12 ++++++++++-- 7 files changed, 56 insertions(+), 10 deletions(-) diff --git a/ams/server/src/main/java/com/netease/arctic/server/terminal/SparkContextUtil.java b/ams/server/src/main/java/com/netease/arctic/server/terminal/SparkContextUtil.java index 5c7b49ac1c..23a0b9e39c 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/terminal/SparkContextUtil.java +++ b/ams/server/src/main/java/com/netease/arctic/server/terminal/SparkContextUtil.java @@ -18,9 +18,12 @@ package com.netease.arctic.server.terminal; +import com.netease.arctic.hive.utils.CatalogUtil; +import com.netease.arctic.server.catalog.CatalogType; import com.netease.arctic.server.utils.Configurations; import com.netease.arctic.spark.ArcticSparkCatalog; import com.netease.arctic.spark.ArcticSparkExtensions; +import com.netease.arctic.spark.ArcticSparkSessionCatalog; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.spark.SparkCatalog; import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions; @@ -50,8 +53,18 @@ public static Map getSparkConf(Configurations sessionConfig) { sparkConf.put("spark.sql.catalog." + catalog + "." + key, property); } } else { - sparkConf.put("spark.sql.catalog." + catalog, ArcticSparkCatalog.class.getName()); - sparkConf.put("spark.sql.catalog." + catalog + ".url", catalogUrlBase + catalog); + String sparkCatalogPrefix = "spark.sql.catalog." + catalog; + String catalogClassName = ArcticSparkCatalog.class.getName(); + String type = + sessionConfig.get(TerminalSessionFactory.SessionConfigOptions.catalogProperty(catalog, "type")); + if (sessionConfig.getBoolean( + TerminalSessionFactory.SessionConfigOptions.USING_SESSION_CATALOG_FOR_HIVE) && + CatalogType.HIVE.name().equalsIgnoreCase(type)) { + sparkCatalogPrefix = "spark.sql.catalog.spark_catalog"; + catalogClassName = ArcticSparkSessionCatalog.class.getName(); + } + sparkConf.put(sparkCatalogPrefix, catalogClassName); + sparkConf.put(sparkCatalogPrefix + ".url", catalogUrlBase + catalog); } } return sparkConf; diff --git a/ams/server/src/main/java/com/netease/arctic/server/terminal/TerminalSession.java b/ams/server/src/main/java/com/netease/arctic/server/terminal/TerminalSession.java index 49fa123a93..3a1d3e025b 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/terminal/TerminalSession.java +++ b/ams/server/src/main/java/com/netease/arctic/server/terminal/TerminalSession.java @@ -18,6 +18,9 @@ package com.netease.arctic.server.terminal; +import com.netease.arctic.hive.utils.CatalogUtil; +import com.netease.arctic.server.catalog.CatalogType; + import java.util.List; import java.util.Map; @@ -84,4 +87,14 @@ default boolean empty() { * close session and release resources. */ void release(); + + static boolean canUseSparkSessionCatalog(Map sessionConf, String catalog) { + String usingSessionCatalogForHiveKey = + TerminalSessionFactory.SessionConfigOptions.USING_SESSION_CATALOG_FOR_HIVE.key(); + String usingSessionCatalogForHive = + sessionConf.getOrDefault(usingSessionCatalogForHiveKey, "false"); + String type = + sessionConf.get(TerminalSessionFactory.SessionConfigOptions.catalogProperty(catalog, "type")); + return usingSessionCatalogForHive.equals("true") && CatalogType.HIVE.name().equalsIgnoreCase(type); + } } diff --git a/ams/server/src/main/java/com/netease/arctic/server/terminal/TerminalSessionFactory.java b/ams/server/src/main/java/com/netease/arctic/server/terminal/TerminalSessionFactory.java index 3b27f6a860..1b1b942c30 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/terminal/TerminalSessionFactory.java +++ b/ams/server/src/main/java/com/netease/arctic/server/terminal/TerminalSessionFactory.java @@ -71,6 +71,11 @@ public static ConfigOption catalogConnector(String catalog) { .noDefaultValue(); } + public static ConfigOption USING_SESSION_CATALOG_FOR_HIVE = ConfigOptions + .key("using-session-catalog-for-hive") + .booleanType() + .defaultValue(false); + public static ConfigOption catalogProperty(String catalog, String propertyKey) { return ConfigOptions.key("catalog." + catalog + "." + propertyKey) .stringType() diff --git a/ams/server/src/main/java/com/netease/arctic/server/terminal/kyuubi/KyuubiSession.java b/ams/server/src/main/java/com/netease/arctic/server/terminal/kyuubi/KyuubiSession.java index 331f2497db..6efd4f4bdf 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/terminal/kyuubi/KyuubiSession.java +++ b/ams/server/src/main/java/com/netease/arctic/server/terminal/kyuubi/KyuubiSession.java @@ -50,8 +50,17 @@ public Map configs() { @Override public ResultSet executeStatement(String catalog, String statement) { if (currentCatalog == null || !currentCatalog.equalsIgnoreCase(catalog)) { - logs.add("current catalog is " + currentCatalog + ", switch to " + catalog + " before execution"); - execute("use `" + catalog + "`"); + if (TerminalSession.canUseSparkSessionCatalog(sessionConf, catalog)) { + logs.add(String.format("current catalog is %s, " + + "since it's a hive type catalog and can use spark session catalog, " + + "switch to spark_catalog before execution", + currentCatalog)); + execute("use `spark_catalog`"); + } else { + logs.add(String.format("current catalog is %s, switch to %s before execution", + currentCatalog, catalog)); + execute("use `" + catalog + "`"); + } this.currentCatalog = catalog; } java.sql.ResultSet rs = null; diff --git a/ams/server/src/main/java/com/netease/arctic/server/terminal/kyuubi/KyuubiTerminalSessionFactory.java b/ams/server/src/main/java/com/netease/arctic/server/terminal/kyuubi/KyuubiTerminalSessionFactory.java index 9805dbbe41..20ea2b3692 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/terminal/kyuubi/KyuubiTerminalSessionFactory.java +++ b/ams/server/src/main/java/com/netease/arctic/server/terminal/kyuubi/KyuubiTerminalSessionFactory.java @@ -19,7 +19,6 @@ package com.netease.arctic.server.terminal.kyuubi; import com.clearspring.analytics.util.Lists; -import com.google.common.collect.Maps; import com.netease.arctic.server.terminal.SparkContextUtil; import com.netease.arctic.server.terminal.TerminalSession; import com.netease.arctic.server.terminal.TerminalSessionFactory; @@ -111,7 +110,7 @@ public TerminalSession create(TableMetaStore metaStore, Configurations configura logMessage(logs, "try to create a kyuubi connection via url: " + kyuubiJdbcUrl); logMessage(logs, ""); - Map sessionConf = Maps.newLinkedHashMap(); + Map sessionConf = configuration.toMap(); sessionConf.put("jdbc.url", kyuubiJdbcUrl); Properties properties = new Properties(); diff --git a/ams/server/src/main/java/com/netease/arctic/server/terminal/local/LocalSessionFactory.java b/ams/server/src/main/java/com/netease/arctic/server/terminal/local/LocalSessionFactory.java index e9bb28f6b5..177e069652 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/terminal/local/LocalSessionFactory.java +++ b/ams/server/src/main/java/com/netease/arctic/server/terminal/local/LocalSessionFactory.java @@ -27,7 +27,6 @@ import com.netease.arctic.table.TableMetaStore; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions; import org.apache.spark.SparkConf; @@ -65,7 +64,7 @@ public TerminalSession create(TableMetaStore metaStore, Configurations configura Map sparkConf = SparkContextUtil.getSparkConf(configuration); sparkConf.put(com.netease.arctic.spark.SparkSQLProperties.REFRESH_CATALOG_BEFORE_USAGE, "true"); - Map finallyConf = Maps.newLinkedHashMap(); + Map finallyConf = configuration.toMap(); catalogs.stream() .filter(c -> isIcebergCatalog(c, configuration)) .forEach(c -> setHadoopConfigToSparkSession(c, session, metaStore)); diff --git a/ams/server/src/main/java/com/netease/arctic/server/terminal/local/LocalTerminalSession.java b/ams/server/src/main/java/com/netease/arctic/server/terminal/local/LocalTerminalSession.java index 0707cd3e90..ac0875ca53 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/terminal/local/LocalTerminalSession.java +++ b/ams/server/src/main/java/com/netease/arctic/server/terminal/local/LocalTerminalSession.java @@ -57,9 +57,17 @@ public Map configs() { @Override public ResultSet executeStatement(String catalog, String statement) { if (currentCatalog == null || !currentCatalog.equalsIgnoreCase(catalog)) { - session.sql("use `" + catalog + "`"); + if (TerminalSession.canUseSparkSessionCatalog(sessionConfigs, catalog)) { + session.sql("use `spark_catalog`"); + logs.add(String.format("current catalog is %s, " + + "since it's a hive type catalog and can use spark session catalog, " + + "switch to spark_catalog before execution", + currentCatalog)); + } else { + session.sql("use `" + catalog + "`"); + logs.add("switch to new catalog via: use " + catalog); + } currentCatalog = catalog; - logs.add("switch to new catalog via: use " + catalog); } Dataset ds = session.sql(statement); From 1ab9df67b6b13b8dd1ff09454dee17b3b35e20c5 Mon Sep 17 00:00:00 2001 From: jinsilei Date: Tue, 16 May 2023 15:35:03 +0800 Subject: [PATCH 2/2] remove useless import --- .../com/netease/arctic/server/terminal/SparkContextUtil.java | 1 - .../java/com/netease/arctic/server/terminal/TerminalSession.java | 1 - 2 files changed, 2 deletions(-) diff --git a/ams/server/src/main/java/com/netease/arctic/server/terminal/SparkContextUtil.java b/ams/server/src/main/java/com/netease/arctic/server/terminal/SparkContextUtil.java index 23a0b9e39c..7a1f8580df 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/terminal/SparkContextUtil.java +++ b/ams/server/src/main/java/com/netease/arctic/server/terminal/SparkContextUtil.java @@ -18,7 +18,6 @@ package com.netease.arctic.server.terminal; -import com.netease.arctic.hive.utils.CatalogUtil; import com.netease.arctic.server.catalog.CatalogType; import com.netease.arctic.server.utils.Configurations; import com.netease.arctic.spark.ArcticSparkCatalog; diff --git a/ams/server/src/main/java/com/netease/arctic/server/terminal/TerminalSession.java b/ams/server/src/main/java/com/netease/arctic/server/terminal/TerminalSession.java index 3a1d3e025b..4d06362833 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/terminal/TerminalSession.java +++ b/ams/server/src/main/java/com/netease/arctic/server/terminal/TerminalSession.java @@ -18,7 +18,6 @@ package com.netease.arctic.server.terminal; -import com.netease.arctic.hive.utils.CatalogUtil; import com.netease.arctic.server.catalog.CatalogType; import java.util.List;