Skip to content

Commit

Permalink
Merge pull request #2791 from murgatroid99/grpc-js_channel_close_pick…
Browse files Browse the repository at this point in the history
…_fix

grpc-js: Ensure pending calls end after channel close
  • Loading branch information
murgatroid99 authored Jul 10, 2024
2 parents fbbc78d + 810e9e6 commit 023c1d0
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 2 deletions.
31 changes: 29 additions & 2 deletions packages/grpc-js/src/internal-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { ChannelOptions } from './channel-options';
import { ResolvingLoadBalancer } from './resolving-load-balancer';
import { SubchannelPool, getSubchannelPool } from './subchannel-pool';
import { ChannelControlHelper } from './load-balancer';
import { UnavailablePicker, Picker, QueuePicker } from './picker';
import { UnavailablePicker, Picker, QueuePicker, PickArgs, PickResult, PickResultType } from './picker';
import { Metadata } from './metadata';
import { Status, LogVerbosity, Propagate } from './constants';
import { FilterStackFactory } from './filter-stack';
Expand Down Expand Up @@ -143,6 +143,22 @@ class ChannelSubchannelWrapper
}
}

class ShutdownPicker implements Picker {
pick(pickArgs: PickArgs): PickResult {
return {
pickResultType: PickResultType.DROP,
status: {
code: Status.UNAVAILABLE,
details: 'Channel closed before call started',
metadata: new Metadata()
},
subchannel: null,
onCallStarted: null,
onCallEnded: null
}
}
}

export class InternalChannel {
private readonly resolvingLoadBalancer: ResolvingLoadBalancer;
private readonly subchannelPool: SubchannelPool;
Expand Down Expand Up @@ -536,7 +552,9 @@ export class InternalChannel {
}

getConfig(method: string, metadata: Metadata): GetConfigResult {
this.resolvingLoadBalancer.exitIdle();
if (this.connectivityState !== ConnectivityState.SHUTDOWN) {
this.resolvingLoadBalancer.exitIdle();
}
if (this.configSelector) {
return {
type: 'SUCCESS',
Expand Down Expand Up @@ -745,6 +763,15 @@ export class InternalChannel {
close() {
this.resolvingLoadBalancer.destroy();
this.updateState(ConnectivityState.SHUTDOWN);
this.currentPicker = new ShutdownPicker();
for (const call of this.configSelectionQueue) {
call.cancelWithStatus(Status.UNAVAILABLE, 'Channel closed before call started');
}
this.configSelectionQueue = [];
for (const call of this.pickQueue) {
call.cancelWithStatus(Status.UNAVAILABLE, 'Channel closed before call started');
}
this.pickQueue = [];
clearInterval(this.callRefTimer);
if (this.idleTimer) {
clearTimeout(this.idleTimer);
Expand Down
30 changes: 30 additions & 0 deletions packages/grpc-js/test/test-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,21 @@ describe('Client without a server', () => {
}
);
});
it('close should force calls to end', done => {
client.makeUnaryRequest(
'/service/method',
x => x,
x => x,
Buffer.from([]),
new grpc.Metadata({waitForReady: true}),
(error, value) => {
assert(error);
assert.strictEqual(error?.code, grpc.status.UNAVAILABLE);
done();
}
);
client.close();
});
});

describe('Client with a nonexistent target domain', () => {
Expand Down Expand Up @@ -133,4 +148,19 @@ describe('Client with a nonexistent target domain', () => {
}
);
});
it('close should force calls to end', done => {
client.makeUnaryRequest(
'/service/method',
x => x,
x => x,
Buffer.from([]),
new grpc.Metadata({waitForReady: true}),
(error, value) => {
assert(error);
assert.strictEqual(error?.code, grpc.status.UNAVAILABLE);
done();
}
);
client.close();
});
});

0 comments on commit 023c1d0

Please sign in to comment.