From ef649b059b18387fb4e1a50fea9c8545f60ecd4f Mon Sep 17 00:00:00 2001 From: greg Date: Wed, 17 Jul 2019 11:31:26 -0600 Subject: [PATCH 1/3] remove unused TestUntil (part of issue 3) --- .../fdx/bidirectional/TestRequestChannelConsumerProvider.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/test/java/io/vlingo/wire/fdx/bidirectional/TestRequestChannelConsumerProvider.java b/src/test/java/io/vlingo/wire/fdx/bidirectional/TestRequestChannelConsumerProvider.java index 43a137a..0022c47 100644 --- a/src/test/java/io/vlingo/wire/fdx/bidirectional/TestRequestChannelConsumerProvider.java +++ b/src/test/java/io/vlingo/wire/fdx/bidirectional/TestRequestChannelConsumerProvider.java @@ -1,11 +1,9 @@ package io.vlingo.wire.fdx.bidirectional; -import io.vlingo.actors.testkit.TestUntil; import io.vlingo.wire.channel.RequestChannelConsumer; import io.vlingo.wire.channel.RequestChannelConsumerProvider; public class TestRequestChannelConsumerProvider implements RequestChannelConsumerProvider { - public TestUntil until; public RequestChannelConsumer consumer = new TestRequestChannelConsumer(); @Override From e4db2d885175933213dfa2da908b41572c7e8f83 Mon Sep 17 00:00:00 2001 From: greg Date: Wed, 17 Jul 2019 11:32:46 -0600 Subject: [PATCH 2/3] replace TestUntil with AccessSafely in MockInboundStreamInterest (part of issue 3) --- .../wire/fdx/inbound/InboundStreamTest.java | 6 ++-- .../inbound/MockInboundStreamInterest.java | 34 +++++++++++++++---- 2 files changed, 31 insertions(+), 9 deletions(-) diff --git a/src/test/java/io/vlingo/wire/fdx/inbound/InboundStreamTest.java b/src/test/java/io/vlingo/wire/fdx/inbound/InboundStreamTest.java index 8606d6a..fdd5a0b 100644 --- a/src/test/java/io/vlingo/wire/fdx/inbound/InboundStreamTest.java +++ b/src/test/java/io/vlingo/wire/fdx/inbound/InboundStreamTest.java @@ -15,8 +15,8 @@ import org.junit.Test; import io.vlingo.actors.Definition; +import io.vlingo.actors.testkit.AccessSafely; import io.vlingo.actors.testkit.TestActor; -import io.vlingo.actors.testkit.TestUntil; import io.vlingo.actors.testkit.TestWorld; import io.vlingo.wire.channel.MockChannelReader; import io.vlingo.wire.message.AbstractMessageTool; @@ -30,16 +30,16 @@ public class InboundStreamTest extends AbstractMessageTool { @Test public void testInbound() throws Exception { - interest.testResults.untilStops = TestUntil.happenings(1); + final AccessSafely handleInboundStreamMessageCalls = interest.testResults.expectHandleInboundStreamMessageTimes(1); while (reader.probeChannelCount.get() == 0) ; inboundStream.actor().stop(); + handleInboundStreamMessageCalls.readFrom("completed"); int count = 0; for (final String message : interest.testResults.messages) { ++count; assertEquals(MockChannelReader.MessagePrefix + count, message); } - interest.testResults.untilStops.completes(); assertTrue(interest.testResults.messageCount.get() > 0); assertEquals(count, reader.probeChannelCount.get()); diff --git a/src/test/java/io/vlingo/wire/fdx/inbound/MockInboundStreamInterest.java b/src/test/java/io/vlingo/wire/fdx/inbound/MockInboundStreamInterest.java index 22e3a62..0f6f76f 100644 --- a/src/test/java/io/vlingo/wire/fdx/inbound/MockInboundStreamInterest.java +++ b/src/test/java/io/vlingo/wire/fdx/inbound/MockInboundStreamInterest.java @@ -11,7 +11,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; -import io.vlingo.actors.testkit.TestUntil; +import io.vlingo.actors.testkit.AccessSafely; import io.vlingo.wire.message.AbstractMessageTool; import io.vlingo.wire.message.RawMessage; import io.vlingo.wire.node.AddressType; @@ -24,15 +24,37 @@ public MockInboundStreamInterest() { } @Override public void handleInboundStreamMessage(final AddressType addressType, final RawMessage message) { final String textMessage = message.asTextMessage(); - testResults.messages.add(textMessage); - testResults.messageCount.incrementAndGet(); - System.out.println("INTEREST: " + textMessage + " list-size: " + testResults.messages.size() + " count: " + testResults.messageCount.get() + " count-down: " + testResults.untilStops.remaining()); - testResults.untilStops.happened(); + System.out.println("INTEREST: " + textMessage + " list-size: " + testResults.messages.size() + + " count: " + testResults.messageCount.get() + + " total-writes: " + testResults.handleInboundStreamMessageCalls.totalWrites()); + testResults.handleInboundStreamMessageCalls.writeUsing("handleInboundStreamMessage", addressType, message); } static class TestResults { public final AtomicInteger messageCount = new AtomicInteger(0); public final List messages = new CopyOnWriteArrayList<>(); - public TestUntil untilStops; + + private AccessSafely handleInboundStreamMessageCalls = AccessSafely.afterCompleting(0); + + /** + * Answer with an AccessSafely which writes addressType, message to "handleInboundStreamMessage" and reads the write count from "completed". + *

+ * Note: Clients can replace the default lambdas with their own via readingWith/writingWith. + * + * @param n Number of times handleInboundStreamMessage(addressType, message) must be called before readFrom(...) will return. + * @return + */ + public AccessSafely expectHandleInboundStreamMessageTimes(final int n) { + handleInboundStreamMessageCalls = AccessSafely.afterCompleting(n) + .writingWith("handleInboundStreamMessage", (addressType, message) -> { + final String textMessage = ((RawMessage) message).asTextMessage(); + messages.add(textMessage); + messageCount.incrementAndGet(); + }) + .readingWith("completed", () -> handleInboundStreamMessageCalls.totalWrites()) + ; + return handleInboundStreamMessageCalls; + } + } } From 929dbb5d318b79a704739385a2386749d67312f3 Mon Sep 17 00:00:00 2001 From: greg Date: Wed, 17 Jul 2019 11:33:44 -0600 Subject: [PATCH 3/3] replace TestUntil with AccessSafely in TestRequestChannelConsumer and TestResponseChannelConsumer (part of issue 3) --- .../SocketRequestResponseChannelTest.java | 63 ++++++++++--------- .../TestRequestChannelConsumer.java | 47 +++++++++++--- .../TestResponseChannelConsumer.java | 25 ++++++-- 3 files changed, 92 insertions(+), 43 deletions(-) diff --git a/src/test/java/io/vlingo/wire/fdx/bidirectional/SocketRequestResponseChannelTest.java b/src/test/java/io/vlingo/wire/fdx/bidirectional/SocketRequestResponseChannelTest.java index a176874..764788e 100644 --- a/src/test/java/io/vlingo/wire/fdx/bidirectional/SocketRequestResponseChannelTest.java +++ b/src/test/java/io/vlingo/wire/fdx/bidirectional/SocketRequestResponseChannelTest.java @@ -7,21 +7,22 @@ package io.vlingo.wire.fdx.bidirectional; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import java.nio.ByteBuffer; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + import io.vlingo.actors.Logger; import io.vlingo.actors.World; -import io.vlingo.actors.testkit.TestUntil; +import io.vlingo.actors.testkit.AccessSafely; import io.vlingo.wire.message.ByteBufferAllocator; import io.vlingo.wire.node.Address; import io.vlingo.wire.node.AddressType; import io.vlingo.wire.node.Host; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.nio.ByteBuffer; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; public class SocketRequestResponseChannelTest { private static final int POOL_SIZE = 100; @@ -43,18 +44,18 @@ public void testBasicRequestResponse() throws Exception { clientConsumer.currentExpectedResponseLength = serverConsumer.currentExpectedRequestLength; request(request); - serverConsumer.untilConsume = TestUntil.happenings(1); - clientConsumer.untilConsume = TestUntil.happenings(1); + final AccessSafely serverConsumeCalls = serverConsumer.expectConsumeTimes(1); + final AccessSafely clientConsumeCalls = clientConsumer.expectConsumeTimes(1); - while (serverConsumer.untilConsume.remaining() > 0) { + while (serverConsumeCalls.totalWrites() < 1) { ; } - serverConsumer.untilConsume.completes(); + serverConsumeCalls.readFrom("completed"); - while (clientConsumer.untilConsume.remaining() > 0) { + while (clientConsumeCalls.totalWrites() < 1) { client.probeChannel(); } - clientConsumer.untilConsume.completes(); + clientConsumeCalls.readFrom("completed"); assertFalse(serverConsumer.requests.isEmpty()); assertEquals(1, serverConsumer.consumeCount); @@ -83,18 +84,18 @@ public void testGappyRequestResponse() throws Exception { request(requestPart2); Thread.sleep(200); request(requestPart3); - serverConsumer.untilConsume = TestUntil.happenings(1); - while (serverConsumer.untilConsume.remaining() > 0) { + final AccessSafely serverConsumeCalls = serverConsumer.expectConsumeTimes(1); + while (serverConsumeCalls.totalWrites() < 1) { ; } - serverConsumer.untilConsume.completes(); + serverConsumeCalls.readFrom("completed"); - clientConsumer.untilConsume = TestUntil.happenings(1); - while (clientConsumer.untilConsume.remaining() > 0) { + final AccessSafely clientConsumeCalls = clientConsumer.expectConsumeTimes(1); + while (clientConsumeCalls.totalWrites() < 1) { Thread.sleep(10); client.probeChannel(); } - clientConsumer.untilConsume.completes(); + clientConsumeCalls.readFrom("completed"); assertFalse(serverConsumer.requests.isEmpty()); assertEquals(1, serverConsumer.consumeCount); @@ -114,19 +115,19 @@ public void test10RequestResponse() throws Exception { serverConsumer.currentExpectedRequestLength = request.length() + 1; // digits 0 - 9 clientConsumer.currentExpectedResponseLength = serverConsumer.currentExpectedRequestLength; - serverConsumer.untilConsume = TestUntil.happenings(10); - clientConsumer.untilConsume = TestUntil.happenings(10); + final AccessSafely serverConsumeCalls = serverConsumer.expectConsumeTimes(10); + final AccessSafely clientConsumeCalls = clientConsumer.expectConsumeTimes(10); for (int idx = 0; idx < 10; ++idx) { request(request + idx); } - while (clientConsumer.untilConsume.remaining() > 0) { + while (clientConsumeCalls.totalWrites() < 10) { client.probeChannel(); } - serverConsumer.untilConsume.completes(); - clientConsumer.untilConsume.completes(); + serverConsumeCalls.readFrom("completed"); + clientConsumeCalls.readFrom("completed"); assertFalse(serverConsumer.requests.isEmpty()); assertEquals(10, serverConsumer.consumeCount); @@ -150,18 +151,18 @@ public void testThatRequestResponsePoolLimitsNotExceeded() throws Exception { serverConsumer.currentExpectedRequestLength = request.length() + 3; // digits 000 - 999 clientConsumer.currentExpectedResponseLength = serverConsumer.currentExpectedRequestLength; - serverConsumer.untilConsume = TestUntil.happenings(TOTAL); - clientConsumer.untilConsume = TestUntil.happenings(TOTAL); + final AccessSafely serverConsumeCalls = serverConsumer.expectConsumeTimes(TOTAL); + final AccessSafely clientConsumeCalls = clientConsumer.expectConsumeTimes(TOTAL); for (int idx = 0; idx < TOTAL; ++idx) { request(request + String.format("%03d", idx)); } - while (clientConsumer.untilConsume.remaining() > 0) { + while (clientConsumeCalls.totalWrites() < TOTAL) { client.probeChannel(); } - serverConsumer.untilConsume.completes(); - clientConsumer.untilConsume.completes(); + serverConsumeCalls.readFrom("completed"); + clientConsumeCalls.readFrom("completed"); assertFalse(serverConsumer.requests.isEmpty()); assertEquals(TOTAL, serverConsumer.consumeCount); diff --git a/src/test/java/io/vlingo/wire/fdx/bidirectional/TestRequestChannelConsumer.java b/src/test/java/io/vlingo/wire/fdx/bidirectional/TestRequestChannelConsumer.java index 48121d5..104d131 100644 --- a/src/test/java/io/vlingo/wire/fdx/bidirectional/TestRequestChannelConsumer.java +++ b/src/test/java/io/vlingo/wire/fdx/bidirectional/TestRequestChannelConsumer.java @@ -10,7 +10,7 @@ import java.util.ArrayList; import java.util.List; -import io.vlingo.actors.testkit.TestUntil; +import io.vlingo.actors.testkit.AccessSafely; import io.vlingo.wire.channel.RequestChannelConsumer; import io.vlingo.wire.channel.RequestResponseContext; import io.vlingo.wire.message.BasicConsumerByteBuffer; @@ -21,15 +21,48 @@ public class TestRequestChannelConsumer implements RequestChannelConsumer { public int currentExpectedRequestLength; public int consumeCount; public List requests = new ArrayList<>(); - public TestUntil untilClosed; - public TestUntil untilConsume; - + + private AccessSafely closeWithCalls = AccessSafely.afterCompleting(0); + private AccessSafely consumeCalls = AccessSafely.afterCompleting(0);; + private StringBuilder requestBuilder = new StringBuilder(); private String remaining = ""; - @Override + /** + * Answer with an AccessSafely which writes context, data to "closeWith" and reads the write count from "completed". + *

+ * Note: Clients can replace the default lambdas with their own via readingWith/writingWith. + * + * @param n Number of times closeWith(context, data) must be called before readFrom(...) will return. + * @return + */ + public AccessSafely expectCloseWithTimes(final int n) { + closeWithCalls = AccessSafely.afterCompleting(n) + .writingWith("closeWith", (context, data) -> {}) + .readingWith("completed", () -> closeWithCalls.totalWrites()) + ; + return closeWithCalls; + } + + /** + * Answer with an AccessSafely which writes context, buffer to "consume" and reads the write count from "completed". + *

+ * Note: Clients can replace the default lambdas with their own via readingWith/writingWith. + * + * @param n Number of times consume(context, buffer) must be called before readFrom(...) will return. + * @return + */ + public AccessSafely expectConsumeTimes(final int n) { + consumeCalls = AccessSafely.afterCompleting(n) + .writingWith("consume", (context, data) -> {}) + .readingWith("completed", () -> consumeCalls.totalWrites()) + ; + return consumeCalls; + } + +@Override public void closeWith(final RequestResponseContext requestResponseContext, final Object data) { - if (untilClosed != null) untilClosed.happened(); + closeWithCalls.writeUsing("closeWith", requestResponseContext, data); } @Override @@ -63,7 +96,7 @@ public void consume(RequestResponseContext context, final ConsumerByteBuffer last = currentIndex == combinedLength; - if (untilConsume != null) untilConsume.happened(); + consumeCalls.writeUsing("consume", context, buffer); } } } diff --git a/src/test/java/io/vlingo/wire/fdx/bidirectional/TestResponseChannelConsumer.java b/src/test/java/io/vlingo/wire/fdx/bidirectional/TestResponseChannelConsumer.java index 480f240..0f476bc 100644 --- a/src/test/java/io/vlingo/wire/fdx/bidirectional/TestResponseChannelConsumer.java +++ b/src/test/java/io/vlingo/wire/fdx/bidirectional/TestResponseChannelConsumer.java @@ -10,7 +10,7 @@ import java.util.ArrayList; import java.util.List; -import io.vlingo.actors.testkit.TestUntil; +import io.vlingo.actors.testkit.AccessSafely; import io.vlingo.wire.channel.ResponseChannelConsumer; import io.vlingo.wire.message.ConsumerByteBuffer; import io.vlingo.wire.message.Converters; @@ -19,10 +19,25 @@ public class TestResponseChannelConsumer implements ResponseChannelConsumer { public int currentExpectedResponseLength; public int consumeCount; public List responses = new ArrayList<>(); - public TestUntil untilConsume; - + + private AccessSafely consumeCalls = AccessSafely.afterCompleting(0); private final StringBuilder responseBuilder = new StringBuilder(); - + /** + * Answer with an AccessSafely which writes buffer to "consume" and reads the write count from "completed". + *

+ * Note: Clients can replace the default lambdas with their own via readingWith/writingWith. + * + * @param n Number of times consume(buffer) must be called before readFrom(...) will return. + * @return + */ + public AccessSafely expectConsumeTimes(final int n) { + consumeCalls = AccessSafely.afterCompleting(n) + .writingWith("consume", buffer -> {}) + .readingWith("completed", () -> consumeCalls.totalWrites()) + ; + return consumeCalls; + } + @Override public void consume(final ConsumerByteBuffer buffer) { final String responsePart = Converters.bytesToText(buffer.array(), 0, buffer.limit()); @@ -46,7 +61,7 @@ public void consume(final ConsumerByteBuffer buffer) { last = currentIndex == combinedLength; - untilConsume.happened(); + consumeCalls.writeUsing("consume", buffer); } } }