Skip to content

Commit

Permalink
provides extra hooks to ensure we capture all discarded elements
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
  • Loading branch information
OlegDokuka committed Apr 10, 2020
1 parent c2475c2 commit 626dd81
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package io.rsocket.core;

import io.netty.util.ReferenceCountUtil;
import io.rsocket.Payload;
import java.util.Collection;
import java.util.Iterator;
import java.util.Queue;

final class CleanOnClearQueueDecorator implements Queue<Payload> {
final Queue<Payload> delegate;

CleanOnClearQueueDecorator(Queue<Payload> delegate) {
this.delegate = delegate;
}

@Override
public void clear() {
Payload p;
while ((p = delegate.poll()) != null) {
ReferenceCountUtil.safeRelease(p);
}
}

@Override
public int size() {
return delegate.size();
}

@Override
public boolean isEmpty() {
return delegate.isEmpty();
}

@Override
public boolean contains(Object o) {
return delegate.contains(o);
}

@Override
public Iterator<Payload> iterator() {
return delegate.iterator();
}

@Override
public Object[] toArray() {
return delegate.toArray();
}

@Override
public <T> T[] toArray(T[] a) {
return delegate.toArray(a);
}

@Override
public boolean add(Payload payload) {
return delegate.add(payload);
}

@Override
public boolean remove(Object o) {
return delegate.remove(o);
}

@Override
public boolean containsAll(Collection<?> c) {
return delegate.containsAll(c);
}

@Override
public boolean addAll(Collection<? extends Payload> c) {
return delegate.addAll(c);
}

@Override
public boolean removeAll(Collection<?> c) {
return delegate.retainAll(c);
}

@Override
public boolean retainAll(Collection<?> c) {
return delegate.retainAll(c);
}

@Override
public boolean offer(Payload payload) {
return delegate.offer(payload);
}

@Override
public Payload remove() {
return delegate.remove();
}

@Override
public Payload poll() {
return delegate.poll();
}

@Override
public Payload element() {
return delegate.element();
}

@Override
public Payload peek() {
return delegate.peek();
}
}
22 changes: 16 additions & 6 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.collection.IntObjectMap;
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
Expand Down Expand Up @@ -77,6 +78,8 @@ class RSocketRequester implements RSocket {
AtomicReferenceFieldUpdater.newUpdater(
RSocketRequester.class, Throwable.class, "terminationError");
private static final Exception CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
private static final Consumer<ReferenceCounted> DROPPED_ELEMENTS_CONSUMER =
ReferenceCountUtil::safeRelease;

static {
CLOSED_CHANNEL_EXCEPTION.setStackTrace(new StackTraceElement[0]);
Expand Down Expand Up @@ -259,7 +262,7 @@ public void doOnTerminal(
});
receivers.put(streamId, receiver);

return receiver;
return receiver.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);
}

private Flux<Payload> handleRequestStream(final Payload payload) {
Expand All @@ -277,7 +280,8 @@ private Flux<Payload> handleRequestStream(final Payload payload) {
int streamId = streamIdSupplier.nextStreamId(receivers);

final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;
final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
final UnicastProcessor<Payload> receiver =
UnicastProcessor.create(new CleanOnClearQueueDecorator(Queues.<Payload>unbounded().get()));
final AtomicBoolean payloadReleasedFlag = new AtomicBoolean(false);

receivers.put(streamId, receiver);
Expand Down Expand Up @@ -323,7 +327,8 @@ public void accept(long n) {
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
}
})
.doFinally(s -> removeStreamReceiver(streamId));
.doFinally(s -> removeStreamReceiver(streamId))
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);
}

private Flux<Payload> handleChannel(Flux<Payload> request) {
Expand Down Expand Up @@ -356,7 +361,8 @@ private Flux<? extends Payload> handleChannel(Payload initialPayload, Flux<Paylo
final AtomicBoolean payloadReleasedFlag = new AtomicBoolean(false);
final int streamId = streamIdSupplier.nextStreamId(receivers);

final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
final UnicastProcessor<Payload> receiver =
UnicastProcessor.create(new CleanOnClearQueueDecorator(Queues.<Payload>unbounded().get()));
final BaseSubscriber<Payload> upstreamSubscriber =
new BaseSubscriber<Payload>() {

Expand Down Expand Up @@ -424,7 +430,10 @@ public void accept(long n) {
senders.put(streamId, upstreamSubscriber);
receivers.put(streamId, receiver);

inboundFlux.limitRate(Queues.SMALL_BUFFER_SIZE).subscribe(upstreamSubscriber);
inboundFlux
.limitRate(Queues.SMALL_BUFFER_SIZE)
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER)
.subscribe(upstreamSubscriber);
if (!payloadReleasedFlag.getAndSet(true)) {
ByteBuf frame =
RequestChannelFrameFlyweight.encode(
Expand Down Expand Up @@ -461,7 +470,8 @@ public void accept(long n) {
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
upstreamSubscriber.cancel();
}
});
})
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);
}

private Mono<Void> handleMetadataPush(Payload payload) {
Expand Down
16 changes: 12 additions & 4 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.collection.IntObjectMap;
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
Expand All @@ -45,6 +46,8 @@

/** Responder side of RSocket. Receives {@link ByteBuf}s from a peer's {@link RSocketRequester} */
class RSocketResponder implements ResponderRSocket {
private static final Consumer<ReferenceCounted> DROPPED_ELEMENTS_CONSUMER =
ReferenceCountUtil::safeRelease;

private final DuplexConnection connection;
private final RSocket requestHandler;
Expand Down Expand Up @@ -418,7 +421,7 @@ protected void hookFinally(SignalType type) {
};

sendingSubscriptions.put(streamId, subscriber);
response.subscribe(subscriber);
response.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER).subscribe(subscriber);
}

private void handleStream(int streamId, Flux<Payload> response, int initialRequestN) {
Expand Down Expand Up @@ -471,11 +474,15 @@ protected void hookFinally(SignalType type) {
};

sendingSubscriptions.put(streamId, subscriber);
response.limitRate(Queues.SMALL_BUFFER_SIZE).subscribe(subscriber);
response
.limitRate(Queues.SMALL_BUFFER_SIZE)
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER)
.subscribe(subscriber);
}

private void handleChannel(int streamId, Payload payload, int initialRequestN) {
UnicastProcessor<Payload> frames = UnicastProcessor.create();
UnicastProcessor<Payload> frames =
UnicastProcessor.create(new CleanOnClearQueueDecorator(Queues.<Payload>unbounded().get()));
channelProcessors.put(streamId, frames);

Flux<Payload> payloads =
Expand All @@ -499,7 +506,8 @@ public void accept(long l) {
sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
}
})
.doFinally(signalType -> channelProcessors.remove(streamId));
.doFinally(signalType -> channelProcessors.remove(streamId))
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);

// not chained, as the payload should be enqueued in the Unicast processor before this method
// returns
Expand Down

0 comments on commit 626dd81

Please sign in to comment.