Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory leak on netty #3059

Merged
merged 4 commits into from
May 25, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public final class CacheBuilder {
private static final long UNSET = -1;

private boolean weakKeys;
private boolean weakValues;
private long maximumSize = UNSET;
private Executor executor = null;

Expand All @@ -32,6 +33,12 @@ public CacheBuilder setWeakKeys() {
return this;
}

/** Sets that values should be referenced weakly. */
public CacheBuilder setWeakValues() {
this.weakValues = true;
return this;
}

// Visible for testing
CacheBuilder setExecutor(Executor executor) {
this.executor = executor;
Expand All @@ -47,6 +54,9 @@ public <K, V> Cache<K, V> build() {
if (weakKeys) {
caffeine.weakKeys();
}
if (weakValues) {
caffeine.weakValues();
}
if (maximumSize != UNSET) {
caffeine.maximumSize(maximumSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,85 +11,78 @@
import io.netty.util.concurrent.ProgressiveFuture;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.instrumentation.api.ContextStore;
import io.opentelemetry.instrumentation.api.caching.Cache;

public final class FutureListenerWrappers {
private static final Cache<
GenericFutureListener<? extends Future<?>>, GenericFutureListener<? extends Future<?>>>
wrappers = Cache.newBuilder().setWeakKeys().setWeakValues().build();

@SuppressWarnings("unchecked")
public static GenericFutureListener<? extends Future<? super Void>> wrap(
ContextStore<GenericFutureListener, GenericFutureListener> contextStore,
Context context,
GenericFutureListener<? extends Future<? super Void>> delegate) {
public static GenericFutureListener<?> wrap(
Context context, GenericFutureListener<? extends Future<?>> delegate) {
if (delegate instanceof WrappedFutureListener
|| delegate instanceof WrappedProgressiveFutureListener) {
return delegate;
}
return (GenericFutureListener<? extends Future<? super Void>>)
contextStore.putIfAbsent(
delegate,
() -> {
if (delegate instanceof GenericProgressiveFutureListener) {
return new WrappedProgressiveFutureListener(
context,
(GenericProgressiveFutureListener<ProgressiveFuture<? super Void>>) delegate);
} else {
return new WrappedFutureListener(
context, (GenericFutureListener<Future<? super Void>>) delegate);
}
});
return wrappers.computeIfAbsent(
delegate,
(key) -> {
if (delegate instanceof GenericProgressiveFutureListener) {
return new WrappedProgressiveFutureListener(
context, (GenericProgressiveFutureListener<ProgressiveFuture<?>>) delegate);
} else {
return new WrappedFutureListener(context, (GenericFutureListener<Future<?>>) delegate);
}
});
}

public static GenericFutureListener<? extends Future<? super Void>> getWrapper(
ContextStore<GenericFutureListener, GenericFutureListener> contextStore,
GenericFutureListener<? extends Future<? super Void>> delegate) {
GenericFutureListener<? extends Future<? super Void>> wrapper =
(GenericFutureListener<? extends Future<? super Void>>) contextStore.get(delegate);
public static GenericFutureListener<? extends Future<?>> getWrapper(
GenericFutureListener<? extends Future<?>> delegate) {
GenericFutureListener<? extends Future<?>> wrapper = wrappers.get(delegate);
return wrapper == null ? delegate : wrapper;
}

private static final class WrappedFutureListener
implements GenericFutureListener<Future<? super Void>> {
private static final class WrappedFutureListener implements GenericFutureListener<Future<?>> {

private final Context context;
private final GenericFutureListener<Future<? super Void>> delegate;
private final GenericFutureListener<Future<?>> delegate;

private WrappedFutureListener(
Context context, GenericFutureListener<Future<? super Void>> delegate) {
private WrappedFutureListener(Context context, GenericFutureListener<Future<?>> delegate) {
this.context = context;
this.delegate = delegate;
}

@Override
public void operationComplete(Future<? super Void> future) throws Exception {
public void operationComplete(Future<?> future) throws Exception {
try (Scope ignored = context.makeCurrent()) {
delegate.operationComplete(future);
}
}
}

private static final class WrappedProgressiveFutureListener
implements GenericProgressiveFutureListener<ProgressiveFuture<? super Void>> {
implements GenericProgressiveFutureListener<ProgressiveFuture<?>> {

private final Context context;
private final GenericProgressiveFutureListener<ProgressiveFuture<? super Void>> delegate;
private final GenericProgressiveFutureListener<ProgressiveFuture<?>> delegate;

private WrappedProgressiveFutureListener(
Context context,
GenericProgressiveFutureListener<ProgressiveFuture<? super Void>> delegate) {
Context context, GenericProgressiveFutureListener<ProgressiveFuture<?>> delegate) {
this.context = context;
this.delegate = delegate;
}

@Override
public void operationProgressed(
ProgressiveFuture<? super Void> progressiveFuture, long l, long l1) throws Exception {
public void operationProgressed(ProgressiveFuture<?> progressiveFuture, long l, long l1)
throws Exception {
try (Scope ignored = context.makeCurrent()) {
delegate.operationProgressed(progressiveFuture, l, l1);
}
}

@Override
public void operationComplete(ProgressiveFuture<? super Void> progressiveFuture)
throws Exception {
public void operationComplete(ProgressiveFuture<?> progressiveFuture) throws Exception {
try (Scope ignored = context.makeCurrent()) {
delegate.operationComplete(progressiveFuture);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
import io.opentelemetry.context.Context;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.api.ContextStore;
import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext;
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
Expand Down Expand Up @@ -60,28 +58,23 @@ public static class AddListenerAdvice {
@Advice.OnMethodEnter
public static void wrapListener(
@Advice.Argument(value = 0, readOnly = false)
GenericFutureListener<? extends Future<? super Void>> listener) {
ContextStore<GenericFutureListener, GenericFutureListener> contextStore =
InstrumentationContext.get(GenericFutureListener.class, GenericFutureListener.class);
listener =
FutureListenerWrappers.wrap(contextStore, Java8BytecodeBridge.currentContext(), listener);
GenericFutureListener<? extends Future<?>> listener) {
listener = FutureListenerWrappers.wrap(Java8BytecodeBridge.currentContext(), listener);
}
}

public static class AddListenersAdvice {
@Advice.OnMethodEnter
public static void wrapListener(
@Advice.Argument(value = 0, readOnly = false)
GenericFutureListener<? extends Future<? super Void>>[] listeners) {
GenericFutureListener<? extends Future<?>>[] listeners) {

ContextStore<GenericFutureListener, GenericFutureListener> contextStore =
InstrumentationContext.get(GenericFutureListener.class, GenericFutureListener.class);
Context context = Java8BytecodeBridge.currentContext();
@SuppressWarnings("unchecked")
GenericFutureListener<? extends Future<? super Void>>[] wrappedListeners =
GenericFutureListener<? extends Future<?>>[] wrappedListeners =
new GenericFutureListener[listeners.length];
for (int i = 0; i < listeners.length; ++i) {
wrappedListeners[i] = FutureListenerWrappers.wrap(contextStore, context, listeners[i]);
wrappedListeners[i] = FutureListenerWrappers.wrap(context, listeners[i]);
}
listeners = wrappedListeners;
}
Expand All @@ -91,26 +84,22 @@ public static class RemoveListenerAdvice {
@Advice.OnMethodEnter
public static void wrapListener(
@Advice.Argument(value = 0, readOnly = false)
GenericFutureListener<? extends Future<? super Void>> listener) {
ContextStore<GenericFutureListener, GenericFutureListener> contextStore =
InstrumentationContext.get(GenericFutureListener.class, GenericFutureListener.class);
listener = FutureListenerWrappers.getWrapper(contextStore, listener);
GenericFutureListener<? extends Future<?>> listener) {
listener = FutureListenerWrappers.getWrapper(listener);
}
}

public static class RemoveListenersAdvice {
@Advice.OnMethodEnter
public static void wrapListener(
@Advice.Argument(value = 0, readOnly = false)
GenericFutureListener<? extends Future<? super Void>>[] listeners) {
GenericFutureListener<? extends Future<?>>[] listeners) {

ContextStore<GenericFutureListener, GenericFutureListener> contextStore =
InstrumentationContext.get(GenericFutureListener.class, GenericFutureListener.class);
@SuppressWarnings("unchecked")
GenericFutureListener<? extends Future<? super Void>>[] wrappedListeners =
GenericFutureListener<? extends Future<?>>[] wrappedListeners =
new GenericFutureListener[listeners.length];
for (int i = 0; i < listeners.length; ++i) {
wrappedListeners[i] = FutureListenerWrappers.getWrapper(contextStore, listeners[i]);
wrappedListeners[i] = FutureListenerWrappers.getWrapper(listeners[i]);
}
listeners = wrappedListeners;
}
Expand Down