Skip to content

Commit

Permalink
[Arctic-1266][AMS]: Use ArcticSparkSessionCatalog for terminal for hi…
Browse files Browse the repository at this point in the history
…ve catalog (#1264)

* Support terminal access to Hive tables when switching to a catalog with an HMS metastore

* add document for arctic.ams.terminal.local.using-session-catalog-for-hive

---------

Co-authored-by: geoli <geoli@tencent.com>
Co-authored-by: baiyangtx <xiangnebula@163.com>
  • Loading branch information
3 people authored Apr 3, 2023
1 parent 37b407b commit ac2be3a
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package com.netease.arctic.ams.server.terminal;

import com.netease.arctic.ams.server.config.Configuration;
import com.netease.arctic.ams.server.utils.CatalogUtil;
import com.netease.arctic.spark.ArcticSparkCatalog;
import com.netease.arctic.spark.ArcticSparkExtensions;
import com.netease.arctic.spark.ArcticSparkSessionCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
Expand Down Expand Up @@ -50,8 +52,16 @@ public static Map<String, String> getSparkConf(Configuration sessionConfig) {
sparkConf.put("spark.sql.catalog." + catalog + "." + key, property);
}
} else {
sparkConf.put("spark.sql.catalog." + catalog, ArcticSparkCatalog.class.getName());
sparkConf.put("spark.sql.catalog." + catalog + ".url", catalogUrlBase + catalog);
String sparkCatalogPrefix = "spark.sql.catalog." + catalog;
String catalogClassName = ArcticSparkCatalog.class.getName();
if (sessionConfig.getBoolean(
TerminalSessionFactory.SessionConfigOptions.USING_SESSION_CATALOG_FOR_HIVE) &&
CatalogUtil.isHiveCatalog(catalog)) {
sparkCatalogPrefix = "spark.sql.catalog.spark_catalog";
catalogClassName = ArcticSparkSessionCatalog.class.getName();
}
sparkConf.put(sparkCatalogPrefix, catalogClassName);
sparkConf.put(sparkCatalogPrefix + ".url", catalogUrlBase + catalog);
}
}
return sparkConf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public class TerminalManager {
private final Object sessionMapLock = new Object();
private final Map<String, TerminalSessionContext> sessionMap = Maps.newHashMap();

private Configuration sessionConfiguration;


ThreadPoolExecutor executionPool = new ThreadPoolExecutor(
1, 50, 30, TimeUnit.MINUTES,
Expand All @@ -74,6 +76,7 @@ public TerminalManager(Configuration conf) {
this.resultLimits = conf.getInteger(ArcticMetaStoreConf.TERMINAL_RESULT_LIMIT);
this.stopOnError = conf.getBoolean(ArcticMetaStoreConf.TERMINAL_STOP_ON_ERROR);
this.sessionTimeout = conf.getInteger(ArcticMetaStoreConf.TERMINAL_SESSION_TIMEOUT);
this.sessionConfiguration = getSessionConfiguration(conf);
this.sessionFactory = loadTerminalSessionFactory(conf);
Thread cleanThread = new Thread(new SessionCleanTask());
cleanThread.setName("terminal-session-gc");
Expand All @@ -99,11 +102,9 @@ public String executeScript(String terminalId, String catalog, String script) {
TableMetaStore metaStore = getCatalogTableMetaStore(catalogMeta);
String sessionId = getSessionId(terminalId, metaStore, catalog);
String catalogType = CatalogUtil.isIcebergCatalog(catalog) ? "iceberg" : "arctic";
Configuration configuration = new Configuration();
configuration.setInteger(SessionConfigOptions.FETCH_SIZE, resultLimits);
Configuration configuration = new Configuration(this.sessionConfiguration);
configuration.set(SessionConfigOptions.CATALOGS, Lists.newArrayList(catalog));
configuration.set(SessionConfigOptions.catalogConnector(catalog), catalogType);
configuration.set(SessionConfigOptions.CATALOG_URL_BASE, AmsUtils.getAMSHaAddress());
for (String key : catalogMeta.getCatalogProperties().keySet()) {
String value = catalogMeta.getCatalogProperties().get(key);
configuration.set(SessionConfigOptions.catalogProperty(catalog, key), value);
Expand Down Expand Up @@ -282,6 +283,12 @@ private TerminalSessionFactory loadTerminalSessionFactory(Configuration conf) {
throw new RuntimeException("failed to init session factory", e);
}

factory.initialize(this.sessionConfiguration);
return factory;
}

private Configuration getSessionConfiguration(Configuration conf) {
String backend = conf.get(ArcticMetaStoreConf.TERMINAL_BACKEND);
String factoryPropertiesPrefix = ArcticMetaStoreConf.TERMINAL_PREFIX + backend + ".";
Configuration configuration = new Configuration();

Expand All @@ -294,8 +301,8 @@ private TerminalSessionFactory loadTerminalSessionFactory(Configuration conf) {
configuration.setString(key, value);
}
configuration.set(TerminalSessionFactory.FETCH_SIZE, this.resultLimits);
factory.initialize(configuration);
return factory;
configuration.set(SessionConfigOptions.CATALOG_URL_BASE, AmsUtils.getAMSHaAddress());
return configuration;
}

private class SessionCleanTask implements Runnable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package com.netease.arctic.ams.server.terminal;

import com.netease.arctic.ams.server.utils.CatalogUtil;

import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -84,4 +86,13 @@ default boolean empty() {
* close session and release resources.
*/
void release();

/**
 * Decides whether the terminal should switch to Spark's built-in {@code spark_catalog}
 * (backed by the Arctic session catalog) instead of the named catalog.
 *
 * @param sessionConf raw session configuration as string key/value pairs
 * @param catalog     name of the catalog the statement targets
 * @return {@code true} only when the "using-session-catalog-for-hive" option is enabled
 *         AND the target catalog is a Hive-type catalog
 */
static boolean canUseSparkSessionCatalog(Map<String, String> sessionConf, String catalog) {
  String usingSessionCatalogForHiveKey =
      TerminalSessionFactory.SessionConfigOptions.USING_SESSION_CATALOG_FOR_HIVE.key();
  // Derive the fallback from the option's declared default instead of hard-coding "false",
  // so this stays in sync if the default ever changes.
  String fallback = String.valueOf(
      TerminalSessionFactory.SessionConfigOptions.USING_SESSION_CATALOG_FOR_HIVE.defaultValue());
  // Bug fix: the previous equals("true") check was case-sensitive, so values such as
  // "TRUE"/"True" were silently treated as disabled. Boolean.parseBoolean accepts any
  // casing of "true", which presumably matches how the typed Configuration#getBoolean
  // path reads this option elsewhere — TODO confirm against Configuration's parser.
  boolean enabled = Boolean.parseBoolean(
      sessionConf.getOrDefault(usingSessionCatalogForHiveKey, fallback));
  return enabled && CatalogUtil.isHiveCatalog(catalog);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ class SessionConfigOptions {
.booleanType()
.noDefaultValue();

public static ConfigOption<Boolean> USING_SESSION_CATALOG_FOR_HIVE = ConfigOptions
.key("using-session-catalog-for-hive")
.booleanType()
.defaultValue(false);

public static ConfigOption<String> catalogConnector(String catalog) {
return ConfigOptions.key("session.catalog." + catalog + ".connector")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,17 @@ public Map<String, String> configs() {
@Override
public ResultSet executeStatement(String catalog, String statement) {
if (currentCatalog == null || !currentCatalog.equalsIgnoreCase(catalog)) {
logs.add("current catalog is " + currentCatalog + ", switch to " + catalog + " before execution");
execute("use `" + catalog + "`");
if (TerminalSession.canUseSparkSessionCatalog(sessionConf, catalog)) {
logs.add(String.format("current catalog is %s, " +
"since it's a hive type catalog and can use spark session catalog, " +
"switch to spark_catalog before execution",
currentCatalog));
execute("use `spark_catalog`");
} else {
logs.add(String.format("current catalog is %s, switch to %s before execution",
currentCatalog, catalog));
execute("use `" + catalog + "`");
}
this.currentCatalog = catalog;
}
java.sql.ResultSet rs = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package com.netease.arctic.ams.server.terminal.kyuubi;

import com.clearspring.analytics.util.Lists;
import com.google.common.collect.Maps;
import com.netease.arctic.ams.server.config.ConfigOption;
import com.netease.arctic.ams.server.config.ConfigOptions;
import com.netease.arctic.ams.server.config.Configuration;
Expand Down Expand Up @@ -111,7 +110,7 @@ public TerminalSession create(TableMetaStore metaStore, Configuration configurat
logMessage(logs, "try to create a kyuubi connection via url: " + kyuubiJdbcUrl);
logMessage(logs, "");

Map<String, String> sessionConf = Maps.newLinkedHashMap();
Map<String, String> sessionConf = configuration.toMap();
sessionConf.put("jdbc.url", kyuubiJdbcUrl);
Properties properties = new Properties();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.netease.arctic.spark.ArcticSparkExtensions;
import com.netease.arctic.table.TableMetaStore;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
import org.apache.spark.SparkConf;
Expand Down Expand Up @@ -66,7 +65,7 @@ public TerminalSession create(TableMetaStore metaStore, Configuration configurat
initializeLogs.add("initialize session, session factory: " + LocalSessionFactory.class.getName());

Map<String, String> sparkConf = SparkContextUtil.getSparkConf(configuration);
Map<String, String> finallyConf = Maps.newLinkedHashMap();
Map<String, String> finallyConf = configuration.toMap();
sparkConf.put(REFRESH_CATALOG_BEFORE_USAGE, "true");
if (isNativeIceberg) {
org.apache.hadoop.conf.Configuration metaConf = metaStore.getConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,17 @@ public Map<String, String> configs() {
@Override
public ResultSet executeStatement(String catalog, String statement) {
if (currentCatalog == null || !currentCatalog.equalsIgnoreCase(catalog)) {
session.sql("use `" + catalog + "`");
if (TerminalSession.canUseSparkSessionCatalog(sessionConfigs, catalog)) {
session.sql("use `spark_catalog`");
logs.add(String.format("current catalog is %s, " +
"since it's a hive type catalog and can use spark session catalog, " +
"switch to spark_catalog before execution",
currentCatalog));
} else {
session.sql("use `" + catalog + "`");
logs.add("switch to new catalog via: use " + catalog);
}
currentCatalog = catalog;
logs.add("switch to new catalog via: use " + catalog);
}

Dataset<Row> ds = session.sql(statement);
Expand Down
2 changes: 2 additions & 0 deletions site/docs/ch/guides/deployment.md
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ Terminal 在 local 模式执行的情况下,可以配置 Spark 相关参数
arctic.ams.terminal.backend: local
arctic.ams.terminal.local.spark.sql.session.timeZone: UTC
arctic.ams.terminal.local.spark.sql.iceberg.handle-timestamp-without-timezone: false
# When the catalog type is hive, automatically use the Spark session catalog in the terminal to access Hive tables
arctic.ams.terminal.local.using-session-catalog-for-hive: true
```
## 启动 AMS
Expand Down

0 comments on commit ac2be3a

Please sign in to comment.