Skip to content

Commit 7e9f9b7

Browse files
authored
Merge pull request #2435 from brendandburns/time
Fix the default delay queue to use a fixed time source.
2 parents 59cdf63 + 4f47ac0 commit 7e9f9b7

File tree

2 files changed

+27
-25
lines changed

2 files changed

+27
-25
lines changed

extended/src/main/java/io/kubernetes/client/extended/workqueue/DefaultDelayingQueue.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@
1313
package io.kubernetes.client.extended.workqueue;
1414

1515
import java.time.Duration;
16-
import java.time.Instant;
17-
import java.time.temporal.Temporal;
1816
import java.util.Map;
1917
import java.util.concurrent.BlockingQueue;
2018
import java.util.concurrent.ConcurrentHashMap;
@@ -35,10 +33,14 @@ public class DefaultDelayingQueue<T> extends DefaultWorkQueue<T> implements Dela
3533
private DelayQueue<WaitForEntry<T>> delayQueue;
3634
private ConcurrentMap<T, WaitForEntry<T>> waitingEntryByData;
3735
protected BlockingQueue<WaitForEntry<T>> waitingForAddQueue;
38-
private Supplier<Instant> timeSource;
36+
private Supplier<Long> timeSource;
37+
38+
private static Long now() {
39+
return System.nanoTime() / 1000000;
40+
}
3941

4042
public DefaultDelayingQueue(ExecutorService waitingWorker) {
41-
this.timeSource = Instant::now;
43+
this.timeSource = DefaultDelayingQueue::now;
4244
this.delayQueue = new DelayQueue<>();
4345
this.waitingEntryByData = new ConcurrentHashMap<>();
4446
this.waitingForAddQueue = new LinkedBlockingQueue<>(1000);
@@ -61,12 +63,12 @@ public void addAfter(T item, Duration duration) {
6163
return;
6264
}
6365
WaitForEntry<T> entry =
64-
new WaitForEntry<>(item, duration.addTo(this.timeSource.get()), this.timeSource);
66+
new WaitForEntry<>(item, this.timeSource.get() + duration.toMillis(), this.timeSource);
6567
this.waitingForAddQueue.offer(entry);
6668
}
6769

6870
// Visible for testing
69-
protected void injectTimeSource(Supplier<Instant> fn) {
71+
protected void injectTimeSource(Supplier<Long> fn) {
7072
this.timeSource = fn;
7173
}
7274

@@ -87,21 +89,21 @@ private void waitingLoop() {
8789
// a. if ready, remove it from the delay-queue and push it into underlying
8890
// work-queue
8991
// b. if not, refresh the next ready-at time.
90-
Instant now = this.timeSource.get();
91-
if (!Duration.between(entry.readyAtMillis, now).isNegative()) {
92+
long now = this.timeSource.get();
93+
if (!((now - entry.readyAtMillis) < 0)) {
9294
delayQueue.remove(entry);
9395
super.add(entry.data);
9496
this.waitingEntryByData.remove(entry.data);
9597
continue;
9698
} else {
97-
nextReadyAt = Duration.between(now, entry.readyAtMillis);
99+
nextReadyAt = Duration.ofMillis(entry.readyAtMillis - now);
98100
}
99101
}
100102

101103
WaitForEntry<T> waitForEntry =
102104
waitingForAddQueue.poll(nextReadyAt.toMillis(), TimeUnit.MILLISECONDS);
103105
if (waitForEntry != null) {
104-
if (Duration.between(waitForEntry.readyAtMillis, this.timeSource.get()).isNegative()) {
106+
if (this.timeSource.get() - waitForEntry.readyAtMillis < 0) {
105107
// the item is not yet ready, insert it to the delay-queue
106108
insert(this.delayQueue, this.waitingEntryByData, waitForEntry);
107109
} else {
@@ -119,7 +121,7 @@ private void insert(
119121
DelayQueue<WaitForEntry<T>> q, Map<T, WaitForEntry<T>> knownEntries, WaitForEntry entry) {
120122
WaitForEntry existing = knownEntries.get((T) entry.data);
121123
if (existing != null) {
122-
if (Duration.between(existing.readyAtMillis, entry.readyAtMillis).isNegative()) {
124+
if ((entry.readyAtMillis - existing.readyAtMillis) < 0) {
123125
q.remove(existing);
124126
existing.readyAtMillis = entry.readyAtMillis;
125127
q.add(existing);
@@ -135,20 +137,20 @@ private void insert(
135137
// WaitForEntry holds the data to add and the time it should be added.
136138
private static class WaitForEntry<T> implements Delayed {
137139

138-
private WaitForEntry(T data, Temporal readyAtMillis, Supplier<Instant> timeSource) {
140+
private WaitForEntry(T data, long readyAtMillis, Supplier<Long> timeSource) {
139141
this.data = data;
140142
this.readyAtMillis = readyAtMillis;
141143
this.timeSource = timeSource;
142144
}
143145

144146
private T data;
145-
private Temporal readyAtMillis;
146-
private Supplier<Instant> timeSource;
147+
private long readyAtMillis;
148+
private Supplier<Long> timeSource;
147149

148150
@Override
149151
public long getDelay(TimeUnit unit) {
150-
Duration duration = Duration.between(this.timeSource.get(), readyAtMillis);
151-
return unit.convert(duration.toMillis(), TimeUnit.MILLISECONDS);
152+
long duration = readyAtMillis - this.timeSource.get();
153+
return unit.convert(duration, TimeUnit.MILLISECONDS);
152154
}
153155

154156
@Override

extended/src/test/java/io/kubernetes/client/extended/workqueue/DefaultDelayingQueueTest.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public void testSimpleDelayingQueue() throws Exception {
2929
// Hold time still
3030
queue.injectTimeSource(
3131
() -> {
32-
return staticTime;
32+
return staticTime.toEpochMilli();
3333
});
3434
queue.addAfter("foo", Duration.ofMillis(50));
3535

@@ -40,7 +40,7 @@ public void testSimpleDelayingQueue() throws Exception {
4040
// Advance time
4141
queue.injectTimeSource(
4242
() -> {
43-
return staticTime.plusMillis(100);
43+
return staticTime.plusMillis(100).toEpochMilli();
4444
});
4545
assertTrue(waitForAdded(queue, 1));
4646
String item = queue.get();
@@ -59,7 +59,7 @@ public void testDeduping() throws Exception {
5959
// Hold time still
6060
queue.injectTimeSource(
6161
() -> {
62-
return staticTime;
62+
return staticTime.toEpochMilli();
6363
});
6464

6565
queue.addAfter(item, Duration.ofMillis(50));
@@ -71,7 +71,7 @@ public void testDeduping() throws Exception {
7171
// Advance time
7272
queue.injectTimeSource(
7373
() -> {
74-
return staticTime.plusMillis(60);
74+
return staticTime.plusMillis(60).toEpochMilli();
7575
});
7676
assertTrue(waitForAdded(queue, 1));
7777
item = queue.get();
@@ -81,7 +81,7 @@ public void testDeduping() throws Exception {
8181
// Advance time
8282
queue.injectTimeSource(
8383
() -> {
84-
return staticTime.plusMillis(90);
84+
return staticTime.plusMillis(90).toEpochMilli();
8585
});
8686
assertTrue("should not have added", queue.length() == 0);
8787

@@ -94,7 +94,7 @@ public void testDeduping() throws Exception {
9494
// Advance time
9595
queue.injectTimeSource(
9696
() -> {
97-
return staticTime.plusMillis(150);
97+
return staticTime.plusMillis(150).toEpochMilli();
9898
});
9999
assertTrue(waitForAdded(queue, 1));
100100
item = queue.get();
@@ -104,7 +104,7 @@ public void testDeduping() throws Exception {
104104
// Advance time
105105
queue.injectTimeSource(
106106
() -> {
107-
return staticTime.plusMillis(190);
107+
return staticTime.plusMillis(190).toEpochMilli();
108108
});
109109
assertTrue("should not have added", queue.length() == 0);
110110
}
@@ -115,7 +115,7 @@ public void testCopyShifting() throws Exception {
115115
DefaultDelayingQueue<String> queue = new DefaultDelayingQueue<>();
116116
queue.injectTimeSource(
117117
() -> {
118-
return staticTime;
118+
return staticTime.toEpochMilli();
119119
});
120120

121121
final String first = "foo";
@@ -130,7 +130,7 @@ public void testCopyShifting() throws Exception {
130130

131131
queue.injectTimeSource(
132132
() -> {
133-
return staticTime.plusMillis(2000);
133+
return staticTime.plusMillis(2000).toEpochMilli();
134134
});
135135
assertTrue(waitForAdded(queue, 3));
136136
String actualFirst = queue.get();

0 commit comments

Comments
 (0)