@@ -21,6 +21,7 @@

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.logging.log4j.LogManager;
@@ -149,10 +150,6 @@ public void initNormalizeAndCheckProps() {
}
}

boolean isEndpointCheckRequired() {
return true;
}

/**
* Checks and validates the configured endpoint.
* <p>
@@ -229,6 +226,8 @@ public static Optional<String> extractRegion(Set<Pattern> endpointPatterns, Stri

protected abstract Set<Pattern> endpointPatterns();

protected abstract Set<String> schemas();

// This method should be overridden by subclasses to provide a default endpoint based on the region.
// For AWS S3, only the region is needed, so the endpoint can be constructed from the region.
// For other S3-compatible storage, the endpoint may need to be specified explicitly.
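
A hypothetical sketch (not taken from this PR) of the idea in the comment above: for AWS S3 the endpoint can be derived from the region using the standard s3.<region>.amazonaws.com form, while S3-compatible stores such as MinIO or OBS need an explicitly configured endpoint. The class, method, and storage-type names below are illustrative assumptions, not code from this change.

// Hypothetical helper, not from this PR: derives a default endpoint from a region
// for AWS S3, and requires an explicit endpoint for other S3-compatible stores.
public final class DefaultEndpointSketch {
    static String defaultEndpoint(String storageType, String region, String explicitEndpoint) {
        if ("aws-s3".equals(storageType)) {
            // For AWS S3 the endpoint is fully determined by the region.
            return "s3." + region + ".amazonaws.com";
        }
        if (explicitEndpoint == null || explicitEndpoint.isEmpty()) {
            // S3-compatible stores (MinIO, OBS, ...) cannot derive an endpoint from the region alone.
            throw new IllegalArgumentException("endpoint must be specified explicitly for " + storageType);
        }
        return explicitEndpoint;
    }

    public static void main(String[] args) {
        System.out.println(defaultEndpoint("aws-s3", "us-west-2", null));            // s3.us-west-2.amazonaws.com
        System.out.println(defaultEndpoint("minio", null, "http://127.0.0.1:9000")); // explicit endpoint required
    }
}
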
@@ -250,10 +249,16 @@ public String validateAndGetUri(Map<String, String> loadProps) throws UserExcept
@Override
public void initializeHadoopStorageConfig() {
hadoopStorageConfig = new Configuration();
origProps.forEach((key, value) -> {
if (key.startsWith("fs.")) {
hadoopStorageConfig.set(key, value);
}
});
// Compatibility note: for historical reasons, users may still configure the schema as "s3://"
// even when the underlying storage is OSS, OBS, etc.
// To ensure backward compatibility, we append S3-related properties by default.
appendS3HdfsProperties(hadoopStorageConfig);
ensureDisableCache(hadoopStorageConfig, origProps);
}

private void appendS3HdfsProperties(Configuration hadoopStorageConfig) {
@@ -279,6 +284,38 @@ private void appendS3HdfsProperties(Configuration hadoopStorageConfig) {
hadoopStorageConfig.set("fs.s3a.path.style.access", getUsePathStyle());
}

/**
* By default, Hadoop caches FileSystem instances per scheme and authority (e.g. s3a://bucket/), so all
* subsequent calls using the same URI reuse the same FileSystem object.
* In multi-tenant or dynamic-credential environments, where different users may access the same bucket using
* different access keys or tokens, this cache reuse can lead to cross-credential contamination.
* <p>
* Specifically, if the cache is not disabled, a FileSystem instance initialized with one set of credentials may
* be reused by another session targeting the same bucket but with a different AK/SK. This results in:
* <ul>
*     <li>Incorrect authentication (using stale credentials)</li>
*     <li>Unexpected permission errors or access denials</li>
*     <li>Potential data leakage between users</li>
* </ul>
* To avoid these risks, the configuration property {@code fs.<schema>.impl.disable.cache}
* must be set to true for all object storage backends (e.g. S3A, OSS, COS, OBS), ensuring that each new access
* creates an isolated FileSystem instance with its own credentials and configuration context.
*/
private void ensureDisableCache(Configuration conf, Map<String, String> origProps) {
for (String schema : schemas()) {
String key = "fs." + schema + ".impl.disable.cache";
String userValue = origProps.get(key);
if (StringUtils.isNotBlank(userValue)) {
conf.setBoolean(key, BooleanUtils.toBoolean(userValue));
} else {
conf.setBoolean(key, true);
}
}
}

@Override
public String getStorageName() {
return "S3";
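
Below is a minimal, illustrative sketch (not part of the diff) of the Hadoop FileSystem cache behavior that the ensureDisableCache Javadoc above describes. It assumes hadoop-common and the S3A filesystem (hadoop-aws) are on the classpath; the bucket URI and access-key values are placeholders, and the snippet is meant to show cache semantics rather than to run against a real bucket.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class FsCacheSketch {
    public static void main(String[] args) throws Exception {
        URI bucket = URI.create("s3a://example-bucket/");    // placeholder bucket

        Configuration userA = new Configuration();
        userA.set("fs.s3a.access.key", "AK_OF_USER_A");      // placeholder credentials
        FileSystem fsA = FileSystem.get(bucket, userA);

        Configuration userB = new Configuration();
        userB.set("fs.s3a.access.key", "AK_OF_USER_B");      // different credentials, same bucket
        // With the cache enabled (default), the cache key is scheme + authority (+ UGI), not the
        // Configuration, so this returns the instance created for user A: user B silently reuses
        // user A's credentials.
        FileSystem cached = FileSystem.get(bucket, userB);

        // With fs.s3a.impl.disable.cache=true, each call builds a fresh, isolated instance.
        userB.setBoolean("fs.s3a.impl.disable.cache", true);
        FileSystem isolated = FileSystem.get(bucket, userB);

        System.out.println(fsA == cached);    // true: cache hit across credentials
        System.out.println(fsA == isolated);  // false: cache bypassed
    }
}
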
@@ -167,4 +167,9 @@ public void initializeHadoopStorageConfig() {
hadoopStorageConfig.set("fs.cosn.userinfo.secretId", accessKey);
hadoopStorageConfig.set("fs.cosn.userinfo.secretKey", secretKey);
}

@Override
protected Set<String> schemas() {
return ImmutableSet.of("cos", "cosn");
}
}
@@ -183,6 +183,11 @@ public Map<String, String> getBackendConfigProperties() {
return backendProperties;
}

@Override
protected Set<String> schemas() {
return ImmutableSet.of("gs");
}

@Override
public AwsCredentialsProvider getAwsCredentialsProvider() {
AwsCredentialsProvider credentialsProvider = super.getAwsCredentialsProvider();
@@ -136,4 +136,9 @@ protected void setEndpointIfPossible() {
throw new IllegalArgumentException("Property minio.endpoint is required.");
}
}

@Override
protected Set<String> schemas() {
return ImmutableSet.of("s3");
}
}
@@ -180,4 +180,9 @@ protected void setEndpointIfPossible() {
throw new IllegalArgumentException("Property obs.endpoint is required.");
}
}

@Override
protected Set<String> schemas() {
return ImmutableSet.of("obs");
}
}
@@ -281,6 +281,11 @@ public AwsCredentialsProvider getAwsCredentialsProvider() {
return null;
}

@Override
protected Set<String> schemas() {
return ImmutableSet.of("oss");
}

@Override
public void initializeHadoopStorageConfig() {
super.initializeHadoopStorageConfig();
@@ -215,11 +215,6 @@ public void initNormalizeAndCheckProps() {
convertGlueToS3EndpointIfNeeded();
}

@Override
boolean isEndpointCheckRequired() {
return false;
}

/**
* Guess if the storage properties is for this storage type.
* Subclass should override this method to provide the correct implementation.
@@ -268,6 +263,11 @@ protected Set<Pattern> endpointPatterns() {
return ENDPOINT_PATTERN;
}

@Override
protected Set<String> schemas() {
return ImmutableSet.of("s3", "s3a", "s3n");
}

@Override
public Map<String, String> getBackendConfigProperties() {
Map<String, String> backendProperties = generateBackendS3Configuration();
@@ -20,6 +20,7 @@
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.property.storage.exception.StoragePropertiesException;

import com.google.common.collect.Maps;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -176,4 +177,19 @@ public void testAwsCredentialsProvider() throws Exception {
obsStorageProperties = (COSProperties) StorageProperties.createPrimary(props);
Assertions.assertEquals(StaticCredentialsProvider.class, obsStorageProperties.getAwsCredentialsProvider().getClass());
}

@Test
public void testS3DisableHadoopCache() throws UserException {
Map<String, String> props = Maps.newHashMap();
props.put("cos.endpoint", "cos.ap-beijing.myqcloud.com");
COSProperties s3Properties = (COSProperties) StorageProperties.createPrimary(props);
Assertions.assertEquals("true", s3Properties.hadoopStorageConfig.get("fs.cos.impl.disable.cache"));
Assertions.assertEquals("true", s3Properties.hadoopStorageConfig.get("fs.s3.impl.disable.cache"));
Assertions.assertEquals("true", s3Properties.hadoopStorageConfig.get("fs.cosn.impl.disable.cache"));
props.put("fs.cos.impl.disable.cache", "true");
props.put("fs.cosn.impl.disable.cache", "false");
s3Properties = (COSProperties) StorageProperties.createPrimary(props);
Assertions.assertEquals("true", s3Properties.hadoopStorageConfig.get("fs.cos.impl.disable.cache"));
Assertions.assertEquals("false", s3Properties.hadoopStorageConfig.get("fs.cosn.impl.disable.cache"));
}
}
@@ -17,6 +17,7 @@

package org.apache.doris.datasource.property.storage;

import com.google.common.collect.Maps;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -110,4 +111,21 @@ public void testGCSAwsCredentialsProvider() throws Exception {
gcsStorageProperties = (GCSProperties) StorageProperties.createPrimary(gcsProps);
Assertions.assertEquals(StaticCredentialsProvider.class, gcsStorageProperties.getAwsCredentialsProvider().getClass());
}

@Test
public void testS3DisableHadoopCache() {
Map<String, String> props = Maps.newHashMap();
props.put("fs.gcs.support", "true");
GCSProperties s3Properties = (GCSProperties) StorageProperties.createPrimary(props);
Assertions.assertTrue(s3Properties.hadoopStorageConfig.getBoolean("fs.gs.impl.disable.cache", false));
props.put("fs.gs.impl.disable.cache", "true");
s3Properties = (GCSProperties) StorageProperties.createPrimary(props);
Assertions.assertTrue(s3Properties.hadoopStorageConfig.getBoolean("fs.gs.impl.disable.cache", false));
props.put("fs.gs.impl.disable.cache", "false");
s3Properties = (GCSProperties) StorageProperties.createPrimary(props);
Assertions.assertFalse(s3Properties.hadoopStorageConfig.getBoolean("fs.gs.impl.disable.cache", false));
props.put("fs.gs.impl.disable.cache", "null");
s3Properties = (GCSProperties) StorageProperties.createPrimary(props);
Assertions.assertFalse(s3Properties.hadoopStorageConfig.getBoolean("fs.gs.impl.disable.cache", false));
}
}
@@ -20,6 +20,7 @@
import org.apache.doris.common.ExceptionChecker;
import org.apache.doris.common.UserException;

import com.google.common.collect.Maps;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
@@ -152,6 +153,23 @@ public void testAwsCredentialsProvider() throws Exception {
Assertions.assertEquals(StaticCredentialsProvider.class, obsStorageProperties.getAwsCredentialsProvider().getClass());
}

@Test
public void testS3DisableHadoopCache() {
Map<String, String> props = Maps.newHashMap();
props.put("obs.endpoint", "obs.cn-north-4.myhuaweicloud.com");
OBSProperties s3Properties = (OBSProperties) StorageProperties.createPrimary(props);
Assertions.assertTrue(s3Properties.hadoopStorageConfig.getBoolean("fs.obs.impl.disable.cache", false));
props.put("fs.obs.impl.disable.cache", "true");
s3Properties = (OBSProperties) StorageProperties.createPrimary(props);
Assertions.assertTrue(s3Properties.hadoopStorageConfig.getBoolean("fs.obs.impl.disable.cache", false));
props.put("fs.obs.impl.disable.cache", "false");
s3Properties = (OBSProperties) StorageProperties.createPrimary(props);
Assertions.assertFalse(s3Properties.hadoopStorageConfig.getBoolean("fs.obs.impl.disable.cache", false));
props.put("fs.obs.impl.disable.cache", "null");
s3Properties = (OBSProperties) StorageProperties.createPrimary(props);
Assertions.assertFalse(s3Properties.hadoopStorageConfig.getBoolean("fs.obs.impl.disable.cache", false));
}

@Test
public void testMissingSecretKey() {
origProps.put("obs.endpoint", "obs.cn-north-4.myhuaweicloud.com");
@@ -20,6 +20,7 @@
import org.apache.doris.common.ExceptionChecker;
import org.apache.doris.common.UserException;

import com.google.common.collect.Maps;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
@@ -250,4 +251,21 @@ public void testAwsCredentialsProvider() throws Exception {
Assertions.assertEquals(StaticCredentialsProvider.class, ossStorageProperties.getAwsCredentialsProvider().getClass());
}

@Test
public void testS3DisableHadoopCache() throws UserException {
Map<String, String> props = Maps.newHashMap();
props.put("oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");
OSSProperties s3Properties = (OSSProperties) StorageProperties.createPrimary(props);
Assertions.assertTrue(s3Properties.hadoopStorageConfig.getBoolean("fs.oss.impl.disable.cache", false));
props.put("fs.oss.impl.disable.cache", "true");
s3Properties = (OSSProperties) StorageProperties.createPrimary(props);
Assertions.assertTrue(s3Properties.hadoopStorageConfig.getBoolean("fs.oss.impl.disable.cache", false));
props.put("fs.oss.impl.disable.cache", "false");
s3Properties = (OSSProperties) StorageProperties.createPrimary(props);
Assertions.assertFalse(s3Properties.hadoopStorageConfig.getBoolean("fs.oss.impl.disable.cache", false));
props.put("fs.oss.impl.disable.cache", "null");
s3Properties = (OSSProperties) StorageProperties.createPrimary(props);
Assertions.assertFalse(s3Properties.hadoopStorageConfig.getBoolean("fs.oss.impl.disable.cache", false));
}

}
@@ -423,4 +423,21 @@ public void testS3PropertiesAwsAnonymousCredentialsProvider() {
Assertions.assertEquals(AwsCredentialsProviderChain.class, provider.getClass());
Config.aws_credentials_provider_version = "v2";
}

@Test
public void testS3DisableHadoopCache() throws UserException {
Map<String, String> props = Maps.newHashMap();
props.put("s3.endpoint", "s3.us-west-2.amazonaws.com");
S3Properties s3Properties = (S3Properties) StorageProperties.createPrimary(props);
Assertions.assertEquals("true", s3Properties.hadoopStorageConfig.get("fs.s3a.impl.disable.cache"));
Assertions.assertEquals("true", s3Properties.hadoopStorageConfig.get("fs.s3.impl.disable.cache"));
Assertions.assertEquals("true", s3Properties.hadoopStorageConfig.get("fs.s3n.impl.disable.cache"));
props.put("fs.s3a.impl.disable.cache", "true");
props.put("fs.s3.impl.disable.cache", "false");
props.put("fs.s3n.impl.disable.cache", "null");
s3Properties = (S3Properties) StorageProperties.createPrimary(props);
Assertions.assertEquals("true", s3Properties.hadoopStorageConfig.get("fs.s3a.impl.disable.cache"));
Assertions.assertEquals("false", s3Properties.hadoopStorageConfig.get("fs.s3.impl.disable.cache"));
Assertions.assertEquals("false", s3Properties.hadoopStorageConfig.get("fs.s3n.impl.disable.cache"));
}
}