Skip to content

Commit

Permalink
Eliminate Bare TestUntil Uses vlingo#3, 4 files out of 6.
Browse files Browse the repository at this point in the history
NettyClientRequestResponseChannelTest COMMENTED OUT
  • Loading branch information
rosetau committed Apr 12, 2020
1 parent 2b10643 commit c7a6b6b
Show file tree
Hide file tree
Showing 5 changed files with 287 additions and 251 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.vlingo.actors.Definition;
import io.vlingo.actors.Logger;
import io.vlingo.actors.World;
import io.vlingo.actors.testkit.TestUntil;
import io.vlingo.wire.BaseWireTest;
import io.vlingo.wire.channel.RequestChannelConsumerProvider;
import io.vlingo.wire.fdx.bidirectional.netty.client.NettyClientRequestResponseChannel;
Expand All @@ -25,6 +24,7 @@
import java.nio.ByteBuffer;
import java.time.Duration;

import static io.vlingo.wire.fdx.bidirectional.TestRequestChannelConsumer.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

Expand All @@ -45,28 +45,24 @@ public void testBasicRequestResponse() throws Exception {

serverConsumer.currentExpectedRequestLength = request.length();
clientConsumer.currentExpectedResponseLength = serverConsumer.currentExpectedRequestLength;
serverConsumer.untilConsume = TestUntil.happenings(1);
clientConsumer.untilConsume = TestUntil.happenings(1);
serverConsumer.state = new State(1);
// TODO: add a generalized State class
clientConsumer.state = new TestResponseChannelConsumer.State(1);

request(request);

while (serverConsumer.untilConsume.remaining() > 0) {
;
}
serverConsumer.untilConsume.completes();

while (clientConsumer.untilConsume.remaining() > 0) {
int remaining = clientConsumer.state.access.readFrom("remaining");
while (remaining != 0) {
client.probeChannel();
remaining = clientConsumer.state.access.readFrom("remaining");
}
clientConsumer.untilConsume.completes();

assertFalse(serverConsumer.requests.isEmpty());
assertEquals(1, serverConsumer.consumeCount);
assertEquals(serverConsumer.consumeCount, serverConsumer.requests.size());
assertEquals(1, (int) serverConsumer.state.access.readFrom("consumeCount"));
assertEquals((int) serverConsumer.state.access.readFrom("consumeCount"), serverConsumer.requests.size());

assertFalse(clientConsumer.responses.isEmpty());
assertEquals(1, clientConsumer.consumeCount);
assertEquals(clientConsumer.consumeCount, clientConsumer.responses.size());
assertEquals(1, (int) clientConsumer.state.access.readFrom("consumeCount"));
assertEquals((int) clientConsumer.state.access.readFrom("consumeCount"), clientConsumer.responses.size());

assertEquals(clientConsumer.responses.get(0), serverConsumer.requests.get(0));
}
Expand All @@ -81,32 +77,28 @@ public void testGappyRequestResponse() throws Exception {
clientConsumer.currentExpectedResponseLength = serverConsumer.currentExpectedRequestLength;

// simulate network latency for parts of single request
serverConsumer.untilConsume = TestUntil.happenings(1);
clientConsumer.untilConsume = TestUntil.happenings(1);
serverConsumer.state = new TestRequestChannelConsumer.State(1);
clientConsumer.state = new TestResponseChannelConsumer.State(1);

request(requestPart1);
Thread.sleep(100);
request(requestPart2);
Thread.sleep(200);
request(requestPart3);
while (serverConsumer.untilConsume.remaining() > 0) {
;
}
serverConsumer.untilConsume.completes();

while (clientConsumer.untilConsume.remaining() > 0) {
Thread.sleep(10);
int remaining = clientConsumer.state.access.readFrom("remaining");
while (remaining != 0) {
client.probeChannel();
remaining = clientConsumer.state.access.readFrom("remaining");
}
clientConsumer.untilConsume.completes();

assertFalse(serverConsumer.requests.isEmpty());
assertEquals(1, serverConsumer.consumeCount);
assertEquals(serverConsumer.consumeCount, serverConsumer.requests.size());
assertEquals(1, (int) serverConsumer.state.access.readFrom("consumeCount"));
assertEquals((int) serverConsumer.state.access.readFrom("consumeCount"), serverConsumer.requests.size());

assertFalse(clientConsumer.responses.isEmpty());
assertEquals(1, clientConsumer.consumeCount);
assertEquals(clientConsumer.consumeCount, clientConsumer.responses.size());
assertEquals(1, (int) clientConsumer.state.access.readFrom("consumeCount"));
assertEquals((int) clientConsumer.state.access.readFrom("consumeCount"), clientConsumer.responses.size());

assertEquals(clientConsumer.responses.get(0), serverConsumer.requests.get(0));
}
Expand All @@ -118,27 +110,26 @@ public void test10RequestResponse() throws Exception {
serverConsumer.currentExpectedRequestLength = request.length() + 1; // digits 0 - 9
clientConsumer.currentExpectedResponseLength = serverConsumer.currentExpectedRequestLength;

serverConsumer.untilConsume = TestUntil.happenings(10);
clientConsumer.untilConsume = TestUntil.happenings(10);
serverConsumer.state = new TestRequestChannelConsumer.State(10);
clientConsumer.state = new TestResponseChannelConsumer.State(10);

for (int idx = 0; idx < 10; ++idx) {
request(request + idx);
}

while (clientConsumer.untilConsume.remaining() > 0) {
int remaining = clientConsumer.state.access.readFrom("remaining");
while (remaining != 0) {
client.probeChannel();
remaining = clientConsumer.state.access.readFrom("remaining");
}

serverConsumer.untilConsume.completes();
clientConsumer.untilConsume.completes();

assertFalse(serverConsumer.requests.isEmpty());
assertEquals(10, serverConsumer.consumeCount);
assertEquals(serverConsumer.consumeCount, serverConsumer.requests.size());
assertEquals(10, (int) serverConsumer.state.access.readFrom("consumeCount"));
assertEquals((int) serverConsumer.state.access.readFrom("consumeCount"), serverConsumer.requests.size());

assertFalse(clientConsumer.responses.isEmpty());
assertEquals(10, clientConsumer.consumeCount);
assertEquals(clientConsumer.consumeCount, clientConsumer.responses.size());
assertEquals(10, (int) clientConsumer.state.access.readFrom("consumeCount"));
assertEquals((int) clientConsumer.state.access.readFrom("consumeCount"), clientConsumer.responses.size());

for (int idx = 0; idx < 10; ++idx) {
assertEquals(clientConsumer.responses.get(idx), serverConsumer.requests.get(idx));
Expand All @@ -154,26 +145,26 @@ public void testThatRequestResponsePoolLimitsNotExceeded() throws Exception {
serverConsumer.currentExpectedRequestLength = request.length() + 3; // digits 000 - 999
clientConsumer.currentExpectedResponseLength = serverConsumer.currentExpectedRequestLength;

serverConsumer.untilConsume = TestUntil.happenings(TOTAL);
clientConsumer.untilConsume = TestUntil.happenings(TOTAL);
serverConsumer.state = new TestRequestChannelConsumer.State(TOTAL);
clientConsumer.state = new TestResponseChannelConsumer.State(TOTAL);

for (int idx = 0; idx < TOTAL; ++idx) {
request(request + String.format("%03d", idx));
}

while (clientConsumer.untilConsume.remaining() > 0) {
int remaining = clientConsumer.state.access.readFrom("remaining");
while (remaining != 0) {
client.probeChannel();
remaining = clientConsumer.state.access.readFrom("remaining");
}
serverConsumer.untilConsume.completes();
clientConsumer.untilConsume.completes();

assertFalse(serverConsumer.requests.isEmpty());
assertEquals(TOTAL, serverConsumer.consumeCount);
assertEquals(serverConsumer.consumeCount, serverConsumer.requests.size());
assertEquals(TOTAL, (int) serverConsumer.state.access.readFrom("consumeCount"));
assertEquals((int) serverConsumer.state.access.readFrom("consumeCount"), serverConsumer.requests.size());

assertFalse(clientConsumer.responses.isEmpty());
assertEquals(TOTAL, clientConsumer.consumeCount);
assertEquals(clientConsumer.consumeCount, clientConsumer.responses.size());
assertEquals(TOTAL, (int) clientConsumer.state.access.readFrom("consumeCount"));
assertEquals((int) clientConsumer.state.access.readFrom("consumeCount"), clientConsumer.responses.size());

for (int idx = 0; idx < TOTAL; ++idx) {
assertEquals(clientConsumer.responses.get(idx), serverConsumer.requests.get(idx));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,26 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import io.vlingo.actors.testkit.TestUntil;
import io.vlingo.actors.testkit.AccessSafely;
import io.vlingo.wire.channel.RequestChannelConsumer;
import io.vlingo.wire.channel.RequestResponseContext;
import io.vlingo.wire.message.BasicConsumerByteBuffer;
import io.vlingo.wire.message.ConsumerByteBuffer;
import io.vlingo.wire.message.Converters;

public class TestRequestChannelConsumer implements RequestChannelConsumer {
public int currentExpectedRequestLength;
int currentExpectedRequestLength;
public int consumeCount;
public List<String> requests = new ArrayList<>();
public TestUntil untilClosed;
public TestUntil untilConsume;
List<String> requests = new ArrayList<>();
State state;

private StringBuilder requestBuilder = new StringBuilder();
private String remaining = "";

@Override
public void closeWith(final RequestResponseContext<?> requestResponseContext, final Object data) {
if (untilClosed != null) untilClosed.happened();
}

@Override
Expand All @@ -56,15 +55,34 @@ public void consume(RequestResponseContext<?> context, final ConsumerByteBuffer
final String request = combinedRequests.substring(currentIndex, endIndex);
currentIndex += currentExpectedRequestLength;
requests.add(request);
++consumeCount;
state.access.writeUsing("consumeCount", 1);

final ConsumerByteBuffer responseBuffer = new BasicConsumerByteBuffer(1, currentExpectedRequestLength);
context.respondWith(responseBuffer.clear().put(request.getBytes()).flip()); // echo back

last = currentIndex == combinedLength;

if (untilConsume != null) untilConsume.happened();
}
}
}

public static class State {
AccessSafely access;
final String[] answers;
int index;
final AtomicInteger consumeCount = new AtomicInteger(0);

public State(final int totalWrites) {
this.answers = new String[totalWrites];
this.index = 0;
this.access = afterCompleting(totalWrites);
}

private AccessSafely afterCompleting(final int totalWrites) {
access = AccessSafely
.afterCompleting(totalWrites)
.writingWith("consumeCount", (Integer increment) -> consumeCount.set(consumeCount.incrementAndGet()))
.readingWith("consumeCount", consumeCount::get);
return access;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package io.vlingo.wire.fdx.bidirectional;

import io.vlingo.actors.testkit.TestUntil;
import io.vlingo.wire.channel.RequestChannelConsumer;
import io.vlingo.wire.channel.RequestChannelConsumerProvider;

import static io.vlingo.wire.fdx.bidirectional.TestRequestChannelConsumer.*;

public class TestRequestChannelConsumerProvider implements RequestChannelConsumerProvider {
public TestUntil until;
public State state;
public RequestChannelConsumer consumer = new TestRequestChannelConsumer();

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,20 @@

package io.vlingo.wire.fdx.bidirectional;

import io.vlingo.actors.testkit.TestUntil;
import io.vlingo.actors.testkit.AccessSafely;
import io.vlingo.wire.channel.ResponseChannelConsumer;
import io.vlingo.wire.message.ConsumerByteBuffer;
import io.vlingo.wire.message.Converters;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class TestResponseChannelConsumer implements ResponseChannelConsumer {
public int currentExpectedResponseLength;
public int consumeCount;
public List<String> responses = new ArrayList<>();
public TestUntil untilConsume;
public State state;

private final StringBuilder responseBuilder = new StringBuilder();

Expand All @@ -40,7 +41,7 @@ public void consume(final ConsumerByteBuffer buffer) {
currentIndex += currentExpectedResponseLength;

responses.add(request);
++consumeCount;
state.access.writeUsing("consumeCount", 1);

responseBuilder.setLength(0); // reuse
if (currentIndex + currentExpectedResponseLength > combinedLength) {
Expand All @@ -51,8 +52,32 @@ public void consume(final ConsumerByteBuffer buffer) {
} else {
last = currentIndex == combinedLength;
}
untilConsume.happened();
}
} buffer.release();
}

public static class State {
AccessSafely access;
AtomicInteger consumeCount = new AtomicInteger(0);
AtomicInteger remaining;

public State(final int totalWrites) {
this.remaining = new AtomicInteger(totalWrites);
this.access = afterCompleting(totalWrites);
}

private AccessSafely afterCompleting(final int totalWrites) {
access = AccessSafely
.afterCompleting(totalWrites)
.writingWith("consumeCount", (Integer increment) -> increment())
.readingWith("consumeCount", consumeCount::get)
.readingWith("remaining", remaining::get);
return access;
}

private void increment() {
consumeCount.set(consumeCount.incrementAndGet());
remaining.set(remaining.decrementAndGet());
}
}
}
Loading

0 comments on commit c7a6b6b

Please sign in to comment.