Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated the FlowEventQueue #299

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@

/**
* A specialized priority queue for future event of {@link FlowNode}s sorted on time.
* The queue is based on a min heap binary tree (https://www.digitalocean.com/community/tutorials/min-heap-binary-tree)
* The nodes keep a timerIndex which indicates their placement in the tree.
* The timerIndex is -1 when a node is not in the tree.
*
* <p>
* By using a specialized priority queue, we reduce the overhead caused by the default priority queue implementation
* being generic.
Expand All @@ -52,20 +56,24 @@ public FlowEventQueue(int initialCapacity) {
}

/**
* Enqueue a timer for the specified context or update the existing timer.
* Enqueue a timer for the specified node or update the existing timer.
*
* When Long.MAX_VALUE is given as a deadline, the node is removed from the queue
* @param node node to queue
*/
public void enqueue(FlowNode node) {
FlowNode[] es = queue;
int k = node.getTimerIndex();
// The timerIndex indicates whether a node is already in the queue
int timerIndex = node.getTimerIndex();

if (node.getDeadline() != Long.MAX_VALUE) {
if (k >= 0) {
update(es, node, k);
if (timerIndex >= 0) {
update(this.queue, node, timerIndex);
} else {
add(es, node);
add(this.queue, node);
}
} else if (k >= 0) {
delete(es, k);
} else if (timerIndex >= 0) {
delete(this.queue, timerIndex);
node.setTimerIndex(-1);
}
}

Expand All @@ -80,22 +88,23 @@ public FlowNode poll(long now) {
return null;
}

final FlowNode[] es = queue;
final FlowNode head = es[0];
final FlowNode head = this.queue[0];

if (now < head.getDeadline()) {
return null;
}

int n = size - 1;
this.size = n;
final FlowNode next = es[n];
es[n] = null; // Clear the last element of the queue
// Move the last element of the queue to the front
this.size--;
final FlowNode next = this.queue[this.size];
this.queue[this.size] = null; // Clear the last element of the queue

if (n > 0) {
siftDown(0, next, es, n);
// Sift down the new head.
if (this.size > 0) {
siftDown(0, next, this.queue, this.size);
}

// Set the index of the head to -1 indicating it is not scheduled anymore
head.setTimerIndex(-1);
return head;
}
Expand All @@ -115,46 +124,58 @@ public long peekDeadline() {
* Add a new entry to the queue.
*/
private void add(FlowNode[] es, FlowNode node) {
if (this.size >= es.length) {
if (this.size >= this.queue.length) {
// Re-fetch the resized array
es = grow();
this.grow();
}

siftUp(this.size, node, es);
siftUp(this.size, node, this.queue);

this.size++;
}

/**
* Update the deadline of an existing entry in the queue.
*/
private void update(FlowNode[] es, FlowNode node, int k) {
if (k > 0) {
int parent = (k - 1) >>> 1;
if (es[parent].getDeadline() > node.getDeadline()) {
siftUp(k, node, es);
private void update(FlowNode[] eventList, FlowNode node, int timerIndex) {
if (timerIndex > 0) {
int parentIndex = (timerIndex - 1) >>> 1;
if (eventList[parentIndex].getDeadline() > node.getDeadline()) {
siftUp(timerIndex, node, eventList);
return;
}
}

siftDown(k, node, es, this.size);
siftDown(timerIndex, node, eventList, this.size);
}

/**
* Deadline an entry from the queue.
* The move a node from the queue
*
* @param eventList all scheduled events
* @param timerIndex the index of the node to remove
*/
private void delete(FlowNode[] es, int k) {
int s = --this.size;
if (s == k) {
es[k] = null; // Element is last in the queue
} else {
FlowNode moved = es[s];
es[s] = null;

siftDown(k, moved, es, s);

if (es[k] == moved) {
siftUp(k, moved, es);
private void delete(FlowNode[] eventList, int timerIndex) {
this.size--;

// If the element is the last element, simply remove it
if (timerIndex == this.size) {
eventList[timerIndex] = null;
}

// Else, swap the node to remove with the last node and sift it up or down to get the moved node in the correct
// position.
else {

// swap the node with the last element
FlowNode moved = eventList[this.size];
eventList[this.size] = null;

siftDown(timerIndex, moved, eventList, this.size);

// SiftUp, if siftDown did not move the node
if (eventList[timerIndex] == moved) {
siftUp(timerIndex, moved, eventList);
}
}
}
Expand All @@ -172,35 +193,92 @@ private FlowNode[] grow() {
return queue;
}

private static void siftUp(int k, FlowNode key, FlowNode[] es) {
while (k > 0) {
int parent = (k - 1) >>> 1;
FlowNode e = es[parent];
if (key.getDeadline() >= e.getDeadline()) break;
es[k] = e;
e.setTimerIndex(k);
k = parent;
/**
* Iteratively swap the node at the given timerIndex with its parents until the node is at the correct
* position in the binary tree (ie, both children of the node are bigger than the node itself).
*
* @param timerIndex the index of the node that needs to be sifted up
* @param node the node that needs to be sifted up
* @param eventList the list of nodes that are scheduled
*/
private static void siftUp(int timerIndex, FlowNode node, FlowNode[] eventList) {
while (timerIndex > 0) {
// Find parent Node
int parentIndex = (timerIndex - 1) >>> 1;
FlowNode parentNode = eventList[parentIndex];

// Break if the deadline of the node is bigger than the parent node
if (node.getDeadline() >= parentNode.getDeadline()) break;

// Otherwise, swap node with the parentNode
eventList[timerIndex] = parentNode;
parentNode.setTimerIndex(timerIndex);
timerIndex = parentIndex;
}
es[k] = key;
key.setTimerIndex(k);

eventList[timerIndex] = node;
node.setTimerIndex(timerIndex);
}

private static void siftDown(int k, FlowNode key, FlowNode[] es, int n) {
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
FlowNode c = es[child];
int right = child + 1;
if (right < n && c.getDeadline() > es[right].getDeadline()) c = es[child = right];
/**
* Iteratively swap the node at the given timerIndex with its smallest child until the node is at the correct
* position in the binary tree (ie, both children of the node are bigger than the node itself).
*
* @param timerIndex the index of the node that needs to be sifted down
* @param node the node that needs to be sifted down
* @param eventList the list of nodes that are scheduled
* @param queueSize the current size of the queue
*/
private static void siftDown(int timerIndex, FlowNode node, FlowNode[] eventList, int queueSize) {
int half = queueSize >>> 1; // loop while a non-leaf
while (timerIndex < half) {

// Get the index of the smallest child
int smallestChildIndex = getSmallestChildIndex(timerIndex, eventList, queueSize);

if (key.getDeadline() <= c.getDeadline()) break;
// Get the smallest child
FlowNode smallestChildNode = eventList[smallestChildIndex];

es[k] = c;
c.setTimerIndex(k);
k = child;
// If the node is smaller than the smallest child, break
if (node.getDeadline() <= smallestChildNode.getDeadline()) break;

// Otherwise, swap the node with its smallest child
eventList[timerIndex] = smallestChildNode;
smallestChildNode.setTimerIndex(timerIndex);
timerIndex = smallestChildIndex;
}

es[k] = key;
key.setTimerIndex(k);
eventList[timerIndex] = node;
node.setTimerIndex(timerIndex);
}

/**
* Return the index of the child with the smallest deadline time of the node at the given timerIndex
*
* @param timerIndex the index of the parent node
* @param eventList the list of all scheduled events
* @param queueSize the current size of the queue
* @return the timerIndex of the smallest child
*/
private static int getSmallestChildIndex(int timerIndex, FlowNode[] eventList, int queueSize) {
// Calculate the index of the left child
int leftChildIndex = (timerIndex << 1) + 1;

// If the left child is at the end of the queue, there is no right child.
// Thus, the left child is always the smallest
if (leftChildIndex + 1 >= queueSize) return leftChildIndex;

FlowNode leftChildNode = eventList[leftChildIndex];

// Get right child
int rightChildIndex = leftChildIndex + 1;
FlowNode rightChildNode = eventList[rightChildIndex];

// If the rightChild is smaller, return its index
// otherwise, return the index of the left child
if (rightChildNode.getDeadline() < leftChildNode.getDeadline()) {
return rightChildIndex;
}
return leftChildIndex;
}
}
Loading