Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Completely resolve Issue #3 #8

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,22 @@

package io.vlingo.wire.fdx.bidirectional;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

import java.nio.ByteBuffer;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import io.vlingo.actors.Logger;
import io.vlingo.actors.World;
import io.vlingo.actors.testkit.TestUntil;
import io.vlingo.actors.testkit.AccessSafely;
import io.vlingo.wire.message.ByteBufferAllocator;
import io.vlingo.wire.node.Address;
import io.vlingo.wire.node.AddressType;
import io.vlingo.wire.node.Host;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.nio.ByteBuffer;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

public class SocketRequestResponseChannelTest {
private static final int POOL_SIZE = 100;
Expand All @@ -43,18 +44,18 @@ public void testBasicRequestResponse() throws Exception {
clientConsumer.currentExpectedResponseLength = serverConsumer.currentExpectedRequestLength;
request(request);

serverConsumer.untilConsume = TestUntil.happenings(1);
clientConsumer.untilConsume = TestUntil.happenings(1);
final AccessSafely serverConsumeCalls = serverConsumer.expectConsumeTimes(1);
final AccessSafely clientConsumeCalls = clientConsumer.expectConsumeTimes(1);

while (serverConsumer.untilConsume.remaining() > 0) {
while (serverConsumeCalls.totalWrites() < 1) {
;
}
serverConsumer.untilConsume.completes();
serverConsumeCalls.readFrom("completed");

while (clientConsumer.untilConsume.remaining() > 0) {
while (clientConsumeCalls.totalWrites() < 1) {
client.probeChannel();
}
clientConsumer.untilConsume.completes();
clientConsumeCalls.readFrom("completed");

assertFalse(serverConsumer.requests.isEmpty());
assertEquals(1, serverConsumer.consumeCount);
Expand Down Expand Up @@ -83,18 +84,18 @@ public void testGappyRequestResponse() throws Exception {
request(requestPart2);
Thread.sleep(200);
request(requestPart3);
serverConsumer.untilConsume = TestUntil.happenings(1);
while (serverConsumer.untilConsume.remaining() > 0) {
final AccessSafely serverConsumeCalls = serverConsumer.expectConsumeTimes(1);
while (serverConsumeCalls.totalWrites() < 1) {
;
}
serverConsumer.untilConsume.completes();
serverConsumeCalls.readFrom("completed");

clientConsumer.untilConsume = TestUntil.happenings(1);
while (clientConsumer.untilConsume.remaining() > 0) {
final AccessSafely clientConsumeCalls = clientConsumer.expectConsumeTimes(1);
while (clientConsumeCalls.totalWrites() < 1) {
Thread.sleep(10);
client.probeChannel();
}
clientConsumer.untilConsume.completes();
clientConsumeCalls.readFrom("completed");

assertFalse(serverConsumer.requests.isEmpty());
assertEquals(1, serverConsumer.consumeCount);
Expand All @@ -114,19 +115,19 @@ public void test10RequestResponse() throws Exception {
serverConsumer.currentExpectedRequestLength = request.length() + 1; // digits 0 - 9
clientConsumer.currentExpectedResponseLength = serverConsumer.currentExpectedRequestLength;

serverConsumer.untilConsume = TestUntil.happenings(10);
clientConsumer.untilConsume = TestUntil.happenings(10);
final AccessSafely serverConsumeCalls = serverConsumer.expectConsumeTimes(10);
final AccessSafely clientConsumeCalls = clientConsumer.expectConsumeTimes(10);

for (int idx = 0; idx < 10; ++idx) {
request(request + idx);
}

while (clientConsumer.untilConsume.remaining() > 0) {
while (clientConsumeCalls.totalWrites() < 10) {
client.probeChannel();
}

serverConsumer.untilConsume.completes();
clientConsumer.untilConsume.completes();
serverConsumeCalls.readFrom("completed");
clientConsumeCalls.readFrom("completed");

assertFalse(serverConsumer.requests.isEmpty());
assertEquals(10, serverConsumer.consumeCount);
Expand All @@ -150,18 +151,18 @@ public void testThatRequestResponsePoolLimitsNotExceeded() throws Exception {
serverConsumer.currentExpectedRequestLength = request.length() + 3; // digits 000 - 999
clientConsumer.currentExpectedResponseLength = serverConsumer.currentExpectedRequestLength;

serverConsumer.untilConsume = TestUntil.happenings(TOTAL);
clientConsumer.untilConsume = TestUntil.happenings(TOTAL);
final AccessSafely serverConsumeCalls = serverConsumer.expectConsumeTimes(TOTAL);
final AccessSafely clientConsumeCalls = clientConsumer.expectConsumeTimes(TOTAL);

for (int idx = 0; idx < TOTAL; ++idx) {
request(request + String.format("%03d", idx));
}

while (clientConsumer.untilConsume.remaining() > 0) {
while (clientConsumeCalls.totalWrites() < TOTAL) {
client.probeChannel();
}
serverConsumer.untilConsume.completes();
clientConsumer.untilConsume.completes();
serverConsumeCalls.readFrom("completed");
clientConsumeCalls.readFrom("completed");

assertFalse(serverConsumer.requests.isEmpty());
assertEquals(TOTAL, serverConsumer.consumeCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.util.ArrayList;
import java.util.List;

import io.vlingo.actors.testkit.TestUntil;
import io.vlingo.actors.testkit.AccessSafely;
import io.vlingo.wire.channel.RequestChannelConsumer;
import io.vlingo.wire.channel.RequestResponseContext;
import io.vlingo.wire.message.BasicConsumerByteBuffer;
Expand All @@ -21,15 +21,48 @@ public class TestRequestChannelConsumer implements RequestChannelConsumer {
public int currentExpectedRequestLength;
public int consumeCount;
public List<String> requests = new ArrayList<>();
public TestUntil untilClosed;
public TestUntil untilConsume;


private AccessSafely closeWithCalls = AccessSafely.afterCompleting(0);
private AccessSafely consumeCalls = AccessSafely.afterCompleting(0);;

private StringBuilder requestBuilder = new StringBuilder();
private String remaining = "";

@Override
/**
* Answer with an AccessSafely which writes context, data to "closeWith" and reads the write count from "completed".
* <p>
* Note: Clients can replace the default lambdas with their own via readingWith/writingWith.
*
* @param n Number of times closeWith(context, data) must be called before readFrom(...) will return.
* @return
*/
public AccessSafely expectCloseWithTimes(final int n) {
closeWithCalls = AccessSafely.afterCompleting(n)
.writingWith("closeWith", (context, data) -> {})
.readingWith("completed", () -> closeWithCalls.totalWrites())
;
return closeWithCalls;
}

/**
* Answer with an AccessSafely which writes context, buffer to "consume" and reads the write count from "completed".
* <p>
* Note: Clients can replace the default lambdas with their own via readingWith/writingWith.
*
* @param n Number of times consume(context, buffer) must be called before readFrom(...) will return.
* @return
*/
public AccessSafely expectConsumeTimes(final int n) {
consumeCalls = AccessSafely.afterCompleting(n)
.writingWith("consume", (context, data) -> {})
.readingWith("completed", () -> consumeCalls.totalWrites())
;
return consumeCalls;
}

@Override
public void closeWith(final RequestResponseContext<?> requestResponseContext, final Object data) {
if (untilClosed != null) untilClosed.happened();
closeWithCalls.writeUsing("closeWith", requestResponseContext, data);
}

@Override
Expand Down Expand Up @@ -63,7 +96,7 @@ public void consume(RequestResponseContext<?> context, final ConsumerByteBuffer

last = currentIndex == combinedLength;

if (untilConsume != null) untilConsume.happened();
consumeCalls.writeUsing("consume", context, buffer);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package io.vlingo.wire.fdx.bidirectional;

import io.vlingo.actors.testkit.TestUntil;
import io.vlingo.wire.channel.RequestChannelConsumer;
import io.vlingo.wire.channel.RequestChannelConsumerProvider;

public class TestRequestChannelConsumerProvider implements RequestChannelConsumerProvider {
public TestUntil until;
public RequestChannelConsumer consumer = new TestRequestChannelConsumer();

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.util.ArrayList;
import java.util.List;

import io.vlingo.actors.testkit.TestUntil;
import io.vlingo.actors.testkit.AccessSafely;
import io.vlingo.wire.channel.ResponseChannelConsumer;
import io.vlingo.wire.message.ConsumerByteBuffer;
import io.vlingo.wire.message.Converters;
Expand All @@ -19,10 +19,25 @@ public class TestResponseChannelConsumer implements ResponseChannelConsumer {
public int currentExpectedResponseLength;
public int consumeCount;
public List<String> responses = new ArrayList<>();
public TestUntil untilConsume;


private AccessSafely consumeCalls = AccessSafely.afterCompleting(0);
private final StringBuilder responseBuilder = new StringBuilder();

/**
* Answer with an AccessSafely which writes buffer to "consume" and reads the write count from "completed".
* <p>
* Note: Clients can replace the default lambdas with their own via readingWith/writingWith.
*
* @param n Number of times consume(buffer) must be called before readFrom(...) will return.
* @return
*/
public AccessSafely expectConsumeTimes(final int n) {
consumeCalls = AccessSafely.afterCompleting(n)
.writingWith("consume", buffer -> {})
.readingWith("completed", () -> consumeCalls.totalWrites())
;
return consumeCalls;
}

@Override
public void consume(final ConsumerByteBuffer buffer) {
final String responsePart = Converters.bytesToText(buffer.array(), 0, buffer.limit());
Expand All @@ -46,7 +61,7 @@ public void consume(final ConsumerByteBuffer buffer) {

last = currentIndex == combinedLength;

untilConsume.happened();
consumeCalls.writeUsing("consume", buffer);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import org.junit.Test;

import io.vlingo.actors.Definition;
import io.vlingo.actors.testkit.AccessSafely;
import io.vlingo.actors.testkit.TestActor;
import io.vlingo.actors.testkit.TestUntil;
import io.vlingo.actors.testkit.TestWorld;
import io.vlingo.wire.channel.MockChannelReader;
import io.vlingo.wire.message.AbstractMessageTool;
Expand All @@ -30,16 +30,16 @@ public class InboundStreamTest extends AbstractMessageTool {

@Test
public void testInbound() throws Exception {
interest.testResults.untilStops = TestUntil.happenings(1);
final AccessSafely handleInboundStreamMessageCalls = interest.testResults.expectHandleInboundStreamMessageTimes(1);
while (reader.probeChannelCount.get() == 0)
;
inboundStream.actor().stop();
handleInboundStreamMessageCalls.readFrom("completed");
int count = 0;
for (final String message : interest.testResults.messages) {
++count;
assertEquals(MockChannelReader.MessagePrefix + count, message);
}
interest.testResults.untilStops.completes();

assertTrue(interest.testResults.messageCount.get() > 0);
assertEquals(count, reader.probeChannelCount.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

import io.vlingo.actors.testkit.TestUntil;
import io.vlingo.actors.testkit.AccessSafely;
import io.vlingo.wire.message.AbstractMessageTool;
import io.vlingo.wire.message.RawMessage;
import io.vlingo.wire.node.AddressType;
Expand All @@ -24,15 +24,37 @@ public MockInboundStreamInterest() { }
@Override
public void handleInboundStreamMessage(final AddressType addressType, final RawMessage message) {
final String textMessage = message.asTextMessage();
testResults.messages.add(textMessage);
testResults.messageCount.incrementAndGet();
System.out.println("INTEREST: " + textMessage + " list-size: " + testResults.messages.size() + " count: " + testResults.messageCount.get() + " count-down: " + testResults.untilStops.remaining());
testResults.untilStops.happened();
System.out.println("INTEREST: " + textMessage + " list-size: " + testResults.messages.size()
+ " count: " + testResults.messageCount.get()
+ " total-writes: " + testResults.handleInboundStreamMessageCalls.totalWrites());
testResults.handleInboundStreamMessageCalls.writeUsing("handleInboundStreamMessage", addressType, message);
}

static class TestResults {
public final AtomicInteger messageCount = new AtomicInteger(0);
public final List<String> messages = new CopyOnWriteArrayList<>();
public TestUntil untilStops;

private AccessSafely handleInboundStreamMessageCalls = AccessSafely.afterCompleting(0);

/**
* Answer with an AccessSafely which writes addressType, message to "handleInboundStreamMessage" and reads the write count from "completed".
* <p>
* Note: Clients can replace the default lambdas with their own via readingWith/writingWith.
*
* @param n Number of times handleInboundStreamMessage(addressType, message) must be called before readFrom(...) will return.
* @return
*/
public AccessSafely expectHandleInboundStreamMessageTimes(final int n) {
handleInboundStreamMessageCalls = AccessSafely.afterCompleting(n)
.writingWith("handleInboundStreamMessage", (addressType, message) -> {
final String textMessage = ((RawMessage) message).asTextMessage();
messages.add(textMessage);
messageCount.incrementAndGet();
})
.readingWith("completed", () -> handleInboundStreamMessageCalls.totalWrites())
;
return handleInboundStreamMessageCalls;
}

}
}