Skip to content

Commit

Permalink
[Wisp] WispControlGroup shutdown
Browse files Browse the repository at this point in the history
Summary: Support shutdown by wispControlGroup.

Test Plan: com/alibaba/wisp/thread/PreemptTest.java

Reviewed-by: leiyu, zhengxiaolinX

Issue: dragonwell-project/dragonwell8#205
  • Loading branch information
joeyleeeeeee97 authored and joeyleeeeeee97 committed Feb 7, 2021
1 parent acebb4e commit fd35688
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 72 deletions.
17 changes: 13 additions & 4 deletions src/linux/classes/com/alibaba/wisp/engine/WispCarrier.java
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,17 @@ final void schedule(boolean terminal) {
}
current.resumeEntry.setStealEnable(true);
yieldTo(threadTask, terminal); // letting the scheduler choose runnable task
if (engine.hasBeenShutdown && current != threadTask
&& !WispTask.SHUTDOWN_TASK_NAME.equals(current.getName())) {
CoroutineSupport.checkAndThrowException(current.ctx);
current.carrier.checkAndDispatchShutdown();
}

private void checkAndDispatchShutdown() {
assert WispCarrier.current() == this;
if ((engine.hasBeenShutdown
|| (current.inDestoryedGroup() && current.inheritedFromNonRootContainer()))
&& !WispTask.SHUTDOWN_TASK_NAME.equals(current.getName())
&& current.isAlive()
&& CoroutineSupport.checkAndThrowException(current.ctx)) {
throw new ThreadDeath();
}
}

Expand Down Expand Up @@ -309,7 +317,7 @@ public void run() {
source.wakeupTask(task);
return;
}
// notify detached empty worker to exit
// notify terminated empty worker to exit
if (source.worker.hasBeenHandoff && TASK_COUNT_UPDATER.get(source.engine) == 0) {
source.worker.signal();
}
Expand Down Expand Up @@ -440,6 +448,7 @@ void yield() {
// delay it, make sure wakeupTask is called after yield out
schedule(false);
}
current.carrier.checkAndDispatchShutdown();
} else {
WispEngine.JLA.yield0();
}
Expand Down
26 changes: 20 additions & 6 deletions src/linux/classes/com/alibaba/wisp/engine/WispControlGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/**
* WispControlGroup is used to limit a group of wisp threads'{@link WispTask}
Expand All @@ -36,6 +38,8 @@ class WispControlGroup extends AbstractExecutorService {
*/
private static final int ESTIMATED_PERIOD = Math.max(MIN_PERIOD,
Math.min(MAX_PERIOD, WispConfiguration.SYSMON_TICK_US * SCHEDULE_TIMES));
private static final AtomicReferenceFieldUpdater<WispControlGroup, Boolean> SHUTDOWN_UPDATER
= AtomicReferenceFieldUpdater.newUpdater(WispControlGroup.class, Boolean.class, "destroyed");

private static int defaultCfsPeriod() {
// prior to adopt configured cfs period.
Expand Down Expand Up @@ -80,6 +84,8 @@ private WispControlGroup(int cfsQuotaUs, int cfsPeriodUs) {
private CpuLimit cpuLimit;
private final AtomicLong currentPeriodStart;
private final AtomicLong remainQuota;
volatile Boolean destroyed = false;
CountDownLatch destroyLatch = new CountDownLatch(1);

private static class CpuLimit {
long cfsPeriod;
Expand Down Expand Up @@ -149,7 +155,13 @@ private void attach() {
task.controlGroup = this;
long delay = checkCpuLimit(task, true);
if (delay != 0) {
WispTask.jdkPark(delay);
try {
WispTask.jdkPark(delay);
} catch (ThreadDeath threadDeath) {
assert task.enterTs != 0;
detach();
throw threadDeath;
}
}
assert task.enterTs != 0;
}
Expand Down Expand Up @@ -186,27 +198,29 @@ public void execute(Runnable command) {

@Override
public void shutdown() {
throw new UnsupportedOperationException("NYI");
if (SHUTDOWN_UPDATER.compareAndSet(this, false, true)) {
WispEngine.WISP_ROOT_ENGINE.shutdown(this);
}
}

@Override
public List<Runnable> shutdownNow() {
return null;
throw new UnsupportedOperationException("not implemented");
}

@Override
public boolean isShutdown() {
return false;
return destroyed;
}

@Override
public boolean isTerminated() {
return false;
return destroyLatch.getCount() == 0;
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return false;
return destroyLatch.await(timeout, unit);
}

@Override
Expand Down
155 changes: 98 additions & 57 deletions src/linux/classes/com/alibaba/wisp/engine/WispEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,13 @@ private static void initializeClasses() {
Class.forName(WispThreadWrapper.class.getName());
Class.forName(TaskDispatcher.class.getName());
Class.forName(StartShutdown.class.getName());
Class.forName(NotifyAndWaitTasksForShutdown.class.getName());
Class.forName(Coroutine.StealResult.class.getName());
Class.forName(WispCounterMXBeanImpl.class.getName());
Class.forName(ThreadAsWisp.class.getName());
Class.forName(WispEventPump.class.getName());
Class.forName(ShutdownEngine.class.getName());
Class.forName(AbstractShutdownTask.class.getName());
Class.forName(ShutdownControlGroup.class.getName());
if (WispConfiguration.WISP_PROFILE) {
Class.forName(WispPerfCounterMonitor.class.getName());
}
Expand Down Expand Up @@ -389,7 +391,7 @@ private int translateToSelectionKey(int event) {
volatile int runningTaskCount = 0;
private CountDownLatch shutdownFuture;
volatile Boolean hasBeenShutdown = false;
volatile boolean detached;
volatile boolean terminated;

/**
* Create a new WispEngine for executing tasks.
Expand Down Expand Up @@ -501,75 +503,121 @@ public void shutdown() {
for (WispCarrier carrier : carrierEngines) {
deRegisterPerfCounter(carrier);
}
scheduler.execute(new StartShutdown());
scheduler.execute(new StartShutdown(null));
}

void shutdown(WispControlGroup group) {
scheduler.execute(new StartShutdown(group));
}

@Override
public List<Runnable> shutdownNow() {
throw new UnsupportedOperationException();
}
class StartShutdown implements StealAwareRunnable {
private final WispControlGroup wispControlGroup;

/**
* @param wispControlGroup which WispControlGroup to shutdown,
* null indicates the whole engine.
*/
StartShutdown(WispControlGroup wispControlGroup) {
this.wispControlGroup = wispControlGroup;
}

class StartShutdown extends StealDisabledRunnable {
@Override
public void run() {
WispCarrier.current().runTaskInternal(new NotifyAndWaitTasksForShutdown(),
WispCarrier.current().runTaskInternal(
wispControlGroup == null ? new ShutdownEngine() : new ShutdownControlGroup(wispControlGroup),
WispTask.SHUTDOWN_TASK_NAME, null, null);
}
}

class NotifyAndWaitTasksForShutdown implements Runnable {
abstract class AbstractShutdownTask implements StealAwareRunnable {
@Override
public void run() {
try {
// wait until current 'shutdown wispTask' is the only
// running wispTask on this carrier
while (runningTaskCount != 1) {
List<WispTask> runningTasks = getRunningTasks();
for (WispTask task : runningTasks) {
if (task.carrier.engine == WispEngine.this
&& task.isAlive()
&& !WispTask.SHUTDOWN_TASK_NAME.equals(task.getName())) {
task.interrupt();
}
List<WispTask> tasks;
do {
tasks = getTasksForShutdown();
for (WispTask task : tasks) {
if (task.isAlive()) {
task.jdkUnpark();
task.unpark();
}
WispCarrier.current().yield();
}
assert WispTask.SHUTDOWN_TASK_NAME.equals(WispCarrier.current().current.getName());
detached = true;
//notify all worker to exit
for (WispCarrier carrier : carrierEngines) {
carrier.worker.signal();
}
shutdownFuture.countDown();
} catch (Exception e) {
e.printStackTrace();
}
// wait tasks to exit on fixed frequency instead of polling
WispTask.jdkPark(TimeUnit.MILLISECONDS.toNanos(1));
} while (!tasks.isEmpty());
finishShutdown();
}

/**
* 1. In Wisp2, each WispCarrier's runningTask is modified when WispTask is stolen, we can't guarantee
* the accuracy of the task set.
* 2. this function is only called in shutdown, so it's not performance sensitive
* 3. this function should only be called by current WispTask
*/
private List<WispTask> getRunningTasks() {
abstract List<WispTask> getTasksForShutdown();

abstract void finishShutdown();
}

class ShutdownEngine extends AbstractShutdownTask {
@Override
List<WispTask> getTasksForShutdown() {
return getRunningTasks(null);
}

@Override
void finishShutdown() {
assert WispTask.SHUTDOWN_TASK_NAME.equals(WispCarrier.current().current.getName());
WispCarrier carrier = WispCarrier.current();
ArrayList<WispTask> runningTasks = new ArrayList<>();
boolean isInCritical0 = carrier.isInCritical;
carrier.isInCritical = true;
try {
for (WispTask task : WispTask.id2Task.values()) {
if (task.isAlive()
&& task.carrier.engine == WispEngine.this
&& !task.isThreadTask()) {
runningTasks.add(task);
}
terminated = true;
//notify all worker to exit
for (WispCarrier carrier : carrierEngines) {
carrier.worker.signal();
}
shutdownFuture.countDown();
}

}

class ShutdownControlGroup extends AbstractShutdownTask {
private final WispControlGroup wispControlGroup;

ShutdownControlGroup(WispControlGroup wispControlGroup) {
this.wispControlGroup = wispControlGroup;
}
@Override
List<WispTask> getTasksForShutdown() {
return getRunningTasks(wispControlGroup);
}

@Override
void finishShutdown() {
wispControlGroup.destroyLatch.countDown();
}
}


/**
* 1. In Wisp2, each WispCarrier's runningTask is modified when WispTask is stolen, we can't guarantee
* the accuracy of the task set.
* 2. this function is only called in shutdown, so it's not performance sensitive
* 3. this function should only be called by current WispTask
*/
private List<WispTask> getRunningTasks(WispControlGroup group) {
assert WispTask.SHUTDOWN_TASK_NAME.equals(WispCarrier.current().current.getName());
WispCarrier carrier = WispCarrier.current();
ArrayList<WispTask> runningTasks = new ArrayList<>();
boolean isInCritical0 = carrier.isInCritical;
carrier.isInCritical = true;
try {
for (WispTask task : WispTask.id2Task.values()) {
if (task.isAlive()
&& task.carrier.engine == WispEngine.this
&& !task.isThreadTask()
&& !task.getName().equals(WispTask.SHUTDOWN_TASK_NAME)
&& (group == null || task.inDestoryedGroup())) {
runningTasks.add(task);
}
return runningTasks;
} finally {
carrier.isInCritical = isInCritical0;
}
return runningTasks;
} finally {
carrier.isInCritical = isInCritical0;
}
}

Expand All @@ -580,7 +628,7 @@ public boolean isShutdown() {

@Override
public boolean isTerminated() {
return detached;
return terminated;
}

@Override
Expand Down Expand Up @@ -626,13 +674,6 @@ void startAsThread(Runnable target, String name, Thread thread) {
target, name, thread));
}

abstract static class StealDisabledRunnable implements StealAwareRunnable {
@Override
public final boolean isStealEnable() {
return false;
}
}

private static native void registerNatives();

private static native int getProxyUnpark(int[] res);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ private void runCarrier(final WispCarrier carrier) {
doExec(task);
}

if (carrier.engine.detached) {
if (carrier.engine.terminated) {
return;
} else if ((task = SCHEDULING_POLICY.steal(this, r = nextRandom(r))) != null) {
doExec(task);
Expand Down
8 changes: 8 additions & 0 deletions src/linux/classes/com/alibaba/wisp/engine/WispTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,14 @@ boolean isInterrupted() {
return interrupted != 0;
}

boolean inDestoryedGroup() {
return controlGroup != null && controlGroup.destroyed;
}

boolean inheritedFromNonRootContainer() {
return WispEngine.JLA.getInheritedResourceContainer(threadWrapper) != ResourceContainer.root();
}

boolean testInterruptedAndClear(boolean clear) {
boolean nativeInterrupt = false;
if (alreadyCheckNativeInterrupt == 0 && // only do it once
Expand Down
7 changes: 3 additions & 4 deletions src/share/classes/java/dyn/CoroutineSupport.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ Thread getThread() {
return thread;
}

public static void checkAndThrowException(Coroutine coroutine) {
checkAndThrowException0(coroutine.nativeCoroutine);
public static boolean checkAndThrowException(Coroutine coroutine) {
return shouldThrowException0(coroutine.nativeCoroutine);
}

public void drain() {
Expand Down Expand Up @@ -384,5 +384,4 @@ public CoroutineBase getCurrent() {
*/
public static native StackTraceElement[] getCoroutineStack(long coroPtr);

private static native void checkAndThrowException0(long coroPtr);
}
private static native boolean shouldThrowException0(long coroPtr);}
Loading

0 comments on commit fd35688

Please sign in to comment.