Skip to content

Commit

Permalink
COS bucket:support ofs://bucketname-appid/
Browse files Browse the repository at this point in the history
  • Loading branch information
henryswang committed Feb 28, 2022
1 parent 2d2faab commit aedb742
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 11 deletions.
Binary file removed jar/chdfs_hadoop_plugin_network-2.6.jar
Binary file not shown.
Binary file added jar/chdfs_hadoop_plugin_network-2.7.jar
Binary file not shown.
2 changes: 1 addition & 1 deletion jar/jar.md5
Original file line number Diff line number Diff line change
@@ -1 +1 @@
MD5 (chdfs_hadoop_plugin_network-2.6.jar) = 135f11c92d99abbe1807d0b749a1fcc9
MD5 (chdfs_hadoop_plugin_network-2.7.jar) = 1f53b615285aab61537fe47b6e7387f5
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>com.qcloud</groupId>
<artifactId>chdfs_hadoop_plugin_network</artifactId>
<version>2.6</version>
<version>2.7</version>
<packaging>jar</packaging>

<name>chdfs_hadoop_plugin_network</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.hash.Hash;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -28,21 +27,24 @@
import java.io.IOException;
import java.net.URI;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CHDFSHadoopFileSystemAdapter extends FileSystemWithLockCleaner {
static final String SCHEME = "ofs";
private static final Logger log = LoggerFactory.getLogger(CHDFSHadoopFileSystemAdapter.class);
private static final String MOUNT_POINT_ADDR_PATTERN =
private static final String MOUNT_POINT_ADDR_PATTERN_CHDFS_TYPE =
"^([a-zA-Z0-9-]+)\\.chdfs(-dualstack)?(\\.inner)?\\.([a-z0-9-]+)\\.([a-z0-9-.]+)";
private static final String MOUNT_POINT_ADDR_PATTERN_COS_TYPE =
"^([a-z0-9-]+)-([a-zA-Z0-9]+)$";
private static final String CHDFS_USER_APPID_KEY = "fs.ofs.user.appid";
private static final String CHDFS_TMP_CACHE_DIR_KEY = "fs.ofs.tmp.cache.dir";
private static final String CHDFS_META_SERVER_PORT_KEY = "fs.ofs.meta.server.port";
private static final String CHDFS_META_TRANSFER_USE_TLS_KEY = "fs.ofs.meta.transfer.tls";
private static final String CHDFS_BUCKET_REGION = "fs.ofs.bucket.region";
private static final boolean DEFAULT_CHDFS_META_TRANSFER_USE_TLS = true;
private static final int DEFAULT_CHDFS_META_SERVER_PORT = 443;

Expand All @@ -66,10 +68,23 @@ public void initialize(URI name, Configuration conf) throws IOException {
super.initialize(name, conf);
this.setConf(conf);
String mountPointAddr = name.getHost();
if (mountPointAddr == null || !isValidMountPointAddr(mountPointAddr)) {
if (mountPointAddr == null) {
String errMsg = String.format("mountPointAddr is null, fullUri: %s, exp. f4mabcdefgh-xyzw.chdfs"
+ ".ap-guangzhou.myqcloud.com or examplebucket-1250000000 or f4mabcdefgh-xyzw", name.toString());
log.error(errMsg);
throw new IOException(errMsg);
}

String ofsHost;
if (isValidMountPointAddrChdfsType(mountPointAddr)) {
ofsHost = mountPointAddr;
} else if (isValidMountPointAddrCosType(mountPointAddr)) {
String bucketRegion = getChdfsBucketRegion(conf);
ofsHost = String.format("%s.chdfs.%s.myqcloud.com", mountPointAddr, bucketRegion);
} else {
String errMsg = String.format("mountPointAddr %s is invalid, fullUri: %s, exp. f4mabcdefgh-xyzw.chdfs"
+ ".ap-guangzhou.myqcloud.com", mountPointAddr == null ? "null" : mountPointAddr,
name.toString());
+ ".ap-guangzhou.myqcloud.com or examplebucket-1250000000 or f4mabcdefgh-xyzw",
mountPointAddr, name.toString());
log.error(errMsg);
throw new IOException(errMsg);
}
Expand All @@ -79,7 +94,7 @@ public void initialize(URI name, Configuration conf) throws IOException {
String tmpDirPath = initCacheTmpDir(conf);
boolean jarPluginServerHttpsFlag = isJarPluginServerHttps(conf);

initJarLoadWithRetry(mountPointAddr, appid, jarPluginServerPort, tmpDirPath, jarPluginServerHttpsFlag);
initJarLoadWithRetry(ofsHost, appid, jarPluginServerPort, tmpDirPath, jarPluginServerHttpsFlag);

this.actualImplFS = jarLoader.getActualFileSystem();
if (this.actualImplFS == null) {
Expand All @@ -102,8 +117,21 @@ public void initialize(URI name, Configuration conf) throws IOException {
log.debug("total init file system, [elapse-ms: {}]", System.currentTimeMillis() - initStartMs);
}

boolean isValidMountPointAddr(String mountPointAddr) {
return Pattern.matches(MOUNT_POINT_ADDR_PATTERN, mountPointAddr);
boolean isValidMountPointAddrChdfsType(String mountPointAddr) {
return Pattern.matches(MOUNT_POINT_ADDR_PATTERN_CHDFS_TYPE, mountPointAddr);
}
boolean isValidMountPointAddrCosType(String mountPointAddr) {
return Pattern.matches(MOUNT_POINT_ADDR_PATTERN_COS_TYPE, mountPointAddr);
}

private String getChdfsBucketRegion(Configuration conf) throws IOException {
String bucketRegion = conf.get(CHDFS_BUCKET_REGION);
if (bucketRegion == null) {
String errMsg = String.format("ofs config %s is missing", CHDFS_BUCKET_REGION);
log.error(errMsg);
throw new IOException(errMsg);
}
return bucketRegion;
}

private long getAppid(Configuration conf) throws IOException {
Expand Down

0 comments on commit aedb742

Please sign in to comment.