Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
d8b34b5
[feat](refactor-param)Integrate New Storage System Support for BACKU…
CalvinKirs Apr 24, 2025
a1f5cd1
remove unless change
CalvinKirs May 6, 2025
aa8e885
remove unless change
CalvinKirs May 6, 2025
622b5d7
fix test
CalvinKirs May 6, 2025
ec0a117
remove unless change
CalvinKirs May 8, 2025
74a5c09
Merge branch 'master' into master-refactor-params-non-ctl
CalvinKirs May 8, 2025
4d59f91
fix
CalvinKirs May 8, 2025
d09ad86
fix
CalvinKirs May 8, 2025
44331e6
fix
CalvinKirs May 8, 2025
e1a29e2
fix
CalvinKirs May 9, 2025
568d54c
fix
CalvinKirs May 9, 2025
2ac9275
fix
CalvinKirs May 9, 2025
b52820a
fix
CalvinKirs May 9, 2025
b167236
fix
CalvinKirs May 9, 2025
6b229b4
fix
CalvinKirs May 9, 2025
12e532c
fix
CalvinKirs May 9, 2025
8b2e2ed
fix
CalvinKirs May 10, 2025
ff64ed7
remove unchanged file
CalvinKirs May 10, 2025
74c6141
remove unchanged file
CalvinKirs May 10, 2025
a84caea
remove unchanged file
CalvinKirs May 10, 2025
5feb9e0
remove unchanged file
CalvinKirs May 10, 2025
425f634
add log
CalvinKirs May 10, 2025
c62318d
add log
CalvinKirs May 11, 2025
b5d4ed6
add log
CalvinKirs May 12, 2025
b7b9382
test
CalvinKirs May 12, 2025
411389b
test
CalvinKirs May 12, 2025
8b23775
fix license
CalvinKirs May 12, 2025
b6401b4
fix license
CalvinKirs May 12, 2025
ffdbb9b
fix test
CalvinKirs May 12, 2025
0d71072
add out file
CalvinKirs May 12, 2025
31e1a6f
add out file
CalvinKirs May 12, 2025
7b128ac
add out file
CalvinKirs May 13, 2025
154e272
add out file
CalvinKirs May 13, 2025
ee187ba
add out file
CalvinKirs May 13, 2025
3c5173c
Merge branch 'master' into master-refactor-params-non-ctl
CalvinKirs May 13, 2025
d542a76
mv to external p0
CalvinKirs May 13, 2025
d6fe3dc
Merge remote-tracking branch 'origin/master-refactor-params-non-ctl' …
CalvinKirs May 13, 2025
bbf5708
delete out file
CalvinKirs May 13, 2025
1421af3
Merge branch 'master' into master-refactor-params-non-ctl
CalvinKirs May 15, 2025
f33a593
Merge branch 'master' into master-refactor-params-non-ctl
CalvinKirs May 16, 2025
4f2d381
fix
CalvinKirs May 16, 2025
2896245
Merge remote-tracking branch 'origin/master-refactor-params-non-ctl' …
CalvinKirs May 16, 2025
deebb9e
fix
CalvinKirs May 16, 2025
f8aec90
Merge branch 'master' into master-refactor-params-non-ctl
CalvinKirs May 19, 2025
f065783
upgrade maven plugin memory
CalvinKirs May 19, 2025
e0164b3
Merge branch 'master' into master-refactor-params-non-ctl
CalvinKirs May 19, 2025
9299df0
commit
CalvinKirs May 20, 2025
1b9dd83
Merge branch 'master' into master-refactor-params-non-ctl
CalvinKirs May 20, 2025
6f23a8b
commit
CalvinKirs May 20, 2025
e8248df
Merge remote-tracking branch 'origin/master-refactor-params-non-ctl' …
CalvinKirs May 20, 2025
d1d7033
commit
CalvinKirs May 20, 2025
cf86a9b
commit
CalvinKirs May 20, 2025
2b7ec44
commit
CalvinKirs May 20, 2025
3290d94
commit
CalvinKirs May 20, 2025
0d92811
Merge remote-tracking branch 'refs/remotes/upstream/master' into mast…
CalvinKirs May 21, 2025
fa97519
commit
CalvinKirs May 21, 2025
5ec515c
refactor test
CalvinKirs May 21, 2025
71ff1e0
fix
CalvinKirs May 22, 2025
838623a
fix some duplicate code
morningman May 22, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions fe/fe-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ under the License.
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
</dependency>
<dependency>
<groupId>com.google.re2j</groupId>
<artifactId>re2j</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
Expand Down
63 changes: 41 additions & 22 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,19 @@
import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.datasource.property.S3ClientBEProperties;
import org.apache.doris.datasource.property.constants.BosProperties;
import org.apache.doris.fs.PersistentFileSystem;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.datasource.property.storage.exception.StoragePropertiesException;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TFileType;

import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -53,6 +55,7 @@ public class BrokerDesc extends StorageDesc implements Writable {
// just for multi load
public static final String MULTI_LOAD_BROKER = "__DORIS_MULTI_LOAD_BROKER__";
public static final String MULTI_LOAD_BROKER_BACKEND_KEY = "__DORIS_MULTI_LOAD_BROKER_BACKEND__";
@Deprecated
@SerializedName("cts3")
private boolean convertedToS3 = false;

Expand All @@ -75,42 +78,56 @@ public BrokerDesc(String name, Map<String, String> properties) {
if (properties != null) {
this.properties.putAll(properties);
}
// Assume the storage type is BROKER by default
// If it's a multi-load broker, override the storage type to LOCAL
if (isMultiLoadBroker()) {
this.storageType = StorageBackend.StorageType.LOCAL;
} else {
this.storageType = StorageBackend.StorageType.BROKER;
}
this.properties.putAll(S3ClientBEProperties.getBeFSProperties(this.properties));
this.convertedToS3 = BosProperties.tryConvertBosToS3(this.properties, this.storageType);
if (this.convertedToS3) {
this.storageType = StorageBackend.StorageType.S3;

// Try to determine the actual storage type from properties if available
if (MapUtils.isNotEmpty(this.properties)) {
try {
// Create primary storage properties from the given configuration
this.storageProperties = StorageProperties.createPrimary(this.properties);
// Override the storage type based on property configuration
this.storageType = StorageBackend.StorageType.valueOf(storageProperties.getStorageName());
} catch (StoragePropertiesException e) {
// Currently ignored: these properties might be broker-specific.
// Support for broker properties will be added in the future.
LOG.info("Failed to create storage properties for broker: {}, properties: {}", name, properties, e);
}
}
//only storage type is broker
if (StringUtils.isBlank(this.name) && (this.getStorageType() != StorageType.BROKER)) {
this.name = this.storageType().name();
}
}

public BrokerDesc(String name, StorageBackend.StorageType storageType, Map<String, String> properties) {
this.name = name;
this.properties = Maps.newHashMap();
this.storageType = storageType;
if (properties != null) {
this.properties.putAll(properties);
}
this.storageType = storageType;
this.properties.putAll(S3ClientBEProperties.getBeFSProperties(this.properties));
this.convertedToS3 = BosProperties.tryConvertBosToS3(this.properties, this.storageType);
if (this.convertedToS3) {
this.storageType = StorageBackend.StorageType.S3;
if (MapUtils.isNotEmpty(this.properties) && StorageType.REFACTOR_STORAGE_TYPES.contains(storageType)) {
this.storageProperties = StorageProperties.createPrimary(properties);
}

}

public String getFileLocation(String location) {
return this.convertedToS3 ? BosProperties.convertPathToS3(location) : location;
public String getFileLocation(String location) throws UserException {
return (null != storageProperties) ? storageProperties.validateAndNormalizeUri(location) : location;
}

public static BrokerDesc createForStreamLoad() {
return new BrokerDesc("", StorageType.STREAM, null);
}

public boolean isMultiLoadBroker() {
return this.name.equalsIgnoreCase(MULTI_LOAD_BROKER);
return StringUtils.isNotBlank(this.name) && this.name.equalsIgnoreCase(MULTI_LOAD_BROKER);
}

public TFileType getFileType() {
Expand Down Expand Up @@ -150,16 +167,18 @@ public void readFields(DataInput in) throws IOException {
final String val = Text.readString(in);
properties.put(key, val);
}
StorageBackend.StorageType st = StorageBackend.StorageType.BROKER;
String typeStr = properties.remove(PersistentFileSystem.STORAGE_TYPE);
if (typeStr != null) {
if (MapUtils.isNotEmpty(properties)) {
try {
st = StorageBackend.StorageType.valueOf(typeStr);
} catch (IllegalArgumentException e) {
LOG.warn("set to BROKER, because of exception", e);
this.storageProperties = StorageProperties.createPrimary(properties);
this.storageType = StorageBackend.StorageType.valueOf(storageProperties.getStorageName());
} catch (RuntimeException e) {
// Currently ignored: these properties might be broker-specific.
// Support for broker properties will be added in the future.
LOG.warn("Failed to create storage properties for broker: {}, properties: {}", name, properties, e);
this.storageType = StorageBackend.StorageType.BROKER;
}

}
storageType = st;
}

public static BrokerDesc read(DataInput in) throws IOException {
Expand Down
100 changes: 13 additions & 87 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,21 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.cloud.proto.Cloud.ObjectStoreInfoPB;
import org.apache.doris.cloud.security.SecurityChecker;
import org.apache.doris.cloud.storage.RemoteBase;
import org.apache.doris.cloud.storage.RemoteBase.ObjectInfo;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.property.constants.AzureProperties;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.datasource.property.storage.ObjectStorageProperties;
import org.apache.doris.fsv2.FileSystemFactory;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TFileType;

import com.google.common.base.Function;
import com.google.common.base.Joiner;
Expand Down Expand Up @@ -102,15 +100,11 @@ public class LoadStmt extends DdlStmt implements NotFallbackInParser {
// deprecated, keeping this property to make LoadStmt#checkProperties() happy
public static final String USE_NEW_LOAD_SCAN_NODE = "use_new_load_scan_node";

// for load data from Baidu Object Store(BOS)
// for load data from Baidu Object Store(BOS) todo wait new property support
public static final String BOS_ENDPOINT = "bos_endpoint";
public static final String BOS_ACCESSKEY = "bos_accesskey";
public static final String BOS_SECRET_ACCESSKEY = "bos_secret_accesskey";

// for S3 load check
public static final List<String> PROVIDERS =
new ArrayList<>(Arrays.asList("cos", "oss", "s3", "obs", "bos", "azure"));

// mini load params
public static final String KEY_IN_PARAM_COLUMNS = "columns";
public static final String KEY_IN_PARAM_SET = "set";
Expand Down Expand Up @@ -454,8 +448,6 @@ public void analyze(Analyzer analyzer) throws UserException {
for (int i = 0; i < dataDescription.getFilePaths().size(); i++) {
String location = brokerDesc.getFileLocation(dataDescription.getFilePaths().get(i));
dataDescription.getFilePaths().set(i, location);
StorageBackend.checkPath(dataDescription.getFilePaths().get(i),
brokerDesc.getStorageType(), "DATA INFILE must be specified.");
dataDescription.getFilePaths().set(i, dataDescription.getFilePaths().get(i));
}
}
Expand Down Expand Up @@ -522,27 +514,6 @@ public void analyze(Analyzer analyzer) throws UserException {
user = ConnectContext.get().getQualifiedUser();
}


private String getProviderFromEndpoint() {
Map<String, String> properties = brokerDesc.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (entry.getKey().equalsIgnoreCase(S3Properties.PROVIDER)) {
// S3 Provider properties should be case insensitive.
return entry.getValue().toUpperCase();
}
}
return S3Properties.S3_PROVIDER;
}

private String getBucketFromFilePath(String filePath) throws Exception {
String[] parts = filePath.split("\\/\\/");
if (parts.length < 2) {
throw new Exception("filePath is not valid");
}
String buckt = parts[1].split("\\/")[0];
return buckt;
}

public String getComment() {
return comment;
}
Expand Down Expand Up @@ -630,21 +601,17 @@ private void checkEndpoint(String endpoint) throws UserException {
}

public void checkS3Param() throws UserException {
Map<String, String> brokerDescProperties = brokerDesc.getProperties();
if (brokerDescProperties.containsKey(S3Properties.Env.ENDPOINT)
&& brokerDescProperties.containsKey(S3Properties.Env.ACCESS_KEY)
&& brokerDescProperties.containsKey(S3Properties.Env.SECRET_KEY)
&& brokerDescProperties.containsKey(S3Properties.Env.REGION)) {
String endpoint = brokerDescProperties.get(S3Properties.Env.ENDPOINT);
endpoint = endpoint.replaceFirst("^http://", "");
endpoint = endpoint.replaceFirst("^https://", "");
brokerDescProperties.put(S3Properties.Env.ENDPOINT, endpoint);
if (brokerDesc.getFileType() != null && brokerDesc.getFileType().equals(TFileType.FILE_S3)) {

ObjectStorageProperties storageProperties = (ObjectStorageProperties) brokerDesc.getStorageProperties();
String endpoint = storageProperties.getEndpoint();
checkEndpoint(endpoint);
checkWhiteList(endpoint);
if (AzureProperties.checkAzureProviderPropertyExist(brokerDescProperties)) {
return;
//should add connectivity test
boolean connectivityTest = FileSystemFactory.get(brokerDesc.getStorageProperties()).connectivityTest();
if (!connectivityTest) {
throw new UserException("Failed to access object storage, message=connectivity test failed");
}
checkEndpoint(endpoint);
checkAkSk();
}
}

Expand All @@ -657,47 +624,6 @@ public void checkWhiteList(String endpoint) throws UserException {
}
}

private void checkAkSk() throws UserException {
RemoteBase remote = null;
ObjectInfo objectInfo = null;
String curFile = null;
try {
Map<String, String> brokerDescProperties = brokerDesc.getProperties();
String provider = getProviderFromEndpoint();
for (DataDescription dataDescription : dataDescriptions) {
for (String filePath : dataDescription.getFilePaths()) {
curFile = filePath;
String bucket = getBucketFromFilePath(filePath);
objectInfo = new ObjectInfo(ObjectStoreInfoPB.Provider.valueOf(provider.toUpperCase()),
brokerDescProperties.get(S3Properties.Env.ACCESS_KEY),
brokerDescProperties.get(S3Properties.Env.SECRET_KEY),
bucket, brokerDescProperties.get(S3Properties.Env.ENDPOINT),
brokerDescProperties.get(S3Properties.Env.REGION), "");
remote = RemoteBase.newInstance(objectInfo);
// RemoteBase#headObject does not throw exception if key does not exist.
remote.headObject("1");
remote.listObjects(null);
remote.close();
}
}
} catch (Exception e) {
LOG.warn("Failed to access object storage, file={}, proto={}, err={}", curFile, objectInfo, e.toString());
String msg;
if (e instanceof UserException) {
msg = ((UserException) e).getDetailMessage();
} else {
msg = e.getMessage();
}
throw new UserException(InternalErrorCode.GET_REMOTE_DATA_ERROR,
"Failed to access object storage, message=" + msg, e);
} finally {
if (remote != null) {
remote.close();
}
}

}

@Override
public StmtType stmtType() {
return StmtType.LOAD;
Expand Down
Loading
Loading