From ab9704560525468d28294c7c6c0e4266e79efab4 Mon Sep 17 00:00:00 2001 From: SreeramdasLavanya Date: Fri, 20 Sep 2024 14:25:17 +0000 Subject: [PATCH 1/3] SynchronizationContextTest changes for scheduleFixedDelay with Duration --- .../java/io/grpc/SynchronizationContext.java | 33 ++++ .../io/grpc/SynchronizationContextTest.java | 172 +++++++++++------- 2 files changed, 135 insertions(+), 70 deletions(-) diff --git a/api/src/main/java/io/grpc/SynchronizationContext.java b/api/src/main/java/io/grpc/SynchronizationContext.java index 5a7677ac15f..046988b4779 100644 --- a/api/src/main/java/io/grpc/SynchronizationContext.java +++ b/api/src/main/java/io/grpc/SynchronizationContext.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkState; import java.lang.Thread.UncaughtExceptionHandler; +import java.time.Duration; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; @@ -194,6 +195,38 @@ public String toString() { } + public final ScheduledHandle scheduleWithFixedDelay( + final Runnable task, Duration initialDelay, Duration delay, TimeUnit unit, + ScheduledExecutorService timerService) { + final ManagedRunnable runnable = new ManagedRunnable(task); + System.out.println("Inside Durationcall"); + ScheduledFuture future = timerService.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + execute(runnable); + } + + @Override + public String toString() { + return task.toString() + "(scheduled in SynchronizationContext with delay of " + delay + + ")"; + } + }, toNanosSaturated(initialDelay), toNanosSaturated(delay), unit); + return new ScheduledHandle(runnable, future); + } + static long toNanosSaturated(Duration duration) { + // Using a try/catch seems lazy, but the catch block will rarely get invoked (except for + // durations longer than approximately +/- 292 years). + try { + //long delay = TimeUnit.MILLISECONDS.convert(500, TimeUnit.SECONDS); // Converts 500 seconds to milliseconds + return duration.toNanos(); + //return TimeUnit.NANOSECONDS.convert(duration); + + } catch (ArithmeticException tooBig) { + return duration.isNegative() ? Long.MIN_VALUE : Long.MAX_VALUE; + } + } + private static class ManagedRunnable implements Runnable { final Runnable task; boolean isCancelled; diff --git a/api/src/test/java/io/grpc/SynchronizationContextTest.java b/api/src/test/java/io/grpc/SynchronizationContextTest.java index 3d5e7fa42b9..a4d9e80837f 100644 --- a/api/src/test/java/io/grpc/SynchronizationContextTest.java +++ b/api/src/test/java/io/grpc/SynchronizationContextTest.java @@ -27,6 +27,7 @@ import com.google.common.util.concurrent.testing.TestingExecutors; import io.grpc.SynchronizationContext.ScheduledHandle; +import java.time.Duration; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; @@ -52,6 +53,7 @@ */ @RunWith(JUnit4.class) public class SynchronizationContextTest { + private final BlockingQueue uncaughtErrors = new LinkedBlockingQueue<>(); private final SynchronizationContext syncContext = new SynchronizationContext( new Thread.UncaughtExceptionHandler() { @@ -72,8 +74,9 @@ public void uncaughtException(Thread t, Throwable e) { @Mock private Runnable task3; - - @After public void tearDown() { + + @After + public void tearDown() { assertThat(uncaughtErrors).isEmpty(); } @@ -105,36 +108,36 @@ public void multiThread() throws Exception { final AtomicReference task2Thread = new AtomicReference<>(); doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - task1Thread.set(Thread.currentThread()); - task1Running.countDown(); - try { - assertTrue(task1Proceed.await(5, TimeUnit.SECONDS)); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - return null; + @Override + public Void answer(InvocationOnMock invocation) { + task1Thread.set(Thread.currentThread()); + task1Running.countDown(); + try { + assertTrue(task1Proceed.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - }).when(task1).run(); + return null; + } + }).when(task1).run(); doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - task2Thread.set(Thread.currentThread()); - return null; - } - }).when(task2).run(); + @Override + public Void answer(InvocationOnMock invocation) { + task2Thread.set(Thread.currentThread()); + return null; + } + }).when(task2).run(); Thread sideThread = new Thread() { - @Override - public void run() { - syncContext.executeLater(task1); - task1Added.countDown(); - syncContext.drain(); - sideThreadDone.countDown(); - } - }; + @Override + public void run() { + syncContext.executeLater(task1); + task1Added.countDown(); + syncContext.drain(); + sideThreadDone.countDown(); + } + }; sideThread.start(); assertTrue(task1Added.await(5, TimeUnit.SECONDS)); @@ -162,26 +165,26 @@ public void throwIfNotInThisSynchronizationContext() throws Exception { final CountDownLatch task1Proceed = new CountDownLatch(1); doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - task1Running.countDown(); - syncContext.throwIfNotInThisSynchronizationContext(); - try { - assertTrue(task1Proceed.await(5, TimeUnit.SECONDS)); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - taskSuccess.set(true); - return null; + @Override + public Void answer(InvocationOnMock invocation) { + task1Running.countDown(); + syncContext.throwIfNotInThisSynchronizationContext(); + try { + assertTrue(task1Proceed.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - }).when(task1).run(); + taskSuccess.set(true); + return null; + } + }).when(task1).run(); Thread sideThread = new Thread() { - @Override - public void run() { - syncContext.execute(task1); - } - }; + @Override + public void run() { + syncContext.execute(task1); + } + }; sideThread.start(); assertThat(task1Running.await(5, TimeUnit.SECONDS)).isTrue(); @@ -215,11 +218,11 @@ public void taskThrows() { InOrder inOrder = inOrder(task1, task2, task3); final RuntimeException e = new RuntimeException("Simulated"); doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - throw e; - } - }).when(task2).run(); + @Override + public Void answer(InvocationOnMock invocation) { + throw e; + } + }).when(task2).run(); syncContext.executeLater(task1); syncContext.executeLater(task2); syncContext.executeLater(task3); @@ -246,6 +249,24 @@ public void schedule() { verify(task1).run(); } + @Test + public void testScheduleWithFixedDelay() { + MockScheduledExecutorService executorService = new MockScheduledExecutorService(); + + ScheduledHandle handle = + syncContext.scheduleWithFixedDelay(task1, Duration.ofNanos(110), Duration.ofNanos(110), + TimeUnit.NANOSECONDS, executorService); + + assertThat(executorService.delay) + .isEqualTo(executorService.unit.convert(110, TimeUnit.NANOSECONDS)); + assertThat(handle.isPending()).isTrue(); + verify(task1, never()).run(); + + executorService.command.run(); + assertThat(handle.isPending()).isFalse(); + verify(task1).run(); + } + @Test public void scheduleDueImmediately() { MockScheduledExecutorService executorService = new MockScheduledExecutorService(); @@ -288,28 +309,28 @@ public void scheduledHandle_cancelRacesWithTimerExpiration() throws Exception { final CountDownLatch sideThreadDone = new CountDownLatch(1); doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - task1Running.countDown(); - try { - ScheduledHandle task2Handle; - assertThat(task2Handle = task2HandleQueue.poll(5, TimeUnit.SECONDS)).isNotNull(); - task2Handle.cancel(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - task1Done.set(true); - return null; + @Override + public Void answer(InvocationOnMock invocation) { + task1Running.countDown(); + try { + ScheduledHandle task2Handle; + assertThat(task2Handle = task2HandleQueue.poll(5, TimeUnit.SECONDS)).isNotNull(); + task2Handle.cancel(); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - }).when(task1).run(); + task1Done.set(true); + return null; + } + }).when(task1).run(); Thread sideThread = new Thread() { - @Override - public void run() { - syncContext.execute(task1); - sideThreadDone.countDown(); - } - }; + @Override + public void run() { + syncContext.execute(task1); + sideThreadDone.countDown(); + } + }; ScheduledHandle handle = syncContext.schedule(task2, 10, TimeUnit.NANOSECONDS, executorService); // This will execute and block in task1 @@ -340,6 +361,7 @@ public void run() { } static class MockScheduledExecutorService extends ForwardingScheduledExecutorService { + private ScheduledExecutorService delegate = TestingExecutors.noOpScheduledExecutor(); Runnable command; @@ -347,15 +369,25 @@ static class MockScheduledExecutorService extends ForwardingScheduledExecutorSer TimeUnit unit; ScheduledFuture future; - @Override public ScheduledExecutorService delegate() { + @Override + public ScheduledExecutorService delegate() { return delegate; } - @Override public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { this.command = command; this.delay = delay; this.unit = unit; return future = super.schedule(command, delay, unit); } + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long intialDelay, long delay, + TimeUnit unit) { + this.command = command; + this.delay = delay; + this.unit = unit; + return future = super.scheduleWithFixedDelay(command, intialDelay, delay, unit); + } } } From fef4c923f04864bbb8ef9420da910f687bb0da6d Mon Sep 17 00:00:00 2001 From: SreeramdasLavanya Date: Fri, 20 Sep 2024 14:47:45 +0000 Subject: [PATCH 2/3] Revert "SynchronizationContextTest changes for scheduleFixedDelay with Duration" This reverts commit ab9704560525468d28294c7c6c0e4266e79efab4. --- .../java/io/grpc/SynchronizationContext.java | 33 ---- .../io/grpc/SynchronizationContextTest.java | 172 +++++++----------- 2 files changed, 70 insertions(+), 135 deletions(-) diff --git a/api/src/main/java/io/grpc/SynchronizationContext.java b/api/src/main/java/io/grpc/SynchronizationContext.java index 046988b4779..5a7677ac15f 100644 --- a/api/src/main/java/io/grpc/SynchronizationContext.java +++ b/api/src/main/java/io/grpc/SynchronizationContext.java @@ -20,7 +20,6 @@ import static com.google.common.base.Preconditions.checkState; import java.lang.Thread.UncaughtExceptionHandler; -import java.time.Duration; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; @@ -195,38 +194,6 @@ public String toString() { } - public final ScheduledHandle scheduleWithFixedDelay( - final Runnable task, Duration initialDelay, Duration delay, TimeUnit unit, - ScheduledExecutorService timerService) { - final ManagedRunnable runnable = new ManagedRunnable(task); - System.out.println("Inside Durationcall"); - ScheduledFuture future = timerService.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - execute(runnable); - } - - @Override - public String toString() { - return task.toString() + "(scheduled in SynchronizationContext with delay of " + delay - + ")"; - } - }, toNanosSaturated(initialDelay), toNanosSaturated(delay), unit); - return new ScheduledHandle(runnable, future); - } - static long toNanosSaturated(Duration duration) { - // Using a try/catch seems lazy, but the catch block will rarely get invoked (except for - // durations longer than approximately +/- 292 years). - try { - //long delay = TimeUnit.MILLISECONDS.convert(500, TimeUnit.SECONDS); // Converts 500 seconds to milliseconds - return duration.toNanos(); - //return TimeUnit.NANOSECONDS.convert(duration); - - } catch (ArithmeticException tooBig) { - return duration.isNegative() ? Long.MIN_VALUE : Long.MAX_VALUE; - } - } - private static class ManagedRunnable implements Runnable { final Runnable task; boolean isCancelled; diff --git a/api/src/test/java/io/grpc/SynchronizationContextTest.java b/api/src/test/java/io/grpc/SynchronizationContextTest.java index a4d9e80837f..3d5e7fa42b9 100644 --- a/api/src/test/java/io/grpc/SynchronizationContextTest.java +++ b/api/src/test/java/io/grpc/SynchronizationContextTest.java @@ -27,7 +27,6 @@ import com.google.common.util.concurrent.testing.TestingExecutors; import io.grpc.SynchronizationContext.ScheduledHandle; -import java.time.Duration; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; @@ -53,7 +52,6 @@ */ @RunWith(JUnit4.class) public class SynchronizationContextTest { - private final BlockingQueue uncaughtErrors = new LinkedBlockingQueue<>(); private final SynchronizationContext syncContext = new SynchronizationContext( new Thread.UncaughtExceptionHandler() { @@ -74,9 +72,8 @@ public void uncaughtException(Thread t, Throwable e) { @Mock private Runnable task3; - - @After - public void tearDown() { + + @After public void tearDown() { assertThat(uncaughtErrors).isEmpty(); } @@ -108,36 +105,36 @@ public void multiThread() throws Exception { final AtomicReference task2Thread = new AtomicReference<>(); doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - task1Thread.set(Thread.currentThread()); - task1Running.countDown(); - try { - assertTrue(task1Proceed.await(5, TimeUnit.SECONDS)); - } catch (InterruptedException e) { - throw new RuntimeException(e); + @Override + public Void answer(InvocationOnMock invocation) { + task1Thread.set(Thread.currentThread()); + task1Running.countDown(); + try { + assertTrue(task1Proceed.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return null; } - return null; - } - }).when(task1).run(); + }).when(task1).run(); doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - task2Thread.set(Thread.currentThread()); - return null; - } - }).when(task2).run(); + @Override + public Void answer(InvocationOnMock invocation) { + task2Thread.set(Thread.currentThread()); + return null; + } + }).when(task2).run(); Thread sideThread = new Thread() { - @Override - public void run() { - syncContext.executeLater(task1); - task1Added.countDown(); - syncContext.drain(); - sideThreadDone.countDown(); - } - }; + @Override + public void run() { + syncContext.executeLater(task1); + task1Added.countDown(); + syncContext.drain(); + sideThreadDone.countDown(); + } + }; sideThread.start(); assertTrue(task1Added.await(5, TimeUnit.SECONDS)); @@ -165,26 +162,26 @@ public void throwIfNotInThisSynchronizationContext() throws Exception { final CountDownLatch task1Proceed = new CountDownLatch(1); doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - task1Running.countDown(); - syncContext.throwIfNotInThisSynchronizationContext(); - try { - assertTrue(task1Proceed.await(5, TimeUnit.SECONDS)); - } catch (InterruptedException e) { - throw new RuntimeException(e); + @Override + public Void answer(InvocationOnMock invocation) { + task1Running.countDown(); + syncContext.throwIfNotInThisSynchronizationContext(); + try { + assertTrue(task1Proceed.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + taskSuccess.set(true); + return null; } - taskSuccess.set(true); - return null; - } - }).when(task1).run(); + }).when(task1).run(); Thread sideThread = new Thread() { - @Override - public void run() { - syncContext.execute(task1); - } - }; + @Override + public void run() { + syncContext.execute(task1); + } + }; sideThread.start(); assertThat(task1Running.await(5, TimeUnit.SECONDS)).isTrue(); @@ -218,11 +215,11 @@ public void taskThrows() { InOrder inOrder = inOrder(task1, task2, task3); final RuntimeException e = new RuntimeException("Simulated"); doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - throw e; - } - }).when(task2).run(); + @Override + public Void answer(InvocationOnMock invocation) { + throw e; + } + }).when(task2).run(); syncContext.executeLater(task1); syncContext.executeLater(task2); syncContext.executeLater(task3); @@ -249,24 +246,6 @@ public void schedule() { verify(task1).run(); } - @Test - public void testScheduleWithFixedDelay() { - MockScheduledExecutorService executorService = new MockScheduledExecutorService(); - - ScheduledHandle handle = - syncContext.scheduleWithFixedDelay(task1, Duration.ofNanos(110), Duration.ofNanos(110), - TimeUnit.NANOSECONDS, executorService); - - assertThat(executorService.delay) - .isEqualTo(executorService.unit.convert(110, TimeUnit.NANOSECONDS)); - assertThat(handle.isPending()).isTrue(); - verify(task1, never()).run(); - - executorService.command.run(); - assertThat(handle.isPending()).isFalse(); - verify(task1).run(); - } - @Test public void scheduleDueImmediately() { MockScheduledExecutorService executorService = new MockScheduledExecutorService(); @@ -309,28 +288,28 @@ public void scheduledHandle_cancelRacesWithTimerExpiration() throws Exception { final CountDownLatch sideThreadDone = new CountDownLatch(1); doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - task1Running.countDown(); - try { - ScheduledHandle task2Handle; - assertThat(task2Handle = task2HandleQueue.poll(5, TimeUnit.SECONDS)).isNotNull(); - task2Handle.cancel(); - } catch (InterruptedException e) { - throw new RuntimeException(e); + @Override + public Void answer(InvocationOnMock invocation) { + task1Running.countDown(); + try { + ScheduledHandle task2Handle; + assertThat(task2Handle = task2HandleQueue.poll(5, TimeUnit.SECONDS)).isNotNull(); + task2Handle.cancel(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + task1Done.set(true); + return null; } - task1Done.set(true); - return null; - } - }).when(task1).run(); + }).when(task1).run(); Thread sideThread = new Thread() { - @Override - public void run() { - syncContext.execute(task1); - sideThreadDone.countDown(); - } - }; + @Override + public void run() { + syncContext.execute(task1); + sideThreadDone.countDown(); + } + }; ScheduledHandle handle = syncContext.schedule(task2, 10, TimeUnit.NANOSECONDS, executorService); // This will execute and block in task1 @@ -361,7 +340,6 @@ public void run() { } static class MockScheduledExecutorService extends ForwardingScheduledExecutorService { - private ScheduledExecutorService delegate = TestingExecutors.noOpScheduledExecutor(); Runnable command; @@ -369,25 +347,15 @@ static class MockScheduledExecutorService extends ForwardingScheduledExecutorSer TimeUnit unit; ScheduledFuture future; - @Override - public ScheduledExecutorService delegate() { + @Override public ScheduledExecutorService delegate() { return delegate; } - @Override - public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + @Override public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { this.command = command; this.delay = delay; this.unit = unit; return future = super.schedule(command, delay, unit); } - @Override - public ScheduledFuture scheduleWithFixedDelay(Runnable command, long intialDelay, long delay, - TimeUnit unit) { - this.command = command; - this.delay = delay; - this.unit = unit; - return future = super.scheduleWithFixedDelay(command, intialDelay, delay, unit); - } } } From fe56c02771b82d10a22b2dda62fb04acacd16013 Mon Sep 17 00:00:00 2001 From: SreeramdasLavanya Date: Tue, 10 Dec 2024 10:34:36 +0000 Subject: [PATCH 3/3] stub: Added withChannelAndCallOption method --- stub/src/main/java/io/grpc/stub/AbstractStub.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/stub/src/main/java/io/grpc/stub/AbstractStub.java b/stub/src/main/java/io/grpc/stub/AbstractStub.java index 06dd55ff466..d8fd62b1efb 100644 --- a/stub/src/main/java/io/grpc/stub/AbstractStub.java +++ b/stub/src/main/java/io/grpc/stub/AbstractStub.java @@ -192,6 +192,10 @@ public final S withChannel(Channel newChannel) { return build(newChannel, callOptions); } + public final S withChannelAndCallOption(Channel newChannel, CallOptions callOptions) { + return build(newChannel, callOptions); + } + /** * Sets a custom option to be passed to client interceptors on the channel * {@link io.grpc.ClientInterceptor} via the CallOptions parameter.