Skip to content

Commit

Permalink
support config fs.ofs.data.transfer.endpoint.suffix
Browse files Browse the repository at this point in the history
  • Loading branch information
henryswang committed Mar 25, 2022
1 parent a5d8a57 commit bf1fe82
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 11 deletions.
Binary file removed jar/chdfs_hadoop_plugin_network-2.7.jar
Binary file not shown.
Binary file added jar/chdfs_hadoop_plugin_network-2.8.jar
Binary file not shown.
2 changes: 1 addition & 1 deletion jar/jar.md5
Original file line number Diff line number Diff line change
@@ -1 +1 @@
MD5 (chdfs_hadoop_plugin_network-2.7.jar) = 1f53b615285aab61537fe47b6e7387f5
MD5 (chdfs_hadoop_plugin_network-2.8.jar) = c64595ef603b045e296ee597d4219ad2
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>com.qcloud</groupId>
<artifactId>chdfs_hadoop_plugin_network</artifactId>
<version>2.7</version>
<version>2.8</version>
<packaging>jar</packaging>

<name>chdfs_hadoop_plugin_network</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class CHDFSHadoopFileSystemAdapter extends FileSystemWithLockCleaner {
private static final String CHDFS_META_SERVER_PORT_KEY = "fs.ofs.meta.server.port";
private static final String CHDFS_META_TRANSFER_USE_TLS_KEY = "fs.ofs.meta.transfer.tls";
private static final String CHDFS_BUCKET_REGION = "fs.ofs.bucket.region";
private static final String COS_ENDPOINT_SUFFIX = "fs.ofs.data.transfer.endpoint.suffix";
private static final boolean DEFAULT_CHDFS_META_TRANSFER_USE_TLS = true;
private static final int DEFAULT_CHDFS_META_SERVER_PORT = 443;

Expand Down Expand Up @@ -93,8 +94,9 @@ public void initialize(URI name, Configuration conf) throws IOException {
int jarPluginServerPort = getJarPluginServerPort(conf);
String tmpDirPath = initCacheTmpDir(conf);
boolean jarPluginServerHttpsFlag = isJarPluginServerHttps(conf);
String cosEndPointSuffix = getCosEndPointSuffix(conf);

initJarLoadWithRetry(ofsHost, appid, jarPluginServerPort, tmpDirPath, jarPluginServerHttpsFlag);
initJarLoadWithRetry(ofsHost, appid, jarPluginServerPort, tmpDirPath, jarPluginServerHttpsFlag, cosEndPointSuffix);

this.actualImplFS = jarLoader.getActualFileSystem();
if (this.actualImplFS == null) {
Expand Down Expand Up @@ -124,6 +126,10 @@ boolean isValidMountPointAddrCosType(String mountPointAddr) {
return Pattern.matches(MOUNT_POINT_ADDR_PATTERN_COS_TYPE, mountPointAddr);
}

private String getCosEndPointSuffix(Configuration conf ) throws IOException {
return conf.get(COS_ENDPOINT_SUFFIX);
}

private String getChdfsBucketRegion(Configuration conf) throws IOException {
String bucketRegion = conf.get(CHDFS_BUCKET_REGION);
if (bucketRegion == null) {
Expand Down Expand Up @@ -207,11 +213,11 @@ private boolean isJarPluginServerHttps(Configuration conf) {
}

private void initJarLoadWithRetry(String mountPointAddr, long appid, int jarPluginServerPort, String tmpDirPath,
boolean jarPluginServerHttps) throws IOException {
boolean jarPluginServerHttps, String cosEndPointSuffix) throws IOException {
int maxRetry = 5;
for (int retryIndex = 0; retryIndex <= maxRetry; retryIndex++) {
try {
jarLoader.init(mountPointAddr, appid, jarPluginServerPort, tmpDirPath, jarPluginServerHttps);
jarLoader.init(mountPointAddr, appid, jarPluginServerPort, tmpDirPath, jarPluginServerHttps, cosEndPointSuffix);
return;
} catch (IOException e) {
if (retryIndex < maxRetry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ class CHDFSHadoopFileSystemJarLoader {
}

synchronized void init(String mountPointAddr, long appid, int jarPluginServerPort, String tmpDirPath,
boolean jarPluginServerHttps) throws IOException {
boolean jarPluginServerHttps, String cosEndPointSuffix) throws IOException {
if (this.actualFileSystem == null) {
long queryStartMs = System.currentTimeMillis();
queryJarPluginInfo(mountPointAddr, appid, jarPluginServerPort, jarPluginServerHttps);
queryJarPluginInfo(mountPointAddr, appid, jarPluginServerPort, jarPluginServerHttps, cosEndPointSuffix);
log.debug("query jar plugin info usedMs: {}", System.currentTimeMillis() - queryStartMs);
this.actualFileSystem = getAlreadyLoadedClassInfo(this.getClass().getClassLoader(), this.jarPath,
this.versionId, this.jarMd5, tmpDirPath);
Expand All @@ -53,7 +53,7 @@ synchronized void init(String mountPointAddr, long appid, int jarPluginServerPor
}
}

private void parseJarPluginInfoResp(String respStr) throws IOException {
private void parseJarPluginInfoResp(String respStr, String cosEndPointSuffix) throws IOException {
JsonObject respJson = new JsonParser().parse(respStr).getAsJsonObject();
if (!respJson.has("Response")) {
String errMsg = String.format("resp json miss element Response, resp: %s", respStr);
Expand All @@ -80,7 +80,25 @@ private void parseJarPluginInfoResp(String respStr) throws IOException {
log.error(errMsg);
throw new IOException(errMsg);
} else {
this.jarPath = jarInfoJson.get("JarPath").getAsString();
if (cosEndPointSuffix != null) {
String jarPath = jarInfoJson.get("JarPath").getAsString();
int dotIndex = jarPath.indexOf('.');
if (dotIndex == -1) {
String errMsg = String.format("invalid jar path : %s", jarPath);
log.error(errMsg);
throw new IOException(errMsg);
}

int slashIndex = jarPath.indexOf('/', dotIndex);
if (slashIndex == -1) {
String errMsg = String.format("invalid jar path : %s", jarPath);
log.error(errMsg);
throw new IOException(errMsg);
}
this.jarPath = jarPath.substring(0, dotIndex+1) + cosEndPointSuffix + jarPath.substring(slashIndex);
} else {
this.jarPath = jarInfoJson.get("JarPath").getAsString();
}
}

if (!jarInfoJson.has("JarMd5")) {
Expand All @@ -93,7 +111,7 @@ private void parseJarPluginInfoResp(String respStr) throws IOException {
}

private void queryJarPluginInfo(String mountPointAddr, long appid, int jarPluginServerPort,
boolean jarPluginServerHttpsFlag) throws IOException {
boolean jarPluginServerHttpsFlag, String cosEndPointSuffix) throws IOException {
String hadoopVersion = VersionInfo.getVersion();
if (hadoopVersion == null) {
hadoopVersion = "unknown";
Expand Down Expand Up @@ -130,7 +148,7 @@ private void queryJarPluginInfo(String mountPointAddr, long appid, int jarPlugin
bos.write(buf, 0, readLen);
}
String respStr = bos.toString();
parseJarPluginInfoResp(respStr);
parseJarPluginInfoResp(respStr, cosEndPointSuffix);
} catch (IOException e) {
String errMsg = "queryJarPluginInfo occur an io exception";
log.error(errMsg, e);
Expand Down

0 comments on commit bf1fe82

Please sign in to comment.