From 1cb5e2eac33331e62f6bc4c7c4f95cbfb0fb89ce Mon Sep 17 00:00:00 2001 From: forozco Date: Tue, 28 Apr 2020 17:47:01 -0400 Subject: [PATCH 01/21] allow servers to define client node selection strategy --- .../DefaultSelectionStrategySelector.java | 69 +++++++ .../dialogue/core/DialogueChannel.java | 97 ++------- .../core/DialogueNodeSelectionStrategy.java | 38 ++++ .../core/NodeSelectionStrategyChannel.java | 184 ++++++++++++++++++ ...ntilErrorNodeSelectionStrategyChannel.java | 4 +- .../core/SelectionStrategySelector.java | 27 +++ .../dialogue/core/SupplierChannel.java | 38 ++++ ...ErrorNodeSelectionStrategyChannelTest.java | 3 +- 8 files changed, 372 insertions(+), 88 deletions(-) create mode 100644 dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultSelectionStrategySelector.java create mode 100644 dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategy.java create mode 100644 dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java create mode 100644 dialogue-core/src/main/java/com/palantir/dialogue/core/SelectionStrategySelector.java create mode 100644 dialogue-core/src/main/java/com/palantir/dialogue/core/SupplierChannel.java diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultSelectionStrategySelector.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultSelectionStrategySelector.java new file mode 100644 index 000000000..0267bb2b3 --- /dev/null +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultSelectionStrategySelector.java @@ -0,0 +1,69 @@ +/* + * (c) Copyright 2020 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.dialogue.core; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.palantir.conjure.java.client.config.NodeSelectionStrategy; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; + +public final class DefaultSelectionStrategySelector implements SelectionStrategySelector { + private final DialogueNodeSelectionStrategy clientStrategy; + private final AtomicReference currentStrategy; + private final ConcurrentHashMap strategyPerChannel = + new ConcurrentHashMap<>(); + + public DefaultSelectionStrategySelector(NodeSelectionStrategy clientStrategy) { + this.clientStrategy = DialogueNodeSelectionStrategy.of(clientStrategy); + this.currentStrategy = new AtomicReference<>(this.clientStrategy); + } + + @Override + public DialogueNodeSelectionStrategy get() { + return currentStrategy.get(); + } + + @Override + public DialogueNodeSelectionStrategy updateAndGet(LimitedChannel channel, String strategyUpdate) { + DialogueNodeSelectionStrategy strategy = DialogueNodeSelectionStrategy.valueOf(strategyUpdate); + if (strategyPerChannel.getOrDefault(channel, strategy).equals(strategy)) { + return currentStrategy.get(); + } + + strategyPerChannel.put(channel, strategy); + return updateAndGetStrategy(); + } + + @Override + public DialogueNodeSelectionStrategy updateAndGet(ImmutableList channels) { + channels.forEach(strategyPerChannel::remove); + return updateAndGetStrategy(); + } + + private DialogueNodeSelectionStrategy updateAndGetStrategy() { + return currentStrategy.updateAndGet(_strategy -> { + Set strategies = new HashSet<>(strategyPerChannel.values()); + if (strategies.size() == 1) { + return Iterables.getOnlyElement(strategies); + } + return clientStrategy; + }); + } +} diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java index 68eb5afb3..6c9a0f60c 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java @@ -31,18 +31,15 @@ import com.palantir.logsafe.Safe; import com.palantir.logsafe.SafeArg; import com.palantir.logsafe.exceptions.SafeIllegalStateException; -import com.palantir.logsafe.exceptions.SafeRuntimeException; import com.palantir.random.SafeThreadLocalRandom; import com.palantir.tritium.metrics.registry.TaggedMetricRegistry; import java.util.Collection; import java.util.HashSet; import java.util.Map; -import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import javax.annotation.Nullable; import org.slf4j.Logger; @@ -52,7 +49,7 @@ public final class DialogueChannel implements Channel { private static final Logger log = LoggerFactory.getLogger(DialogueChannel.class); private final Map limitedChannelByUri = new ConcurrentHashMap<>(); - private final AtomicReference nodeSelectionStrategy = new AtomicReference<>(); + private final NodeSelectionStrategyChannel nodeSelectionStrategy; private final QueuedChannel queuedChannel; // just so we can process the queue when uris reload private final String channelName; @@ -61,8 +58,6 @@ public final class DialogueChannel implements Channel { private final Channel delegate; private final ClientMetrics clientMetrics; private final DialogueClientMetrics dialogueClientMetrics; - private final Random random; - private Ticker ticker; // TODO(forozco): you really want a refreshable of uri separate from the client config private DialogueChannel( @@ -78,11 +73,14 @@ private DialogueChannel( this.channelFactory = channelFactory; clientMetrics = ClientMetrics.of(clientConfiguration.taggedMetricRegistry()); dialogueClientMetrics = DialogueClientMetrics.of(clientConfiguration.taggedMetricRegistry()); - this.random = random; - this.ticker = ticker; - this.queuedChannel = new QueuedChannel( - new SupplierChannel(nodeSelectionStrategy::get), channelName, dialogueClientMetrics, maxQueueSize); - updateUris(clientConfiguration.uris()); + this.nodeSelectionStrategy = new NodeSelectionStrategyChannel( + channelName, + random, + ticker, + clientConfiguration.taggedMetricRegistry(), + new DefaultSelectionStrategySelector(clientConfiguration.nodeSelectionStrategy())); + updateUrisInner(clientConfiguration.uris(), true); + this.queuedChannel = new QueuedChannel(nodeSelectionStrategy, channelName, dialogueClientMetrics, maxQueueSize); this.delegate = wrap( queuedChannel, channelName, @@ -99,7 +97,10 @@ public ListenableFuture execute(Endpoint endpoint, Request request) { } public void updateUris(Collection uris) { - boolean firstTime = nodeSelectionStrategy.get() == null; + updateUrisInner(uris, false); + } + + private void updateUrisInner(Collection uris, boolean firstTime) { Set uniqueUris = new HashSet<>(uris); // Uris didn't really change so nothing to do if (limitedChannelByUri.keySet().equals(uniqueUris) && !firstTime) { @@ -129,14 +130,7 @@ public void updateUris(Collection uris) { .build(); newUris.forEach(uri -> limitedChannelByUri.put(uri, createLimitedChannel(uri, allUris.indexOf(uri)))); - nodeSelectionStrategy.getAndUpdate(previous -> getUpdatedNodeSelectionStrategy( - previous, - clientConfiguration, - ImmutableList.copyOf(limitedChannelByUri.values()), - random, - ticker, - channelName)); - + nodeSelectionStrategy.updateChannels(ImmutableList.copyOf(limitedChannelByUri.values())); // some queued requests might be able to make progress on a new uri now queuedChannel.schedule(); } @@ -154,55 +148,6 @@ private LimitedChannel createLimitedChannel(String uri, int uriIndex) { clientConfiguration, limitedChannel, clientConfiguration.taggedMetricRegistry(), channelName, uriIndex); } - private static LimitedChannel getUpdatedNodeSelectionStrategy( - @Nullable LimitedChannel previousNodeSelectionStrategy, - ClientConfiguration config, - ImmutableList channels, - Random random, - Ticker tick, - String channelName) { - if (channels.isEmpty()) { - return new ZeroUriChannel(channelName); - } - if (channels.size() == 1) { - // no fancy node selection heuristic can save us if our one node goes down - return channels.get(0); - } - - switch (config.nodeSelectionStrategy()) { - case PIN_UNTIL_ERROR: - case PIN_UNTIL_ERROR_WITHOUT_RESHUFFLE: - DialoguePinuntilerrorMetrics pinuntilerrorMetrics = - DialoguePinuntilerrorMetrics.of(config.taggedMetricRegistry()); - // Previously pin until error, so we should preserve our previous location - if (previousNodeSelectionStrategy instanceof PinUntilErrorNodeSelectionStrategyChannel) { - PinUntilErrorNodeSelectionStrategyChannel previousPinUntilError = - (PinUntilErrorNodeSelectionStrategyChannel) previousNodeSelectionStrategy; - return PinUntilErrorNodeSelectionStrategyChannel.of( - Optional.of(previousPinUntilError.getCurrentChannel()), - config.nodeSelectionStrategy(), - channels, - pinuntilerrorMetrics, - random, - channelName); - } - return PinUntilErrorNodeSelectionStrategyChannel.of( - Optional.empty(), - config.nodeSelectionStrategy(), - channels, - pinuntilerrorMetrics, - random, - channelName); - case ROUND_ROBIN: - // When people ask for 'ROUND_ROBIN', they usually just want something to load balance better. - // We used to have a naive RoundRobinChannel, then tried RandomSelection and now use this heuristic: - return new BalancedNodeSelectionStrategyChannel( - channels, random, tick, config.taggedMetricRegistry(), channelName); - } - throw new SafeRuntimeException( - "Unknown NodeSelectionStrategy", SafeArg.of("unknown", config.nodeSelectionStrategy())); - } - private static LimitedChannel concurrencyLimiter( ClientConfiguration config, LimitedChannel channel, @@ -349,18 +294,4 @@ private void preconditions(ClientConfiguration conf) { "Retries on socket exceptions cannot be disabled without disabling retries entirely."); } } - - private static final class SupplierChannel implements LimitedChannel { - private final Supplier channelSupplier; - - SupplierChannel(Supplier channelSupplier) { - this.channelSupplier = channelSupplier; - } - - @Override - public Optional> maybeExecute(Endpoint endpoint, Request request) { - LimitedChannel delegate = channelSupplier.get(); - return delegate.maybeExecute(endpoint, request); - } - } } diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategy.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategy.java new file mode 100644 index 000000000..7b51aa5f1 --- /dev/null +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategy.java @@ -0,0 +1,38 @@ +/* + * (c) Copyright 2020 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.dialogue.core; + +import com.palantir.conjure.java.client.config.NodeSelectionStrategy; + +public enum DialogueNodeSelectionStrategy { + PIN_UNTIL_ERROR, + PIN_UNTIL_ERROR_WITHOUT_RESHUFFLE, + BALANCED, + UNKNOWN; + + static DialogueNodeSelectionStrategy of(NodeSelectionStrategy strategy) { + switch (strategy) { + case PIN_UNTIL_ERROR: + return PIN_UNTIL_ERROR; + case PIN_UNTIL_ERROR_WITHOUT_RESHUFFLE: + return PIN_UNTIL_ERROR_WITHOUT_RESHUFFLE; + case ROUND_ROBIN: + return BALANCED; + } + return UNKNOWN; + } +} diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java new file mode 100644 index 000000000..43e370ddb --- /dev/null +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java @@ -0,0 +1,184 @@ +/* + * (c) Copyright 2020 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.dialogue.core; + +import com.github.benmanes.caffeine.cache.Ticker; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.ListenableFuture; +import com.palantir.dialogue.Endpoint; +import com.palantir.dialogue.Request; +import com.palantir.dialogue.Response; +import com.palantir.logsafe.SafeArg; +import com.palantir.logsafe.exceptions.SafeRuntimeException; +import com.palantir.tritium.metrics.registry.TaggedMetricRegistry; +import java.util.List; +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.atomic.AtomicReference; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.immutables.value.Value; + +public final class NodeSelectionStrategyChannel implements LimitedChannel { + private static final String NODE_SELECTION_HEADER = "Node-Selection-Strategy"; + + private final AtomicReference nodeSelectionStrategy; + private final AtomicReference> nodeChannels; + + private final String channelName; + private final Random random; + private final Ticker tick; + private final TaggedMetricRegistry metrics; + private final SelectionStrategySelector strategySelector; + private final LimitedChannel delegate; + + public NodeSelectionStrategyChannel( + String channelName, + Random random, + Ticker tick, + TaggedMetricRegistry metrics, + SelectionStrategySelector strategySelector) { + this.channelName = channelName; + this.random = random; + this.tick = tick; + this.metrics = metrics; + this.strategySelector = strategySelector; + this.nodeChannels = new AtomicReference<>(ImmutableList.of()); + this.nodeSelectionStrategy = new AtomicReference<>(getUpdatedNodeSelectionStrategy( + null, nodeChannels.get(), strategySelector.get(), metrics, random, tick, channelName)); + this.delegate = new SupplierChannel(() -> nodeSelectionStrategy.get().channel()); + } + + @Override + public Optional> maybeExecute(Endpoint endpoint, Request request) { + return delegate.maybeExecute(endpoint, request); + } + + public void updateChannels(List newChannels) { + ImmutableList wrappedChannels = + newChannels.stream().map(WrapperChannel::new).collect(ImmutableList.toImmutableList()); + nodeChannels.set(wrappedChannels); + nodeSelectionStrategy.getAndUpdate(previousChannel -> getUpdatedNodeSelectionStrategy( + previousChannel, wrappedChannels, previousChannel.strategy(), metrics, random, tick, channelName)); + } + + private void updateStrategy(LimitedChannel channel, String strategy) { + DialogueNodeSelectionStrategy updatedStrategy = strategySelector.updateAndGet(channel, strategy); + nodeSelectionStrategy.getAndUpdate(currentStrategy -> { + if (updatedStrategy.equals(currentStrategy.strategy())) { + return currentStrategy; + } + return getUpdatedNodeSelectionStrategy( + currentStrategy, nodeChannels.get(), updatedStrategy, metrics, random, tick, channelName); + }); + } + + private static ChannelWithStrategy getUpdatedNodeSelectionStrategy( + @Nullable ChannelWithStrategy previousChannel, + @Nullable ImmutableList channels, + DialogueNodeSelectionStrategy updatedStrategy, + TaggedMetricRegistry metrics, + Random random, + Ticker tick, + String channelName) { + if (channels.isEmpty()) { + return ChannelWithStrategy.of(DialogueNodeSelectionStrategy.UNKNOWN, new ZeroUriChannel(channelName)); + } + if (channels.size() == 1) { + // no fancy node selection heuristic can save us if our one node goes down + return ChannelWithStrategy.of(DialogueNodeSelectionStrategy.UNKNOWN, channels.get(0)); + } + + LimitedChannel previousNodeSelectionStrategy = previousChannel.channel(); + + switch (updatedStrategy) { + case PIN_UNTIL_ERROR: + case PIN_UNTIL_ERROR_WITHOUT_RESHUFFLE: + DialoguePinuntilerrorMetrics pinuntilerrorMetrics = DialoguePinuntilerrorMetrics.of(metrics); + // Previously pin until error, so we should preserve our previous location + if (previousNodeSelectionStrategy instanceof PinUntilErrorNodeSelectionStrategyChannel) { + PinUntilErrorNodeSelectionStrategyChannel previousPinUntilError = + (PinUntilErrorNodeSelectionStrategyChannel) previousNodeSelectionStrategy; + return ChannelWithStrategy.of( + updatedStrategy, + PinUntilErrorNodeSelectionStrategyChannel.of( + Optional.of(previousPinUntilError.getCurrentChannel()), + updatedStrategy, + channels, + pinuntilerrorMetrics, + random, + channelName)); + } + return ChannelWithStrategy.of( + updatedStrategy, + PinUntilErrorNodeSelectionStrategyChannel.of( + Optional.empty(), + updatedStrategy, + channels, + pinuntilerrorMetrics, + random, + channelName)); + case BALANCED: + // When people ask for 'ROUND_ROBIN', they usually just want something to load balance better. + // We used to have a naive RoundRobinChannel, then tried RandomSelection and now use this heuristic: + return ChannelWithStrategy.of( + updatedStrategy, + new BalancedNodeSelectionStrategyChannel(channels, random, tick, metrics, channelName)); + } + throw new SafeRuntimeException("Unknown NodeSelectionStrategy", SafeArg.of("unknown", updatedStrategy)); + } + + // TODO(forozco): really you'd want an union + @Value.Immutable + interface ChannelWithStrategy { + DialogueNodeSelectionStrategy strategy(); + + LimitedChannel channel(); + + static ChannelWithStrategy of(DialogueNodeSelectionStrategy strategy, LimitedChannel channel) { + return ImmutableChannelWithStrategy.builder() + .strategy(strategy) + .channel(channel) + .build(); + } + } + + private class WrapperChannel implements LimitedChannel { + private final LimitedChannel delegate; + private final FutureCallback callback; + + WrapperChannel(LimitedChannel delegate) { + this.delegate = delegate; + this.callback = new FutureCallback() { + @Override + public void onSuccess(@Nullable Response result) { + result.getFirstHeader(NODE_SELECTION_HEADER) + .ifPresent(strategy -> updateStrategy(delegate, strategy)); + } + + @Override + public void onFailure(Throwable _unused) {} + }; + } + + @Override + public Optional> maybeExecute(Endpoint endpoint, Request request) { + return delegate.maybeExecute(endpoint, request) + .map(response -> DialogueFutures.addDirectCallback(response, this.callback)); + } + } +} diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/PinUntilErrorNodeSelectionStrategyChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/PinUntilErrorNodeSelectionStrategyChannel.java index edb24c707..0023f151b 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/PinUntilErrorNodeSelectionStrategyChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/PinUntilErrorNodeSelectionStrategyChannel.java @@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ListenableFuture; -import com.palantir.conjure.java.client.config.NodeSelectionStrategy; import com.palantir.dialogue.Endpoint; import com.palantir.dialogue.Request; import com.palantir.dialogue.Response; @@ -86,7 +85,7 @@ final class PinUntilErrorNodeSelectionStrategyChannel implements LimitedChannel static PinUntilErrorNodeSelectionStrategyChannel of( Optional initialChannel, - NodeSelectionStrategy strategy, + DialogueNodeSelectionStrategy strategy, List channels, DialoguePinuntilerrorMetrics metrics, Random random, @@ -111,7 +110,6 @@ static PinUntilErrorNodeSelectionStrategyChannel of( case PIN_UNTIL_ERROR_WITHOUT_RESHUFFLE: NodeList constant = new ConstantNodeList(initialShuffle); return new PinUntilErrorNodeSelectionStrategyChannel(constant, initialHost, metrics, channelName); - case ROUND_ROBIN: } throw new SafeIllegalArgumentException("Unsupported NodeSelectionStrategy", SafeArg.of("strategy", strategy)); diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/SelectionStrategySelector.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/SelectionStrategySelector.java new file mode 100644 index 000000000..b6808a62b --- /dev/null +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/SelectionStrategySelector.java @@ -0,0 +1,27 @@ +/* + * (c) Copyright 2020 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.dialogue.core; + +import com.google.common.collect.ImmutableList; + +public interface SelectionStrategySelector { + DialogueNodeSelectionStrategy get(); + + DialogueNodeSelectionStrategy updateAndGet(LimitedChannel channel, String strategyUpdate); + + DialogueNodeSelectionStrategy updateAndGet(ImmutableList channels); +} diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/SupplierChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/SupplierChannel.java new file mode 100644 index 000000000..fba18bc0d --- /dev/null +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/SupplierChannel.java @@ -0,0 +1,38 @@ +/* + * (c) Copyright 2020 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.dialogue.core; + +import com.google.common.util.concurrent.ListenableFuture; +import com.palantir.dialogue.Endpoint; +import com.palantir.dialogue.Request; +import com.palantir.dialogue.Response; +import java.util.Optional; +import java.util.function.Supplier; + +final class SupplierChannel implements LimitedChannel { + private final Supplier channelSupplier; + + SupplierChannel(Supplier channelSupplier) { + this.channelSupplier = channelSupplier; + } + + @Override + public Optional> maybeExecute(Endpoint endpoint, Request request) { + LimitedChannel delegate = channelSupplier.get(); + return delegate.maybeExecute(endpoint, request); + } +} diff --git a/dialogue-core/src/test/java/com/palantir/dialogue/core/PinUntilErrorNodeSelectionStrategyChannelTest.java b/dialogue-core/src/test/java/com/palantir/dialogue/core/PinUntilErrorNodeSelectionStrategyChannelTest.java index f7d80685f..b912cb8fd 100644 --- a/dialogue-core/src/test/java/com/palantir/dialogue/core/PinUntilErrorNodeSelectionStrategyChannelTest.java +++ b/dialogue-core/src/test/java/com/palantir/dialogue/core/PinUntilErrorNodeSelectionStrategyChannelTest.java @@ -27,7 +27,6 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import com.palantir.conjure.java.client.config.NodeSelectionStrategy; import com.palantir.dialogue.Response; import com.palantir.tritium.metrics.registry.DefaultTaggedMetricRegistry; import java.time.Duration; @@ -197,7 +196,7 @@ public void finds_first_non_limited_channel() { void handles_reconstruction_from_stale_state() { PinUntilErrorNodeSelectionStrategyChannel.of( Optional.empty(), - NodeSelectionStrategy.PIN_UNTIL_ERROR, + DialogueNodeSelectionStrategy.PIN_UNTIL_ERROR, ImmutableList.of(channel1, channel2), metrics, pseudo, From f0581e88a7fd4b61396b933f0ab6c6c6c7677324 Mon Sep 17 00:00:00 2001 From: forozco Date: Tue, 28 Apr 2020 18:54:27 -0400 Subject: [PATCH 02/21] tests and null away --- ...DefaultNodeSelectionStrategySelector.java} | 39 ++++--- .../dialogue/core/DialogueChannel.java | 4 +- .../core/DialogueNodeSelectionStrategy.java | 21 +++- .../core/NodeSelectionStrategyChannel.java | 41 ++++--- ...ava => NodeSelectionStrategySelector.java} | 11 +- ...ntilErrorNodeSelectionStrategyChannel.java | 14 +-- ...aultNodeSelectionStrategySelectorTest.java | 76 +++++++++++++ .../DialogueNodeSelectionStrategyTest.java | 42 +++++++ .../NodeSelectionStrategyChannelTest.java | 104 ++++++++++++++++++ 9 files changed, 302 insertions(+), 50 deletions(-) rename dialogue-core/src/main/java/com/palantir/dialogue/core/{DefaultSelectionStrategySelector.java => DefaultNodeSelectionStrategySelector.java} (50%) rename dialogue-core/src/main/java/com/palantir/dialogue/core/{SelectionStrategySelector.java => NodeSelectionStrategySelector.java} (65%) create mode 100644 dialogue-core/src/test/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelectorTest.java create mode 100644 dialogue-core/src/test/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategyTest.java create mode 100644 dialogue-core/src/test/java/com/palantir/dialogue/core/NodeSelectionStrategyChannelTest.java diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultSelectionStrategySelector.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelector.java similarity index 50% rename from dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultSelectionStrategySelector.java rename to dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelector.java index 0267bb2b3..ca15d270c 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultSelectionStrategySelector.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelector.java @@ -16,52 +16,59 @@ package com.palantir.dialogue.core; -import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; import com.palantir.conjure.java.client.config.NodeSelectionStrategy; -import java.util.HashSet; +import java.util.Collection; +import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; -public final class DefaultSelectionStrategySelector implements SelectionStrategySelector { +@SuppressWarnings("NullAway") +public final class DefaultNodeSelectionStrategySelector implements NodeSelectionStrategySelector { private final DialogueNodeSelectionStrategy clientStrategy; private final AtomicReference currentStrategy; - private final ConcurrentHashMap strategyPerChannel = + private final ConcurrentHashMap> strategyPerChannel = new ConcurrentHashMap<>(); - public DefaultSelectionStrategySelector(NodeSelectionStrategy clientStrategy) { + public DefaultNodeSelectionStrategySelector(NodeSelectionStrategy clientStrategy) { this.clientStrategy = DialogueNodeSelectionStrategy.of(clientStrategy); this.currentStrategy = new AtomicReference<>(this.clientStrategy); } @Override - public DialogueNodeSelectionStrategy get() { + public DialogueNodeSelectionStrategy getCurrentStrategy() { return currentStrategy.get(); } @Override - public DialogueNodeSelectionStrategy updateAndGet(LimitedChannel channel, String strategyUpdate) { - DialogueNodeSelectionStrategy strategy = DialogueNodeSelectionStrategy.valueOf(strategyUpdate); - if (strategyPerChannel.getOrDefault(channel, strategy).equals(strategy)) { + public DialogueNodeSelectionStrategy updateChannelStrategy( + LimitedChannel channel, List updatedStrategies) { + List previousStrategies = strategyPerChannel.put(channel, updatedStrategies); + if (updatedStrategies.isEmpty() || updatedStrategies.equals(previousStrategies)) { return currentStrategy.get(); } - - strategyPerChannel.put(channel, strategy); return updateAndGetStrategy(); } @Override - public DialogueNodeSelectionStrategy updateAndGet(ImmutableList channels) { - channels.forEach(strategyPerChannel::remove); + public DialogueNodeSelectionStrategy setActiveChannels(List channels) { + Sets.difference(strategyPerChannel.keySet(), ImmutableSet.copyOf(channels)) + .forEach(strategyPerChannel::remove); return updateAndGetStrategy(); } private DialogueNodeSelectionStrategy updateAndGetStrategy() { return currentStrategy.updateAndGet(_strategy -> { - Set strategies = new HashSet<>(strategyPerChannel.values()); - if (strategies.size() == 1) { - return Iterables.getOnlyElement(strategies); + // TODO(forozco): improve strategy selection process to find the common intersection + Collection> requestedStrategies = strategyPerChannel.values(); + Set firstChoiceStrategies = requestedStrategies.stream() + .map(strategies -> strategies.get(0)) + .collect(ImmutableSet.toImmutableSet()); + if (firstChoiceStrategies.size() == 1) { + return Iterables.getOnlyElement(firstChoiceStrategies); } return clientStrategy; }); diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java index 6c9a0f60c..cd7dece7d 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java @@ -78,9 +78,9 @@ private DialogueChannel( random, ticker, clientConfiguration.taggedMetricRegistry(), - new DefaultSelectionStrategySelector(clientConfiguration.nodeSelectionStrategy())); - updateUrisInner(clientConfiguration.uris(), true); + new DefaultNodeSelectionStrategySelector(clientConfiguration.nodeSelectionStrategy())); this.queuedChannel = new QueuedChannel(nodeSelectionStrategy, channelName, dialogueClientMetrics, maxQueueSize); + updateUrisInner(clientConfiguration.uris(), true); this.delegate = wrap( queuedChannel, channelName, diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategy.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategy.java index 7b51aa5f1..433e94714 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategy.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategy.java @@ -16,7 +16,12 @@ package com.palantir.dialogue.core; +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableList; import com.palantir.conjure.java.client.config.NodeSelectionStrategy; +import com.palantir.logsafe.SafeArg; +import com.palantir.logsafe.exceptions.SafeIllegalStateException; +import java.util.List; public enum DialogueNodeSelectionStrategy { PIN_UNTIL_ERROR, @@ -24,6 +29,20 @@ public enum DialogueNodeSelectionStrategy { BALANCED, UNKNOWN; + static List fromHeader(String header) { + return Splitter.on(";").splitToList(header).stream() + .map(DialogueNodeSelectionStrategy::safeValueOf) + .collect(ImmutableList.toImmutableList()); + } + + private static DialogueNodeSelectionStrategy safeValueOf(String value) { + try { + return valueOf(value.trim().toUpperCase()); + } catch (Exception e) { + return UNKNOWN; + } + } + static DialogueNodeSelectionStrategy of(NodeSelectionStrategy strategy) { switch (strategy) { case PIN_UNTIL_ERROR: @@ -33,6 +52,6 @@ static DialogueNodeSelectionStrategy of(NodeSelectionStrategy strategy) { case ROUND_ROBIN: return BALANCED; } - return UNKNOWN; + throw new SafeIllegalStateException("Unknown node selection strategy", SafeArg.of("strategy", strategy)); } } diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java index 43e370ddb..8f8c00cf2 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java @@ -23,6 +23,7 @@ import com.palantir.dialogue.Endpoint; import com.palantir.dialogue.Request; import com.palantir.dialogue.Response; +import com.palantir.logsafe.Preconditions; import com.palantir.logsafe.SafeArg; import com.palantir.logsafe.exceptions.SafeRuntimeException; import com.palantir.tritium.metrics.registry.TaggedMetricRegistry; @@ -38,12 +39,11 @@ public final class NodeSelectionStrategyChannel implements LimitedChannel { private final AtomicReference nodeSelectionStrategy; private final AtomicReference> nodeChannels; - private final String channelName; private final Random random; private final Ticker tick; private final TaggedMetricRegistry metrics; - private final SelectionStrategySelector strategySelector; + private final NodeSelectionStrategySelector strategySelector; private final LimitedChannel delegate; public NodeSelectionStrategyChannel( @@ -51,7 +51,7 @@ public NodeSelectionStrategyChannel( Random random, Ticker tick, TaggedMetricRegistry metrics, - SelectionStrategySelector strategySelector) { + NodeSelectionStrategySelector strategySelector) { this.channelName = channelName; this.random = random; this.tick = tick; @@ -59,8 +59,10 @@ public NodeSelectionStrategyChannel( this.strategySelector = strategySelector; this.nodeChannels = new AtomicReference<>(ImmutableList.of()); this.nodeSelectionStrategy = new AtomicReference<>(getUpdatedNodeSelectionStrategy( - null, nodeChannels.get(), strategySelector.get(), metrics, random, tick, channelName)); - this.delegate = new SupplierChannel(() -> nodeSelectionStrategy.get().channel()); + null, nodeChannels.get(), strategySelector.getCurrentStrategy(), metrics, random, tick, channelName)); + this.delegate = new SupplierChannel(() -> Preconditions.checkNotNull( + nodeSelectionStrategy.get(), "node selection strategy must not be null") + .channel()); } @Override @@ -69,42 +71,44 @@ public Optional> maybeExecute(Endpoint endpoint, Requ } public void updateChannels(List newChannels) { + DialogueNodeSelectionStrategy updatedStrategy = strategySelector.setActiveChannels(newChannels); ImmutableList wrappedChannels = newChannels.stream().map(WrapperChannel::new).collect(ImmutableList.toImmutableList()); nodeChannels.set(wrappedChannels); nodeSelectionStrategy.getAndUpdate(previousChannel -> getUpdatedNodeSelectionStrategy( - previousChannel, wrappedChannels, previousChannel.strategy(), metrics, random, tick, channelName)); + previousChannel.channel(), wrappedChannels, updatedStrategy, metrics, random, tick, channelName)); } - private void updateStrategy(LimitedChannel channel, String strategy) { - DialogueNodeSelectionStrategy updatedStrategy = strategySelector.updateAndGet(channel, strategy); + private void updateRequestedStrategies(LimitedChannel channel, List strategies) { + DialogueNodeSelectionStrategy updatedStrategy = strategySelector.updateChannelStrategy(channel, strategies); nodeSelectionStrategy.getAndUpdate(currentStrategy -> { if (updatedStrategy.equals(currentStrategy.strategy())) { return currentStrategy; } return getUpdatedNodeSelectionStrategy( - currentStrategy, nodeChannels.get(), updatedStrategy, metrics, random, tick, channelName); + currentStrategy.channel(), nodeChannels.get(), updatedStrategy, metrics, random, tick, channelName); }); } private static ChannelWithStrategy getUpdatedNodeSelectionStrategy( - @Nullable ChannelWithStrategy previousChannel, - @Nullable ImmutableList channels, + @Nullable LimitedChannel previousNodeSelectionStrategy, + @Nullable ImmutableList nullableChannels, DialogueNodeSelectionStrategy updatedStrategy, TaggedMetricRegistry metrics, Random random, Ticker tick, String channelName) { + + ImmutableList channels = + Preconditions.checkNotNull(nullableChannels, "channels must not be null"); if (channels.isEmpty()) { - return ChannelWithStrategy.of(DialogueNodeSelectionStrategy.UNKNOWN, new ZeroUriChannel(channelName)); + return ChannelWithStrategy.of(updatedStrategy, new ZeroUriChannel(channelName)); } if (channels.size() == 1) { // no fancy node selection heuristic can save us if our one node goes down - return ChannelWithStrategy.of(DialogueNodeSelectionStrategy.UNKNOWN, channels.get(0)); + return ChannelWithStrategy.of(updatedStrategy, channels.get(0)); } - LimitedChannel previousNodeSelectionStrategy = previousChannel.channel(); - switch (updatedStrategy) { case PIN_UNTIL_ERROR: case PIN_UNTIL_ERROR_WITHOUT_RESHUFFLE: @@ -138,11 +142,11 @@ private static ChannelWithStrategy getUpdatedNodeSelectionStrategy( return ChannelWithStrategy.of( updatedStrategy, new BalancedNodeSelectionStrategyChannel(channels, random, tick, metrics, channelName)); + case UNKNOWN: } throw new SafeRuntimeException("Unknown NodeSelectionStrategy", SafeArg.of("unknown", updatedStrategy)); } - // TODO(forozco): really you'd want an union @Value.Immutable interface ChannelWithStrategy { DialogueNodeSelectionStrategy strategy(); @@ -165,9 +169,10 @@ private class WrapperChannel implements LimitedChannel { this.delegate = delegate; this.callback = new FutureCallback() { @Override - public void onSuccess(@Nullable Response result) { + public void onSuccess(Response result) { result.getFirstHeader(NODE_SELECTION_HEADER) - .ifPresent(strategy -> updateStrategy(delegate, strategy)); + .ifPresent(strategy -> updateRequestedStrategies( + delegate, DialogueNodeSelectionStrategy.fromHeader(strategy))); } @Override diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/SelectionStrategySelector.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategySelector.java similarity index 65% rename from dialogue-core/src/main/java/com/palantir/dialogue/core/SelectionStrategySelector.java rename to dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategySelector.java index b6808a62b..d226397c4 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/SelectionStrategySelector.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategySelector.java @@ -16,12 +16,13 @@ package com.palantir.dialogue.core; -import com.google.common.collect.ImmutableList; +import java.util.List; -public interface SelectionStrategySelector { - DialogueNodeSelectionStrategy get(); +public interface NodeSelectionStrategySelector { + DialogueNodeSelectionStrategy getCurrentStrategy(); - DialogueNodeSelectionStrategy updateAndGet(LimitedChannel channel, String strategyUpdate); + DialogueNodeSelectionStrategy updateChannelStrategy( + LimitedChannel channel, List updatedStrategies); - DialogueNodeSelectionStrategy updateAndGet(ImmutableList channels); + DialogueNodeSelectionStrategy setActiveChannels(List channels); } diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/PinUntilErrorNodeSelectionStrategyChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/PinUntilErrorNodeSelectionStrategyChannel.java index 0023f151b..c4bb35db7 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/PinUntilErrorNodeSelectionStrategyChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/PinUntilErrorNodeSelectionStrategyChannel.java @@ -102,14 +102,12 @@ static PinUntilErrorNodeSelectionStrategyChannel of( .map(limitedChannel -> Math.max(0, initialShuffle.indexOf(limitedChannel))) .orElse(0); - switch (strategy) { - case PIN_UNTIL_ERROR: - NodeList shuffling = - ReshufflingNodeList.of(initialShuffle, random, System::nanoTime, metrics, channelName); - return new PinUntilErrorNodeSelectionStrategyChannel(shuffling, initialHost, metrics, channelName); - case PIN_UNTIL_ERROR_WITHOUT_RESHUFFLE: - NodeList constant = new ConstantNodeList(initialShuffle); - return new PinUntilErrorNodeSelectionStrategyChannel(constant, initialHost, metrics, channelName); + if (strategy == DialogueNodeSelectionStrategy.PIN_UNTIL_ERROR) { + NodeList shuffling = ReshufflingNodeList.of(initialShuffle, random, System::nanoTime, metrics, channelName); + return new PinUntilErrorNodeSelectionStrategyChannel(shuffling, initialHost, metrics, channelName); + } else if (strategy == DialogueNodeSelectionStrategy.PIN_UNTIL_ERROR_WITHOUT_RESHUFFLE) { + NodeList constant = new ConstantNodeList(initialShuffle); + return new PinUntilErrorNodeSelectionStrategyChannel(constant, initialHost, metrics, channelName); } throw new SafeIllegalArgumentException("Unsupported NodeSelectionStrategy", SafeArg.of("strategy", strategy)); diff --git a/dialogue-core/src/test/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelectorTest.java b/dialogue-core/src/test/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelectorTest.java new file mode 100644 index 000000000..fe19ddb75 --- /dev/null +++ b/dialogue-core/src/test/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelectorTest.java @@ -0,0 +1,76 @@ +/* + * (c) Copyright 2020 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.dialogue.core; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.common.collect.ImmutableList; +import com.palantir.conjure.java.client.config.NodeSelectionStrategy; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class DefaultNodeSelectionStrategySelectorTest { + + @Mock + LimitedChannel channelA; + + @Mock + LimitedChannel channelB; + + private DefaultNodeSelectionStrategySelector strategySelector; + + @BeforeEach + void beforeEach() { + strategySelector = new DefaultNodeSelectionStrategySelector(NodeSelectionStrategy.PIN_UNTIL_ERROR); + } + + @Test + void defaults_to_client_provided_strategy() { + assertThat(strategySelector.getCurrentStrategy()).isEqualTo(DialogueNodeSelectionStrategy.PIN_UNTIL_ERROR); + } + + @Test + void uses_server_provided_strategy() { + DialogueNodeSelectionStrategy strategy = strategySelector.updateChannelStrategy( + channelA, ImmutableList.of(DialogueNodeSelectionStrategy.BALANCED)); + assertThat(strategy).isEqualTo(DialogueNodeSelectionStrategy.BALANCED); + } + + @Test + void falls_back_to_client_default_on_conflict() { + DialogueNodeSelectionStrategy strategy = strategySelector.updateChannelStrategy( + channelA, ImmutableList.of(DialogueNodeSelectionStrategy.BALANCED)); + assertThat(strategy).isEqualTo(DialogueNodeSelectionStrategy.BALANCED); + + strategy = strategySelector.updateChannelStrategy( + channelB, ImmutableList.of(DialogueNodeSelectionStrategy.PIN_UNTIL_ERROR_WITHOUT_RESHUFFLE)); + + assertThat(strategy).isEqualTo(DialogueNodeSelectionStrategy.PIN_UNTIL_ERROR); + } + + @Test + void only_considers_active_channels() { + strategySelector.updateChannelStrategy(channelA, ImmutableList.of(DialogueNodeSelectionStrategy.BALANCED)); + DialogueNodeSelectionStrategy strategy = strategySelector.setActiveChannels(ImmutableList.of()); + + assertThat(strategy).isEqualTo(DialogueNodeSelectionStrategy.PIN_UNTIL_ERROR); + } +} diff --git a/dialogue-core/src/test/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategyTest.java b/dialogue-core/src/test/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategyTest.java new file mode 100644 index 000000000..f345456c6 --- /dev/null +++ b/dialogue-core/src/test/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategyTest.java @@ -0,0 +1,42 @@ +/* + * (c) Copyright 2020 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.dialogue.core; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.jupiter.api.Test; + +class DialogueNodeSelectionStrategyTest { + + @Test + void parses_single_strategy() { + assertThat(DialogueNodeSelectionStrategy.fromHeader("BALANCED")) + .containsExactly(DialogueNodeSelectionStrategy.BALANCED); + } + + @Test + void parses_multiple_strategies() { + assertThat(DialogueNodeSelectionStrategy.fromHeader("BALANCED; PIN_UNTIL_ERROR")) + .containsExactly(DialogueNodeSelectionStrategy.BALANCED, DialogueNodeSelectionStrategy.PIN_UNTIL_ERROR); + } + + @Test + void case_insensitive() { + assertThat(DialogueNodeSelectionStrategy.fromHeader("balanced; PIN_UNTIL_ERROR")) + .containsExactly(DialogueNodeSelectionStrategy.BALANCED, DialogueNodeSelectionStrategy.PIN_UNTIL_ERROR); + } +} diff --git a/dialogue-core/src/test/java/com/palantir/dialogue/core/NodeSelectionStrategyChannelTest.java b/dialogue-core/src/test/java/com/palantir/dialogue/core/NodeSelectionStrategyChannelTest.java new file mode 100644 index 000000000..508389021 --- /dev/null +++ b/dialogue-core/src/test/java/com/palantir/dialogue/core/NodeSelectionStrategyChannelTest.java @@ -0,0 +1,104 @@ +/* + * (c) Copyright 2020 Palantir Technologies Inc. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.palantir.dialogue.core; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.github.benmanes.caffeine.cache.Ticker; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.Futures; +import com.palantir.conjure.java.client.config.NodeSelectionStrategy; +import com.palantir.dialogue.Response; +import com.palantir.tritium.metrics.registry.DefaultTaggedMetricRegistry; +import java.util.Optional; +import java.util.Random; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.Spy; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class NodeSelectionStrategyChannelTest { + + @Spy + private NodeSelectionStrategySelector strategySelector = + new DefaultNodeSelectionStrategySelector(NodeSelectionStrategy.PIN_UNTIL_ERROR_WITHOUT_RESHUFFLE); + + @Mock + private LimitedChannel channel1; + + @Mock + private LimitedChannel channel2; + + @Mock + private Ticker clock; + + private String channelName = "channelName"; + private Random pseudo = new Random(12893712L); + private NodeSelectionStrategyChannel channel; + + @BeforeEach + void beforeEach() { + channel = new NodeSelectionStrategyChannel( + channelName, pseudo, clock, new DefaultTaggedMetricRegistry(), strategySelector); + } + + @Test + void handles_zero_to_one_uri_update() { + channel.updateChannels(ImmutableList.of()); + channel.updateChannels(ImmutableList.of(channel1)); + } + + @Test + void handles_one_to_many_uri_update() { + channel.updateChannels(ImmutableList.of(channel1)); + channel.updateChannels(ImmutableList.of(channel1, channel2)); + } + + @Test + void updates_strategy_selector_on_uri_update() { + channel.updateChannels(ImmutableList.of(channel1)); + + verify(strategySelector, times(1)).setActiveChannels(ImmutableList.of(channel1)); + } + + @Test + void tracks_per_host_strategy() { + channel.updateChannels(ImmutableList.of(channel1)); + setResponse(channel1, Optional.of(DialogueNodeSelectionStrategy.BALANCED.toString())); + channel.maybeExecute(null, null).get(); + + verify(strategySelector, times(1)) + .updateChannelStrategy(eq(channel1), eq(ImmutableList.of(DialogueNodeSelectionStrategy.BALANCED))); + } + + private static void setResponse(LimitedChannel mockChannel, Optional header) { + Mockito.clearInvocations(mockChannel); + Mockito.reset(mockChannel); + Response resp = mock(Response.class); + lenient().when(resp.getFirstHeader("Node-Selection-Strategy")).thenReturn(header); + lenient().when(mockChannel.maybeExecute(any(), any())).thenReturn(Optional.of(Futures.immediateFuture(resp))); + } +} From 234dc9dfa6d08ea1c7955850f759db6c07e29a2d Mon Sep 17 00:00:00 2001 From: forozco Date: Tue, 28 Apr 2020 19:18:17 -0400 Subject: [PATCH 03/21] ignore leading unknown strategies --- .../core/DefaultNodeSelectionStrategySelector.java | 7 ++++++- .../core/DefaultNodeSelectionStrategySelectorTest.java | 10 ++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelector.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelector.java index ca15d270c..06fc5248f 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelector.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelector.java @@ -25,6 +25,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Stream; @SuppressWarnings("NullAway") public final class DefaultNodeSelectionStrategySelector implements NodeSelectionStrategySelector { @@ -65,7 +66,11 @@ private DialogueNodeSelectionStrategy updateAndGetStrategy() { // TODO(forozco): improve strategy selection process to find the common intersection Collection> requestedStrategies = strategyPerChannel.values(); Set firstChoiceStrategies = requestedStrategies.stream() - .map(strategies -> strategies.get(0)) + .flatMap(strategies -> strategies.stream() + .filter(strategy -> strategy != DialogueNodeSelectionStrategy.UNKNOWN) + .findFirst() + .map(Stream::of) + .orElseGet(Stream::empty)) .collect(ImmutableSet.toImmutableSet()); if (firstChoiceStrategies.size() == 1) { return Iterables.getOnlyElement(firstChoiceStrategies); diff --git a/dialogue-core/src/test/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelectorTest.java b/dialogue-core/src/test/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelectorTest.java index fe19ddb75..0426fec15 100644 --- a/dialogue-core/src/test/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelectorTest.java +++ b/dialogue-core/src/test/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelectorTest.java @@ -66,6 +66,16 @@ void falls_back_to_client_default_on_conflict() { assertThat(strategy).isEqualTo(DialogueNodeSelectionStrategy.PIN_UNTIL_ERROR); } + @Test + void ignores_unknown_strategy() { + strategySelector.updateChannelStrategy( + channelA, + ImmutableList.of(DialogueNodeSelectionStrategy.UNKNOWN, DialogueNodeSelectionStrategy.BALANCED)); + DialogueNodeSelectionStrategy strategy = strategySelector.updateChannelStrategy( + channelB, ImmutableList.of(DialogueNodeSelectionStrategy.BALANCED)); + assertThat(strategy).isEqualTo(DialogueNodeSelectionStrategy.BALANCED); + } + @Test void only_considers_active_channels() { strategySelector.updateChannelStrategy(channelA, ImmutableList.of(DialogueNodeSelectionStrategy.BALANCED)); From 39bbaa60ead2eb1f0cbddea6742f0df59ed948a7 Mon Sep 17 00:00:00 2001 From: forozco Date: Tue, 28 Apr 2020 19:18:21 -0400 Subject: [PATCH 04/21] update simulations --- .../live_reloading[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].png | 4 ++-- simulation/src/test/resources/report.md | 2 +- .../live_reloading[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].txt | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/simulation/src/test/resources/live_reloading[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].png b/simulation/src/test/resources/live_reloading[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].png index e5fa6155b..d98eda582 100644 --- a/simulation/src/test/resources/live_reloading[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].png +++ b/simulation/src/test/resources/live_reloading[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].png @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:3065afde103b1575f54bc3db953b646b55815e5abc1c7a618ab60c8368f45077 -size 97283 +oid sha256:0600acffc7fffa6401071864d9affb56a984b0e92a23d1b4a0657f4fe9c35a1a +size 122681 diff --git a/simulation/src/test/resources/report.md b/simulation/src/test/resources/report.md index d5c44c697..527093d58 100644 --- a/simulation/src/test/resources/report.md +++ b/simulation/src/test/resources/report.md @@ -13,7 +13,7 @@ fast_503s_then_revert[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].txt: success=100.0% client_mean=PT0.120012009S server_cpu=PT1H30M0.00000004S client_received=45000/45000 server_resps=45004 codes={200=45000} fast_503s_then_revert[CONCURRENCY_LIMITER_ROUND_ROBIN].txt: success=100.0% client_mean=PT0.120126613S server_cpu=PT1H30M0.0000004S client_received=45000/45000 server_resps=45040 codes={200=45000} fast_503s_then_revert[UNLIMITED_ROUND_ROBIN].txt: success=100.0% client_mean=PT0.120126613S server_cpu=PT1H30M0.0000004S client_received=45000/45000 server_resps=45040 codes={200=45000} - live_reloading[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].txt: success=97.2% client_mean=PT4.8253936S server_cpu=PT2H46.79S client_received=2500/2500 server_resps=2500 codes={200=2430, 500=70} + live_reloading[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].txt: success=94.0% client_mean=PT5.2613496S server_cpu=PT1H56M35.39S client_received=2500/2500 server_resps=2500 codes={200=2350, 500=150} live_reloading[CONCURRENCY_LIMITER_ROUND_ROBIN].txt: success=90.9% client_mean=PT3.746748S server_cpu=PT1H55M56.12S client_received=2500/2500 server_resps=2500 codes={200=2273, 500=227} live_reloading[UNLIMITED_ROUND_ROBIN].txt: success=90.5% client_mean=PT2.789608S server_cpu=PT1H56M14.02S client_received=2500/2500 server_resps=2500 codes={200=2263, 500=237} one_big_spike[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].txt: success=100.0% client_mean=PT1.135007332S server_cpu=PT2M49.65S client_received=1000/1000 server_resps=1131 codes={200=1000} diff --git a/simulation/src/test/resources/txt/live_reloading[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].txt b/simulation/src/test/resources/txt/live_reloading[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].txt index f67fd9f3a..54d029fff 100644 --- a/simulation/src/test/resources/txt/live_reloading[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].txt +++ b/simulation/src/test/resources/txt/live_reloading[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].txt @@ -1 +1 @@ -success=97.2% client_mean=PT4.8253936S server_cpu=PT2H46.79S client_received=2500/2500 server_resps=2500 codes={200=2430, 500=70} \ No newline at end of file +success=94.0% client_mean=PT5.2613496S server_cpu=PT1H56M35.39S client_received=2500/2500 server_resps=2500 codes={200=2350, 500=150} \ No newline at end of file From 77cc246f91e382e7fcd1c3008373fca36923b23b Mon Sep 17 00:00:00 2001 From: forozco Date: Tue, 28 Apr 2020 20:34:27 -0400 Subject: [PATCH 05/21] pinned channel is reused across updates --- .../com/palantir/dialogue/core/DialogueChannel.java | 8 ++++++-- .../dialogue/core/NodeSelectionStrategyChannel.java | 12 +++++++----- ...eloading[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].png | 4 ++-- simulation/src/test/resources/report.md | 2 +- ...eloading[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].txt | 2 +- 5 files changed, 17 insertions(+), 11 deletions(-) diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java index cd7dece7d..fd0973418 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java @@ -144,8 +144,12 @@ private LimitedChannel createLimitedChannel(String uri, int uriIndex) { channel = new TraceEnrichingChannel(channel); LimitedChannel limitedChannel = new ChannelToLimitedChannelAdapter(channel); - return concurrencyLimiter( - clientConfiguration, limitedChannel, clientConfiguration.taggedMetricRegistry(), channelName, uriIndex); + return nodeSelectionStrategy.wrap(concurrencyLimiter( + clientConfiguration, + limitedChannel, + clientConfiguration.taggedMetricRegistry(), + channelName, + uriIndex)); } private static LimitedChannel concurrencyLimiter( diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java index 8f8c00cf2..c71de3292 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java @@ -70,13 +70,11 @@ public Optional> maybeExecute(Endpoint endpoint, Requ return delegate.maybeExecute(endpoint, request); } - public void updateChannels(List newChannels) { + public void updateChannels(ImmutableList newChannels) { DialogueNodeSelectionStrategy updatedStrategy = strategySelector.setActiveChannels(newChannels); - ImmutableList wrappedChannels = - newChannels.stream().map(WrapperChannel::new).collect(ImmutableList.toImmutableList()); - nodeChannels.set(wrappedChannels); + nodeChannels.set(newChannels); nodeSelectionStrategy.getAndUpdate(previousChannel -> getUpdatedNodeSelectionStrategy( - previousChannel.channel(), wrappedChannels, updatedStrategy, metrics, random, tick, channelName)); + previousChannel.channel(), newChannels, updatedStrategy, metrics, random, tick, channelName)); } private void updateRequestedStrategies(LimitedChannel channel, List strategies) { @@ -90,6 +88,10 @@ private void updateRequestedStrategies(LimitedChannel channel, List nullableChannels, diff --git a/simulation/src/test/resources/live_reloading[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].png b/simulation/src/test/resources/live_reloading[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].png index d98eda582..e5fa6155b 100644 --- a/simulation/src/test/resources/live_reloading[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].png +++ b/simulation/src/test/resources/live_reloading[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].png @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:0600acffc7fffa6401071864d9affb56a984b0e92a23d1b4a0657f4fe9c35a1a -size 122681 +oid sha256:3065afde103b1575f54bc3db953b646b55815e5abc1c7a618ab60c8368f45077 +size 97283 diff --git a/simulation/src/test/resources/report.md b/simulation/src/test/resources/report.md index 527093d58..d5c44c697 100644 --- a/simulation/src/test/resources/report.md +++ b/simulation/src/test/resources/report.md @@ -13,7 +13,7 @@ fast_503s_then_revert[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].txt: success=100.0% client_mean=PT0.120012009S server_cpu=PT1H30M0.00000004S client_received=45000/45000 server_resps=45004 codes={200=45000} fast_503s_then_revert[CONCURRENCY_LIMITER_ROUND_ROBIN].txt: success=100.0% client_mean=PT0.120126613S server_cpu=PT1H30M0.0000004S client_received=45000/45000 server_resps=45040 codes={200=45000} fast_503s_then_revert[UNLIMITED_ROUND_ROBIN].txt: success=100.0% client_mean=PT0.120126613S server_cpu=PT1H30M0.0000004S client_received=45000/45000 server_resps=45040 codes={200=45000} - live_reloading[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].txt: success=94.0% client_mean=PT5.2613496S server_cpu=PT1H56M35.39S client_received=2500/2500 server_resps=2500 codes={200=2350, 500=150} + live_reloading[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].txt: success=97.2% client_mean=PT4.8253936S server_cpu=PT2H46.79S client_received=2500/2500 server_resps=2500 codes={200=2430, 500=70} live_reloading[CONCURRENCY_LIMITER_ROUND_ROBIN].txt: success=90.9% client_mean=PT3.746748S server_cpu=PT1H55M56.12S client_received=2500/2500 server_resps=2500 codes={200=2273, 500=227} live_reloading[UNLIMITED_ROUND_ROBIN].txt: success=90.5% client_mean=PT2.789608S server_cpu=PT1H56M14.02S client_received=2500/2500 server_resps=2500 codes={200=2263, 500=237} one_big_spike[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].txt: success=100.0% client_mean=PT1.135007332S server_cpu=PT2M49.65S client_received=1000/1000 server_resps=1131 codes={200=1000} diff --git a/simulation/src/test/resources/txt/live_reloading[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].txt b/simulation/src/test/resources/txt/live_reloading[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].txt index 54d029fff..f67fd9f3a 100644 --- a/simulation/src/test/resources/txt/live_reloading[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].txt +++ b/simulation/src/test/resources/txt/live_reloading[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].txt @@ -1 +1 @@ -success=94.0% client_mean=PT5.2613496S server_cpu=PT1H56M35.39S client_received=2500/2500 server_resps=2500 codes={200=2350, 500=150} \ No newline at end of file +success=97.2% client_mean=PT4.8253936S server_cpu=PT2H46.79S client_received=2500/2500 server_resps=2500 codes={200=2430, 500=70} \ No newline at end of file From b944fdc73b5d06b30d481d20bf34c37c83ec9fee Mon Sep 17 00:00:00 2001 From: forozco Date: Tue, 28 Apr 2020 21:56:05 -0400 Subject: [PATCH 06/21] NSS selector uses score to determine strategy --- .../DefaultNodeSelectionStrategySelector.java | 52 ++++++++++++------- .../core/DialogueNodeSelectionStrategy.java | 2 +- ...aultNodeSelectionStrategySelectorTest.java | 24 +++++++-- 3 files changed, 54 insertions(+), 24 deletions(-) diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelector.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelector.java index 06fc5248f..f88d9562b 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelector.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelector.java @@ -20,23 +20,25 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.palantir.conjure.java.client.config.NodeSelectionStrategy; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Stream; +import java.util.function.Function; +import java.util.stream.Collectors; @SuppressWarnings("NullAway") public final class DefaultNodeSelectionStrategySelector implements NodeSelectionStrategySelector { - private final DialogueNodeSelectionStrategy clientStrategy; private final AtomicReference currentStrategy; private final ConcurrentHashMap> strategyPerChannel = new ConcurrentHashMap<>(); - public DefaultNodeSelectionStrategySelector(NodeSelectionStrategy clientStrategy) { - this.clientStrategy = DialogueNodeSelectionStrategy.of(clientStrategy); - this.currentStrategy = new AtomicReference<>(this.clientStrategy); + public DefaultNodeSelectionStrategySelector(NodeSelectionStrategy initialStrategy) { + this.currentStrategy = new AtomicReference<>(DialogueNodeSelectionStrategy.of(initialStrategy)); } @Override @@ -61,21 +63,35 @@ public DialogueNodeSelectionStrategy setActiveChannels(List chan return updateAndGetStrategy(); } + /** + * returns the requested strategy with the lowest "score", where score is the sum of each strategy's position + * in each nodes request list. + * + * In case of ties, fall back to the previous strategy. + */ private DialogueNodeSelectionStrategy updateAndGetStrategy() { - return currentStrategy.updateAndGet(_strategy -> { - // TODO(forozco): improve strategy selection process to find the common intersection - Collection> requestedStrategies = strategyPerChannel.values(); - Set firstChoiceStrategies = requestedStrategies.stream() - .flatMap(strategies -> strategies.stream() - .filter(strategy -> strategy != DialogueNodeSelectionStrategy.UNKNOWN) - .findFirst() - .map(Stream::of) - .orElseGet(Stream::empty)) - .collect(ImmutableSet.toImmutableSet()); - if (firstChoiceStrategies.size() == 1) { - return Iterables.getOnlyElement(firstChoiceStrategies); + return currentStrategy.updateAndGet(previousStrategy -> { + Collection> allRequestedStrategies = strategyPerChannel.values(); + Map scorePerStrategy = Arrays.stream( + DialogueNodeSelectionStrategy.values()) + .filter(strategy -> strategy != DialogueNodeSelectionStrategy.UNKNOWN) + .collect(Collectors.toMap(Function.identity(), strategy -> allRequestedStrategies.stream() + .mapToInt(requestedStrategies -> { + int score = requestedStrategies.indexOf(strategy); + return score == -1 ? DialogueNodeSelectionStrategy.values().length : score; + }) + .sum())); + + int minScore = Collections.min(scorePerStrategy.values()); + Set minScoreStrategies = scorePerStrategy.entrySet().stream() + .filter(entry -> entry.getValue() == minScore) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + if (minScoreStrategies.size() == 1) { + return Iterables.getOnlyElement(minScoreStrategies); } - return clientStrategy; + + return previousStrategy; }); } } diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategy.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategy.java index 433e94714..3d1f95875 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategy.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategy.java @@ -30,7 +30,7 @@ public enum DialogueNodeSelectionStrategy { UNKNOWN; static List fromHeader(String header) { - return Splitter.on(";").splitToList(header).stream() + return Splitter.on(",").splitToList(header).stream() .map(DialogueNodeSelectionStrategy::safeValueOf) .collect(ImmutableList.toImmutableList()); } diff --git a/dialogue-core/src/test/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelectorTest.java b/dialogue-core/src/test/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelectorTest.java index 0426fec15..cad065689 100644 --- a/dialogue-core/src/test/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelectorTest.java +++ b/dialogue-core/src/test/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelectorTest.java @@ -55,15 +55,17 @@ void uses_server_provided_strategy() { } @Test - void falls_back_to_client_default_on_conflict() { - DialogueNodeSelectionStrategy strategy = strategySelector.updateChannelStrategy( + void falls_back_to_previous_on_conflict() { + DialogueNodeSelectionStrategy strategy; + + strategy = strategySelector.updateChannelStrategy( channelA, ImmutableList.of(DialogueNodeSelectionStrategy.BALANCED)); assertThat(strategy).isEqualTo(DialogueNodeSelectionStrategy.BALANCED); strategy = strategySelector.updateChannelStrategy( channelB, ImmutableList.of(DialogueNodeSelectionStrategy.PIN_UNTIL_ERROR_WITHOUT_RESHUFFLE)); - assertThat(strategy).isEqualTo(DialogueNodeSelectionStrategy.PIN_UNTIL_ERROR); + assertThat(strategy).isEqualTo(DialogueNodeSelectionStrategy.BALANCED); } @Test @@ -78,9 +80,21 @@ void ignores_unknown_strategy() { @Test void only_considers_active_channels() { - strategySelector.updateChannelStrategy(channelA, ImmutableList.of(DialogueNodeSelectionStrategy.BALANCED)); - DialogueNodeSelectionStrategy strategy = strategySelector.setActiveChannels(ImmutableList.of()); + DialogueNodeSelectionStrategy strategy; + // Initially prefers PuE + strategy = strategySelector.updateChannelStrategy( + channelA, + ImmutableList.of( + DialogueNodeSelectionStrategy.PIN_UNTIL_ERROR, DialogueNodeSelectionStrategy.BALANCED)); + assertThat(strategy).isEqualTo(DialogueNodeSelectionStrategy.PIN_UNTIL_ERROR); + + // Switches to Balance upon seeing another node that requests Balance + strategy = strategySelector.updateChannelStrategy( + channelB, ImmutableList.of(DialogueNodeSelectionStrategy.BALANCED)); + assertThat(strategy).isEqualTo(DialogueNodeSelectionStrategy.BALANCED); + // Switches back to PuE once that node disappears + strategy = strategySelector.setActiveChannels(ImmutableList.of(channelA)); assertThat(strategy).isEqualTo(DialogueNodeSelectionStrategy.PIN_UNTIL_ERROR); } } From 7a8f2365a65627ef11298175cb4afc557f4161a1 Mon Sep 17 00:00:00 2001 From: forozco Date: Wed, 29 Apr 2020 09:07:29 -0400 Subject: [PATCH 07/21] cleanup --- .../dialogue/core/DialogueNodeSelectionStrategyTest.java | 4 ++-- .../dialogue/core/NodeSelectionStrategyChannelTest.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dialogue-core/src/test/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategyTest.java b/dialogue-core/src/test/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategyTest.java index f345456c6..91b11a2fb 100644 --- a/dialogue-core/src/test/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategyTest.java +++ b/dialogue-core/src/test/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategyTest.java @@ -30,13 +30,13 @@ void parses_single_strategy() { @Test void parses_multiple_strategies() { - assertThat(DialogueNodeSelectionStrategy.fromHeader("BALANCED; PIN_UNTIL_ERROR")) + assertThat(DialogueNodeSelectionStrategy.fromHeader("BALANCED, PIN_UNTIL_ERROR")) .containsExactly(DialogueNodeSelectionStrategy.BALANCED, DialogueNodeSelectionStrategy.PIN_UNTIL_ERROR); } @Test void case_insensitive() { - assertThat(DialogueNodeSelectionStrategy.fromHeader("balanced; PIN_UNTIL_ERROR")) + assertThat(DialogueNodeSelectionStrategy.fromHeader("balanced, PIN_UNTIL_ERROR")) .containsExactly(DialogueNodeSelectionStrategy.BALANCED, DialogueNodeSelectionStrategy.PIN_UNTIL_ERROR); } } diff --git a/dialogue-core/src/test/java/com/palantir/dialogue/core/NodeSelectionStrategyChannelTest.java b/dialogue-core/src/test/java/com/palantir/dialogue/core/NodeSelectionStrategyChannelTest.java index 508389021..fd5d8f735 100644 --- a/dialogue-core/src/test/java/com/palantir/dialogue/core/NodeSelectionStrategyChannelTest.java +++ b/dialogue-core/src/test/java/com/palantir/dialogue/core/NodeSelectionStrategyChannelTest.java @@ -86,8 +86,8 @@ void updates_strategy_selector_on_uri_update() { @Test void tracks_per_host_strategy() { - channel.updateChannels(ImmutableList.of(channel1)); - setResponse(channel1, Optional.of(DialogueNodeSelectionStrategy.BALANCED.toString())); + channel.updateChannels(ImmutableList.of(channel.wrap(channel1))); + setResponse(channel1, Optional.of("BALANCED")); channel.maybeExecute(null, null).get(); verify(strategySelector, times(1)) From 0b3315cfc13500ae7ea38917a3c564b06908d5ba Mon Sep 17 00:00:00 2001 From: forozco Date: Wed, 29 Apr 2020 13:01:04 -0400 Subject: [PATCH 08/21] upgrade guava --- versions.lock | 7 +++---- versions.props | 2 +- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/versions.lock b/versions.lock index d536a8b28..c04dab866 100644 --- a/versions.lock +++ b/versions.lock @@ -10,9 +10,9 @@ com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.11.0 (1 constraints: 7c com.fasterxml.jackson.module:jackson-module-afterburner:2.11.0 (2 constraints: 2629e8cb) com.github.ben-manes.caffeine:caffeine:2.8.2 (2 constraints: c1177d4f) com.google.code.findbugs:jsr305:3.0.2 (11 constraints: 6b9fbbdf) -com.google.errorprone:error_prone_annotations:2.3.4 (5 constraints: 4a4818d6) +com.google.errorprone:error_prone_annotations:2.3.4 (5 constraints: 4c4820d7) com.google.guava:failureaccess:1.0.1 (1 constraints: 140ae1b4) -com.google.guava:guava:28.1-jre (9 constraints: d2974be9) +com.google.guava:guava:29.0-jre (9 constraints: 7597754a) com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava (1 constraints: bd17c918) com.google.j2objc:j2objc-annotations:1.3 (1 constraints: b809eda0) com.palantir.conjure.java:conjure-lib:5.14.2 (1 constraints: 3e054f3b) @@ -42,8 +42,7 @@ jakarta.annotation:jakarta.annotation-api:1.3.5 (2 constraints: fb144cb7) jakarta.ws.rs:jakarta.ws.rs-api:2.1.6 (2 constraints: fb144cb7) org.apache.httpcomponents:httpclient:4.5.12 (1 constraints: 3e054d3b) org.apache.httpcomponents:httpcore:4.4.13 (1 constraints: 591016a2) -org.checkerframework:checker-qual:3.3.0 (2 constraints: 181af743) -org.codehaus.mojo:animal-sniffer-annotations:1.18 (1 constraints: ee09d9aa) +org.checkerframework:checker-qual:3.3.0 (2 constraints: 421a275e) org.hdrhistogram:HdrHistogram:2.1.12 (1 constraints: 3e103aa2) org.immutables:value:2.8.3 (1 constraints: 0f051036) org.mpierce.metrics.reservoir:hdrhistogram-metrics-reservoir:1.1.2 (1 constraints: 0c10f891) diff --git a/versions.props b/versions.props index 9af794ddd..e2a1a685c 100644 --- a/versions.props +++ b/versions.props @@ -1,7 +1,7 @@ com.fasterxml.jackson.*:* = 2.11.0 com.github.ben-manes.caffeine:caffeine = 2.8.2 com.google.code.findbugs:jsr305 = 3.0.2 -com.google.guava:guava = 27.0.1-jre +com.google.guava:guava = 29.0-jre com.palantir.conjure.java.api:* = 2.12.0 com.palantir.conjure.java.runtime:* = 5.7.1 com.palantir.conjure.java:* = 5.14.2 From 651f397d456e1a7d8a31264ffd49d158fde487a8 Mon Sep 17 00:00:00 2001 From: forozco Date: Wed, 29 Apr 2020 13:04:38 -0400 Subject: [PATCH 09/21] improve strategy parsing --- .../dialogue/core/DialogueNodeSelectionStrategy.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategy.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategy.java index 3d1f95875..a1c4b6429 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategy.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategy.java @@ -22,6 +22,8 @@ import com.palantir.logsafe.SafeArg; import com.palantir.logsafe.exceptions.SafeIllegalStateException; import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public enum DialogueNodeSelectionStrategy { PIN_UNTIL_ERROR, @@ -29,16 +31,20 @@ public enum DialogueNodeSelectionStrategy { BALANCED, UNKNOWN; + private static final Logger log = LoggerFactory.getLogger(DialogueNodeSelectionStrategy.class); + private static final Splitter SPLITTER = Splitter.on(",").trimResults().omitEmptyStrings(); + static List fromHeader(String header) { - return Splitter.on(",").splitToList(header).stream() + return SPLITTER.splitToStream(header) .map(DialogueNodeSelectionStrategy::safeValueOf) .collect(ImmutableList.toImmutableList()); } private static DialogueNodeSelectionStrategy safeValueOf(String value) { try { - return valueOf(value.trim().toUpperCase()); + return valueOf(value.toUpperCase()); } catch (Exception e) { + log.info("Received unknown selection strategy", SafeArg.of("strategy", value)); return UNKNOWN; } } From 709433618b28a0e8c7eb44cb97e4ca679ab74923 Mon Sep 17 00:00:00 2001 From: Felipe Orozco Date: Wed, 29 Apr 2020 14:00:01 -0400 Subject: [PATCH 10/21] Update dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategySelector.java Co-Authored-By: Carter Kozak --- .../palantir/dialogue/core/NodeSelectionStrategySelector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategySelector.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategySelector.java index d226397c4..fd8593836 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategySelector.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategySelector.java @@ -18,7 +18,7 @@ import java.util.List; -public interface NodeSelectionStrategySelector { +interface NodeSelectionStrategySelector { DialogueNodeSelectionStrategy getCurrentStrategy(); DialogueNodeSelectionStrategy updateChannelStrategy( From 85a54ebd396776426b02e648b0e9559d6cf2f81f Mon Sep 17 00:00:00 2001 From: forozco Date: Wed, 29 Apr 2020 14:05:02 -0400 Subject: [PATCH 11/21] avoid labdas --- .../core/NodeSelectionStrategyChannel.java | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java index c71de3292..21e553783 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java @@ -170,22 +170,28 @@ private class WrapperChannel implements LimitedChannel { WrapperChannel(LimitedChannel delegate) { this.delegate = delegate; this.callback = new FutureCallback() { + @Override public void onSuccess(Response result) { - result.getFirstHeader(NODE_SELECTION_HEADER) - .ifPresent(strategy -> updateRequestedStrategies( - delegate, DialogueNodeSelectionStrategy.fromHeader(strategy))); + result.getFirstHeader(NODE_SELECTION_HEADER).ifPresent(this::consumeStrategy); } @Override public void onFailure(Throwable _unused) {} + + private void consumeStrategy(String strategy) { + updateRequestedStrategies(delegate, DialogueNodeSelectionStrategy.fromHeader(strategy)); + } }; } @Override public Optional> maybeExecute(Endpoint endpoint, Request request) { - return delegate.maybeExecute(endpoint, request) - .map(response -> DialogueFutures.addDirectCallback(response, this.callback)); + return delegate.maybeExecute(endpoint, request).map(this::wrap); + } + + private ListenableFuture wrap(ListenableFuture response) { + return DialogueFutures.addDirectCallback(response, callback); } } } From a181a9f6b914e15ddbb81ae2cf87840d53be3590 Mon Sep 17 00:00:00 2001 From: forozco Date: Wed, 29 Apr 2020 14:07:55 -0400 Subject: [PATCH 12/21] package private --- .../dialogue/core/DefaultNodeSelectionStrategySelector.java | 4 ++-- .../palantir/dialogue/core/DialogueNodeSelectionStrategy.java | 2 +- .../palantir/dialogue/core/NodeSelectionStrategyChannel.java | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelector.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelector.java index f88d9562b..95f35cbfb 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelector.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelector.java @@ -32,12 +32,12 @@ import java.util.stream.Collectors; @SuppressWarnings("NullAway") -public final class DefaultNodeSelectionStrategySelector implements NodeSelectionStrategySelector { +final class DefaultNodeSelectionStrategySelector implements NodeSelectionStrategySelector { private final AtomicReference currentStrategy; private final ConcurrentHashMap> strategyPerChannel = new ConcurrentHashMap<>(); - public DefaultNodeSelectionStrategySelector(NodeSelectionStrategy initialStrategy) { + DefaultNodeSelectionStrategySelector(NodeSelectionStrategy initialStrategy) { this.currentStrategy = new AtomicReference<>(DialogueNodeSelectionStrategy.of(initialStrategy)); } diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategy.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategy.java index a1c4b6429..7dab6c4f5 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategy.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategy.java @@ -25,7 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public enum DialogueNodeSelectionStrategy { +enum DialogueNodeSelectionStrategy { PIN_UNTIL_ERROR, PIN_UNTIL_ERROR_WITHOUT_RESHUFFLE, BALANCED, diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java index 21e553783..edf02455c 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java @@ -34,7 +34,7 @@ import org.checkerframework.checker.nullness.qual.Nullable; import org.immutables.value.Value; -public final class NodeSelectionStrategyChannel implements LimitedChannel { +final class NodeSelectionStrategyChannel implements LimitedChannel { private static final String NODE_SELECTION_HEADER = "Node-Selection-Strategy"; private final AtomicReference nodeSelectionStrategy; @@ -46,7 +46,7 @@ public final class NodeSelectionStrategyChannel implements LimitedChannel { private final NodeSelectionStrategySelector strategySelector; private final LimitedChannel delegate; - public NodeSelectionStrategyChannel( + NodeSelectionStrategyChannel( String channelName, Random random, Ticker tick, From 51ed3cd3129f44404a82fac9a304d2b7d0e77b79 Mon Sep 17 00:00:00 2001 From: forozco Date: Wed, 29 Apr 2020 14:12:19 -0400 Subject: [PATCH 13/21] avoid exception for control flow --- .../core/DialogueNodeSelectionStrategy.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategy.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategy.java index 7dab6c4f5..79e0d7fae 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategy.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategy.java @@ -41,12 +41,17 @@ static List fromHeader(String header) { } private static DialogueNodeSelectionStrategy safeValueOf(String value) { - try { - return valueOf(value.toUpperCase()); - } catch (Exception e) { - log.info("Received unknown selection strategy", SafeArg.of("strategy", value)); - return UNKNOWN; + String normalizedValue = value.toUpperCase(); + if (PIN_UNTIL_ERROR.name().equals(normalizedValue)) { + return PIN_UNTIL_ERROR; + } else if (PIN_UNTIL_ERROR_WITHOUT_RESHUFFLE.name().equals(normalizedValue)) { + return PIN_UNTIL_ERROR_WITHOUT_RESHUFFLE; + } else if (BALANCED.name().equals(normalizedValue)) { + return BALANCED; } + + log.info("Received unknown selection strategy", SafeArg.of("strategy", value)); + return UNKNOWN; } static DialogueNodeSelectionStrategy of(NodeSelectionStrategy strategy) { From e480663bc1041ff8c4ec19bfb845de86e6936b2a Mon Sep 17 00:00:00 2001 From: forozco Date: Wed, 29 Apr 2020 14:36:48 -0400 Subject: [PATCH 14/21] instrument --- .../DefaultNodeSelectionStrategySelector.java | 10 ++++-- .../dialogue/core/DialogueChannel.java | 8 +++-- .../core/NodeSelectionStrategyChannel.java | 34 ++++++++----------- .../main/metrics/dialogue-core-metrics.yml | 9 +++++ ...aultNodeSelectionStrategySelectorTest.java | 5 ++- .../NodeSelectionStrategyChannelTest.java | 5 +-- 6 files changed, 44 insertions(+), 27 deletions(-) diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelector.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelector.java index 95f35cbfb..d9d7afd38 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelector.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelector.java @@ -34,11 +34,13 @@ @SuppressWarnings("NullAway") final class DefaultNodeSelectionStrategySelector implements NodeSelectionStrategySelector { private final AtomicReference currentStrategy; + private final DialogueNodeselectionMetrics metrics; private final ConcurrentHashMap> strategyPerChannel = new ConcurrentHashMap<>(); - DefaultNodeSelectionStrategySelector(NodeSelectionStrategy initialStrategy) { + DefaultNodeSelectionStrategySelector(NodeSelectionStrategy initialStrategy, DialogueNodeselectionMetrics metrics) { this.currentStrategy = new AtomicReference<>(DialogueNodeSelectionStrategy.of(initialStrategy)); + this.metrics = metrics; } @Override @@ -88,7 +90,11 @@ private DialogueNodeSelectionStrategy updateAndGetStrategy() { .map(Map.Entry::getKey) .collect(Collectors.toSet()); if (minScoreStrategies.size() == 1) { - return Iterables.getOnlyElement(minScoreStrategies); + DialogueNodeSelectionStrategy proposedStrategy = Iterables.getOnlyElement(minScoreStrategies); + if (!proposedStrategy.equals(previousStrategy)) { + metrics.strategy(proposedStrategy.name()).mark(); + return proposedStrategy; + } } return previousStrategy; diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java index fd0973418..3e4fdfa5d 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java @@ -71,14 +71,16 @@ private DialogueChannel( this.channelName = channelName; this.clientConfiguration = clientConfiguration; this.channelFactory = channelFactory; - clientMetrics = ClientMetrics.of(clientConfiguration.taggedMetricRegistry()); - dialogueClientMetrics = DialogueClientMetrics.of(clientConfiguration.taggedMetricRegistry()); + this.clientMetrics = ClientMetrics.of(clientConfiguration.taggedMetricRegistry()); + this.dialogueClientMetrics = DialogueClientMetrics.of(clientConfiguration.taggedMetricRegistry()); this.nodeSelectionStrategy = new NodeSelectionStrategyChannel( channelName, random, ticker, clientConfiguration.taggedMetricRegistry(), - new DefaultNodeSelectionStrategySelector(clientConfiguration.nodeSelectionStrategy())); + new DefaultNodeSelectionStrategySelector( + clientConfiguration.nodeSelectionStrategy(), + DialogueNodeselectionMetrics.of(clientConfiguration.taggedMetricRegistry()))); this.queuedChannel = new QueuedChannel(nodeSelectionStrategy, channelName, dialogueClientMetrics, maxQueueSize); updateUrisInner(clientConfiguration.uris(), true); this.delegate = wrap( diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java index edf02455c..f7cb2ad85 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java @@ -23,7 +23,6 @@ import com.palantir.dialogue.Endpoint; import com.palantir.dialogue.Request; import com.palantir.dialogue.Response; -import com.palantir.logsafe.Preconditions; import com.palantir.logsafe.SafeArg; import com.palantir.logsafe.exceptions.SafeRuntimeException; import com.palantir.tritium.metrics.registry.TaggedMetricRegistry; @@ -34,6 +33,7 @@ import org.checkerframework.checker.nullness.qual.Nullable; import org.immutables.value.Value; +@SuppressWarnings("NullAway") final class NodeSelectionStrategyChannel implements LimitedChannel { private static final String NODE_SELECTION_HEADER = "Node-Selection-Strategy"; @@ -60,9 +60,7 @@ final class NodeSelectionStrategyChannel implements LimitedChannel { this.nodeChannels = new AtomicReference<>(ImmutableList.of()); this.nodeSelectionStrategy = new AtomicReference<>(getUpdatedNodeSelectionStrategy( null, nodeChannels.get(), strategySelector.getCurrentStrategy(), metrics, random, tick, channelName)); - this.delegate = new SupplierChannel(() -> Preconditions.checkNotNull( - nodeSelectionStrategy.get(), "node selection strategy must not be null") - .channel()); + this.delegate = new SupplierChannel(() -> nodeSelectionStrategy.get().channel()); } @Override @@ -70,22 +68,22 @@ public Optional> maybeExecute(Endpoint endpoint, Requ return delegate.maybeExecute(endpoint, request); } - public void updateChannels(ImmutableList newChannels) { - DialogueNodeSelectionStrategy updatedStrategy = strategySelector.setActiveChannels(newChannels); + void updateChannels(ImmutableList newChannels) { nodeChannels.set(newChannels); - nodeSelectionStrategy.getAndUpdate(previousChannel -> getUpdatedNodeSelectionStrategy( - previousChannel.channel(), newChannels, updatedStrategy, metrics, random, tick, channelName)); + maybeUpdateNodeSelectionStrategy(strategySelector.setActiveChannels(newChannels)); } private void updateRequestedStrategies(LimitedChannel channel, List strategies) { - DialogueNodeSelectionStrategy updatedStrategy = strategySelector.updateChannelStrategy(channel, strategies); - nodeSelectionStrategy.getAndUpdate(currentStrategy -> { - if (updatedStrategy.equals(currentStrategy.strategy())) { - return currentStrategy; - } - return getUpdatedNodeSelectionStrategy( - currentStrategy.channel(), nodeChannels.get(), updatedStrategy, metrics, random, tick, channelName); - }); + maybeUpdateNodeSelectionStrategy(strategySelector.updateChannelStrategy(channel, strategies)); + } + + private void maybeUpdateNodeSelectionStrategy(DialogueNodeSelectionStrategy updatedStrategy) { + // Quick check to avoid expensive CAS + if (updatedStrategy.equals(nodeSelectionStrategy.get().strategy())) { + return; + } + nodeSelectionStrategy.getAndUpdate(currentStrategy -> getUpdatedNodeSelectionStrategy( + currentStrategy.channel(), nodeChannels.get(), updatedStrategy, metrics, random, tick, channelName)); } LimitedChannel wrap(LimitedChannel channel) { @@ -94,15 +92,13 @@ LimitedChannel wrap(LimitedChannel channel) { private static ChannelWithStrategy getUpdatedNodeSelectionStrategy( @Nullable LimitedChannel previousNodeSelectionStrategy, - @Nullable ImmutableList nullableChannels, + ImmutableList channels, DialogueNodeSelectionStrategy updatedStrategy, TaggedMetricRegistry metrics, Random random, Ticker tick, String channelName) { - ImmutableList channels = - Preconditions.checkNotNull(nullableChannels, "channels must not be null"); if (channels.isEmpty()) { return ChannelWithStrategy.of(updatedStrategy, new ZeroUriChannel(channelName)); } diff --git a/dialogue-core/src/main/metrics/dialogue-core-metrics.yml b/dialogue-core/src/main/metrics/dialogue-core-metrics.yml index 4ed005c9c..86cf76f97 100644 --- a/dialogue-core/src/main/metrics/dialogue-core-metrics.yml +++ b/dialogue-core/src/main/metrics/dialogue-core-metrics.yml @@ -91,3 +91,12 @@ namespaces: docs: The score that the BalancedChannel currently assigns to each host (computed based on inflight requests and recent failures). Requests are routed to the channel with the lowest score. (Note if there are >10 nodes this metric will not be recorded). + + dialogue.nodeselection: + docs: Instrumentation for which node selection strategy is used + metrics: + strategy: + type: meter + tags: [strategy] + docs: Marked every time the node selection strategy changes + diff --git a/dialogue-core/src/test/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelectorTest.java b/dialogue-core/src/test/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelectorTest.java index cad065689..ce52fd1c6 100644 --- a/dialogue-core/src/test/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelectorTest.java +++ b/dialogue-core/src/test/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelectorTest.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableList; import com.palantir.conjure.java.client.config.NodeSelectionStrategy; +import com.palantir.tritium.metrics.registry.DefaultTaggedMetricRegistry; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -39,7 +40,9 @@ class DefaultNodeSelectionStrategySelectorTest { @BeforeEach void beforeEach() { - strategySelector = new DefaultNodeSelectionStrategySelector(NodeSelectionStrategy.PIN_UNTIL_ERROR); + strategySelector = new DefaultNodeSelectionStrategySelector( + NodeSelectionStrategy.PIN_UNTIL_ERROR, + DialogueNodeselectionMetrics.of(new DefaultTaggedMetricRegistry())); } @Test diff --git a/dialogue-core/src/test/java/com/palantir/dialogue/core/NodeSelectionStrategyChannelTest.java b/dialogue-core/src/test/java/com/palantir/dialogue/core/NodeSelectionStrategyChannelTest.java index fd5d8f735..804b915b3 100644 --- a/dialogue-core/src/test/java/com/palantir/dialogue/core/NodeSelectionStrategyChannelTest.java +++ b/dialogue-core/src/test/java/com/palantir/dialogue/core/NodeSelectionStrategyChannelTest.java @@ -43,8 +43,9 @@ class NodeSelectionStrategyChannelTest { @Spy - private NodeSelectionStrategySelector strategySelector = - new DefaultNodeSelectionStrategySelector(NodeSelectionStrategy.PIN_UNTIL_ERROR_WITHOUT_RESHUFFLE); + private NodeSelectionStrategySelector strategySelector = new DefaultNodeSelectionStrategySelector( + NodeSelectionStrategy.PIN_UNTIL_ERROR_WITHOUT_RESHUFFLE, + DialogueNodeselectionMetrics.of(new DefaultTaggedMetricRegistry())); @Mock private LimitedChannel channel1; From 8e889546455d8a09ee18f2adbad2e5e3b6dfc275 Mon Sep 17 00:00:00 2001 From: forozco Date: Wed, 29 Apr 2020 15:51:11 -0400 Subject: [PATCH 15/21] simplicity --- .../DefaultNodeSelectionStrategySelector.java | 103 ----------------- .../dialogue/core/DialogueChannel.java | 15 +-- .../core/NodeSelectionStrategyChannel.java | 108 ++++++++++-------- .../core/NodeSelectionStrategySelector.java | 8 +- ...aultNodeSelectionStrategySelectorTest.java | 103 ----------------- .../NodeSelectionStrategyChannelTest.java | 32 +++--- 6 files changed, 85 insertions(+), 284 deletions(-) delete mode 100644 dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelector.java delete mode 100644 dialogue-core/src/test/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelectorTest.java diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelector.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelector.java deleted file mode 100644 index d9d7afd38..000000000 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelector.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * (c) Copyright 2020 Palantir Technologies Inc. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.dialogue.core; - -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Sets; -import com.palantir.conjure.java.client.config.NodeSelectionStrategy; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; -import java.util.stream.Collectors; - -@SuppressWarnings("NullAway") -final class DefaultNodeSelectionStrategySelector implements NodeSelectionStrategySelector { - private final AtomicReference currentStrategy; - private final DialogueNodeselectionMetrics metrics; - private final ConcurrentHashMap> strategyPerChannel = - new ConcurrentHashMap<>(); - - DefaultNodeSelectionStrategySelector(NodeSelectionStrategy initialStrategy, DialogueNodeselectionMetrics metrics) { - this.currentStrategy = new AtomicReference<>(DialogueNodeSelectionStrategy.of(initialStrategy)); - this.metrics = metrics; - } - - @Override - public DialogueNodeSelectionStrategy getCurrentStrategy() { - return currentStrategy.get(); - } - - @Override - public DialogueNodeSelectionStrategy updateChannelStrategy( - LimitedChannel channel, List updatedStrategies) { - List previousStrategies = strategyPerChannel.put(channel, updatedStrategies); - if (updatedStrategies.isEmpty() || updatedStrategies.equals(previousStrategies)) { - return currentStrategy.get(); - } - return updateAndGetStrategy(); - } - - @Override - public DialogueNodeSelectionStrategy setActiveChannels(List channels) { - Sets.difference(strategyPerChannel.keySet(), ImmutableSet.copyOf(channels)) - .forEach(strategyPerChannel::remove); - return updateAndGetStrategy(); - } - - /** - * returns the requested strategy with the lowest "score", where score is the sum of each strategy's position - * in each nodes request list. - * - * In case of ties, fall back to the previous strategy. - */ - private DialogueNodeSelectionStrategy updateAndGetStrategy() { - return currentStrategy.updateAndGet(previousStrategy -> { - Collection> allRequestedStrategies = strategyPerChannel.values(); - Map scorePerStrategy = Arrays.stream( - DialogueNodeSelectionStrategy.values()) - .filter(strategy -> strategy != DialogueNodeSelectionStrategy.UNKNOWN) - .collect(Collectors.toMap(Function.identity(), strategy -> allRequestedStrategies.stream() - .mapToInt(requestedStrategies -> { - int score = requestedStrategies.indexOf(strategy); - return score == -1 ? DialogueNodeSelectionStrategy.values().length : score; - }) - .sum())); - - int minScore = Collections.min(scorePerStrategy.values()); - Set minScoreStrategies = scorePerStrategy.entrySet().stream() - .filter(entry -> entry.getValue() == minScore) - .map(Map.Entry::getKey) - .collect(Collectors.toSet()); - if (minScoreStrategies.size() == 1) { - DialogueNodeSelectionStrategy proposedStrategy = Iterables.getOnlyElement(minScoreStrategies); - if (!proposedStrategy.equals(previousStrategy)) { - metrics.strategy(proposedStrategy.name()).mark(); - return proposedStrategy; - } - } - - return previousStrategy; - }); - } -} diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java index 3e4fdfa5d..990338221 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueChannel.java @@ -74,13 +74,12 @@ private DialogueChannel( this.clientMetrics = ClientMetrics.of(clientConfiguration.taggedMetricRegistry()); this.dialogueClientMetrics = DialogueClientMetrics.of(clientConfiguration.taggedMetricRegistry()); this.nodeSelectionStrategy = new NodeSelectionStrategyChannel( + clientConfiguration.nodeSelectionStrategy(), channelName, random, ticker, - clientConfiguration.taggedMetricRegistry(), - new DefaultNodeSelectionStrategySelector( - clientConfiguration.nodeSelectionStrategy(), - DialogueNodeselectionMetrics.of(clientConfiguration.taggedMetricRegistry()))); + clientConfiguration.taggedMetricRegistry()); + this.queuedChannel = new QueuedChannel(nodeSelectionStrategy, channelName, dialogueClientMetrics, maxQueueSize); updateUrisInner(clientConfiguration.uris(), true); this.delegate = wrap( @@ -146,12 +145,8 @@ private LimitedChannel createLimitedChannel(String uri, int uriIndex) { channel = new TraceEnrichingChannel(channel); LimitedChannel limitedChannel = new ChannelToLimitedChannelAdapter(channel); - return nodeSelectionStrategy.wrap(concurrencyLimiter( - clientConfiguration, - limitedChannel, - clientConfiguration.taggedMetricRegistry(), - channelName, - uriIndex)); + return concurrencyLimiter( + clientConfiguration, limitedChannel, clientConfiguration.taggedMetricRegistry(), channelName, uriIndex); } private static LimitedChannel concurrencyLimiter( diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java index f7cb2ad85..e4054ed45 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java @@ -17,9 +17,11 @@ package com.palantir.dialogue.core; import com.github.benmanes.caffeine.cache.Ticker; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ListenableFuture; +import com.palantir.conjure.java.client.config.NodeSelectionStrategy; import com.palantir.dialogue.Endpoint; import com.palantir.dialogue.Request; import com.palantir.dialogue.Response; @@ -37,57 +39,78 @@ final class NodeSelectionStrategyChannel implements LimitedChannel { private static final String NODE_SELECTION_HEADER = "Node-Selection-Strategy"; + private final FutureCallback callback = new NodeSelectionCallback(); + private final AtomicReference nodeSelectionStrategy; private final AtomicReference> nodeChannels; + private final NodeSelectionStrategySelector strategySelector; + private final String channelName; private final Random random; private final Ticker tick; private final TaggedMetricRegistry metrics; - private final NodeSelectionStrategySelector strategySelector; private final LimitedChannel delegate; NodeSelectionStrategyChannel( + NodeSelectionStrategy initialStrategy, String channelName, Random random, Ticker tick, - TaggedMetricRegistry metrics, - NodeSelectionStrategySelector strategySelector) { + TaggedMetricRegistry metrics) { + this( + NodeSelectionStrategyChannel::getFirstKnownStrategy, + DialogueNodeSelectionStrategy.of(initialStrategy), + channelName, + random, + tick, + metrics); + } + + @VisibleForTesting + NodeSelectionStrategyChannel( + NodeSelectionStrategySelector strategySelector, + DialogueNodeSelectionStrategy initialStrategy, + String channelName, + Random random, + Ticker tick, + TaggedMetricRegistry metrics) { + this.strategySelector = strategySelector; this.channelName = channelName; this.random = random; this.tick = tick; this.metrics = metrics; - this.strategySelector = strategySelector; this.nodeChannels = new AtomicReference<>(ImmutableList.of()); this.nodeSelectionStrategy = new AtomicReference<>(getUpdatedNodeSelectionStrategy( - null, nodeChannels.get(), strategySelector.getCurrentStrategy(), metrics, random, tick, channelName)); + null, nodeChannels.get(), initialStrategy, metrics, random, tick, channelName)); this.delegate = new SupplierChannel(() -> nodeSelectionStrategy.get().channel()); } @Override public Optional> maybeExecute(Endpoint endpoint, Request request) { - return delegate.maybeExecute(endpoint, request); + return delegate.maybeExecute(endpoint, request).map(this::wrapWithCallback); } - void updateChannels(ImmutableList newChannels) { - nodeChannels.set(newChannels); - maybeUpdateNodeSelectionStrategy(strategySelector.setActiveChannels(newChannels)); + private ListenableFuture wrapWithCallback(ListenableFuture response) { + return DialogueFutures.addDirectCallback(response, callback); } - private void updateRequestedStrategies(LimitedChannel channel, List strategies) { - maybeUpdateNodeSelectionStrategy(strategySelector.updateChannelStrategy(channel, strategies)); + void updateChannels(ImmutableList updatedChannels) { + nodeChannels.set(updatedChannels); + nodeSelectionStrategy.getAndUpdate(strat -> getUpdatedNodeSelectionStrategy( + strat.channel(), updatedChannels, strat.strategy(), metrics, random, tick, channelName)); } - private void maybeUpdateNodeSelectionStrategy(DialogueNodeSelectionStrategy updatedStrategy) { - // Quick check to avoid expensive CAS - if (updatedStrategy.equals(nodeSelectionStrategy.get().strategy())) { - return; + private void updateRequestedStrategies(List strategies) { + Optional maybeStrategy = strategySelector.updateAndGet(strategies); + if (maybeStrategy.isPresent()) { + DialogueNodeSelectionStrategy strategy = maybeStrategy.get(); + // Quick check to avoid expensive CAS + if (strategy.equals(nodeSelectionStrategy.get().strategy())) { + return; + } + nodeSelectionStrategy.getAndUpdate(currentStrategy -> getUpdatedNodeSelectionStrategy( + currentStrategy.channel(), nodeChannels.get(), strategy, metrics, random, tick, channelName)); } - nodeSelectionStrategy.getAndUpdate(currentStrategy -> getUpdatedNodeSelectionStrategy( - currentStrategy.channel(), nodeChannels.get(), updatedStrategy, metrics, random, tick, channelName)); - } - - LimitedChannel wrap(LimitedChannel channel) { - return new WrapperChannel(channel); } private static ChannelWithStrategy getUpdatedNodeSelectionStrategy( @@ -145,6 +168,17 @@ private static ChannelWithStrategy getUpdatedNodeSelectionStrategy( throw new SafeRuntimeException("Unknown NodeSelectionStrategy", SafeArg.of("unknown", updatedStrategy)); } + @VisibleForTesting + static Optional getFirstKnownStrategy( + List strategies) { + for (DialogueNodeSelectionStrategy strategy : strategies) { + if (!strategy.equals(DialogueNodeSelectionStrategy.UNKNOWN)) { + return Optional.of(strategy); + } + } + return Optional.empty(); + } + @Value.Immutable interface ChannelWithStrategy { DialogueNodeSelectionStrategy strategy(); @@ -159,35 +193,17 @@ static ChannelWithStrategy of(DialogueNodeSelectionStrategy strategy, LimitedCha } } - private class WrapperChannel implements LimitedChannel { - private final LimitedChannel delegate; - private final FutureCallback callback; - - WrapperChannel(LimitedChannel delegate) { - this.delegate = delegate; - this.callback = new FutureCallback() { - - @Override - public void onSuccess(Response result) { - result.getFirstHeader(NODE_SELECTION_HEADER).ifPresent(this::consumeStrategy); - } - - @Override - public void onFailure(Throwable _unused) {} - - private void consumeStrategy(String strategy) { - updateRequestedStrategies(delegate, DialogueNodeSelectionStrategy.fromHeader(strategy)); - } - }; + private final class NodeSelectionCallback implements FutureCallback { + @Override + public void onSuccess(Response result) { + result.getFirstHeader(NODE_SELECTION_HEADER).ifPresent(this::consumeStrategy); } @Override - public Optional> maybeExecute(Endpoint endpoint, Request request) { - return delegate.maybeExecute(endpoint, request).map(this::wrap); - } + public void onFailure(Throwable _unused) {} - private ListenableFuture wrap(ListenableFuture response) { - return DialogueFutures.addDirectCallback(response, callback); + private void consumeStrategy(String strategy) { + updateRequestedStrategies(DialogueNodeSelectionStrategy.fromHeader(strategy)); } } } diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategySelector.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategySelector.java index fd8593836..0b19353f7 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategySelector.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategySelector.java @@ -17,12 +17,8 @@ package com.palantir.dialogue.core; import java.util.List; +import java.util.Optional; interface NodeSelectionStrategySelector { - DialogueNodeSelectionStrategy getCurrentStrategy(); - - DialogueNodeSelectionStrategy updateChannelStrategy( - LimitedChannel channel, List updatedStrategies); - - DialogueNodeSelectionStrategy setActiveChannels(List channels); + Optional updateAndGet(List updatedStrategies); } diff --git a/dialogue-core/src/test/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelectorTest.java b/dialogue-core/src/test/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelectorTest.java deleted file mode 100644 index ce52fd1c6..000000000 --- a/dialogue-core/src/test/java/com/palantir/dialogue/core/DefaultNodeSelectionStrategySelectorTest.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * (c) Copyright 2020 Palantir Technologies Inc. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.palantir.dialogue.core; - -import static org.assertj.core.api.Assertions.assertThat; - -import com.google.common.collect.ImmutableList; -import com.palantir.conjure.java.client.config.NodeSelectionStrategy; -import com.palantir.tritium.metrics.registry.DefaultTaggedMetricRegistry; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -@ExtendWith(MockitoExtension.class) -class DefaultNodeSelectionStrategySelectorTest { - - @Mock - LimitedChannel channelA; - - @Mock - LimitedChannel channelB; - - private DefaultNodeSelectionStrategySelector strategySelector; - - @BeforeEach - void beforeEach() { - strategySelector = new DefaultNodeSelectionStrategySelector( - NodeSelectionStrategy.PIN_UNTIL_ERROR, - DialogueNodeselectionMetrics.of(new DefaultTaggedMetricRegistry())); - } - - @Test - void defaults_to_client_provided_strategy() { - assertThat(strategySelector.getCurrentStrategy()).isEqualTo(DialogueNodeSelectionStrategy.PIN_UNTIL_ERROR); - } - - @Test - void uses_server_provided_strategy() { - DialogueNodeSelectionStrategy strategy = strategySelector.updateChannelStrategy( - channelA, ImmutableList.of(DialogueNodeSelectionStrategy.BALANCED)); - assertThat(strategy).isEqualTo(DialogueNodeSelectionStrategy.BALANCED); - } - - @Test - void falls_back_to_previous_on_conflict() { - DialogueNodeSelectionStrategy strategy; - - strategy = strategySelector.updateChannelStrategy( - channelA, ImmutableList.of(DialogueNodeSelectionStrategy.BALANCED)); - assertThat(strategy).isEqualTo(DialogueNodeSelectionStrategy.BALANCED); - - strategy = strategySelector.updateChannelStrategy( - channelB, ImmutableList.of(DialogueNodeSelectionStrategy.PIN_UNTIL_ERROR_WITHOUT_RESHUFFLE)); - - assertThat(strategy).isEqualTo(DialogueNodeSelectionStrategy.BALANCED); - } - - @Test - void ignores_unknown_strategy() { - strategySelector.updateChannelStrategy( - channelA, - ImmutableList.of(DialogueNodeSelectionStrategy.UNKNOWN, DialogueNodeSelectionStrategy.BALANCED)); - DialogueNodeSelectionStrategy strategy = strategySelector.updateChannelStrategy( - channelB, ImmutableList.of(DialogueNodeSelectionStrategy.BALANCED)); - assertThat(strategy).isEqualTo(DialogueNodeSelectionStrategy.BALANCED); - } - - @Test - void only_considers_active_channels() { - DialogueNodeSelectionStrategy strategy; - // Initially prefers PuE - strategy = strategySelector.updateChannelStrategy( - channelA, - ImmutableList.of( - DialogueNodeSelectionStrategy.PIN_UNTIL_ERROR, DialogueNodeSelectionStrategy.BALANCED)); - assertThat(strategy).isEqualTo(DialogueNodeSelectionStrategy.PIN_UNTIL_ERROR); - - // Switches to Balance upon seeing another node that requests Balance - strategy = strategySelector.updateChannelStrategy( - channelB, ImmutableList.of(DialogueNodeSelectionStrategy.BALANCED)); - assertThat(strategy).isEqualTo(DialogueNodeSelectionStrategy.BALANCED); - - // Switches back to PuE once that node disappears - strategy = strategySelector.setActiveChannels(ImmutableList.of(channelA)); - assertThat(strategy).isEqualTo(DialogueNodeSelectionStrategy.PIN_UNTIL_ERROR); - } -} diff --git a/dialogue-core/src/test/java/com/palantir/dialogue/core/NodeSelectionStrategyChannelTest.java b/dialogue-core/src/test/java/com/palantir/dialogue/core/NodeSelectionStrategyChannelTest.java index 804b915b3..d52763b3e 100644 --- a/dialogue-core/src/test/java/com/palantir/dialogue/core/NodeSelectionStrategyChannelTest.java +++ b/dialogue-core/src/test/java/com/palantir/dialogue/core/NodeSelectionStrategyChannelTest.java @@ -26,9 +26,9 @@ import com.github.benmanes.caffeine.cache.Ticker; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.Futures; -import com.palantir.conjure.java.client.config.NodeSelectionStrategy; import com.palantir.dialogue.Response; import com.palantir.tritium.metrics.registry.DefaultTaggedMetricRegistry; +import java.util.List; import java.util.Optional; import java.util.Random; import org.junit.jupiter.api.BeforeEach; @@ -43,9 +43,13 @@ class NodeSelectionStrategyChannelTest { @Spy - private NodeSelectionStrategySelector strategySelector = new DefaultNodeSelectionStrategySelector( - NodeSelectionStrategy.PIN_UNTIL_ERROR_WITHOUT_RESHUFFLE, - DialogueNodeselectionMetrics.of(new DefaultTaggedMetricRegistry())); + private NodeSelectionStrategySelector strategySelector = new NodeSelectionStrategySelector() { + @Override + public Optional updateAndGet( + List updatedStrategies) { + return NodeSelectionStrategyChannel.getFirstKnownStrategy(updatedStrategies); + } + }; @Mock private LimitedChannel channel1; @@ -63,7 +67,12 @@ class NodeSelectionStrategyChannelTest { @BeforeEach void beforeEach() { channel = new NodeSelectionStrategyChannel( - channelName, pseudo, clock, new DefaultTaggedMetricRegistry(), strategySelector); + strategySelector, + DialogueNodeSelectionStrategy.PIN_UNTIL_ERROR_WITHOUT_RESHUFFLE, + channelName, + pseudo, + clock, + new DefaultTaggedMetricRegistry()); } @Test @@ -79,20 +88,11 @@ void handles_one_to_many_uri_update() { } @Test - void updates_strategy_selector_on_uri_update() { + void updates_strategy_on_response() { channel.updateChannels(ImmutableList.of(channel1)); - - verify(strategySelector, times(1)).setActiveChannels(ImmutableList.of(channel1)); - } - - @Test - void tracks_per_host_strategy() { - channel.updateChannels(ImmutableList.of(channel.wrap(channel1))); setResponse(channel1, Optional.of("BALANCED")); channel.maybeExecute(null, null).get(); - - verify(strategySelector, times(1)) - .updateChannelStrategy(eq(channel1), eq(ImmutableList.of(DialogueNodeSelectionStrategy.BALANCED))); + verify(strategySelector, times(1)).updateAndGet(eq(ImmutableList.of(DialogueNodeSelectionStrategy.BALANCED))); } private static void setResponse(LimitedChannel mockChannel, Optional header) { From ae4bf3b37ede5db4c8ae9860dc72d5e1ab0289dd Mon Sep 17 00:00:00 2001 From: forozco Date: Wed, 29 Apr 2020 17:09:31 -0400 Subject: [PATCH 16/21] cleanup --- .../core/NodeSelectionStrategyChannel.java | 70 +++++++++---------- 1 file changed, 34 insertions(+), 36 deletions(-) diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java index e240bcbe4..3b49b01cb 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java @@ -41,8 +41,7 @@ final class NodeSelectionStrategyChannel implements LimitedChannel { private final FutureCallback callback = new NodeSelectionCallback(); - private final AtomicReference nodeSelectionStrategy; - private final AtomicReference> nodeChannels; + private final AtomicReference nodeSelectionStrategy; private final NodeSelectionStrategySelector strategySelector; private final String channelName; @@ -79,9 +78,10 @@ final class NodeSelectionStrategyChannel implements LimitedChannel { this.random = random; this.tick = tick; this.metrics = metrics; - this.nodeChannels = new AtomicReference<>(ImmutableList.of()); - this.nodeSelectionStrategy = new AtomicReference<>( - ChannelWithStrategy.of(initialStrategy, new ZeroUriNodeSelectionChannel(channelName))); + this.nodeSelectionStrategy = new AtomicReference<>(NodeSelectionChannel.builder() + .strategy(initialStrategy) + .channel(new ZeroUriNodeSelectionChannel(channelName)) + .build()); this.delegate = new SupplierChannel(() -> nodeSelectionStrategy.get().channel()); } @@ -95,9 +95,8 @@ private ListenableFuture wrapWithCallback(ListenableFuture r } void updateChannels(ImmutableList updatedChannels) { - nodeChannels.set(updatedChannels); - nodeSelectionStrategy.getAndUpdate(strat -> getUpdatedNodeSelectionStrategy( - strat.channel(), updatedChannels, strat.strategy(), metrics, random, tick, channelName)); + nodeSelectionStrategy.getAndUpdate(prevChannel -> getUpdatedSelectedChannel( + prevChannel.channel(), updatedChannels, prevChannel.strategy(), metrics, random, tick, channelName)); } private void updateRequestedStrategies(List strategies) { @@ -108,12 +107,12 @@ private void updateRequestedStrategies(List strat if (strategy.equals(nodeSelectionStrategy.get().strategy())) { return; } - nodeSelectionStrategy.getAndUpdate(currentStrategy -> getUpdatedNodeSelectionStrategy( - currentStrategy.channel(), nodeChannels.get(), strategy, metrics, random, tick, channelName)); + nodeSelectionStrategy.getAndUpdate(prevChannel -> getUpdatedSelectedChannel( + prevChannel.channel(), prevChannel.hostChannels(), strategy, metrics, random, tick, channelName)); } } - private static ChannelWithStrategy getUpdatedNodeSelectionStrategy( + private static NodeSelectionChannel getUpdatedSelectedChannel( @Nullable LimitedChannel previousNodeSelectionStrategy, ImmutableList channels, DialogueNodeSelectionStrategy updatedStrategy, @@ -121,13 +120,16 @@ private static ChannelWithStrategy getUpdatedNodeSelectionStrategy( Random random, Ticker tick, String channelName) { - + NodeSelectionChannel.Builder channelBuilder = + NodeSelectionChannel.builder().strategy(updatedStrategy).hostChannels(channels); if (channels.isEmpty()) { - return ChannelWithStrategy.of(updatedStrategy, new ZeroUriNodeSelectionChannel(channelName)); + return channelBuilder + .channel(new ZeroUriNodeSelectionChannel(channelName)) + .build(); } if (channels.size() == 1) { // no fancy node selection heuristic can save us if our one node goes down - return ChannelWithStrategy.of(updatedStrategy, channels.get(0)); + return channelBuilder.channel(channels.get(0)).build(); } switch (updatedStrategy) { @@ -138,31 +140,26 @@ private static ChannelWithStrategy getUpdatedNodeSelectionStrategy( if (previousNodeSelectionStrategy instanceof PinUntilErrorNodeSelectionStrategyChannel) { PinUntilErrorNodeSelectionStrategyChannel previousPinUntilError = (PinUntilErrorNodeSelectionStrategyChannel) previousNodeSelectionStrategy; - return ChannelWithStrategy.of( - updatedStrategy, - PinUntilErrorNodeSelectionStrategyChannel.of( + return channelBuilder + .channel(PinUntilErrorNodeSelectionStrategyChannel.of( Optional.of(previousPinUntilError.getCurrentChannel()), updatedStrategy, channels, pinuntilerrorMetrics, random, - channelName)); + channelName)) + .build(); } - return ChannelWithStrategy.of( - updatedStrategy, - PinUntilErrorNodeSelectionStrategyChannel.of( - Optional.empty(), - updatedStrategy, - channels, - pinuntilerrorMetrics, - random, - channelName)); + return channelBuilder + .channel(PinUntilErrorNodeSelectionStrategyChannel.of( + Optional.empty(), updatedStrategy, channels, pinuntilerrorMetrics, random, channelName)) + .build(); case BALANCED: // When people ask for 'ROUND_ROBIN', they usually just want something to load balance better. // We used to have a naive RoundRobinChannel, then tried RandomSelection and now use this heuristic: - return ChannelWithStrategy.of( - updatedStrategy, - new BalancedNodeSelectionStrategyChannel(channels, random, tick, metrics, channelName)); + return channelBuilder + .channel(new BalancedNodeSelectionStrategyChannel(channels, random, tick, metrics, channelName)) + .build(); case UNKNOWN: } throw new SafeRuntimeException("Unknown NodeSelectionStrategy", SafeArg.of("unknown", updatedStrategy)); @@ -180,16 +177,17 @@ static Optional getFirstKnownStrategy( } @Value.Immutable - interface ChannelWithStrategy { + interface NodeSelectionChannel { DialogueNodeSelectionStrategy strategy(); LimitedChannel channel(); - static ChannelWithStrategy of(DialogueNodeSelectionStrategy strategy, LimitedChannel channel) { - return ImmutableChannelWithStrategy.builder() - .strategy(strategy) - .channel(channel) - .build(); + ImmutableList hostChannels(); + + class Builder extends ImmutableNodeSelectionChannel.Builder {} + + static Builder builder() { + return new Builder(); } } From 62c5edec50e0d253da80327823a65afc2e27951a Mon Sep 17 00:00:00 2001 From: forozco Date: Wed, 29 Apr 2020 21:09:31 +0000 Subject: [PATCH 17/21] Add generated changelog entries --- changelog/@unreleased/pr-688.v2.yml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 changelog/@unreleased/pr-688.v2.yml diff --git a/changelog/@unreleased/pr-688.v2.yml b/changelog/@unreleased/pr-688.v2.yml new file mode 100644 index 000000000..4ae4003cb --- /dev/null +++ b/changelog/@unreleased/pr-688.v2.yml @@ -0,0 +1,5 @@ +type: feature +feature: + description: Clients respect `Node-Selection-Strategy` response header + links: + - https://github.com/palantir/dialogue/pull/688 From d9fbd2142fc2674ac3552ee2cb32104f0dda4721 Mon Sep 17 00:00:00 2001 From: iamdanfox Date: Thu, 30 Apr 2020 17:31:34 +0100 Subject: [PATCH 18/21] README for server side node selection (#697) * README for node selection strategy * Consistent capitalization --- README.md | 18 +++++++++++++++++- changelog/@unreleased/pr-688.v2.yml | 3 ++- .../core/NodeSelectionStrategyChannelTest.java | 6 ++++-- 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 32db7b4fc..64e10c41f 100644 --- a/README.md +++ b/README.md @@ -113,11 +113,27 @@ _This API is influenced by gRPC's [Java library](https://github.com/grpc/grpc-ja ## Behaviour -### Concurrency Limits +### Concurrency limits Each host has an [AIMD](https://en.wikipedia.org/wiki/Additive_increase/multiplicative_decrease) concurrency limit. This protects servers by stopping requests getting out the door on the client-side. Permits are multiplicatively decreased after receiving any 5xx, 429 or 308 response. Otherwise, they are additively increased. +### Node selection strategies +When configured with multiple uris, Dialogue has several strategies for choosing which upstream to route requests to. +The default strategy is `PIN_UNTIL_ERROR`, although users can choose alternatives such as `ROUND_ROBIN` when building a ClientConfiguration +object. Note that the choice of an appropriate strategy usually depends on the _upstream_ server's behaviour, i.e. if its +performance relies heavily on warm caches, or if successive requests must land on the same node to successfully complete +a transaction. To solve this problem without needing code changes in all clients, servers can recommend a +NodeSelectionStrategy (see below). + +### Server-recommended node selection strategies +Servers can inform clients of their recommended strategies by including the +`Node-Selection-Strategy` response header. Values are separated by commas and are ordered by preference. See [available strategies](dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategy.java). +``` +Node-Selection-Strategy: BALANCED,PIN_UNTIL_ERROR +``` +When the header is present, it takes precedence over user-selected strategies. Servers are free to omit this value. + ### NodeSelectionStrategy.ROUND_ROBIN Used to balance requests across many servers better than the default PIN_UNTIL_ERROR. The actual algorithm has evolved from naive Round Robin, then to Random Selection and now diff --git a/changelog/@unreleased/pr-688.v2.yml b/changelog/@unreleased/pr-688.v2.yml index 4ae4003cb..cfaae27b2 100644 --- a/changelog/@unreleased/pr-688.v2.yml +++ b/changelog/@unreleased/pr-688.v2.yml @@ -1,5 +1,6 @@ type: feature feature: - description: Clients respect `Node-Selection-Strategy` response header + description: | + Clients respect the optional `Node-Selection-Strategy` response header, which takes precedence over user-selected strategies. links: - https://github.com/palantir/dialogue/pull/688 diff --git a/dialogue-core/src/test/java/com/palantir/dialogue/core/NodeSelectionStrategyChannelTest.java b/dialogue-core/src/test/java/com/palantir/dialogue/core/NodeSelectionStrategyChannelTest.java index d52763b3e..aac391589 100644 --- a/dialogue-core/src/test/java/com/palantir/dialogue/core/NodeSelectionStrategyChannelTest.java +++ b/dialogue-core/src/test/java/com/palantir/dialogue/core/NodeSelectionStrategyChannelTest.java @@ -90,9 +90,11 @@ void handles_one_to_many_uri_update() { @Test void updates_strategy_on_response() { channel.updateChannels(ImmutableList.of(channel1)); - setResponse(channel1, Optional.of("BALANCED")); + setResponse(channel1, Optional.of("BALANCED,FOO")); channel.maybeExecute(null, null).get(); - verify(strategySelector, times(1)).updateAndGet(eq(ImmutableList.of(DialogueNodeSelectionStrategy.BALANCED))); + verify(strategySelector, times(1)) + .updateAndGet(eq(ImmutableList.of( + DialogueNodeSelectionStrategy.BALANCED, DialogueNodeSelectionStrategy.UNKNOWN))); } private static void setResponse(LimitedChannel mockChannel, Optional header) { From 54354484e2dc2f6746dc5ea60483a48dc99f8de2 Mon Sep 17 00:00:00 2001 From: forozco Date: Thu, 30 Apr 2020 13:21:11 -0400 Subject: [PATCH 19/21] comment --- .../dialogue/core/DialogueNodeSelectionStrategy.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategy.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategy.java index 79e0d7fae..f260a454c 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategy.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/DialogueNodeSelectionStrategy.java @@ -25,6 +25,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Supported node selection strategies which can either be user provided or received over the wire from servers. + * Separate from {@link NodeSelectionStrategy} to allow us to more easily iterate on strategies and support unknown + * strategies coming in over the wire. + */ enum DialogueNodeSelectionStrategy { PIN_UNTIL_ERROR, PIN_UNTIL_ERROR_WITHOUT_RESHUFFLE, From 1684849d835cde290febb696bc864559864990bd Mon Sep 17 00:00:00 2001 From: forozco Date: Thu, 30 Apr 2020 13:22:24 -0400 Subject: [PATCH 20/21] chooser --- .../palantir/dialogue/core/NodeSelectionStrategyChannel.java | 4 ++-- ...trategySelector.java => NodeSelectionStrategyChooser.java} | 2 +- .../dialogue/core/NodeSelectionStrategyChannelTest.java | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) rename dialogue-core/src/main/java/com/palantir/dialogue/core/{NodeSelectionStrategySelector.java => NodeSelectionStrategyChooser.java} (95%) diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java index db7d57491..84deca114 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java @@ -41,7 +41,7 @@ final class NodeSelectionStrategyChannel implements LimitedChannel { private final FutureCallback callback = new NodeSelectionCallback(); private final AtomicReference nodeSelectionStrategy; - private final NodeSelectionStrategySelector strategySelector; + private final NodeSelectionStrategyChooser strategySelector; private final String channelName; private final Random random; @@ -51,7 +51,7 @@ final class NodeSelectionStrategyChannel implements LimitedChannel { @VisibleForTesting NodeSelectionStrategyChannel( - NodeSelectionStrategySelector strategySelector, + NodeSelectionStrategyChooser strategySelector, DialogueNodeSelectionStrategy initialStrategy, String channelName, Random random, diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategySelector.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChooser.java similarity index 95% rename from dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategySelector.java rename to dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChooser.java index 0b19353f7..e126fec06 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategySelector.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChooser.java @@ -19,6 +19,6 @@ import java.util.List; import java.util.Optional; -interface NodeSelectionStrategySelector { +interface NodeSelectionStrategyChooser { Optional updateAndGet(List updatedStrategies); } diff --git a/dialogue-core/src/test/java/com/palantir/dialogue/core/NodeSelectionStrategyChannelTest.java b/dialogue-core/src/test/java/com/palantir/dialogue/core/NodeSelectionStrategyChannelTest.java index aac391589..2021d205f 100644 --- a/dialogue-core/src/test/java/com/palantir/dialogue/core/NodeSelectionStrategyChannelTest.java +++ b/dialogue-core/src/test/java/com/palantir/dialogue/core/NodeSelectionStrategyChannelTest.java @@ -43,7 +43,7 @@ class NodeSelectionStrategyChannelTest { @Spy - private NodeSelectionStrategySelector strategySelector = new NodeSelectionStrategySelector() { + private NodeSelectionStrategyChooser strategySelector = new NodeSelectionStrategyChooser() { @Override public Optional updateAndGet( List updatedStrategies) { From ff4f587a49c594dd1cee00fb34a06b93b30f89a9 Mon Sep 17 00:00:00 2001 From: forozco Date: Thu, 30 Apr 2020 13:27:12 -0400 Subject: [PATCH 21/21] metrics --- .../dialogue/core/NodeSelectionStrategyChannel.java | 9 +++++++++ dialogue-core/src/main/metrics/dialogue-core-metrics.yml | 2 +- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java index 84deca114..14e771f7e 100644 --- a/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java +++ b/dialogue-core/src/main/java/com/palantir/dialogue/core/NodeSelectionStrategyChannel.java @@ -48,6 +48,7 @@ final class NodeSelectionStrategyChannel implements LimitedChannel { private final Ticker tick; private final TaggedMetricRegistry metrics; private final LimitedChannel delegate; + private final DialogueNodeselectionMetrics nodeSelectionMetrics; @VisibleForTesting NodeSelectionStrategyChannel( @@ -62,6 +63,7 @@ final class NodeSelectionStrategyChannel implements LimitedChannel { this.random = random; this.tick = tick; this.metrics = metrics; + this.nodeSelectionMetrics = DialogueNodeselectionMetrics.of(metrics); this.nodeSelectionStrategy = new AtomicReference<>(NodeSelectionChannel.builder() .strategy(initialStrategy) .channel(new ZeroUriNodeSelectionChannel(channelName)) @@ -101,6 +103,13 @@ private void updateRequestedStrategies(List strat if (strategy.equals(nodeSelectionStrategy.get().strategy())) { return; } + + this.nodeSelectionMetrics + .strategy() + .channelName(channelName) + .strategy(strategy.toString()) + .build() + .mark(); nodeSelectionStrategy.getAndUpdate(prevChannel -> getUpdatedSelectedChannel( prevChannel.channel(), prevChannel.hostChannels(), strategy, metrics, random, tick, channelName)); } diff --git a/dialogue-core/src/main/metrics/dialogue-core-metrics.yml b/dialogue-core/src/main/metrics/dialogue-core-metrics.yml index 86cf76f97..a70b686a1 100644 --- a/dialogue-core/src/main/metrics/dialogue-core-metrics.yml +++ b/dialogue-core/src/main/metrics/dialogue-core-metrics.yml @@ -97,6 +97,6 @@ namespaces: metrics: strategy: type: meter - tags: [strategy] + tags: [channel-name, strategy] docs: Marked every time the node selection strategy changes