diff --git a/extended/src/main/java/io/kubernetes/client/extended/workqueue/DefaultDelayingQueue.java b/extended/src/main/java/io/kubernetes/client/extended/workqueue/DefaultDelayingQueue.java index 7beafa94da..835da2b216 100644 --- a/extended/src/main/java/io/kubernetes/client/extended/workqueue/DefaultDelayingQueue.java +++ b/extended/src/main/java/io/kubernetes/client/extended/workqueue/DefaultDelayingQueue.java @@ -13,8 +13,6 @@ package io.kubernetes.client.extended.workqueue; import java.time.Duration; -import java.time.Instant; -import java.time.temporal.Temporal; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -35,10 +33,14 @@ public class DefaultDelayingQueue extends DefaultWorkQueue implements Dela private DelayQueue> delayQueue; private ConcurrentMap> waitingEntryByData; protected BlockingQueue> waitingForAddQueue; - private Supplier timeSource; + private Supplier timeSource; + + private static Long now() { + return System.nanoTime() / 1000000; + } public DefaultDelayingQueue(ExecutorService waitingWorker) { - this.timeSource = Instant::now; + this.timeSource = DefaultDelayingQueue::now; this.delayQueue = new DelayQueue<>(); this.waitingEntryByData = new ConcurrentHashMap<>(); this.waitingForAddQueue = new LinkedBlockingQueue<>(1000); @@ -61,12 +63,12 @@ public void addAfter(T item, Duration duration) { return; } WaitForEntry entry = - new WaitForEntry<>(item, duration.addTo(this.timeSource.get()), this.timeSource); + new WaitForEntry<>(item, this.timeSource.get() + duration.toMillis(), this.timeSource); this.waitingForAddQueue.offer(entry); } // Visible for testing - protected void injectTimeSource(Supplier fn) { + protected void injectTimeSource(Supplier fn) { this.timeSource = fn; } @@ -87,21 +89,21 @@ private void waitingLoop() { // a. if ready, remove it from the delay-queue and push it into underlying // work-queue // b. if not, refresh the next ready-at time. - Instant now = this.timeSource.get(); - if (!Duration.between(entry.readyAtMillis, now).isNegative()) { + long now = this.timeSource.get(); + if (!((now - entry.readyAtMillis) < 0)) { delayQueue.remove(entry); super.add(entry.data); this.waitingEntryByData.remove(entry.data); continue; } else { - nextReadyAt = Duration.between(now, entry.readyAtMillis); + nextReadyAt = Duration.ofMillis(entry.readyAtMillis - now); } } WaitForEntry waitForEntry = waitingForAddQueue.poll(nextReadyAt.toMillis(), TimeUnit.MILLISECONDS); if (waitForEntry != null) { - if (Duration.between(waitForEntry.readyAtMillis, this.timeSource.get()).isNegative()) { + if (this.timeSource.get() - waitForEntry.readyAtMillis < 0) { // the item is not yet ready, insert it to the delay-queue insert(this.delayQueue, this.waitingEntryByData, waitForEntry); } else { @@ -119,7 +121,7 @@ private void insert( DelayQueue> q, Map> knownEntries, WaitForEntry entry) { WaitForEntry existing = knownEntries.get((T) entry.data); if (existing != null) { - if (Duration.between(existing.readyAtMillis, entry.readyAtMillis).isNegative()) { + if ((entry.readyAtMillis - existing.readyAtMillis) < 0) { q.remove(existing); existing.readyAtMillis = entry.readyAtMillis; q.add(existing); @@ -135,20 +137,20 @@ private void insert( // WaitForEntry holds the data to add and the time it should be added. private static class WaitForEntry implements Delayed { - private WaitForEntry(T data, Temporal readyAtMillis, Supplier timeSource) { + private WaitForEntry(T data, long readyAtMillis, Supplier timeSource) { this.data = data; this.readyAtMillis = readyAtMillis; this.timeSource = timeSource; } private T data; - private Temporal readyAtMillis; - private Supplier timeSource; + private long readyAtMillis; + private Supplier timeSource; @Override public long getDelay(TimeUnit unit) { - Duration duration = Duration.between(this.timeSource.get(), readyAtMillis); - return unit.convert(duration.toMillis(), TimeUnit.MILLISECONDS); + long duration = readyAtMillis - this.timeSource.get(); + return unit.convert(duration, TimeUnit.MILLISECONDS); } @Override diff --git a/extended/src/test/java/io/kubernetes/client/extended/workqueue/DefaultDelayingQueueTest.java b/extended/src/test/java/io/kubernetes/client/extended/workqueue/DefaultDelayingQueueTest.java index cbf82a52f4..561c30b5b7 100644 --- a/extended/src/test/java/io/kubernetes/client/extended/workqueue/DefaultDelayingQueueTest.java +++ b/extended/src/test/java/io/kubernetes/client/extended/workqueue/DefaultDelayingQueueTest.java @@ -29,7 +29,7 @@ public void testSimpleDelayingQueue() throws Exception { // Hold time still queue.injectTimeSource( () -> { - return staticTime; + return staticTime.toEpochMilli(); }); queue.addAfter("foo", Duration.ofMillis(50)); @@ -40,7 +40,7 @@ public void testSimpleDelayingQueue() throws Exception { // Advance time queue.injectTimeSource( () -> { - return staticTime.plusMillis(100); + return staticTime.plusMillis(100).toEpochMilli(); }); assertTrue(waitForAdded(queue, 1)); String item = queue.get(); @@ -59,7 +59,7 @@ public void testDeduping() throws Exception { // Hold time still queue.injectTimeSource( () -> { - return staticTime; + return staticTime.toEpochMilli(); }); queue.addAfter(item, Duration.ofMillis(50)); @@ -71,7 +71,7 @@ public void testDeduping() throws Exception { // Advance time queue.injectTimeSource( () -> { - return staticTime.plusMillis(60); + return staticTime.plusMillis(60).toEpochMilli(); }); assertTrue(waitForAdded(queue, 1)); item = queue.get(); @@ -81,7 +81,7 @@ public void testDeduping() throws Exception { // Advance time queue.injectTimeSource( () -> { - return staticTime.plusMillis(90); + return staticTime.plusMillis(90).toEpochMilli(); }); assertTrue("should not have added", queue.length() == 0); @@ -94,7 +94,7 @@ public void testDeduping() throws Exception { // Advance time queue.injectTimeSource( () -> { - return staticTime.plusMillis(150); + return staticTime.plusMillis(150).toEpochMilli(); }); assertTrue(waitForAdded(queue, 1)); item = queue.get(); @@ -104,7 +104,7 @@ public void testDeduping() throws Exception { // Advance time queue.injectTimeSource( () -> { - return staticTime.plusMillis(190); + return staticTime.plusMillis(190).toEpochMilli(); }); assertTrue("should not have added", queue.length() == 0); } @@ -115,7 +115,7 @@ public void testCopyShifting() throws Exception { DefaultDelayingQueue queue = new DefaultDelayingQueue<>(); queue.injectTimeSource( () -> { - return staticTime; + return staticTime.toEpochMilli(); }); final String first = "foo"; @@ -130,7 +130,7 @@ public void testCopyShifting() throws Exception { queue.injectTimeSource( () -> { - return staticTime.plusMillis(2000); + return staticTime.plusMillis(2000).toEpochMilli(); }); assertTrue(waitForAdded(queue, 3)); String actualFirst = queue.get();