Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/grpc-js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@grpc/grpc-js",
"version": "1.9.6",
"version": "1.9.7",
"description": "gRPC Library for Node - pure JS implementation",
"homepage": "https://grpc.io/",
"repository": "https://github.com/grpc/grpc-node/tree/master/packages/grpc-js",
Expand Down
30 changes: 25 additions & 5 deletions packages/grpc-js/src/load-balancer-pick-first.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@ export class PickFirstLoadBalancer implements LoadBalancer {
*/
private stickyTransientFailureMode = false;

/**
* Indicates whether we called channelControlHelper.requestReresolution since
* the last call to updateAddressList
*/
private requestedResolutionSinceLastUpdate = false;

/**
* The most recent error reported by any subchannel as it transitioned to
* TRANSIENT_FAILURE.
Expand Down Expand Up @@ -216,15 +222,28 @@ export class PickFirstLoadBalancer implements LoadBalancer {
}
}

private requestReresolution() {
this.requestedResolutionSinceLastUpdate = true;
this.channelControlHelper.requestReresolution();
}

private maybeEnterStickyTransientFailureMode() {
if (this.stickyTransientFailureMode) {
if (!this.allChildrenHaveReportedTF()) {
return;
}
if (!this.allChildrenHaveReportedTF()) {
if (!this.requestedResolutionSinceLastUpdate) {
/* Each time we get an update we reset each subchannel's
* hasReportedTransientFailure flag, so the next time we get to this
* point after that, each subchannel has reported TRANSIENT_FAILURE
* at least once since then. That is the trigger for requesting
* reresolution, whether or not the LB policy is already in sticky TF
* mode. */
this.requestReresolution();
}
if (this.stickyTransientFailureMode) {
return;
}
this.stickyTransientFailureMode = true;
this.channelControlHelper.requestReresolution();
for (const { subchannel } of this.children) {
subchannel.startConnecting();
}
Expand Down Expand Up @@ -256,7 +275,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
if (newState !== ConnectivityState.READY) {
this.removeCurrentPick();
this.calculateAndReportNewState();
this.channelControlHelper.requestReresolution();
this.requestReresolution();
}
return;
}
Expand All @@ -283,7 +302,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {

private startNextSubchannelConnecting(startIndex: number) {
clearTimeout(this.connectionDelayTimeout);
if (this.triedAllSubchannels || this.stickyTransientFailureMode) {
if (this.triedAllSubchannels) {
return;
}
for (const [index, child] of this.children.entries()) {
Expand Down Expand Up @@ -382,6 +401,7 @@ export class PickFirstLoadBalancer implements LoadBalancer {
this.currentSubchannelIndex = 0;
this.children = [];
this.triedAllSubchannels = false;
this.requestedResolutionSinceLastUpdate = false;
}

updateAddressList(
Expand Down
90 changes: 89 additions & 1 deletion packages/grpc-js/test/test-pick-first.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ function updateStateCallBackForExpectedStateSequence(
) {
const actualStateSequence: ConnectivityState[] = [];
let lastPicker: Picker | null = null;
let finished = false;
return (connectivityState: ConnectivityState, picker: Picker) => {
if (finished) {
return;
}
// Ignore duplicate state transitions
if (
connectivityState === actualStateSequence[actualStateSequence.length - 1]
Expand All @@ -60,6 +64,7 @@ function updateStateCallBackForExpectedStateSequence(
if (
expectedStateSequence[actualStateSequence.length] !== connectivityState
) {
finished = true;
done(
new Error(
`Unexpected state ${
Expand All @@ -69,10 +74,12 @@ function updateStateCallBackForExpectedStateSequence(
)}]`
)
);
return;
}
actualStateSequence.push(connectivityState);
lastPicker = picker;
if (actualStateSequence.length === expectedStateSequence.length) {
finished = true;
done();
}
};
Expand All @@ -90,7 +97,7 @@ describe('Shuffler', () => {
});
});

describe('pick_first load balancing policy', () => {
describe.only('pick_first load balancing policy', () => {
const config = new PickFirstLoadBalancingConfig(false);
let subchannels: MockSubchannel[] = [];
const baseChannelControlHelper: ChannelControlHelper = {
Expand Down Expand Up @@ -462,6 +469,87 @@ describe('pick_first load balancing policy', () => {
});
});
});
it('Should request reresolution every time each child reports TF', done => {
let reresolutionRequestCount = 0;
const targetReresolutionRequestCount = 3;
const currentStartState = ConnectivityState.IDLE;
const channelControlHelper = createChildChannelControlHelper(
baseChannelControlHelper,
{
createSubchannel: (subchannelAddress, subchannelArgs) => {
const subchannel = new MockSubchannel(
subchannelAddressToString(subchannelAddress),
currentStartState
);
subchannels.push(subchannel);
return subchannel;
},
updateState: updateStateCallBackForExpectedStateSequence(
[ConnectivityState.CONNECTING, ConnectivityState.TRANSIENT_FAILURE],
err => setImmediate(() => {
assert.strictEqual(reresolutionRequestCount, targetReresolutionRequestCount);
done(err);
})
),
requestReresolution: () => {
reresolutionRequestCount += 1;
}
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList([{ host: 'localhost', port: 1 }], config);
process.nextTick(() => {
subchannels[0].transitionToState(ConnectivityState.TRANSIENT_FAILURE);
process.nextTick(() => {
pickFirst.updateAddressList([{ host: 'localhost', port: 2 }], config);
process.nextTick(() => {
subchannels[1].transitionToState(ConnectivityState.TRANSIENT_FAILURE);
process.nextTick(() => {
pickFirst.updateAddressList([{ host: 'localhost', port: 3 }], config);
process.nextTick(() => {
subchannels[2].transitionToState(ConnectivityState.TRANSIENT_FAILURE);
});
});
});
});
});
});
it('Should request reresolution if the new subchannels are already in TF', done => {
let reresolutionRequestCount = 0;
const targetReresolutionRequestCount = 3;
const currentStartState = ConnectivityState.TRANSIENT_FAILURE;
const channelControlHelper = createChildChannelControlHelper(
baseChannelControlHelper,
{
createSubchannel: (subchannelAddress, subchannelArgs) => {
const subchannel = new MockSubchannel(
subchannelAddressToString(subchannelAddress),
currentStartState
);
subchannels.push(subchannel);
return subchannel;
},
updateState: updateStateCallBackForExpectedStateSequence(
[ConnectivityState.TRANSIENT_FAILURE],
err => setImmediate(() => {
assert.strictEqual(reresolutionRequestCount, targetReresolutionRequestCount);
done(err);
})
),
requestReresolution: () => {
reresolutionRequestCount += 1;
}
}
);
const pickFirst = new PickFirstLoadBalancer(channelControlHelper);
pickFirst.updateAddressList([{ host: 'localhost', port: 1 }], config);
process.nextTick(() => {
pickFirst.updateAddressList([{ host: 'localhost', port: 2 }], config);
process.nextTick(() => {
pickFirst.updateAddressList([{ host: 'localhost', port: 2 }], config);
});
});
});
describe('Address list randomization', () => {
const shuffleConfig = new PickFirstLoadBalancingConfig(true);
it('Should pick different subchannels after multiple updates', done => {
Expand Down