Skip to content

Commit

Permalink
Move name resolution retry from managed channel to name resolver (take
Browse files Browse the repository at this point in the history
…#2) (#9812)

This change has these main aspects to it:

1. Removal of any name resolution responsibility from ManagedChannelImpl
2. Creation of a new RetryScheduler to own generic retry logic
     - Can also be used outside the name resolution context
3. Creation of a new RetryingNameScheduler that can be used to wrap any
   polling name resolver to add retry capability
4. A new facility in NameResolver to allow implementations to notify
   listeners on the success of name resolution attempts
     - RetryingNameScheduler relies on this
  • Loading branch information
temawi authored Feb 4, 2023
1 parent 5a2adcc commit fcb5c54
Show file tree
Hide file tree
Showing 13 changed files with 753 additions and 360 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2023 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.internal;

import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Schedules a retry operation according to a {@link BackoffPolicy}. The retry is run within a
* {@link SynchronizationContext}. At most one retry is scheduled at a time.
*/
final class BackoffPolicyRetryScheduler implements RetryScheduler {
private final ScheduledExecutorService scheduledExecutorService;
private final SynchronizationContext syncContext;
private final BackoffPolicy.Provider policyProvider;

private BackoffPolicy policy;
private ScheduledHandle scheduledHandle;

private static final Logger logger = Logger.getLogger(
BackoffPolicyRetryScheduler.class.getName());

BackoffPolicyRetryScheduler(BackoffPolicy.Provider policyProvider,
ScheduledExecutorService scheduledExecutorService,
SynchronizationContext syncContext) {
this.policyProvider = policyProvider;
this.scheduledExecutorService = scheduledExecutorService;
this.syncContext = syncContext;
}

/**
* Schedules a future retry operation. Only allows one retry to be scheduled at any given time.
*/
@Override
public void schedule(Runnable retryOperation) {
syncContext.throwIfNotInThisSynchronizationContext();

if (policy == null) {
policy = policyProvider.get();
}
// If a retry is already scheduled, take no further action.
if (scheduledHandle != null && scheduledHandle.isPending()) {
return;
}
long delayNanos = policy.nextBackoffNanos();
scheduledHandle = syncContext.schedule(retryOperation, delayNanos, TimeUnit.NANOSECONDS,
scheduledExecutorService);
logger.log(Level.FINE, "Scheduling DNS resolution backoff for {0}ns", delayNanos);
}

/**
* Resets the {@link BackoffPolicyRetryScheduler} and cancels any pending retry task. The policy
* will be cleared thus also resetting any state associated with it (e.g. a backoff multiplier).
*/
@Override
public void reset() {
syncContext.throwIfNotInThisSynchronizationContext();

syncContext.execute(() -> {
if (scheduledHandle != null && scheduledHandle.isPending()) {
scheduledHandle.cancel();
}
policy = null;
});
}

}
22 changes: 14 additions & 8 deletions core/src/main/java/io/grpc/internal/DnsNameResolverProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,25 @@ public final class DnsNameResolverProvider extends NameResolverProvider {
private static final String SCHEME = "dns";

@Override
public DnsNameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {

This comment has been minimized.

Copy link
@lhotari

lhotari Jun 20, 2023

This seems to break binary compatibility. Is this intentional?
I get java.lang.NoSuchMethodError: 'io.grpc.internal.DnsNameResolver io.grpc.internal.DnsNameResolverProvider.newNameResolver(java.net.URI, io.grpc.NameResolver$Args)', examples in Apache Pulsar when trying to upgrade to grpc-java 1.56.0 . The failure appear in the Apache Bookkeeper client that uses an older version of gprc-java, version 1.47.0 .

This comment has been minimized.

Copy link
@lhotari

lhotari Jun 20, 2023

I found the problem in Bookkeeper client. it was depending on the io.grpc.internal.DnsNameResolverProvider.newNameResolver(java.net.URI, io.grpc.NameResolver$Args) method. The PR apache/bookkeeper#3997 contains a simple fix to the issue. This is more of a problem when using grpc 1.56.0 with an older Bookkeeper client. It will be challenging to fix that.

if (SCHEME.equals(targetUri.getScheme())) {
String targetPath = Preconditions.checkNotNull(targetUri.getPath(), "targetPath");
Preconditions.checkArgument(targetPath.startsWith("/"),
"the path component (%s) of the target (%s) must start with '/'", targetPath, targetUri);
String name = targetPath.substring(1);
return new DnsNameResolver(
targetUri.getAuthority(),
name,
args,
GrpcUtil.SHARED_CHANNEL_EXECUTOR,
Stopwatch.createUnstarted(),
InternalServiceProviders.isAndroid(getClass().getClassLoader()));
return new RetryingNameResolver(
new DnsNameResolver(
targetUri.getAuthority(),
name,
args,
GrpcUtil.SHARED_CHANNEL_EXECUTOR,
Stopwatch.createUnstarted(),
InternalServiceProviders.isAndroid(getClass().getClassLoader())),
new BackoffPolicyRetryScheduler(
new ExponentialBackoffPolicy.Provider(),
args.getScheduledExecutorService(),
args.getSynchronizationContext()),
args.getSynchronizationContext());
} else {
return null;
}
Expand Down
103 changes: 34 additions & 69 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import io.grpc.internal.ManagedChannelServiceConfig.ServiceConfigConvertedSelector;
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
import io.grpc.internal.RetriableStream.Throttle;
import io.grpc.internal.RetryingNameResolver.ResolutionResultListener;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
Expand Down Expand Up @@ -367,7 +368,6 @@ private void shutdownNameResolverAndLoadBalancer(boolean channelIsActive) {
checkState(lbHelper != null, "lbHelper is null");
}
if (nameResolver != null) {
cancelNameResolverBackoff();
nameResolver.shutdown();
nameResolverStarted = false;
if (channelIsActive) {
Expand Down Expand Up @@ -450,42 +450,10 @@ private void rescheduleIdleTimer() {
idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
}

// Run from syncContext
@VisibleForTesting
class DelayedNameResolverRefresh implements Runnable {
@Override
public void run() {
scheduledNameResolverRefresh = null;
refreshNameResolution();
}
}

// Must be used from syncContext
@Nullable private ScheduledHandle scheduledNameResolverRefresh;
// The policy to control backoff between name resolution attempts. Non-null when an attempt is
// scheduled. Must be used from syncContext
@Nullable private BackoffPolicy nameResolverBackoffPolicy;

// Must be run from syncContext
private void cancelNameResolverBackoff() {
syncContext.throwIfNotInThisSynchronizationContext();
if (scheduledNameResolverRefresh != null) {
scheduledNameResolverRefresh.cancel();
scheduledNameResolverRefresh = null;
nameResolverBackoffPolicy = null;
}
}

/**
* Force name resolution refresh to happen immediately and reset refresh back-off. Must be run
* Force name resolution refresh to happen immediately. Must be run
* from syncContext.
*/
private void refreshAndResetNameResolution() {
syncContext.throwIfNotInThisSynchronizationContext();
cancelNameResolverBackoff();
refreshNameResolution();
}

private void refreshNameResolution() {
syncContext.throwIfNotInThisSynchronizationContext();
if (nameResolverStarted) {
Expand Down Expand Up @@ -783,7 +751,24 @@ static NameResolver getNameResolver(
if (overrideAuthority == null) {
return resolver;
}
return new ForwardingNameResolver(resolver) {

// If the nameResolver is not already a RetryingNameResolver, then wrap it with it.
// This helps guarantee that name resolution retry remains supported even as it has been
// removed from ManagedChannelImpl.
// TODO: After a transition period, all NameResolver implementations that need retry should use
// RetryingNameResolver directly and this step can be removed.
NameResolver usedNameResolver;
if (resolver instanceof RetryingNameResolver) {
usedNameResolver = resolver;
} else {
usedNameResolver = new RetryingNameResolver(resolver,
new BackoffPolicyRetryScheduler(new ExponentialBackoffPolicy.Provider(),
nameResolverArgs.getScheduledExecutorService(),
nameResolverArgs.getSynchronizationContext()),
nameResolverArgs.getSynchronizationContext());
}

return new ForwardingNameResolver(usedNameResolver) {
@Override
public String getServiceAuthority() {
return overrideAuthority;
Expand Down Expand Up @@ -1291,7 +1276,7 @@ private void maybeTerminateChannel() {
// Must be called from syncContext
private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
refreshAndResetNameResolution();
refreshNameResolution();
}
}

Expand Down Expand Up @@ -1338,9 +1323,8 @@ public void run() {
if (shutdown.get()) {
return;
}
if (scheduledNameResolverRefresh != null && scheduledNameResolverRefresh.isPending()) {
checkState(nameResolverStarted, "name resolver must be started");
refreshAndResetNameResolution();
if (nameResolverStarted) {
refreshNameResolution();
}
for (InternalSubchannel subchannel : subchannels) {
subchannel.resetConnectBackoff();
Expand Down Expand Up @@ -1496,7 +1480,7 @@ public void refreshNameResolution() {
final class LoadBalancerRefreshNameResolution implements Runnable {
@Override
public void run() {
refreshAndResetNameResolution();
ManagedChannelImpl.this.refreshNameResolution();
}
}

Expand Down Expand Up @@ -1727,7 +1711,7 @@ public ChannelCredentials withoutBearerTokens() {
}
}

private final class NameResolverListener extends NameResolver.Listener2 {
final class NameResolverListener extends NameResolver.Listener2 {
final LbHelperImpl helper;
final NameResolver resolver;

Expand Down Expand Up @@ -1759,8 +1743,9 @@ public void run() {
lastResolutionState = ResolutionState.SUCCESS;
}

nameResolverBackoffPolicy = null;
ConfigOrError configOrError = resolutionResult.getServiceConfig();
ResolutionResultListener resolutionResultListener = resolutionResult.getAttributes()
.get(RetryingNameResolver.RESOLUTION_RESULT_LISTENER_KEY);
InternalConfigSelector resolvedConfigSelector =
resolutionResult.getAttributes().get(InternalConfigSelector.KEY);
ManagedChannelServiceConfig validServiceConfig =
Expand Down Expand Up @@ -1817,6 +1802,9 @@ public void run() {
// we later check for these error codes when investigating pick results in
// GrpcUtil.getTransportFromPickResult().
onError(configOrError.getError());
if (resolutionResultListener != null) {
resolutionResultListener.resolutionAttempted(false);
}
return;
} else {
effectiveServiceConfig = lastServiceConfig;
Expand Down Expand Up @@ -1861,15 +1849,15 @@ public void run() {
}
Attributes attributes = attrBuilder.build();

boolean addressesAccepted = helper.lb.tryAcceptResolvedAddresses(
boolean lastAddressesAccepted = helper.lb.tryAcceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(servers)
.setAttributes(attributes)
.setLoadBalancingPolicyConfig(effectiveServiceConfig.getLoadBalancingConfig())
.build());

if (!addressesAccepted) {
scheduleExponentialBackOffInSyncContext();
// If a listener is provided, let it know if the addresses were accepted.
if (resolutionResultListener != null) {
resolutionResultListener.resolutionAttempted(lastAddressesAccepted);
}
}
}
Expand Down Expand Up @@ -1905,29 +1893,6 @@ private void handleErrorInSyncContext(Status error) {
}

helper.lb.handleNameResolutionError(error);

scheduleExponentialBackOffInSyncContext();
}

private void scheduleExponentialBackOffInSyncContext() {
if (scheduledNameResolverRefresh != null && scheduledNameResolverRefresh.isPending()) {
// The name resolver may invoke onError multiple times, but we only want to
// schedule one backoff attempt
// TODO(ericgribkoff) Update contract of NameResolver.Listener or decide if we
// want to reset the backoff interval upon repeated onError() calls
return;
}
if (nameResolverBackoffPolicy == null) {
nameResolverBackoffPolicy = backoffPolicyProvider.get();
}
long delayNanos = nameResolverBackoffPolicy.nextBackoffNanos();
channelLogger.log(
ChannelLogLevel.DEBUG,
"Scheduling DNS resolution backoff for {0} ns", delayNanos);
scheduledNameResolverRefresh =
syncContext.schedule(
new DelayedNameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS,
transportFactory .getScheduledExecutorService());
}
}

Expand Down
36 changes: 36 additions & 0 deletions core/src/main/java/io/grpc/internal/RetryScheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2023 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.internal;

/**
* This interface is used to schedule future retry attempts for a failed operation. The retry delay
* and the number of attempt is defined by implementing classes. Implementations should assure
* that only one future retry operation is ever scheduled at a time.
*/
public interface RetryScheduler {

/**
* A request to schedule a future retry (or retries) for a failed operation. Noop if an operation
* has already been scheduled.
*/
void schedule(Runnable retryOperation);

/**
* Resets the scheduler, effectively cancelling any future retry operation.
*/
void reset();
}
Loading

0 comments on commit fcb5c54

Please sign in to comment.