Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only touch bytebufs when doing advanced logging #1354

Merged
merged 4 commits into from
Oct 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions zuul-core/src/main/java/com/netflix/netty/common/ByteBufUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2022 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.netty.common;

import static io.netty.util.ResourceLeakDetector.Level.ADVANCED;
import static io.netty.util.ResourceLeakDetector.Level.PARANOID;
import com.netflix.zuul.message.ZuulMessage;
import io.netty.util.ReferenceCounted;
import io.netty.util.ResourceLeakDetector;

/**
* ByteBufUtil
*
* @author Arthur Gonigberg
* @since October 20, 2022
*/
public class ByteBufUtil {

private static final boolean isAdvancedLeakDetection =
ResourceLeakDetector.getLevel().ordinal() >= ResourceLeakDetector.Level.ADVANCED.ordinal();

public static void touch(ReferenceCounted byteBuf, String hint, ZuulMessage msg) {
Copy link
Collaborator

@gavinbunney gavinbunney Oct 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be worthwhile using a String.format with hint, with vargs param instead of ZuulMessage param, to allow for future logging of other objects if required - would also mean could just have a single touch method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about that too, but String.format has terrible performance.

if (isAdvancedLeakDetection) {
byteBuf.touch(hint + msg);
}
}

public static void touch(ReferenceCounted byteBuf, String hint) {
if (isAdvancedLeakDetection) {
byteBuf.touch(hint);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.netflix.config.CachedDynamicLongProperty;
import com.netflix.config.DynamicBooleanProperty;
import com.netflix.config.DynamicIntegerSetProperty;
import com.netflix.netty.common.ByteBufUtil;
import com.netflix.spectator.api.Counter;
import com.netflix.zuul.Filter;
import com.netflix.zuul.context.CommonContextKeys;
Expand Down Expand Up @@ -302,13 +303,13 @@ public HttpContent processContentChunk(final ZuulMessage zuulReq, final HttpCont
if (originConn != null) {
//Connected to origin, stream request body without buffering
proxiedRequestWithoutBuffering = true;
chunk.touch("ProxyEndpoint writing chunk to origin, request: " + zuulReq);
ByteBufUtil.touch(chunk, "ProxyEndpoint writing chunk to origin, request: ", zuulReq);
originConn.getChannel().writeAndFlush(chunk);
return null;
}

//Not connected to origin yet, let caller buffer the request body
chunk.touch("ProxyEndpoint buffering chunk to origin, request: " + zuulReq);
ByteBufUtil.touch(chunk, "ProxyEndpoint buffering chunk to origin, request: ", zuulReq);
return chunk;
}

Expand Down Expand Up @@ -337,10 +338,10 @@ private void filterResponse(final HttpResponseMessage zuulResponse) {

public void invokeNext(final HttpContent chunk) {
try {
chunk.touch("ProxyEndpoint received chunk from origin, request: " + zuulRequest);
ByteBufUtil.touch(chunk, "ProxyEndpoint received chunk from origin, request: ", zuulRequest);
methodBinding.bind(() -> filterResponseChunk(chunk));
} catch (Exception ex) {
chunk.touch("ProxyEndpoint exception processing chunk from origin, request: " + zuulRequest);
ByteBufUtil.touch(chunk, "ProxyEndpoint exception processing chunk from origin, request: ", zuulRequest);
unlinkFromOrigin();
LOG.error("Error in invokeNext content", ex);
channelCtx.fireExceptionCaught(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.base.Strings;
import com.netflix.config.DynamicIntProperty;
import com.netflix.config.DynamicPropertyFactory;
import com.netflix.netty.common.ByteBufUtil;
import com.netflix.zuul.context.SessionContext;
import com.netflix.zuul.filters.ZuulFilter;
import com.netflix.zuul.message.http.HttpHeaderNames;
Expand Down Expand Up @@ -100,10 +101,10 @@ public boolean hasCompleteBody() {
@Override
public void bufferBodyContents(final HttpContent chunk) {
setHasBody(true);
chunk.touch("ZuulMessage buffering body content.");
ByteBufUtil.touch(chunk, "ZuulMessage buffering body content.");
bodyChunks.add(chunk);
if (chunk instanceof LastHttpContent) {
chunk.touch("ZuulMessage buffering body content complete.");
ByteBufUtil.touch(chunk, "ZuulMessage buffering body content complete.");
bodyBufferedCompletely = true;
}
}
Expand Down Expand Up @@ -200,7 +201,7 @@ public boolean finishBufferedBodyIfIncomplete() {
public void disposeBufferedBody() {
bodyChunks.forEach(chunk -> {
if ((chunk != null) && (chunk.refCnt() > 0)) {
chunk.touch("ZuulMessage disposing buffered body");
ByteBufUtil.touch(chunk, "ZuulMessage disposing buffered body");
chunk.release();
}
});
Expand All @@ -213,9 +214,9 @@ public void runBufferedBodyContentThroughFilter(ZuulFilter<?, ?> filter) {
// original chunk passed in as is without any processing
for (int i=0; i < bodyChunks.size(); i++) {
final HttpContent origChunk = bodyChunks.get(i);
origChunk.touch("ZuulMessage processing chunk, filter: " + filter.filterName());
ByteBufUtil.touch(origChunk, "ZuulMessage processing chunk, filter: " + filter.filterName());
final HttpContent filteredChunk = filter.processContentChunk(this, origChunk);
filteredChunk.touch("ZuulMessage processing filteredChunk, filter: " + filter.filterName());
ByteBufUtil.touch(filteredChunk, "ZuulMessage processing filteredChunk, filter: " + filter.filterName());
if ((filteredChunk != null) && (filteredChunk != origChunk)) {
//filter actually did some processing, set the new chunk in and release the old chunk.
bodyChunks.set(i, filteredChunk);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.common.base.Strings;
import com.netflix.config.DynamicStringProperty;
import com.netflix.netty.common.ByteBufUtil;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.impl.Preconditions;
import com.netflix.zuul.FilterLoader;
Expand Down Expand Up @@ -134,10 +135,10 @@ public void filter(final HttpRequestMessage zuulReq, final HttpContent chunk) {
getEndpoint(zuulReq), "endpoint");
endpointName = endpoint.filterName();

chunk.touch("Endpoint processing chunk, ZuulMessage: " + zuulReq);
ByteBufUtil.touch(chunk, "Endpoint processing chunk, ZuulMessage: ", zuulReq);
final HttpContent newChunk = endpoint.processContentChunk(zuulReq, chunk);
if (newChunk != null) {
newChunk.touch("Endpoint buffering newChunk, ZuulMessage: " + zuulReq);
ByteBufUtil.touch(newChunk, "Endpoint buffering newChunk, ZuulMessage: ", zuulReq);
//Endpoints do not directly forward content chunks to next stage in the filter chain.
zuulReq.bufferBodyContents(newChunk);

Expand All @@ -148,7 +149,7 @@ public void filter(final HttpRequestMessage zuulReq, final HttpContent chunk) {

if (isFilterAwaitingBody(zuulReq) && zuulReq.hasCompleteBody() && !(endpoint instanceof ProxyEndpoint)) {
//whole body has arrived, resume filter chain
newChunk.touch("Endpoint body complete, resume chain, ZuulMessage: " + zuulReq);
ByteBufUtil.touch(newChunk, "Endpoint body complete, resume chain, ZuulMessage: ", zuulReq);
invokeNextStage(filter(endpoint, zuulReq));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.netflix.zuul.netty.filter;

import com.netflix.netty.common.ByteBufUtil;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.impl.Preconditions;
import com.netflix.zuul.FilterUsageNotifier;
Expand Down Expand Up @@ -108,15 +109,15 @@ public void filter(T inMesg, HttpContent chunk) {
final ZuulFilter<T, T> filter = filters[i];
filterName = filter.filterName();
if ((! filter.isDisabled()) && (! shouldSkipFilter(inMesg, filter))) {
chunk.touch("Filter runner processing chunk, filter: " + filter.filterName());
ByteBufUtil.touch(chunk, "Filter runner processing chunk, filter: " + filter.filterName());
final HttpContent newChunk = filter.processContentChunk(inMesg, chunk);
if (newChunk == null) {
//Filter wants to break the chain and stop propagating this chunk any further
return;
}
//deallocate original chunk if necessary
if ((newChunk != chunk) && (chunk.refCnt() > 0)) {
chunk.touch("Filter runner processing newChunk, filter: " + filter.filterName());
ByteBufUtil.touch(chunk, "Filter runner processing newChunk, filter: " + filter.filterName());
chunk.release(chunk.refCnt());
}
chunk = newChunk;
Expand All @@ -125,10 +126,10 @@ public void filter(T inMesg, HttpContent chunk) {

if (limit >= filters.length) {
//Filter chain has run to end, pass down the channel pipeline
chunk.touch("Filter runner chain complete, message: " + inMesg);
ByteBufUtil.touch(chunk, "Filter runner chain complete, message: ", inMesg);
invokeNextStage(inMesg, chunk);
} else {
chunk.touch("Filter runner buffering chunk, message: " + inMesg);
ByteBufUtil.touch(chunk, "Filter runner buffering chunk, message: ", inMesg);
inMesg.bufferBodyContents(chunk);

boolean isAwaitingBody = isFilterAwaitingBody(inMesg);
Expand All @@ -154,7 +155,7 @@ public void filter(T inMesg, HttpContent chunk) {

if (isAwaitingBody && inMesg.hasCompleteBody()) {
//whole body has arrived, resume filter chain
chunk.touch("Filter body complete, resume chain, ZuulMessage: " + inMesg);
ByteBufUtil.touch(chunk, "Filter body complete, resume chain, ZuulMessage: ", inMesg);
runFilters(inMesg, runningFilterIdx);
}
}
Expand Down