@@ -18,6 +18,7 @@
package org.apache.doris.common.util;

import org.apache.doris.common.UserException;
import org.apache.doris.datasource.property.storage.AzurePropertyUtils;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.datasource.property.storage.exception.StoragePropertiesException;
import org.apache.doris.fs.FileSystemType;
@@ -307,6 +308,10 @@ public static String getTempWritePath(String loc, String prefix) {
}

public TFileType getTFileTypeForBE() {
if ((SchemaTypeMapper.ABFS.getSchema().equals(schema) || SchemaTypeMapper.ABFSS.getSchema()
.equals(schema)) && AzurePropertyUtils.isOneLakeLocation(normalizedLocation)) {
return TFileType.FILE_HDFS;
}
if (StringUtils.isNotBlank(normalizedLocation) && isHdfsOnOssEndpoint(normalizedLocation)) {
return TFileType.FILE_HDFS;
}
@@ -324,6 +329,10 @@ public Path toStorageLocation() {


public FileSystemType getFileSystemType() {
if ((SchemaTypeMapper.ABFS.getSchema().equals(schema) || SchemaTypeMapper.ABFSS.getSchema()
.equals(schema)) && AzurePropertyUtils.isOneLakeLocation(normalizedLocation)) {
return FileSystemType.HDFS;
}
return SchemaTypeMapper.fromSchemaToFileSystemType(schema);
}

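A rough, self-contained sketch of what the new OneLake branch accomplishes. The host below is Microsoft Fabric's public OneLake endpoint, and the simple substring check is only an assumption; the actual matching rule lives in AzurePropertyUtils.isOneLakeLocation, which is not shown in this diff.

// Hypothetical illustration, not part of this change.
public class OneLakeLocationSketch {
public static void main(String[] args) {
String oneLake = "abfss://ws@onelake.dfs.fabric.microsoft.com/lh.Lakehouse/Files/t1";
String adlsGen2 = "abfss://container@myaccount.dfs.core.windows.net/path/t1";
// With this change, locations like the first one are reported as TFileType.FILE_HDFS on the BE
// and FileSystemType.HDFS on the FE, while ordinary ADLS Gen2 locations keep the regular ABFS
// mapping from SchemaTypeMapper.
System.out.println(oneLake.contains(".fabric.microsoft.com"));   // true  -> assumed OneLake, routed as HDFS
System.out.println(adlsGen2.contains(".fabric.microsoft.com")); // false -> regular ABFS handling
}
}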
@@ -21,8 +21,10 @@
import org.apache.doris.datasource.property.ConnectorProperty;
import org.apache.doris.datasource.property.storage.StorageProperties;

import com.google.common.collect.ImmutableList;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
@@ -58,7 +60,7 @@ protected AbstractPaimonProperties(Map<String, String> props) {

public abstract Catalog initializeCatalog(String catalogName, List<StorageProperties> storagePropertiesList);

protected void appendCatalogOptions(List<StorageProperties> storagePropertiesList) {
protected void appendCatalogOptions() {
if (StringUtils.isNotBlank(warehouse)) {
catalogOptions.set(CatalogOptions.WAREHOUSE.key(), warehouse);
}
@@ -69,7 +71,10 @@ protected void appendCatalogOptions(List<StorageProperties> storagePropertiesLis
if (k.toLowerCase().startsWith(USER_PROPERTY_PREFIX)) {
String newKey = k.substring(USER_PROPERTY_PREFIX.length());
if (StringUtils.isNotBlank(newKey)) {
catalogOptions.set(newKey, v);
boolean excluded = userStoragePrefixes.stream().anyMatch(k::startsWith);
if (!excluded) {
catalogOptions.set(newKey, v);
}
}
}
});
@@ -78,12 +83,16 @@
/**
* Build catalog options including common and subclass-specific ones.
*/
public void buildCatalogOptions(List<StorageProperties> storagePropertiesList) {
public void buildCatalogOptions() {
catalogOptions = new Options();
appendCatalogOptions(storagePropertiesList);
appendCatalogOptions();
appendCustomCatalogOptions();
}

protected void appendUserHadoopConfig(Configuration conf) {
normalizeS3Config().forEach(conf::set);
}

public Map<String, String> getCatalogOptionsMap() {
// Return the cached map if already initialized
Map<String, String> existing = catalogOptionsMapRef.get();
@@ -112,6 +121,38 @@ public Map<String, String> getCatalogOptionsMap() {
}
}

/**
* @see org.apache.paimon.s3.S3FileIO
* Possible S3 config key prefixes:
* 1. "s3." - Paimon legacy custom prefix
* 2. "s3a." - Paimon-supported shorthand
* 3. "fs.s3a." - Hadoop S3A official prefix
*
* All of them are normalized to the Hadoop-recognized prefix "fs.s3a."
*/
private final List<String> userStoragePrefixes = ImmutableList.of(
"paimon.s3.", "paimon.s3a.", "paimon.fs.s3.", "paimon.fs.oss."
);

/** Hadoop S3A standard prefix */
private static final String FS_S3A_PREFIX = "fs.s3a.";

/**
* Normalizes user-provided S3 config keys to Hadoop S3A keys
*/
protected Map<String, String> normalizeS3Config() {
Map<String, String> result = new HashMap<>();
origProps.forEach((key, value) -> {
for (String prefix : userStoragePrefixes) {
if (key.startsWith(prefix)) {
result.put(FS_S3A_PREFIX + key.substring(prefix.length()), value);
return; // stop after the first matching prefix
}
}
});
return result;
}

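As a quick, standalone sketch of the mapping rule above (the endpoint and path-style keys are illustrative examples, not taken from this PR), a user-prefixed key such as paimon.s3.endpoint ends up as fs.s3a.endpoint after normalization:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Standalone illustration of the prefix normalization performed by normalizeS3Config().
public class NormalizeS3ConfigSketch {
private static final String FS_S3A_PREFIX = "fs.s3a.";
private static final List<String> USER_STORAGE_PREFIXES =
Arrays.asList("paimon.s3.", "paimon.s3a.", "paimon.fs.s3.", "paimon.fs.oss.");

public static void main(String[] args) {
Map<String, String> origProps = new HashMap<>();
origProps.put("paimon.s3.endpoint", "http://127.0.0.1:9000");   // illustrative key/value
origProps.put("paimon.s3a.path.style.access", "true");          // illustrative key/value
origProps.put("paimon.warehouse", "s3://bucket/warehouse");     // no storage prefix, left out of the result

Map<String, String> result = new HashMap<>();
origProps.forEach((key, value) -> {
for (String prefix : USER_STORAGE_PREFIXES) {
if (key.startsWith(prefix)) {
// Same rule as normalizeS3Config(): strip the user prefix, prepend "fs.s3a."
result.put(FS_S3A_PREFIX + key.substring(prefix.length()), value);
return; // stop after the first matching prefix
}
}
});
// Prints {fs.s3a.endpoint=http://127.0.0.1:9000, fs.s3a.path.style.access=true} (order may vary)
System.out.println(result);
}
}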

/**
* Hook method for subclasses to append metastore-specific or custom catalog options.
@@ -46,7 +46,7 @@ public Catalog initCatalog(String catalogName, Map<String, String> catalogProps,
List<StorageProperties> storagePropertiesList) {
Configuration configuration = buildConfiguration(storagePropertiesList);
HadoopCatalog catalog = new HadoopCatalog();
buildCatalogProps(catalogProps, storagePropertiesList);
buildCatalogProps(storagePropertiesList);
catalog.setConf(configuration);
try {
this.executionAuthenticator.execute(() -> {
@@ -71,7 +71,7 @@ private Configuration buildConfiguration(List<StorageProperties> storageProperti
return configuration;
}

private void buildCatalogProps(Map<String, String> props, List<StorageProperties> storagePropertiesList) {
private void buildCatalogProps(List<StorageProperties> storagePropertiesList) {
if (storagePropertiesList.size() == 1 && storagePropertiesList.get(0) instanceof HdfsProperties) {
HdfsProperties hdfsProps = (HdfsProperties) storagePropertiesList.get(0);
if (hdfsProps.isKerberos()) {
@@ -71,9 +71,6 @@ public Catalog initCatalog(String catalogName, Map<String, String> catalogProps,
HiveCatalog hiveCatalog = new HiveCatalog();
hiveCatalog.setConf(conf);
storagePropertiesList.forEach(sp -> {
for (Map.Entry<String, String> entry : sp.getHadoopStorageConfig()) {
catalogProps.put(entry.getKey(), entry.getValue());
}
// NOTE: Custom FileIO implementation (KerberizedHadoopFileIO) is commented out by default.
// Using FileIO for Kerberos authentication may cause serialization issues when accessing
// Iceberg system tables (e.g., history, snapshots, manifests).
@@ -21,7 +21,6 @@
import org.apache.doris.datasource.property.ConnectorProperty;
import org.apache.doris.datasource.property.ParamRules;
import org.apache.doris.datasource.property.storage.AbstractS3CompatibleProperties;
import org.apache.doris.datasource.property.storage.HdfsCompatibleProperties;
import org.apache.doris.datasource.property.storage.StorageProperties;

import com.google.common.collect.Maps;
@@ -86,6 +85,7 @@ public class IcebergRestProperties extends AbstractIcebergProperties {

@ConnectorProperty(names = {"iceberg.rest.oauth2.credential"},
required = false,
sensitive = true,
description = "The oauth2 credential for the iceberg rest catalog service.")
private String icebergRestOauth2Credential;

@@ -150,6 +150,7 @@ public class IcebergRestProperties extends AbstractIcebergProperties {

@ConnectorProperty(names = {"iceberg.rest.secret-access-key"},
required = false,
sensitive = true,
description = "The secret access key for the iceberg rest catalog service.")
private String icebergRestSecretAccessKey = "";

@@ -339,14 +340,12 @@ public void toFileIOProperties(List<StorageProperties> storagePropertiesList,
Map<String, String> fileIOProperties, Configuration conf) {

for (StorageProperties storageProperties : storagePropertiesList) {
if (storageProperties instanceof HdfsCompatibleProperties) {
storageProperties.getBackendConfigProperties().forEach(conf::set);
} else if (storageProperties instanceof AbstractS3CompatibleProperties) {
if (storageProperties instanceof AbstractS3CompatibleProperties) {
// For all S3-compatible storage types, put properties in fileIOProperties map
toS3FileIOProperties((AbstractS3CompatibleProperties) storageProperties, fileIOProperties);
} else {
// For other storage types, just use fileIOProperties map
fileIOProperties.putAll(storageProperties.getBackendConfigProperties());
conf.addResource(storageProperties.getHadoopStorageConfig());
}
}

@@ -87,7 +87,7 @@ private HiveConf buildHiveConf() {
@Override
public Catalog initializeCatalog(String catalogName, List<StorageProperties> storagePropertiesList) {
HiveConf hiveConf = buildHiveConf();
buildCatalogOptions(storagePropertiesList);
buildCatalogOptions();
StorageProperties ossProps = storagePropertiesList.stream()
.filter(sp -> sp.getType() == StorageProperties.Type.OSS)
.findFirst()
@@ -97,10 +97,8 @@ public Catalog initializeCatalog(String catalogName, List<StorageProperties> sto
throw new IllegalStateException("Expected OSSProperties type.");
}
OSSProperties ossProperties = (OSSProperties) ossProps;
for (Map.Entry<String, String> entry : ossProperties.getHadoopStorageConfig()) {
catalogOptions.set(entry.getKey(), entry.getValue());
}
hiveConf.addResource(ossProperties.getHadoopStorageConfig());
appendUserHadoopConfig(hiveConf);
CatalogContext catalogContext = CatalogContext.create(catalogOptions, hiveConf);
return CatalogFactory.createCatalog(catalogContext);
}
@@ -38,19 +38,16 @@ protected PaimonFileSystemMetaStoreProperties(Map<String, String> props) {

@Override
public Catalog initializeCatalog(String catalogName, List<StorageProperties> storagePropertiesList) {
buildCatalogOptions(storagePropertiesList);
buildCatalogOptions();
Configuration conf = new Configuration();
storagePropertiesList.forEach(storageProperties -> {
for (Map.Entry<String, String> entry : storageProperties.getHadoopStorageConfig()) {
catalogOptions.set(entry.getKey(), entry.getValue());
}
conf.addResource(storageProperties.getHadoopStorageConfig());
if (storageProperties.getType().equals(StorageProperties.Type.HDFS)) {
this.executionAuthenticator = new HadoopExecutionAuthenticator(((HdfsProperties) storageProperties)
.getHadoopAuthenticator());
}
});

appendUserHadoopConfig(conf);
CatalogContext catalogContext = CatalogContext.create(catalogOptions, conf);
try {
return this.executionAuthenticator.execute(() -> CatalogFactory.createCatalog(catalogContext));
@@ -87,11 +87,9 @@ private Configuration buildHiveConfiguration(List<StorageProperties> storageProp

@Override
public Catalog initializeCatalog(String catalogName, List<StorageProperties> storagePropertiesList) {
buildCatalogOptions();
Configuration conf = buildHiveConfiguration(storagePropertiesList);
buildCatalogOptions(storagePropertiesList);
for (Map.Entry<String, String> entry : conf) {
catalogOptions.set(entry.getKey(), entry.getValue());
}
appendUserHadoopConfig(conf);
CatalogContext catalogContext = CatalogContext.create(catalogOptions, conf);
try {
return executionAuthenticator.execute(() -> CatalogFactory.createCatalog(catalogContext));
@@ -77,7 +77,7 @@ public String getPaimonCatalogType() {

@Override
public Catalog initializeCatalog(String catalogName, List<StorageProperties> storagePropertiesList) {
buildCatalogOptions(storagePropertiesList);
buildCatalogOptions();
CatalogContext catalogContext = CatalogContext.create(catalogOptions);
return CatalogFactory.createCatalog(catalogContext);
}
@@ -21,7 +21,6 @@

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.logging.log4j.LogManager;
@@ -30,6 +29,7 @@
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.SdkSystemSetting;

import java.util.HashMap;
import java.util.Map;
@@ -226,8 +226,6 @@ public static Optional<String> extractRegion(Set<Pattern> endpointPatterns, Stri

protected abstract Set<Pattern> endpointPatterns();

protected abstract Set<String> schemas();

// This method should be overridden by subclasses to provide a default endpoint based on the region.
// Because for aws s3, only region is needed, the endpoint can be constructed from the region.
// But for other s3 compatible storage, the endpoint may need to be specified explicitly.
@@ -249,16 +247,11 @@ public String validateAndGetUri(Map<String, String> loadProps) throws UserExcept
@Override
public void initializeHadoopStorageConfig() {
hadoopStorageConfig = new Configuration();
origProps.forEach((key, value) -> {
if (key.startsWith("fs.")) {
hadoopStorageConfig.set(key, value);
}
});
// Compatibility note: Due to historical reasons, even when the underlying
// storage is OSS, OBS, etc., users may still configure the schema as "s3://".
// To ensure backward compatibility, we append S3-related properties by default.
appendS3HdfsProperties(hadoopStorageConfig);
ensureDisableCache(hadoopStorageConfig, origProps);
setDefaultRequestChecksum();
}

private void appendS3HdfsProperties(Configuration hadoopStorageConfig) {
@@ -285,34 +278,30 @@ private void appendS3HdfsProperties(Configuration hadoopStorageConfig) {
}

/**
* By default, Hadoop caches FileSystem instances per scheme and authority (e.g. s3a://bucket/), meaning that all
* subsequent calls using the same URI will reuse the same FileSystem object.
* In multi-tenant or dynamic credential environments, where different users may access the same bucket using
* different access keys or tokens, this cache reuse can lead to cross-credential contamination.
* <p>
* Specifically, if the cache is not disabled, a FileSystem instance initialized with one set of credentials may
* be reused by another session targeting the same bucket but with a different AK/SK. This results in:
* <p>
* Incorrect authentication (using stale credentials)
* <p>
* Unexpected permission errors or access denial
* <p>
* Potential data leakage between users
* <p>
* To avoid such risks, the configuration property
* fs.<schema>.impl.disable.cache
* must be set to true for all object storage backends (e.g., S3A, OSS, COS, OBS), ensuring that each new access
* creates an isolated FileSystem instance with its own credentials and configuration context.
*/
private void ensureDisableCache(Configuration conf, Map<String, String> origProps) {
for (String schema : schemas()) {
String key = "fs." + schema + ".impl.disable.cache";
String userValue = origProps.get(key);
if (StringUtils.isNotBlank(userValue)) {
conf.setBoolean(key, BooleanUtils.toBoolean(userValue));
} else {
conf.setBoolean(key, true);
}
}
}
/**
* Sets the AWS request checksum calculation property to "WHEN_REQUIRED"
* only if it has not been explicitly set by the user.
*
* <p>
* Background:
* AWS SDK for Java v2 uses the system property
* {@link SdkSystemSetting#AWS_REQUEST_CHECKSUM_CALCULATION} to determine
* whether request payloads should have a checksum calculated.
* <p>
* According to the official AWS discussion:
* https://github.com/aws/aws-sdk-java-v2/discussions/5802
* - Default SDK behavior may calculate checksums automatically if the property is not set.
* - Automatic calculation can affect performance or cause unexpected behavior for large requests.
* <p>
* This method ensures:
* 1. The property is set to "WHEN_REQUIRED" only if the user has not already set it.
* 2. User-specified settings are never overridden.
* 3. Aligns with AWS SDK recommended best practices.
* </p>
*/
public static void setDefaultRequestChecksum() {
String key = SdkSystemSetting.AWS_REQUEST_CHECKSUM_CALCULATION.property();
if (System.getProperty(key) == null) {
System.setProperty(key, "WHEN_REQUIRED");
}
}
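
A minimal usage sketch, not part of this diff: a deployment that wants the SDK's automatic checksums back can set the property itself before storage properties are initialized, and setDefaultRequestChecksum() will then leave it untouched. The "WHEN_SUPPORTED" value is an assumption based on AWS SDK v2 documentation for this setting.

import software.amazon.awssdk.core.SdkSystemSetting;

// Hypothetical caller-side override.
public class ChecksumOverrideSketch {
public static void main(String[] args) {
String key = SdkSystemSetting.AWS_REQUEST_CHECKSUM_CALCULATION.property();
System.setProperty(key, "WHEN_SUPPORTED");
// setDefaultRequestChecksum() checks System.getProperty(key) first, so this
// user-provided value is never overwritten with "WHEN_REQUIRED".
}
}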
