Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support l5 #25

Open
wants to merge 1 commit into
base: feature-support-l5
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
chdfs_hadoop_plugin_network.iml
.idea/
target/
.DS_Store
.DS_Store
dependency-reduced-pom.xml
Binary file removed jar/chdfs_hadoop_plugin_network-3.0.jar
Binary file not shown.
Binary file added jar/chdfs_hadoop_plugin_network-3.1.jar
Binary file not shown.
2 changes: 1 addition & 1 deletion jar/jar.md5
Original file line number Diff line number Diff line change
@@ -1 +1 @@
MD5 (chdfs_hadoop_plugin_network-3.0.jar) = 7a6ada35fca5e51b6f81af997871c37c
MD5 (chdfs_hadoop_plugin_network-3.1.jar) = 9de3e5ab112ce8a07e30cb4a2d66b34d
49 changes: 47 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>com.qcloud</groupId>
<artifactId>chdfs_hadoop_plugin_network</artifactId>
<version>3.0</version>
<version>3.1</version>
<packaging>jar</packaging>

<name>chdfs_hadoop_plugin_network</name>
Expand All @@ -31,7 +31,8 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<hadoop.version>2.8.5</hadoop.version>
<hadoop.version>2.7.3</hadoop.version>
<maven-shade-plugin.version>3.4.0</maven-shade-plugin.version>
</properties>

<scm>
Expand All @@ -58,6 +59,25 @@
<version>4.13.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.tencent.jungle</groupId>
<artifactId>jungle-udp-l5</artifactId>
<version>1.0.0-20190917.073536-2</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.4</version>
</dependency>
</dependencies>

<distributionManagement>
Expand Down Expand Up @@ -126,6 +146,31 @@
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven-shade-plugin.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<!-- <shadedArtifactAttached>true</shadedArtifactAttached>
<outputDirectory>${project.basedir}/script_conf/jar</outputDirectory>
-->
<relocations>
<relocation>
<pattern>org.apache.http</pattern>
<shadedPattern>chdfs.${project.version}.org.apache.http</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
Expand Down
102 changes: 63 additions & 39 deletions src/main/java/com/qcloud/chdfs/fs/CHDFSHadoopFileSystemAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,24 +39,7 @@
public class CHDFSHadoopFileSystemAdapter extends FileSystemWithLockCleaner {
static final String SCHEME = "ofs";
private static final Logger log = LoggerFactory.getLogger(CHDFSHadoopFileSystemAdapter.class);
private static final String MOUNT_POINT_ADDR_PATTERN_CHDFS_TYPE =
"^([a-zA-Z0-9-]+)\\.chdfs(-dualstack)?(\\.inner)?\\.([a-z0-9-]+)\\.([a-z0-9-.]+)";
private static final String MOUNT_POINT_ADDR_PATTERN_COS_TYPE =
"^([a-z0-9-]+)-([a-zA-Z0-9]+)$";
private static final String CHDFS_USER_APPID_KEY = "fs.ofs.user.appid";
private static final String CHDFS_TMP_CACHE_DIR_KEY = "fs.ofs.tmp.cache.dir";
private static final String CHDFS_META_SERVER_PORT_KEY = "fs.ofs.meta.server.port";
private static final String CHDFS_META_TRANSFER_USE_TLS_KEY = "fs.ofs.meta.transfer.tls";
private static final String CHDFS_BUCKET_REGION = "fs.ofs.bucket.region";
private static final String COS_ENDPOINT_SUFFIX = "fs.ofs.data.transfer.endpoint.suffix";

private static final String CHDFS_META_ENDPOINT_SUFFIX_KEY = "fs.ofs.meta.endpoint.suffix";
private static final boolean DEFAULT_CHDFS_META_TRANSFER_USE_TLS = true;
private static final int DEFAULT_CHDFS_META_SERVER_PORT = 443;

public static final String CHDFS_DATA_TRANSFER_DISTINGUISH_HOST = "fs.ofs.data.transfer.distinguish.host";

public static final boolean DEFAULT_CHDFS_DATA_TRANSFER_DISTINGUISH_FLAG = false;

private final CHDFSHadoopFileSystemJarLoader jarLoader = new CHDFSHadoopFileSystemJarLoader();
private FileSystemWithLockCleaner actualImplFS = null;
Expand Down Expand Up @@ -93,7 +76,7 @@ public void initialize(URI name, Configuration conf) throws IOException {
if (!metaEndpointSuffix.isEmpty()) {
ofsHost = mountPointAddr + "." + metaEndpointSuffix;
// force close tls
conf.setBoolean(CHDFS_META_TRANSFER_USE_TLS_KEY, false);
conf.setBoolean(ConfigKey.CHDFS_META_TRANSFER_USE_TLS_KEY, false);
} else {
String bucketRegion = getChdfsBucketRegion(conf);
ofsHost = String.format("%s.chdfs.%s.myqcloud.com", mountPointAddr, bucketRegion);
Expand All @@ -113,11 +96,20 @@ public void initialize(URI name, Configuration conf) throws IOException {
int jarPluginServerPort = getJarPluginServerPort(conf);
String tmpDirPath = initCacheTmpDir(conf);
boolean jarPluginServerHttpsFlag = isJarPluginServerHttps(conf);

boolean useL5 = isUseL5(conf);
String metaL5Address = "";
String cosL5Address = "";
if (useL5) {
metaL5Address = getMetaL5Address(conf);
cosL5Address = getCosL5Address(conf);
}

String cosEndPointSuffix = getCosEndPointSuffix(conf);
boolean distinguishHost = isDistinguishHost(conf);
log.debug("fs.ofs.data.transfer.distinguish.host: {}", distinguishHost);
initJarLoadWithRetry(ofsHost, appid, jarPluginServerPort, tmpDirPath, jarPluginServerHttpsFlag,
cosEndPointSuffix, distinguishHost, networkVersionId);
cosEndPointSuffix, distinguishHost, networkVersionId, useL5, metaL5Address, cosL5Address);

this.actualImplFS = jarLoader.getActualFileSystem();
if (this.actualImplFS == null) {
Expand All @@ -141,29 +133,29 @@ public void initialize(URI name, Configuration conf) throws IOException {
}

boolean isValidMountPointAddrChdfsType(String mountPointAddr) {
return Pattern.matches(MOUNT_POINT_ADDR_PATTERN_CHDFS_TYPE, mountPointAddr);
return Pattern.matches(ConfigKey.MOUNT_POINT_ADDR_PATTERN_CHDFS_TYPE, mountPointAddr);
}

boolean isValidMountPointAddrCosType(String mountPointAddr) {
return Pattern.matches(MOUNT_POINT_ADDR_PATTERN_COS_TYPE, mountPointAddr);
return Pattern.matches(ConfigKey.MOUNT_POINT_ADDR_PATTERN_COS_TYPE, mountPointAddr);
}

private String getCosEndPointSuffix(Configuration conf) throws IOException {
return conf.get(COS_ENDPOINT_SUFFIX);
return conf.get(ConfigKey.COS_ENDPOINT_SUFFIX);
}

private String getChdfsBucketRegion(Configuration conf) throws IOException {
String bucketRegion = conf.get(CHDFS_BUCKET_REGION);
String bucketRegion = conf.get(ConfigKey.CHDFS_BUCKET_REGION);
if (bucketRegion == null) {
String errMsg = String.format("ofs config %s is missing", CHDFS_BUCKET_REGION);
String errMsg = String.format("ofs config %s is missing", ConfigKey.CHDFS_BUCKET_REGION);
log.error(errMsg);
throw new IOException(errMsg);
}
return bucketRegion;
}

private String getMetaEndpointSuffix(Configuration conf) throws IOException {
return initStringValue(conf, CHDFS_META_ENDPOINT_SUFFIX_KEY, "").toLowerCase();
return initStringValue(conf, ConfigKey.CHDFS_META_ENDPOINT_SUFFIX_KEY, "").toLowerCase();
}

private String initStringValue(Configuration conf, String configKey, String defaultValue)
Expand Down Expand Up @@ -192,30 +184,58 @@ private String initStringValue(Configuration conf, String configKey, String defa
private long getAppid(Configuration conf) throws IOException {
long appid = 0;
try {
appid = conf.getLong(CHDFS_USER_APPID_KEY, 0);
appid = conf.getLong(ConfigKey.CHDFS_USER_APPID_KEY, 0);
} catch (NumberFormatException e) {
throw new IOException(String.format("config for %s is invalid appid number", CHDFS_USER_APPID_KEY));
throw new IOException(String.format("config for %s is invalid appid number", ConfigKey.CHDFS_USER_APPID_KEY));
}
if (appid <= 0) {
throw new IOException(
String.format("config for %s is missing or invalid appid number", CHDFS_USER_APPID_KEY));
String.format("config for %s is missing or invalid appid number", ConfigKey.CHDFS_USER_APPID_KEY));
}
return appid;
}

private int getJarPluginServerPort(Configuration conf) {
return conf.getInt(CHDFS_META_SERVER_PORT_KEY, DEFAULT_CHDFS_META_SERVER_PORT);
return conf.getInt(ConfigKey.CHDFS_META_SERVER_PORT_KEY, ConfigKey.DEFAULT_CHDFS_META_SERVER_PORT);
}

private String getMetaL5Address(Configuration conf) throws IOException {
String l5Address = conf.get(ConfigKey.CHDFS_META_USE_L5_ADDRESS);
if (l5Address == null) {
throw new IOException(
String.format("config for %s is missing", ConfigKey.CHDFS_META_USE_L5_ADDRESS));
}
String[] address = l5Address.split(ConfigKey.L5Separator);
if (address.length != 2) {
throw new IOException(
String.format("config for %s invalid l5 address", ConfigKey.CHDFS_META_USE_L5_ADDRESS));
}
return l5Address;
}

private String getCosL5Address(Configuration conf) throws IOException {
String l5Address = conf.get(ConfigKey.CHDFS_COS_USE_L5_ADDRESS);
if (l5Address == null) {
throw new IOException(
String.format("config for %s is missing", ConfigKey.CHDFS_COS_USE_L5_ADDRESS));
}
String[] address = l5Address.split(ConfigKey.L5Separator);
if (address.length != 2) {
throw new IOException(
String.format("config for %s invalid l5 address", ConfigKey.CHDFS_COS_USE_L5_ADDRESS));
}
return l5Address;
}

private String initCacheTmpDir(Configuration conf) throws IOException {
String chdfsTmpCacheDirPath = conf.get(CHDFS_TMP_CACHE_DIR_KEY);
String chdfsTmpCacheDirPath = conf.get(ConfigKey.CHDFS_TMP_CACHE_DIR_KEY);
if (chdfsTmpCacheDirPath == null) {
String errMsg = String.format("chdfs config %s is missing", CHDFS_TMP_CACHE_DIR_KEY);
String errMsg = String.format("chdfs config %s is missing", ConfigKey.CHDFS_TMP_CACHE_DIR_KEY);
log.error(errMsg);
throw new IOException(errMsg);
}
if (!chdfsTmpCacheDirPath.startsWith("/")) {
String errMsg = String.format("chdfs config [%s: %s] must be absolute path", CHDFS_TMP_CACHE_DIR_KEY,
String errMsg = String.format("chdfs config [%s: %s] must be absolute path", ConfigKey.CHDFS_TMP_CACHE_DIR_KEY,
chdfsTmpCacheDirPath);
log.error(errMsg);
throw new IOException(errMsg);
Expand All @@ -235,21 +255,21 @@ private String initCacheTmpDir(Configuration conf) throws IOException {
}

if (!chdfsTmpCacheDir.isDirectory()) {
String errMsg = String.format("chdfs config [%s: %s] is invalid directory", CHDFS_TMP_CACHE_DIR_KEY,
String errMsg = String.format("chdfs config [%s: %s] is invalid directory", ConfigKey.CHDFS_TMP_CACHE_DIR_KEY,
chdfsTmpCacheDir.getAbsolutePath());
log.error(errMsg);
throw new IOException(errMsg);
}

if (!chdfsTmpCacheDir.canRead()) {
String errMsg = String.format("chdfs config [%s: %s] is not readable", CHDFS_TMP_CACHE_DIR_KEY,
String errMsg = String.format("chdfs config [%s: %s] is not readable", ConfigKey.CHDFS_TMP_CACHE_DIR_KEY,
chdfsTmpCacheDirPath);
log.error(errMsg);
throw new IOException(errMsg);
}

if (!chdfsTmpCacheDir.canWrite()) {
String errMsg = String.format("chdfs config [%s: %s] is not writeable", CHDFS_TMP_CACHE_DIR_KEY,
String errMsg = String.format("chdfs config [%s: %s] is not writeable", ConfigKey.CHDFS_TMP_CACHE_DIR_KEY,
chdfsTmpCacheDirPath);
log.error(errMsg);
throw new IOException(errMsg);
Expand All @@ -258,22 +278,26 @@ private String initCacheTmpDir(Configuration conf) throws IOException {
}

private boolean isJarPluginServerHttps(Configuration conf) {
return conf.getBoolean(CHDFS_META_TRANSFER_USE_TLS_KEY, DEFAULT_CHDFS_META_TRANSFER_USE_TLS);
return conf.getBoolean(ConfigKey.CHDFS_META_TRANSFER_USE_TLS_KEY, ConfigKey.DEFAULT_CHDFS_META_TRANSFER_USE_TLS);
}

private boolean isUseL5(Configuration conf) {
return conf.getBoolean(ConfigKey.CHDFS_USE_L5_FLAG, ConfigKey.DEFAULT_CHDFS_USE_L5_FLAG);
}

private boolean isDistinguishHost(Configuration conf) {
return conf.getBoolean(CHDFS_DATA_TRANSFER_DISTINGUISH_HOST, DEFAULT_CHDFS_DATA_TRANSFER_DISTINGUISH_FLAG);
return conf.getBoolean(ConfigKey.CHDFS_DATA_TRANSFER_DISTINGUISH_HOST, ConfigKey.DEFAULT_CHDFS_DATA_TRANSFER_DISTINGUISH_FLAG);
}


private void initJarLoadWithRetry(String mountPointAddr, long appid, int jarPluginServerPort, String tmpDirPath,
boolean jarPluginServerHttps, String cosEndPointSuffix, boolean distinguishHost
, String networkVersionId) throws IOException {
, String networkVersionId, boolean useL5, String metaL5Address, String cosL5Address) throws IOException {
int maxRetry = 5;
for (int retryIndex = 0; retryIndex <= maxRetry; retryIndex++) {
try {
jarLoader.init(mountPointAddr, appid, jarPluginServerPort, tmpDirPath, jarPluginServerHttps,
cosEndPointSuffix, distinguishHost, networkVersionId);
cosEndPointSuffix, distinguishHost, networkVersionId, useL5, metaL5Address, cosL5Address);
return;
} catch (IOException e) {
if (retryIndex < maxRetry) {
Expand Down
Loading