From f98f6041b1d9540912fd753aebbd4db86037290f Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Mon, 20 Oct 2025 15:43:56 +0800 Subject: [PATCH] [fix](catalog)Disable Hadoop FileSystem cache for multi-configuration object storage catalogs (#57063) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit … Iceberg and Paimon catalogs access object storage (OSS, OBS, etc.) using Hadoop FileSystem libraries. In environments with multiple storage configurations, the FileSystem cache may cause instances to be shared across configurations, leading to potential conflicts or incorrect access. This PR disables the Hadoop FileSystem cache (`fs.oss.impl.disable.cache=true`) for each object storage scheme to ensure configuration isolation and prevent cache overwrites. If necessary, the cache can also be explicitly enabled for specific catalogs by setting `fs.oss.impl.disable.cache=false`. --- .../AbstractS3CompatibleProperties.java | 45 +++++++++++++++++-- .../property/storage/COSProperties.java | 5 +++ .../property/storage/GCSProperties.java | 5 +++ .../property/storage/MinioProperties.java | 5 +++ .../property/storage/OBSProperties.java | 5 +++ .../property/storage/OSSProperties.java | 5 +++ .../property/storage/S3Properties.java | 10 ++--- .../property/storage/COSPropertiesTest.java | 16 +++++++ .../property/storage/GCSPropertiesTest.java | 18 ++++++++ .../property/storage/OBSPropertyTest.java | 18 ++++++++ .../property/storage/OSSPropertiesTest.java | 18 ++++++++ .../property/storage/S3PropertiesTest.java | 17 +++++++ 12 files changed, 158 insertions(+), 9 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java index 2a7e58d787fa15..c4cc493abe9232 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java @@ -21,6 +21,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.logging.log4j.LogManager; @@ -149,10 +150,6 @@ public void initNormalizeAndCheckProps() { } } - boolean isEndpointCheckRequired() { - return true; - } - /** * Checks and validates the configured endpoint. *

@@ -229,6 +226,8 @@ public static Optional extractRegion(Set endpointPatterns, Stri protected abstract Set endpointPatterns(); + protected abstract Set schemas(); + // This method should be overridden by subclasses to provide a default endpoint based on the region. // Because for aws s3, only region is needed, the endpoint can be constructed from the region. // But for other s3 compatible storage, the endpoint may need to be specified explicitly. @@ -250,10 +249,16 @@ public String validateAndGetUri(Map loadProps) throws UserExcept @Override public void initializeHadoopStorageConfig() { hadoopStorageConfig = new Configuration(); + origProps.forEach((key, value) -> { + if (key.startsWith("fs.")) { + hadoopStorageConfig.set(key, value); + } + }); // Compatibility note: Due to historical reasons, even when the underlying // storage is OSS, OBS, etc., users may still configure the schema as "s3://". // To ensure backward compatibility, we append S3-related properties by default. appendS3HdfsProperties(hadoopStorageConfig); + ensureDisableCache(hadoopStorageConfig, origProps); } private void appendS3HdfsProperties(Configuration hadoopStorageConfig) { @@ -279,6 +284,38 @@ private void appendS3HdfsProperties(Configuration hadoopStorageConfig) { hadoopStorageConfig.set("fs.s3a.path.style.access", getUsePathStyle()); } + /** + * By default, Hadoop caches FileSystem instances per scheme and authority (e.g. s3a://bucket/), meaning that all + * subsequent calls using the same URI will reuse the same FileSystem object. + * In multi-tenant or dynamic credential environments — where different users may access the same bucket using + * different access keys or tokens — this cache reuse can lead to cross-credential contamination. + *

+ * Specifically, if the cache is not disabled, a FileSystem instance initialized with one set of credentials may + * be reused by another session targeting the same bucket but with a different AK/SK. This results in: + *

+ * Incorrect authentication (using stale credentials) + *

+ * Unexpected permission errors or access denial + *

+ * Potential data leakage between users + *

+ * To avoid such risks, the configuration property + * {@code fs.<scheme>.impl.disable.cache} + * must be set to true for all object storage backends (e.g., S3A, OSS, COS, OBS), ensuring that each new access + * creates an isolated FileSystem instance with its own credentials and configuration context. + */ + private void ensureDisableCache(Configuration conf, Map origProps) { + for (String schema : schemas()) { + String key = "fs." + schema + ".impl.disable.cache"; + String userValue = origProps.get(key); + if (StringUtils.isNotBlank(userValue)) { + conf.setBoolean(key, BooleanUtils.toBoolean(userValue)); + } else { + conf.setBoolean(key, true); + } + } + } + @Override public String getStorageName() { return "S3"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java index af4c332442481d..8f1d4b88f3a1fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java @@ -167,4 +167,9 @@ public void initializeHadoopStorageConfig() { hadoopStorageConfig.set("fs.cosn.userinfo.secretId", accessKey); hadoopStorageConfig.set("fs.cosn.userinfo.secretKey", secretKey); } + + @Override + protected Set schemas() { + return ImmutableSet.of("cos", "cosn"); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/GCSProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/GCSProperties.java index dffd18d0d58ec2..20de10f97cd46e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/GCSProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/GCSProperties.java @@ -183,6 +183,11 @@ public Map getBackendConfigProperties() { return backendProperties; } + @Override + protected Set schemas() { + return 
ImmutableSet.of("gs"); + } + @Override public AwsCredentialsProvider getAwsCredentialsProvider() { AwsCredentialsProvider credentialsProvider = super.getAwsCredentialsProvider(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/MinioProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/MinioProperties.java index 1f52a84d392c52..34450383d83dde 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/MinioProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/MinioProperties.java @@ -136,4 +136,9 @@ protected void setEndpointIfPossible() { throw new IllegalArgumentException("Property minio.endpoint is required."); } } + + @Override + protected Set schemas() { + return ImmutableSet.of("s3"); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java index e2adb19895c105..92abf2997b4460 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java @@ -180,4 +180,9 @@ protected void setEndpointIfPossible() { throw new IllegalArgumentException("Property obs.endpoint is required."); } } + + @Override + protected Set schemas() { + return ImmutableSet.of("obs"); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java index dee34a76d162d4..47558497a68c63 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java @@ -282,6 +282,11 @@ public AwsCredentialsProvider getAwsCredentialsProvider() { return 
null; } + @Override + protected Set schemas() { + return ImmutableSet.of("oss"); + } + @Override public void initializeHadoopStorageConfig() { super.initializeHadoopStorageConfig(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java index 1b162e87bf014f..47d0301ad9bed4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java @@ -215,11 +215,6 @@ public void initNormalizeAndCheckProps() { convertGlueToS3EndpointIfNeeded(); } - @Override - boolean isEndpointCheckRequired() { - return false; - } - /** * Guess if the storage properties is for this storage type. * Subclass should override this method to provide the correct implementation. @@ -268,6 +263,11 @@ protected Set endpointPatterns() { return ENDPOINT_PATTERN; } + @Override + protected Set schemas() { + return ImmutableSet.of("s3", "s3a", "s3n"); + } + @Override public Map getBackendConfigProperties() { Map backendProperties = generateBackendS3Configuration(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/COSPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/COSPropertiesTest.java index aadecf63c3f81d..df20ea5d333547 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/COSPropertiesTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/COSPropertiesTest.java @@ -20,6 +20,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.datasource.property.storage.exception.StoragePropertiesException; +import com.google.common.collect.Maps; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -176,4 +177,19 @@ public void 
testAwsCredentialsProvider() throws Exception { obsStorageProperties = (COSProperties) StorageProperties.createPrimary(props); Assertions.assertEquals(StaticCredentialsProvider.class, obsStorageProperties.getAwsCredentialsProvider().getClass()); } + + @Test + public void testS3DisableHadoopCache() throws UserException { + Map props = Maps.newHashMap(); + props.put("cos.endpoint", "cos.ap-beijing.myqcloud.com"); + COSProperties s3Properties = (COSProperties) StorageProperties.createPrimary(props); + Assertions.assertEquals("true", s3Properties.hadoopStorageConfig.get("fs.cos.impl.disable.cache")); + Assertions.assertEquals("true", s3Properties.hadoopStorageConfig.get("fs.s3.impl.disable.cache")); + Assertions.assertEquals("true", s3Properties.hadoopStorageConfig.get("fs.cosn.impl.disable.cache")); + props.put("fs.cos.impl.disable.cache", "true"); + props.put("fs.cosn.impl.disable.cache", "false"); + s3Properties = (COSProperties) StorageProperties.createPrimary(props); + Assertions.assertEquals("true", s3Properties.hadoopStorageConfig.get("fs.cos.impl.disable.cache")); + Assertions.assertEquals("false", s3Properties.hadoopStorageConfig.get("fs.cosn.impl.disable.cache")); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/GCSPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/GCSPropertiesTest.java index c7167a8eba7bf4..179655582b79cb 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/GCSPropertiesTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/GCSPropertiesTest.java @@ -17,6 +17,7 @@ package org.apache.doris.datasource.property.storage; +import com.google.common.collect.Maps; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -110,4 +111,21 @@ public void testGCSAwsCredentialsProvider() throws Exception { gcsStorageProperties = (GCSProperties) 
StorageProperties.createPrimary(gcsProps); Assertions.assertEquals(StaticCredentialsProvider.class, gcsStorageProperties.getAwsCredentialsProvider().getClass()); } + + @Test + public void testS3DisableHadoopCache() { + Map props = Maps.newHashMap(); + props.put("fs.gcs.support", "true"); + GCSProperties s3Properties = (GCSProperties) StorageProperties.createPrimary(props); + Assertions.assertTrue(s3Properties.hadoopStorageConfig.getBoolean("fs.gs.impl.disable.cache", false)); + props.put("fs.gs.impl.disable.cache", "true"); + s3Properties = (GCSProperties) StorageProperties.createPrimary(props); + Assertions.assertTrue(s3Properties.hadoopStorageConfig.getBoolean("fs.gs.impl.disable.cache", false)); + props.put("fs.gs.impl.disable.cache", "false"); + s3Properties = (GCSProperties) StorageProperties.createPrimary(props); + Assertions.assertFalse(s3Properties.hadoopStorageConfig.getBoolean("fs.gs.impl.disable.cache", false)); + props.put("fs.gs.impl.disable.cache", "null"); + s3Properties = (GCSProperties) StorageProperties.createPrimary(props); + Assertions.assertFalse(s3Properties.hadoopStorageConfig.getBoolean("fs.gs.impl.disable.cache", false)); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OBSPropertyTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OBSPropertyTest.java index 1d9fee06e19e03..4329368254a45d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OBSPropertyTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OBSPropertyTest.java @@ -20,6 +20,7 @@ import org.apache.doris.common.ExceptionChecker; import org.apache.doris.common.UserException; +import com.google.common.collect.Maps; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; @@ -152,6 +153,23 @@ public void testAwsCredentialsProvider() throws Exception { 
Assertions.assertEquals(StaticCredentialsProvider.class, obsStorageProperties.getAwsCredentialsProvider().getClass()); } + @Test + public void testS3DisableHadoopCache() { + Map props = Maps.newHashMap(); + props.put("obs.endpoint", "obs.cn-north-4.myhuaweicloud.com"); + OBSProperties s3Properties = (OBSProperties) StorageProperties.createPrimary(props); + Assertions.assertTrue(s3Properties.hadoopStorageConfig.getBoolean("fs.obs.impl.disable.cache", false)); + props.put("fs.obs.impl.disable.cache", "true"); + s3Properties = (OBSProperties) StorageProperties.createPrimary(props); + Assertions.assertTrue(s3Properties.hadoopStorageConfig.getBoolean("fs.obs.impl.disable.cache", false)); + props.put("fs.obs.impl.disable.cache", "false"); + s3Properties = (OBSProperties) StorageProperties.createPrimary(props); + Assertions.assertFalse(s3Properties.hadoopStorageConfig.getBoolean("fs.obs.impl.disable.cache", false)); + props.put("fs.obs.impl.disable.cache", "null"); + s3Properties = (OBSProperties) StorageProperties.createPrimary(props); + Assertions.assertFalse(s3Properties.hadoopStorageConfig.getBoolean("fs.obs.impl.disable.cache", false)); + } + @Test public void testMissingSecretKey() { origProps.put("obs.endpoint", "obs.cn-north-4.myhuaweicloud.com"); diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OSSPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OSSPropertiesTest.java index 7a0460f3153e6c..874aea6e5108ea 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OSSPropertiesTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/OSSPropertiesTest.java @@ -20,6 +20,7 @@ import org.apache.doris.common.ExceptionChecker; import org.apache.doris.common.UserException; +import com.google.common.collect.Maps; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import 
software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; @@ -250,4 +251,21 @@ public void testAwsCredentialsProvider() throws Exception { Assertions.assertEquals(StaticCredentialsProvider.class, ossStorageProperties.getAwsCredentialsProvider().getClass()); } + @Test + public void testS3DisableHadoopCache() throws UserException { + Map props = Maps.newHashMap(); + props.put("oss.endpoint", "oss-cn-hangzhou.aliyuncs.com"); + OSSProperties s3Properties = (OSSProperties) StorageProperties.createPrimary(props); + Assertions.assertTrue(s3Properties.hadoopStorageConfig.getBoolean("fs.oss.impl.disable.cache", false)); + props.put("fs.oss.impl.disable.cache", "true"); + s3Properties = (OSSProperties) StorageProperties.createPrimary(props); + Assertions.assertTrue(s3Properties.hadoopStorageConfig.getBoolean("fs.oss.impl.disable.cache", false)); + props.put("fs.oss.impl.disable.cache", "false"); + s3Properties = (OSSProperties) StorageProperties.createPrimary(props); + Assertions.assertFalse(s3Properties.hadoopStorageConfig.getBoolean("fs.oss.impl.disable.cache", false)); + props.put("fs.oss.impl.disable.cache", "null"); + s3Properties = (OSSProperties) StorageProperties.createPrimary(props); + Assertions.assertFalse(s3Properties.hadoopStorageConfig.getBoolean("fs.oss.impl.disable.cache", false)); + } + } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java index a0e94042c59d15..f4ea2fcb381a0c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/storage/S3PropertiesTest.java @@ -423,4 +423,21 @@ public void testS3PropertiesAwsAnonymousCredentialsProvider() { Assertions.assertEquals(AwsCredentialsProviderChain.class, provider.getClass()); Config.aws_credentials_provider_version = "v2"; } + + @Test + 
public void testS3DisableHadoopCache() throws UserException { + Map props = Maps.newHashMap(); + props.put("s3.endpoint", "s3.us-west-2.amazonaws.com"); + S3Properties s3Properties = (S3Properties) StorageProperties.createPrimary(props); + Assertions.assertEquals("true", s3Properties.hadoopStorageConfig.get("fs.s3a.impl.disable.cache")); + Assertions.assertEquals("true", s3Properties.hadoopStorageConfig.get("fs.s3.impl.disable.cache")); + Assertions.assertEquals("true", s3Properties.hadoopStorageConfig.get("fs.s3n.impl.disable.cache")); + props.put("fs.s3a.impl.disable.cache", "true"); + props.put("fs.s3.impl.disable.cache", "false"); + props.put("fs.s3n.impl.disable.cache", "null"); + s3Properties = (S3Properties) StorageProperties.createPrimary(props); + Assertions.assertEquals("true", s3Properties.hadoopStorageConfig.get("fs.s3a.impl.disable.cache")); + Assertions.assertEquals("false", s3Properties.hadoopStorageConfig.get("fs.s3.impl.disable.cache")); + Assertions.assertEquals("false", s3Properties.hadoopStorageConfig.get("fs.s3n.impl.disable.cache")); + } }