Merged
@@ -21,9 +21,11 @@
import org.apache.doris.datasource.property.metastore.MetastoreProperties;
import org.apache.doris.datasource.property.storage.StorageProperties;

import com.aliyun.odps.table.utils.Preconditions;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -127,14 +129,34 @@ public Map<StorageProperties.Type, StorageProperties> getStoragePropertiesMap()
.collect(Collectors.toMap(StorageProperties::getType, Function.identity()));
} catch (UserException e) {
LOG.warn("Failed to initialize catalog storage properties", e);
throw new RuntimeException("Failed to initialize storage properties for catalog", e);
throw new RuntimeException("Failed to initialize storage properties, error: "
+ ExceptionUtils.getRootCauseMessage(e), e);
}
}
}
}
return storagePropertiesMap;
}

public void checkMetaStoreAndStorageProperties(Class msClass) {
MetastoreProperties msProperties;
List<StorageProperties> storageProperties;
try {
msProperties = MetastoreProperties.create(getProperties());
storageProperties = StorageProperties.createAll(getProperties());
} catch (UserException e) {
throw new RuntimeException("Failed to initialize Catalog properties, error: "
+ ExceptionUtils.getRootCauseMessage(e), e);
}
Preconditions.checkNotNull(storageProperties,
"Storage properties are not configured properly");
Preconditions.checkNotNull(msProperties, "Metastore properties are not configured properly");
Preconditions.checkArgument(
msClass.isInstance(msProperties),
String.format("Metastore properties type is not correct. Expected %s but got %s",
msClass.getName(), msProperties.getClass().getName()));
}

/**
* Get metastore properties with lazy loading, using double-check locking to ensure thread safety
*/
@@ -150,7 +172,8 @@ public MetastoreProperties getMetastoreProperties() {
metastoreProperties = MetastoreProperties.create(getProperties());
} catch (UserException e) {
LOG.warn("Failed to create metastore properties", e);
throw new RuntimeException("Failed to create metastore properties", e);
throw new RuntimeException("Failed to create metastore properties, error: "
+ ExceptionUtils.getRootCauseMessage(e), e);
}
}
}
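Note: the lazy loading the javadoc above refers to is plain double-checked locking around MetastoreProperties.create; only the inner try/catch is touched by this hunk. A rough sketch of the full method shape for context (the outer null checks and the choice of lock object are assumptions, and safe double-checked locking also needs the field to be volatile):

    public MetastoreProperties getMetastoreProperties() {
        if (metastoreProperties == null) {              // first check, no lock taken
            synchronized (this) {
                if (metastoreProperties == null) {      // second check, under the lock
                    try {
                        metastoreProperties = MetastoreProperties.create(getProperties());
                    } catch (UserException e) {
                        LOG.warn("Failed to create metastore properties", e);
                        throw new RuntimeException("Failed to create metastore properties, error: "
                                + ExceptionUtils.getRootCauseMessage(e), e);
                    }
                }
            }
        }
        return metastoreProperties;
    }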
@@ -18,11 +18,9 @@
package org.apache.doris.datasource.hive;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.ExternalCatalog;
@@ -34,14 +32,12 @@
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.datasource.operations.ExternalMetadataOperations;
import org.apache.doris.datasource.property.metastore.AbstractHMSProperties;
import org.apache.doris.datasource.property.metastore.MetastoreProperties;
import org.apache.doris.fs.FileSystemProvider;
import org.apache.doris.fs.FileSystemProviderImpl;
import org.apache.doris.fs.remote.dfs.DFSFileSystem;
import org.apache.doris.transaction.TransactionManagerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.logging.log4j.LogManager;
@@ -94,14 +90,6 @@ public AbstractHMSProperties getHmsProperties() {
return hmsProperties;
}

private void initAndCheckHmsProperties() {
try {
this.hmsProperties = (AbstractHMSProperties) MetastoreProperties.create(catalogProperty.getProperties());
} catch (UserException e) {
throw new RuntimeException("Failed to create HMSProperties from catalog properties", e);
}
}

@VisibleForTesting
public HMSExternalCatalog() {
catalogProperty = new CatalogProperty(null, null);
@@ -134,41 +122,7 @@ public void checkProperties() throws DdlException {
throw new DdlException(
"The parameter " + PARTITION_CACHE_TTL_SECOND + " is wrong, value is " + partitionCacheTtlSecond);
}

// check the dfs.ha properties
// 'dfs.nameservices'='your-nameservice',
// 'dfs.ha.namenodes.your-nameservice'='nn1,nn2',
// 'dfs.namenode.rpc-address.your-nameservice.nn1'='172.21.0.2:4007',
// 'dfs.namenode.rpc-address.your-nameservice.nn2'='172.21.0.3:4007',
// 'dfs.client.failover.proxy.provider.your-nameservice'='xxx'
String dfsNameservices = catalogProperty.getOrDefault(HdfsResource.DSF_NAMESERVICES, "");
if (Strings.isNullOrEmpty(dfsNameservices)) {
return;
}

String[] nameservices = dfsNameservices.split(",");
for (String dfsservice : nameservices) {
String namenodes = catalogProperty.getOrDefault("dfs.ha.namenodes." + dfsservice, "");
if (Strings.isNullOrEmpty(namenodes)) {
throw new DdlException("Missing dfs.ha.namenodes." + dfsservice + " property");
}
String[] names = namenodes.split(",");
for (String name : names) {
String address = catalogProperty.getOrDefault("dfs.namenode.rpc-address." + dfsservice + "." + name,
"");
if (Strings.isNullOrEmpty(address)) {
throw new DdlException(
"Missing dfs.namenode.rpc-address." + dfsservice + "." + name + " property");
}
}
String failoverProvider = catalogProperty.getOrDefault("dfs.client.failover.proxy.provider." + dfsservice,
"");
if (Strings.isNullOrEmpty(failoverProvider)) {
throw new DdlException(
"Missing dfs.client.failover.proxy.provider." + dfsservice + " property");
}
}
initAndCheckHmsProperties();
catalogProperty.checkMetaStoreAndStorageProperties(AbstractHMSProperties.class);
}

@Override
@@ -180,7 +134,7 @@ protected synchronized void initPreExecutionAuthenticator() {

@Override
protected void initLocalObjectsImpl() {
initAndCheckHmsProperties();
this.hmsProperties = (AbstractHMSProperties) catalogProperty.getMetastoreProperties();
initPreExecutionAuthenticator();
HiveMetadataOps hiveOps = ExternalMetadataOperations.newHiveMetadataOps(hmsProperties.getHiveConf(), this);
threadPoolWithPreAuth = ThreadPoolManager.newDaemonFixedThreadPoolWithPreAuth(
@@ -190,7 +144,7 @@ protected void initLocalObjectsImpl() {
true,
executionAuthenticator);
FileSystemProvider fileSystemProvider = new FileSystemProviderImpl(Env.getCurrentEnv().getExtMetaCacheMgr(),
this.catalogProperty.getStoragePropertiesMap());
this.catalogProperty.getStoragePropertiesMap());
this.fileSystemExecutor = ThreadPoolManager.newDaemonFixedThreadPool(FILE_SYSTEM_EXECUTOR_THREAD_NUM,
Integer.MAX_VALUE, String.format("hms_committer_%s_file_system_executor_pool", name), true);
transactionManager = TransactionManagerFactory.createHiveTransactionManager(hiveOps, fileSystemProvider,
@@ -17,14 +17,13 @@

package org.apache.doris.datasource.iceberg;

import org.apache.doris.common.DdlException;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
import org.apache.doris.datasource.operations.ExternalMetadataOperations;
import org.apache.doris.datasource.property.metastore.AbstractIcebergProperties;
import org.apache.doris.datasource.property.metastore.MetastoreProperties;
import org.apache.doris.transaction.TransactionManagerFactory;

import org.apache.iceberg.catalog.Catalog;
@@ -54,21 +53,24 @@ public IcebergExternalCatalog(long catalogId, String name, String comment) {
// Create catalog based on catalog type
protected void initCatalog() {
try {
msProperties = (AbstractIcebergProperties) MetastoreProperties
.create(getProperties());
msProperties = (AbstractIcebergProperties) catalogProperty.getMetastoreProperties();
this.catalog = msProperties.initializeCatalog(getName(), new ArrayList<>(catalogProperty
.getStoragePropertiesMap().values()));

this.icebergCatalogType = msProperties.getIcebergCatalogType();
} catch (UserException e) {
throw new RuntimeException("Failed to initialize Iceberg catalog: " + e.getMessage(), e);
} catch (ClassCastException e) {
throw new RuntimeException("Invalid properties for Iceberg catalog: " + getProperties(), e);
} catch (Exception e) {
throw new RuntimeException("Unexpected error while initializing Iceberg catalog: " + e.getMessage(), e);
}
}

@Override
public void checkProperties() throws DdlException {
super.checkProperties();
catalogProperty.checkMetaStoreAndStorageProperties(AbstractIcebergProperties.class);
}

@Override
protected synchronized void initPreExecutionAuthenticator() {
if (executionAuthenticator == null) {
@@ -18,14 +18,12 @@
package org.apache.doris.datasource.paimon;

import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.NameMapping;
import org.apache.doris.datasource.SessionContext;
import org.apache.doris.datasource.property.metastore.AbstractPaimonProperties;
import org.apache.doris.datasource.property.metastore.MetastoreProperties;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.logging.log4j.LogManager;
@@ -60,12 +58,7 @@ public PaimonExternalCatalog(long catalogId, String name, String resource, Map<S

@Override
protected void initLocalObjectsImpl() {
try {
paimonProperties = (AbstractPaimonProperties) MetastoreProperties.create(catalogProperty.getProperties());
} catch (UserException e) {
throw new IllegalArgumentException("Failed to create Paimon properties from catalog properties,exception: "
+ ExceptionUtils.getRootCauseMessage(e), e);
}
paimonProperties = (AbstractPaimonProperties) catalogProperty.getMetastoreProperties();
catalogType = paimonProperties.getPaimonCatalogType();
catalog = createCatalog();
initPreExecutionAuthenticator();
@@ -194,14 +187,7 @@ public Map<String, String> getPaimonOptionsMap() {

@Override
public void checkProperties() throws DdlException {
if (null != paimonProperties) {
try {
this.paimonProperties = (AbstractPaimonProperties) MetastoreProperties
.create(catalogProperty.getProperties());
} catch (UserException e) {
throw new DdlException("Failed to create Paimon properties from catalog properties, exception: "
+ ExceptionUtils.getRootCauseMessage(e), e);
}
}
super.checkProperties();
catalogProperty.checkMetaStoreAndStorageProperties(AbstractPaimonProperties.class);
}
}
@@ -119,6 +119,7 @@ public void initNormalizeAndCheckProps() {
initBackendConfigProperties();
this.hadoopStorageConfig = new Configuration();
this.backendConfigProperties.forEach(hadoopStorageConfig::set);
HdfsPropertiesUtils.checkHaConfig(backendConfigProperties);
hadoopAuthenticator = HadoopAuthenticator.getHadoopAuthenticator(hadoopStorageConfig);
}

@@ -20,16 +20,22 @@
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.property.storage.exception.StoragePropertiesException;

import com.google.common.base.Strings;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class HdfsPropertiesUtils {
private static final String URI_KEY = "uri";
@@ -127,7 +133,7 @@ public static String validateAndNormalizeUri(String location, Set<String> suppor
}

public static String validateAndNormalizeUri(String location, String host, String defaultFs,
Set<String> supportedSchemas) {
Set<String> supportedSchemas) {
if (StringUtils.isBlank(location)) {
throw new IllegalArgumentException("Property 'uri' is required.");
}
@@ -179,4 +185,76 @@ public static String validateAndNormalizeUri(String location, String host, Strin
throw new StoragePropertiesException("Failed to parse URI: " + location, e);
}
}

/**
* Validate the required HDFS HA configuration properties.
*
* <p>This method checks the following:
* <ul>
* <li>{@code dfs.nameservices} must be defined if HA is enabled.</li>
* <li>{@code dfs.ha.namenodes.<nameservice>} must be defined and contain at least 2 namenodes.</li>
* <li>For each namenode, {@code dfs.namenode.rpc-address.<nameservice>.<namenode>} must be defined.</li>
* <li>{@code dfs.client.failover.proxy.provider.<nameservice>} must be defined.</li>
* </ul>
*
* @param hdfsProperties configuration map (similar to core-site.xml/hdfs-site.xml properties)
*/
public static void checkHaConfig(Map<String, String> hdfsProperties) {
if (hdfsProperties == null) {
return;
}
// 1. Check dfs.nameservices
String dfsNameservices = hdfsProperties.getOrDefault(HdfsClientConfigKeys.DFS_NAMESERVICES, "");
if (Strings.isNullOrEmpty(dfsNameservices)) {
// No nameservice configured => HA is not enabled, nothing to validate
return;
}
for (String dfsservice : splitAndTrim(dfsNameservices)) {
if (dfsservice.isEmpty()) {
continue;
}
// 2. Check dfs.ha.namenodes.<nameservice>
String haNnKey = HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + dfsservice;
String namenodes = hdfsProperties.getOrDefault(haNnKey, "");
if (Strings.isNullOrEmpty(namenodes)) {
throw new IllegalArgumentException("Missing property: " + haNnKey);
}
List<String> names = splitAndTrim(namenodes);
if (names.size() < 2) {
throw new IllegalArgumentException("HA requires at least 2 namenodes for service: " + dfsservice);
}
// 3. Check dfs.namenode.rpc-address.<nameservice>.<namenode>
for (String name : names) {
String rpcKey = HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + dfsservice + "." + name;
String address = hdfsProperties.getOrDefault(rpcKey, "");
if (Strings.isNullOrEmpty(address)) {
throw new IllegalArgumentException("Missing property: " + rpcKey + " (expected format: host:port)");
}
}
// 4. Check dfs.client.failover.proxy.provider.<nameservice>
String failoverKey = HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX + "." + dfsservice;
String failoverProvider = hdfsProperties.getOrDefault(failoverKey, "");
if (Strings.isNullOrEmpty(failoverProvider)) {
throw new IllegalArgumentException("Missing property: " + failoverKey);
}
}
}

/**
* Utility method to split a comma-separated string, trim whitespace,
* and remove empty tokens.
*
* @param s the input string
* @return list of trimmed non-empty values
*/
private static List<String> splitAndTrim(String s) {
if (Strings.isNullOrEmpty(s)) {
return Collections.emptyList();
}
return Arrays.stream(s.split(","))
.map(String::trim)
.filter(tok -> !tok.isEmpty())
.collect(Collectors.toList());
}
}
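Usage sketch (not part of the change): checkHaConfig run against the HA property set that the removed HMSExternalCatalog comment used as its example. The nameservice, addresses, and proxy-provider class are illustrative placeholders, and the snippet assumes it sits in the same package as HdfsPropertiesUtils:

    import java.util.HashMap;
    import java.util.Map;

    public class CheckHaConfigExample {
        public static void main(String[] args) {
            // Hypothetical HA configuration; keys match the ones validated above.
            Map<String, String> props = new HashMap<>();
            props.put("dfs.nameservices", "your-nameservice");
            props.put("dfs.ha.namenodes.your-nameservice", "nn1,nn2");
            props.put("dfs.namenode.rpc-address.your-nameservice.nn1", "172.21.0.2:4007");
            props.put("dfs.namenode.rpc-address.your-nameservice.nn2", "172.21.0.3:4007");
            props.put("dfs.client.failover.proxy.provider.your-nameservice",
                    "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
            // All four checks (nameservices, namenodes, per-namenode rpc-address, failover provider) pass.
            HdfsPropertiesUtils.checkHaConfig(props);

            // Dropping one rpc-address now fails fast with an IllegalArgumentException naming the
            // missing key, instead of surfacing later through the old HMS-catalog-only validation.
            props.remove("dfs.namenode.rpc-address.your-nameservice.nn2");
            HdfsPropertiesUtils.checkHaConfig(props);
        }
    }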
