properties) {
this.storageDesc = new StorageDesc(storageName, storageType, properties);
this.location = location;
- boolean convertedToS3 = BosProperties.tryConvertBosToS3(properties, storageType);
+ /*boolean convertedToS3 = BosProperties.tryConvertBosToS3(properties, storageType);
if (convertedToS3) {
this.storageDesc.setStorageType(StorageBackend.StorageType.S3);
this.location = BosProperties.convertPathToS3(location);
} else {
this.location = location;
- }
+ }*/
}
public void setStorageDesc(StorageDesc storageDesc) {
@@ -138,8 +101,8 @@ public String toSql() {
sb.append(" `").append(storageDesc.getName()).append("`");
}
sb.append(" ON LOCATION ").append(location).append(" PROPERTIES(")
- .append(new PrintableMap<>(storageDesc.getProperties(), " = ", true, false, true))
- .append(")");
+ .append(new PrintableMap<>(storageDesc.getProperties(), " = ", true, false, true))
+ .append(")");
return sb.toString();
}
@@ -185,6 +148,35 @@ public TStorageBackendType toThrift() {
return TStorageBackendType.BROKER;
}
}
+
+ /**
+ * A set of storage types that currently support parameter refactoring.
+ *
+ * Includes: S3 (referring to all systems compatible with the S3 protocol),
+ * HDFS, OFS, JFS, and AZURE. For S3, this is a generalized type that matches
+ * any system whose storage type name is returned as "s3" (or compatible)
+ * by {@link org.apache.doris.datasource.property.storage.StorageProperties#getStorageName()}.
+ *
+ * This set is a temporary measure. Once parameter refactoring is fully supported
+ * across all storage systems, this set can be removed.
+ */
+ public static final Set<StorageType> REFACTOR_STORAGE_TYPES =
+ ImmutableSet.of(StorageType.S3, StorageType.HDFS, StorageType.OFS, StorageType.JFS, StorageType.AZURE);
+
+ public static StorageType convertToStorageType(String storageName) {
+ switch (storageName.toLowerCase()) {
+ case "hdfs":
+ return StorageType.HDFS;
+ case "s3":
+ return StorageType.S3;
+ case "jfs":
+ return StorageType.JFS;
+ case "local":
+ return StorageType.LOCAL;
+ default:
+ throw new IllegalArgumentException("Invalid storage type: " + storageName);
+ }
+ }
}
}
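For illustration, a minimal sketch of how the new helper behaves (the loop harness is hypothetical and not part of the patch):

    // Hypothetical usage of StorageBackend.StorageType.convertToStorageType.
    // Matching is case-insensitive because the input is lower-cased first.
    for (String name : new String[] {"hdfs", "s3", "jfs", "local"}) {
        StorageBackend.StorageType type = StorageBackend.StorageType.convertToStorageType(name);
        System.out.println(name + " -> " + type); // e.g. "hdfs -> HDFS"
    }
    try {
        StorageBackend.StorageType.convertToStorageType("ftp"); // unsupported name
    } catch (IllegalArgumentException e) {
        System.out.println(e.getMessage()); // "Invalid storage type: ftp"
    }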
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageDesc.java
index 902f5d24b91820..bbfcd8e9d2c45b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageDesc.java
@@ -17,7 +17,11 @@
package org.apache.doris.analysis;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+
import com.google.gson.annotations.SerializedName;
+import lombok.Getter;
import java.util.Map;
@@ -32,18 +36,33 @@
* The broker's StorageBackend.StorageType desc
*/
public class StorageDesc extends ResourceDesc {
+
+ @Deprecated
@SerializedName("st")
protected StorageBackend.StorageType storageType;
+ @Getter
+ protected StorageProperties storageProperties;
+
public StorageDesc() {
}
public StorageDesc(String name, StorageBackend.StorageType storageType, Map<String, String> properties) {
this.name = name;
this.storageType = storageType;
+ if (!storageType.equals(StorageBackend.StorageType.BROKER)) {
+ this.storageProperties = StorageProperties.createPrimary(properties);
+ }
this.properties = properties;
}
+ public StorageDesc(String name, Map<String, String> properties) throws UserException {
+ this.name = name;
+ this.properties = properties;
+ this.storageProperties = StorageProperties.createPrimary(properties);
+ this.storageType = StorageBackend.StorageType.convertToStorageType(storageProperties.getStorageName());
+ }
+
public void setName(String name) {
this.name = name;
}
@@ -67,4 +86,11 @@ public StorageBackend.StorageType getStorageType() {
public Map<String, String> getProperties() {
return properties;
}
+
+ public Map<String, String> getBackendConfigProperties() {
+ if (null == storageProperties) {
+ return properties;
+ }
+ return storageProperties.getBackendConfigProperties();
+ }
}
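A sketch of the fallback in the new getBackendConfigProperties() (keys and values are illustrative):

    // For non-BROKER storage, StorageProperties translates user properties into
    // BE-side configuration; for BROKER, storageProperties stays null and the
    // raw map is returned unchanged.
    static void storageDescDemo() throws UserException {
        Map<String, String> props = new HashMap<>();
        props.put("s3.endpoint", "s3.us-east-1.amazonaws.com");
        props.put("s3.access_key", "ak");
        props.put("s3.secret_key", "sk");

        StorageDesc s3Desc = new StorageDesc("repo_storage", props); // type inferred from props
        Map<String, String> beProps = s3Desc.getBackendConfigProperties(); // translated config

        StorageDesc brokerDesc = new StorageDesc("my_broker", StorageBackend.StorageType.BROKER, props);
        Map<String, String> raw = brokerDesc.getBackendConfigProperties(); // pass-through
    }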
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
index 64ca3582db160e..399f996e314fd3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
@@ -19,7 +19,6 @@
import org.apache.doris.analysis.AbstractBackupStmt;
import org.apache.doris.analysis.AbstractBackupTableRefClause;
-import org.apache.doris.analysis.AlterRepositoryStmt;
import org.apache.doris.analysis.BackupStmt;
import org.apache.doris.analysis.BackupStmt.BackupType;
import org.apache.doris.analysis.CancelBackupStmt;
@@ -45,15 +44,17 @@
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
-import org.apache.doris.fs.FileSystemFactory;
-import org.apache.doris.fs.remote.AzureFileSystem;
-import org.apache.doris.fs.remote.RemoteFileSystem;
-import org.apache.doris.fs.remote.S3FileSystem;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+import org.apache.doris.fsv2.FileSystemFactory;
+import org.apache.doris.fsv2.remote.AzureFileSystem;
+import org.apache.doris.fsv2.remote.RemoteFileSystem;
+import org.apache.doris.fsv2.remote.S3FileSystem;
import org.apache.doris.nereids.trees.plans.commands.CancelBackupCommand;
import org.apache.doris.persist.BarrierLog;
import org.apache.doris.task.DirMoveTask;
@@ -215,10 +216,18 @@ public void createRepository(CreateRepositoryStmt stmt) throws DdlException {
"broker does not exist: " + stmt.getBrokerName());
}
- RemoteFileSystem fileSystem = FileSystemFactory.get(stmt.getBrokerName(), stmt.getStorageType(),
- stmt.getProperties());
+ RemoteFileSystem fileSystem;
+ try {
+ fileSystem = FileSystemFactory.get(stmt.getStorageType(), stmt.getProperties());
+ } catch (UserException e) {
+ throw new DdlException("Failed to initialize remote file system: " + e.getMessage());
+ }
+ org.apache.doris.fs.remote.RemoteFileSystem oldfs = org.apache.doris.fs.FileSystemFactory
+ .get(stmt.getBrokerName(), stmt.getStorageType(),
+ stmt.getProperties());
long repoId = env.getNextId();
- Repository repo = new Repository(repoId, stmt.getName(), stmt.isReadOnly(), stmt.getLocation(), fileSystem);
+ Repository repo = new Repository(repoId, stmt.getName(), stmt.isReadOnly(), stmt.getLocation(),
+ fileSystem, oldfs);
Status st = repoMgr.addAndInitRepoIfNotExist(repo, false);
if (!st.ok()) {
@@ -231,61 +240,104 @@ public void createRepository(CreateRepositoryStmt stmt) throws DdlException {
}
}
- public void alterRepository(AlterRepositoryStmt stmt) throws DdlException {
- alterRepositoryInternal(stmt.getName(), stmt.getProperties());
- }
-
- public void alterRepositoryInternal(String repoName, Map<String, String> properties) throws DdlException {
+ /**
+ * Alters an existing repository by applying the given new properties.
+ *
+ * @param repoName The name of the repository to alter.
+ * @param newProps The new properties to apply to the repository.
+ * @param strictCheck If true, only allows altering S3 or Azure repositories and validates properties accordingly.
+ * TODO: Investigate why only S3 and Azure repositories are supported for the alter operation.
+ * @throws DdlException if the repository does not exist, fails to apply properties, or cannot connect
+ * to the updated repository.
+ */
+ public void alterRepository(String repoName, Map<String, String> newProps, boolean strictCheck)
+ throws DdlException {
tryLock();
try {
- Repository repo = repoMgr.getRepo(repoName);
- if (repo == null) {
- ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Repository does not exist");
+ Repository oldRepo = repoMgr.getRepo(repoName);
+ if (oldRepo == null) {
+ throw new DdlException("Repository does not exist");
}
-
- if (repo.getRemoteFileSystem() instanceof S3FileSystem
- || repo.getRemoteFileSystem() instanceof AzureFileSystem) {
- Map<String, String> oldProperties = new HashMap<>(properties);
- Status status = repo.alterRepositoryS3Properties(oldProperties);
- if (!status.ok()) {
- ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, status.getErrMsg());
- }
- RemoteFileSystem fileSystem = null;
- if (repo.getRemoteFileSystem() instanceof S3FileSystem) {
- fileSystem = FileSystemFactory.get(repo.getRemoteFileSystem().getName(),
- StorageBackend.StorageType.S3, oldProperties);
- } else if (repo.getRemoteFileSystem() instanceof AzureFileSystem) {
- fileSystem = FileSystemFactory.get(repo.getRemoteFileSystem().getName(),
- StorageBackend.StorageType.AZURE, oldProperties);
- }
-
- Repository newRepo = new Repository(repo.getId(), repo.getName(), repo.isReadOnly(),
- repo.getLocation(), fileSystem);
- if (!newRepo.ping()) {
- LOG.warn("Failed to connect repository {}. msg: {}", repo.getName(), repo.getErrorMsg());
- ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
- "Repo can not ping with new s3 properties");
- }
-
- Status st = repoMgr.alterRepo(newRepo, false /* not replay */);
- if (!st.ok()) {
- ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
- "Failed to alter repository: " + st.getErrMsg());
- }
- for (AbstractJob job : getAllCurrentJobs()) {
- if (!job.isDone() && job.getRepoId() == repo.getId()) {
- job.updateRepo(newRepo);
- }
- }
- } else {
- ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
- "Only support alter s3 or azure repository");
+ // Merge new properties with the existing repository's properties
+ Map<String, String> mergedProps = mergeProperties(oldRepo, newProps, strictCheck);
+ // Create new remote file system with merged properties
+ RemoteFileSystem fileSystem = FileSystemFactory.get(StorageProperties.createPrimary(mergedProps));
+ org.apache.doris.fs.remote.RemoteFileSystem oldfs = null;
+ if (oldRepo.getRemoteFileSystem() instanceof S3FileSystem) {
+ oldfs = org.apache.doris.fs.FileSystemFactory.get(oldRepo.getRemoteFileSystem().getName(),
+ StorageBackend.StorageType.S3, mergedProps);
+ } else if (oldRepo.getRemoteFileSystem() instanceof AzureFileSystem) {
+ oldfs = org.apache.doris.fs.FileSystemFactory.get(oldRepo.getRemoteFileSystem().getName(),
+ StorageBackend.StorageType.AZURE, mergedProps);
+ }
+ // Create new Repository instance with updated file system
+ Repository newRepo = new Repository(
+ oldRepo.getId(), oldRepo.getName(), oldRepo.isReadOnly(),
+ oldRepo.getLocation(), fileSystem, oldfs
+ );
+ // Verify the repository can be connected with new settings
+ if (!newRepo.ping()) {
+ LOG.warn("Failed to connect repository {}. msg: {}", repoName, newRepo.getErrorMsg());
+ throw new DdlException("Repository ping failed with new properties");
+ }
+ // Apply the new repository metadata
+ Status st = repoMgr.alterRepo(newRepo, false /* not replay */);
+ if (!st.ok()) {
+ throw new DdlException("Failed to alter repository: " + st.getErrMsg());
}
+ // Update all running jobs that are using this repository
+ updateOngoingJobs(oldRepo.getId(), newRepo);
} finally {
seqlock.unlock();
}
}
+ /**
+ * Merges new user-provided properties into the existing repository's configuration.
+ * In strict mode, only supports S3 or Azure repositories and applies internal S3 merge logic.
+ *
+ * @param repo The existing repository.
+ * @param newProps New user-specified properties.
+ * @param strictCheck Whether to enforce S3/Azure-only and validate the new properties.
+ * @return A complete set of merged properties.
+ * @throws DdlException if the merge fails or the repository type is unsupported.
+ */
+ private Map<String, String> mergeProperties(Repository repo, Map<String, String> newProps, boolean strictCheck)
+ throws DdlException {
+ if (strictCheck) {
+ if (!(repo.getRemoteFileSystem() instanceof S3FileSystem
+ || repo.getRemoteFileSystem() instanceof AzureFileSystem)) {
+ throw new DdlException("Only support altering S3 or Azure repository");
+ }
+ // Let the repository validate and enrich the new S3/Azure properties
+ Map<String, String> propsCopy = new HashMap<>(newProps);
+ Status status = repo.alterRepositoryS3Properties(propsCopy);
+ if (!status.ok()) {
+ throw new DdlException("Failed to merge S3 properties: " + status.getErrMsg());
+ }
+ return propsCopy;
+ } else {
+ // General case: just override old props with new ones
+ Map<String, String> combined = new HashMap<>(repo.getRemoteFileSystem().getProperties());
+ combined.putAll(newProps);
+ return combined;
+ }
+ }
+
+ /**
+ * Updates all currently running jobs associated with the given repository ID.
+ * Used to ensure that all jobs operate on the new repository instance after alteration.
+ *
+ * @param repoId The ID of the altered repository.
+ * @param newRepo The new repository instance.
+ */
+ private void updateOngoingJobs(long repoId, Repository newRepo) {
+ for (AbstractJob job : getAllCurrentJobs()) {
+ if (!job.isDone() && job.getRepoId() == repoId) {
+ job.updateRepo(newRepo);
+ }
+ }
+ }
// handle drop repository stmt
public void dropRepository(DropRepositoryStmt stmt) throws DdlException {
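The non-strict branch of mergeProperties() reduces to a plain map override; a minimal sketch with illustrative values:

    // strictCheck == false: start from the repository's existing properties and
    // let the caller's new properties win on key collisions.
    Map<String, String> existing = new HashMap<>();
    existing.put("s3.endpoint", "s3.old.example.com");
    existing.put("s3.access_key", "old_ak");

    Map<String, String> newProps = new HashMap<>();
    newProps.put("s3.access_key", "new_ak");

    Map<String, String> combined = new HashMap<>(existing);
    combined.putAll(newProps);
    // combined: {s3.endpoint=s3.old.example.com, s3.access_key=new_ak}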
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index b530d3b35e0a68..92e25ecc341503 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -39,7 +39,6 @@
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.datasource.property.S3ClientBEProperties;
import org.apache.doris.persist.BarrierLog;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
@@ -388,7 +387,7 @@ public synchronized Status updateRepo(Repository repo) {
continue;
}
((UploadTask) task).updateBrokerProperties(
- S3ClientBEProperties.getBeFSProperties(repo.getRemoteFileSystem().getProperties()));
+ repo.getRemoteFileSystem().getStorageProperties().getBackendConfigProperties());
AgentTaskQueue.updateTask(beId, TTaskType.UPLOAD, signature, task);
}
LOG.info("finished to update upload job properties. {}", this);
@@ -783,7 +782,7 @@ private void uploadSnapshot() {
long signature = env.getNextId();
UploadTask task = new UploadTask(null, beId, signature, jobId, dbId, srcToDest,
brokers.get(0),
- S3ClientBEProperties.getBeFSProperties(repo.getRemoteFileSystem().getProperties()),
+ repo.getRemoteFileSystem().getStorageProperties().getBackendConfigProperties(),
repo.getRemoteFileSystem().getStorageType(), repo.getLocation());
batchTask.addTask(task);
unfinishedTaskIds.put(signature, beId);
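Both call sites now derive BE-side properties the same way; a sketch of the shared pattern (the surrounding repo/task variables are assumed from the patch context):

    // Replaces S3ClientBEProperties.getBeFSProperties(fs.getProperties()):
    // the storage-specific StorageProperties object owns the translation now.
    Map<String, String> beProps = repo.getRemoteFileSystem()
            .getStorageProperties()
            .getBackendConfigProperties();
    ((UploadTask) task).updateBrokerProperties(beProps);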
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
index 1450ef9a03459f..5154c74ab58294 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
@@ -25,19 +25,19 @@
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.property.constants.S3Properties;
-import org.apache.doris.fs.FileSystemFactory;
-import org.apache.doris.fs.PersistentFileSystem;
-import org.apache.doris.fs.remote.AzureFileSystem;
-import org.apache.doris.fs.remote.BrokerFileSystem;
-import org.apache.doris.fs.remote.RemoteFile;
-import org.apache.doris.fs.remote.RemoteFileSystem;
-import org.apache.doris.fs.remote.S3FileSystem;
-import org.apache.doris.fs.remote.dfs.DFSFileSystem;
+import org.apache.doris.datasource.property.storage.StorageProperties;
+import org.apache.doris.fsv2.FileSystemFactory;
+import org.apache.doris.fsv2.PersistentFileSystem;
+import org.apache.doris.fsv2.remote.BrokerFileSystem;
+import org.apache.doris.fsv2.remote.RemoteFile;
+import org.apache.doris.fsv2.remote.RemoteFileSystem;
+import org.apache.doris.fsv2.remote.S3FileSystem;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.system.Backend;
@@ -132,18 +132,28 @@ public class Repository implements Writable, GsonPostProcessable {
private String location;
@SerializedName("fs")
+ private org.apache.doris.fs.PersistentFileSystem oldfs;
+
+ // Transitional: persistence still goes through the legacy fs config (oldfs).
+ // The oldfs field can be removed once the new fs configuration is fully enabled.
private PersistentFileSystem fileSystem;
+ public org.apache.doris.fs.PersistentFileSystem getOldfs() {
+ return oldfs;
+ }
+
private Repository() {
// for persist
}
- public Repository(long id, String name, boolean isReadOnly, String location, RemoteFileSystem fileSystem) {
+ public Repository(long id, String name, boolean isReadOnly, String location, RemoteFileSystem fileSystem,
+ org.apache.doris.fs.PersistentFileSystem oldFs) {
this.id = id;
this.name = name;
this.isReadOnly = isReadOnly;
this.location = location;
this.fileSystem = fileSystem;
+ this.oldfs = oldFs;
this.createTime = System.currentTimeMillis();
}
@@ -203,17 +213,53 @@ public static Repository read(DataInput in) throws IOException {
}
}
+ // TODO: why does alter only support S3 properties?
+ public Status alterRepositoryS3Properties(Map<String, String> properties) {
+ if (this.fileSystem instanceof S3FileSystem) {
+ Map<String, String> oldProperties = new HashMap<>(this.getRemoteFileSystem().getProperties());
+ oldProperties.remove(S3Properties.ACCESS_KEY);
+ oldProperties.remove(S3Properties.SECRET_KEY);
+ oldProperties.remove(S3Properties.SESSION_TOKEN);
+ oldProperties.remove(S3Properties.Env.ACCESS_KEY);
+ oldProperties.remove(S3Properties.Env.SECRET_KEY);
+ oldProperties.remove(S3Properties.Env.TOKEN);
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ if (Objects.equals(entry.getKey(), S3Properties.ACCESS_KEY)
+ || Objects.equals(entry.getKey(), S3Properties.Env.ACCESS_KEY)) {
+ oldProperties.putIfAbsent(S3Properties.ACCESS_KEY, entry.getValue());
+ }
+ if (Objects.equals(entry.getKey(), S3Properties.SECRET_KEY)
+ || Objects.equals(entry.getKey(), S3Properties.Env.SECRET_KEY)) {
+ oldProperties.putIfAbsent(S3Properties.SECRET_KEY, entry.getValue());
+ }
+ if (Objects.equals(entry.getKey(), S3Properties.SESSION_TOKEN)
+ || Objects.equals(entry.getKey(), S3Properties.Env.TOKEN)) {
+ oldProperties.putIfAbsent(S3Properties.SESSION_TOKEN, entry.getValue());
+ }
+ }
+ properties.clear();
+ properties.putAll(oldProperties);
+ return Status.OK;
+ } else {
+ return new Status(ErrCode.COMMON_ERROR, "Only support alter s3 repository");
+ }
+ }
+
@Override
public void gsonPostProcess() {
StorageBackend.StorageType type = StorageBackend.StorageType.BROKER;
- if (this.fileSystem.properties.containsKey(PersistentFileSystem.STORAGE_TYPE)) {
+ if (this.oldfs.properties.containsKey(org.apache.doris.fs.PersistentFileSystem.STORAGE_TYPE)) {
type = StorageBackend.StorageType.valueOf(
- this.fileSystem.properties.get(PersistentFileSystem.STORAGE_TYPE));
- this.fileSystem.properties.remove(PersistentFileSystem.STORAGE_TYPE);
+ this.oldfs.properties.get(org.apache.doris.fs.PersistentFileSystem.STORAGE_TYPE));
+ this.oldfs.properties.remove(org.apache.doris.fs.PersistentFileSystem.STORAGE_TYPE);
}
- this.fileSystem = FileSystemFactory.get(this.fileSystem.getName(),
+ this.oldfs = org.apache.doris.fs.FileSystemFactory.get(this.oldfs.getName(),
type,
- this.fileSystem.getProperties());
+ this.oldfs.getProperties());
+ if (!type.equals(StorageBackend.StorageType.BROKER)) {
+ StorageProperties storageProperties = StorageProperties.createPrimary(this.oldfs.properties);
+ this.fileSystem = FileSystemFactory.get(storageProperties);
+ }
}
public long getId() {
@@ -229,7 +275,18 @@ public boolean isReadOnly() {
}
public String getLocation() {
- return location;
+ if (null == fileSystem) {
+ return location;
+ }
+ try {
+ if (null == fileSystem.getStorageProperties()) {
+ return location;
+ } else {
+ return fileSystem.getStorageProperties().validateAndNormalizeUri(location);
+ }
+ } catch (UserException e) {
+ throw new RuntimeException(e);
+ }
}
public String getErrorMsg() {
@@ -277,7 +334,7 @@ public Status initRepository() {
if (name.compareTo((String) root.get("name")) != 0) {
return new Status(ErrCode.COMMON_ERROR,
"Invalid repository __repo_info, expected repo '" + name + "', but get name '"
- + (String) root.get("name") + "' from " + repoInfoFilePath);
+ + (String) root.get("name") + "' from " + repoInfoFilePath);
}
name = (String) root.get("name");
createTime = TimeUtils.timeStringToLong((String) root.get("create_time"));
@@ -307,54 +364,23 @@ public Status initRepository() {
}
}
- public Status alterRepositoryS3Properties(Map<String, String> properties) {
- if (fileSystem instanceof S3FileSystem) {
- Map<String, String> oldProperties = new HashMap<>(this.getRemoteFileSystem().getProperties());
- oldProperties.remove(S3Properties.ACCESS_KEY);
- oldProperties.remove(S3Properties.SECRET_KEY);
- oldProperties.remove(S3Properties.SESSION_TOKEN);
- oldProperties.remove(S3Properties.Env.ACCESS_KEY);
- oldProperties.remove(S3Properties.Env.SECRET_KEY);
- oldProperties.remove(S3Properties.Env.TOKEN);
- for (Map.Entry<String, String> entry : properties.entrySet()) {
- if (Objects.equals(entry.getKey(), S3Properties.ACCESS_KEY)
- || Objects.equals(entry.getKey(), S3Properties.Env.ACCESS_KEY)) {
- oldProperties.putIfAbsent(S3Properties.ACCESS_KEY, entry.getValue());
- }
- if (Objects.equals(entry.getKey(), S3Properties.SECRET_KEY)
- || Objects.equals(entry.getKey(), S3Properties.Env.SECRET_KEY)) {
- oldProperties.putIfAbsent(S3Properties.SECRET_KEY, entry.getValue());
- }
- if (Objects.equals(entry.getKey(), S3Properties.SESSION_TOKEN)
- || Objects.equals(entry.getKey(), S3Properties.Env.TOKEN)) {
- oldProperties.putIfAbsent(S3Properties.SESSION_TOKEN, entry.getValue());
- }
- }
- properties.clear();
- properties.putAll(oldProperties);
- return Status.OK;
- } else {
- return new Status(ErrCode.COMMON_ERROR, "Only support alter s3 repository");
- }
- }
-
// eg: location/__palo_repository_repo_name/__repo_info
public String assembleRepoInfoFilePath() {
- return Joiner.on(PATH_DELIMITER).join(location,
+ return Joiner.on(PATH_DELIMITER).join(getLocation(),
joinPrefix(PREFIX_REPO, name),
FILE_REPO_INFO);
}
// eg: location/__palo_repository_repo_name/__my_sp1/__meta
public String assembleMetaInfoFilePath(String label) {
- return Joiner.on(PATH_DELIMITER).join(location, joinPrefix(PREFIX_REPO, name),
+ return Joiner.on(PATH_DELIMITER).join(getLocation(), joinPrefix(PREFIX_REPO, name),
joinPrefix(PREFIX_SNAPSHOT_DIR, label),
FILE_META_INFO);
}
// eg: location/__palo_repository_repo_name/__my_sp1/__info_2018-01-01-08-00-00
public String assembleJobInfoFilePath(String label, long createTime) {
- return Joiner.on(PATH_DELIMITER).join(location, joinPrefix(PREFIX_REPO, name),
+ return Joiner.on(PATH_DELIMITER).join(getLocation(), joinPrefix(PREFIX_REPO, name),
joinPrefix(PREFIX_SNAPSHOT_DIR, label),
jobInfoFileNameWithTimestamp(createTime));
}
@@ -362,7 +388,7 @@ public String assembleJobInfoFilePath(String label, long createTime) {
// eg:
// __palo_repository_repo_name/__ss_my_ss1/__ss_content/__db_10001/__tbl_10020/__part_10031/__idx_10020/__10022/
public String getRepoTabletPathBySnapshotInfo(String label, SnapshotInfo info) {
- String path = Joiner.on(PATH_DELIMITER).join(location, joinPrefix(PREFIX_REPO, name),
+ String path = Joiner.on(PATH_DELIMITER).join(getLocation(), joinPrefix(PREFIX_REPO, name),
joinPrefix(PREFIX_SNAPSHOT_DIR, label),
DIR_SNAPSHOT_CONTENT,
joinPrefix(PREFIX_DB, info.getDbId()),
@@ -381,7 +407,7 @@ public String getRepoTabletPathBySnapshotInfo(String label, SnapshotInfo info) {
}
public String getRepoPath(String label, String childPath) {
- String path = Joiner.on(PATH_DELIMITER).join(location, joinPrefix(PREFIX_REPO, name),
+ String path = Joiner.on(PATH_DELIMITER).join(getLocation(), joinPrefix(PREFIX_REPO, name),
joinPrefix(PREFIX_SNAPSHOT_DIR, label),
DIR_SNAPSHOT_CONTENT,
childPath);
@@ -568,7 +594,7 @@ public Status upload(String localFilePath, String remoteFilePath) {
if (!st.ok()) {
return st;
}
- } else if (fileSystem instanceof S3FileSystem || fileSystem instanceof AzureFileSystem) {
+ } else {
if (LOG.isDebugEnabled()) {
LOG.debug("get md5sum of file: {}. final remote path: {}", localFilePath, finalRemotePath);
}
@@ -577,20 +603,6 @@ public Status upload(String localFilePath, String remoteFilePath) {
return st;
}
- // upload final file
- st = fileSystem.upload(localFilePath, finalRemotePath);
- if (!st.ok()) {
- return st;
- }
- } else if (fileSystem instanceof DFSFileSystem) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("hdfs get md5sum of file: {}. final remote path: {}", localFilePath, finalRemotePath);
- }
- st = fileSystem.delete(finalRemotePath);
- if (!st.ok()) {
- return st;
- }
-
// upload final file
st = fileSystem.upload(localFilePath, finalRemotePath);
if (!st.ok()) {
@@ -637,7 +649,7 @@ public Status download(String remoteFilePath, String localFilePath) {
// 2. download
status = fileSystem.downloadWithFileSize(remoteFilePathWithChecksum, localFilePath,
- remoteFiles.get(0).getSize());
+ remoteFiles.get(0).getSize());
if (!status.ok()) {
return status;
}
@@ -855,7 +867,13 @@ public void readFields(DataInput in) throws IOException {
name = Text.readString(in);
isReadOnly = in.readBoolean();
location = Text.readString(in);
- fileSystem = PersistentFileSystem.read(in);
+ oldfs = org.apache.doris.fs.PersistentFileSystem.read(in);
+ try {
+ fileSystem = FileSystemFactory.get(oldfs.getStorageType(), oldfs.getProperties());
+ } catch (UserException e) {
+ // TODO: should this exception be ignored instead of failing deserialization?
+ throw new IOException("Failed to create file system: " + e.getMessage());
+ }
createTime = in.readLong();
}
}
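A restatement of the new getLocation() decision order, as a standalone sketch (assumes Doris's fsv2 classes; not part of the patch):

    // 1. No new-style file system yet        -> return the raw location.
    // 2. File system lacks StorageProperties -> return the raw location.
    // 3. Otherwise normalize, wrapping UserException in RuntimeException.
    static String resolveLocation(PersistentFileSystem fs, String location) {
        if (fs == null || fs.getStorageProperties() == null) {
            return location;
        }
        try {
            return fs.getStorageProperties().validateAndNormalizeUri(location);
        } catch (UserException e) {
            throw new RuntimeException(e);
        }
    }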
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RepositoryMgr.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RepositoryMgr.java
index 853c1841449046..d57593c5443098 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RepositoryMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RepositoryMgr.java
@@ -23,8 +23,8 @@
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.Daemon;
-import org.apache.doris.fs.remote.AzureFileSystem;
-import org.apache.doris.fs.remote.S3FileSystem;
+import org.apache.doris.fsv2.remote.AzureFileSystem;
+import org.apache.doris.fsv2.remote.S3FileSystem;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
@@ -104,6 +104,9 @@ public Repository getRepo(long repoId) {
return repoIdMap.get(repoId);
}
+ /**
+ * TODO: why is alter not supported for other file systems such as HDFS?
+ */
public Status alterRepo(Repository newRepo, boolean isReplay) {
lock.lock();
try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index a3230de290e739..2643d0efacd5e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -68,7 +68,6 @@
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.InternalCatalog;
-import org.apache.doris.datasource.property.S3ClientBEProperties;
import org.apache.doris.persist.ColocatePersistInfo;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
@@ -438,7 +437,7 @@ public synchronized Status updateRepo(Repository repo) {
continue;
}
((DownloadTask) task).updateBrokerProperties(
- S3ClientBEProperties.getBeFSProperties(repo.getRemoteFileSystem().getProperties()));
+ repo.getRemoteFileSystem().getStorageProperties().getBackendConfigProperties());
AgentTaskQueue.updateTask(beId, TTaskType.DOWNLOAD, signature, task);
}
LOG.info("finished to update download job properties. {}", this);
@@ -1864,7 +1863,7 @@ private void downloadRemoteSnapshots() {
long signature = env.getNextId();
DownloadTask task = new DownloadTask(null, beId, signature, jobId, dbId, srcToDest,
brokerAddrs.get(0),
- S3ClientBEProperties.getBeFSProperties(repo.getRemoteFileSystem().getProperties()),
+ repo.getRemoteFileSystem().getStorageProperties().getBackendConfigProperties(),
repo.getRemoteFileSystem().getStorageType(), repo.getLocation());
batchTask.addTask(task);
unfinishedSignatureToId.put(signature, beId);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerReader.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerReader.java
index 592d9f94d114b3..bb3162aa1e7f01 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerReader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerReader.java
@@ -103,7 +103,7 @@ public TBrokerFD open(String path) throws IOException {
String clientId = NetUtils
.getHostPortInAccessibleFormat(FrontendOptions.getLocalHostAddress(), Config.rpc_port);
TBrokerOpenReaderRequest tOpenReaderRequest = new TBrokerOpenReaderRequest(
- TBrokerVersion.VERSION_ONE, path, 0, clientId, brokerDesc.getProperties());
+ TBrokerVersion.VERSION_ONE, path, 0, clientId, brokerDesc.getBackendConfigProperties());
TBrokerOpenReaderResponse tOpenReaderResponse = null;
try {
tOpenReaderResponse = client.openReader(tOpenReaderRequest);
@@ -137,7 +137,7 @@ public void close(TBrokerFD fd) {
public long getFileLength(String path) throws IOException {
TBrokerListPathRequest request = new TBrokerListPathRequest(
- TBrokerVersion.VERSION_ONE, path, false, brokerDesc.getProperties());
+ TBrokerVersion.VERSION_ONE, path, false, brokerDesc.getBackendConfigProperties());
TBrokerListResponse tBrokerListResponse = null;
try {
tBrokerListResponse = client.listPath(request);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index 617d346ba3daa3..e043b7a9fd6406 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -28,9 +28,9 @@
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
-import org.apache.doris.fs.FileSystemFactory;
-import org.apache.doris.fs.remote.RemoteFile;
-import org.apache.doris.fs.remote.RemoteFileSystem;
+import org.apache.doris.fsv2.FileSystemFactory;
+import org.apache.doris.fsv2.remote.RemoteFile;
+import org.apache.doris.fsv2.remote.RemoteFileSystem;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TBrokerCheckPathExistRequest;
import org.apache.doris.thrift.TBrokerCheckPathExistResponse;
@@ -85,8 +85,7 @@ public class BrokerUtil {
public static void parseFile(String path, BrokerDesc brokerDesc, List<TBrokerFileStatus> fileStatuses)
throws UserException {
List<RemoteFile> rfiles = new ArrayList<>();
- try (RemoteFileSystem fileSystem = FileSystemFactory.get(
- brokerDesc.getName(), brokerDesc.getStorageType(), brokerDesc.getProperties())) {
+ try (RemoteFileSystem fileSystem = FileSystemFactory.get(brokerDesc)) {
Status st = fileSystem.globList(path, rfiles, false);
if (!st.ok()) {
throw new UserException(st.getErrMsg());
@@ -107,8 +106,7 @@ public static void parseFile(String path, BrokerDesc brokerDesc, List<TBrokerFileStatus> fileStatuses)
 Map<String, String> getLocationProperties() throws UserException;
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/LoadScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/LoadScanProvider.java
index 70d55a497e458a..2b028cf695ddb7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/LoadScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/LoadScanProvider.java
@@ -99,9 +99,10 @@ public FileLoadScanNode.ParamCreateContext createContext(Analyzer analyzer) thro
&& fileGroupInfo.getFileGroup().getFileFormat().equals("hive_text")) {
params.setTextSerdeType(TTextSerdeType.HIVE_TEXT_SERDE);
}
- params.setProperties(fileGroupInfo.getBrokerDesc().getProperties());
+ params.setProperties(fileGroupInfo.getBrokerDesc().getBackendConfigProperties());
if (fileGroupInfo.getBrokerDesc().getFileType() == TFileType.FILE_HDFS) {
- THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(fileGroupInfo.getBrokerDesc().getProperties());
+ THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(fileGroupInfo.getBrokerDesc()
+ .getBackendConfigProperties());
params.setHdfsParams(tHdfsParams);
}
TFileAttributes fileAttributes = new TFileAttributes();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/ConnectionProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/ConnectionProperties.java
index 7fe00cedec7984..2c86c67e4ce4a8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/ConnectionProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/ConnectionProperties.java
@@ -18,7 +18,7 @@
package org.apache.doris.datasource.property;
import org.apache.doris.common.CatalogConfigFileUtils;
-import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.property.storage.exception.StoragePropertiesException;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
@@ -55,7 +55,7 @@ protected ConnectionProperties(Map origProps) {
this.origProps = origProps;
}
- protected void initNormalizeAndCheckProps() throws UserException {
+ protected void initNormalizeAndCheckProps() {
List<Field> supportedProps = PropertyUtils.getConnectorProperties(this.getClass());
for (Field field : supportedProps) {
field.setAccessible(true);
@@ -67,7 +67,8 @@ protected void initNormalizeAndCheckProps() throws UserException {
field.set(this, origProps.get(name));
matchedProperties.put(name, origProps.get(name));
} catch (IllegalAccessException e) {
- throw new RuntimeException("Failed to set property " + name + ", " + e.getMessage(), e);
+ throw new StoragePropertiesException("Failed to set property " + name
+ + ", " + e.getMessage(), e);
}
break;
}
@@ -115,7 +116,8 @@ protected void checkRequiredProperties() {
throw new IllegalArgumentException("Property " + names[0] + " is required.");
}
} catch (IllegalAccessException e) {
- throw new RuntimeException(e);
+ throw new StoragePropertiesException("Failed to get property " + names[0]
+ + ", " + e.getMessage(), e);
}
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AliyunDLFProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AliyunDLFProperties.java
index c0096baddc467b..37b79b7dcc7f22 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AliyunDLFProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AliyunDLFProperties.java
@@ -17,7 +17,6 @@
package org.apache.doris.datasource.property.metastore;
-import org.apache.doris.common.UserException;
import org.apache.doris.datasource.property.ConnectorProperty;
import com.google.common.base.Strings;
@@ -67,7 +66,7 @@ public AliyunDLFProperties(Map origProps) {
}
@Override
- protected void initNormalizeAndCheckProps() throws UserException {
+ protected void initNormalizeAndCheckProps() {
super.initNormalizeAndCheckProps();
// Other properties that start with "dlf." will be saved in otherDlfProps,
// and passed to the DLF client.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/HMSProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/HMSProperties.java
index b7c18a3e74a808..2702ae474918ae 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/HMSProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/HMSProperties.java
@@ -18,7 +18,6 @@
package org.apache.doris.datasource.property.metastore;
import org.apache.doris.common.CatalogConfigFileUtils;
-import org.apache.doris.common.UserException;
import org.apache.doris.datasource.property.ConnectorProperty;
import com.google.common.base.Strings;
@@ -95,7 +94,7 @@ protected void checkRequiredProperties() {
}
@Override
- protected void initNormalizeAndCheckProps() throws UserException {
+ protected void initNormalizeAndCheckProps() {
super.initNormalizeAndCheckProps();
hiveConfParams = loadConfigFromFile(getResourceConfigPropName());
initHmsConnectionProperties();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
index 5685c278347867..3c0422954e54e7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AbstractS3CompatibleProperties.java
@@ -80,6 +80,12 @@ public abstract class AbstractS3CompatibleProperties extends StorageProperties i
@Getter
protected String forceParsingByStandardUrl = "false";
+ @Getter
+ @ConnectorProperty(names = {"s3.session_token", "session_token"},
+ required = false,
+ description = "The session token of S3.")
+ protected String sessionToken = "";
+
/**
* Constructor to initialize the object storage properties with the provided type and original properties map.
*
@@ -135,6 +141,9 @@ private Map<String, String> doBuildS3Configuration(String maxConnections,
s3Props.put("AWS_REQUEST_TIMEOUT_MS", requestTimeoutMs);
s3Props.put("AWS_CONNECTION_TIMEOUT_MS", connectionTimeoutMs);
s3Props.put("use_path_style", usePathStyle);
+ if (StringUtils.isNotBlank(getSessionToken())) {
+ s3Props.put("AWS_TOKEN", getSessionToken());
+ }
return s3Props;
}
@@ -145,7 +154,7 @@ public Map<String, String> getBackendConfigProperties() {
@Override
- protected void initNormalizeAndCheckProps() throws UserException {
+ protected void initNormalizeAndCheckProps() {
super.initNormalizeAndCheckProps();
setEndpointIfNotSet();
if (!isValidEndpoint(getEndpoint())) {
@@ -184,7 +193,7 @@ private boolean isValidEndpoint(String endpoint) {
return endpointPattern().matcher(endpoint).matches();
}
- private void setEndpointIfNotSet() throws UserException {
+ private void setEndpointIfNotSet() {
if (StringUtils.isNotBlank(getEndpoint())) {
return;
}
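With the new property in place, a session token flows through to the backend configuration; a sketch with hypothetical values:

    Map<String, String> props = new HashMap<>();
    props.put("s3.endpoint", "s3.us-east-1.amazonaws.com");
    props.put("s3.access_key", "ak");
    props.put("s3.secret_key", "sk");
    props.put("s3.session_token", "token123");
    StorageProperties sp = StorageProperties.createPrimary(props);
    // getBackendConfigProperties() now carries AWS_TOKEN=token123; when no
    // session token is set, the AWS_TOKEN entry is omitted entirely.
    Map<String, String> beProps = sp.getBackendConfigProperties();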
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzureProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzureProperties.java
index 512ed92cca9054..9d85c3657668c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzureProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/AzureProperties.java
@@ -100,7 +100,7 @@ public AzureProperties(Map<String, String> origProps) {
private static final String AZURE_ENDPOINT_SUFFIX = ".blob.core.windows.net";
@Override
- protected void initNormalizeAndCheckProps() throws UserException {
+ protected void initNormalizeAndCheckProps() {
super.initNormalizeAndCheckProps();
//check endpoint
if (!endpoint.endsWith(AZURE_ENDPOINT_SUFFIX)) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/BrokerProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/BrokerProperties.java
new file mode 100644
index 00000000000000..b53a65ecdc7ac9
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/BrokerProperties.java
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.storage;
+
+import org.apache.doris.common.UserException;
+
+import java.util.Map;
+
+public class BrokerProperties extends StorageProperties {
+ public BrokerProperties(Map<String, String> origProps) {
+ super(Type.BROKER, origProps);
+ }
+
+ @Override
+ public Map<String, String> getBackendConfigProperties() {
+ return origProps;
+ }
+
+ @Override
+ public String validateAndNormalizeUri(String url) throws UserException {
+ return url;
+ }
+
+ @Override
+ public String validateAndGetUri(Map<String, String> loadProps) throws UserException {
+ return loadProps.get("uri");
+ }
+
+ @Override
+ public String getStorageName() {
+ return "BROKER";
+ }
+}
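BrokerProperties is deliberately a pass-through; a short sketch (values illustrative):

    static void brokerPropsDemo() throws UserException {
        Map<String, String> userProps = new HashMap<>();
        userProps.put("username", "broker_user");
        BrokerProperties broker = new BrokerProperties(userProps);
        // Raw properties reach the BE unchanged, and URIs are not rewritten.
        assert broker.getBackendConfigProperties() == userProps;
        assert "hdfs://ns1/path".equals(broker.validateAndNormalizeUri("hdfs://ns1/path"));
    }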
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
index 6dc72e1b27cc97..730a4bdb7b2fac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/COSProperties.java
@@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Stream;
@@ -45,7 +46,7 @@ public class COSProperties extends AbstractS3CompatibleProperties {
protected String region = "";
@Getter
- @ConnectorProperty(names = {"cos.access_key", "AWS_ACCESS_KEY", "ACCESS_KEY", "access_key"},
+ @ConnectorProperty(names = {"cos.access_key", "s3.access_key", "AWS_ACCESS_KEY", "access_key", "ACCESS_KEY"},
description = "The access key of COS.")
protected String accessKey = "";
@@ -75,12 +76,13 @@ protected static boolean guessIsMe(Map<String, String> origProps) {
.findFirst()
.orElse(null);
if (!Strings.isNullOrEmpty(value)) {
- return ENDPOINT_PATTERN.matcher(value).matches();
+ return value.contains("myqcloud.com");
}
- if (!origProps.containsKey("uri")) {
- return false;
- }
- return origProps.get("uri").contains("myqcloud.com");
+ Optional<String> uriValue = origProps.entrySet().stream()
+ .filter(e -> e.getKey().equalsIgnoreCase("uri"))
+ .map(Map.Entry::getValue)
+ .findFirst();
+ return uriValue.isPresent() && uriValue.get().contains("myqcloud.com");
}
@Override
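The rewritten detection honors a case-insensitive "uri" key; a hypothetical example of the behavioral change:

    Map<String, String> props = new HashMap<>();
    props.put("URI", "https://bucket.cos.ap-beijing.myqcloud.com/key");
    // The old code used origProps.containsKey("uri") and returned false here;
    // the stream-based lookup matches "URI" regardless of case and returns true.
    boolean isCos = COSProperties.guessIsMe(props); // assumes same-package access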
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsProperties.java
index d4722a3c1ad0f5..97c6c28db145a2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsProperties.java
@@ -58,6 +58,11 @@ public class HdfsProperties extends HdfsCompatibleProperties {
description = "Whether to enable the impersonation of HDFS.")
private boolean hdfsImpersonationEnabled = false;
+ @ConnectorProperty(names = {"ipc.client.fallback-to-simple-auth-allowed"},
+ required = false,
+ description = "Whether to allow fallback to simple authentication.")
+ private String allowFallbackToSimpleAuth = "";
+
private Map<String, String> backendConfigProperties;
/**
@@ -69,6 +74,8 @@ public class HdfsProperties extends HdfsCompatibleProperties {
*/
private Map<String, String> userOverriddenHdfsConfig;
+ public static final String HDFS_DEFAULT_FS_NAME = "fs.defaultFS";
+
private static final List<String> HDFS_PROPERTIES_KEYS = Arrays.asList("hdfs.authentication.type",
"hadoop.security.authentication", "hadoop.username",
"hdfs.authentication.kerberos.principal", "hadoop.kerberos.principal", "dfs.nameservices");
@@ -81,6 +88,9 @@ public static boolean guessIsMe(Map<String, String> props) {
if (MapUtils.isEmpty(props)) {
return false;
}
+ if (HdfsPropertiesUtils.validateUriIsHdfsUri(props)) {
+ return true;
+ }
if (HDFS_PROPERTIES_KEYS.stream().anyMatch(props::containsKey)) {
return true;
}
@@ -98,7 +108,7 @@ public static boolean guessIsMe(Map<String, String> props) {
}
@Override
- protected void initNormalizeAndCheckProps() throws UserException {
+ protected void initNormalizeAndCheckProps() {
super.initNormalizeAndCheckProps();
extractUserOverriddenHdfsConfig(origProps);
initHadoopConfiguration();
@@ -111,7 +121,7 @@ private void extractUserOverriddenHdfsConfig(Map<String, String> origProps) {
}
userOverriddenHdfsConfig = new HashMap<>();
origProps.forEach((key, value) -> {
- if (key.startsWith("hadoop.") || key.startsWith("dfs.") || key.equals("fs.defaultFS")) {
+ if (key.startsWith("hadoop.") || key.startsWith("dfs.") || key.startsWith("fs.")) {
userOverriddenHdfsConfig.put(key, value);
}
});
@@ -132,11 +142,7 @@ protected void checkRequiredProperties() {
// fsDefaultFS is not strictly required here.
// This is a best-effort fallback to populate fsDefaultFS when possible.
if (StringUtils.isBlank(fsDefaultFS)) {
- try {
- this.fsDefaultFS = HdfsPropertiesUtils.validateAndGetUri(origProps);
- } catch (UserException e) {
- //ignore
- }
+ this.fsDefaultFS = HdfsPropertiesUtils.extractDefaultFsFromUri(origProps);
}
}
@@ -148,7 +154,12 @@ private void initHadoopConfiguration() {
userOverriddenHdfsConfig.forEach(conf::set);
}
if (StringUtils.isNotBlank(fsDefaultFS)) {
- conf.set("fs.defaultFS", fsDefaultFS);
+ conf.set(HDFS_DEFAULT_FS_NAME, fsDefaultFS);
+ }
+ if (StringUtils.isNotBlank(allowFallbackToSimpleAuth)) {
+ conf.set("ipc.client.fallback-to-simple-auth-allowed", allowFallbackToSimpleAuth);
+ } else {
+ conf.set("ipc.client.fallback-to-simple-auth-allowed", "true");
}
conf.set("hdfs.security.authentication", hdfsAuthenticationType);
if ("kerberos".equalsIgnoreCase(hdfsAuthenticationType)) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsPropertiesUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsPropertiesUtils.java
index 5e97a2fd639fb4..77822f8426d093 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsPropertiesUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/HdfsPropertiesUtils.java
@@ -20,120 +20,101 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.URI;
+import org.apache.doris.datasource.property.storage.exception.StoragePropertiesException;
+import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang3.StringUtils;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public class HdfsPropertiesUtils {
private static final String URI_KEY = "uri";
+ private static final Set<String> supportSchema = ImmutableSet.of("hdfs", "viewfs");
- private static Set<String> supportSchema = new HashSet<>();
-
- static {
- supportSchema.add("hdfs");
- supportSchema.add("viewfs");
- }
-
- /**
- * Validates that the 'uri' property exists in the provided props map, and normalizes it.
- *
- * @param props the map of properties that must include a 'uri' entry
- * @return a normalized URI string like 'hdfs://host/path'
- * @throws UserException if the map is empty or does not contain the required 'uri' key
- *
- * Example:
- * Input: {"uri": "hdfs://namenode:9000/data/input"}
- * Output: "hdfs://namenode:9000/data/input"
- */
public static String validateAndGetUri(Map<String, String> props) throws UserException {
if (props.isEmpty()) {
throw new UserException("props is empty");
}
- if (!props.containsKey(URI_KEY)) {
- throw new UserException("props must contain uri");
+ String uriStr = getUri(props);
+ if (StringUtils.isBlank(uriStr)) {
+ throw new StoragePropertiesException("props must contain uri");
}
- String uriStr = props.get(URI_KEY);
return validateAndNormalizeUri(uriStr);
}
- /**
- * Validates and normalizes a raw URI string.
- *
- * @param uriStr the URI string to validate
- * @return a normalized URI in the form of 'scheme://authority/path'
- * @throws UserException if the URI is invalid or unsupported
- *
- * Example:
- * Input: "viewfs://ns1/path/to/file"
- * Output: "viewfs://ns1/path/to/file"
- */
- public static String convertUrlToFilePath(String uriStr) throws UserException {
- return validateAndNormalizeUri(uriStr);
+ public static boolean validateUriIsHdfsUri(Map<String, String> props) {
+ String uriStr = getUri(props);
+ if (StringUtils.isBlank(uriStr)) {
+ return false;
+ }
+ try {
+ URI uri = URI.create(uriStr);
+ String schema = uri.getScheme();
+ if (StringUtils.isBlank(schema)) {
+ throw new IllegalArgumentException("Invalid uri: " + uriStr + ", extract schema is null");
+ }
+ return isSupportedSchema(schema);
+ } catch (AnalysisException e) {
+ throw new IllegalArgumentException("Invalid uri: " + uriStr, e);
+ }
}
- /**
- * Constructs the default filesystem URI (scheme + authority) from a full URI string in the props map.
- *
- * @param props the map of properties, expected to contain a valid 'uri' entry
- * @return a URI prefix like 'hdfs://host:port', or null if the URI is missing or invalid
- *
- * Example:
- * Input: {"uri": "hdfs://namenode:8020/data"}
- * Output: "hdfs://namenode:8020"
- */
- public static String constructDefaultFsFromUri(Map<String, String> props) {
- if (props.isEmpty()) {
+ public static String extractDefaultFsFromPath(String filePath) {
+ if (StringUtils.isBlank(filePath)) {
return null;
}
- if (!props.containsKey(URI_KEY)) {
- return null;
+ try {
+ URI uri = URI.create(filePath);
+ return uri.getScheme() + "://" + uri.getAuthority();
+ } catch (AnalysisException e) {
+ throw new IllegalArgumentException("Invalid file path: " + filePath, e);
}
- String uriStr = props.get(URI_KEY);
+ }
+
+ public static String extractDefaultFsFromUri(Map<String, String> props) {
+ String uriStr = getUri(props);
if (StringUtils.isBlank(uriStr)) {
return null;
}
- URI uri = null;
try {
- uri = URI.create(uriStr);
+ URI uri = URI.create(uriStr);
+ if (!isSupportedSchema(uri.getScheme())) {
+ return null;
+ }
+ return uri.getScheme() + "://" + uri.getAuthority();
} catch (AnalysisException e) {
- return null;
- }
- String schema = uri.getScheme();
- if (StringUtils.isBlank(schema)) {
- throw new IllegalArgumentException("Invalid uri: " + uriStr + "extract schema is null");
- }
- if (!supportSchema.contains(schema.toLowerCase())) {
- throw new IllegalArgumentException("Invalid export path:"
- + schema + " , please use valid 'hdfs://' or 'viewfs://' path.");
+ throw new IllegalArgumentException("Invalid uri: " + uriStr, e);
}
- return uri.getScheme() + "://" + uri.getAuthority();
}
- /**
- * Internal method that validates and normalizes a URI string.
- * Ensures it has a valid scheme and is supported (e.g., hdfs, viewfs).
- *
- * @param uriStr the URI string to validate
- * @return the normalized URI string
- * @throws AnalysisException if the URI is blank or has an unsupported scheme
- *
- * Example:
- * Input: "hdfs://host:8020/user/data"
- * Output: "hdfs://host:8020/user/data"
- */
+ public static String convertUrlToFilePath(String uriStr) throws UserException {
+ return validateAndNormalizeUri(uriStr);
+ }
+
+ private static String getUri(Map<String, String> props) {
+ return props.entrySet().stream()
+ .filter(e -> e.getKey().equalsIgnoreCase(URI_KEY))
+ .map(Map.Entry::getValue)
+ .filter(StringUtils::isNotBlank)
+ .findFirst()
+ .orElse(null);
+ }
+
+ private static boolean isSupportedSchema(String schema) {
+ return schema != null && supportSchema.contains(schema.toLowerCase());
+ }
+
private static String validateAndNormalizeUri(String uriStr) throws AnalysisException {
if (StringUtils.isBlank(uriStr)) {
- throw new IllegalArgumentException("uri is null, pls check your params");
+ throw new IllegalArgumentException("Properties 'uri' is required");
}
URI uri = URI.create(uriStr);
String schema = uri.getScheme();
if (StringUtils.isBlank(schema)) {
- throw new IllegalArgumentException("Invalid uri: " + uriStr + "extract schema is null");
+ throw new IllegalArgumentException("Invalid uri: " + uriStr + ", extract schema is null");
}
- if (!supportSchema.contains(schema.toLowerCase())) {
+ if (!isSupportedSchema(schema)) {
throw new IllegalArgumentException("Invalid export path:"
+ schema + " , please use valid 'hdfs://' or 'viewfs://' path.");
}
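Expected results of the reworked helpers, with illustrative inputs:

    // extractDefaultFsFromUri:
    //   {"uri": "hdfs://namenode:8020/data"} -> "hdfs://namenode:8020"
    //   {"uri": "viewfs://ns1/path"}         -> "viewfs://ns1"
    //   {"uri": "s3://bucket/key"}           -> null (scheme not hdfs/viewfs)
    //   {}                                   -> null (no uri key)
    // extractDefaultFsFromPath:
    //   "hdfs://namenode:8020/data/input"    -> "hdfs://namenode:8020"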
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/MinioProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/MinioProperties.java
new file mode 100644
index 00000000000000..8f5c3f3dfa324a
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/MinioProperties.java
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.storage;
+
+import org.apache.doris.datasource.property.ConnectorProperty;
+
+import com.google.common.collect.ImmutableSet;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+public class MinioProperties extends AbstractS3CompatibleProperties {
+ @Setter
+ @Getter
+ @ConnectorProperty(names = {"minio.endpoint", "s3.endpoint", "AWS_ENDPOINT", "endpoint", "ENDPOINT"},
+ required = false, description = "The endpoint of Minio.")
+ protected String endpoint = "";
+
+ @Getter
+ @Setter
+ protected String region = "us-east-1";
+
+ @Getter
+ @ConnectorProperty(names = {"minio.access_key", "AWS_ACCESS_KEY", "ACCESS_KEY", "access_key", "s3.access_key"},
+ description = "The access key of Minio.")
+ protected String accessKey = "";
+
+ @Getter
+ @ConnectorProperty(names = {"minio.secret_key", "s3.secret_key", "AWS_SECRET_KEY", "secret_key", "SECRET_KEY"},
+ description = "The secret key of Minio.")
+ protected String secretKey = "";
+
+ private static final Set<String> IDENTIFIERS = ImmutableSet.of("minio.access_key", "AWS_ACCESS_KEY", "ACCESS_KEY",
+ "access_key", "s3.access_key", "minio.endpoint", "s3.endpoint", "AWS_ENDPOINT", "endpoint", "ENDPOINT");
+
+ /**
+ * Constructor to initialize the object storage properties with the provided type and original properties map.
+ *
+ * @param origProps the original properties map.
+ */
+ protected MinioProperties(Map<String, String> origProps) {
+ super(Type.MINIO, origProps);
+ }
+
+ public static boolean guessIsMe(Map<String, String> origProps) {
+ // MinIO has no distinctive endpoint pattern, so first rule out providers with
+ // stronger signals, then check whether any MinIO identifier key is set.
+ if (AzureProperties.guessIsMe(origProps) || COSProperties.guessIsMe(origProps)
+ || OSSProperties.guessIsMe(origProps) || S3Properties.guessIsMe(origProps)) {
+ return false;
+ }
+
+ return IDENTIFIERS.stream().map(origProps::get).anyMatch(value -> value != null && !value.isEmpty());
+ }
+
+
+ @Override
+ protected Pattern endpointPattern() {
+ return Pattern.compile("^(?:https?://)?[a-zA-Z0-9.-]+(?::\\d+)?$");
+ }
+}
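MinIO detection is residual by design; a sketch with hypothetical values:

    Map<String, String> props = new HashMap<>();
    props.put("s3.endpoint", "http://127.0.0.1:9000");
    props.put("s3.access_key", "minio_ak");
    props.put("s3.secret_key", "minio_sk");
    // Assuming the vendor-specific matchers (Azure/COS/OSS/S3) all decline this
    // endpoint, guessIsMe(props) returns true and MinioProperties is selected.
    boolean isMinio = MinioProperties.guessIsMe(props);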
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
index 87ad9b5761c7ab..b99f36bcd30303 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OBSProperties.java
@@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Stream;
@@ -38,12 +39,12 @@ public class OBSProperties extends AbstractS3CompatibleProperties {
protected String endpoint = "";
@Getter
- @ConnectorProperty(names = {"obs.access_key", "AWS_ACCESS_KEY", "ACCESS_KEY", "access_key"},
+ @ConnectorProperty(names = {"obs.access_key", "s3.access_key", "AWS_ACCESS_KEY", "access_key", "ACCESS_KEY"},
description = "The access key of OBS.")
protected String accessKey = "";
@Getter
- @ConnectorProperty(names = {"obs.secret_key", "secret_key", "s3.secret_key"},
+ @ConnectorProperty(names = {"obs.secret_key", "s3.secret_key", "AWS_SECRET_KEY", "secret_key", "SECRET_KEY"},
description = "The secret key of OBS.")
protected String secretKey = "";
@@ -80,13 +81,13 @@ protected static boolean guessIsMe(Map<String, String> origProps) {
.orElse(null);
if (!Strings.isNullOrEmpty(value)) {
- return ENDPOINT_PATTERN.matcher(value).matches();
+ return value.contains("myhuaweicloud.com");
}
- if (!origProps.containsKey("uri")) {
- return false;
- }
- // Check if the uri property contains "myhuaweicloud.com"
- return origProps.get("uri").contains("myhuaweicloud.com");
+ Optional<String> uriValue = origProps.entrySet().stream()
+ .filter(e -> e.getKey().equalsIgnoreCase("uri"))
+ .map(Map.Entry::getValue)
+ .findFirst();
+ return uriValue.isPresent() && uriValue.get().contains("myhuaweicloud.com");
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSHdfsProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSHdfsProperties.java
index e0e0def53f6d01..0a09a8602532dc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSHdfsProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSHdfsProperties.java
@@ -82,7 +82,7 @@ protected void checkRequiredProperties() {
}
@Override
- protected void initNormalizeAndCheckProps() throws UserException {
+ protected void initNormalizeAndCheckProps() {
super.initNormalizeAndCheckProps();
initConfigurationParams();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
index 34982f4b690039..26170dcef8bbad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/OSSProperties.java
@@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Stream;
@@ -38,7 +39,7 @@ public class OSSProperties extends AbstractS3CompatibleProperties {
protected String endpoint = "";
@Getter
- @ConnectorProperty(names = {"oss.access_key", "s3.access_key", "AWS_ACCESS_KEY", "ACCESS_KEY", "access_key"},
+ @ConnectorProperty(names = {"oss.access_key", "s3.access_key", "AWS_ACCESS_KEY", "access_key", "ACCESS_KEY"},
description = "The access key of OSS.")
protected String accessKey = "";
@@ -78,12 +79,17 @@ protected static boolean guessIsMe(Map<String, String> origProps) {
.findFirst()
.orElse(null);
if (!Strings.isNullOrEmpty(value)) {
- return ENDPOINT_PATTERN.matcher(value).matches();
+ return value.contains("aliyuncs.com");
}
- if (!origProps.containsKey("uri")) {
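+ // Fall back to the "uri" property (case-insensitive); "oss-dls.aliyuncs.com" endpoints are
+ // excluded below because OSS-HDFS is handled by OSSHdfsProperties.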
+ Optional<String> uriValue = origProps.entrySet().stream()
+ .filter(e -> e.getKey().equalsIgnoreCase("uri"))
+ .map(Map.Entry::getValue)
+ .findFirst();
+ if (!uriValue.isPresent()) {
return false;
}
- return origProps.get("uri").contains("aliyuncs.com");
+ String uri = uriValue.get();
+ return uri.contains("aliyuncs.com") && (!uri.contains("oss-dls.aliyuncs.com"));
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
index 478f45ee3d06b5..343b4b29d39ef5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3Properties.java
@@ -28,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Stream;
@@ -49,7 +50,7 @@ public class S3Properties extends AbstractS3CompatibleProperties {
protected String region = "";
@Getter
- @ConnectorProperty(names = {"s3.access_key", "AWS_ACCESS_KEY", "ACCESS_KEY", "access_key"},
+ @ConnectorProperty(names = {"s3.access_key", "AWS_ACCESS_KEY", "access_key", "ACCESS_KEY"},
description = "The access key of S3.")
protected String accessKey = "";
@@ -134,14 +135,20 @@ protected static boolean guessIsMe(Map<String, String> origProps) {
.filter(Objects::nonNull)
.findFirst()
.orElse(null);
+ /*
+ * Treat the endpoint as native S3 when it contains "amazonaws.com".
+ * Note: this check should not be overly strict, as a malformed or misconfigured endpoint
+ * would make type detection fail here and valid S3 properties would go unrecognized.
+ * It is more robust to let downstream validation reject bad endpoints than to fail early.
+ */
if (!Strings.isNullOrEmpty(endpoint)) {
- return ENDPOINT_PATTERN.matcher(endpoint).matches();
+ return endpoint.contains("amazonaws.com");
}
- if (!origProps.containsKey("uri")) {
- return false;
- }
- String uri = origProps.get("uri");
- return uri.contains("amazonaws.com");
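+ // Otherwise fall back to the "uri" property, matched case-insensitively.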
+ Optional<String> uriValue = origProps.entrySet().stream()
+ .filter(e -> e.getKey().equalsIgnoreCase("uri"))
+ .map(Map.Entry::getValue)
+ .findFirst();
+ return uriValue.isPresent() && uriValue.get().contains("amazonaws.com");
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3PropertyUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3PropertyUtils.java
index 745838438dd012..0a4fda0bcdfe8f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3PropertyUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/S3PropertyUtils.java
@@ -19,10 +19,12 @@
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.S3URI;
+import org.apache.doris.datasource.property.storage.exception.StoragePropertiesException;
import org.apache.commons.lang3.StringUtils;
import java.util.Map;
+import java.util.Optional;
public class S3PropertyUtils {
@@ -43,14 +45,28 @@ public class S3PropertyUtils {
*/
public static String constructEndpointFromUrl(Map<String, String> props,
String stringUsePathStyle,
- String stringForceParsingByStandardUri) throws UserException {
- String uri = props.get(URI_KEY);
- if (uri == null || uri.isEmpty()) {
+ String stringForceParsingByStandardUri) {
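+ // Resolve the "uri" key case-insensitively so "URI", "Uri", etc. are all accepted.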
+ Optional<String> uriOptional = props.entrySet().stream()
+ .filter(e -> e.getKey().equalsIgnoreCase(URI_KEY))
+ .map(Map.Entry::getValue)
+ .findFirst();
+
+ if (!uriOptional.isPresent()) {
+ return null;
+ }
+ String uri = uriOptional.get();
+ if (StringUtils.isBlank(uri)) {
return null;
}
boolean usePathStyle = Boolean.parseBoolean(stringUsePathStyle);
boolean forceParsingByStandardUri = Boolean.parseBoolean(stringForceParsingByStandardUri);
- S3URI s3uri = S3URI.create(uri, usePathStyle, forceParsingByStandardUri);
+ S3URI s3uri;
+ try {
+ s3uri = S3URI.create(uri, usePathStyle, forceParsingByStandardUri);
+ } catch (UserException e) {
+ throw new IllegalArgumentException("Invalid S3 URI: " + uri + ", usePathStyle: " + usePathStyle
+ + ", forceParsingByStandardUri: " + forceParsingByStandardUri, e);
+ }
return s3uri.getEndpoint().orElse(null);
}
@@ -68,14 +84,28 @@ public static String constructEndpointFromUrl(Map<String, String> props,
*/
public static String constructRegionFromUrl(Map<String, String> props,
String stringUsePathStyle,
- String stringForceParsingByStandardUri) throws UserException {
- String uri = props.get(URI_KEY);
- if (uri == null || uri.isEmpty()) {
+ String stringForceParsingByStandardUri) {
+ Optional<String> uriOptional = props.entrySet().stream()
+ .filter(e -> e.getKey().equalsIgnoreCase(URI_KEY))
+ .map(Map.Entry::getValue)
+ .findFirst();
+
+ if (!uriOptional.isPresent()) {
+ return null;
+ }
+ String uri = uriOptional.get();
+ if (StringUtils.isBlank(uri)) {
return null;
}
boolean usePathStyle = Boolean.parseBoolean(stringUsePathStyle);
boolean forceParsingByStandardUri = Boolean.parseBoolean(stringForceParsingByStandardUri);
- S3URI s3uri = S3URI.create(uri, usePathStyle, forceParsingByStandardUri);
+ S3URI s3uri;
+ try {
+ s3uri = S3URI.create(uri, usePathStyle, forceParsingByStandardUri);
+ } catch (UserException e) {
+ throw new IllegalArgumentException("Invalid S3 URI: " + uri + ", usePathStyle: " + usePathStyle
+ + ", forceParsingByStandardUri: " + forceParsingByStandardUri, e);
+ }
return s3uri.getRegion().orElse(null);
}
@@ -99,7 +129,7 @@ public static String validateAndNormalizeUri(String path,
String stringUsePathStyle,
String stringForceParsingByStandardUri) throws UserException {
if (StringUtils.isBlank(path)) {
- throw new UserException("path is null");
+ throw new StoragePropertiesException("path is null");
}
if (path.startsWith("s3://")) {
return path;
@@ -122,13 +152,18 @@ public static String validateAndNormalizeUri(String path,
* Input: {"uri": "s3://my-bucket/my-key"}
* Output: "s3://my-bucket/my-key"
*/
- public static String validateAndGetUri(Map<String, String> props) throws UserException {
+ public static String validateAndGetUri(Map<String, String> props) {
if (props.isEmpty()) {
- throw new UserException("props is empty");
+ throw new StoragePropertiesException("props is empty");
}
- if (!props.containsKey(URI_KEY)) {
- throw new UserException("props must contain uri");
+ Optional<String> uriOptional = props.entrySet().stream()
+ .filter(e -> e.getKey().equalsIgnoreCase(URI_KEY))
+ .map(Map.Entry::getValue)
+ .findFirst();
+
+ if (!uriOptional.isPresent()) {
+ throw new StoragePropertiesException("props must contain uri");
}
- return props.get(URI_KEY);
+ return uriOptional.get();
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
index 58cdedde869a5e..ae69deea400ba0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/storage/StorageProperties.java
@@ -20,6 +20,7 @@
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.property.ConnectionProperties;
import org.apache.doris.datasource.property.ConnectorProperty;
+import org.apache.doris.datasource.property.storage.exception.StoragePropertiesException;
import lombok.Getter;
@@ -35,6 +36,7 @@ public abstract class StorageProperties extends ConnectionProperties {
public static final String FS_HDFS_SUPPORT = "fs.hdfs.support";
public static final String FS_S3_SUPPORT = "fs.s3.support";
public static final String FS_GCS_SUPPORT = "fs.gcs.support";
+ public static final String FS_MINIO_SUPPORT = "fs.minio.support";
public static final String FS_AZURE_SUPPORT = "fs.azure.support";
public static final String FS_OSS_SUPPORT = "fs.oss.support";
public static final String FS_OBS_SUPPORT = "fs.obs.support";
@@ -50,7 +52,9 @@ public enum Type {
OSS,
OBS,
COS,
+ MINIO,
AZURE,
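+ // BROKER is declared for completeness; broker-backed storage does not go through StorageProperties.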
+ BROKER,
UNKNOWN
}
@@ -98,7 +102,7 @@ public static List<StorageProperties> createAll(Map<String, String> origProps) t
* @return a StorageProperties instance for the primary storage type
* @throws RuntimeException if no supported storage type is found
*/
- public static StorageProperties createPrimary(Map<String, String> origProps) throws UserException {
+ public static StorageProperties createPrimary(Map<String, String> origProps) {
for (Function