Merge pull request #23 from iabetor/master
set ua and set customer host
wucheng committed Nov 21, 2022
2 parents 41c647e + 6a2982d commit d35b398
Showing 6 changed files with 81 additions and 44 deletions.
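At a glance, this change bumps the plugin artifact from 2.9 to 3.0 and reworks the jar download in CHDFSHadoopFileSystemJarLoader to go through Apache HttpClient, sending a plugin User-Agent and, when the new fs.ofs.data.transfer.distinguish.host switch is enabled, an explicit Host header taken from the originally returned jar URL. Below is a minimal sketch of how a client might opt in, assuming a standard Hadoop Configuration; the key name and its false default come from the diff, while the class and output are illustrative only.

    import org.apache.hadoop.conf.Configuration;

    public class DistinguishHostConfigSketch {
        // Same key as the new CHDFS_DATA_TRANSFER_DISTINGUISH_HOST constant below;
        // it defaults to false, so the distinguishing Host header is strictly opt-in.
        private static final String KEY = "fs.ofs.data.transfer.distinguish.host";

        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.setBoolean(KEY, true);
            System.out.println("distinguish host enabled: " + conf.getBoolean(KEY, false));
        }
    }

The same key could equally be set in core-site.xml; the adapter reads it once during FileSystem initialization via the new isDistinguishHost helper shown below.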
Binary file removed jar/chdfs_hadoop_plugin_network-2.9.jar
Binary file added jar/chdfs_hadoop_plugin_network-3.0.jar
2 changes: 1 addition & 1 deletion jar/jar.md5
@@ -1 +1 @@
-MD5 (chdfs_hadoop_plugin_network-2.9.jar) = 20c993bd6dcfe9d4adfb08038ff1bfa7
+MD5 (chdfs_hadoop_plugin_network-3.0.jar) = 7a6ada35fca5e51b6f81af997871c37c
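jar.md5 tracks the MD5 digest of the shipped plugin jar; the last hunk of CHDFSHadoopFileSystemJarLoader below computes the digest of the downloaded jar (getFileHexMd5), presumably to compare it against the jarMd5 parsed from the jar plugin info response. A short sketch of computing the same hex digest, assuming Hadoop's MD5Hash utility (which that file's imports suggest); the class and method names here are illustrative.

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.MD5Hash;

    public class JarMd5Sketch {
        // Returns the hex MD5 of a file, e.g. for comparison with jar.md5.
        static String fileHexMd5(File f) throws IOException {
            try (FileInputStream in = new FileInputStream(f)) {
                // MD5Hash.digest reads the stream to EOF and returns the 128-bit digest
                return MD5Hash.digest(in).toString();
            }
        }
    }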
2 changes: 1 addition & 1 deletion pom.xml
@@ -5,7 +5,7 @@

<groupId>com.qcloud</groupId>
<artifactId>chdfs_hadoop_plugin_network</artifactId>
-<version>2.9</version>
+<version>3.0</version>
<packaging>jar</packaging>

<name>chdfs_hadoop_plugin_network</name>
src/main/java/com/qcloud/chdfs/fs/CHDFSHadoopFileSystemAdapter.java
@@ -54,6 +54,10 @@ public class CHDFSHadoopFileSystemAdapter extends FileSystemWithLockCleaner {
private static final boolean DEFAULT_CHDFS_META_TRANSFER_USE_TLS = true;
private static final int DEFAULT_CHDFS_META_SERVER_PORT = 443;

+public static final String CHDFS_DATA_TRANSFER_DISTINGUISH_HOST = "fs.ofs.data.transfer.distinguish.host";
+
+public static final boolean DEFAULT_CHDFS_DATA_TRANSFER_DISTINGUISH_FLAG = false;

private final CHDFSHadoopFileSystemJarLoader jarLoader = new CHDFSHadoopFileSystemJarLoader();
private FileSystemWithLockCleaner actualImplFS = null;
private URI uri = null;
@@ -110,8 +114,10 @@ public void initialize(URI name, Configuration conf) throws IOException {
String tmpDirPath = initCacheTmpDir(conf);
boolean jarPluginServerHttpsFlag = isJarPluginServerHttps(conf);
String cosEndPointSuffix = getCosEndPointSuffix(conf);

-initJarLoadWithRetry(ofsHost, appid, jarPluginServerPort, tmpDirPath, jarPluginServerHttpsFlag, cosEndPointSuffix);
+boolean distinguishHost = isDistinguishHost(conf);
+log.debug("fs.ofs.data.transfer.distinguish.host: {}", distinguishHost);
+initJarLoadWithRetry(ofsHost, appid, jarPluginServerPort, tmpDirPath, jarPluginServerHttpsFlag,
+cosEndPointSuffix, distinguishHost, networkVersionId);

this.actualImplFS = jarLoader.getActualFileSystem();
if (this.actualImplFS == null) {
@@ -255,12 +261,19 @@ private boolean isJarPluginServerHttps(Configuration conf) {
return conf.getBoolean(CHDFS_META_TRANSFER_USE_TLS_KEY, DEFAULT_CHDFS_META_TRANSFER_USE_TLS);
}

+private boolean isDistinguishHost(Configuration conf) {
+return conf.getBoolean(CHDFS_DATA_TRANSFER_DISTINGUISH_HOST, DEFAULT_CHDFS_DATA_TRANSFER_DISTINGUISH_FLAG);
+}


private void initJarLoadWithRetry(String mountPointAddr, long appid, int jarPluginServerPort, String tmpDirPath,
-boolean jarPluginServerHttps, String cosEndPointSuffix) throws IOException {
+boolean jarPluginServerHttps, String cosEndPointSuffix, boolean distinguishHost
+, String networkVersionId) throws IOException {
int maxRetry = 5;
for (int retryIndex = 0; retryIndex <= maxRetry; retryIndex++) {
try {
-jarLoader.init(mountPointAddr, appid, jarPluginServerPort, tmpDirPath, jarPluginServerHttps, cosEndPointSuffix);
+jarLoader.init(mountPointAddr, appid, jarPluginServerPort, tmpDirPath, jarPluginServerHttps,
+cosEndPointSuffix, distinguishHost, networkVersionId);
return;
} catch (IOException e) {
if (retryIndex < maxRetry) {
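The hunk above is cut off inside the retry loop. For orientation, the bounded-retry shape of initJarLoadWithRetry, isolated from the rest of the adapter, looks roughly like the sketch below; maxRetry = 5 and the catch-and-retry structure come from the diff, while the helper name and the absence of logging or backoff between attempts are assumptions.

    import java.io.IOException;

    public class BoundedRetrySketch {
        // Hypothetical stand-in for jarLoader.init(...): any call that can fail with IOException.
        static void initOnce() throws IOException {
            // network call goes here
        }

        // Up to maxRetry + 1 attempts, returning on the first success
        // and rethrowing only the final failure.
        static void initWithRetry() throws IOException {
            int maxRetry = 5;
            for (int retryIndex = 0; retryIndex <= maxRetry; retryIndex++) {
                try {
                    initOnce();
                    return;
                } catch (IOException e) {
                    if (retryIndex == maxRetry) {
                        throw e;
                    }
                    // otherwise fall through and retry
                }
            }
        }
    }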
100 changes: 62 additions & 38 deletions src/main/java/com/qcloud/chdfs/fs/CHDFSHadoopFileSystemJarLoader.java
@@ -6,6 +6,10 @@
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.util.VersionInfo;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -21,7 +25,6 @@
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
-import java.net.URLConnection;
import java.net.URLEncoder;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
@@ -32,20 +35,23 @@ class CHDFSHadoopFileSystemJarLoader {
private static AlreadyLoadedFileSystemInfo alreadyLoadedFileSystemInfo;
private String versionId;
private String jarPath;

+private String jarHost;
private String jarMd5;
private FileSystemWithLockCleaner actualFileSystem;

CHDFSHadoopFileSystemJarLoader() {
}

synchronized void init(String mountPointAddr, long appid, int jarPluginServerPort, String tmpDirPath,
-boolean jarPluginServerHttps, String cosEndPointSuffix) throws IOException {
+boolean jarPluginServerHttps, String cosEndPointSuffix, boolean distinguishHost,
+String networkVersionId) throws IOException {
if (this.actualFileSystem == null) {
long queryStartMs = System.currentTimeMillis();
queryJarPluginInfo(mountPointAddr, appid, jarPluginServerPort, jarPluginServerHttps, cosEndPointSuffix);
log.debug("query jar plugin info usedMs: {}", System.currentTimeMillis() - queryStartMs);
this.actualFileSystem = getAlreadyLoadedClassInfo(this.getClass().getClassLoader(), this.jarPath,
this.versionId, this.jarMd5, tmpDirPath);
this.versionId, this.jarMd5, tmpDirPath, this.jarHost, distinguishHost, networkVersionId);
if (this.actualFileSystem == null) {
String errMsg = "CHDFSHadoopFileSystemJarLoader getAlreadyLoadedClassInfo return null";
throw new IOException(errMsg);
@@ -80,6 +86,7 @@ private void parseJarPluginInfoResp(String respStr, String cosEndPointSuffix) th
log.error(errMsg);
throw new IOException(errMsg);
} else {
+this.jarHost = new URL(jarInfoJson.get("JarPath").getAsString()).getAuthority();
if (cosEndPointSuffix != null) {
String jarPath = jarInfoJson.get("JarPath").getAsString();
int dotIndex = jarPath.indexOf('.');
@@ -95,7 +102,7 @@ private void parseJarPluginInfoResp(String respStr, String cosEndPointSuffix) th
log.error(errMsg);
throw new IOException(errMsg);
}
-this.jarPath = jarPath.substring(0, dotIndex+1) + cosEndPointSuffix + jarPath.substring(slashIndex);
+this.jarPath = jarPath.substring(0, dotIndex + 1) + cosEndPointSuffix + jarPath.substring(slashIndex);
} else {
this.jarPath = jarInfoJson.get("JarPath").getAsString();
}
@@ -111,7 +118,7 @@ private void parseJarPluginInfoResp(String respStr, String cosEndPointSuffix) th
}

private void queryJarPluginInfo(String mountPointAddr, long appid, int jarPluginServerPort,
boolean jarPluginServerHttpsFlag, String cosEndPointSuffix) throws IOException {
boolean jarPluginServerHttpsFlag, String cosEndPointSuffix) throws IOException {
String hadoopVersion = VersionInfo.getVersion();
if (hadoopVersion == null) {
hadoopVersion = "unknown";
@@ -168,7 +175,10 @@ private void queryJarPluginInfo(String mountPointAddr, long appid, int jarPlugin
}

private static synchronized FileSystemWithLockCleaner getAlreadyLoadedClassInfo(ClassLoader currentClassLoader,
-String jarPath, String versionId, String jarMd5, String tmpDirPath) throws IOException {
+String jarPath, String versionId,
+String jarMd5, String tmpDirPath,
+String jarHost,
+boolean distinguishHost, String networkVersionId) throws IOException {
if (alreadyLoadedFileSystemInfo != null && alreadyLoadedFileSystemInfo.jarPath.equals(jarPath)
&& alreadyLoadedFileSystemInfo.versionId.equals(versionId) && alreadyLoadedFileSystemInfo.jarMd5.equals(
jarMd5)) {
@@ -182,7 +192,7 @@ private static synchronized FileSystemWithLockCleaner getAlreadyLoadedClassInfo(
}
}

-File jarFile = downloadJarPath(jarPath, versionId, jarMd5, tmpDirPath);
+File jarFile = downloadJarPath(jarPath, versionId, jarMd5, tmpDirPath, jarHost, distinguishHost, networkVersionId);
URL jarUrl = null;
try {
jarUrl = jarFile.toURI().toURL();
@@ -205,7 +215,8 @@ private static synchronized FileSystemWithLockCleaner getAlreadyLoadedClassInfo(
}
}

-private static File downloadJarPath(String jarPath, String versionId, String jarMd5, String tmpDirPath)
+private static File downloadJarPath(String jarPath, String versionId, String jarMd5, String tmpDirPath,
+String jarHost, boolean distinguishHost, String networkVersionId)
throws IOException {
File localCacheJarFile = new File(String.format("%s/chdfs_hadoop_plugin-%s-shaded.jar", tmpDirPath, versionId));
File localCacheJarLockFile = new File(
@@ -268,44 +279,57 @@ private static File downloadJarPath(String jarPath, String versionId, String jar
return localCacheJarFile;
}
}
-URL downloadJarUrl = null;
+CloseableHttpClient httpclient = null;
+CloseableHttpResponse response = null;
+HttpGet httpGet = null;
try {
downloadJarUrl = new URL(jarPath);
} catch (MalformedURLException e) {
String errMsg = String.format("invalid download jar url %s", jarPath);
log.error(errMsg, e);
throw new IOException(errMsg, e);
}

try {
URLConnection conn = downloadJarUrl.openConnection();
conn.connect();
bis = new BufferedInputStream(conn.getInputStream());
fos = new BufferedOutputStream(new FileOutputStream(localCacheJarFile));
IOUtils.copyBytes(bis, fos, 4096, true);

// set jar and lock file permission 777
localCacheJarFile.setReadable(true, false);
localCacheJarFile.setWritable(true, false);
localCacheJarFile.setExecutable(true, false);

localCacheJarLockFile.setReadable(true, false);
localCacheJarLockFile.setWritable(true, false);
localCacheJarLockFile.setExecutable(true, false);

bis = null;
fos = null;
httpclient = HttpClients.createDefault();
httpGet = new HttpGet(jarPath);
httpGet.setHeader("User-Agent", String.format("chdfs_hadoop-plugin_network-%s", networkVersionId));
if (distinguishHost) {
httpGet.addHeader("Host", jarHost);
log.debug("host: {} already set", jarHost);
}

fileLock.release();
fileLock = null;
// execute request
response = httpclient.execute(httpGet);
// judge status code == 200
if (response.getStatusLine().getStatusCode() == 200) {
// get content
bis = new BufferedInputStream(response.getEntity().getContent());
fos = new BufferedOutputStream(new FileOutputStream(localCacheJarFile));
IOUtils.copyBytes(bis, fos, 4096, true);

// set jar and lock file permission 777
localCacheJarFile.setReadable(true, false);
localCacheJarFile.setWritable(true, false);
localCacheJarFile.setExecutable(true, false);

localCacheJarLockFile.setReadable(true, false);
localCacheJarLockFile.setWritable(true, false);
localCacheJarLockFile.setExecutable(true, false);
}

fileLockOutPut.close();
fileLockOutPut = null;
httpGet.releaseConnection();
} catch (IOException e) {
httpGet.abort();
String errMsg = String.format("download jar failed, localJarPath: %s",
localCacheJarFile.getAbsolutePath());
log.error(errMsg, e);
throw new IOException(errMsg);
} finally {
if (response != null) {
try {
response.close();
} catch (IOException ignored) {
}
}
if (httpclient != null) {
try {
httpclient.close();
} catch (IOException ignored) {
}
}
}

String md5Hex = getFileHexMd5(localCacheJarFile);
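For readers unfamiliar with the Apache HttpClient flow adopted above: the download now issues a plain GET that carries a chdfs_hadoop-plugin_network-<version> User-Agent and, when distinguishHost is set, a Host header equal to the authority of the originally returned JarPath URL (captured with URL.getAuthority() before any cosEndPointSuffix rewrite). Below is a compact, self-contained sketch of that pattern using try-with-resources; the class, method, and variable names are illustrative, and the file locking, cache-permission, and MD5 handling of the real code are omitted.

    import java.io.BufferedInputStream;
    import java.io.BufferedOutputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.IOUtils;
    import org.apache.http.client.methods.CloseableHttpResponse;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClients;

    public class JarDownloadSketch {
        static void download(String jarUrl, String jarHost, boolean distinguishHost,
                             String networkVersionId, String localPath) throws IOException {
            try (CloseableHttpClient client = HttpClients.createDefault()) {
                HttpGet get = new HttpGet(jarUrl);
                get.setHeader("User-Agent", String.format("chdfs_hadoop-plugin_network-%s", networkVersionId));
                if (distinguishHost) {
                    // Identify the original jar host even if the URL's endpoint was rewritten.
                    get.addHeader("Host", jarHost);
                }
                try (CloseableHttpResponse response = client.execute(get)) {
                    if (response.getStatusLine().getStatusCode() == 200) {
                        BufferedInputStream in = new BufferedInputStream(response.getEntity().getContent());
                        BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(localPath));
                        // copyBytes with close=true closes both streams when it finishes
                        IOUtils.copyBytes(in, out, 4096, true);
                    }
                }
            }
        }
    }

Keeping the original host in the Host header lets the request still identify the source bucket when cosEndPointSuffix has rewritten the download endpoint, which appears to be the motivation for the new switch.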
