From 7517e3a7bddfc211e3c55fb4463c532d8c330a47 Mon Sep 17 00:00:00 2001 From: Jason Tedor <jason@tedor.me> Date: Thu, 24 Jan 2019 08:39:24 -0500 Subject: [PATCH] Optimize warning header de-duplication (#37725) Now that warning headers no longer contain a timestamp of when the warning was generated, we no longer need to extract the warning value from the warning to determine whether or not the warning value is duplicated. Instead, we can compare strings directly. Further, when de-duplicating warning headers, we are constantly rebuilding sets. Instead of doing that, we can carry about the set with us and rebuild it if we find a new warning value. This commit applies both of these optimizations. --- .../common/logging/DeprecationLogger.java | 2 +- .../common/util/concurrent/ThreadContext.java | 107 +++++++++++++----- 2 files changed, 82 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/logging/DeprecationLogger.java b/server/src/main/java/org/elasticsearch/common/logging/DeprecationLogger.java index 0c77271c7ed0f..81d272923db22 100644 --- a/server/src/main/java/org/elasticsearch/common/logging/DeprecationLogger.java +++ b/server/src/main/java/org/elasticsearch/common/logging/DeprecationLogger.java @@ -232,7 +232,7 @@ void deprecated(final Set<ThreadContext> threadContexts, final String message, f while (iterator.hasNext()) { try { final ThreadContext next = iterator.next(); - next.addResponseHeader("Warning", warningHeaderValue, DeprecationLogger::extractWarningValueFromWarningHeader); + next.addResponseHeader("Warning", warningHeaderValue); } catch (final IllegalStateException e) { // ignored; it should be removed shortly } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java index bd3507ef7764a..0fa0e832a0a2b 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java +++ 
b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java @@ -36,14 +36,18 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; +import java.util.function.BinaryOperator; import java.util.function.Function; import java.util.function.Supplier; -import java.util.stream.Collectors; +import java.util.stream.Collector; import java.util.stream.Stream; import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_COUNT; @@ -258,11 +262,11 @@ public Map getHeaders() { * @return Never {@code null}. */ public Map> getResponseHeaders() { - Map> responseHeaders = threadLocal.get().responseHeaders; + Map> responseHeaders = threadLocal.get().responseHeaders; HashMap> map = new HashMap<>(responseHeaders.size()); - for (Map.Entry> entry : responseHeaders.entrySet()) { - map.put(entry.getKey(), Collections.unmodifiableList(entry.getValue())); + for (Map.Entry> entry : responseHeaders.entrySet()) { + map.put(entry.getKey(), Collections.unmodifiableList(new ArrayList<>(entry.getValue()))); } return Collections.unmodifiableMap(map); @@ -405,7 +409,7 @@ default void restore() { private static final class ThreadContextStruct { private final Map requestHeaders; private final Map transientHeaders; - private final Map> responseHeaders; + private final Map> responseHeaders; private final boolean isSystemContext; private long warningHeadersSize; //saving current warning headers' size not to recalculate the size with every new warning header private ThreadContextStruct(StreamInput in) throws IOException { @@ -416,7 +420,23 @@ private ThreadContextStruct(StreamInput in) throws IOException { } this.requestHeaders = requestHeaders; - this.responseHeaders = 
in.readMapOfLists(StreamInput::readString, StreamInput::readString); + this.responseHeaders = in.readMap(StreamInput::readString, input -> { + final int size = input.readVInt(); + if (size == 0) { + return Collections.emptySet(); + } else if (size == 1) { + return Collections.singleton(input.readString()); + } else { + // use a linked hash set to preserve order + final LinkedHashSet values = new LinkedHashSet<>(size); + for (int i = 0; i < size; i++) { + final String value = input.readString(); + final boolean added = values.add(value); + assert added : value; + } + return values; + } + }); this.transientHeaders = Collections.emptyMap(); isSystemContext = false; // we never serialize this it's a transient flag this.warningHeadersSize = 0L; @@ -430,7 +450,7 @@ private ThreadContextStruct setSystemContext() { } private ThreadContextStruct(Map requestHeaders, - Map> responseHeaders, + Map> responseHeaders, Map transientHeaders, boolean isSystemContext) { this.requestHeaders = requestHeaders; this.responseHeaders = responseHeaders; @@ -440,7 +460,7 @@ private ThreadContextStruct(Map requestHeaders, } private ThreadContextStruct(Map requestHeaders, - Map> responseHeaders, + Map> responseHeaders, Map transientHeaders, boolean isSystemContext, long warningHeadersSize) { this.requestHeaders = requestHeaders; @@ -481,19 +501,19 @@ private ThreadContextStruct putHeaders(Map headers) { } } - private ThreadContextStruct putResponseHeaders(Map> headers) { + private ThreadContextStruct putResponseHeaders(Map> headers) { assert headers != null; if (headers.isEmpty()) { return this; } - final Map> newResponseHeaders = new HashMap<>(this.responseHeaders); - for (Map.Entry> entry : headers.entrySet()) { + final Map> newResponseHeaders = new HashMap<>(this.responseHeaders); + for (Map.Entry> entry : headers.entrySet()) { String key = entry.getKey(); - final List existingValues = newResponseHeaders.get(key); + final Set existingValues = newResponseHeaders.get(key); if (existingValues 
!= null) { - List newValues = Stream.concat(entry.getValue().stream(), - existingValues.stream()).distinct().collect(Collectors.toList()); - newResponseHeaders.put(key, Collections.unmodifiableList(newValues)); + final Set newValues = + Stream.concat(entry.getValue().stream(), existingValues.stream()).collect(LINKED_HASH_SET_COLLECTOR); + newResponseHeaders.put(key, Collections.unmodifiableSet(newValues)); } else { newResponseHeaders.put(key, entry.getValue()); } @@ -523,20 +543,19 @@ private ThreadContextStruct putResponse(final String key, final String value, fi } } - final Map> newResponseHeaders = new HashMap<>(this.responseHeaders); - final List existingValues = newResponseHeaders.get(key); + final Map> newResponseHeaders; + final Set existingValues = responseHeaders.get(key); if (existingValues != null) { - final Set existingUniqueValues = existingValues.stream().map(uniqueValue).collect(Collectors.toSet()); - assert existingValues.size() == existingUniqueValues.size() : - "existing values: [" + existingValues + "], existing unique values [" + existingUniqueValues + "]"; - if (existingUniqueValues.contains(uniqueValue.apply(value))) { + if (existingValues.contains(uniqueValue.apply(value))) { return this; } - final List newValues = new ArrayList<>(existingValues); - newValues.add(value); - newResponseHeaders.put(key, Collections.unmodifiableList(newValues)); + // preserve insertion order + final Set newValues = Stream.concat(existingValues.stream(), Stream.of(value)).collect(LINKED_HASH_SET_COLLECTOR); + newResponseHeaders = new HashMap<>(responseHeaders); + newResponseHeaders.put(key, Collections.unmodifiableSet(newValues)); } else { - newResponseHeaders.put(key, Collections.singletonList(value)); + newResponseHeaders = new HashMap<>(responseHeaders); + newResponseHeaders.put(key, Collections.singleton(value)); } //check if we can add another warning header - if max count within limits @@ -588,7 +607,7 @@ private void writeTo(StreamOutput out, Map 
defaultHeaders) throw out.writeString(entry.getValue()); } - out.writeMapOfLists(responseHeaders, StreamOutput::writeString, StreamOutput::writeString); + out.writeMap(responseHeaders, StreamOutput::writeString, StreamOutput::writeStringCollection); } } @@ -751,4 +770,40 @@ public AbstractRunnable unwrap() { return in; } } + + private static final Collector, Set> LINKED_HASH_SET_COLLECTOR = new LinkedHashSetCollector<>(); + + private static class LinkedHashSetCollector implements Collector, Set> { + @Override + public Supplier> supplier() { + return LinkedHashSet::new; + } + + @Override + public BiConsumer, T> accumulator() { + return Set::add; + } + + @Override + public BinaryOperator> combiner() { + return (left, right) -> { + left.addAll(right); + return left; + }; + } + + @Override + public Function, Set> finisher() { + return Function.identity(); + } + + private static final Set CHARACTERISTICS = + Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH)); + + @Override + public Set characteristics() { + return CHARACTERISTICS; + } + } + }