Skip to content

Commit

Permalink
[AMORO-2737] Convert the YAML config file to Configurations first and…
Browse files Browse the repository at this point in the history
… then perform the validity check. (apache#2739)

[AMORO-2737] Convert the YAML config file to Configurations first and then perform the validity check.
  • Loading branch information
paul8263 authored Apr 12, 2024
1 parent ba02527 commit 12d3688
Show file tree
Hide file tree
Showing 4 changed files with 255 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.netease.arctic.api.config.ConfigOptions;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

public class ArcticManagementConf {

Expand Down Expand Up @@ -382,6 +384,10 @@ public class ArcticManagementConf {
public static final String DB_TYPE_MYSQL = "mysql";
public static final String DB_TYPE_POSTGRES = "postgres";

// terminal config
public static final List<String> TERMINAL_BACKEND_VALUES =
Arrays.asList("local", "kyuubi", "custom");

// plugin config
public static final String METRIC_REPORTERS = "metric-reports";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netease.arctic.server;

import com.netease.arctic.api.config.ConfigOption;
import com.netease.arctic.api.config.Configurations;
import com.netease.arctic.server.dashboard.utils.AmsUtil;

import java.net.InetAddress;

public class ArcticManagementConfValidator {
public static void validateConfig(Configurations configurations) {
// SERVER_EXPOSE_HOST config
if ("".equals(configurations.getString(ArcticManagementConf.SERVER_EXPOSE_HOST))) {
throw new IllegalArgumentException(
"configuration " + ArcticManagementConf.SERVER_EXPOSE_HOST.key() + " must be set");
}
InetAddress inetAddress =
AmsUtil.lookForBindHost(configurations.getString(ArcticManagementConf.SERVER_EXPOSE_HOST));
configurations.setString(ArcticManagementConf.SERVER_EXPOSE_HOST, inetAddress.getHostAddress());

// mysql or postgres config
if (ArcticManagementConf.DB_TYPE_MYSQL.equalsIgnoreCase(
configurations.getString(ArcticManagementConf.DB_TYPE))
|| ArcticManagementConf.DB_TYPE_POSTGRES.equalsIgnoreCase(
configurations.getString(ArcticManagementConf.DB_TYPE))) {
if ("".equals(configurations.getString(ArcticManagementConf.DB_PASSWORD))
|| "".equals(configurations.getString(ArcticManagementConf.DB_USER_NAME))) {
throw new IllegalArgumentException(
"username and password must be configured if the database type is mysql or postgres");
}
}

// HA config
if (configurations.getBoolean(ArcticManagementConf.HA_ENABLE)) {
if ("".equals(configurations.getString(ArcticManagementConf.HA_ZOOKEEPER_ADDRESS))) {
throw new IllegalArgumentException(
ArcticManagementConf.HA_ZOOKEEPER_ADDRESS.key()
+ " must be configured when you enable "
+ "the ams high availability");
}
}
// terminal config
String terminalBackend =
configurations.getString(ArcticManagementConf.TERMINAL_BACKEND).toLowerCase();
if (!ArcticManagementConf.TERMINAL_BACKEND_VALUES.contains(terminalBackend)) {
throw new IllegalArgumentException(
String.format(
"Illegal terminal implement: %s, local, kyuubi, custom is available",
terminalBackend));
}

validateThreadCount(configurations, ArcticManagementConf.REFRESH_TABLES_THREAD_COUNT);
validateThreadCount(configurations, ArcticManagementConf.OPTIMIZING_COMMIT_THREAD_COUNT);

if (configurations.getBoolean(ArcticManagementConf.EXPIRE_SNAPSHOTS_ENABLED)) {
validateThreadCount(configurations, ArcticManagementConf.EXPIRE_SNAPSHOTS_THREAD_COUNT);
}

if (configurations.getBoolean(ArcticManagementConf.CLEAN_ORPHAN_FILES_ENABLED)) {
validateThreadCount(configurations, ArcticManagementConf.CLEAN_ORPHAN_FILES_THREAD_COUNT);
}
if (configurations.getBoolean(ArcticManagementConf.SYNC_HIVE_TABLES_ENABLED)) {
validateThreadCount(configurations, ArcticManagementConf.SYNC_HIVE_TABLES_THREAD_COUNT);
}
}

private static void validateThreadCount(
Configurations configurations, ConfigOption<Integer> config) {
int threadCount = configurations.getInteger(config);
if (threadCount <= 0) {
throw new IllegalArgumentException(
String.format(
"%s(%s) must > 0, actual value = %d",
config.key(), config.description(), threadCount));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.netease.arctic.api.OptimizerProperties;
import com.netease.arctic.api.OptimizingService;
import com.netease.arctic.api.config.ConfigHelpers;
import com.netease.arctic.api.config.ConfigOption;
import com.netease.arctic.api.config.Configurations;
import com.netease.arctic.server.dashboard.DashboardServer;
import com.netease.arctic.server.dashboard.response.ErrorResponse;
Expand Down Expand Up @@ -68,12 +67,10 @@
import org.yaml.snakeyaml.Yaml;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
Expand Down Expand Up @@ -404,8 +401,8 @@ private void initServiceConfig(Map<String, Object> envConfig) throws IOException
// If same configurations in files and environment variables, environment variables have
// higher priority.
expandedConfigurationMap.putAll(envConfig);
validateConfig(expandedConfigurationMap);
serviceConfig = Configurations.fromObjectMap(expandedConfigurationMap);
ArcticManagementConfValidator.validateConfig(serviceConfig);
SqlSessionFactoryProvider.getInstance().init(serviceConfig);
}

Expand All @@ -415,84 +412,6 @@ private Map<String, Object> initEnvConfig() {
return ConfigHelpers.convertConfigurationKeys(prefix, System.getenv());
}

private void validateConfig(Map<String, Object> systemConfig) {
if (!systemConfig.containsKey(ArcticManagementConf.SERVER_EXPOSE_HOST.key())) {
throw new IllegalArgumentException(
"configuration " + ArcticManagementConf.SERVER_EXPOSE_HOST.key() + " must be set");
}
InetAddress inetAddress =
AmsUtil.lookForBindHost(
(String) systemConfig.get(ArcticManagementConf.SERVER_EXPOSE_HOST.key()));
systemConfig.put(ArcticManagementConf.SERVER_EXPOSE_HOST.key(), inetAddress.getHostAddress());

if (!systemConfig.containsKey(ArcticManagementConf.DB_TYPE.key())) {
throw new IllegalArgumentException(
"configuration " + ArcticManagementConf.DB_TYPE.key() + " must be set");
}

// mysql config
if (ArcticManagementConf.DB_TYPE_MYSQL.equalsIgnoreCase(
(String) systemConfig.get(ArcticManagementConf.DB_TYPE.key()))) {
if (!systemConfig.containsKey(ArcticManagementConf.DB_PASSWORD.key())
|| !systemConfig.containsKey(ArcticManagementConf.DB_USER_NAME.key())) {
throw new IllegalArgumentException(
"username and password must be configured if the database type is mysql");
}
}

// HA config
if (systemConfig.containsKey(ArcticManagementConf.HA_ENABLE.key())
&& ((Boolean) systemConfig.get(ArcticManagementConf.HA_ENABLE.key()))) {
if (!systemConfig.containsKey(ArcticManagementConf.HA_ZOOKEEPER_ADDRESS.key())) {
throw new IllegalArgumentException(
ArcticManagementConf.HA_ZOOKEEPER_ADDRESS.key()
+ " must be configured when you enable "
+ "the ams high availability");
}
}
// terminal config
String terminalBackend =
systemConfig
.getOrDefault(ArcticManagementConf.TERMINAL_BACKEND.key(), "")
.toString()
.toLowerCase();
if (!Arrays.asList("local", "kyuubi", "custom").contains(terminalBackend)) {
throw new IllegalArgumentException(
String.format(
"Illegal terminal implement: %s, local, kyuubi, custom is available",
terminalBackend));
}

validateThreadCount(systemConfig, ArcticManagementConf.REFRESH_TABLES_THREAD_COUNT);
validateThreadCount(systemConfig, ArcticManagementConf.OPTIMIZING_COMMIT_THREAD_COUNT);

if (enabled(systemConfig, ArcticManagementConf.EXPIRE_SNAPSHOTS_ENABLED)) {
validateThreadCount(systemConfig, ArcticManagementConf.EXPIRE_SNAPSHOTS_THREAD_COUNT);
}

if (enabled(systemConfig, ArcticManagementConf.CLEAN_ORPHAN_FILES_ENABLED)) {
validateThreadCount(systemConfig, ArcticManagementConf.CLEAN_ORPHAN_FILES_THREAD_COUNT);
}
if (enabled(systemConfig, ArcticManagementConf.SYNC_HIVE_TABLES_ENABLED)) {
validateThreadCount(systemConfig, ArcticManagementConf.SYNC_HIVE_TABLES_THREAD_COUNT);
}
}

private boolean enabled(Map<String, Object> systemConfig, ConfigOption<Boolean> config) {
return (boolean) systemConfig.getOrDefault(config.key(), config.defaultValue());
}

private void validateThreadCount(
Map<String, Object> systemConfig, ConfigOption<Integer> config) {
int threadCount = (int) systemConfig.getOrDefault(config.key(), config.defaultValue());
if (threadCount <= 0) {
throw new IllegalArgumentException(
String.format(
"%s(%s) must > 0, actual value = %d",
config.key(), config.description(), threadCount));
}
}

/** Override the value of {@link SystemProperties}. */
private void setIcebergSystemProperties() {
int workerThreadPoolSize =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netease.arctic.server;

import com.netease.arctic.api.config.Configurations;
import org.junit.Assert;
import org.junit.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.stream.Stream;

public class TestArcticManagementConfValidator {

@Test
public void testBlankServerExposeHost() {
Configurations configurations = new Configurations();
Assert.assertThrows(
IllegalArgumentException.class,
() -> ArcticManagementConfValidator.validateConfig(configurations));
}

@Test
public void testValidateServerExposeHost() {
Configurations configurations = new Configurations();
configurations.setString(ArcticManagementConf.SERVER_EXPOSE_HOST, "0.0.0.0");
Assert.assertThrows(
RuntimeException.class, () -> ArcticManagementConfValidator.validateConfig(configurations));
}

public static Stream<Arguments> testValidateDBConfig() {
return Stream.of(Arguments.of("mysql"), Arguments.of("postgres"));
}

@ParameterizedTest
@MethodSource
public void testValidateDBConfig(String dbType) {
Configurations configurations = new Configurations();
configurations.setString(ArcticManagementConf.SERVER_EXPOSE_HOST, "127.0.0.1");
configurations.setString(ArcticManagementConf.DB_TYPE, dbType);
Assert.assertThrows(
IllegalArgumentException.class,
() -> ArcticManagementConfValidator.validateConfig(configurations));

configurations.setString(ArcticManagementConf.DB_PASSWORD, "123456");
configurations.setString(ArcticManagementConf.DB_USER_NAME, "");
Assert.assertThrows(
IllegalArgumentException.class,
() -> ArcticManagementConfValidator.validateConfig(configurations));

configurations.removeConfig(ArcticManagementConf.DB_PASSWORD);
configurations.setString(ArcticManagementConf.DB_USER_NAME, "root");
Assert.assertThrows(
IllegalArgumentException.class,
() -> ArcticManagementConfValidator.validateConfig(configurations));

configurations.setString(ArcticManagementConf.DB_PASSWORD, "123456");
ArcticManagementConfValidator.validateConfig(configurations);
}

@Test
public void testValidateHAConfig() {
Configurations configurations = new Configurations();
configurations.setString(ArcticManagementConf.SERVER_EXPOSE_HOST, "127.0.0.1");
configurations.setBoolean(ArcticManagementConf.HA_ENABLE, true);
Assert.assertThrows(
IllegalArgumentException.class,
() -> ArcticManagementConfValidator.validateConfig(configurations));

configurations.setString(ArcticManagementConf.HA_ZOOKEEPER_ADDRESS, "127.0.0.1:2181");
ArcticManagementConfValidator.validateConfig(configurations);
}

@Test
public void testValidateTerminalBackend() {
Configurations configurations = new Configurations();
configurations.setString(ArcticManagementConf.SERVER_EXPOSE_HOST, "127.0.0.1");

configurations.setString(ArcticManagementConf.TERMINAL_BACKEND, "invalid_terminal_backend");
Assert.assertThrows(
IllegalArgumentException.class,
() -> ArcticManagementConfValidator.validateConfig(configurations));

configurations.setString(ArcticManagementConf.TERMINAL_BACKEND, "local");
ArcticManagementConfValidator.validateConfig(configurations);

configurations.setString(ArcticManagementConf.TERMINAL_BACKEND, "kyuubi");
ArcticManagementConfValidator.validateConfig(configurations);

configurations.setString(ArcticManagementConf.TERMINAL_BACKEND, "custom");
ArcticManagementConfValidator.validateConfig(configurations);
}

@Test
public void testValidateThreadCount() {
Configurations configurations = new Configurations();
configurations.setString(ArcticManagementConf.SERVER_EXPOSE_HOST, "127.0.0.1");

configurations.setInteger(ArcticManagementConf.REFRESH_TABLES_THREAD_COUNT, -1);
Assert.assertThrows(
IllegalArgumentException.class,
() -> ArcticManagementConfValidator.validateConfig(configurations));
configurations.setInteger(ArcticManagementConf.REFRESH_TABLES_THREAD_COUNT, 10);
ArcticManagementConfValidator.validateConfig(configurations);

configurations.setInteger(ArcticManagementConf.OPTIMIZING_COMMIT_THREAD_COUNT, -1);
Assert.assertThrows(
IllegalArgumentException.class,
() -> ArcticManagementConfValidator.validateConfig(configurations));
configurations.setInteger(ArcticManagementConf.OPTIMIZING_COMMIT_THREAD_COUNT, 10);
ArcticManagementConfValidator.validateConfig(configurations);

configurations.setBoolean(ArcticManagementConf.EXPIRE_SNAPSHOTS_ENABLED, true);
configurations.setInteger(ArcticManagementConf.EXPIRE_SNAPSHOTS_THREAD_COUNT, -1);
Assert.assertThrows(
IllegalArgumentException.class,
() -> ArcticManagementConfValidator.validateConfig(configurations));
configurations.setInteger(ArcticManagementConf.EXPIRE_SNAPSHOTS_THREAD_COUNT, 10);
ArcticManagementConfValidator.validateConfig(configurations);

configurations.setBoolean(ArcticManagementConf.CLEAN_ORPHAN_FILES_ENABLED, true);
configurations.setInteger(ArcticManagementConf.CLEAN_ORPHAN_FILES_THREAD_COUNT, -1);
Assert.assertThrows(
IllegalArgumentException.class,
() -> ArcticManagementConfValidator.validateConfig(configurations));
configurations.setInteger(ArcticManagementConf.CLEAN_ORPHAN_FILES_THREAD_COUNT, 10);
ArcticManagementConfValidator.validateConfig(configurations);

configurations.setBoolean(ArcticManagementConf.SYNC_HIVE_TABLES_ENABLED, true);
configurations.setInteger(ArcticManagementConf.SYNC_HIVE_TABLES_THREAD_COUNT, -1);
Assert.assertThrows(
IllegalArgumentException.class,
() -> ArcticManagementConfValidator.validateConfig(configurations));
configurations.setInteger(ArcticManagementConf.SYNC_HIVE_TABLES_THREAD_COUNT, 10);
ArcticManagementConfValidator.validateConfig(configurations);
}
}

0 comments on commit 12d3688

Please sign in to comment.