From 0afa74ee0c515101704a06dcd65232b5ad2c7a80 Mon Sep 17 00:00:00 2001
From: Calvin Kirs
Date: Fri, 28 Nov 2025 16:49:19 +0800
Subject: [PATCH 1/3] branch-4.0: [fix](rest-s3) Set AWS Request Checksum
 Calculation only if not set, to avoid issues on non-S3 storage (#58467)

#58467
---
 .../AbstractS3CompatibleProperties.java       | 30 +++++++++++++++++++
 .../property/storage/OSSPropertiesTest.java   | 10 +++++++
 2 files changed, 40 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
index c4cc493abe9232..78aa8069de0027 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
@@ -30,6 +30,7 @@
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.SdkSystemSetting;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -258,6 +259,7 @@ public void initializeHadoopStorageConfig() {
         // storage is OSS, OBS, etc., users may still configure the schema as "s3://".
         // To ensure backward compatibility, we append S3-related properties by default.
         appendS3HdfsProperties(hadoopStorageConfig);
+        setDefaultRequestChecksum();
         ensureDisableCache(hadoopStorageConfig, origProps);
     }
 
@@ -284,6 +286,34 @@ private void appendS3HdfsProperties(Configuration hadoopStorageConfig) {
         hadoopStorageConfig.set("fs.s3a.path.style.access", getUsePathStyle());
     }
 
+    /**
+     * Sets the AWS request checksum calculation property to "WHEN_REQUIRED"
+     * only if it has not been explicitly set by the user.
+     *
+     * Background:
+     * AWS SDK for Java v2 uses the system property
+     * {@link SdkSystemSetting#AWS_REQUEST_CHECKSUM_CALCULATION} to determine
+     * whether request payloads should have a checksum calculated.
+     *
+     * According to the official AWS discussion:
+     * https://github.com/aws/aws-sdk-java-v2/discussions/5802
+     * - Default SDK behavior may calculate checksums automatically if the property is not set.
+     * - Automatic calculation can affect performance or cause unexpected behavior for large requests.
+     *
+     * This method ensures:
+     * 1. The property is set to "WHEN_REQUIRED" only if the user has not already set it.
+     * 2. User-specified settings are never overridden.
+     * 3. The behavior aligns with AWS SDK recommended best practices.
+     *
+     */
+    public static void setDefaultRequestChecksum() {
+        String key = SdkSystemSetting.AWS_REQUEST_CHECKSUM_CALCULATION.property();
+        if (System.getProperty(key) == null) {
+            System.setProperty(key, "WHEN_REQUIRED");
+        }
+    }
+
     /**
      * By default, Hadoop caches FileSystem instances per scheme and authority (e.g. s3a://bucket/), meaning that all
      * subsequent calls using the same URI will reuse the same FileSystem object.
      * In multi-tenant or dynamic credential environments — where different users may access the same bucket using

diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OSSPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OSSPropertiesTest.java
index 0bc5e823f0ecee..5d6c5dae12f31f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OSSPropertiesTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OSSPropertiesTest.java
@@ -25,6 +25,7 @@
 import org.junit.jupiter.api.Test;
 import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.SdkSystemSetting;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -268,4 +269,13 @@ public void testS3DisableHadoopCache() throws UserException {
         Assertions.assertFalse(s3Properties.hadoopStorageConfig.getBoolean("fs.oss.impl.disable.cache", false));
     }
 
+    @Test
+    public void testRequestCheckSum() throws UserException {
+        Map<String, String> props = Maps.newHashMap();
+        props.put("oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");
+        Assertions.assertEquals("WHEN_REQUIRED", System.getProperty(SdkSystemSetting.AWS_REQUEST_CHECKSUM_CALCULATION.property()));
+        System.setProperty("aws.requestChecksumCalculation", "ALWAYS");
+        Assertions.assertEquals("ALWAYS", System.getProperty(SdkSystemSetting.AWS_REQUEST_CHECKSUM_CALCULATION.property()));
+    }
+
 }

From e9d4887d77ab3f7e0825b1bb3e3001c8d47418d1 Mon Sep 17 00:00:00 2001
From: Calvin Kirs
Date: Thu, 23 Oct 2025 13:42:37 +0800
Subject: [PATCH 2/3] [fix](paimon) Support user-defined S3 config prefixes and
 unify to HDFS S3A protocol

#57116
---
 .../metastore/AbstractPaimonProperties.java   | 49 +++++++++-
 .../IcebergFileSystemMetaStoreProperties.java |  4 +-
 .../IcebergHMSMetaStoreProperties.java        |  3 -
 .../metastore/IcebergRestProperties.java      |  7 +-
 .../PaimonAliyunDLFMetaStoreProperties.java   |  6 +-
 .../PaimonFileSystemMetaStoreProperties.java  |  7 +-
 .../PaimonHMSMetaStoreProperties.java         |  6 +-
 .../PaimonRestMetaStoreProperties.java        |  2 +-
 .../AbstractS3CompatibleProperties.java       | 41 ---------
 .../property/storage/AzureProperties.java     |  7 ++
 .../property/storage/BrokerProperties.java    |  8 ++
 .../storage/HdfsCompatibleProperties.java     |  7 ++
 .../property/storage/LocalProperties.java     |  6 ++
 .../property/storage/StorageProperties.java   | 62 ++++++++++++-
 .../AbstractPaimonPropertiesTest.java         | 89 +++++++++++++++++++
 .../PaimonRestMetaStorePropertiesTest.java    | 14 +--
 16 files changed, 236 insertions(+), 82 deletions(-)
 create mode 100644 fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/AbstractPaimonPropertiesTest.java

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractPaimonProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractPaimonProperties.java
index 9c9a49455d1c86..7d0fca2446ca4b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractPaimonProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractPaimonProperties.java
@@ -21,8 +21,10 @@
 import org.apache.doris.datasource.property.ConnectorProperty;
 import org.apache.doris.datasource.property.storage.StorageProperties;
 
+import com.google.common.collect.ImmutableList;
 import lombok.Getter;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
@@ -58,7 +60,7 @@ protected AbstractPaimonProperties(Map<String, String> props) {
 
     public abstract Catalog initializeCatalog(String catalogName, List<StorageProperties> storagePropertiesList);
 
-    protected void appendCatalogOptions(List<StorageProperties> storagePropertiesList) {
+    protected void appendCatalogOptions() {
         if (StringUtils.isNotBlank(warehouse)) {
             catalogOptions.set(CatalogOptions.WAREHOUSE.key(), warehouse);
         }
@@ -69,7 +71,10 @@
             if (k.toLowerCase().startsWith(USER_PROPERTY_PREFIX)) {
                 String newKey = k.substring(USER_PROPERTY_PREFIX.length());
                 if (StringUtils.isNotBlank(newKey)) {
-                    catalogOptions.set(newKey, v);
+                    boolean excluded = userStoragePrefixes.stream().anyMatch(k::startsWith);
+                    if (!excluded) {
+                        catalogOptions.set(newKey, v);
+                    }
                 }
             }
         });
@@ -78,12 +83,16 @@
     /**
      * Build catalog options including common and subclass-specific ones.
      */
-    public void buildCatalogOptions(List<StorageProperties> storagePropertiesList) {
+    public void buildCatalogOptions() {
         catalogOptions = new Options();
-        appendCatalogOptions(storagePropertiesList);
+        appendCatalogOptions();
         appendCustomCatalogOptions();
     }
 
+    protected void appendUserHadoopConfig(Configuration conf) {
+        normalizeS3Config().forEach(conf::set);
+    }
+
     public Map<String, String> getCatalogOptionsMap() {
         // Return the cached map if already initialized
         Map<String, String> existing = catalogOptionsMapRef.get();
@@ -112,6 +121,38 @@ public Map<String, String> getCatalogOptionsMap() {
         }
     }
 
+    /**
+     * @see org.apache.paimon.s3.S3FileIO
+     * Possible S3 config key prefixes:
+     * 1. "s3." - Paimon legacy custom prefix
+     * 2. "s3a." - Paimon-supported shorthand
+     * 3. "fs.s3a." - Hadoop S3A official prefix
+     *
+     * All of them are normalized to the Hadoop-recognized prefix "fs.s3a.".
+     * On the Doris side these keys arrive namespaced by the user property prefix,
+     * hence the "paimon." prefixes below.
+     */
+    private final List<String> userStoragePrefixes = ImmutableList.of(
+            "paimon.s3.", "paimon.s3a.", "paimon.fs.s3.", "paimon.fs.oss."
+    );
+
+    /** Hadoop S3A standard prefix */
+    private static final String FS_S3A_PREFIX = "fs.s3a.";
+
+    /**
+     * Normalizes user-provided S3 config keys to Hadoop S3A keys.
+     */
+    protected Map<String, String> normalizeS3Config() {
+        Map<String, String> result = new HashMap<>();
+        origProps.forEach((key, value) -> {
+            for (String prefix : userStoragePrefixes) {
+                if (key.startsWith(prefix)) {
+                    result.put(FS_S3A_PREFIX + key.substring(prefix.length()), value);
+                    return; // stop after the first matching prefix
+                }
+            }
+        });
+        return result;
+    }
+
     /**
      * Hook method for subclasses to append metastore-specific or custom catalog options.
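A standalone sketch of the prefix normalization introduced above, so the rewriting can be tried in isolation (illustrative only — the class name, main() driver, and sample values are not part of the patch; the prefix list mirrors the patch's userStoragePrefixes):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class S3PrefixNormalizationSketch {
        // User-facing prefixes accepted for S3-style options, copied from the patch.
        private static final List<String> USER_STORAGE_PREFIXES = Arrays.asList(
                "paimon.s3.", "paimon.s3a.", "paimon.fs.s3.", "paimon.fs.oss.");

        // Hadoop S3A standard prefix that all matching keys are rewritten to.
        private static final String FS_S3A_PREFIX = "fs.s3a.";

        // Rewrites the first matching user prefix to "fs.s3a."; keys without a
        // matching prefix are simply not copied, as in normalizeS3Config() above.
        static Map<String, String> normalizeS3Config(Map<String, String> origProps) {
            Map<String, String> result = new HashMap<>();
            origProps.forEach((key, value) -> {
                for (String prefix : USER_STORAGE_PREFIXES) {
                    if (key.startsWith(prefix)) {
                        result.put(FS_S3A_PREFIX + key.substring(prefix.length()), value);
                        return; // stop after the first matching prefix
                    }
                }
            });
            return result;
        }

        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put("paimon.s3.access-key", "AK");               // -> fs.s3a.access-key
            props.put("paimon.fs.s3.read.ahead.buffer.size", "1"); // -> fs.s3a.read.ahead.buffer.size
            props.put("warehouse", "s3://tmp/warehouse");          // no prefix match, not copied
            System.out.println(normalizeS3Config(props));
        }
    }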
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergFileSystemMetaStoreProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergFileSystemMetaStoreProperties.java index 9323d78e318825..d644b7b06e0a51 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergFileSystemMetaStoreProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergFileSystemMetaStoreProperties.java @@ -46,7 +46,7 @@ public Catalog initCatalog(String catalogName, Map catalogProps, List storagePropertiesList) { Configuration configuration = buildConfiguration(storagePropertiesList); HadoopCatalog catalog = new HadoopCatalog(); - buildCatalogProps(catalogProps, storagePropertiesList); + buildCatalogProps(storagePropertiesList); catalog.setConf(configuration); try { this.executionAuthenticator.execute(() -> { @@ -71,7 +71,7 @@ private Configuration buildConfiguration(List storageProperti return configuration; } - private void buildCatalogProps(Map props, List storagePropertiesList) { + private void buildCatalogProps(List storagePropertiesList) { if (storagePropertiesList.size() == 1 && storagePropertiesList.get(0) instanceof HdfsProperties) { HdfsProperties hdfsProps = (HdfsProperties) storagePropertiesList.get(0); if (hdfsProps.isKerberos()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergHMSMetaStoreProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergHMSMetaStoreProperties.java index d0217f082d232e..dc6b4b448ae4a7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergHMSMetaStoreProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergHMSMetaStoreProperties.java @@ -71,9 +71,6 @@ public Catalog initCatalog(String catalogName, Map catalogProps, HiveCatalog hiveCatalog = new HiveCatalog(); hiveCatalog.setConf(conf); storagePropertiesList.forEach(sp -> { - for (Map.Entry entry : sp.getHadoopStorageConfig()) { - catalogProps.put(entry.getKey(), entry.getValue()); - } // NOTE: Custom FileIO implementation (KerberizedHadoopFileIO) is commented out by default. // Using FileIO for Kerberos authentication may cause serialization issues when accessing // Iceberg system tables (e.g., history, snapshots, manifests). 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java index 0bdfc2a4337929..982c7864324c2d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java @@ -21,7 +21,6 @@ import org.apache.doris.datasource.property.ConnectorProperty; import org.apache.doris.datasource.property.ParamRules; import org.apache.doris.datasource.property.storage.AbstractS3CompatibleProperties; -import org.apache.doris.datasource.property.storage.HdfsCompatibleProperties; import org.apache.doris.datasource.property.storage.StorageProperties; import com.google.common.collect.Maps; @@ -339,14 +338,12 @@ public void toFileIOProperties(List storagePropertiesList, Map fileIOProperties, Configuration conf) { for (StorageProperties storageProperties : storagePropertiesList) { - if (storageProperties instanceof HdfsCompatibleProperties) { - storageProperties.getBackendConfigProperties().forEach(conf::set); - } else if (storageProperties instanceof AbstractS3CompatibleProperties) { + if (storageProperties instanceof AbstractS3CompatibleProperties) { // For all S3-compatible storage types, put properties in fileIOProperties map toS3FileIOProperties((AbstractS3CompatibleProperties) storageProperties, fileIOProperties); } else { // For other storage types, just use fileIOProperties map - fileIOProperties.putAll(storageProperties.getBackendConfigProperties()); + storageProperties.getBackendConfigProperties().forEach(conf::set); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStoreProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStoreProperties.java index ae4aeda48f877f..a3e6c9dd85a189 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStoreProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStoreProperties.java @@ -87,7 +87,7 @@ private HiveConf buildHiveConf() { @Override public Catalog initializeCatalog(String catalogName, List storagePropertiesList) { HiveConf hiveConf = buildHiveConf(); - buildCatalogOptions(storagePropertiesList); + buildCatalogOptions(); StorageProperties ossProps = storagePropertiesList.stream() .filter(sp -> sp.getType() == StorageProperties.Type.OSS) .findFirst() @@ -97,10 +97,8 @@ public Catalog initializeCatalog(String catalogName, List sto throw new IllegalStateException("Expected OSSProperties type."); } OSSProperties ossProperties = (OSSProperties) ossProps; - for (Map.Entry entry : ossProperties.getHadoopStorageConfig()) { - catalogOptions.set(entry.getKey(), entry.getValue()); - } hiveConf.addResource(ossProperties.getHadoopStorageConfig()); + appendUserHadoopConfig(hiveConf); CatalogContext catalogContext = CatalogContext.create(catalogOptions, hiveConf); return CatalogFactory.createCatalog(catalogContext); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonFileSystemMetaStoreProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonFileSystemMetaStoreProperties.java index 9a74775b9b298c..df0ebae97490a8 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonFileSystemMetaStoreProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonFileSystemMetaStoreProperties.java @@ -38,19 +38,16 @@ protected PaimonFileSystemMetaStoreProperties(Map props) { @Override public Catalog initializeCatalog(String catalogName, List storagePropertiesList) { - buildCatalogOptions(storagePropertiesList); + buildCatalogOptions(); Configuration conf = new Configuration(); storagePropertiesList.forEach(storageProperties -> { - for (Map.Entry entry : storageProperties.getHadoopStorageConfig()) { - catalogOptions.set(entry.getKey(), entry.getValue()); - } conf.addResource(storageProperties.getHadoopStorageConfig()); if (storageProperties.getType().equals(StorageProperties.Type.HDFS)) { this.executionAuthenticator = new HadoopExecutionAuthenticator(((HdfsProperties) storageProperties) .getHadoopAuthenticator()); } }); - + appendUserHadoopConfig(conf); CatalogContext catalogContext = CatalogContext.create(catalogOptions, conf); try { return this.executionAuthenticator.execute(() -> CatalogFactory.createCatalog(catalogContext)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonHMSMetaStoreProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonHMSMetaStoreProperties.java index 2f67d109dac51a..24342fce457e0d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonHMSMetaStoreProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonHMSMetaStoreProperties.java @@ -87,11 +87,9 @@ private Configuration buildHiveConfiguration(List storageProp @Override public Catalog initializeCatalog(String catalogName, List storagePropertiesList) { + buildCatalogOptions(); Configuration conf = buildHiveConfiguration(storagePropertiesList); - buildCatalogOptions(storagePropertiesList); - for (Map.Entry entry : conf) { - catalogOptions.set(entry.getKey(), entry.getValue()); - } + appendUserHadoopConfig(conf); CatalogContext catalogContext = CatalogContext.create(catalogOptions, conf); try { return executionAuthenticator.execute(() -> CatalogFactory.createCatalog(catalogContext)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonRestMetaStoreProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonRestMetaStoreProperties.java index 91e47c102530e5..85651a2dc8304e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonRestMetaStoreProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonRestMetaStoreProperties.java @@ -77,7 +77,7 @@ public String getPaimonCatalogType() { @Override public Catalog initializeCatalog(String catalogName, List storagePropertiesList) { - buildCatalogOptions(storagePropertiesList); + buildCatalogOptions(); CatalogContext catalogContext = CatalogContext.create(catalogOptions); return CatalogFactory.createCatalog(catalogContext); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java index 78aa8069de0027..ed03e1f0fcd09d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java @@ -21,7 +21,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; -import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.logging.log4j.LogManager; @@ -227,8 +226,6 @@ public static Optional extractRegion(Set endpointPatterns, Stri protected abstract Set endpointPatterns(); - protected abstract Set schemas(); - // This method should be overridden by subclasses to provide a default endpoint based on the region. // Because for aws s3, only region is needed, the endpoint can be constructed from the region. // But for other s3 compatible storage, the endpoint may need to be specified explicitly. @@ -250,17 +247,11 @@ public String validateAndGetUri(Map loadProps) throws UserExcept @Override public void initializeHadoopStorageConfig() { hadoopStorageConfig = new Configuration(); - origProps.forEach((key, value) -> { - if (key.startsWith("fs.")) { - hadoopStorageConfig.set(key, value); - } - }); // Compatibility note: Due to historical reasons, even when the underlying // storage is OSS, OBS, etc., users may still configure the schema as "s3://". // To ensure backward compatibility, we append S3-related properties by default. appendS3HdfsProperties(hadoopStorageConfig); setDefaultRequestChecksum(); - ensureDisableCache(hadoopStorageConfig, origProps); } private void appendS3HdfsProperties(Configuration hadoopStorageConfig) { @@ -314,38 +305,6 @@ public static void setDefaultRequestChecksum() { } } - /** - * By default, Hadoop caches FileSystem instances per scheme and authority (e.g. s3a://bucket/), meaning that all - * subsequent calls using the same URI will reuse the same FileSystem object. - * In multi-tenant or dynamic credential environments — where different users may access the same bucket using - * different access keys or tokens — this cache reuse can lead to cross-credential contamination. - *
-     * Specifically, if the cache is not disabled, a FileSystem instance initialized with one set of credentials may
-     * be reused by another session targeting the same bucket but with a different AK/SK. This results in:
-     * - Incorrect authentication (using stale credentials)
-     * - Unexpected permission errors or access denial
-     * - Potential data leakage between users
-     *
- * To avoid such risks, the configuration property - * fs..impl.disable.cache - * must be set to true for all object storage backends (e.g., S3A, OSS, COS, OBS), ensuring that each new access - * creates an isolated FileSystem instance with its own credentials and configuration context. - */ - private void ensureDisableCache(Configuration conf, Map origProps) { - for (String schema : schemas()) { - String key = "fs." + schema + ".impl.disable.cache"; - String userValue = origProps.get(key); - if (StringUtils.isNotBlank(userValue)) { - conf.setBoolean(key, BooleanUtils.toBoolean(userValue)); - } else { - conf.setBoolean(key, true); - } - } - } - @Override public String getStorageName() { return "S3"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzureProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzureProperties.java index a0e217f2b51b39..c98fe4a4fb4dc5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzureProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzureProperties.java @@ -22,6 +22,7 @@ import org.apache.doris.datasource.property.ConnectorProperty; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableSet; import lombok.Getter; import lombok.Setter; import org.apache.hadoop.conf.Configuration; @@ -29,6 +30,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.stream.Stream; /** @@ -189,6 +191,11 @@ public void initializeHadoopStorageConfig() { setAzureAccountKeys(hadoopStorageConfig, accountName, accountKey); } + @Override + protected Set schemas() { + return ImmutableSet.of("wasb", "wasbs", "abfs", "abfss"); + } + private static void setAzureAccountKeys(Configuration conf, String accountName, String accountKey) { String[] endpoints = { "dfs.core.windows.net", diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/BrokerProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/BrokerProperties.java index 5c61597a09dbe8..2987eb762aee10 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/BrokerProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/BrokerProperties.java @@ -20,12 +20,14 @@ import org.apache.doris.common.UserException; import org.apache.doris.datasource.property.ConnectorProperty; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import lombok.Getter; import lombok.Setter; import java.util.HashMap; import java.util.Map; +import java.util.Set; public class BrokerProperties extends StorageProperties { @@ -94,6 +96,12 @@ public void initializeHadoopStorageConfig() { // do nothing } + @Override + protected Set schemas() { + //not used + return ImmutableSet.of(); + } + private Map extractBrokerProperties() { Map brokerProperties = new HashMap<>(); for (String key : origProps.keySet()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsCompatibleProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsCompatibleProperties.java index 39d3fa2bc4e791..9a2d05d841f6ce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsCompatibleProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsCompatibleProperties.java @@ -21,9 +21,11 @@ 
import org.apache.doris.common.security.authentication.HadoopSimpleAuthenticator; import org.apache.doris.common.security.authentication.SimpleAuthenticationConfig; +import com.google.common.collect.ImmutableSet; import lombok.Getter; import java.util.Map; +import java.util.Set; public abstract class HdfsCompatibleProperties extends StorageProperties { @@ -49,4 +51,9 @@ public void initializeHadoopStorageConfig() { //nothing to do } + @Override + protected Set schemas() { + return ImmutableSet.of("hdfs"); + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/LocalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/LocalProperties.java index f04e16f1434536..0345d9b43e89c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/LocalProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/LocalProperties.java @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration; import java.util.Map; +import java.util.Set; public class LocalProperties extends StorageProperties { public static final String PROP_FILE_PATH = "file_path"; @@ -80,4 +81,9 @@ public void initializeHadoopStorageConfig() { hadoopStorageConfig.set("fs.local.impl", "org.apache.hadoop.fs.LocalFileSystem"); hadoopStorageConfig.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem"); } + + @Override + protected Set schemas() { + return ImmutableSet.of(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java index 80a2d50fa6384d..cedfe8388b38aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java @@ -23,6 +23,8 @@ import org.apache.doris.datasource.property.storage.exception.StoragePropertiesException; import lombok.Getter; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import java.lang.reflect.Field; @@ -31,6 +33,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Function; public abstract class StorageProperties extends ConnectionProperties { @@ -51,6 +54,8 @@ public abstract class StorageProperties extends ConnectionProperties { public static final String FS_PROVIDER_KEY = "provider"; + protected final String userFsPropsPrefix = "fs."; + public enum Type { HDFS, S3, @@ -132,7 +137,7 @@ public static List createAll(Map origProps) t for (StorageProperties storageProperties : result) { storageProperties.initNormalizeAndCheckProps(); - storageProperties.initializeHadoopStorageConfig(); + storageProperties.buildHadoopStorageConfig(); } return result; } @@ -152,7 +157,7 @@ public static StorageProperties createPrimary(Map origProps) { StorageProperties p = func.apply(origProps); if (p != null) { p.initNormalizeAndCheckProps(); - p.initializeHadoopStorageConfig(); + p.buildHadoopStorageConfig(); return p; } } @@ -243,5 +248,56 @@ protected static boolean checkIdentifierKey(Map origProps, List< public abstract String getStorageName(); - public abstract void initializeHadoopStorageConfig(); + private void buildHadoopStorageConfig() { + initializeHadoopStorageConfig(); + if (null == hadoopStorageConfig) { + return; + } + 
appendUserFsConfig(origProps); + ensureDisableCache(hadoopStorageConfig, origProps); + } + + private void appendUserFsConfig(Map userProps) { + userProps.forEach((k, v) -> { + if (k.startsWith(userFsPropsPrefix) && StringUtils.isNotBlank(v)) { + hadoopStorageConfig.set(k, v); + } + }); + } + + protected abstract void initializeHadoopStorageConfig(); + + protected abstract Set schemas(); + + /** + * By default, Hadoop caches FileSystem instances per scheme and authority (e.g. s3a://bucket/), meaning that all + * subsequent calls using the same URI will reuse the same FileSystem object. + * In multi-tenant or dynamic credential environments — where different users may access the same bucket using + * different access keys or tokens — this cache reuse can lead to cross-credential contamination. + *
+     * Specifically, if the cache is not disabled, a FileSystem instance initialized with one set of credentials may
+     * be reused by another session targeting the same bucket but with a different AK/SK. This results in:
+     * - Incorrect authentication (using stale credentials)
+     * - Unexpected permission errors or access denial
+     * - Potential data leakage between users
+     *
+     * To avoid such risks, the configuration property
+     * fs.<schema>.impl.disable.cache
+     * must be set to true for all object storage backends (e.g., S3A, OSS, COS, OBS), ensuring that each new access
+     * creates an isolated FileSystem instance with its own credentials and configuration context.
+     */
+    private void ensureDisableCache(Configuration conf, Map<String, String> origProps) {
+        for (String schema : schemas()) {
+            String key = "fs." + schema + ".impl.disable.cache";
+            String userValue = origProps.get(key);
+            if (StringUtils.isNotBlank(userValue)) {
+                conf.setBoolean(key, BooleanUtils.toBoolean(userValue));
+            } else {
+                conf.setBoolean(key, true);
+            }
+        }
+    }
 }

diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/AbstractPaimonPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/AbstractPaimonPropertiesTest.java
new file mode 100644
index 00000000000000..e5a775ba6e3ef2
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/AbstractPaimonPropertiesTest.java
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +package org.apache.doris.datasource.property.metastore; + +import org.apache.doris.datasource.property.storage.StorageProperties; + +import org.apache.paimon.catalog.Catalog; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class AbstractPaimonPropertiesTest { + + private static class TestPaimonProperties extends AbstractPaimonProperties { + + + protected TestPaimonProperties(Map props) { + super(props); + } + + @Override + public String getPaimonCatalogType() { + return "test"; + } + + @Override + public Catalog initializeCatalog(String catalogName, List storagePropertiesList) { + return null; + } + + @Override + protected void appendCustomCatalogOptions() { + + } + + @Override + protected String getMetastoreType() { + return "test"; + } + } + + TestPaimonProperties props; + + @BeforeEach + void setup() { + Map input = new HashMap<>(); + input.put("warehouse", "s3://tmp/warehouse"); + input.put("paimon.metastore", "filesystem"); + input.put("paimon.s3.access-key", "AK"); + input.put("paimon.s3.secret-key", "SK"); + input.put("paimon.custom.key", "value"); + props = new TestPaimonProperties(input); + } + + @Test + void testNormalizeS3Config() { + Map input = new HashMap<>(); + input.put("paimon.s3.list.version", "1"); + input.put("paimon.s3.paging.maximum", "100"); + input.put("paimon.fs.s3.read.ahead.buffer.size", "1"); + input.put("paimon.s3a.replication.factor", "3"); + TestPaimonProperties testProps = new TestPaimonProperties(input); + Map result = testProps.normalizeS3Config(); + Assertions.assertTrue("1".equals(result.get("fs.s3a.list.version"))); + Assertions.assertTrue("100".equals(result.get("fs.s3a.paging.maximum"))); + Assertions.assertTrue("1".equals(result.get("fs.s3a.read.ahead.buffer.size"))); + Assertions.assertTrue("3".equals(result.get("fs.s3a.replication.factor"))); + } + +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonRestMetaStorePropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonRestMetaStorePropertiesTest.java index 265e67c3b6528f..cbfa6a01c80012 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonRestMetaStorePropertiesTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonRestMetaStorePropertiesTest.java @@ -18,15 +18,12 @@ package org.apache.doris.datasource.property.metastore; import org.apache.doris.datasource.paimon.PaimonExternalCatalog; -import org.apache.doris.datasource.property.storage.StorageProperties; import org.apache.paimon.options.Options; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; public class PaimonRestMetaStorePropertiesTest { @@ -63,9 +60,8 @@ public void testUriAliases() { restProps2.initNormalizeAndCheckProps(); // Both should work and set the same URI in catalog options - List storagePropertiesList = new ArrayList<>(); - restProps1.buildCatalogOptions(storagePropertiesList); - restProps2.buildCatalogOptions(storagePropertiesList); + restProps1.buildCatalogOptions(); + restProps2.buildCatalogOptions(); Options options1 = restProps1.getCatalogOptions(); Options options2 = restProps2.getCatalogOptions(); @@ -87,8 +83,7 @@ public void testPaimonRestPropertiesPassthrough() { 
PaimonRestMetaStoreProperties restProps = new PaimonRestMetaStoreProperties(props); restProps.initNormalizeAndCheckProps(); - List storagePropertiesList = new ArrayList<>(); - restProps.buildCatalogOptions(storagePropertiesList); + restProps.buildCatalogOptions(); Options catalogOptions = restProps.getCatalogOptions(); // Basic URI should be set @@ -356,8 +351,7 @@ public void testPaimonRestPropertiesWithMultipleCustomProperties() { PaimonRestMetaStoreProperties restProps = new PaimonRestMetaStoreProperties(props); restProps.initNormalizeAndCheckProps(); - List storagePropertiesList = new ArrayList<>(); - restProps.buildCatalogOptions(storagePropertiesList); + restProps.buildCatalogOptions(); Options catalogOptions = restProps.getCatalogOptions(); // paimon.rest.* properties should be passed through without prefix From b0e1d8d20a5170b6d9f789919fb41f0f43f95cfd Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Fri, 14 Nov 2025 17:15:45 +0800 Subject: [PATCH 3/3] [feat](catalog)Support OneLake (#57933) --- .../doris/common/util/LocationPath.java | 9 ++ .../metastore/IcebergRestProperties.java | 4 +- .../property/storage/AzureProperties.java | 116 ++++++++++++++++-- .../property/storage/AzurePropertyUtils.java | 13 +- .../storage/exception/AzureAuthType.java | 23 ++++ .../doris/common/util/LocationPathTest.java | 17 +++ .../property/storage/AzurePropertiesTest.java | 47 ++++++- .../storage/AzurePropertyUtilsTest.java | 16 +++ 8 files changed, 228 insertions(+), 17 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/exception/AzureAuthType.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java index d11b8121c6c0a1..e4b9aa0b25c121 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/LocationPath.java @@ -18,6 +18,7 @@ package org.apache.doris.common.util; import org.apache.doris.common.UserException; +import org.apache.doris.datasource.property.storage.AzurePropertyUtils; import org.apache.doris.datasource.property.storage.StorageProperties; import org.apache.doris.datasource.property.storage.exception.StoragePropertiesException; import org.apache.doris.fs.FileSystemType; @@ -307,6 +308,10 @@ public static String getTempWritePath(String loc, String prefix) { } public TFileType getTFileTypeForBE() { + if ((SchemaTypeMapper.ABFS.getSchema().equals(schema) || SchemaTypeMapper.ABFSS.getSchema() + .equals(schema)) && AzurePropertyUtils.isOneLakeLocation(normalizedLocation)) { + return TFileType.FILE_HDFS; + } if (StringUtils.isNotBlank(normalizedLocation) && isHdfsOnOssEndpoint(normalizedLocation)) { return TFileType.FILE_HDFS; } @@ -324,6 +329,10 @@ public Path toStorageLocation() { public FileSystemType getFileSystemType() { + if ((SchemaTypeMapper.ABFS.getSchema().equals(schema) || SchemaTypeMapper.ABFSS.getSchema() + .equals(schema)) && AzurePropertyUtils.isOneLakeLocation(normalizedLocation)) { + return FileSystemType.HDFS; + } return SchemaTypeMapper.fromSchemaToFileSystemType(schema); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java index 982c7864324c2d..688a268522b2cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/IcebergRestProperties.java @@ -85,6 +85,7 @@ public class IcebergRestProperties extends AbstractIcebergProperties { @ConnectorProperty(names = {"iceberg.rest.oauth2.credential"}, required = false, + sensitive = true, description = "The oauth2 credential for the iceberg rest catalog service.") private String icebergRestOauth2Credential; @@ -149,6 +150,7 @@ public class IcebergRestProperties extends AbstractIcebergProperties { @ConnectorProperty(names = {"iceberg.rest.secret-access-key"}, required = false, + sensitive = true, description = "The secret access key for the iceberg rest catalog service.") private String icebergRestSecretAccessKey = ""; @@ -343,7 +345,7 @@ public void toFileIOProperties(List storagePropertiesList, toS3FileIOProperties((AbstractS3CompatibleProperties) storageProperties, fileIOProperties); } else { // For other storage types, just use fileIOProperties map - storageProperties.getBackendConfigProperties().forEach(conf::set); + conf.addResource(storageProperties.getHadoopStorageConfig()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzureProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzureProperties.java index c98fe4a4fb4dc5..b4848aa61b6d39 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzureProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzureProperties.java @@ -20,6 +20,8 @@ import org.apache.doris.common.Config; import org.apache.doris.common.UserException; import org.apache.doris.datasource.property.ConnectorProperty; +import org.apache.doris.datasource.property.ParamRules; +import org.apache.doris.datasource.property.storage.exception.AzureAuthType; import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; @@ -69,6 +71,7 @@ public class AzureProperties extends StorageProperties { @Getter @ConnectorProperty(names = {"azure.account_name", "azure.access_key", "s3.access_key", "AWS_ACCESS_KEY", "ACCESS_KEY", "access_key"}, + required = false, sensitive = true, description = "The access key of S3.") protected String accountName = ""; @@ -77,9 +80,37 @@ public class AzureProperties extends StorageProperties { @ConnectorProperty(names = {"azure.account_key", "azure.secret_key", "s3.secret_key", "AWS_SECRET_KEY", "secret_key"}, sensitive = true, + required = false, description = "The secret key of S3.") protected String accountKey = ""; + @ConnectorProperty(names = {"azure.oauth2_client_id"}, + required = false, + description = "The client id of Azure AD application.") + private String clientId; + + @ConnectorProperty(names = {"azure.oauth2_client_secret"}, + required = false, + sensitive = true, + description = "The client secret of Azure AD application.") + private String clientSecret; + + + @ConnectorProperty(names = {"azure.oauth2_server_uri"}, + required = false, + description = "The account host of Azure blob.") + private String oauthServerUri; + + @ConnectorProperty(names = {"azure.oauth2_account_host"}, + required = false, + description = "The account host of Azure blob.") + private String accountHost; + + @ConnectorProperty(names = {"azure.auth_type"}, + required = false, + description = "The auth type of Azure blob.") + private String azureAuthType = AzureAuthType.SharedKey.name(); + @Getter @ConnectorProperty(names = {"container", "azure.bucket", "s3.bucket"}, required = false, @@ -110,11 +141,11 @@ 
public AzureProperties(Map origProps) { public void initNormalizeAndCheckProps() { super.initNormalizeAndCheckProps(); //check endpoint - if (!endpoint.endsWith(AZURE_ENDPOINT_SUFFIX)) { - throw new IllegalArgumentException(String.format("Endpoint '%s' is not valid. It should end with '%s'.", - endpoint, AZURE_ENDPOINT_SUFFIX)); - } this.endpoint = formatAzureEndpoint(endpoint, accountName); + buildRules().validate(); + if (AzureAuthType.OAuth2.name().equals(azureAuthType) && (!isIcebergRestCatalog())) { + throw new UnsupportedOperationException("OAuth2 auth type is only supported for iceberg rest catalog"); + } } public static boolean guessIsMe(Map origProps) { @@ -136,14 +167,25 @@ public static boolean guessIsMe(Map origProps) { @Override public Map getBackendConfigProperties() { + if (!azureAuthType.equalsIgnoreCase("OAuth2")) { + Map s3Props = new HashMap<>(); + s3Props.put("AWS_ENDPOINT", endpoint); + s3Props.put("AWS_REGION", "dummy_region"); + s3Props.put("AWS_ACCESS_KEY", accountName); + s3Props.put("AWS_SECRET_KEY", accountKey); + s3Props.put("AWS_NEED_OVERRIDE_ENDPOINT", "true"); + s3Props.put("provider", "azure"); + s3Props.put("use_path_style", usePathStyle); + return s3Props; + } + // oauth2 use hadoop config Map s3Props = new HashMap<>(); - s3Props.put("AWS_ENDPOINT", endpoint); - s3Props.put("AWS_REGION", "dummy_region"); - s3Props.put("AWS_ACCESS_KEY", accountName); - s3Props.put("AWS_SECRET_KEY", accountKey); - s3Props.put("AWS_NEED_OVERRIDE_ENDPOINT", "true"); - s3Props.put("provider", "azure"); - s3Props.put("use_path_style", usePathStyle); + hadoopStorageConfig.forEach(entry -> { + String key = entry.getKey(); + + s3Props.put(key, entry.getValue()); + + }); return s3Props; } @@ -188,7 +230,11 @@ public void initializeHadoopStorageConfig() { hadoopStorageConfig.set(k, v); } }); - setAzureAccountKeys(hadoopStorageConfig, accountName, accountKey); + if (azureAuthType != null && azureAuthType.equalsIgnoreCase("OAuth2")) { + setHDFSAzureOauth2Config(hadoopStorageConfig); + } else { + setHDFSAzureAccountKeys(hadoopStorageConfig, accountName, accountKey); + } } @Override @@ -196,7 +242,7 @@ protected Set schemas() { return ImmutableSet.of("wasb", "wasbs", "abfs", "abfss"); } - private static void setAzureAccountKeys(Configuration conf, String accountName, String accountKey) { + private static void setHDFSAzureAccountKeys(Configuration conf, String accountName, String accountKey) { String[] endpoints = { "dfs.core.windows.net", "blob.core.windows.net" @@ -208,4 +254,48 @@ private static void setAzureAccountKeys(Configuration conf, String accountName, conf.set("fs.azure.account.key", accountKey); } + private void setHDFSAzureOauth2Config(Configuration conf) { + conf.set(String.format("fs.azure.account.auth.type.%s", accountHost), "OAuth"); + conf.set(String.format("fs.azure.account.oauth.provider.type.%s", accountHost), + "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"); + conf.set(String.format("fs.azure.account.oauth2.client.id.%s", accountHost), clientId); + conf.set(String.format("fs.azure.account.oauth2.client.secret.%s", accountHost), clientSecret); + conf.set(String.format("fs.azure.account.oauth2.client.endpoint.%s", accountHost), oauthServerUri); + } + + private ParamRules buildRules() { + return new ParamRules() + // OAuth2 requires either credential or token, but not both + .requireIf(azureAuthType, AzureAuthType.OAuth2.name(), new String[]{accountHost, + clientId, + clientSecret, + oauthServerUri}, "When auth_type is OAuth2, oauth2_account_host, 
oauth2_client_id" + + ", oauth2_client_secret, and oauth2_server_uri are required.") + .requireIf(azureAuthType, AzureAuthType.SharedKey.name(), new String[]{accountName, accountKey}, + "When auth_type is SharedKey, account_name and account_key are required."); + } + + // NB:Temporary check: + // Temporary check: Currently using OAuth2 for accessing Onalake storage via HDFS. + // In the future, OAuth2 will be supported via native SDK to reduce maintenance. + // For now, OAuth2 authentication is only allowed for Iceberg REST. + // TODO: Remove this temporary check later + private static final String ICEBERG_CATALOG_TYPE_KEY = "iceberg.catalog.type"; + private static final String ICEBERG_CATALOG_TYPE_REST = "rest"; + private static final String TYPE_KEY = "type"; + private static final String ICEBERG_VALUE = "iceberg"; + + private boolean isIcebergRestCatalog() { + // check iceberg type + boolean hasIcebergType = origProps.entrySet().stream() + .anyMatch(entry -> TYPE_KEY.equalsIgnoreCase(entry.getKey()) + && ICEBERG_VALUE.equalsIgnoreCase(entry.getValue())); + if (!hasIcebergType && origProps.keySet().stream().anyMatch(TYPE_KEY::equalsIgnoreCase)) { + return false; + } + return origProps.entrySet().stream() + .anyMatch(entry -> ICEBERG_CATALOG_TYPE_KEY.equalsIgnoreCase(entry.getKey()) + && ICEBERG_CATALOG_TYPE_REST.equalsIgnoreCase(entry.getValue())); + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzurePropertyUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzurePropertyUtils.java index 8c986b74da0208..f126620fba68a7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzurePropertyUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzurePropertyUtils.java @@ -25,6 +25,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.Map; +import java.util.regex.Pattern; public class AzurePropertyUtils { @@ -69,10 +70,16 @@ public static String validateAndNormalizeUri(String path) throws UserException { || lower.startsWith("s3://"))) { throw new StoragePropertiesException("Unsupported Azure URI scheme: " + path); } - + if (isOneLakeLocation(path)) { + return path; + } return convertToS3Style(path); } + private static final Pattern ONELAKE_PATTERN = Pattern.compile( + "abfs[s]?://([^@]+)@([^/]+)\\.dfs\\.fabric\\.microsoft\\.com(/.*)?", Pattern.CASE_INSENSITIVE); + + /** * Converts an Azure Blob Storage URI into a unified {@code s3:///} format. *

@@ -186,4 +193,8 @@ public static String validateAndGetUri(Map props) { .findFirst() .orElseThrow(() -> new StoragePropertiesException("Properties must contain 'uri' key")); } + + public static boolean isOneLakeLocation(String location) { + return ONELAKE_PATTERN.matcher(location).matches(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/exception/AzureAuthType.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/exception/AzureAuthType.java new file mode 100644 index 00000000000000..269ce5a8da0a0b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/exception/AzureAuthType.java @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.property.storage.exception; + +public enum AzureAuthType { + OAuth2, + SharedKey; +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java index a896931babeedc..0051ea494b0871 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/LocationPathTest.java @@ -283,4 +283,21 @@ public void testHdfsStorageLocationConvert() { Assertions.assertEquals(location, locationPath.getNormalizedLocation()); } + @Test + public void testOnelakeStorageLocationConvert() { + String location = "abfss://1a2b3c4d-1234-5678-abcd-9876543210ef@onelake.dfs.fabric.microsoft.com/myworkspace/lakehouse/default/Files/data/test.parquet"; + LocationPath locationPath = LocationPath.of(location, STORAGE_PROPERTIES_MAP); + Assertions.assertEquals(TFileType.FILE_HDFS, locationPath.getTFileTypeForBE()); + Assertions.assertEquals(FileSystemType.HDFS, locationPath.getFileSystemType()); + location = "abfs://1a2b3c4d-1234-5678-abcd-9876543210ef@onelake.dfs.fabric.microsoft.com/myworkspace/lakehouse/default/Files/data/test.parquet"; + locationPath = LocationPath.of(location, STORAGE_PROPERTIES_MAP); + Assertions.assertEquals(TFileType.FILE_HDFS, locationPath.getTFileTypeForBE()); + Assertions.assertEquals(FileSystemType.HDFS, locationPath.getFileSystemType()); + location = "abfss://mycontainer@mystorageaccount.dfs.core.windows.net/data/2025/11/11/"; + locationPath = LocationPath.of(location, STORAGE_PROPERTIES_MAP); + Assertions.assertEquals(TFileType.FILE_S3, locationPath.getTFileTypeForBE()); + Assertions.assertEquals(FileSystemType.S3, locationPath.getFileSystemType()); + + } + } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/AzurePropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/AzurePropertiesTest.java index 
7542bdbce29220..927a9f1c1484db 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/AzurePropertiesTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/AzurePropertiesTest.java @@ -20,6 +20,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.datasource.property.storage.exception.StoragePropertiesException; +import org.apache.hadoop.conf.Configuration; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -84,7 +85,7 @@ public void testMissingProvider() throws UserException { origProps.put("s3.endpoint", "https://mystorageaccount.net"); // Expect an exception due to missing provider origProps.put("provider", "azure"); - Assertions.assertThrows(IllegalArgumentException.class, () -> + Assertions.assertDoesNotThrow(() -> StorageProperties.createPrimary(origProps), "Endpoint 'https://mystorageaccount.net' is not valid. It should end with '.blob.core.windows.net'."); } @@ -110,7 +111,7 @@ public void testParsingUri() throws Exception { Assertions.assertEquals("s3://mycontainer/blob.txt", azureProperties.validateAndNormalizeUri("https://mystorageaccount.blob.core.windows.net/mycontainer/blob.txt")); Assertions.assertThrowsExactly(StoragePropertiesException.class, () -> - azureProperties.validateAndGetUri(origProps), + azureProperties.validateAndGetUri(origProps), "props must contain uri"); origProps.put("uri", "https://mystorageaccount.blob.core.windows.net/mycontainer/blob.txt"); Assertions.assertEquals("https://mystorageaccount.blob.core.windows.net/mycontainer/blob.txt", @@ -170,4 +171,46 @@ public void testEmptyPath() throws UserException { Assertions.assertThrows(StoragePropertiesException.class, () -> azureProperties.validateAndNormalizeUri(""), "Path cannot be empty."); } + + @Test + public void testOneLake() throws UserException { + origProps.put("azure.auth_type", "OAuth2"); + origProps.put("azure.endpoint", "https://onelake.dfs.fabric.microsoft.com"); + Assertions.assertThrows(StoragePropertiesException.class, () -> + StorageProperties.createPrimary(origProps), "For OAuth2 authentication, please provide oauth2_client_id, " + + "oauth2_tenant_id, oauth2_client_secret, and oauth2_server_uri."); + origProps.put("azure.oauth2_client_id", "5c64f06f-5289-5289-5289-5aa0820ee310"); + Assertions.assertThrows(StoragePropertiesException.class, () -> + StorageProperties.createPrimary(origProps), "For OAuth2 authentication, please provide oauth2_client_id, " + + "oauth2_tenant_id, oauth2_client_secret, and oauth2_server_uri."); + origProps.put("azure.oauth2_tenant_id", "72f988bf-5289-5289-5289-2d7cd011db47"); + Assertions.assertThrows(StoragePropertiesException.class, () -> + StorageProperties.createPrimary(origProps), "For OAuth2 authentication, please provide oauth2_client_id, " + + "oauth2_tenant_id, oauth2_client_secret, and oauth2_server_uri."); + origProps.put("azure.oauth2_client_secret", "myAzureClientSecret"); + Assertions.assertThrows(StoragePropertiesException.class, () -> + StorageProperties.createPrimary(origProps), "For OAuth2 authentication, please provide oauth2_client_id, " + + "oauth2_tenant_id, oauth2_client_secret, and oauth2_server_uri."); + origProps.put("azure.oauth2_server_uri", "https://login.microsoftonline.com/72f988bf-5289-5289-5289-2d7cd011db47/oauth2/token"); + Assertions.assertThrows(StoragePropertiesException.class, () -> + StorageProperties.createPrimary(origProps), "For OAuth2 authentication, please provide 
oauth2_client_id, " + + "oauth2_tenant_id, oauth2_client_secret, and oauth2_server_uri."); + origProps.put("azure.oauth2_account_host", "onelake.dfs.fabric.microsoft.com"); + Assertions.assertThrows(StoragePropertiesException.class, () -> + StorageProperties.createPrimary(origProps), "For OAuth2 authentication, please provide oauth2_client_id, " + + "oauth2_tenant_id, oauth2_client_secret, and oauth2_server_uri."); + origProps.put("fs.azure.support", "true"); + Assertions.assertThrows(UnsupportedOperationException.class, () -> + StorageProperties.createPrimary(origProps), "Azure OAuth2 is not supported in the current backend."); + origProps.put("type", "iceberg"); + origProps.put("iceberg.catalog.type", "rest"); + AzureProperties azureProperties = (AzureProperties) StorageProperties.createPrimary(origProps); + Configuration hadoopStorageConfig = azureProperties.getHadoopStorageConfig(); + Assertions.assertEquals("OAuth", hadoopStorageConfig.get("fs.azure.account.auth.type.onelake.dfs.fabric.microsoft.com")); + Assertions.assertEquals("org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider", hadoopStorageConfig.get("fs.azure.account.oauth.provider.type.onelake.dfs.fabric.microsoft.com")); + Assertions.assertEquals("5c64f06f-5289-5289-5289-5aa0820ee310", hadoopStorageConfig.get("fs.azure.account.oauth2.client.id.onelake.dfs.fabric.microsoft.com")); + Assertions.assertEquals("myAzureClientSecret", hadoopStorageConfig.get("fs.azure.account.oauth2.client.secret.onelake.dfs.fabric.microsoft.com")); + Assertions.assertEquals("https://login.microsoftonline.com/72f988bf-5289-5289-5289-2d7cd011db47/oauth2/token", hadoopStorageConfig.get("fs.azure.account.oauth2.client.endpoint.onelake.dfs.fabric.microsoft.com")); + + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/AzurePropertyUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/AzurePropertyUtilsTest.java index dc6eb8ad74c015..cbc584d9a10b0f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/AzurePropertyUtilsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/AzurePropertyUtilsTest.java @@ -43,6 +43,22 @@ public void testS3Uri() throws Exception { Assertions.assertEquals(expected, AzurePropertyUtils.validateAndNormalizeUri(input)); } + @Test + public void testAbfssUri() throws Exception { + String input = "abfss://container@account.blob.core.windows.net/data/file.txt"; + String expected = "s3://container/data/file.txt"; + Assertions.assertEquals(expected, AzurePropertyUtils.validateAndNormalizeUri(input)); + input = "abfs://container@account.blob.core.windows.net/data/file.txt"; + expected = "s3://container/data/file.txt"; + Assertions.assertEquals(expected, AzurePropertyUtils.validateAndNormalizeUri(input)); + input = "abfss://1a2b3c4d-1234-5678-abcd-9876543210ef@onelake.dfs.fabric.microsoft.com/myworkspace/lakehouse/default/Files/data/test.parquet"; + Assertions.assertEquals(input, + AzurePropertyUtils.validateAndNormalizeUri(input)); + input = "abfs://1a2b3c4d-1234-5678-abcd-9876543210ef@onelake.dfs.fabric.microsoft.com/myworkspace/lakehouse/default/Files/data/test.parquet"; + Assertions.assertEquals(input, + AzurePropertyUtils.validateAndNormalizeUri(input)); + } + @Test public void testAbfssUriWithoutPath() throws Exception { String input = "abfss://container@account.blob.core.windows.net";