diff --git a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java index c3f9a52404e..253422d3dbd 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java @@ -214,14 +214,13 @@ public void handleNameResolutionError(Status error) { updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error))); } - void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) { + void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo stateInfo) { ConnectivityState newState = stateInfo.getState(); - SubchannelData subchannelData = subchannels.get(getAddress(subchannel)); // Shutdown channels/previously relevant subchannels can still callback with state updates. // To prevent pickers from returning these obsolete subchannels, this logic // is included to check if the current list of active subchannels includes this subchannel. - if (subchannelData == null || subchannelData.getSubchannel() != subchannel) { + if (subchannelData != subchannels.get(getAddress(subchannelData.subchannel))) { return; } @@ -269,7 +268,7 @@ void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateIn case READY: shutdownRemaining(subchannelData); - addressIndex.seekTo(getAddress(subchannel)); + addressIndex.seekTo(getAddress(subchannelData.subchannel)); rawConnectivityState = READY; updateHealthCheckedState(subchannelData); break; @@ -277,7 +276,7 @@ void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateIn case TRANSIENT_FAILURE: // If we are looking at current channel, request a connection if possible if (addressIndex.isValid() - && subchannels.get(addressIndex.getCurrentAddress()).getSubchannel() == subchannel) { + && subchannels.get(addressIndex.getCurrentAddress()) == subchannelData) { if (addressIndex.increment()) { cancelScheduleTask(); requestConnection(); // is recursive so might hit the end of the addresses @@ -317,7 +316,7 @@ private void updateHealthCheckedState(SubchannelData subchannelData) { new FixedResultPicker(PickResult.withSubchannel(subchannelData.subchannel))); } else if (subchannelData.getHealthState() == TRANSIENT_FAILURE) { updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError( - subchannelData.healthListener.healthStateInfo.getStatus()))); + subchannelData.healthStateInfo.getStatus()))); } else if (concludedState != TRANSIENT_FAILURE) { updateBalancingState(subchannelData.getHealthState(), new Picker(PickResult.withNoResult())); @@ -377,25 +376,24 @@ public void requestConnection() { return; } - Subchannel subchannel; - SocketAddress currentAddress; - currentAddress = addressIndex.getCurrentAddress(); - subchannel = subchannels.containsKey(currentAddress) - ? subchannels.get(currentAddress).getSubchannel() - : createNewSubchannel(currentAddress, addressIndex.getCurrentEagAttributes()); + SocketAddress currentAddress = addressIndex.getCurrentAddress(); + SubchannelData subchannelData = subchannels.get(currentAddress); + if (subchannelData == null) { + subchannelData = createNewSubchannel(currentAddress, addressIndex.getCurrentEagAttributes()); + } - ConnectivityState subchannelState = subchannels.get(currentAddress).getState(); + ConnectivityState subchannelState = subchannelData.getState(); switch (subchannelState) { case IDLE: - subchannel.requestConnection(); - subchannels.get(currentAddress).updateState(CONNECTING); + subchannelData.subchannel.requestConnection(); + subchannelData.updateState(CONNECTING); scheduleNextConnection(); break; case CONNECTING: if (enableHappyEyeballs) { scheduleNextConnection(); } else { - subchannel.requestConnection(); + subchannelData.subchannel.requestConnection(); } break; case TRANSIENT_FAILURE: @@ -455,7 +453,7 @@ private void cancelScheduleTask() { } } - private Subchannel createNewSubchannel(SocketAddress addr, Attributes attrs) { + private SubchannelData createNewSubchannel(SocketAddress addr, Attributes attrs) { HealthListener hcListener = new HealthListener(); final Subchannel subchannel = helper.createSubchannel( CreateSubchannelArgs.newBuilder() @@ -467,15 +465,15 @@ private Subchannel createNewSubchannel(SocketAddress addr, Attributes attrs) { log.warning("Was not able to create subchannel for " + addr); throw new IllegalStateException("Can't create subchannel"); } - SubchannelData subchannelData = new SubchannelData(subchannel, IDLE, hcListener); + SubchannelData subchannelData = new SubchannelData(subchannel, IDLE); hcListener.subchannelData = subchannelData; subchannels.put(addr, subchannelData); Attributes scAttrs = subchannel.getAttributes(); if (scAttrs.get(LoadBalancer.HAS_HEALTH_PRODUCER_LISTENER_KEY) == null) { - hcListener.healthStateInfo = ConnectivityStateInfo.forNonError(READY); + subchannelData.healthStateInfo = ConnectivityStateInfo.forNonError(READY); } - subchannel.start(stateInfo -> processSubchannelState(subchannel, stateInfo)); - return subchannel; + subchannel.start(stateInfo -> processSubchannelState(subchannelData, stateInfo)); + return subchannelData; } private boolean isPassComplete() { @@ -492,17 +490,15 @@ private boolean isPassComplete() { } private final class HealthListener implements SubchannelStateListener { - private ConnectivityStateInfo healthStateInfo = ConnectivityStateInfo.forNonError(IDLE); private SubchannelData subchannelData; @Override public void onSubchannelState(ConnectivityStateInfo newState) { log.log(Level.FINE, "Received health status {0} for subchannel {1}", new Object[]{newState, subchannelData.subchannel}); - healthStateInfo = newState; + subchannelData.healthStateInfo = newState; try { - SubchannelData curSubChanData = subchannels.get(addressIndex.getCurrentAddress()); - if (curSubChanData != null && curSubChanData.healthListener == this) { + if (subchannelData == subchannels.get(addressIndex.getCurrentAddress())) { updateHealthCheckedState(subchannelData); } } catch (IllegalStateException e) { @@ -663,14 +659,12 @@ public int size() { private static final class SubchannelData { private final Subchannel subchannel; private ConnectivityState state; - private final HealthListener healthListener; private boolean completedConnectivityAttempt = false; + private ConnectivityStateInfo healthStateInfo = ConnectivityStateInfo.forNonError(IDLE); - public SubchannelData(Subchannel subchannel, ConnectivityState state, - HealthListener subchannelHealthListener) { + public SubchannelData(Subchannel subchannel, ConnectivityState state) { this.subchannel = subchannel; this.state = state; - this.healthListener = subchannelHealthListener; } public Subchannel getSubchannel() { @@ -695,7 +689,7 @@ private void updateState(ConnectivityState newState) { } private ConnectivityState getHealthState() { - return healthListener.healthStateInfo.getState(); + return healthStateInfo.getState(); } }