From 8737df86773ab3e2fb4e24c5f749112a051a7bd1 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Wed, 3 Nov 2021 17:53:43 +0100 Subject: [PATCH 1/2] Don't send RESET message when the connection is not open Send RESET when the connection is not optimal since it was time waiting for the message send times out. It could be really fast in the nodejs environment, but it could take some minutes in browser. Skip send RESET when the connection is not open avoid this issue. --- .../src/connection/connection-channel.js | 9 +- .../channel/browser/browser-channel.test.js | 32 +++++ .../test/channel/node/node-channel.test.js | 14 +++ .../connection/connection-channel.test.js | 64 +++++++++- core/src/internal/connection-holder.ts | 11 +- test/internal/connection-holder.test.js | 110 ++++++++++++++++++ test/result.test.js | 2 +- 7 files changed, 235 insertions(+), 7 deletions(-) diff --git a/bolt-connection/src/connection/connection-channel.js b/bolt-connection/src/connection/connection-channel.js index c30609705..e1559b1c8 100644 --- a/bolt-connection/src/connection/connection-channel.js +++ b/bolt-connection/src/connection/connection-channel.js @@ -270,7 +270,10 @@ export default class ChannelConnection extends Connection { */ _handleFatalError (error) { this._isBroken = true - this._error = this.handleAndTransformError(this._protocol.currentFailure || error, this._address) + this._error = this.handleAndTransformError( + this._protocol.currentFailure || error, + this._address + ) if (this._log.isErrorEnabled()) { this._log.error( @@ -320,6 +323,10 @@ export default class ChannelConnection extends Connection { } _resetOnFailure () { + if (!this.isOpen()) { + return + } + this._protocol.reset({ onError: () => { this._protocol.resetFailure() diff --git a/bolt-connection/test/channel/browser/browser-channel.test.js b/bolt-connection/test/channel/browser/browser-channel.test.js index 53fa9f873..d9e0aa7a8 100644 --- a/bolt-connection/test/channel/browser/browser-channel.test.js +++ b/bolt-connection/test/channel/browser/browser-channel.test.js @@ -294,6 +294,38 @@ describe('WebSocketChannel', () => { } }) + describe('.close()', () => { + it('should set _open to false before resolve the promise', async () => { + const fakeSetTimeout = setTimeoutMock.install() + try { + // do not execute setTimeout callbacks + fakeSetTimeout.pause() + const address = ServerAddress.fromUrl('bolt://localhost:8989') + const driverConfig = { connectionTimeout: 4242 } + const channelConfig = new ChannelConfig( + address, + driverConfig, + SERVICE_UNAVAILABLE + ) + webSocketChannel = new WebSocketChannel( + channelConfig, + undefined, + createWebSocketFactory(WS_OPEN) + ) + + expect(webSocketChannel._open).toBe(true) + + const promise = webSocketChannel.close() + + expect(webSocketChannel._open).toBe(false) + + await promise + } finally { + fakeSetTimeout.uninstall() + } + }) + }) + describe('.setupReceiveTimeout()', () => { beforeEach(() => { const address = ServerAddress.fromUrl('http://localhost:8989') diff --git a/bolt-connection/test/channel/node/node-channel.test.js b/bolt-connection/test/channel/node/node-channel.test.js index bd649e854..5d6b175b1 100644 --- a/bolt-connection/test/channel/node/node-channel.test.js +++ b/bolt-connection/test/channel/node/node-channel.test.js @@ -44,6 +44,20 @@ describe('NodeChannel', () => { return expect(channel.close()).resolves.not.toThrow() }) + describe('.close()', () => { + it('should set _open to false before resolve the promise', async () => { + const channel = createMockedChannel(true) + + expect(channel._open).toBe(true) + + const promise = channel.close() + + expect(channel._open).toBe(false) + + await promise + }) + }) + describe('.setupReceiveTimeout()', () => { it('should call socket.setTimeout(receiveTimeout)', () => { const receiveTimeout = 42 diff --git a/bolt-connection/test/connection/connection-channel.test.js b/bolt-connection/test/connection/connection-channel.test.js index 93a3da3f3..5ce805c5d 100644 --- a/bolt-connection/test/connection/connection-channel.test.js +++ b/bolt-connection/test/connection/connection-channel.test.js @@ -19,7 +19,6 @@ import ChannelConnection from '../../src/connection/connection-channel' import { int, internal } from 'neo4j-driver-core' -import { add } from 'lodash' const { serverAddress: { ServerAddress }, @@ -127,6 +126,69 @@ describe('ChannelConnection', () => { ) }) + describe('._resetOnFailure()', () => { + describe('when connection isOpen', () => { + it('should call protocol.reset() and then protocol.resetFailure() onComplete', () => { + const channel = { + _open: true + } + + const protocol = { + reset: jest.fn(observer => observer.onComplete()), + resetFailure: jest.fn() + } + const protocolSupplier = () => protocol + const connection = spyOnConnectionChannel({ channel, protocolSupplier }) + + connection._resetOnFailure() + + expect(protocol.reset).toHaveBeenCalled() + expect(protocol.resetFailure).toHaveBeenCalled() + }) + + it('should call protocol.reset() and then protocol.resetFailure() onError', () => { + const channel = { + _open: true + } + + const protocol = { + reset: jest.fn(observer => observer.onError()), + resetFailure: jest.fn() + } + const protocolSupplier = () => protocol + const connection = spyOnConnectionChannel({ channel, protocolSupplier }) + + connection._resetOnFailure() + + expect(protocol.reset).toHaveBeenCalled() + expect(protocol.resetFailure).toHaveBeenCalled() + }) + }) + + describe('when connection is not open', () => { + it('should not call protocol.reset() and protocol.resetFailure()', () => { + const channel = { + _open: false + } + + const protocol = { + reset: jest.fn(observer => { + observer.onComplete() + observer.onError() + }), + resetFailure: jest.fn() + } + const protocolSupplier = () => protocol + const connection = spyOnConnectionChannel({ channel, protocolSupplier }) + + connection._resetOnFailure() + + expect(protocol.reset).not.toHaveBeenCalled() + expect(protocol.resetFailure).not.toHaveBeenCalled() + }) + }) + }) + function spyOnConnectionChannel ({ channel, errorHandler, diff --git a/core/src/internal/connection-holder.ts b/core/src/internal/connection-holder.ts index 4345e3aaa..9cae02bf5 100644 --- a/core/src/internal/connection-holder.ts +++ b/core/src/internal/connection-holder.ts @@ -179,10 +179,13 @@ class ConnectionHolder implements ConnectionHolderInterface { this._connectionPromise = this._connectionPromise .then((connection?: Connection) => { if (connection) { - return connection - .resetAndFlush() - .catch(ignoreError) - .then(() => connection._release()) + if (connection.isOpen()) { + return connection + .resetAndFlush() + .catch(ignoreError) + .then(() => connection._release()) + } + return connection._release() } else { return Promise.resolve() } diff --git a/test/internal/connection-holder.test.js b/test/internal/connection-holder.test.js index 89fcf4dae..c84b5e9a4 100644 --- a/test/internal/connection-holder.test.js +++ b/test/internal/connection-holder.test.js @@ -292,6 +292,116 @@ describe('#unit ConnectionHolder', () => { expect(connectionHolder.database()).toBe('testdb') }) + + describe('.releaseConnection()', () => { + describe('when the connection is initialized', () => { + describe('and connection is open', () => { + let connection + + beforeEach(async () => { + connection = new FakeConnection() + const connectionProvider = newSingleConnectionProvider(connection) + const connectionHolder = new ConnectionHolder({ + mode: READ, + connectionProvider + }) + + connectionHolder.initializeConnection() + + await connectionHolder.releaseConnection() + }) + + it('should call connection.resetAndFlush', () => { + expect(connection.resetInvoked).toBe(1) + }) + + it('should call connection._release()', () => { + expect(connection.releaseInvoked).toBe(1) + }) + }) + + describe('and connection is not open', () => { + let connection + + beforeEach(async () => { + connection = new FakeConnection() + connection._open = false + const connectionProvider = newSingleConnectionProvider(connection) + const connectionHolder = new ConnectionHolder({ + mode: READ, + connectionProvider + }) + + connectionHolder.initializeConnection() + + await connectionHolder.releaseConnection() + }) + + it('should not call connection.resetAndFlush', () => { + expect(connection.resetInvoked).toBe(0) + }) + + it('should call connection._release()', () => { + expect(connection.releaseInvoked).toBe(1) + }) + }) + }) + }) + + describe('.close()', () => { + describe('when the connection is initialized', () => { + describe('and connection is open', () => { + let connection + + beforeEach(async () => { + connection = new FakeConnection() + const connectionProvider = newSingleConnectionProvider(connection) + const connectionHolder = new ConnectionHolder({ + mode: READ, + connectionProvider + }) + + connectionHolder.initializeConnection() + + await connectionHolder.close() + }) + + it('should call connection.resetAndFlush', () => { + expect(connection.resetInvoked).toBe(1) + }) + + it('should call connection._release()', () => { + expect(connection.releaseInvoked).toBe(1) + }) + }) + + describe('and connection is not open', () => { + let connection + + beforeEach(async () => { + connection = new FakeConnection() + connection._open = false + const connectionProvider = newSingleConnectionProvider(connection) + const connectionHolder = new ConnectionHolder({ + mode: READ, + connectionProvider + }) + + connectionHolder.initializeConnection() + + await connectionHolder.close() + }) + + it('should not call connection.resetAndFlush', () => { + expect(connection.resetInvoked).toBe(0) + }) + + it('should call connection._release()', () => { + expect(connection.releaseInvoked).toBe(1) + }) + }) + }) + }) }) class RecordingConnectionProvider extends SingleConnectionProvider { diff --git a/test/result.test.js b/test/result.test.js index 76ab8bf0a..035c3c059 100644 --- a/test/result.test.js +++ b/test/result.test.js @@ -73,7 +73,7 @@ describe('#integration result stream', () => { done() }, onError: error => { - console.log(error) + done.fail(error) } }) }) From 76a86fa118b8f143a6af6ccddecc6e8e38c81028 Mon Sep 17 00:00:00 2001 From: Antonio Barcelos Date: Wed, 3 Nov 2021 23:13:17 +0100 Subject: [PATCH 2/2] Solving issue in driver.close() --- bolt-connection/src/pool/pool.js | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/bolt-connection/src/pool/pool.js b/bolt-connection/src/pool/pool.js index b48f2cf47..dff3cd47a 100644 --- a/bolt-connection/src/pool/pool.js +++ b/bolt-connection/src/pool/pool.js @@ -134,9 +134,22 @@ class Pool { * Destroy all idle resources in this pool. * @returns {Promise} A promise that is resolved when the resources are purged */ - close () { + async close () { this._closed = true - return Promise.all(Object.keys(this._pools).map(key => this._purgeKey(key))) + /** + * The lack of Promise consuming was making the driver do not close properly in the scenario + * captured at result.test.js:it('should handle missing onCompleted'). The test was timing out + * because while wainting for the driver close. + * + * Consuming the Promise.all or by calling then or by awaiting in the result inside this method solved + * the issue somehow. + * + * PS: the return of this method was already awaited at PooledConnectionProvider.close, but the await bellow + * seems to be need also. + */ + return await Promise.all( + Object.keys(this._pools).map(key => this._purgeKey(key)) + ) } /**