diff --git a/jar/chdfs_hadoop_plugin_network-2.6.jar b/jar/chdfs_hadoop_plugin_network-2.6.jar deleted file mode 100644 index c51de45..0000000 Binary files a/jar/chdfs_hadoop_plugin_network-2.6.jar and /dev/null differ diff --git a/jar/chdfs_hadoop_plugin_network-2.7.jar b/jar/chdfs_hadoop_plugin_network-2.7.jar new file mode 100644 index 0000000..d307ec2 Binary files /dev/null and b/jar/chdfs_hadoop_plugin_network-2.7.jar differ diff --git a/jar/jar.md5 b/jar/jar.md5 index 3d6f3e5..6e607cc 100644 --- a/jar/jar.md5 +++ b/jar/jar.md5 @@ -1 +1 @@ -MD5 (chdfs_hadoop_plugin_network-2.6.jar) = 135f11c92d99abbe1807d0b749a1fcc9 +MD5 (chdfs_hadoop_plugin_network-2.7.jar) = 1f53b615285aab61537fe47b6e7387f5 diff --git a/pom.xml b/pom.xml index 4e99f75..52a5c1d 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ com.qcloud chdfs_hadoop_plugin_network - 2.6 + 2.7 jar chdfs_hadoop_plugin_network diff --git a/src/main/java/com/qcloud/chdfs/fs/CHDFSHadoopFileSystemAdapter.java b/src/main/java/com/qcloud/chdfs/fs/CHDFSHadoopFileSystemAdapter.java index bfb91fa..b9d0aa8 100644 --- a/src/main/java/com/qcloud/chdfs/fs/CHDFSHadoopFileSystemAdapter.java +++ b/src/main/java/com/qcloud/chdfs/fs/CHDFSHadoopFileSystemAdapter.java @@ -19,7 +19,6 @@ import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Progressable; -import org.apache.hadoop.util.hash.Hash; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,21 +27,24 @@ import java.io.IOException; import java.net.URI; import java.util.EnumSet; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; +import java.util.regex.Matcher; import java.util.regex.Pattern; public class CHDFSHadoopFileSystemAdapter extends FileSystemWithLockCleaner { static final String SCHEME = "ofs"; private static final Logger log = LoggerFactory.getLogger(CHDFSHadoopFileSystemAdapter.class); - private static final String MOUNT_POINT_ADDR_PATTERN = + private static final String MOUNT_POINT_ADDR_PATTERN_CHDFS_TYPE = "^([a-zA-Z0-9-]+)\\.chdfs(-dualstack)?(\\.inner)?\\.([a-z0-9-]+)\\.([a-z0-9-.]+)"; + private static final String MOUNT_POINT_ADDR_PATTERN_COS_TYPE = + "^([a-z0-9-]+)-([a-zA-Z0-9]+)$"; private static final String CHDFS_USER_APPID_KEY = "fs.ofs.user.appid"; private static final String CHDFS_TMP_CACHE_DIR_KEY = "fs.ofs.tmp.cache.dir"; private static final String CHDFS_META_SERVER_PORT_KEY = "fs.ofs.meta.server.port"; private static final String CHDFS_META_TRANSFER_USE_TLS_KEY = "fs.ofs.meta.transfer.tls"; + private static final String CHDFS_BUCKET_REGION = "fs.ofs.bucket.region"; private static final boolean DEFAULT_CHDFS_META_TRANSFER_USE_TLS = true; private static final int DEFAULT_CHDFS_META_SERVER_PORT = 443; @@ -66,10 +68,23 @@ public void initialize(URI name, Configuration conf) throws IOException { super.initialize(name, conf); this.setConf(conf); String mountPointAddr = name.getHost(); - if (mountPointAddr == null || !isValidMountPointAddr(mountPointAddr)) { + if (mountPointAddr == null) { + String errMsg = String.format("mountPointAddr is null, fullUri: %s, exp. f4mabcdefgh-xyzw.chdfs" + + ".ap-guangzhou.myqcloud.com or examplebucket-1250000000 or f4mabcdefgh-xyzw", name.toString()); + log.error(errMsg); + throw new IOException(errMsg); + } + + String ofsHost; + if (isValidMountPointAddrChdfsType(mountPointAddr)) { + ofsHost = mountPointAddr; + } else if (isValidMountPointAddrCosType(mountPointAddr)) { + String bucketRegion = getChdfsBucketRegion(conf); + ofsHost = String.format("%s.chdfs.%s.myqcloud.com", mountPointAddr, bucketRegion); + } else { String errMsg = String.format("mountPointAddr %s is invalid, fullUri: %s, exp. f4mabcdefgh-xyzw.chdfs" - + ".ap-guangzhou.myqcloud.com", mountPointAddr == null ? "null" : mountPointAddr, - name.toString()); + + ".ap-guangzhou.myqcloud.com or examplebucket-1250000000 or f4mabcdefgh-xyzw", + mountPointAddr, name.toString()); log.error(errMsg); throw new IOException(errMsg); } @@ -79,7 +94,7 @@ public void initialize(URI name, Configuration conf) throws IOException { String tmpDirPath = initCacheTmpDir(conf); boolean jarPluginServerHttpsFlag = isJarPluginServerHttps(conf); - initJarLoadWithRetry(mountPointAddr, appid, jarPluginServerPort, tmpDirPath, jarPluginServerHttpsFlag); + initJarLoadWithRetry(ofsHost, appid, jarPluginServerPort, tmpDirPath, jarPluginServerHttpsFlag); this.actualImplFS = jarLoader.getActualFileSystem(); if (this.actualImplFS == null) { @@ -102,8 +117,21 @@ public void initialize(URI name, Configuration conf) throws IOException { log.debug("total init file system, [elapse-ms: {}]", System.currentTimeMillis() - initStartMs); } - boolean isValidMountPointAddr(String mountPointAddr) { - return Pattern.matches(MOUNT_POINT_ADDR_PATTERN, mountPointAddr); + boolean isValidMountPointAddrChdfsType(String mountPointAddr) { + return Pattern.matches(MOUNT_POINT_ADDR_PATTERN_CHDFS_TYPE, mountPointAddr); + } + boolean isValidMountPointAddrCosType(String mountPointAddr) { + return Pattern.matches(MOUNT_POINT_ADDR_PATTERN_COS_TYPE, mountPointAddr); + } + + private String getChdfsBucketRegion(Configuration conf) throws IOException { + String bucketRegion = conf.get(CHDFS_BUCKET_REGION); + if (bucketRegion == null) { + String errMsg = String.format("ofs config %s is missing", CHDFS_BUCKET_REGION); + log.error(errMsg); + throw new IOException(errMsg); + } + return bucketRegion; } private long getAppid(Configuration conf) throws IOException {