Skip to content

Commit

Permalink
provides leaks tracking tests and tooling
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
  • Loading branch information
OlegDokuka committed Apr 13, 2020
1 parent 908ab2e commit d761e59
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,9 @@ private LeaksTrackingByteBufAllocator(ByteBufAllocator delegate) {
public LeaksTrackingByteBufAllocator assertHasNoLeaks() {
Assertions.assertThat(tracker)
.allSatisfy(buf -> {
ByteBuf unwrap = buf.unwrap();
if (unwrap instanceof CompositeByteBuf) {
if (buf instanceof CompositeByteBuf) {
if (buf.refCnt() > 0) {
List<ByteBuf> decomposed = ((CompositeByteBuf) unwrap).decompose(0, unwrap.readableBytes());
List<ByteBuf> decomposed = ((CompositeByteBuf) buf).decompose(0, buf.readableBytes());
for (int i = 0; i < decomposed.size(); i++) {
Assertions.assertThat(decomposed.get(i))
.matches(bb -> bb.refCnt() == 0, "Got unreleased CompositeByteBuf");
Expand Down
185 changes: 158 additions & 27 deletions rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,14 @@

package io.rsocket.core;

import static io.rsocket.core.PayloadValidationUtils.INVALID_PAYLOAD_ERROR_MESSAGE;
import static io.rsocket.frame.FrameHeaderFlyweight.frameType;
import static io.rsocket.frame.FrameType.CANCEL;
import static io.rsocket.frame.FrameType.KEEPALIVE;
import static io.rsocket.frame.FrameType.REQUEST_CHANNEL;
import static io.rsocket.frame.FrameType.REQUEST_RESPONSE;
import static io.rsocket.frame.FrameType.REQUEST_STREAM;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.buffer.LeaksTrackingByteBufAllocator;
import io.rsocket.exceptions.ApplicationErrorException;
import io.rsocket.exceptions.RejectedSetupException;
import io.rsocket.frame.CancelFrameFlyweight;
Expand All @@ -50,31 +35,59 @@
import io.rsocket.frame.RequestChannelFrameFlyweight;
import io.rsocket.frame.RequestNFrameFlyweight;
import io.rsocket.frame.RequestStreamFrameFlyweight;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.internal.subscriber.AssertSubscriber;
import io.rsocket.lease.RequesterLeaseHandler;
import io.rsocket.test.util.TestSubscriber;
import io.rsocket.util.ByteBufPayload;
import io.rsocket.util.DefaultPayload;
import io.rsocket.util.EmptyPayload;
import io.rsocket.util.MultiSubscriberRSocket;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.assertj.core.api.Assertions;
import org.junit.Rule;
import org.junit.Test;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.runners.model.Statement;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.UnicastProcessor;
import reactor.test.StepVerifier;
import reactor.test.util.RaceTestUtils;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static io.rsocket.core.PayloadValidationUtils.INVALID_PAYLOAD_ERROR_MESSAGE;
import static io.rsocket.frame.FrameHeaderFlyweight.frameType;
import static io.rsocket.frame.FrameType.CANCEL;
import static io.rsocket.frame.FrameType.KEEPALIVE;
import static io.rsocket.frame.FrameType.REQUEST_CHANNEL;
import static io.rsocket.frame.FrameType.REQUEST_RESPONSE;
import static io.rsocket.frame.FrameType.REQUEST_STREAM;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;

public class RSocketRequesterTest {

Expand Down Expand Up @@ -333,6 +346,124 @@ public void shouldThrownExceptionIfGivenPayloadIsExitsSizeAllowanceWithNoFragmen
.verify();
}


private static Stream<Arguments> racingCases() {
return Stream.of(
Arguments.of(
(Runnable) () -> System.out.println("RequestChannel downstream cancellation case"),
(Function<ClientSocketRule, Publisher<Payload>>) (rule) -> rule.socket.requestChannel(Flux.just(EmptyPayload.INSTANCE)),
(BiConsumer<AssertSubscriber<Payload>, ClientSocketRule>) (as, rule) -> {
LeaksTrackingByteBufAllocator allocator = LeaksTrackingByteBufAllocator.instrumentDefault();
ByteBuf metadata = allocator.buffer();
metadata.writeCharSequence("abc", CharsetUtil.UTF_8);
ByteBuf data = allocator.buffer();
data.writeCharSequence("def", CharsetUtil.UTF_8);
int streamId = rule.getStreamIdForRequestType(REQUEST_CHANNEL);
ByteBuf frame = PayloadFrameFlyweight.encode(allocator, streamId, false, false, true, metadata, data);

RaceTestUtils.race(as::cancel, () -> rule.connection.addToReceivedBuffer(frame));
}
)/*,*/
// Arguments.of(
// (Runnable) () -> System.out.println("RequestChannel upstream cancellation 1"),
// (Function<ClientSocketRule, Publisher<Payload>>) (rule) -> {
// LeaksTrackingByteBufAllocator allocator = LeaksTrackingByteBufAllocator.instrumentDefault();
// ByteBuf metadata = allocator.buffer();
// metadata.writeCharSequence("abc", CharsetUtil.UTF_8);
// ByteBuf data = allocator.buffer();
// data.writeCharSequence("def", CharsetUtil.UTF_8);
// return rule.socket.requestChannel(Flux.just(ByteBufPayload.create(data, metadata)));
// },
// (BiConsumer<AssertSubscriber<Payload>, ClientSocketRule>) (as, rule) -> {
// LeaksTrackingByteBufAllocator allocator = LeaksTrackingByteBufAllocator.instrumentDefault();
// int streamId = rule.getStreamIdForRequestType(REQUEST_CHANNEL);
// ByteBuf frame = CancelFrameFlyweight.encode(allocator, streamId);
//
// RaceTestUtils.race(() -> as.request(1), () -> rule.connection.addToReceivedBuffer(frame));
// }
// ),
// Arguments.of(
// (Runnable) () -> System.out.println("RequestChannel upstream cancellation 2"),
// (Function<ClientSocketRule, Publisher<Payload>>) (rule) -> {
// return rule.socket.requestChannel(Flux.just(ByteBufPayload.create("a", "b"), ByteBufPayload.create("c", "d"), ByteBufPayload.create("e", "f")));
// },
// (BiConsumer<AssertSubscriber<Payload>, ClientSocketRule>) (as, rule) -> {
// LeaksTrackingByteBufAllocator allocator = LeaksTrackingByteBufAllocator.instrumentDefault();
// int streamId = rule.getStreamIdForRequestType(REQUEST_CHANNEL);
// ByteBuf frame = CancelFrameFlyweight.encode(allocator, streamId);
//
// as.request(1);
//
// RaceTestUtils.race(() -> as.request(Long.MAX_VALUE), () -> rule.connection.addToReceivedBuffer(frame));
// }
// ),
// Arguments.of(
// (Runnable) () -> System.out.println("RequestResponse downstream cancellation"),
// (Function<ClientSocketRule, Publisher<Payload>>) (rule) -> rule.socket.requestResponse(EmptyPayload.INSTANCE),
// (BiConsumer<AssertSubscriber<Payload>, ClientSocketRule>) (as, rule) -> {
// LeaksTrackingByteBufAllocator allocator = LeaksTrackingByteBufAllocator.instrumentDefault();
// ByteBuf metadata = allocator.buffer();
// metadata.writeCharSequence("abc", CharsetUtil.UTF_8);
// ByteBuf data = allocator.buffer();
// data.writeCharSequence("def", CharsetUtil.UTF_8);
// int streamId = rule.getStreamIdForRequestType(REQUEST_RESPONSE);
// ByteBuf frame = PayloadFrameFlyweight.encode(allocator, streamId, false, false, true, metadata, data);
//
// RaceTestUtils.race(as::cancel, () -> rule.connection.addToReceivedBuffer(frame));
// }
// )
);
}

@Test
@SuppressWarnings("unchecked")
public void checkNoLeaksOnRacingTest() {

racingCases()
.forEach(a -> {
Hooks.onNextDropped(ReferenceCountUtil::safeRelease);
LeaksTrackingByteBufAllocator allocator = LeaksTrackingByteBufAllocator.instrumentDefault();
((Runnable)a.get()[0]).run();
checkNoLeaksOnRacing(allocator, (Function<ClientSocketRule, Publisher<Payload>>) a.get()[1], (BiConsumer<AssertSubscriber<Payload>, ClientSocketRule>) a.get()[2]);

Hooks.resetOnNextDropped();
LeaksTrackingByteBufAllocator.deinstrumentDefault();
});
}

public void checkNoLeaksOnRacing(LeaksTrackingByteBufAllocator allocator, Function<ClientSocketRule, Publisher<Payload>> initiator, BiConsumer<AssertSubscriber<Payload>, ClientSocketRule> runner) {
for (int i = 0; i < 100000; i++) {
System.out.println(i);
ClientSocketRule clientSocketRule = new ClientSocketRule();
try {
clientSocketRule.apply(new Statement() {
@Override
public void evaluate() throws Throwable {

}
}, null).evaluate();
} catch (Throwable throwable) {
throwable.printStackTrace();
}

Publisher<Payload> payloadP = initiator.apply(clientSocketRule);
AssertSubscriber<Payload> assertSubscriber = AssertSubscriber.create();

if (payloadP instanceof Flux) {
((Flux<Payload>)payloadP).doOnNext(Payload::release).subscribe(assertSubscriber);
} else {
((Mono<Payload>)payloadP).doOnNext(Payload::release).subscribe(assertSubscriber);
}

runner.accept(assertSubscriber, clientSocketRule);

Assertions.assertThat(clientSocketRule.connection.getSent())
.allMatch(ReferenceCounted::release);

allocator.assertHasNoLeaks();
}
}

static Stream<BiFunction<RSocket, Payload, Publisher<?>>> prepareCalls() {
return Stream.of(
RSocket::fireAndForget,
Expand Down Expand Up @@ -360,7 +491,7 @@ protected RSocketRequester newRSocket() {
return new RSocketRequester(
ByteBufAllocator.DEFAULT,
connection,
DefaultPayload::create,
PayloadDecoder.ZERO_COPY,
throwable -> errors.add(throwable),
StreamIdSupplier.clientSupplier(),
0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.IllegalReferenceCountException;
import org.assertj.core.presentation.StandardRepresentation;

public final class ByteBufRepresentation extends StandardRepresentation {

@Override
protected String fallbackToStringOf(Object object) {
if (object instanceof ByteBuf) {
return ByteBufUtil.prettyHexDump((ByteBuf) object);
try {
return ByteBufUtil.prettyHexDump((ByteBuf) object);
} catch (IllegalReferenceCountException e) {
//noops
}
}

return super.fallbackToStringOf(object);
Expand Down

0 comments on commit d761e59

Please sign in to comment.