-
Notifications
You must be signed in to change notification settings - Fork 930
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
6c29f61
commit d0619c1
Showing
3 changed files
with
139 additions
and
85 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
package io.juicefs.utils; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.*; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
public class BgTaskUtil { | ||
private static final Logger LOG = LoggerFactory.getLogger(BgTaskUtil.class); | ||
private static final ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(2, r -> { | ||
Thread thread = new Thread(r, "Background Task"); | ||
thread.setDaemon(true); | ||
return thread; | ||
}); | ||
// use timer to run trash emptier because it will occupy a thread | ||
private static final List<Timer> timers = new ArrayList<>(); | ||
private static Set<BgTaskKey> runningBgTask = new HashSet<>(); | ||
|
||
static class BgTaskKey { | ||
String scheme; | ||
String authority; | ||
String type; | ||
|
||
public BgTaskKey(String scheme, String authority, String type) { | ||
this.scheme = scheme; | ||
this.authority = authority; | ||
this.type = type; | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return (scheme + authority + type).hashCode(); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object obj) { | ||
if (obj == this) { | ||
return true; | ||
} | ||
if (obj instanceof BgTaskKey) { | ||
BgTaskKey that = (BgTaskKey) obj; | ||
return Objects.equals(this.scheme, that.scheme) | ||
&& Objects.equals(this.authority, that.authority) | ||
&& Objects.equals(this.type, that.type); | ||
} | ||
return false; | ||
} | ||
} | ||
|
||
public static void startScheduleTask(String scheme, String authority, String type, Runnable task, long initialDelay, long period, TimeUnit unit) { | ||
synchronized (runningBgTask) { | ||
if (isRunning(scheme, authority, type)) { | ||
return; | ||
} | ||
threadPool.scheduleAtFixedRate(() -> { | ||
try { | ||
task.run(); | ||
} catch (Exception e) { | ||
LOG.error("Background task failed", e); | ||
} | ||
}, initialDelay, period, unit); | ||
runningBgTask.add(new BgTaskKey(scheme, authority, type)); | ||
} | ||
} | ||
|
||
public static void startTrashEmptier(String scheme, String authority, String type, Runnable emptierTask, long delay) { | ||
synchronized (runningBgTask) { | ||
if (isRunning(scheme, authority, type)) { | ||
return; | ||
} | ||
Timer timer = new Timer(); | ||
timer.schedule(new TimerTask() { | ||
@Override | ||
public void run() { | ||
emptierTask.run(); | ||
} | ||
}, delay); | ||
runningBgTask.add(new BgTaskKey(scheme, authority, type)); | ||
timers.add(timer); | ||
} | ||
} | ||
|
||
public static boolean isRunning(String scheme, String authority, String type) { | ||
synchronized (runningBgTask) { | ||
return runningBgTask.contains(new BgTaskKey(scheme, authority, type)); | ||
} | ||
} | ||
|
||
public static void close() { | ||
threadPool.shutdownNow(); | ||
for (Timer timer : timers) { | ||
timer.cancel(); | ||
timer.purge(); | ||
} | ||
} | ||
} |