Skip to content

Commit

Permalink
Merge pull request #29 from wucheng/master
Browse files Browse the repository at this point in the history
 增加check permission接口支持
  • Loading branch information
wucheng authored Jan 5, 2024
2 parents 0269e80 + 5b8a8c1 commit 42ac793
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 53 deletions.
Binary file removed jar/chdfs_hadoop_plugin_network-3.2.jar
Binary file not shown.
Binary file added jar/chdfs_hadoop_plugin_network-3.3.jar
Binary file not shown.
2 changes: 1 addition & 1 deletion jar/jar.md5
Original file line number Diff line number Diff line change
@@ -1 +1 @@
MD5 (chdfs_hadoop_plugin_network-3.2.jar) = 93150ac1e82f7a54fd8a9d9de05f9d54
MD5 (chdfs_hadoop_plugin_network-3.3.jar) = 8e83f99a6ed83d9be05847e84c18e9db
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>com.qcloud</groupId>
<artifactId>chdfs_hadoop_plugin_network</artifactId>
<version>3.2</version>
<version>3.3</version>
<packaging>jar</packaging>

<name>chdfs_hadoop_plugin_network</name>
Expand Down
51 changes: 34 additions & 17 deletions src/main/java/com/qcloud/chdfs/fs/CHDFSHadoopFileSystemAdapter.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.qcloud.chdfs.fs;

import com.qcloud.chdfs.permission.RangerAccessType;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
Expand All @@ -10,6 +10,7 @@
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
Expand All @@ -36,7 +37,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Pattern;

public class CHDFSHadoopFileSystemAdapter extends FileSystemWithCleanerAndSSE {
public class CHDFSHadoopFileSystemAdapter extends FileSystemWithCleanerAndSSE implements RangerPermissionChecker {
static final String SCHEME = "ofs";
private static final Logger log = LoggerFactory.getLogger(CHDFSHadoopFileSystemAdapter.class);
private static final String MOUNT_POINT_ADDR_PATTERN_CHDFS_TYPE =
Expand All @@ -63,10 +64,9 @@ public class CHDFSHadoopFileSystemAdapter extends FileSystemWithCleanerAndSSE {
public static final boolean DEFAULT_CHDFS_USE_SHORT_BUCKETNAME = false;

private final CHDFSHadoopFileSystemJarLoader jarLoader = new CHDFSHadoopFileSystemJarLoader();
private FileSystemWithLockCleaner actualImplFS = null;
private FileSystem actualImplFS = null;
private URI uri = null;
private Path workingDir = null;
private long initStartMs;

@Override
public String getScheme() {
Expand All @@ -76,15 +76,15 @@ public String getScheme() {
@Override
public void initialize(URI name, Configuration conf) throws IOException {
log.debug("CHDFSHadoopFileSystemAdapter adapter initialize");
this.initStartMs = System.currentTimeMillis();
long initStartMs = System.currentTimeMillis();
log.debug("CHDFSHadoopFileSystemAdapter start-init-start time: {}", initStartMs);
try {
super.initialize(name, conf);
this.setConf(conf);
String mountPointAddr = name.getHost();
if (mountPointAddr == null) {
String errMsg = String.format("mountPointAddr is null, fullUri: %s, exp. f4mabcdefgh-xyzw.chdfs"
+ ".ap-guangzhou.myqcloud.com or examplebucket-1250000000 or f4mabcdefgh-xyzw", name.toString());
+ ".ap-guangzhou.myqcloud.com or examplebucket-1250000000 or f4mabcdefgh-xyzw", name);
log.error(errMsg);
throw new IOException(errMsg);
}
Expand All @@ -109,7 +109,7 @@ public void initialize(URI name, Configuration conf) throws IOException {
} else {
String errMsg = String.format("mountPointAddr %s is invalid, fullUri: %s, exp. f4mabcdefgh-xyzw.chdfs"
+ ".ap-guangzhou.myqcloud.com or examplebucket-1250000000 or f4mabcdefgh-xyzw",
mountPointAddr, name.toString());
mountPointAddr, name);
log.error(errMsg);
throw new IOException(errMsg);
}
Expand Down Expand Up @@ -143,7 +143,7 @@ public void initialize(URI name, Configuration conf) throws IOException {
throw ioe;
} catch (Exception e) {
log.error("initialize failed! a unexpected exception occur!", e);
throw new IOException("initialize failed! oops! a unexpected exception occur! " + e.toString(), e);
throw new IOException("initialize failed! oops! a unexpected exception occur! " + e, e);
}
log.debug("total init file system, [elapse-ms: {}]", System.currentTimeMillis() - initStartMs);
}
Expand Down Expand Up @@ -631,26 +631,43 @@ public ContentSummary getContentSummary(Path f) throws IOException {

@Override
public void releaseFileLock(Path p) throws IOException {
if (this.actualImplFS == null) {
throw new IOException("please init the fileSystem first!");
judgeActualFSInitialized();
if (this.actualImplFS instanceof FileLockCleaner) {
((FileLockCleaner) this.actualImplFS).releaseFileLock(p);
} else {
throw new IOException("the actual fileSystem not implemented the lock cleaner interface!");
}
this.actualImplFS.releaseFileLock(p);
}

@Override
public void enableSSECos() throws IOException {
if (this.actualImplFS == null) {
throw new IOException("please init the fileSystem first!");
judgeActualFSInitialized();

if (this.actualImplFS instanceof ServerSideEncryption) {
((ServerSideEncryption) this.actualImplFS).enableSSECos();
} else {
throw new IOException("the actual fileSystem not implemented the enable sse interface!");
}
((FileSystemWithCleanerAndSSE) this.actualImplFS).enableSSECos();
}

@Override
public void disableSSE() throws IOException {
if (this.actualImplFS == null) {
throw new IOException("please init the fileSystem first!");
judgeActualFSInitialized();
if (this.actualImplFS instanceof ServerSideEncryption) {
((ServerSideEncryption) this.actualImplFS).disableSSE();
} else {
throw new IOException("the actual fileSystem not implemented the enable sse interface!");
}
}

@Override
public void checkPermission(Path f, RangerAccessType rangerAccessType) throws IOException {
    // Purpose: delegate the Ranger permission check to the dynamically loaded file system.
    // Fails fast if initialize() has not completed yet.
    judgeActualFSInitialized();
    // The delegate jar may predate the permission-check feature; probe the capability first.
    if (this.actualImplFS instanceof RangerPermissionChecker) {
        ((RangerPermissionChecker) this.actualImplFS).checkPermission(f, rangerAccessType);
    } else {
        throw new IOException("the actual fileSystem not implemented the permission check interface!");
    }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.util.VersionInfo;
Expand Down Expand Up @@ -38,52 +39,44 @@ class CHDFSHadoopFileSystemJarLoader {

private String jarHost;
private String jarMd5;
private FileSystemWithLockCleaner actualFileSystem;
private FileSystem actualFileSystem;

CHDFSHadoopFileSystemJarLoader() {
}

synchronized void init(String mountPointAddr, long appid, int jarPluginServerPort, String tmpDirPath,
boolean jarPluginServerHttps, String cosEndPointSuffix, boolean distinguishHost,
String networkVersionId) throws IOException {
boolean jarPluginServerHttps, String cosEndPointSuffix, boolean distinguishHost, String networkVersionId)
throws IOException {
if (this.actualFileSystem == null) {
long queryStartMs = System.currentTimeMillis();
queryJarPluginInfo(mountPointAddr, appid, jarPluginServerPort, jarPluginServerHttps, cosEndPointSuffix);
log.debug("query jar plugin info usedMs: {}", System.currentTimeMillis() - queryStartMs);
this.actualFileSystem = getAlreadyLoadedClassInfo(this.getClass().getClassLoader(), this.jarPath,
this.versionId, this.jarMd5, tmpDirPath, this.jarHost, distinguishHost, networkVersionId);
if (this.actualFileSystem == null) {
String errMsg = "CHDFSHadoopFileSystemJarLoader getAlreadyLoadedClassInfo return null";
throw new IOException(errMsg);
}
}
}

private void parseJarPluginInfoResp(String respStr, String cosEndPointSuffix) throws IOException {
JsonObject respJson = new JsonParser().parse(respStr).getAsJsonObject();
if (!respJson.has("Response")) {
String errMsg = String.format("resp json miss element Response, resp: %s", respStr);
log.error(errMsg);
throw new IOException(errMsg);
}

if (!respJson.get("Response").getAsJsonObject().has("HadoopPluginJar")) {
String errMsg = String.format("resp json miss element Response.HadoopPluginJar, resp: %s", respStr);
log.error(errMsg);
throw new IOException(errMsg);
}
JsonObject jarInfoJson = respJson.get("Response").getAsJsonObject().get("HadoopPluginJar").getAsJsonObject();
if (!jarInfoJson.has("VersionId")) {
String errMsg = String.format("resp miss config Response.HadoopPluginJar.VersionId, resp: %s", respStr);
log.error(errMsg);
throw new IOException(errMsg);
} else {
this.versionId = jarInfoJson.get("VersionId").getAsString();
}

if (!jarInfoJson.has("JarPath")) {
String errMsg = String.format("resp miss config Response.HadoopPluginJar.JarPath, resp: %s", respStr);
log.error(errMsg);
throw new IOException(errMsg);
} else {
this.jarHost = new URL(jarInfoJson.get("JarPath").getAsString()).getAuthority();
Expand All @@ -92,14 +85,12 @@ private void parseJarPluginInfoResp(String respStr, String cosEndPointSuffix) th
int dotIndex = jarPath.indexOf('.');
if (dotIndex == -1) {
String errMsg = String.format("invalid jar path : %s", jarPath);
log.error(errMsg);
throw new IOException(errMsg);
}

int slashIndex = jarPath.indexOf('/', dotIndex);
if (slashIndex == -1) {
String errMsg = String.format("invalid jar path : %s", jarPath);
log.error(errMsg);
throw new IOException(errMsg);
}
this.jarPath = jarPath.substring(0, dotIndex + 1) + cosEndPointSuffix + jarPath.substring(slashIndex);
Expand All @@ -110,21 +101,20 @@ private void parseJarPluginInfoResp(String respStr, String cosEndPointSuffix) th

if (!jarInfoJson.has("JarMd5")) {
String errMsg = String.format("resp miss config Response.HadoopPluginJar.JarMd5, resp: %s", respStr);
log.error(errMsg);
throw new IOException(errMsg);
} else {
this.jarMd5 = jarInfoJson.get("JarMd5").getAsString();
}
}

private void queryJarPluginInfo(String mountPointAddr, long appid, int jarPluginServerPort,
boolean jarPluginServerHttpsFlag, String cosEndPointSuffix) throws IOException {
private void doQueryJarPluginInfo(String mountPointAddr, long appid, int jarPluginServerPort,
boolean jarPluginServerHttpsFlag, String cosEndPointSuffix) throws IOException {
String hadoopVersion = VersionInfo.getVersion();
if (hadoopVersion == null) {
hadoopVersion = "unknown";
}

URL queryJarUrl = null;
URL queryJarUrl;
String queryJarUrlStr = "";
try {
queryJarUrlStr = String.format("%s://%s:%d/chdfs-hadoop-plugin?appid=%d&hadoop_version=%s",
Expand All @@ -133,7 +123,6 @@ private void queryJarPluginInfo(String mountPointAddr, long appid, int jarPlugin
queryJarUrl = new URL(queryJarUrlStr);
} catch (MalformedURLException | UnsupportedEncodingException e) {
String errMsg = String.format("invalid url %s", queryJarUrlStr);
log.error(errMsg, e);
throw new IOException(errMsg, e);
}

Expand All @@ -158,7 +147,7 @@ private void queryJarPluginInfo(String mountPointAddr, long appid, int jarPlugin
parseJarPluginInfoResp(respStr, cosEndPointSuffix);
} catch (IOException e) {
String errMsg = "queryJarPluginInfo occur an io exception";
log.error(errMsg, e);
log.warn(errMsg, e);
throw new IOException(errMsg, e);
} finally {
if (bis != null) {
Expand All @@ -174,16 +163,39 @@ private void queryJarPluginInfo(String mountPointAddr, long appid, int jarPlugin
log.debug("query jarPluginInfo, usedTimeMs: {}", (System.nanoTime() - startTimeNs) * 1.0 / 1000000);
}

private static synchronized FileSystemWithLockCleaner getAlreadyLoadedClassInfo(ClassLoader currentClassLoader,
String jarPath, String versionId,
String jarMd5, String tmpDirPath,
String jarHost,
boolean distinguishHost, String networkVersionId) throws IOException {
/**
 * Retry wrapper around doQueryJarPluginInfo: up to (maxRetry + 1) attempts with a
 * one-second pause between attempts; rethrows the last IOException if all fail.
 *
 * @throws IOException the last failure when every attempt fails
 */
private void queryJarPluginInfo(String mountPointAddr, long appid, int jarPluginServerPort,
        boolean jarPluginServerHttpsFlag, String cosEndPointSuffix) throws IOException {

    final int maxRetry = 5;
    IOException finalException = null;
    for (int retryIndex = 0; retryIndex <= maxRetry; retryIndex++) {
        try {
            doQueryJarPluginInfo(mountPointAddr, appid, jarPluginServerPort, jarPluginServerHttpsFlag,
                    cosEndPointSuffix);
            return;
        } catch (IOException e) {
            // Parameterized logging; the throwable goes last so the stack trace is kept.
            log.warn("query jar plugin info failed, retryIndex: [{}/{}]", retryIndex, maxRetry, e);
            finalException = e;
        }

        // Only sleep between attempts, not after the final failed one.
        if (retryIndex < maxRetry) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop retrying so callers can observe it.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    log.error("query jar plugin info failed after retry", finalException);
    throw finalException;
}

private static synchronized FileSystem getAlreadyLoadedClassInfo(ClassLoader currentClassLoader, String jarPath,
String versionId, String jarMd5, String tmpDirPath, String jarHost, boolean distinguishHost,
String networkVersionId) throws IOException {
if (alreadyLoadedFileSystemInfo != null && alreadyLoadedFileSystemInfo.jarPath.equals(jarPath)
&& alreadyLoadedFileSystemInfo.versionId.equals(versionId) && alreadyLoadedFileSystemInfo.jarMd5.equals(
jarMd5)) {
try {
return (FileSystemWithLockCleaner) alreadyLoadedFileSystemInfo.chdfsFSClass.newInstance();
return (FileSystem) alreadyLoadedFileSystemInfo.chdfsFSClass.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
String errMsg = String.format("load chdfs class failed, className: %s",
alreadyLoadedFileSystemInfo.chdfsFSClass.getName());
Expand All @@ -192,8 +204,9 @@ private static synchronized FileSystemWithLockCleaner getAlreadyLoadedClassInfo(
}
}

File jarFile = downloadJarPath(jarPath, versionId, jarMd5, tmpDirPath, jarHost, distinguishHost, networkVersionId);
URL jarUrl = null;
File jarFile = downloadJarPath(jarPath, versionId, jarMd5, tmpDirPath, jarHost, distinguishHost,
networkVersionId);
URL jarUrl;
try {
jarUrl = jarFile.toURI().toURL();
} catch (MalformedURLException e) {
Expand All @@ -204,8 +217,8 @@ private static synchronized FileSystemWithLockCleaner getAlreadyLoadedClassInfo(
URLClassLoader chdfsJarClassLoader = new URLClassLoader(new URL[]{jarUrl}, currentClassLoader);
final String className = String.format("chdfs.%s.com.qcloud.chdfs.fs.CHDFSHadoopFileSystem", versionId);
try {
Class chdfsFSClass = chdfsJarClassLoader.loadClass(className);
FileSystemWithLockCleaner actualFileSystem = (FileSystemWithLockCleaner) chdfsFSClass.newInstance();
Class<?> chdfsFSClass = chdfsJarClassLoader.loadClass(className);
FileSystem actualFileSystem = (FileSystem) chdfsFSClass.newInstance();
alreadyLoadedFileSystemInfo = new AlreadyLoadedFileSystemInfo(versionId, jarPath, jarMd5, chdfsFSClass);
return actualFileSystem;
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
Expand All @@ -216,8 +229,7 @@ private static synchronized FileSystemWithLockCleaner getAlreadyLoadedClassInfo(
}

private static File downloadJarPath(String jarPath, String versionId, String jarMd5, String tmpDirPath,
String jarHost, boolean distinguishHost, String networkVersionId)
throws IOException {
String jarHost, boolean distinguishHost, String networkVersionId) throws IOException {
File localCacheJarFile = new File(String.format("%s/chdfs_hadoop_plugin-%s-shaded.jar", tmpDirPath, versionId));
File localCacheJarLockFile = new File(
String.format("%s/chdfs_hadoop_plugin-%s-shaded.jar.LOCK", tmpDirPath, versionId));
Expand All @@ -228,8 +240,7 @@ private static File downloadJarPath(String jarPath, String versionId, String jar
}
}

FileOutputStream fileLockOutPut = null;
FileLock fileLock = null;
FileOutputStream fileLockOutPut;
try {
fileLockOutPut = new FileOutputStream(localCacheJarLockFile);
} catch (IOException e) {
Expand All @@ -239,6 +250,7 @@ private static File downloadJarPath(String jarPath, String versionId, String jar
throw new IOException(errMsg, e);
}

FileLock fileLock;
while (true) {
try {
fileLock = fileLockOutPut.getChannel().lock();
Expand Down Expand Up @@ -386,7 +398,7 @@ private static String getFileHexMd5(File inFile) throws IOException {
}
}

FileSystemWithLockCleaner getActualFileSystem() {
FileSystem getActualFileSystem() {
return actualFileSystem;
}
}
10 changes: 10 additions & 0 deletions src/main/java/com/qcloud/chdfs/fs/RangerPermissionChecker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.qcloud.chdfs.fs;

import com.qcloud.chdfs.permission.RangerAccessType;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

/**
 * Capability interface for file systems that can authorize an operation through
 * a Ranger-style permission check.
 */
public interface RangerPermissionChecker {
/**
 * Checks whether the requested access type is permitted on the given path.
 *
 * @param f                the path being accessed
 * @param rangerAccessType the kind of access requested (LIST/WRITE/READ/DELETE)
 * @throws IOException if the check cannot be performed; implementations presumably
 *                     also use it to signal denial — TODO confirm against the plugin jar
 */
void checkPermission(Path f, RangerAccessType rangerAccessType) throws IOException;
}
11 changes: 11 additions & 0 deletions src/main/java/com/qcloud/chdfs/permission/RangerAccessType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.qcloud.chdfs.permission;

/**
 * Access types passed to {@link com.qcloud.chdfs.fs.RangerPermissionChecker}
 * when authorizing a file system operation.
 */
public enum RangerAccessType {
    LIST,
    WRITE,
    READ,
    DELETE
    // Note: the explicit empty constructor was removed — enum constructors are
    // implicitly private, so declaring an empty one is redundant.
}

0 comments on commit 42ac793

Please sign in to comment.