Skip to content

Commit

Permalink
setShared
Browse files Browse the repository at this point in the history
  • Loading branch information
Zizhong Zhang committed Apr 29, 2021
1 parent 57bd2b1 commit b5d2802
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 30 deletions.
14 changes: 12 additions & 2 deletions r2-core/src/main/java/com/linkedin/r2/filter/TimedRestFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class TimedRestFilter implements RestFilter
private final TimingKey _onRequestTimingKey;
private final TimingKey _onResponseTimingKey;
private final TimingKey _onErrorTimingKey;
private boolean _shared;

/**
* Registers {@link TimingKey}s for {@link com.linkedin.r2.message.timing.TimingNameConstants#TIMED_REST_FILTER}.
Expand All @@ -63,6 +64,7 @@ public TimedRestFilter(RestFilter restFilter)
_restFilter.getClass().getSimpleName(), TimingImportance.LOW);
_onErrorTimingKey = TimingKey.registerNewKey(timingKeyPrefix + ON_ERROR_SUFFIX + timingKeyPostfix,
_restFilter.getClass().getSimpleName(), TimingImportance.LOW);
_shared = false;
}

@Override
Expand Down Expand Up @@ -94,7 +96,15 @@ public void onRestError(Throwable ex,
_restFilter.onRestError(ex, requestContext, wireAttrs, new TimedNextFilter<>(_onErrorTimingKey, nextFilter));
}

public List<TimingKey> getTimingKeyList() {
return Arrays.asList(_onErrorTimingKey, _onRequestTimingKey, _onResponseTimingKey);
public void setShared() {
_shared = true;
}

public void onShutdown() {
if (!_shared) {
TimingKey.unregisterKey(_onErrorTimingKey);
TimingKey.unregisterKey(_onRequestTimingKey);
TimingKey.unregisterKey(_onResponseTimingKey);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import com.linkedin.r2.message.stream.StreamRequest;
import com.linkedin.r2.message.stream.StreamResponse;
import com.linkedin.r2.message.timing.TimingImportance;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import static com.linkedin.r2.filter.TimedRestFilter.ON_ERROR_SUFFIX;
Expand All @@ -42,6 +40,7 @@ public class TimedStreamFilter implements StreamFilter
private final TimingKey _onRequestTimingKey;
private final TimingKey _onResponseTimingKey;
private final TimingKey _onErrorTimingKey;
private boolean _shared;

/**
* Registers {@link TimingKey}s for {@link com.linkedin.r2.message.timing.TimingNameConstants#TIMED_STREAM_FILTER}.
Expand All @@ -62,6 +61,7 @@ public TimedStreamFilter(StreamFilter streamFilter)
filterClassName, TimingImportance.LOW);
_onErrorTimingKey = TimingKey.registerNewKey(timingKeyPrefix + ON_ERROR_SUFFIX + timingKeyPostfix,
filterClassName, TimingImportance.LOW);
_shared = false;
}

@Override
Expand Down Expand Up @@ -94,7 +94,15 @@ public void onStreamError(Throwable ex,
_streamFilter.onStreamError(ex, requestContext, wireAttrs, new TimedNextFilter<>(_onErrorTimingKey, nextFilter));
}

public List<TimingKey> getTimingKeyList() {
return Arrays.asList(_onErrorTimingKey, _onRequestTimingKey, _onResponseTimingKey);
public void setShared() {
_shared = true;
}

public void onShutdown() {
if (!_shared) {
TimingKey.unregisterKey(_onErrorTimingKey);
TimingKey.unregisterKey(_onRequestTimingKey);
TimingKey.unregisterKey(_onResponseTimingKey);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public class FilterChainClient implements TransportClient
{
private final TransportClient _client;
private final FilterChain _filters;
private final FilterChain _sharedFilters;

/**
* Construct a new instance by composing the specified {@link TransportClient}
Expand All @@ -62,7 +61,7 @@ public class FilterChainClient implements TransportClient
* @param filters the {@link FilterChain} to be composed.
* @param sharedFilters the {@link FilterChain} can be used by other clients.
*/
public FilterChainClient(TransportClient client, FilterChain filters, FilterChain sharedFilters)
public FilterChainClient(TransportClient client, FilterChain filters)
{
_client = client;

Expand All @@ -74,7 +73,6 @@ public FilterChainClient(TransportClient client, FilterChain filters, FilterChai
.addLastRest(requestFilter)
.addFirst(responseFilter)
.addLast(requestFilter);
_sharedFilters = sharedFilters;
}

@Override
Expand Down Expand Up @@ -104,26 +102,11 @@ public void shutdown(Callback<None> callback)
{
_client.shutdown(callback);

List<StreamFilter> streamFilters = _filters.getStreamFilters();
List<RestFilter> restFilters = _filters.getRestFilters();
List<StreamFilter> sharedStreamFilters = _sharedFilters.getStreamFilters();
List<RestFilter> sharedRestFilters = _sharedFilters.getRestFilters();

streamFilters.stream()
.filter(filter -> !sharedStreamFilters.contains(filter))
.filter(TimedStreamFilter.class::isInstance)
.map(TimedStreamFilter.class::cast)
.map(TimedStreamFilter::getTimingKeyList)
.flatMap(Collection::stream)
.forEach(TimingKey::unregisterKey);

restFilters.stream()
.filter(filter -> !sharedRestFilters.contains(filter))
.filter(TimedRestFilter.class::isInstance)
.map(TimedRestFilter.class::cast)
.map(TimedRestFilter::getTimingKeyList)
.flatMap(Collection::stream)
.forEach(TimingKey::unregisterKey);
_filters.getStreamFilters().stream().filter(TimedStreamFilter.class::isInstance)
.map(TimedStreamFilter.class::cast).forEach(TimedStreamFilter::onShutdown);

_filters.getRestFilters().stream().filter(TimedRestFilter.class::isInstance)
.map(TimedRestFilter.class::cast).forEach(TimedRestFilter::onShutdown);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.linkedin.r2.filter.CompressionConfig;
import com.linkedin.r2.filter.FilterChain;
import com.linkedin.r2.filter.FilterChains;
import com.linkedin.r2.filter.TimedRestFilter;
import com.linkedin.r2.filter.TimedStreamFilter;
import com.linkedin.r2.filter.compression.ClientCompressionFilter;
import com.linkedin.r2.filter.compression.ClientCompressionHelper;
import com.linkedin.r2.filter.compression.ClientStreamCompressionFilter;
Expand Down Expand Up @@ -685,6 +687,11 @@ private HttpClientFactory(FilterChain filters,
{
_channelPoolManagerFactory = new ConnectionSharingChannelPoolManagerFactory(_channelPoolManagerFactory);
}

_filters.getStreamFilters().stream().filter(TimedStreamFilter.class::isInstance)
.map(TimedStreamFilter.class::cast).forEach(TimedStreamFilter::setShared);
_filters.getRestFilters().stream().filter(TimedRestFilter.class::isInstance)
.map(TimedRestFilter.class::cast).forEach(TimedRestFilter::setShared);
}

public static class Builder
Expand Down Expand Up @@ -1120,7 +1127,7 @@ private TransportClient getClient(Map<String, ? extends Object> properties,
filters = filters.addLastRest(disruptFilter);
filters = filters.addLast(disruptFilter);

client = new FilterChainClient(client, filters, _filters);
client = new FilterChainClient(client, filters);
client = new FactoryClient(client);
synchronized (_mutex)
{
Expand Down

1 comment on commit b5d2802

@nizarm
Copy link
Contributor

@nizarm nizarm commented on b5d2802 May 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM looks a lot cleaner with the flag-based approach. Thank you for taking care of this.

Please sign in to comment.