Skip to content

Commit

Permalink
[bugfix][S3 File]: Change the [SCHEMA] attribute of the [S3Conf class]…
Browse files Browse the repository at this point in the history
… to be non-static to avoid being reassigned after deserialization (#6717)
  • Loading branch information
LeonYoah authored Apr 17, 2024
1 parent 2811bfe commit 79bb701
Showing 1 changed file with 13 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class S3Conf extends HadoopConf {
private static final String HDFS_S3A_IMPL = "org.apache.hadoop.fs.s3a.S3AFileSystem";
private static final String S3A_SCHEMA = "s3a";
private static final String DEFAULT_SCHEMA = "s3n";
// Instance (not static) field: a static field would be shared across all
// S3Conf instances and is not carried through serialization, so it could be
// reassigned/stale after deserialization. Defaults to the "s3n" schema.
private String schema = DEFAULT_SCHEMA;

@Override
public String getFsHdfsImpl() {
Expand All @@ -40,7 +40,11 @@ public String getFsHdfsImpl() {

@Override
public String getSchema() {
    // Per-instance schema ("s3a" or default "s3n"); see setSchema.
    return this.schema;
}

/**
 * Sets the URI schema used for this configuration.
 *
 * @param schema the schema to use, e.g. "s3a" or "s3n"
 */
public void setSchema(String schema) {
    this.schema = schema;
}

private S3Conf(String hdfsNameKey) {
Expand All @@ -49,13 +53,13 @@ private S3Conf(String hdfsNameKey) {

public static HadoopConf buildWithConfig(Config config) {

HadoopConf hadoopConf = new S3Conf(config.getString(S3ConfigOptions.S3_BUCKET.key()));
String bucketName = config.getString(S3ConfigOptions.S3_BUCKET.key());
S3Conf hadoopConf = new S3Conf(bucketName);
if (bucketName.startsWith(S3A_SCHEMA)) {
SCHEMA = S3A_SCHEMA;
hadoopConf.setSchema(S3A_SCHEMA);
}
HashMap<String, String> s3Options = new HashMap<>();
putS3SK(s3Options, config);
hadoopConf.putS3SK(s3Options, config);
if (CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_PROPERTIES.key())) {
config.getObject(S3ConfigOptions.S3_PROPERTIES.key())
.forEach((key, value) -> s3Options.put(key, String.valueOf(value.unwrapped())));
Expand All @@ -73,45 +77,26 @@ public static HadoopConf buildWithConfig(Config config) {

/**
 * Builds a Hadoop configuration from a {@link ReadonlyConfig} by converting it
 * to a {@code Config} and delegating to {@code buildWithConfig}, so both entry
 * points share a single code path for schema detection and S3 option handling.
 *
 * @param readonlyConfig the read-only connector configuration
 * @return the populated Hadoop configuration
 */
public static HadoopConf buildWithReadOnlyConfig(ReadonlyConfig readonlyConfig) {
    Config config = readonlyConfig.toConfig();
    return buildWithConfig(config);
}

private String switchHdfsImpl() {
switch (SCHEMA) {
switch (this.schema) {
case S3A_SCHEMA:
return HDFS_S3A_IMPL;
default:
return HDFS_S3N_IMPL;
}
}

private static void putS3SK(Map<String, String> s3Options, Config config) {
private void putS3SK(Map<String, String> s3Options, Config config) {
if (!CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_ACCESS_KEY.key())
&& !CheckConfigUtil.isValidParam(config, S3ConfigOptions.S3_SECRET_KEY.key())) {
return;
}
String accessKey = config.getString(S3ConfigOptions.S3_ACCESS_KEY.key());
String secretKey = config.getString(S3ConfigOptions.S3_SECRET_KEY.key());
if (S3A_SCHEMA.equals(SCHEMA)) {
if (S3A_SCHEMA.equals(this.schema)) {
s3Options.put("fs.s3a.access.key", accessKey);
s3Options.put("fs.s3a.secret.key", secretKey);
return;
Expand Down

0 comments on commit 79bb701

Please sign in to comment.