diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/LocalTaskQueue.java b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/LocalTaskQueue.java index cef7015e068..533308faca6 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/LocalTaskQueue.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/parfor/LocalTaskQueue.java @@ -43,7 +43,9 @@ public class LocalTaskQueue public static final Object NO_MORE_TASKS = null; //object to signal NO_MORE_TASKS private LinkedList _data = null; - private boolean _closedInput = false; + private boolean _closedInput = false; + private boolean _resettable = false; + private int _readPosition = 0; private static final Log LOG = LogFactory.getLog(LocalTaskQueue.class.getName()); public LocalTaskQueue() @@ -74,12 +76,13 @@ public synchronized void enqueueTask( T t ) /** * Synchronized read and delete from the top of the FIFO queue. - * + * In resettable mode, reads without removing tasks from queue. + * * @return task * @throws InterruptedException if InterruptedException occurs */ @SuppressWarnings("unchecked") - public synchronized T dequeueTask() + public synchronized T dequeueTask() throws InterruptedException { while( _data.isEmpty() ) @@ -87,13 +90,23 @@ public synchronized T dequeueTask() if( !_closedInput ) wait(); // wait for writers else - return (T)NO_MORE_TASKS; + return (T)NO_MORE_TASKS; } - - T t = _data.removeFirst(); - + + T t; + if (_resettable) { + // Resettable mode: read without removing + if (_readPosition >= _data.size()) { + return (T)NO_MORE_TASKS; + } + t = _data.get(_readPosition++); + } else { + // Normal mode: remove after reading + t = _data.removeFirst(); + } + notify(); // notify waiting writers - + return t; } @@ -107,14 +120,43 @@ public synchronized void closeInput() notifyAll(); //notify all waiting readers } + /** + * Set the queue to resettable mode, allowing multiple consumers to iterate + * over the same tasks without removing them from the queue. + * + * @param resettable true to enable resettable mode, false for normal FIFO behavior + */ + public synchronized void setResettable(boolean resettable) + { + _resettable = resettable; + _readPosition = 0; + } + + /** + * Reset the read position to the beginning of the queue for resettable mode. + * Only works when resettable mode is enabled. + */ + public synchronized void resetIterator() + { + if (_resettable) { + _readPosition = 0; + } + } + @Override - public synchronized String toString() + public synchronized String toString() { StringBuilder sb = new StringBuilder(); sb.append("TASK QUEUE (size="); sb.append(_data.size()); sb.append(",close="); sb.append(_closedInput); + sb.append(",resettable="); + sb.append(_resettable); + if (_resettable) { + sb.append(",pos="); + sb.append(_readPosition); + } sb.append(")\n"); int count = 1;