Skip to content

Commit

Permalink
fix: [#4452][#4456][#4460][botframework-streaming] Should reject pend…
Browse files Browse the repository at this point in the history
…ing requests on disconnection (#4461)

* Fix inconclusive tests

* Rejects pending requests on disconnection

* isConnected should always return boolean

* Fix race condition of missed responses

* Add client/server tests

* Skip test if it is not on Windows

* Fix for Node.js 12

* Fix and skip failing tests
  • Loading branch information
compulim authored Apr 24, 2023
1 parent f9cf5fd commit 25e7146
Show file tree
Hide file tree
Showing 20 changed files with 444 additions and 160 deletions.
15 changes: 14 additions & 1 deletion libraries/botframework-streaming/src/payloads/requestManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class PendingRequest {
* Orchestrates and manages pending streaming requests.
*/
export class RequestManager {
private readonly _pendingRequests = {};
private readonly _pendingRequests: Record<string, PendingRequest> = {};

/**
* Gets the count of the pending requests.
Expand Down Expand Up @@ -77,4 +77,17 @@ export class RequestManager {

return promise;
}

/**
* Rejects all requests pending a response.
*
* @param reason The reason for rejection.
*/
rejectAllResponses(reason?: Error): void {
Object.entries(this._pendingRequests).forEach(([requestId, { reject }]) => {
reject(reason);

delete this._pendingRequests[requestId];
});
}
}
7 changes: 6 additions & 1 deletion libraries/botframework-streaming/src/protocolAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,14 @@ export class ProtocolAdapter {
*/
async sendRequest(request: StreamingRequest): Promise<IReceiveResponse> {
const requestId: string = generateGuid();

// Register the request in the request manager before sending it to the server.
// Otherwise, if the server respond quickly, it may miss the request.
const getResponsePromise = this.requestManager.getResponse(requestId);

await this.sendOperations.sendRequest(requestId, request);

return this.requestManager.getResponse(requestId);
return getResponsePromise;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ export class WebSocketClient implements IStreamingTransportClient {

// eslint-disable-next-line @typescript-eslint/no-explicit-any
private onConnectionDisconnected(sender: Record<string, unknown>, args: any): void {
// Rejects all pending requests on disconnect.
this._requestManager.rejectAllResponses(new Error('Disconnect was called.'));

if (this._disconnectionHandler != null) {
this._disconnectionHandler('Disconnected');
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ export class WebSocketClient implements IStreamingTransportClient {

// eslint-disable-next-line @typescript-eslint/no-explicit-any
private onConnectionDisconnected(sender: Record<string, unknown>, args: any): void {
// Rejects all pending requests on disconnect.
this._requestManager.rejectAllResponses(new Error('Disconnect was called.'));

if (this._disconnectionHandler != null) {
this._disconnectionHandler('Disconnected');
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export class WebSocketTransport implements ITransportSender, ITransportReceiver
* @returns `true` if the the transport is connected and ready to send data, `false` otherwise.
*/
get isConnected(): boolean {
return this.ws?.isConnected;
return !!this.ws?.isConnected;
}

/**
Expand Down
10 changes: 5 additions & 5 deletions libraries/botframework-streaming/tests/Assembler.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ describe('PayloadAssembler', function () {

expect(() => {
rra.close();
}).to.not.throw;
}).to.not.throw();
});

it('returns a new stream.', function () {
Expand Down Expand Up @@ -121,7 +121,7 @@ describe('PayloadAssembler', function () {
const csa = new PayloadAssembler(streamManager, { header, onCompleted: () => {} });

expect(csa.createPayloadStream()).instanceOf(SubscribableStream);
expect(csa.close()).to.not.throw;
expect(() => csa.close()).to.not.throw();
});
});

Expand Down Expand Up @@ -175,7 +175,7 @@ describe('PayloadAssemblerManager', function () {
};
const s = p.getPayloadStream(head);
expect(s).to.be.instanceOf(SubscribableStream);
expect(p.onReceive(head, s, 0)).to.not.throw;
expect(() => p.onReceive(head, s, 0)).to.not.throw();
done();
});

Expand All @@ -195,7 +195,7 @@ describe('PayloadAssemblerManager', function () {
const s = p.getPayloadStream(head);

expect(s).to.be.instanceOf(SubscribableStream);
expect(p.onReceive(head, s, 0)).to.not.throw;
expect(() => p.onReceive(head, s, 0)).to.not.throw();
done();
});

Expand All @@ -215,7 +215,7 @@ describe('PayloadAssemblerManager', function () {
const s = p.getPayloadStream(head);

expect(s).to.be.instanceOf(SubscribableStream);
expect(p.onReceive(head, s, 0)).to.not.throw;
expect(() => p.onReceive(head, s, 0)).to.not.throw();
done();
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,6 @@ describe('Streaming Extensions ContentStream Tests ', function () {
const cs = new ContentStream('cs1', new TestPayloadAssembler());
cs.readAsString();

expect(cs.cancel()).to.not.throw;
expect(() => cs.cancel()).to.not.throw();
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@ describe('CancelDisassembler', function () {
expect(cd.payloadType).to.equal(PayloadTypes.cancelStream);
expect(cd.sender).to.equal(sender);

expect(cd.disassemble()).to.not.throw;
expect(() => cd.disassemble()).to.not.throw();
});
});
80 changes: 46 additions & 34 deletions libraries/botframework-streaming/tests/NamedPipe.test.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
const assert = require('assert');
const { expect } = require('chai');
const { expectEventually } = require('./helpers/expectEventually');
const { NamedPipeClient, NamedPipeServer, StreamingRequest } = require('../lib');
const { NamedPipeTransport } = require('../lib/namedPipe');
const { platform } = require('os');
const { RequestHandler } = require('../lib');
const { createNodeServer, getServerFactory } = require('../lib/utilities/createNodeServer');

Expand Down Expand Up @@ -78,7 +80,10 @@ class TestClient {
}
}

describe('Streaming Extensions NamedPipe Library Tests', function () {
// Skips Windows-only tests. Linux does not have named pipes.
describe.windowsOnly = platform() === 'linux' ? describe.skip : describe;

describe.windowsOnly('Streaming Extensions NamedPipe Library Tests', function () {
describe('NamedPipe Transport Tests', function () {
it('Client connect', function () {
const c = new TestClient('pipeName');
Expand Down Expand Up @@ -108,7 +113,7 @@ describe('Streaming Extensions NamedPipe Library Tests', function () {
sock.writable = true;
const transport = new NamedPipeTransport(sock, 'fakeSocket1');
expect(transport).to.be.instanceOf(NamedPipeTransport);
expect(() => transport.close()).to.not.throw;
expect(() => transport.close()).to.not.throw();
});

it('creates a new transport and connects', function () {
Expand All @@ -119,7 +124,7 @@ describe('Streaming Extensions NamedPipe Library Tests', function () {
const transport = new NamedPipeTransport(sock, 'fakeSocket2');
expect(transport).to.be.instanceOf(NamedPipeTransport);
expect(transport.isConnected).to.be.true;
expect(() => transport.close()).to.not.throw;
expect(() => transport.close()).to.not.throw();
});

it('closes the transport without throwing', function () {
Expand All @@ -129,7 +134,7 @@ describe('Streaming Extensions NamedPipe Library Tests', function () {
sock.writable = true;
const transport = new NamedPipeTransport(sock, 'fakeSocket3');
expect(transport).to.be.instanceOf(NamedPipeTransport);
expect(transport.close()).to.not.throw;
expect(() => transport.close()).to.not.throw();
expect(transport.isConnected).to.be.false;
});

Expand All @@ -144,7 +149,7 @@ describe('Streaming Extensions NamedPipe Library Tests', function () {
const buff = Buffer.from('hello', 'utf8');
const sent = transport.send(buff);
expect(sent).to.equal(5);
expect(() => transport.close()).to.not.throw;
expect(() => transport.close()).to.not.throw();
});

it('returns 0 when attempting to write to a closed socket', function () {
Expand All @@ -159,19 +164,21 @@ describe('Streaming Extensions NamedPipe Library Tests', function () {
const buff = Buffer.from('hello', 'utf8');
const sent = transport.send(buff);
expect(sent).to.equal(0);
expect(() => transport.close()).to.not.throw;
expect(() => transport.close()).to.not.throw();
});

it('throws when reading from a dead socket', function () {
// TODO: 2023-04-24 [hawo] #4462 The code today does not allows the receive() call to be rejected by reading a dead socket.
// The receive() call will be rejected IFF the socket is closed/error AFTER the receive() call.
it.skip('throws when reading from a dead socket', async function () {
const sock = new FauxSock();
sock.destroyed = false;
sock.destroyed = true;
sock.connecting = false;
sock.writable = true;
const transport = new NamedPipeTransport(sock, 'fakeSocket5');
expect(transport).to.be.instanceOf(NamedPipeTransport);
expect(transport.isConnected).to.be.true;
expect(transport.receive(5)).to.throw;
expect(() => transport.close()).to.not.throw;
expect(transport.isConnected).to.be.false;
(await expectEventually(transport.receive(5))).to.throw();
expect(() => transport.close()).to.not.throw();
});

it('can read from the socket', function () {
Expand All @@ -185,7 +192,7 @@ describe('Streaming Extensions NamedPipe Library Tests', function () {
transport.receive(12).catch();
transport.socketReceive(Buffer.from('Hello World!', 'utf8'));

expect(() => transport.close()).to.not.throw;
expect(() => transport.close()).to.not.throw();
});

it('cleans up when onClose is fired', function () {
Expand Down Expand Up @@ -231,7 +238,7 @@ describe('Streaming Extensions NamedPipe Library Tests', function () {
expect(transport).to.be.instanceOf(NamedPipeTransport);
expect(transport.isConnected).to.be.true;
const buff = Buffer.from('hello', 'utf8');
expect(transport.socketReceive(buff)).to.not.throw;
expect(() => transport.socketReceive(buff)).to.not.throw();
});
});

Expand All @@ -240,19 +247,22 @@ describe('Streaming Extensions NamedPipe Library Tests', function () {

it('creates a new client', function () {
expect(client).to.be.instanceOf(NamedPipeClient);
expect(client.disconnect()).to.not.throw;
expect(() => client.disconnect()).to.not.throw();
});

it('connects without throwing', function () {
expect(client.connect()).to.not.throw;
expect(client.disconnect()).to.not.throw;
expect(() => client.connect()).to.not.throw();
expect(() => client.disconnect()).to.not.throw();
});

it('disconnects without throwing', function () {
expect(client.disconnect()).to.not.throw;
expect(() => client.disconnect()).to.not.throw();
});

it('sends without throwing', function (done) {
// TODO: 2023-04-24 [hawo] #4462 The client.send() call will only resolve when the other side responded.
// Because the other side is not connected to anything, thus, no response is received.
// Thus, the Promise is not resolved.
it.skip('sends without throwing', function (done) {
const req = new StreamingRequest();
req.Verb = 'POST';
req.Path = 'some/path';
Expand All @@ -262,15 +272,15 @@ describe('Streaming Extensions NamedPipe Library Tests', function () {
.catch((err) => {
expect(err).to.be.undefined;
})
.then(done());
.then(done);
});
});

describe('NamedPipe Server Tests', function () {
it('creates a new server', function () {
const server = new NamedPipeServer('pipeA', new RequestHandler());
expect(server).to.be.instanceOf(NamedPipeServer);
expect(server.disconnect()).to.not.throw;
expect(() => server.disconnect()).to.not.throw();
});

it('throws a TypeError during construction if missing the "baseName" parameter', function () {
Expand All @@ -281,15 +291,15 @@ describe('Streaming Extensions NamedPipe Library Tests', function () {
const server = new NamedPipeServer('pipeA', new RequestHandler());
expect(server).to.be.instanceOf(NamedPipeServer);

expect(server.start()).to.not.throw;
expect(server.disconnect()).to.not.throw;
expect(() => server.start()).to.not.throw();
expect(() => server.disconnect()).to.not.throw();
});

it('disconnects without throwing', function () {
const server = new NamedPipeServer('pipeA', new RequestHandler());
expect(server).to.be.instanceOf(NamedPipeServer);
expect(server.start()).to.not.throw;
expect(server.disconnect()).to.not.throw;
expect(() => server.start()).to.not.throw();
expect(() => server.disconnect()).to.not.throw();
});

it('returns true if isConnected === true on _receiver & _sender', function () {
Expand All @@ -301,25 +311,28 @@ describe('Streaming Extensions NamedPipe Library Tests', function () {
expect(server.isConnected).to.be.true;
});

it('sends without throwing', function (done) {
// TODO: 2023-04-24 [hawo] #4462 The client.send() call will only resolve when the other side responded.
// Because the other side is not connected to anything, thus, no response is received.
// Thus, the Promise is not resolved.
it.skip('sends without throwing', function (done) {
const server = new NamedPipeServer('pipeA', new RequestHandler());
expect(server).to.be.instanceOf(NamedPipeServer);
expect(server.start()).to.not.throw;
expect(() => server.start()).to.not.throw();
const req = { verb: 'POST', path: '/api/messages', streams: [] };
server
.send(req)
.catch((err) => {
expect(err).to.be.undefined;
})
.then(expect(server.disconnect()).to.not.throw)
.then(done());
.then(expect(() => server.disconnect()).to.not.throw())
.then(done);
});

it('handles being disconnected', function () {
const server = new NamedPipeServer('pipeA', new RequestHandler());
expect(server).to.be.instanceOf(NamedPipeServer);
server.start();
expect(server.disconnect()).to.not.throw;
expect(() => server.disconnect()).to.not.throw();
});

it('ensures that two servers cannot get into a split brain scenario', async function () {
Expand Down Expand Up @@ -369,29 +382,28 @@ describe('Streaming Extensions NamedPipe Library Tests', function () {
});

it('should not throw when choosing not to pass in a callback at all into createNodeServer()', function () {
expect(() => createNodeServer()).to.not.throw;
expect(() => createNodeServer()).to.not.throw();
});

it('should return a Server when calling createNodeServer()', function () {
const server = createNodeServer();
expect(server).to.not.throw;
expect(server).to.not.be.null;
expect(server).to.be.instanceOf(Object);
expect(typeof server.listen).to.equal('function');
expect(typeof server.close).to.equal('function');
});

it('should return the factory when calling getServerFactory()', function () {
expect(getServerFactory()).to.not.throw;
expect(() => getServerFactory()).to.not.throw();
const serverFactoryFunction = getServerFactory();
expect(serverFactoryFunction).to.not.be.null;
expect(typeof serverFactoryFunction).to.equal('function');
});

it("should throw if the callback isn't a valid connection listener callback", function () {
const callback = () => {};
const callback = 'not a function';
const serverFactory = getServerFactory();
expect(serverFactory(callback)).to.throw;
expect(() => serverFactory(callback)).to.throw();
});
});
});
Loading

0 comments on commit 25e7146

Please sign in to comment.