Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
package io.kubernetes.client.extended.workqueue;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.Temporal;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -35,10 +33,14 @@ public class DefaultDelayingQueue<T> extends DefaultWorkQueue<T> implements Dela
private DelayQueue<WaitForEntry<T>> delayQueue;
private ConcurrentMap<T, WaitForEntry<T>> waitingEntryByData;
protected BlockingQueue<WaitForEntry<T>> waitingForAddQueue;
private Supplier<Instant> timeSource;
private Supplier<Long> timeSource;

private static Long now() {
return System.nanoTime() / 1000000;
}

public DefaultDelayingQueue(ExecutorService waitingWorker) {
this.timeSource = Instant::now;
this.timeSource = DefaultDelayingQueue::now;
this.delayQueue = new DelayQueue<>();
this.waitingEntryByData = new ConcurrentHashMap<>();
this.waitingForAddQueue = new LinkedBlockingQueue<>(1000);
Expand All @@ -61,12 +63,12 @@ public void addAfter(T item, Duration duration) {
return;
}
WaitForEntry<T> entry =
new WaitForEntry<>(item, duration.addTo(this.timeSource.get()), this.timeSource);
new WaitForEntry<>(item, this.timeSource.get() + duration.toMillis(), this.timeSource);
this.waitingForAddQueue.offer(entry);
}

// Visible for testing
protected void injectTimeSource(Supplier<Instant> fn) {
protected void injectTimeSource(Supplier<Long> fn) {
this.timeSource = fn;
}

Expand All @@ -87,21 +89,21 @@ private void waitingLoop() {
// a. if ready, remove it from the delay-queue and push it into underlying
// work-queue
// b. if not, refresh the next ready-at time.
Instant now = this.timeSource.get();
if (!Duration.between(entry.readyAtMillis, now).isNegative()) {
long now = this.timeSource.get();
if (!((now - entry.readyAtMillis) < 0)) {
delayQueue.remove(entry);
super.add(entry.data);
this.waitingEntryByData.remove(entry.data);
continue;
} else {
nextReadyAt = Duration.between(now, entry.readyAtMillis);
nextReadyAt = Duration.ofMillis(entry.readyAtMillis - now);
}
}

WaitForEntry<T> waitForEntry =
waitingForAddQueue.poll(nextReadyAt.toMillis(), TimeUnit.MILLISECONDS);
if (waitForEntry != null) {
if (Duration.between(waitForEntry.readyAtMillis, this.timeSource.get()).isNegative()) {
if (this.timeSource.get() - waitForEntry.readyAtMillis < 0) {
// the item is not yet ready, insert it to the delay-queue
insert(this.delayQueue, this.waitingEntryByData, waitForEntry);
} else {
Expand All @@ -119,7 +121,7 @@ private void insert(
DelayQueue<WaitForEntry<T>> q, Map<T, WaitForEntry<T>> knownEntries, WaitForEntry entry) {
WaitForEntry existing = knownEntries.get((T) entry.data);
if (existing != null) {
if (Duration.between(existing.readyAtMillis, entry.readyAtMillis).isNegative()) {
if ((entry.readyAtMillis - existing.readyAtMillis) < 0) {
q.remove(existing);
existing.readyAtMillis = entry.readyAtMillis;
q.add(existing);
Expand All @@ -135,20 +137,20 @@ private void insert(
// WaitForEntry holds the data to add and the time it should be added.
private static class WaitForEntry<T> implements Delayed {

private WaitForEntry(T data, Temporal readyAtMillis, Supplier<Instant> timeSource) {
private WaitForEntry(T data, long readyAtMillis, Supplier<Long> timeSource) {
this.data = data;
this.readyAtMillis = readyAtMillis;
this.timeSource = timeSource;
}

private T data;
private Temporal readyAtMillis;
private Supplier<Instant> timeSource;
private long readyAtMillis;
private Supplier<Long> timeSource;

@Override
public long getDelay(TimeUnit unit) {
Duration duration = Duration.between(this.timeSource.get(), readyAtMillis);
return unit.convert(duration.toMillis(), TimeUnit.MILLISECONDS);
long duration = readyAtMillis - this.timeSource.get();
return unit.convert(duration, TimeUnit.MILLISECONDS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public void testSimpleDelayingQueue() throws Exception {
// Hold time still
queue.injectTimeSource(
() -> {
return staticTime;
return staticTime.toEpochMilli();
});
queue.addAfter("foo", Duration.ofMillis(50));

Expand All @@ -40,7 +40,7 @@ public void testSimpleDelayingQueue() throws Exception {
// Advance time
queue.injectTimeSource(
() -> {
return staticTime.plusMillis(100);
return staticTime.plusMillis(100).toEpochMilli();
});
assertTrue(waitForAdded(queue, 1));
String item = queue.get();
Expand All @@ -59,7 +59,7 @@ public void testDeduping() throws Exception {
// Hold time still
queue.injectTimeSource(
() -> {
return staticTime;
return staticTime.toEpochMilli();
});

queue.addAfter(item, Duration.ofMillis(50));
Expand All @@ -71,7 +71,7 @@ public void testDeduping() throws Exception {
// Advance time
queue.injectTimeSource(
() -> {
return staticTime.plusMillis(60);
return staticTime.plusMillis(60).toEpochMilli();
});
assertTrue(waitForAdded(queue, 1));
item = queue.get();
Expand All @@ -81,7 +81,7 @@ public void testDeduping() throws Exception {
// Advance time
queue.injectTimeSource(
() -> {
return staticTime.plusMillis(90);
return staticTime.plusMillis(90).toEpochMilli();
});
assertTrue("should not have added", queue.length() == 0);

Expand All @@ -94,7 +94,7 @@ public void testDeduping() throws Exception {
// Advance time
queue.injectTimeSource(
() -> {
return staticTime.plusMillis(150);
return staticTime.plusMillis(150).toEpochMilli();
});
assertTrue(waitForAdded(queue, 1));
item = queue.get();
Expand All @@ -104,7 +104,7 @@ public void testDeduping() throws Exception {
// Advance time
queue.injectTimeSource(
() -> {
return staticTime.plusMillis(190);
return staticTime.plusMillis(190).toEpochMilli();
});
assertTrue("should not have added", queue.length() == 0);
}
Expand All @@ -115,7 +115,7 @@ public void testCopyShifting() throws Exception {
DefaultDelayingQueue<String> queue = new DefaultDelayingQueue<>();
queue.injectTimeSource(
() -> {
return staticTime;
return staticTime.toEpochMilli();
});

final String first = "foo";
Expand All @@ -130,7 +130,7 @@ public void testCopyShifting() throws Exception {

queue.injectTimeSource(
() -> {
return staticTime.plusMillis(2000);
return staticTime.plusMillis(2000).toEpochMilli();
});
assertTrue(waitForAdded(queue, 3));
String actualFirst = queue.get();
Expand Down