Skip to content

Commit

Permalink
provides extra hooks to ensure we capture all discarded elements
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
  • Loading branch information
OlegDokuka committed Apr 10, 2020
1 parent c2475c2 commit cba41f0
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package io.rsocket.core;

import io.netty.util.ReferenceCountUtil;
import io.rsocket.Payload;

import java.util.Collection;
import java.util.Iterator;
import java.util.Queue;

final class CleanOnClearQueueDecorator implements Queue<Payload> {
final Queue<Payload> delegate;

CleanOnClearQueueDecorator(Queue<Payload> delegate) {
this.delegate = delegate;
}

@Override
public void clear() {
Payload p;
while ((p = delegate.poll()) != null) {
ReferenceCountUtil.safeRelease(p);
}
}

@Override
public int size() {
return delegate.size();
}

@Override
public boolean isEmpty() {
return delegate.isEmpty();
}

@Override
public boolean contains(Object o) {
return delegate.contains(o);
}

@Override
public Iterator<Payload> iterator() {
return delegate.iterator();
}

@Override
public Object[] toArray() {
return delegate.toArray();
}

@Override
public <T> T[] toArray(T[] a) {
return delegate.toArray(a);
}

@Override
public boolean add(Payload payload) {
return delegate.add(payload);
}

@Override
public boolean remove(Object o) {
return delegate.remove(o);
}

@Override
public boolean containsAll(Collection<?> c) {
return delegate.containsAll(c);
}

@Override
public boolean addAll(Collection<? extends Payload> c) {
return delegate.addAll(c);
}

@Override
public boolean removeAll(Collection<?> c) {
return delegate.retainAll(c);
}

@Override
public boolean retainAll(Collection<?> c) {
return delegate.retainAll(c);
}

@Override
public boolean offer(Payload payload) {
return delegate.offer(payload);
}

@Override
public Payload remove() {
return delegate.remove();
}

@Override
public Payload poll() {
return delegate.poll();
}

@Override
public Payload element() {
return delegate.element();
}

@Override
public Payload peek() {
return delegate.peek();
}
}
19 changes: 13 additions & 6 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.collection.IntObjectMap;
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
Expand Down Expand Up @@ -77,6 +78,7 @@ class RSocketRequester implements RSocket {
AtomicReferenceFieldUpdater.newUpdater(
RSocketRequester.class, Throwable.class, "terminationError");
private static final Exception CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
private static final Consumer<ReferenceCounted> DROPPED_ELEMENTS_CONSUMER = ReferenceCountUtil::safeRelease;

static {
CLOSED_CHANNEL_EXCEPTION.setStackTrace(new StackTraceElement[0]);
Expand Down Expand Up @@ -259,7 +261,8 @@ public void doOnTerminal(
});
receivers.put(streamId, receiver);

return receiver;
return receiver
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);
}

private Flux<Payload> handleRequestStream(final Payload payload) {
Expand All @@ -277,7 +280,7 @@ private Flux<Payload> handleRequestStream(final Payload payload) {
int streamId = streamIdSupplier.nextStreamId(receivers);

final UnboundedProcessor<ByteBuf> sendProcessor = this.sendProcessor;
final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
final UnicastProcessor<Payload> receiver = UnicastProcessor.create(new CleanOnClearQueueDecorator(Queues.<Payload>unbounded().get()));
final AtomicBoolean payloadReleasedFlag = new AtomicBoolean(false);

receivers.put(streamId, receiver);
Expand Down Expand Up @@ -323,7 +326,8 @@ public void accept(long n) {
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
}
})
.doFinally(s -> removeStreamReceiver(streamId));
.doFinally(s -> removeStreamReceiver(streamId))
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);
}

private Flux<Payload> handleChannel(Flux<Payload> request) {
Expand Down Expand Up @@ -356,7 +360,7 @@ private Flux<? extends Payload> handleChannel(Payload initialPayload, Flux<Paylo
final AtomicBoolean payloadReleasedFlag = new AtomicBoolean(false);
final int streamId = streamIdSupplier.nextStreamId(receivers);

final UnicastProcessor<Payload> receiver = UnicastProcessor.create();
final UnicastProcessor<Payload> receiver = UnicastProcessor.create(new CleanOnClearQueueDecorator(Queues.<Payload>unbounded().get()));
final BaseSubscriber<Payload> upstreamSubscriber =
new BaseSubscriber<Payload>() {

Expand Down Expand Up @@ -424,7 +428,9 @@ public void accept(long n) {
senders.put(streamId, upstreamSubscriber);
receivers.put(streamId, receiver);

inboundFlux.limitRate(Queues.SMALL_BUFFER_SIZE).subscribe(upstreamSubscriber);
inboundFlux.limitRate(Queues.SMALL_BUFFER_SIZE)
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER)
.subscribe(upstreamSubscriber);
if (!payloadReleasedFlag.getAndSet(true)) {
ByteBuf frame =
RequestChannelFrameFlyweight.encode(
Expand Down Expand Up @@ -461,7 +467,8 @@ public void accept(long n) {
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
upstreamSubscriber.cancel();
}
});
})
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);
}

private Mono<Void> handleMetadataPush(Payload payload) {
Expand Down
15 changes: 11 additions & 4 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.collection.IntObjectMap;
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
Expand All @@ -45,6 +46,7 @@

/** Responder side of RSocket. Receives {@link ByteBuf}s from a peer's {@link RSocketRequester} */
class RSocketResponder implements ResponderRSocket {
private static final Consumer<ReferenceCounted> DROPPED_ELEMENTS_CONSUMER = ReferenceCountUtil::safeRelease;

private final DuplexConnection connection;
private final RSocket requestHandler;
Expand Down Expand Up @@ -418,7 +420,9 @@ protected void hookFinally(SignalType type) {
};

sendingSubscriptions.put(streamId, subscriber);
response.subscribe(subscriber);
response
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER)
.subscribe(subscriber);
}

private void handleStream(int streamId, Flux<Payload> response, int initialRequestN) {
Expand Down Expand Up @@ -471,11 +475,13 @@ protected void hookFinally(SignalType type) {
};

sendingSubscriptions.put(streamId, subscriber);
response.limitRate(Queues.SMALL_BUFFER_SIZE).subscribe(subscriber);
response.limitRate(Queues.SMALL_BUFFER_SIZE)
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER)
.subscribe(subscriber);
}

private void handleChannel(int streamId, Payload payload, int initialRequestN) {
UnicastProcessor<Payload> frames = UnicastProcessor.create();
UnicastProcessor<Payload> frames = UnicastProcessor.create(new CleanOnClearQueueDecorator(Queues.<Payload>unbounded().get()));
channelProcessors.put(streamId, frames);

Flux<Payload> payloads =
Expand All @@ -499,7 +505,8 @@ public void accept(long l) {
sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
}
})
.doFinally(signalType -> channelProcessors.remove(streamId));
.doFinally(signalType -> channelProcessors.remove(streamId))
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);

// not chained, as the payload should be enqueued in the Unicast processor before this method
// returns
Expand Down

0 comments on commit cba41f0

Please sign in to comment.