Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ public class LocalTaskQueue<T>
public static final Object NO_MORE_TASKS = null; //object to signal NO_MORE_TASKS

private LinkedList<T> _data = null;
private boolean _closedInput = false;
private boolean _closedInput = false;
private boolean _resettable = false;
private int _readPosition = 0;
private static final Log LOG = LogFactory.getLog(LocalTaskQueue.class.getName());

public LocalTaskQueue()
Expand Down Expand Up @@ -74,26 +76,37 @@ public synchronized void enqueueTask( T t )

/**
* Synchronized read and delete from the top of the FIFO queue.
*
* In resettable mode, reads without removing tasks from queue.
*
* @return task
* @throws InterruptedException if InterruptedException occurs
*/
@SuppressWarnings("unchecked")
public synchronized T dequeueTask()
public synchronized T dequeueTask()
throws InterruptedException
{
while( _data.isEmpty() )
{
if( !_closedInput )
wait(); // wait for writers
else
return (T)NO_MORE_TASKS;
return (T)NO_MORE_TASKS;
}

T t = _data.removeFirst();


T t;
if (_resettable) {
// Resettable mode: read without removing
if (_readPosition >= _data.size()) {
return (T)NO_MORE_TASKS;
}
t = _data.get(_readPosition++);
} else {
// Normal mode: remove after reading
t = _data.removeFirst();
}

notify(); // notify waiting writers

return t;
}

Expand All @@ -107,14 +120,43 @@ public synchronized void closeInput()
notifyAll(); //notify all waiting readers
}

/**
* Set the queue to resettable mode, allowing multiple consumers to iterate
* over the same tasks without removing them from the queue.
*
* @param resettable true to enable resettable mode, false for normal FIFO behavior
*/
public synchronized void setResettable(boolean resettable)
{
_resettable = resettable;
_readPosition = 0;
}

/**
* Reset the read position to the beginning of the queue for resettable mode.
* Only works when resettable mode is enabled.
*/
public synchronized void resetIterator()
{
if (_resettable) {
_readPosition = 0;
}
}

@Override
public synchronized String toString()
public synchronized String toString()
{
StringBuilder sb = new StringBuilder();
sb.append("TASK QUEUE (size=");
sb.append(_data.size());
sb.append(",close=");
sb.append(_closedInput);
sb.append(",resettable=");
sb.append(_resettable);
if (_resettable) {
sb.append(",pos=");
sb.append(_readPosition);
}
sb.append(")\n");

int count = 1;
Expand Down
Loading