From fd35688687cabdc2189740cc6f74615fc6d01368 Mon Sep 17 00:00:00 2001 From: joeyleeeeeee97 Date: Tue, 19 Jan 2021 16:58:32 +0800 Subject: [PATCH] [Wisp] WispControlGroup shutdown Summary: Support shutdown by wispControlGroup. Test Plan: com/alibaba/wisp/thread/PreemptTest.java Reviewed-by: leiyu, zhengxiaolinX Issue: https://github.com/alibaba/dragonwell8/issues/205 --- .../com/alibaba/wisp/engine/WispCarrier.java | 17 +- .../alibaba/wisp/engine/WispControlGroup.java | 26 ++- .../com/alibaba/wisp/engine/WispEngine.java | 155 +++++++++++------- .../alibaba/wisp/engine/WispScheduler.java | 2 +- .../com/alibaba/wisp/engine/WispTask.java | 8 + .../classes/java/dyn/CoroutineSupport.java | 7 +- test/com/alibaba/wisp/thread/PreemptTest.java | 17 ++ 7 files changed, 160 insertions(+), 72 deletions(-) diff --git a/src/linux/classes/com/alibaba/wisp/engine/WispCarrier.java b/src/linux/classes/com/alibaba/wisp/engine/WispCarrier.java index 466da450dd..d2e7228b20 100644 --- a/src/linux/classes/com/alibaba/wisp/engine/WispCarrier.java +++ b/src/linux/classes/com/alibaba/wisp/engine/WispCarrier.java @@ -261,9 +261,17 @@ final void schedule(boolean terminal) { } current.resumeEntry.setStealEnable(true); yieldTo(threadTask, terminal); // letting the scheduler choose runnable task - if (engine.hasBeenShutdown && current != threadTask - && !WispTask.SHUTDOWN_TASK_NAME.equals(current.getName())) { - CoroutineSupport.checkAndThrowException(current.ctx); + current.carrier.checkAndDispatchShutdown(); + } + + private void checkAndDispatchShutdown() { + assert WispCarrier.current() == this; + if ((engine.hasBeenShutdown + || (current.inDestoryedGroup() && current.inheritedFromNonRootContainer())) + && !WispTask.SHUTDOWN_TASK_NAME.equals(current.getName()) + && current.isAlive() + && CoroutineSupport.checkAndThrowException(current.ctx)) { + throw new ThreadDeath(); } } @@ -309,7 +317,7 @@ public void run() { source.wakeupTask(task); return; } - // notify detached empty worker to exit + // notify terminated empty worker to exit if (source.worker.hasBeenHandoff && TASK_COUNT_UPDATER.get(source.engine) == 0) { source.worker.signal(); } @@ -440,6 +448,7 @@ void yield() { // delay it, make sure wakeupTask is called after yield out schedule(false); } + current.carrier.checkAndDispatchShutdown(); } else { WispEngine.JLA.yield0(); } diff --git a/src/linux/classes/com/alibaba/wisp/engine/WispControlGroup.java b/src/linux/classes/com/alibaba/wisp/engine/WispControlGroup.java index a844ea4b57..127239746f 100644 --- a/src/linux/classes/com/alibaba/wisp/engine/WispControlGroup.java +++ b/src/linux/classes/com/alibaba/wisp/engine/WispControlGroup.java @@ -8,8 +8,10 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; /** * WispControlGroup is used to limit a group of wisp threads'{@link WispTask} @@ -36,6 +38,8 @@ class WispControlGroup extends AbstractExecutorService { */ private static final int ESTIMATED_PERIOD = Math.max(MIN_PERIOD, Math.min(MAX_PERIOD, WispConfiguration.SYSMON_TICK_US * SCHEDULE_TIMES)); + private static final AtomicReferenceFieldUpdater SHUTDOWN_UPDATER + = AtomicReferenceFieldUpdater.newUpdater(WispControlGroup.class, Boolean.class, "destroyed"); private static int defaultCfsPeriod() { // prior to adopt configured cfs period. @@ -80,6 +84,8 @@ private WispControlGroup(int cfsQuotaUs, int cfsPeriodUs) { private CpuLimit cpuLimit; private final AtomicLong currentPeriodStart; private final AtomicLong remainQuota; + volatile Boolean destroyed = false; + CountDownLatch destroyLatch = new CountDownLatch(1); private static class CpuLimit { long cfsPeriod; @@ -149,7 +155,13 @@ private void attach() { task.controlGroup = this; long delay = checkCpuLimit(task, true); if (delay != 0) { - WispTask.jdkPark(delay); + try { + WispTask.jdkPark(delay); + } catch (ThreadDeath threadDeath) { + assert task.enterTs != 0; + detach(); + throw threadDeath; + } } assert task.enterTs != 0; } @@ -186,27 +198,29 @@ public void execute(Runnable command) { @Override public void shutdown() { - throw new UnsupportedOperationException("NYI"); + if (SHUTDOWN_UPDATER.compareAndSet(this, false, true)) { + WispEngine.WISP_ROOT_ENGINE.shutdown(this); + } } @Override public List shutdownNow() { - return null; + throw new UnsupportedOperationException("not implemented"); } @Override public boolean isShutdown() { - return false; + return destroyed; } @Override public boolean isTerminated() { - return false; + return destroyLatch.getCount() == 0; } @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - return false; + return destroyLatch.await(timeout, unit); } @Override diff --git a/src/linux/classes/com/alibaba/wisp/engine/WispEngine.java b/src/linux/classes/com/alibaba/wisp/engine/WispEngine.java index 47081d32ec..70f9b59598 100644 --- a/src/linux/classes/com/alibaba/wisp/engine/WispEngine.java +++ b/src/linux/classes/com/alibaba/wisp/engine/WispEngine.java @@ -141,11 +141,13 @@ private static void initializeClasses() { Class.forName(WispThreadWrapper.class.getName()); Class.forName(TaskDispatcher.class.getName()); Class.forName(StartShutdown.class.getName()); - Class.forName(NotifyAndWaitTasksForShutdown.class.getName()); Class.forName(Coroutine.StealResult.class.getName()); Class.forName(WispCounterMXBeanImpl.class.getName()); Class.forName(ThreadAsWisp.class.getName()); Class.forName(WispEventPump.class.getName()); + Class.forName(ShutdownEngine.class.getName()); + Class.forName(AbstractShutdownTask.class.getName()); + Class.forName(ShutdownControlGroup.class.getName()); if (WispConfiguration.WISP_PROFILE) { Class.forName(WispPerfCounterMonitor.class.getName()); } @@ -389,7 +391,7 @@ private int translateToSelectionKey(int event) { volatile int runningTaskCount = 0; private CountDownLatch shutdownFuture; volatile Boolean hasBeenShutdown = false; - volatile boolean detached; + volatile boolean terminated; /** * Create a new WispEngine for executing tasks. @@ -501,75 +503,121 @@ public void shutdown() { for (WispCarrier carrier : carrierEngines) { deRegisterPerfCounter(carrier); } - scheduler.execute(new StartShutdown()); + scheduler.execute(new StartShutdown(null)); + } + + void shutdown(WispControlGroup group) { + scheduler.execute(new StartShutdown(group)); } @Override public List shutdownNow() { throw new UnsupportedOperationException(); } + class StartShutdown implements StealAwareRunnable { + private final WispControlGroup wispControlGroup; + + /** + * @param wispControlGroup which WispControlGroup to shutdown, + * null indicates the whole engine. + */ + StartShutdown(WispControlGroup wispControlGroup) { + this.wispControlGroup = wispControlGroup; + } - class StartShutdown extends StealDisabledRunnable { @Override public void run() { - WispCarrier.current().runTaskInternal(new NotifyAndWaitTasksForShutdown(), + WispCarrier.current().runTaskInternal( + wispControlGroup == null ? new ShutdownEngine() : new ShutdownControlGroup(wispControlGroup), WispTask.SHUTDOWN_TASK_NAME, null, null); } } - class NotifyAndWaitTasksForShutdown implements Runnable { + abstract class AbstractShutdownTask implements StealAwareRunnable { @Override public void run() { - try { - // wait until current 'shutdown wispTask' is the only - // running wispTask on this carrier - while (runningTaskCount != 1) { - List runningTasks = getRunningTasks(); - for (WispTask task : runningTasks) { - if (task.carrier.engine == WispEngine.this - && task.isAlive() - && !WispTask.SHUTDOWN_TASK_NAME.equals(task.getName())) { - task.interrupt(); - } + List tasks; + do { + tasks = getTasksForShutdown(); + for (WispTask task : tasks) { + if (task.isAlive()) { + task.jdkUnpark(); + task.unpark(); } - WispCarrier.current().yield(); - } - assert WispTask.SHUTDOWN_TASK_NAME.equals(WispCarrier.current().current.getName()); - detached = true; - //notify all worker to exit - for (WispCarrier carrier : carrierEngines) { - carrier.worker.signal(); } - shutdownFuture.countDown(); - } catch (Exception e) { - e.printStackTrace(); - } + // wait tasks to exit on fixed frequency instead of polling + WispTask.jdkPark(TimeUnit.MILLISECONDS.toNanos(1)); + } while (!tasks.isEmpty()); + finishShutdown(); } - /** - * 1. In Wisp2, each WispCarrier's runningTask is modified when WispTask is stolen, we can't guarantee - * the accuracy of the task set. - * 2. this function is only called in shutdown, so it's not performance sensitive - * 3. this function should only be called by current WispTask - */ - private List getRunningTasks() { + abstract List getTasksForShutdown(); + + abstract void finishShutdown(); + } + + class ShutdownEngine extends AbstractShutdownTask { + @Override + List getTasksForShutdown() { + return getRunningTasks(null); + } + + @Override + void finishShutdown() { assert WispTask.SHUTDOWN_TASK_NAME.equals(WispCarrier.current().current.getName()); - WispCarrier carrier = WispCarrier.current(); - ArrayList runningTasks = new ArrayList<>(); - boolean isInCritical0 = carrier.isInCritical; - carrier.isInCritical = true; - try { - for (WispTask task : WispTask.id2Task.values()) { - if (task.isAlive() - && task.carrier.engine == WispEngine.this - && !task.isThreadTask()) { - runningTasks.add(task); - } + terminated = true; + //notify all worker to exit + for (WispCarrier carrier : carrierEngines) { + carrier.worker.signal(); + } + shutdownFuture.countDown(); + } + + } + + class ShutdownControlGroup extends AbstractShutdownTask { + private final WispControlGroup wispControlGroup; + + ShutdownControlGroup(WispControlGroup wispControlGroup) { + this.wispControlGroup = wispControlGroup; + } + @Override + List getTasksForShutdown() { + return getRunningTasks(wispControlGroup); + } + + @Override + void finishShutdown() { + wispControlGroup.destroyLatch.countDown(); + } + } + + + /** + * 1. In Wisp2, each WispCarrier's runningTask is modified when WispTask is stolen, we can't guarantee + * the accuracy of the task set. + * 2. this function is only called in shutdown, so it's not performance sensitive + * 3. this function should only be called by current WispTask + */ + private List getRunningTasks(WispControlGroup group) { + assert WispTask.SHUTDOWN_TASK_NAME.equals(WispCarrier.current().current.getName()); + WispCarrier carrier = WispCarrier.current(); + ArrayList runningTasks = new ArrayList<>(); + boolean isInCritical0 = carrier.isInCritical; + carrier.isInCritical = true; + try { + for (WispTask task : WispTask.id2Task.values()) { + if (task.isAlive() + && task.carrier.engine == WispEngine.this + && !task.isThreadTask() + && !task.getName().equals(WispTask.SHUTDOWN_TASK_NAME) + && (group == null || task.inDestoryedGroup())) { + runningTasks.add(task); } - return runningTasks; - } finally { - carrier.isInCritical = isInCritical0; } + return runningTasks; + } finally { + carrier.isInCritical = isInCritical0; } } @@ -580,7 +628,7 @@ public boolean isShutdown() { @Override public boolean isTerminated() { - return detached; + return terminated; } @Override @@ -626,13 +674,6 @@ void startAsThread(Runnable target, String name, Thread thread) { target, name, thread)); } - abstract static class StealDisabledRunnable implements StealAwareRunnable { - @Override - public final boolean isStealEnable() { - return false; - } - } - private static native void registerNatives(); private static native int getProxyUnpark(int[] res); diff --git a/src/linux/classes/com/alibaba/wisp/engine/WispScheduler.java b/src/linux/classes/com/alibaba/wisp/engine/WispScheduler.java index 141c8ab39d..37be621dea 100644 --- a/src/linux/classes/com/alibaba/wisp/engine/WispScheduler.java +++ b/src/linux/classes/com/alibaba/wisp/engine/WispScheduler.java @@ -161,7 +161,7 @@ private void runCarrier(final WispCarrier carrier) { doExec(task); } - if (carrier.engine.detached) { + if (carrier.engine.terminated) { return; } else if ((task = SCHEDULING_POLICY.steal(this, r = nextRandom(r))) != null) { doExec(task); diff --git a/src/linux/classes/com/alibaba/wisp/engine/WispTask.java b/src/linux/classes/com/alibaba/wisp/engine/WispTask.java index 18a2a8cad6..022ccbf5fd 100644 --- a/src/linux/classes/com/alibaba/wisp/engine/WispTask.java +++ b/src/linux/classes/com/alibaba/wisp/engine/WispTask.java @@ -518,6 +518,14 @@ boolean isInterrupted() { return interrupted != 0; } + boolean inDestoryedGroup() { + return controlGroup != null && controlGroup.destroyed; + } + + boolean inheritedFromNonRootContainer() { + return WispEngine.JLA.getInheritedResourceContainer(threadWrapper) != ResourceContainer.root(); + } + boolean testInterruptedAndClear(boolean clear) { boolean nativeInterrupt = false; if (alreadyCheckNativeInterrupt == 0 && // only do it once diff --git a/src/share/classes/java/dyn/CoroutineSupport.java b/src/share/classes/java/dyn/CoroutineSupport.java index e1af5afae3..42997e43f1 100644 --- a/src/share/classes/java/dyn/CoroutineSupport.java +++ b/src/share/classes/java/dyn/CoroutineSupport.java @@ -90,8 +90,8 @@ Thread getThread() { return thread; } - public static void checkAndThrowException(Coroutine coroutine) { - checkAndThrowException0(coroutine.nativeCoroutine); + public static boolean checkAndThrowException(Coroutine coroutine) { + return shouldThrowException0(coroutine.nativeCoroutine); } public void drain() { @@ -384,5 +384,4 @@ public CoroutineBase getCurrent() { */ public static native StackTraceElement[] getCoroutineStack(long coroPtr); - private static native void checkAndThrowException0(long coroPtr); -} + private static native boolean shouldThrowException0(long coroPtr);} diff --git a/test/com/alibaba/wisp/thread/PreemptTest.java b/test/com/alibaba/wisp/thread/PreemptTest.java index a05b43b9c0..41e5b94c9a 100644 --- a/test/com/alibaba/wisp/thread/PreemptTest.java +++ b/test/com/alibaba/wisp/thread/PreemptTest.java @@ -40,6 +40,8 @@ public class PreemptTest { public static void main(String[] args) throws Exception { doTest(PreemptTest::simpleLoop); + doTest(PreemptTest::jniLoop); + doTest(PreemptTest::voidLoop); } private static void doTest(Runnable r) throws Exception { @@ -47,6 +49,7 @@ private static void doTest(Runnable r) throws Exception { CountDownLatch latch = new CountDownLatch(1); WispEngine.dispatch(latch::countDown); assertTrue(latch.await(5, TimeUnit.SECONDS)); + } private static void complexLoop() { @@ -72,5 +75,19 @@ private static void simpleLoop() { // TODO: handle safepoint consumed by state switch + + + private static void voidLoop() { + while (true) {} + } + + private static void jniLoop() { + try { + while (true) { Thread.sleep(0);} + } catch (Exception e) { + assertTrue(false); + } + } + volatile static int n; }