diff --git a/sdk/java/src/main/java/io/juicefs/JuiceFileSystem.java b/sdk/java/src/main/java/io/juicefs/JuiceFileSystem.java
--- a/sdk/java/src/main/java/io/juicefs/JuiceFileSystem.java
+++ b/sdk/java/src/main/java/io/juicefs/JuiceFileSystem.java
@@ -15,6 +15,7 @@
  */
 package io.juicefs;
 
+import io.juicefs.utils.BgTaskUtil;
 import io.juicefs.utils.PatchUtil;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -29,9 +30,6 @@
 import java.io.IOException;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 /****************************************************************
@@ -45,8 +43,6 @@ public class JuiceFileSystem extends FilterFileSystem {
   private static boolean fileChecksumEnabled = false;
   private static boolean distcpPatched = false;
 
-  private volatile static ScheduledExecutorService emptier;
-  private ScheduledFuture emptierTask;
   private FileSystem emptierFs;
 
   static {
@@ -76,18 +72,9 @@ public void initialize(URI uri, Configuration conf) throws IOException {
   }
 
   private void startTrashEmptier(URI uri, final Configuration conf) throws IOException {
-    if (emptier == null) {
-      synchronized (JuiceFileSystem.class) {
-        if (emptier == null) {
-          emptier = Executors.newScheduledThreadPool(3, r -> {
-            Thread t = new Thread(r, "Trash Emptier");
-            t.setDaemon(true);
-            return t;
-          });
-        }
-      }
+    if (BgTaskUtil.isRunning(uri.getScheme(), uri.getAuthority(), "Trash emptier")) {
+      return;
     }
-
     try {
       UserGroupInformation superUser = UserGroupInformation.createRemoteUser(getConf(conf, "superuser", "hdfs"));
       emptierFs = superUser.doAs((PrivilegedExceptionAction) () -> {
@@ -95,7 +82,7 @@ private void startTrashEmptier(URI uri, final Configuration conf) throws IOExcep
         fs.initialize(uri, conf);
         return fs;
       });
-      emptierTask = emptier.schedule(new Trash(emptierFs, conf).getEmptier(), 10, TimeUnit.MINUTES);
+      BgTaskUtil.startTrashEmptier(uri.getScheme(), uri.getAuthority(), "Trash emptier", new Trash(emptierFs, conf).getEmptier(), TimeUnit.MINUTES.toMillis(10));
     } catch (Exception e) {
       throw new IOException("start trash failed!",e);
     }
@@ -160,24 +147,3 @@ public FileChecksum getFileChecksum(Path f) throws IOException {
     return super.getFileChecksum(f);
   }
-
-  @Override
-  public void close() throws IOException {
-    if (emptierTask != null) {
-      emptierTask.cancel(true);
-    }
-    super.close();
-  }
-
-  @Override
-  protected void finalize() {
-    if (emptier != null) {
-      try {
-        emptierFs.close();
-      } catch (IOException e) {
-        LOG.warn("Close emptierFs failed!", e);
-      }
-      emptier.shutdownNow();
-      emptier = null;
-    }
-  }
 }
diff --git a/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java b/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java
--- a/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java
+++ b/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java
@@ -18,6 +18,7 @@
 import com.kenai.jffi.internal.StubLoader;
 import io.juicefs.exception.QuotaExceededException;
 import io.juicefs.metrics.JuiceFSInstrumentation;
+import io.juicefs.utils.BgTaskUtil;
 import io.juicefs.utils.ConsistentHash;
 import io.juicefs.utils.NodesFetcher;
 import io.juicefs.utils.NodesFetcherBuilder;
@@ -61,10 +62,7 @@
 import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.util.*;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.jar.JarFile;
 import java.util.stream.Collectors;
 import java.util.zip.GZIPInputStream;
@@ -95,11 +93,6 @@ public class JuiceFileSystemImpl extends FileSystem {
   private FsPermission uMask;
   private String hflushMethod;
 
-  private static volatile ScheduledExecutorService nodesFetcherThread;
-  private static volatile ScheduledExecutorService refreshUidThread;
-  private ScheduledFuture nodeFetcherTask;
-  private ScheduledFuture refreshGuidTask;
-
   private Map lastFileStatus = new HashMap<>();
   private static final DirectBufferPool directBufferPool = new DirectBufferPool();
 
@@ -513,18 +506,7 @@ private void updateUidAndGrouping(String uidFile, String groupFile) {
   }
 
   private void refreshUidAndGrouping(String uidFile, String groupFile) {
-    if (refreshUidThread == null) {
-      synchronized (JuiceFileSystemImpl.class) {
-        if (refreshUidThread == null) {
-          refreshUidThread = Executors.newScheduledThreadPool(3, r -> {
-            Thread thread = new Thread(r, "Uid and group refresher");
-            thread.setDaemon(true);
-            return thread;
-          });
-        }
-      }
-    }
-    refreshGuidTask = refreshUidThread.scheduleAtFixedRate(() -> {
+    BgTaskUtil.startScheduleTask(uri.getScheme(), uri.getAuthority(), "Refresh guid", () -> {
       updateUidAndGrouping(uidFile, groupFile);
     }, 1, 1, TimeUnit.MINUTES);
   }
@@ -723,18 +705,7 @@ private void initCache(Configuration conf) {
   }
 
   private void refreshCache(Configuration conf) {
-    if (nodesFetcherThread == null) {
-      synchronized (JuiceFileSystemImpl.class) {
-        if (nodesFetcherThread == null) {
-          nodesFetcherThread = Executors.newScheduledThreadPool(3, r -> {
-            Thread thread = new Thread(r, "Node fetcher");
-            thread.setDaemon(true);
-            return thread;
-          });
-        }
-      }
-    }
-    nodeFetcherTask = nodesFetcherThread.scheduleAtFixedRate(() -> {
+    BgTaskUtil.startScheduleTask(uri.getScheme(), uri.getAuthority(), "Node fetcher", () -> {
       initCache(conf);
     }, 10, 10, TimeUnit.MINUTES);
   }
@@ -1629,12 +1600,6 @@ public void setTimes(Path p, long mtime, long atime) throws IOException {
   @Override
   public void close() throws IOException {
     super.close();
-    if (refreshGuidTask != null) {
-      refreshGuidTask.cancel(true);
-    }
-    if (nodeFetcherTask != null) {
-      nodeFetcherTask.cancel(true);
-    }
     lib.jfs_term(Thread.currentThread().getId(), handle);
     if (metricsEnable) {
       JuiceFSInstrumentation.close();
@@ -1724,16 +1689,4 @@ public void removeXAttr(Path path, String name) throws IOException {
     if (r < 0)
       throw error(r, path);
   }
-
-  @Override
-  protected void finalize() {
-    if (nodesFetcherThread != null) {
-      nodesFetcherThread.shutdownNow();
-      nodesFetcherThread = null;
-    }
-    if (refreshUidThread != null) {
-      refreshUidThread.shutdownNow();
-      refreshUidThread = null;
-    }
-  }
 }
diff --git a/sdk/java/src/main/java/io/juicefs/utils/BgTaskUtil.java b/sdk/java/src/main/java/io/juicefs/utils/BgTaskUtil.java
new file mode 100644
--- /dev/null
+++ b/sdk/java/src/main/java/io/juicefs/utils/BgTaskUtil.java
@@ -0,0 +1,137 @@
+package io.juicefs.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Process-wide registry of background tasks. A task is identified by
+ * (scheme, authority, type) and is started at most once per JVM, however many
+ * file system instances ask for it. All mutable state is guarded by the
+ * {@code runningBgTask} monitor.
+ */
+public class BgTaskUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(BgTaskUtil.class);
+  private static final ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(2, r -> {
+    Thread thread = new Thread(r, "Background Task");
+    thread.setDaemon(true);
+    return thread;
+  });
+  // use timer to run trash emptier because it will occupy a thread
+  private static final List<Timer> timers = new ArrayList<>();
+  private static final Set<BgTaskKey> runningBgTask = new HashSet<>();
+
+  private BgTaskUtil() {
+  }
+
+  static class BgTaskKey {
+    private final String scheme;
+    private final String authority;
+    private final String type;
+
+    public BgTaskKey(String scheme, String authority, String type) {
+      this.scheme = scheme;
+      this.authority = authority;
+      this.type = type;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(scheme, authority, type);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+      if (obj instanceof BgTaskKey) {
+        BgTaskKey that = (BgTaskKey) obj;
+        return Objects.equals(this.scheme, that.scheme)
+            && Objects.equals(this.authority, that.authority)
+            && Objects.equals(this.type, that.type);
+      }
+      return false;
+    }
+  }
+
+  /**
+   * Schedules {@code task} at a fixed rate unless a task with the same key is
+   * already registered or {@link #close()} has been called.
+   */
+  public static void startScheduleTask(String scheme, String authority, String type, Runnable task, long initialDelay, long period, TimeUnit unit) {
+    BgTaskKey key = new BgTaskKey(scheme, authority, type);
+    synchronized (runningBgTask) {
+      if (threadPool.isShutdown() || runningBgTask.contains(key)) {
+        return;
+      }
+      threadPool.scheduleAtFixedRate(() -> {
+        try {
+          task.run();
+        } catch (Exception e) {
+          // swallow so that one failed run does not cancel the periodic schedule
+          LOG.error("Background task failed", e);
+        }
+      }, initialDelay, period, unit);
+      runningBgTask.add(key);
+    }
+  }
+
+  /**
+   * Runs {@code emptierTask} once, {@code delay} milliseconds from now, on its
+   * own daemon timer thread, unless an emptier with the same key is already
+   * registered or {@link #close()} has been called.
+   */
+  public static void startTrashEmptier(String scheme, String authority, String type, Runnable emptierTask, long delay) {
+    BgTaskKey key = new BgTaskKey(scheme, authority, type);
+    synchronized (runningBgTask) {
+      if (threadPool.isShutdown() || runningBgTask.contains(key)) {
+        return;
+      }
+      // daemon timer: a plain Timer thread would keep the JVM from exiting
+      Timer timer = new Timer("Trash Emptier", true);
+      timer.schedule(new TimerTask() {
+        @Override
+        public void run() {
+          try {
+            emptierTask.run();
+          } catch (Exception e) {
+            LOG.error("Trash emptier failed", e);
+          }
+        }
+      }, delay);
+      runningBgTask.add(key);
+      timers.add(timer);
+    }
+  }
+
+  public static boolean isRunning(String scheme, String authority, String type) {
+    synchronized (runningBgTask) {
+      return runningBgTask.contains(new BgTaskKey(scheme, authority, type));
+    }
+  }
+
+  /**
+   * Stops every background task of this JVM. This is terminal: tasks are not
+   * restarted afterwards, so it must not be called from a per-instance hook.
+   */
+  public static void close() {
+    synchronized (runningBgTask) {
+      threadPool.shutdownNow();
+      for (Timer timer : timers) {
+        timer.cancel();
+      }
+      timers.clear();
+    }
+  }
+}