Skip to content

Commit

Permalink
loadbalancer: better asymptotic behavior of host updates (#2745)
Browse files Browse the repository at this point in the history
Motivation:

Our current host update process is quadratic both in
time and space due to searching for hosts using list
iteration and performing a COW for each entry of the
SD event set.

Modifications:

Make a Map<Address, SDEvent>. This lets us iterate
through existing events with O(M+N) complexity
instead of O(M*N) complexity. As a bonus, we can
now easily detect if we get multiple SD events for
the same address in a batch.

Result:

We should see slightly worse performance in the small
M*N case but dramatically better performance in the
large case. Since SD events shouldn't be terribly
common and making a hashmap is cheap relative to the
total cost of a SD update, this is the safer
tradeoff.
  • Loading branch information
bryce-anderson authored Nov 17, 2023
1 parent 6a18e4d commit 936e01b
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,22 +159,25 @@ private ConnState closeConnState() {
}

@Override
public void markExpired() {
public boolean markExpired() {
for (;;) {
ConnState oldState = connStateUpdater.get(this);
if (oldState.state == State.EXPIRED || oldState.state == State.CLOSED) {
break;
if (oldState.state == State.EXPIRED) {
return false;
} else if (oldState.state == State.CLOSED) {
return true;
}
Object nextState = oldState.connections.length == 0 ? State.CLOSED : State.EXPIRED;

if (connStateUpdater.compareAndSet(this, oldState,
new ConnState(oldState.connections, nextState))) {
cancelIfHealthCheck(oldState);
if (nextState == State.CLOSED) {
// Trigger the callback to remove the host from usedHosts array.
this.closeAsync().subscribe();
return true;
} else {
return false;
}
break;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ interface Host<ResolvedAddress, C extends LoadBalancedConnection> extends Listen
/**
* Signal that the host should not be the target of new connections but existing connections are still expected
* to be valid and can serve new requests. This does not have any implications for the health status of the host.
*
* @return true if the host is now in the closed state, false otherwise.
*/
void markExpired();
boolean markExpired();
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Spliterator;
import java.util.concurrent.ThreadLocalRandom;
Expand All @@ -65,7 +67,6 @@
import static java.lang.Integer.toHexString;
import static java.lang.System.identityHashCode;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.stream.Collectors.toList;
Expand Down Expand Up @@ -246,63 +247,95 @@ public void onSubscribe(final Subscription s) {

@Override
public void onNext(@Nullable final Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events) {
if (events == null) {
LOGGER.debug("{}: unexpectedly received null instead of events.", NewRoundRobinLoadBalancer.this);
if (events == null || events.isEmpty()) {
LOGGER.debug("{}: unexpectedly received null or empty list instead of events.",
NewRoundRobinLoadBalancer.this);
return;
}
for (ServiceDiscovererEvent<ResolvedAddress> event : events) {
final ServiceDiscovererEvent.Status eventStatus = event.status();
LOGGER.debug("{}: received new ServiceDiscoverer event {}. Inferred status: {}.",
NewRoundRobinLoadBalancer.this, event, eventStatus);

boolean sendReadyEvent;
List<Host<ResolvedAddress, C>> nextHosts;
for (;;) {
// TODO: we have some weirdness in the event that we fail the CAS namely that we can create a host
// that never gets used but is orphaned. It's fine so long as there is nothing to close but that
// guarantee may not always hold in the future.
@SuppressWarnings("unchecked")
final List<Host<ResolvedAddress, C>> usedAddresses =
usedHostsUpdater.updateAndGet(NewRoundRobinLoadBalancer.this, oldHosts -> {
if (isClosedList(oldHosts)) {
return oldHosts;
}
final ResolvedAddress addr = requireNonNull(event.address());
@SuppressWarnings("unchecked")
final List<Host<ResolvedAddress, C>> oldHostsTyped =
(List<Host<ResolvedAddress, C>>) oldHosts;

if (AVAILABLE.equals(eventStatus)) {
return addHostToList(oldHostsTyped, addr);
} else if (EXPIRED.equals(eventStatus)) {
if (oldHostsTyped.isEmpty()) {
return emptyList();
} else {
return markHostAsExpired(oldHostsTyped, addr);
}
} else if (UNAVAILABLE.equals(eventStatus)) {
return listWithHostRemoved(oldHostsTyped, host -> {
boolean match = host.address().equals(addr);
if (match) {
host.markClosed();
}
return match;
});
} else {
LOGGER.error("{}: Unexpected Status in event:" +
" {} (mapped to {}). Leaving usedHosts unchanged: {}",
NewRoundRobinLoadBalancer.this, event, eventStatus, oldHosts);
return oldHosts;
}
});

LOGGER.debug("{}: now using addresses (size={}): {}.",
NewRoundRobinLoadBalancer.this, usedAddresses.size(), usedAddresses);
List<Host<ResolvedAddress, C>> usedHosts = usedHostsUpdater.get(NewRoundRobinLoadBalancer.this);
if (isClosedList(usedHosts)) {
// We don't update if the load balancer is closed.
return;
}
nextHosts = new ArrayList<>(usedHosts.size() + events.size());
sendReadyEvent = false;

// First we make a map of addresses to events so that we don't get quadratic behavior for diffing.
// Unfortunately we need to make this every iteration of the CAS loop since we remove entries
// for hosts that already exist. If this results in to many collisions and map rebuilds we should
// re-assess how we manage concurrency for list mutations.
final Map<ResolvedAddress, ServiceDiscovererEvent<ResolvedAddress>> eventMap = new HashMap<>();
for (ServiceDiscovererEvent<ResolvedAddress> event : events) {
ServiceDiscovererEvent<ResolvedAddress> old = eventMap.put(event.address(), event);
if (old != null) {
LOGGER.debug("Multiple ServiceDiscoveryEvent's detected for address {}. Event: {}.",
event.address(), event);
}
}

if (AVAILABLE.equals(eventStatus)) {
if (usedAddresses.size() == 1) {
eventStreamProcessor.onNext(LOAD_BALANCER_READY_EVENT);
// First thing we do is go through the existing hosts and see if we need to transfer them. These
// will be all existing hosts that either don't have a matching discovery event or are not marked
// as unavailable. If they are marked unavailable, we need to close them (which is idempotent).
for (Host<ResolvedAddress, C> host : usedHosts) {
ServiceDiscovererEvent<ResolvedAddress> event = eventMap.remove(host.address());
if (event == null) {
// Host doesn't have a SD update so just copy it over.
nextHosts.add(host);
} else if (AVAILABLE.equals(event.status())) {
// We only send the ready event if the previous host list was empty.
sendReadyEvent = usedHosts.isEmpty();
// If the host is already in CLOSED state, we should discard it and create a new entry.
// For duplicate ACTIVE events or for repeated activation due to failed CAS
// of replacing the usedHosts array the marking succeeds so we will not add a new entry.
if (host.markActiveIfNotClosed()) {
nextHosts.add(host);
} else {
nextHosts.add(createHost(event.address()));
}
} else if (EXPIRED.equals(event.status())) {
if (!host.markExpired()) {
nextHosts.add(host);
}
} else if (UNAVAILABLE.equals(event.status())) {
host.markClosed();
} else {
LOGGER.warn("{}: Unsupported Status in event:" +
" {} (mapped to {}). Leaving usedHosts unchanged: {}",
NewRoundRobinLoadBalancer.this, event, event.status(), nextHosts);
nextHosts.add(host);
}
} else if (usedAddresses.isEmpty()) {
eventStreamProcessor.onNext(LOAD_BALANCER_NOT_READY_EVENT);
}
// Now process events that didn't have an existing host. The only ones that we actually care
// about are the AVAILABLE events which result in a new host.
for (ServiceDiscovererEvent<ResolvedAddress> event : eventMap.values()) {
if (AVAILABLE.equals(event.status())) {
sendReadyEvent = true;
nextHosts.add(createHost(event.address()));
}
}
// We've now built the new list so now we need to CAS it before we can move on. This should only be
// racing with closing hosts and closing the whole LB so it shouldn't be common to lose the race.
if (usedHostsUpdater.compareAndSet(NewRoundRobinLoadBalancer.this, usedHosts, nextHosts)) {
break;
}
}

final List<Host<ResolvedAddress, C>> currentHosts = usedHosts;
LOGGER.debug("{}: now using addresses (size={}): {}.",
NewRoundRobinLoadBalancer.this, nextHosts.size(), nextHosts);
if (nextHosts.isEmpty()) {
eventStreamProcessor.onNext(LOAD_BALANCER_NOT_READY_EVENT);
} else if (sendReadyEvent) {
eventStreamProcessor.onNext(LOAD_BALANCER_READY_EVENT);
}

if (firstEventsAfterResubscribe) {
// We can enter this path only if we re-subscribed because all previous hosts were UNHEALTHY.
if (events.isEmpty()) {
Expand All @@ -320,26 +353,14 @@ public void onNext(@Nullable final Collection<? extends ServiceDiscovererEvent<R
// starts from an empty state propagating only AVAILABLE events. To be in sync with the
// ServiceDiscoverer we should clean up and close gracefully all hosts that are not present in the
// initial collection of events, regardless of their current state.
for (Host<ResolvedAddress, C> host : currentHosts) {
for (Host<ResolvedAddress, C> host : nextHosts) {
if (notAvailable(host, events)) {
host.closeAsyncGracefully().subscribe();
}
}
}
}

private List<Host<ResolvedAddress, C>> markHostAsExpired(
final List<Host<ResolvedAddress, C>> oldHostsTyped, final ResolvedAddress addr) {
for (Host<ResolvedAddress, C> host : oldHostsTyped) {
if (host.address().equals(addr)) {
// Host removal will be handled by the Host's onClose::afterFinally callback
host.markExpired();
break; // because duplicates are not allowed, we can stop iteration
}
}
return oldHostsTyped;
}

private Host<ResolvedAddress, C> createHost(ResolvedAddress addr) {
// All hosts will share the healthcheck config of the parent RR loadbalancer.
Host<ResolvedAddress, C> host = new DefaultHost<>(NewRoundRobinLoadBalancer.this.toString(), addr,
Expand All @@ -355,39 +376,16 @@ private Host<ResolvedAddress, C> createHost(ResolvedAddress addr) {
return host;
}

private List<Host<ResolvedAddress, C>> addHostToList(
List<Host<ResolvedAddress, C>> oldHostsTyped, ResolvedAddress addr) {
if (oldHostsTyped.isEmpty()) {
return singletonList(createHost(addr));
}

// duplicates are not allowed
for (Host<ResolvedAddress, C> host : oldHostsTyped) {
if (host.address().equals(addr)) {
if (!host.markActiveIfNotClosed()) {
// If the host is already in CLOSED state, we should create a new entry.
// For duplicate ACTIVE events or for repeated activation due to failed CAS
// of replacing the usedHosts array the marking succeeds so we will not add a new entry.
break;
}
return oldHostsTyped;
}
}

final List<Host<ResolvedAddress, C>> newHosts = new ArrayList<>(oldHostsTyped.size() + 1);
newHosts.addAll(oldHostsTyped);
newHosts.add(createHost(addr));
return newHosts;
}

private List<Host<ResolvedAddress, C>> listWithHostRemoved(
List<Host<ResolvedAddress, C>> oldHostsTyped, Predicate<Host<ResolvedAddress, C>> hostPredicate) {
if (oldHostsTyped.isEmpty()) {
// this can happen when an expired host is removed during closing of the NewRoundRobinLoadBalancer,
// but all of its connections have already been closed
return oldHostsTyped;
}
final List<Host<ResolvedAddress, C>> newHosts = new ArrayList<>(oldHostsTyped.size() - 1);
// We keep the old size as the capacity hint because the penalty for a resize in the case that the
// element isn't in the list is much worse than the penalty for an unused array slot.
final List<Host<ResolvedAddress, C>> newHosts = new ArrayList<>(oldHostsTyped.size());
for (int i = 0; i < oldHostsTyped.size(); ++i) {
final Host<ResolvedAddress, C> current = oldHostsTyped.get(i);
if (hostPredicate.test(current)) {
Expand Down

0 comments on commit 936e01b

Please sign in to comment.