From b6fd3b5f7d656da559d48a3ad0f230f747a5eaba Mon Sep 17 00:00:00 2001 From: tangyoupeng Date: Sun, 18 Feb 2024 10:40:45 +0800 Subject: [PATCH] Hadoop: make libjfs singleton to avoid possible jnr weakhashmap infinity loop --- .../java/io/juicefs/JuiceFileSystemImpl.java | 111 +++++++++--------- .../java/io/juicefs/JuiceFileSystemTest.java | 23 ++++ 2 files changed, 80 insertions(+), 54 deletions(-) diff --git a/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java b/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java index 3e74ea2a0078..b4d65a7b489e 100644 --- a/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java +++ b/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java @@ -57,12 +57,11 @@ import java.net.*; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; import java.util.*; -import java.util.concurrent.*; +import java.util.concurrent.TimeUnit; import java.util.jar.JarFile; import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; @@ -84,7 +83,8 @@ public class JuiceFileSystemImpl extends FileSystem { private int minBufferSize; private int cacheReplica; private boolean fileChecksumEnabled; - private Libjfs lib; + private Libjfs lib = loadLibrary(); + private long handle; private UserGroupInformation ugi; private String homeDirPrefix = "/user"; @@ -347,7 +347,6 @@ public void initialize(URI uri, Configuration conf) throws IOException { String supergroup = getConf(conf, "supergroup", conf.get("dfs.permissions.superusergroup", "supergroup")); String mountpoint = getConf(conf, "mountpoint", ""); - lib = loadLibrary(); synchronized (JuiceFileSystemImpl.class) { if (callBack == null) { callBack = new LogCallBackImpl(lib); @@ -570,7 +569,7 @@ private static void initStubLoader() { } } - public static Libjfs loadLibrary() throws IOException { + public static Libjfs loadLibrary() { initStubLoader(); LibraryLoader libjfsLibraryLoader = LibraryLoader.create(Libjfs.class); @@ -609,69 +608,73 @@ public static Libjfs loadLibrary() throws IOException { } URLConnection con; try { - con = location.openConnection(); - } catch (FileNotFoundException e) { - // jar may changed - return loadExistLib(libjfsLibraryLoader, dir, name, libFile); - } - if (location.getProtocol().equals("jar") && (con instanceof JarURLConnection)) { - LOG.debug("juicefs-hadoop.jar is a nested jar"); - JarURLConnection connection = (JarURLConnection) con; - JarFile jfsJar = connection.getJarFile(); - ZipEntry entry = jfsJar.getJarEntry(resource); - soTime = entry.getLastModifiedTime().toMillis(); - ins = jfsJar.getInputStream(entry); - } else { - URI locationUri; try { - locationUri = location.toURI(); - } catch (URISyntaxException e) { + con = location.openConnection(); + } catch (FileNotFoundException e) { + // jar may changed return loadExistLib(libjfsLibraryLoader, dir, name, libFile); } - if (Files.isDirectory(Paths.get(locationUri))) { // for debug: sdk/java/target/classes - soTime = con.getLastModified(); - ins = JuiceFileSystemImpl.class.getClassLoader().getResourceAsStream(resource); + if (location.getProtocol().equals("jar") && (con instanceof JarURLConnection)) { + LOG.debug("juicefs-hadoop.jar is a nested jar"); + JarURLConnection connection = (JarURLConnection) con; + JarFile jfsJar = connection.getJarFile(); + ZipEntry entry = jfsJar.getJarEntry(resource); + soTime = entry.getLastModifiedTime().toMillis(); + ins = jfsJar.getInputStream(entry); } else { - JarFile jfsJar; + URI locationUri; try { - jfsJar = new JarFile(locationUri.getPath()); - } catch (FileNotFoundException fne) { + locationUri = location.toURI(); + } catch (URISyntaxException e) { return loadExistLib(libjfsLibraryLoader, dir, name, libFile); } - ZipEntry entry = jfsJar.getJarEntry(resource); - soTime = entry.getLastModifiedTime().toMillis(); - ins = jfsJar.getInputStream(entry); + if (Files.isDirectory(Paths.get(locationUri))) { // for debug: sdk/java/target/classes + soTime = con.getLastModified(); + ins = JuiceFileSystemImpl.class.getClassLoader().getResourceAsStream(resource); + } else { + JarFile jfsJar; + try { + jfsJar = new JarFile(locationUri.getPath()); + } catch (FileNotFoundException fne) { + return loadExistLib(libjfsLibraryLoader, dir, name, libFile); + } + ZipEntry entry = jfsJar.getJarEntry(resource); + soTime = entry.getLastModifiedTime().toMillis(); + ins = jfsJar.getInputStream(entry); + } } - } - synchronized (JuiceFileSystemImpl.class) { - if (!libFile.exists() || libFile.lastModified() < soTime) { - // try the name for current user - libFile = new File(dir, System.getProperty("user.name") + "-" + name); + synchronized (JuiceFileSystemImpl.class) { if (!libFile.exists() || libFile.lastModified() < soTime) { - InputStream reader = new GZIPInputStream(ins); - File tmp = File.createTempFile(name, null, dir); - FileOutputStream writer = new FileOutputStream(tmp); - byte[] buffer = new byte[128 << 10]; - int bytesRead = 0; - while ((bytesRead = reader.read(buffer)) != -1) { - writer.write(buffer, 0, bytesRead); - } - writer.close(); - reader.close(); - tmp.setLastModified(soTime); - tmp.setReadable(true, false); - try { - File org = new File(dir, name); - Files.move(tmp.toPath(), org.toPath(), StandardCopyOption.ATOMIC_MOVE); - libFile = org; - } catch (Exception ade) { - Files.move(tmp.toPath(), libFile.toPath(), StandardCopyOption.ATOMIC_MOVE); + // try the name for current user + libFile = new File(dir, System.getProperty("user.name") + "-" + name); + if (!libFile.exists() || libFile.lastModified() < soTime) { + InputStream reader = new GZIPInputStream(ins); + File tmp = File.createTempFile(name, null, dir); + FileOutputStream writer = new FileOutputStream(tmp); + byte[] buffer = new byte[128 << 10]; + int bytesRead = 0; + while ((bytesRead = reader.read(buffer)) != -1) { + writer.write(buffer, 0, bytesRead); + } + writer.close(); + reader.close(); + tmp.setLastModified(soTime); + tmp.setReadable(true, false); + try { + File org = new File(dir, name); + Files.move(tmp.toPath(), org.toPath(), StandardCopyOption.ATOMIC_MOVE); + libFile = org; + } catch (Exception ade) { + Files.move(tmp.toPath(), libFile.toPath(), StandardCopyOption.ATOMIC_MOVE); + } } } } + ins.close(); + } catch (Exception e) { + throw new RuntimeException("Init libjfs failed", e); } - ins.close(); return libjfsLibraryLoader.load(libFile.getAbsolutePath()); } diff --git a/sdk/java/src/test/java/io/juicefs/JuiceFileSystemTest.java b/sdk/java/src/test/java/io/juicefs/JuiceFileSystemTest.java index 8a059661695d..102a817805a5 100644 --- a/sdk/java/src/test/java/io/juicefs/JuiceFileSystemTest.java +++ b/sdk/java/src/test/java/io/juicefs/JuiceFileSystemTest.java @@ -35,6 +35,9 @@ import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -921,4 +924,24 @@ public void testUserWithMultiGroups() throws Exception { tomFs.close(); superFs.close(); } + + public void testConcurrentCreate() throws Exception { + int threads = 100; + ExecutorService pool = Executors.newFixedThreadPool(threads); + for (int i = 0; i < threads; i++) { + pool.submit(() -> { + JuiceFileSystem jfs = new JuiceFileSystem(); + try { + jfs.initialize(URI.create("jfs://dev/"), cfg); + jfs.listStatus(new Path("/")); + jfs.close(); + } catch (IOException e) { + fail("concurrent create failed"); + System.exit(1); + } + }); + } + pool.shutdown(); + pool.awaitTermination(1, TimeUnit.MINUTES); + } }