Skip to content

Commit

Permalink
core: In PF, pass around SubchannelData instead of Subchannel
Browse files Browse the repository at this point in the history
Each usage of the subchannel immediately looked up the SubchannelData.
  • Loading branch information
ejona86 committed Aug 2, 2024
1 parent f9b072c commit 15456f8
Showing 1 changed file with 24 additions and 30 deletions.
54 changes: 24 additions & 30 deletions core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -214,14 +214,13 @@ public void handleNameResolutionError(Status error) {
updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error)));
}

void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo stateInfo) {
ConnectivityState newState = stateInfo.getState();

SubchannelData subchannelData = subchannels.get(getAddress(subchannel));
// Shutdown channels/previously relevant subchannels can still callback with state updates.
// To prevent pickers from returning these obsolete subchannels, this logic
// is included to check if the current list of active subchannels includes this subchannel.
if (subchannelData == null || subchannelData.getSubchannel() != subchannel) {
if (subchannelData != subchannels.get(getAddress(subchannelData.subchannel))) {
return;
}

Expand Down Expand Up @@ -269,15 +268,15 @@ void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateIn

case READY:
shutdownRemaining(subchannelData);
addressIndex.seekTo(getAddress(subchannel));
addressIndex.seekTo(getAddress(subchannelData.subchannel));
rawConnectivityState = READY;
updateHealthCheckedState(subchannelData);
break;

case TRANSIENT_FAILURE:
// If we are looking at current channel, request a connection if possible
if (addressIndex.isValid()
&& subchannels.get(addressIndex.getCurrentAddress()).getSubchannel() == subchannel) {
&& subchannels.get(addressIndex.getCurrentAddress()) == subchannelData) {
if (addressIndex.increment()) {
cancelScheduleTask();
requestConnection(); // is recursive so might hit the end of the addresses
Expand Down Expand Up @@ -317,7 +316,7 @@ private void updateHealthCheckedState(SubchannelData subchannelData) {
new FixedResultPicker(PickResult.withSubchannel(subchannelData.subchannel)));
} else if (subchannelData.getHealthState() == TRANSIENT_FAILURE) {
updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(
subchannelData.healthListener.healthStateInfo.getStatus())));
subchannelData.healthStateInfo.getStatus())));
} else if (concludedState != TRANSIENT_FAILURE) {
updateBalancingState(subchannelData.getHealthState(),
new Picker(PickResult.withNoResult()));
Expand Down Expand Up @@ -377,25 +376,24 @@ public void requestConnection() {
return;
}

Subchannel subchannel;
SocketAddress currentAddress;
currentAddress = addressIndex.getCurrentAddress();
subchannel = subchannels.containsKey(currentAddress)
? subchannels.get(currentAddress).getSubchannel()
: createNewSubchannel(currentAddress, addressIndex.getCurrentEagAttributes());
SocketAddress currentAddress = addressIndex.getCurrentAddress();
SubchannelData subchannelData = subchannels.get(currentAddress);
if (subchannelData == null) {
subchannelData = createNewSubchannel(currentAddress, addressIndex.getCurrentEagAttributes());
}

ConnectivityState subchannelState = subchannels.get(currentAddress).getState();
ConnectivityState subchannelState = subchannelData.getState();
switch (subchannelState) {
case IDLE:
subchannel.requestConnection();
subchannels.get(currentAddress).updateState(CONNECTING);
subchannelData.subchannel.requestConnection();
subchannelData.updateState(CONNECTING);
scheduleNextConnection();
break;
case CONNECTING:
if (enableHappyEyeballs) {
scheduleNextConnection();
} else {
subchannel.requestConnection();
subchannelData.subchannel.requestConnection();
}
break;
case TRANSIENT_FAILURE:
Expand Down Expand Up @@ -455,7 +453,7 @@ private void cancelScheduleTask() {
}
}

private Subchannel createNewSubchannel(SocketAddress addr, Attributes attrs) {
private SubchannelData createNewSubchannel(SocketAddress addr, Attributes attrs) {
HealthListener hcListener = new HealthListener();
final Subchannel subchannel = helper.createSubchannel(
CreateSubchannelArgs.newBuilder()
Expand All @@ -467,15 +465,15 @@ private Subchannel createNewSubchannel(SocketAddress addr, Attributes attrs) {
log.warning("Was not able to create subchannel for " + addr);
throw new IllegalStateException("Can't create subchannel");
}
SubchannelData subchannelData = new SubchannelData(subchannel, IDLE, hcListener);
SubchannelData subchannelData = new SubchannelData(subchannel, IDLE);
hcListener.subchannelData = subchannelData;
subchannels.put(addr, subchannelData);
Attributes scAttrs = subchannel.getAttributes();
if (scAttrs.get(LoadBalancer.HAS_HEALTH_PRODUCER_LISTENER_KEY) == null) {
hcListener.healthStateInfo = ConnectivityStateInfo.forNonError(READY);
subchannelData.healthStateInfo = ConnectivityStateInfo.forNonError(READY);
}
subchannel.start(stateInfo -> processSubchannelState(subchannel, stateInfo));
return subchannel;
subchannel.start(stateInfo -> processSubchannelState(subchannelData, stateInfo));
return subchannelData;
}

private boolean isPassComplete() {
Expand All @@ -492,17 +490,15 @@ private boolean isPassComplete() {
}

private final class HealthListener implements SubchannelStateListener {
private ConnectivityStateInfo healthStateInfo = ConnectivityStateInfo.forNonError(IDLE);
private SubchannelData subchannelData;

@Override
public void onSubchannelState(ConnectivityStateInfo newState) {
log.log(Level.FINE, "Received health status {0} for subchannel {1}",
new Object[]{newState, subchannelData.subchannel});
healthStateInfo = newState;
subchannelData.healthStateInfo = newState;
try {
SubchannelData curSubChanData = subchannels.get(addressIndex.getCurrentAddress());
if (curSubChanData != null && curSubChanData.healthListener == this) {
if (subchannelData == subchannels.get(addressIndex.getCurrentAddress())) {
updateHealthCheckedState(subchannelData);
}
} catch (IllegalStateException e) {
Expand Down Expand Up @@ -663,14 +659,12 @@ public int size() {
private static final class SubchannelData {
private final Subchannel subchannel;
private ConnectivityState state;
private final HealthListener healthListener;
private boolean completedConnectivityAttempt = false;
private ConnectivityStateInfo healthStateInfo = ConnectivityStateInfo.forNonError(IDLE);

public SubchannelData(Subchannel subchannel, ConnectivityState state,
HealthListener subchannelHealthListener) {
public SubchannelData(Subchannel subchannel, ConnectivityState state) {
this.subchannel = subchannel;
this.state = state;
this.healthListener = subchannelHealthListener;
}

public Subchannel getSubchannel() {
Expand All @@ -695,7 +689,7 @@ private void updateState(ConnectivityState newState) {
}

private ConnectivityState getHealthState() {
return healthListener.healthStateInfo.getState();
return healthStateInfo.getState();
}
}

Expand Down

0 comments on commit 15456f8

Please sign in to comment.