Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace the fastjson of the ArcticMetaStore class in ams-server with jackson #1392

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

package com.netease.arctic.ams.server;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
import com.netease.arctic.ams.api.ArcticTableMetastore;
import com.netease.arctic.ams.api.CatalogMeta;
Expand All @@ -45,11 +45,7 @@
import com.netease.arctic.ams.server.service.impl.FileInfoCacheService;
import com.netease.arctic.ams.server.service.impl.OptimizeExecuteService;
import com.netease.arctic.ams.server.service.impl.RuntimeDataExpireService;
import com.netease.arctic.ams.server.utils.AmsUtils;
import com.netease.arctic.ams.server.utils.SecurityUtils;
import com.netease.arctic.ams.server.utils.ThreadPool;
import com.netease.arctic.ams.server.utils.UpdateTool;
import com.netease.arctic.ams.server.utils.YamlUtils;
import com.netease.arctic.ams.server.utils.*;
import com.netease.arctic.utils.ConfigurationFileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
Expand Down Expand Up @@ -81,7 +77,7 @@ public class ArcticMetaStore {
public static final Logger LOG = LoggerFactory.getLogger(ArcticMetaStore.class);

public static Configuration conf;
private static JSONObject yamlConfig;
private static JsonNode yamlConfig;
private static TServer server;
private static final List<Thread> residentThreads = new ArrayList<>();
private static HighAvailabilityServices haService = null;
Expand All @@ -98,55 +94,57 @@ public static void main(String[] args) throws Throwable {
}

public static LinkedHashMap getSystemSettingFromYaml() {
JSONObject systemConfig = yamlConfig.getJSONObject(ConfigFileProperties.SYSTEM_CONFIG);
ObjectNode systemConfig = (ObjectNode) yamlConfig.get(ConfigFileProperties.SYSTEM_CONFIG);
LinkedHashMap<String, Object> config = new LinkedHashMap<String, Object>();

systemConfig.put(ArcticMetaStoreConf.ARCTIC_HOME.key(), getArcticHome());
String systemThriftPort = System.getProperty(ArcticMetaStoreConf.THRIFT_BIND_PORT.key());
if (systemThriftPort == null) {
systemConfig.put(
ArcticMetaStoreConf.THRIFT_BIND_PORT.key(),
systemConfig.getInteger(ArcticMetaStoreConf.THRIFT_BIND_PORT.key()));
JacksonUtils.getInteger(systemConfig, ArcticMetaStoreConf.THRIFT_BIND_PORT.key()));
} else {
systemConfig.put(ArcticMetaStoreConf.THRIFT_BIND_PORT.key(), Integer.parseInt(systemThriftPort));
}

validateConfig(systemConfig);
config.putAll(systemConfig);
config.putAll(JacksonUtils.parseObject(JacksonUtils.toJSONString(systemConfig), Map.class));

//extension properties
String extensionProStr = JacksonUtils.getString(yamlConfig, ConfigFileProperties.SYSTEM_EXTENSION_CONFIG);
Map<String,String> extensionPro =
yamlConfig.getObject(ConfigFileProperties.SYSTEM_EXTENSION_CONFIG, Map.class) == null ? new HashMap<>() :
yamlConfig.getObject(ConfigFileProperties.SYSTEM_EXTENSION_CONFIG, Map.class);
JacksonUtils.parseObject(extensionProStr, Map.class) == null ? new HashMap<>() :
JacksonUtils.parseObject(extensionProStr, Map.class);
config.put(ArcticMetaStoreConf.SYSTEM_EXTENSION_PROPERTIES.key(), extensionPro);

return config;
}

public static void validateConfig(JSONObject systemConfig) {
if (!systemConfig.containsKey(ArcticMetaStoreConf.THRIFT_BIND_HOST_PREFIX.key())) {
public static void validateConfig(ObjectNode systemConfig) {
if (!systemConfig.has(ArcticMetaStoreConf.THRIFT_BIND_HOST_PREFIX.key())) {
throw new RuntimeException("configuration " + ArcticMetaStoreConf.THRIFT_BIND_HOST_PREFIX.key() + " must be set");
}
InetAddress inetAddress = AmsUtils.getLocalHostExactAddress(
systemConfig.getString(ArcticMetaStoreConf.THRIFT_BIND_HOST_PREFIX.key()));
JacksonUtils.getString(systemConfig, ArcticMetaStoreConf.THRIFT_BIND_HOST_PREFIX.key()));
if (inetAddress == null) {
throw new RuntimeException("can't find host address start with " +
systemConfig.getString(ArcticMetaStoreConf.THRIFT_BIND_HOST_PREFIX.key()));
JacksonUtils.getString(systemConfig, ArcticMetaStoreConf.THRIFT_BIND_HOST_PREFIX.key()));
}
systemConfig.put(ArcticMetaStoreConf.THRIFT_BIND_HOST.key(), inetAddress.getHostAddress());

//mysql config
if (systemConfig.getString(ArcticMetaStoreConf.DB_TYPE.key()).equalsIgnoreCase("mysql")) {
if (!systemConfig.containsKey(ArcticMetaStoreConf.MYBATIS_CONNECTION_PASSWORD.key()) ||
!systemConfig.containsKey(ArcticMetaStoreConf.MYBATIS_CONNECTION_USER_NAME.key())) {
String dbType = JacksonUtils.getString(systemConfig, ArcticMetaStoreConf.DB_TYPE.key());
if (("mysql").equalsIgnoreCase(dbType)) {
if (!systemConfig.has(ArcticMetaStoreConf.MYBATIS_CONNECTION_PASSWORD.key()) ||
!systemConfig.has(ArcticMetaStoreConf.MYBATIS_CONNECTION_USER_NAME.key())) {
throw new RuntimeException("username and password must be configured if the database type is mysql");
}
}

//HA config
if (systemConfig.containsKey(ArcticMetaStoreConf.HA_ENABLE.key()) &&
systemConfig.getBoolean(ArcticMetaStoreConf.HA_ENABLE.key())) {
if (!systemConfig.containsKey(ArcticMetaStoreConf.ZOOKEEPER_SERVER.key())) {
if (systemConfig.has(ArcticMetaStoreConf.HA_ENABLE.key()) &&
JacksonUtils.getBoolean(systemConfig, ArcticMetaStoreConf.HA_ENABLE.key())) {
if (!systemConfig.has(ArcticMetaStoreConf.ZOOKEEPER_SERVER.key())) {
throw new RuntimeException(ArcticMetaStoreConf.ZOOKEEPER_SERVER.key() + " must be configured when you enable " +
"the ams high availability");
}
Expand Down Expand Up @@ -483,7 +481,7 @@ private static void checkLeader() {
!haService.getMaster().equals(haService.getNodeInfo(
conf.getString(ArcticMetaStoreConf.THRIFT_BIND_HOST),
conf.getInteger(ArcticMetaStoreConf.THRIFT_BIND_PORT)))) {
LOG.info("there is not leader, the leader is " + JSONObject.toJSONString(haService.getMaster()));
LOG.info("there is not leader, the leader is " + JacksonUtils.toJSONString(haService.getMaster()));
failover();
}
} catch (Exception e) {
Expand All @@ -503,77 +501,78 @@ private static Configuration initSystemConfig() {
}

private static void initCatalogConfig() throws IOException {
JSONArray catalogs = yamlConfig.getJSONArray(ConfigFileProperties.CATALOG_LIST);
JsonNode catalogs = yamlConfig.findValue(ConfigFileProperties.CATALOG_LIST).get(0);
List<CatalogMeta> catalogMetas = Lists.newArrayList();
for (int i = 0; i < catalogs.size(); i++) {
CatalogMeta catalogMeta = new CatalogMeta();
JSONObject catalog = catalogs.getJSONObject(i);
catalogMeta.catalogName = catalog.getString(ConfigFileProperties.CATALOG_NAME);
catalogMeta.catalogType = catalog.getString(ConfigFileProperties.CATALOG_TYPE);
JsonNode catalog = catalogs.get(i);
catalogMeta.catalogName = JacksonUtils.getString(catalog, ConfigFileProperties.CATALOG_NAME);
catalogMeta.catalogType = JacksonUtils.getString(catalog, ConfigFileProperties.CATALOG_TYPE);

if (catalog.containsKey(ConfigFileProperties.CATALOG_STORAGE_CONFIG)) {
if (catalog.has(ConfigFileProperties.CATALOG_STORAGE_CONFIG)) {
Map<String, String> storageConfig = new HashMap<>();
JSONObject catalogStorageConfig = catalog.getJSONObject(ConfigFileProperties.CATALOG_STORAGE_CONFIG);
if (catalogStorageConfig.containsKey(ConfigFileProperties.CATALOG_STORAGE_TYPE)) {
JsonNode catalogStorageConfig = catalog.get(ConfigFileProperties.CATALOG_STORAGE_CONFIG);
if (catalogStorageConfig.has(ConfigFileProperties.CATALOG_STORAGE_TYPE)) {
storageConfig.put(
CatalogMetaProperties.STORAGE_CONFIGS_KEY_TYPE,
catalogStorageConfig.getString(ConfigFileProperties.CATALOG_STORAGE_TYPE));
JacksonUtils.getString(catalogStorageConfig, ConfigFileProperties.CATALOG_STORAGE_TYPE));
}
storageConfig.put(
CatalogMetaProperties.STORAGE_CONFIGS_KEY_CORE_SITE,
ConfigurationFileUtils.encodeXmlConfigurationFileWithBase64(
catalogStorageConfig.getString(ConfigFileProperties.CATALOG_CORE_SITE)));
JacksonUtils.getString(catalogStorageConfig, ConfigFileProperties.CATALOG_CORE_SITE)));
storageConfig.put(
CatalogMetaProperties.STORAGE_CONFIGS_KEY_HDFS_SITE,
ConfigurationFileUtils.encodeXmlConfigurationFileWithBase64(
catalogStorageConfig.getString(ConfigFileProperties.CATALOG_HDFS_SITE)));
JacksonUtils.getString(catalogStorageConfig, ConfigFileProperties.CATALOG_HDFS_SITE)));
storageConfig.put(
CatalogMetaProperties.STORAGE_CONFIGS_KEY_HIVE_SITE,
ConfigurationFileUtils.encodeXmlConfigurationFileWithBase64(
catalogStorageConfig.getString(ConfigFileProperties.CATALOG_HIVE_SITE)));
JacksonUtils.getString(catalogStorageConfig, ConfigFileProperties.CATALOG_HIVE_SITE)));
catalogMeta.storageConfigs = storageConfig;
}

if (catalog.containsKey(ConfigFileProperties.CATALOG_AUTH_CONFIG)) {
if (catalog.has(ConfigFileProperties.CATALOG_AUTH_CONFIG)) {
Map<String, String> authConfig = new HashMap<>();
JSONObject catalogAuthConfig = catalog.getJSONObject(ConfigFileProperties.CATALOG_AUTH_CONFIG);
if (catalogAuthConfig.containsKey(ConfigFileProperties.CATALOG_AUTH_TYPE)) {
JsonNode catalogAuthConfig = catalog.get(ConfigFileProperties.CATALOG_AUTH_CONFIG);
if (catalogAuthConfig.has(ConfigFileProperties.CATALOG_AUTH_TYPE)) {
authConfig.put(
CatalogMetaProperties.AUTH_CONFIGS_KEY_TYPE,
catalogAuthConfig.getString(ConfigFileProperties.CATALOG_AUTH_TYPE));
JacksonUtils.getString(catalogAuthConfig, ConfigFileProperties.CATALOG_AUTH_TYPE));
}
if (catalogAuthConfig.getString(ConfigFileProperties.CATALOG_AUTH_TYPE)
if (JacksonUtils.getString(catalogAuthConfig, ConfigFileProperties.CATALOG_AUTH_TYPE)
.equalsIgnoreCase(CatalogMetaProperties.AUTH_CONFIGS_VALUE_TYPE_SIMPLE)) {
if (catalogAuthConfig.containsKey(ConfigFileProperties.CATALOG_SIMPLE_HADOOP_USERNAME)) {
if (catalogAuthConfig.has(ConfigFileProperties.CATALOG_SIMPLE_HADOOP_USERNAME)) {
authConfig.put(
CatalogMetaProperties.AUTH_CONFIGS_KEY_HADOOP_USERNAME,
catalogAuthConfig.getString(ConfigFileProperties.CATALOG_SIMPLE_HADOOP_USERNAME));
JacksonUtils.getString(catalogAuthConfig, ConfigFileProperties.CATALOG_SIMPLE_HADOOP_USERNAME));
}
} else if (catalogAuthConfig.getString(ConfigFileProperties.CATALOG_AUTH_TYPE)
} else if (JacksonUtils.getString(catalogAuthConfig, ConfigFileProperties.CATALOG_AUTH_TYPE)
.equalsIgnoreCase(CatalogMetaProperties.AUTH_CONFIGS_VALUE_TYPE_KERBEROS)) {
if (catalogAuthConfig.containsKey(ConfigFileProperties.CATALOG_KEYTAB)) {
if (catalogAuthConfig.has(ConfigFileProperties.CATALOG_KEYTAB)) {
authConfig.put(
CatalogMetaProperties.AUTH_CONFIGS_KEY_KEYTAB,
ConfigurationFileUtils.encodeConfigurationFileWithBase64(
catalogAuthConfig.getString(ConfigFileProperties.CATALOG_KEYTAB)));
JacksonUtils.getString(catalogAuthConfig, ConfigFileProperties.CATALOG_KEYTAB)));
}
if (catalogAuthConfig.containsKey(ConfigFileProperties.CATALOG_KRB5)) {
if (catalogAuthConfig.has(ConfigFileProperties.CATALOG_KRB5)) {
authConfig.put(
CatalogMetaProperties.AUTH_CONFIGS_KEY_KRB5,
ConfigurationFileUtils.encodeConfigurationFileWithBase64(
catalogAuthConfig.getString(ConfigFileProperties.CATALOG_KRB5)));
JacksonUtils.getString(catalogAuthConfig, ConfigFileProperties.CATALOG_KRB5)));
}
if (catalogAuthConfig.containsKey(ConfigFileProperties.CATALOG_PRINCIPAL)) {
if (catalogAuthConfig.has(ConfigFileProperties.CATALOG_PRINCIPAL)) {
authConfig.put(
CatalogMetaProperties.AUTH_CONFIGS_KEY_PRINCIPAL,
catalogAuthConfig.getString(ConfigFileProperties.CATALOG_PRINCIPAL));
JacksonUtils.getString(catalogAuthConfig, ConfigFileProperties.CATALOG_PRINCIPAL));
}
}
catalogMeta.authConfigs = authConfig;
}

if (catalog.containsKey(ConfigFileProperties.CATALOG_PROPERTIES)) {
catalogMeta.catalogProperties = catalog.getObject(ConfigFileProperties.CATALOG_PROPERTIES, Map.class);
if (catalog.has(ConfigFileProperties.CATALOG_PROPERTIES)) {
catalogMeta.catalogProperties = JacksonUtils.parseObject(JacksonUtils.getString(catalog,
ConfigFileProperties.CATALOG_PROPERTIES), Map.class);
}
catalogMetas.add(catalogMeta);
}
Expand All @@ -591,32 +590,33 @@ private static void initConfig() {
}

private static void initContainerConfig() {
JSONArray containers = yamlConfig.getJSONArray(ConfigFileProperties.CONTAINER_LIST);
JsonNode containers = yamlConfig.findValues(ConfigFileProperties.CONTAINER_LIST).get(0);
for (int i = 0; i < containers.size(); i++) {
JSONObject optimize = containers.getJSONObject(i);
JsonNode optimize = containers.get(i);
Container container = new Container();
container.setName(optimize.getString(ConfigFileProperties.CONTAINER_NAME));
container.setType(optimize.getString(ConfigFileProperties.CONTAINER_TYPE));
if (optimize.containsKey(ConfigFileProperties.CONTAINER_PROPERTIES)) {
container.setProperties(optimize.getObject(ConfigFileProperties.CONTAINER_PROPERTIES, Map.class));
container.setName(JacksonUtils.getString(optimize, ConfigFileProperties.CONTAINER_NAME));
container.setType(JacksonUtils.getString(optimize, ConfigFileProperties.CONTAINER_TYPE));
if (optimize.has(ConfigFileProperties.CONTAINER_PROPERTIES)) {
container.setProperties(JacksonUtils.parseObject(JacksonUtils.getString(optimize,
ConfigFileProperties.CONTAINER_PROPERTIES), Map.class));
}

ServiceContainer.getOptimizeQueueService().insertContainer(container);
}
}

private static void initOptimizeGroupConfig() throws MetaException, NoSuchObjectException, InvalidObjectException {
JSONArray optimizeGroups = yamlConfig.getJSONArray(ConfigFileProperties.OPTIMIZE_GROUP_LIST);
JsonNode optimizeGroups = yamlConfig.findValues(ConfigFileProperties.OPTIMIZE_GROUP_LIST).get(0);
List<OptimizeQueueMeta> optimizeQueueMetas = ServiceContainer.getOptimizeQueueService().getQueues();
for (int i = 0; i < optimizeGroups.size(); i++) {
JSONObject optimizeGroup = optimizeGroups.getJSONObject(i);
JsonNode optimizeGroup = optimizeGroups.get(i);
OptimizeQueueMeta optimizeQueueMeta = new OptimizeQueueMeta();
optimizeQueueMeta.setName(optimizeGroup.getString(ConfigFileProperties.OPTIMIZE_GROUP_NAME));
optimizeQueueMeta.setContainer(optimizeGroup.getString(ConfigFileProperties.OPTIMIZE_GROUP_CONTAINER));
optimizeQueueMeta.setName(JacksonUtils.getString(optimizeGroup, ConfigFileProperties.OPTIMIZE_GROUP_NAME));
optimizeQueueMeta.setContainer(JacksonUtils.getString(optimizeGroup, ConfigFileProperties.OPTIMIZE_GROUP_CONTAINER));

//init schedule policy
String schedulePolicy =
StringUtils.trim(optimizeGroup.getString(ConfigFileProperties.OPTIMIZE_SCHEDULING_POLICY));
StringUtils.trim(JacksonUtils.getString(optimizeGroup, ConfigFileProperties.OPTIMIZE_SCHEDULING_POLICY));
if (StringUtils.isBlank(schedulePolicy)) {
schedulePolicy = ConfigFileProperties.OPTIMIZE_SCHEDULING_POLICY_QUOTA;
} else if (
Expand All @@ -634,15 +634,16 @@ private static void initOptimizeGroupConfig() throws MetaException, NoSuchObject
boolean checkContainer =
containers.stream()
.anyMatch(e -> e.getName()
.equalsIgnoreCase(optimizeGroup.getString(ConfigFileProperties.OPTIMIZE_GROUP_CONTAINER)));
.equalsIgnoreCase(JacksonUtils.getString(optimizeGroup, ConfigFileProperties.OPTIMIZE_GROUP_CONTAINER)));
if (!checkContainer) {
throw new NoSuchObjectException(
"can not find such container config named:" +
optimizeGroup.getString(ConfigFileProperties.OPTIMIZE_GROUP_CONTAINER));
JacksonUtils.getString(optimizeGroup, ConfigFileProperties.OPTIMIZE_GROUP_CONTAINER));
}
if (optimizeGroup.containsKey(ConfigFileProperties.OPTIMIZE_GROUP_PROPERTIES)) {
if (optimizeGroup.has(ConfigFileProperties.OPTIMIZE_GROUP_PROPERTIES)) {
optimizeQueueMeta.setProperties(
optimizeGroup.getObject(ConfigFileProperties.OPTIMIZE_GROUP_PROPERTIES, Map.class));
JacksonUtils.parseObject(JacksonUtils.getString(optimizeGroup,
ConfigFileProperties.OPTIMIZE_GROUP_PROPERTIES), Map.class));
}
boolean updated = false;
for (OptimizeQueueMeta meta : optimizeQueueMetas) {
Expand Down Expand Up @@ -674,7 +675,7 @@ public void isLeader() {
AmsServerInfo serverInfo = new AmsServerInfo();
serverInfo.setHost(conf.getString(ArcticMetaStoreConf.THRIFT_BIND_HOST));
serverInfo.setThriftBindPort(conf.getInteger(ArcticMetaStoreConf.THRIFT_BIND_PORT));
zkService.setData(masterPath, JSONObject.toJSONString(serverInfo));
zkService.setData(masterPath, JacksonUtils.toJSONString(serverInfo));
isLeader.set(true);
startMetaStore(initSystemConfig());
} catch (Throwable throwable) {
Expand Down
Loading