From 30c171674a209ab88bcb3c4a071f2dbb2931b578 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Fri, 21 May 2021 16:26:39 +0300 Subject: [PATCH 1/4] Fix memory leak on netty --- .../api/caching/CacheBuilder.java | 10 +++ .../netty/common/FutureListenerWrappers.java | 67 +++++++++---------- .../common/NettyFutureInstrumentation.java | 31 +++------ 3 files changed, 50 insertions(+), 58 deletions(-) diff --git a/instrumentation-api-caching/src/main/java/io/opentelemetry/instrumentation/api/caching/CacheBuilder.java b/instrumentation-api-caching/src/main/java/io/opentelemetry/instrumentation/api/caching/CacheBuilder.java index 4c6602942e30..3c7a61e8b3b8 100644 --- a/instrumentation-api-caching/src/main/java/io/opentelemetry/instrumentation/api/caching/CacheBuilder.java +++ b/instrumentation-api-caching/src/main/java/io/opentelemetry/instrumentation/api/caching/CacheBuilder.java @@ -14,6 +14,7 @@ public final class CacheBuilder { private static final long UNSET = -1; private boolean weakKeys; + private boolean weakValues; private long maximumSize = UNSET; private Executor executor = null; @@ -32,6 +33,12 @@ public CacheBuilder setWeakKeys() { return this; } + /** Sets that values should be referenced weakly. */ + public CacheBuilder setWeakValues() { + this.weakValues = true; + return this; + } + // Visible for testing CacheBuilder setExecutor(Executor executor) { this.executor = executor; @@ -47,6 +54,9 @@ public Cache build() { if (weakKeys) { caffeine.weakKeys(); } + if (weakValues) { + caffeine.weakValues(); + } if (maximumSize != UNSET) { caffeine.maximumSize(maximumSize); } diff --git a/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/common/FutureListenerWrappers.java b/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/common/FutureListenerWrappers.java index 12a864fef4af..e69752a8fe6d 100644 --- a/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/common/FutureListenerWrappers.java +++ b/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/common/FutureListenerWrappers.java @@ -11,55 +11,50 @@ import io.netty.util.concurrent.ProgressiveFuture; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; -import io.opentelemetry.javaagent.instrumentation.api.ContextStore; +import io.opentelemetry.instrumentation.api.caching.Cache; public final class FutureListenerWrappers { + private static final Cache< + GenericFutureListener>, GenericFutureListener>> + wrappers = Cache.newBuilder().setWeakKeys().setWeakValues().build(); + @SuppressWarnings("unchecked") - public static GenericFutureListener> wrap( - ContextStore contextStore, - Context context, - GenericFutureListener> delegate) { + public static GenericFutureListener wrap( + Context context, GenericFutureListener> delegate) { if (delegate instanceof WrappedFutureListener || delegate instanceof WrappedProgressiveFutureListener) { return delegate; } - return (GenericFutureListener>) - contextStore.putIfAbsent( - delegate, - () -> { - if (delegate instanceof GenericProgressiveFutureListener) { - return new WrappedProgressiveFutureListener( - context, - (GenericProgressiveFutureListener>) delegate); - } else { - return new WrappedFutureListener( - context, (GenericFutureListener>) delegate); - } - }); + return wrappers.computeIfAbsent( + delegate, + (key) -> { + if (delegate instanceof GenericProgressiveFutureListener) { + return new WrappedProgressiveFutureListener( + context, (GenericProgressiveFutureListener>) delegate); + } else { + return new WrappedFutureListener(context, (GenericFutureListener>) delegate); + } + }); } - public static GenericFutureListener> getWrapper( - ContextStore contextStore, - GenericFutureListener> delegate) { - GenericFutureListener> wrapper = - (GenericFutureListener>) contextStore.get(delegate); + public static GenericFutureListener> getWrapper( + GenericFutureListener> delegate) { + GenericFutureListener> wrapper = wrappers.get(delegate); return wrapper == null ? delegate : wrapper; } - private static final class WrappedFutureListener - implements GenericFutureListener> { + private static final class WrappedFutureListener implements GenericFutureListener> { private final Context context; - private final GenericFutureListener> delegate; + private final GenericFutureListener> delegate; - private WrappedFutureListener( - Context context, GenericFutureListener> delegate) { + private WrappedFutureListener(Context context, GenericFutureListener> delegate) { this.context = context; this.delegate = delegate; } @Override - public void operationComplete(Future future) throws Exception { + public void operationComplete(Future future) throws Exception { try (Scope ignored = context.makeCurrent()) { delegate.operationComplete(future); } @@ -67,29 +62,27 @@ public void operationComplete(Future future) throws Exception { } private static final class WrappedProgressiveFutureListener - implements GenericProgressiveFutureListener> { + implements GenericProgressiveFutureListener> { private final Context context; - private final GenericProgressiveFutureListener> delegate; + private final GenericProgressiveFutureListener> delegate; private WrappedProgressiveFutureListener( - Context context, - GenericProgressiveFutureListener> delegate) { + Context context, GenericProgressiveFutureListener> delegate) { this.context = context; this.delegate = delegate; } @Override - public void operationProgressed( - ProgressiveFuture progressiveFuture, long l, long l1) throws Exception { + public void operationProgressed(ProgressiveFuture progressiveFuture, long l, long l1) + throws Exception { try (Scope ignored = context.makeCurrent()) { delegate.operationProgressed(progressiveFuture, l, l1); } } @Override - public void operationComplete(ProgressiveFuture progressiveFuture) - throws Exception { + public void operationComplete(ProgressiveFuture progressiveFuture) throws Exception { try (Scope ignored = context.makeCurrent()) { delegate.operationComplete(progressiveFuture); } diff --git a/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/common/NettyFutureInstrumentation.java b/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/common/NettyFutureInstrumentation.java index 80f6af7a7ca5..35199955ce8d 100644 --- a/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/common/NettyFutureInstrumentation.java +++ b/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/common/NettyFutureInstrumentation.java @@ -17,8 +17,6 @@ import io.opentelemetry.context.Context; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; -import io.opentelemetry.javaagent.instrumentation.api.ContextStore; -import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext; import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.type.TypeDescription; @@ -60,11 +58,8 @@ public static class AddListenerAdvice { @Advice.OnMethodEnter public static void wrapListener( @Advice.Argument(value = 0, readOnly = false) - GenericFutureListener> listener) { - ContextStore contextStore = - InstrumentationContext.get(GenericFutureListener.class, GenericFutureListener.class); - listener = - FutureListenerWrappers.wrap(contextStore, Java8BytecodeBridge.currentContext(), listener); + GenericFutureListener> listener) { + listener = FutureListenerWrappers.wrap(Java8BytecodeBridge.currentContext(), listener); } } @@ -72,16 +67,14 @@ public static class AddListenersAdvice { @Advice.OnMethodEnter public static void wrapListener( @Advice.Argument(value = 0, readOnly = false) - GenericFutureListener>[] listeners) { + GenericFutureListener>[] listeners) { - ContextStore contextStore = - InstrumentationContext.get(GenericFutureListener.class, GenericFutureListener.class); Context context = Java8BytecodeBridge.currentContext(); @SuppressWarnings("unchecked") - GenericFutureListener>[] wrappedListeners = + GenericFutureListener>[] wrappedListeners = new GenericFutureListener[listeners.length]; for (int i = 0; i < listeners.length; ++i) { - wrappedListeners[i] = FutureListenerWrappers.wrap(contextStore, context, listeners[i]); + wrappedListeners[i] = FutureListenerWrappers.wrap(context, listeners[i]); } listeners = wrappedListeners; } @@ -91,10 +84,8 @@ public static class RemoveListenerAdvice { @Advice.OnMethodEnter public static void wrapListener( @Advice.Argument(value = 0, readOnly = false) - GenericFutureListener> listener) { - ContextStore contextStore = - InstrumentationContext.get(GenericFutureListener.class, GenericFutureListener.class); - listener = FutureListenerWrappers.getWrapper(contextStore, listener); + GenericFutureListener> listener) { + listener = FutureListenerWrappers.getWrapper(listener); } } @@ -102,15 +93,13 @@ public static class RemoveListenersAdvice { @Advice.OnMethodEnter public static void wrapListener( @Advice.Argument(value = 0, readOnly = false) - GenericFutureListener>[] listeners) { + GenericFutureListener>[] listeners) { - ContextStore contextStore = - InstrumentationContext.get(GenericFutureListener.class, GenericFutureListener.class); @SuppressWarnings("unchecked") - GenericFutureListener>[] wrappedListeners = + GenericFutureListener>[] wrappedListeners = new GenericFutureListener[listeners.length]; for (int i = 0; i < listeners.length; ++i) { - wrappedListeners[i] = FutureListenerWrappers.getWrapper(contextStore, listeners[i]); + wrappedListeners[i] = FutureListenerWrappers.getWrapper(listeners[i]); } listeners = wrappedListeners; } From 6e1207b2e714cddb650ec9edb1a6b351cfebabc6 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Fri, 21 May 2021 17:11:49 +0300 Subject: [PATCH 2/4] add comment --- .../instrumentation/netty/common/FutureListenerWrappers.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/common/FutureListenerWrappers.java b/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/common/FutureListenerWrappers.java index e69752a8fe6d..12ebc6311e2a 100644 --- a/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/common/FutureListenerWrappers.java +++ b/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/common/FutureListenerWrappers.java @@ -14,6 +14,11 @@ import io.opentelemetry.instrumentation.api.caching.Cache; public final class FutureListenerWrappers { + // Instead of ContextStore use Cache with weak keys and weak values to store link between original + // listener and wrapper. ContextStore works fine when wrapper is stored in a field on original + // listener, but when listener class is a lambda instead of field it gets stored in a map with + // weak keys where original listener is key and wrapper is value. As wrapper has a strong + // reference to original listener this causes a memory leak. private static final Cache< GenericFutureListener>, GenericFutureListener>> wrappers = Cache.newBuilder().setWeakKeys().setWeakValues().build(); From 2988ba935fbd838f48f03441e7418f6d7f7f0456 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Fri, 21 May 2021 23:17:02 +0300 Subject: [PATCH 3/4] imporve comment as per review --- .../instrumentation/netty/common/FutureListenerWrappers.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/common/FutureListenerWrappers.java b/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/common/FutureListenerWrappers.java index 12ebc6311e2a..43b0b8c54742 100644 --- a/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/common/FutureListenerWrappers.java +++ b/instrumentation/netty/netty-4-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/netty/common/FutureListenerWrappers.java @@ -19,6 +19,9 @@ public final class FutureListenerWrappers { // listener, but when listener class is a lambda instead of field it gets stored in a map with // weak keys where original listener is key and wrapper is value. As wrapper has a strong // reference to original listener this causes a memory leak. + // Also note that it's ok if the value is collected prior to the key, since this cache is only + // used to remove the wrapped listener from the netty future, and if the value is collected prior + // to the key, that means it's no longer used (referenced) by the netty future anyways. private static final Cache< GenericFutureListener>, GenericFutureListener>> wrappers = Cache.newBuilder().setWeakKeys().setWeakValues().build(); From 46dd8d2b44c4a8d80b06639a22fe8d2265a03ffc Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Mon, 24 May 2021 14:05:37 +0300 Subject: [PATCH 4/4] use caffeine when weak values is set --- .../github/benmanes/caffeine/cache/CacheImplementations.java | 4 ++++ .../instrumentation/api/caching/CacheBuilder.java | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/instrumentation-api-caching/src/main/java/com/github/benmanes/caffeine/cache/CacheImplementations.java b/instrumentation-api-caching/src/main/java/com/github/benmanes/caffeine/cache/CacheImplementations.java index ce6fa7eb27ce..f48459eb550b 100644 --- a/instrumentation-api-caching/src/main/java/com/github/benmanes/caffeine/cache/CacheImplementations.java +++ b/instrumentation-api-caching/src/main/java/com/github/benmanes/caffeine/cache/CacheImplementations.java @@ -21,5 +21,9 @@ final class CacheImplementations { WSMS wsms; // cache FSMS fsms; // node + // Weak keys, weak values + WI wi; // cache + FW fw; // node + private CacheImplementations() {} } diff --git a/instrumentation-api-caching/src/main/java/io/opentelemetry/instrumentation/api/caching/CacheBuilder.java b/instrumentation-api-caching/src/main/java/io/opentelemetry/instrumentation/api/caching/CacheBuilder.java index 3c7a61e8b3b8..406b5ec33b4e 100644 --- a/instrumentation-api-caching/src/main/java/io/opentelemetry/instrumentation/api/caching/CacheBuilder.java +++ b/instrumentation-api-caching/src/main/java/io/opentelemetry/instrumentation/api/caching/CacheBuilder.java @@ -47,7 +47,7 @@ CacheBuilder setExecutor(Executor executor) { /** Returns a new {@link Cache} with the settings of this {@link CacheBuilder}. */ public Cache build() { - if (weakKeys && maximumSize == UNSET) { + if (weakKeys && !weakValues && maximumSize == UNSET) { return new WeakLockFreeCache<>(); } Caffeine caffeine = Caffeine.newBuilder();