From 87ad8f84989fb849837579f5e3b51982429f681e Mon Sep 17 00:00:00 2001 From: Aaron S Date: Fri, 1 Jul 2022 11:04:47 -0500 Subject: [PATCH 01/15] feat: Add connection health state monitoring structure --- .../AWSAppSyncRealTimeProvider.test.ts | 11 + .../__tests__/ConnectionStateMonitor.tests.ts | 229 ++++++++++++++++++ packages/pubsub/__tests__/PubSub-unit-test.ts | 11 + .../src/utils/ConnectionStatusMonitor.ts | 184 ++++++++++++++ 4 files changed, 435 insertions(+) create mode 100644 packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts create mode 100644 packages/pubsub/src/utils/ConnectionStatusMonitor.ts diff --git a/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts b/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts index 029278c20f3..0adf05fe301 100644 --- a/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts +++ b/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts @@ -1,3 +1,14 @@ +jest.mock('@aws-amplify/core', () => ({ + __esModule: true, + ...jest.requireActual('@aws-amplify/core'), + browserOrNode() { + return { + isBrowser: true, + isNode: false, + }; + }, +})); + import { Auth } from '@aws-amplify/auth'; import { Credentials, Logger, Signer } from '@aws-amplify/core'; import { GraphQLError, isCompositeType } from 'graphql'; diff --git a/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts b/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts new file mode 100644 index 00000000000..bb56b58990a --- /dev/null +++ b/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts @@ -0,0 +1,229 @@ +/* + * Copyright 2017-2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with + * the License. A copy of the License is located at + * + * http://aws.amazon.com/apache2.0/ + * + * or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ + +jest.mock('@aws-amplify/core', () => ({ + __esModule: true, + ...jest.requireActual('@aws-amplify/core'), + browserOrNode() { + return { + isBrowser: true, + isNode: false, + }; + }, +})); + +import Observable from 'zen-observable-ts'; +import { Reachability } from '@aws-amplify/core'; +import { + ConnectionHealthState, + ConnectionStatusMonitor, +} from '../src/utils/ConnectionStatusMonitor'; + +describe('ConnectionStateMonitor', () => { + let monitor: ConnectionStatusMonitor; + let observedStates: ConnectionHealthState[]; + let subscription: ZenObservable.Subscription; + let reachabilityObserver: ZenObservable.Observer<{ online: boolean }>; + + beforeEach(() => { + const spyon = jest + .spyOn(Reachability.prototype, 'networkMonitor') + .mockImplementationOnce( + () => + new Observable(observer => { + reachabilityObserver = observer; + }) + ); + }); + + describe('when the network is connected', () => { + beforeEach(() => { + reachabilityObserver?.next?.({ online: true }); + + observedStates = []; + subscription?.unsubscribe(); + monitor = new ConnectionStatusMonitor(); + subscription = monitor.connectionHealthStateObservable.subscribe( + value => { + observedStates.push(value); + } + ); + }); + + test('connection health states starts out disconnected', () => { + expect(observedStates).toEqual(['Disconnected']); + }); + + test('standard health states connection pattern', () => { + monitor.openingConnection(); + monitor.connectionEstablished(); + expect(observedStates).toEqual([ + 'Disconnected', + 'Connecting', + 'Connected', + ]); + }); + + test('connection health states when the network is lost while connected', () => { + monitor.openingConnection(); + monitor.connectionEstablished(); + reachabilityObserver?.next?.({ online: false }); + expect(observedStates).toEqual([ + 'Disconnected', + 'Connecting', + 'Connected', + 'ConnectedPendingNetwork', + ]); + }); + + test('connection health states when the network is lost and the connection times out', () => { + monitor.openingConnection(); + monitor.connectionEstablished(); + reachabilityObserver?.next?.({ online: false }); + monitor.disconnected(); + expect(observedStates).toEqual([ + 'Disconnected', + 'Connecting', + 'Connected', + 'ConnectedPendingNetwork', + 'ConnectionDisruptedPendingNetwork', + ]); + }); + + test('connection health states when the network is lost, the connection times out and then the network recovers', () => { + monitor.openingConnection(); + monitor.connectionEstablished(); + reachabilityObserver?.next?.({ online: false }); + monitor.disconnected(); + reachabilityObserver?.next?.({ online: true }); + expect(observedStates).toEqual([ + 'Disconnected', + 'Connecting', + 'Connected', + 'ConnectedPendingNetwork', + 'ConnectionDisruptedPendingNetwork', + 'ConnectionDisrupted', + ]); + }); + + test('connection health states when a connection is no longer needed', () => { + monitor.openingConnection(); + monitor.connectionEstablished(); + monitor.disconnecting(); + + expect(observedStates).toEqual([ + 'Disconnected', + 'Connecting', + 'Connected', + 'ConnectedPendingDisconnect', + ]); + }); + + test('connection health states when a connection is no longer needed closed', () => { + monitor.openingConnection(); + monitor.connectionEstablished(); + monitor.disconnecting(); + monitor.disconnected(); + + expect(observedStates).toEqual([ + 'Disconnected', + 'Connecting', + 'Connected', + 'ConnectedPendingDisconnect', + 'Disconnected', + ]); + }); + + test('connection health states when a connection misses a keepalive, and then recovers', () => { + monitor.openingConnection(); + monitor.connectionEstablished(); + monitor.keepAliveMissed(); + monitor.keepAlive(); + + expect(observedStates).toEqual([ + 'Disconnected', + 'Connecting', + 'Connected', + 'ConnectedPendingKeepAlive', + 'Connected', + ]); + }); + + test('lots of keep alive messages dont add more connection state events', () => { + monitor.openingConnection(); + monitor.keepAlive(); + monitor.connectionEstablished(); + monitor.keepAlive(); + monitor.keepAlive(); + monitor.keepAlive(); + expect(observedStates).toEqual([ + 'Disconnected', + 'Connecting', + 'Connected', + ]); + }); + + test('missed keep alives during a network outage dont add an additional state change', () => { + monitor.openingConnection(); + monitor.connectionEstablished(); + reachabilityObserver?.next?.({ online: false }); + monitor.keepAliveMissed(); + monitor.keepAliveMissed(); + + expect(observedStates).toEqual([ + 'Disconnected', + 'Connecting', + 'Connected', + 'ConnectedPendingNetwork', + ]); + }); + + test('when the network recovers, keep alives become the concern until one is seen', () => { + monitor.openingConnection(); + monitor.connectionEstablished(); + reachabilityObserver?.next?.({ online: false }); + monitor.keepAliveMissed(); + monitor.keepAliveMissed(); + reachabilityObserver?.next?.({ online: true }); + monitor.keepAlive(); + + expect(observedStates).toEqual([ + 'Disconnected', + 'Connecting', + 'Connected', + 'ConnectedPendingNetwork', + 'ConnectedPendingKeepAlive', + 'Connected', + ]); + }); + }); + + describe('when the network is disconnected', () => { + beforeEach(() => { + reachabilityObserver?.next?.({ online: false }); + + observedStates = []; + subscription?.unsubscribe(); + monitor = new ConnectionStatusMonitor(); + subscription = monitor.connectionHealthStateObservable.subscribe( + value => { + observedStates.push(value); + } + ); + }); + + test('starts out disconnected', () => { + expect(observedStates).toEqual(['Disconnected']); + }); + }); +}); diff --git a/packages/pubsub/__tests__/PubSub-unit-test.ts b/packages/pubsub/__tests__/PubSub-unit-test.ts index 969e42d520b..622ceefcdf9 100644 --- a/packages/pubsub/__tests__/PubSub-unit-test.ts +++ b/packages/pubsub/__tests__/PubSub-unit-test.ts @@ -1,3 +1,14 @@ +jest.mock('@aws-amplify/core', () => ({ + __esModule: true, + ...jest.requireActual('@aws-amplify/core'), + browserOrNode() { + return { + isBrowser: true, + isNode: false, + }; + }, +})); + import { PubSubClass as PubSub } from '../src/PubSub'; import { MqttOverWSProvider, diff --git a/packages/pubsub/src/utils/ConnectionStatusMonitor.ts b/packages/pubsub/src/utils/ConnectionStatusMonitor.ts new file mode 100644 index 00000000000..b88af5536ea --- /dev/null +++ b/packages/pubsub/src/utils/ConnectionStatusMonitor.ts @@ -0,0 +1,184 @@ +/* + * Copyright 2017-2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with + * the License. A copy of the License is located at + * + * http://aws.amazon.com/apache2.0/ + * + * or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ + +import { Reachability } from '@aws-amplify/core'; +import Observable, { ZenObservable } from 'zen-observable-ts'; + +// Internal types for tracking different connection states +type ConnectivityState = 'connected' | 'disconnected'; +type HealthState = 'healthy' | 'unhealthy'; +type ConnectionState = { + networkState: ConnectivityState; + connectionState: ConnectivityState | 'connecting'; + intendedConnectionState: ConnectivityState; + keepAliveState: HealthState; +}; + +export type ConnectionHealthState = + | 'Connected' + | 'ConnectedPendingNetwork' + | 'ConnectionDisrupted' + | 'ConnectionDisruptedPendingNetwork' + | 'Connecting' + | 'ConnectedPendingDisconnect' + | 'Disconnected' + | 'ConnectedPendingKeepAlive'; + +export class ConnectionStatusMonitor { + /** + * @private + */ + private _connectionState: ConnectionState; + private _connectionStateObservable: Observable; + private _connectionStateObserver: ZenObservable.SubscriptionObserver; + + constructor() { + this._connectionState = { + networkState: 'connected', + connectionState: 'disconnected', + intendedConnectionState: 'disconnected', + keepAliveState: 'healthy', + }; + + this._connectionStateObservable = new Observable( + connectionStateObserver => { + connectionStateObserver.next(this._connectionState); + this._connectionStateObserver = connectionStateObserver; + } + ); + + // Maintain the network state based on the reachability monitor + new Reachability().networkMonitor().subscribe(({ online }) => { + this.updateConnectionState({ + networkState: online ? 'connected' : 'disconnected', + }); + }); + } + + /** + * Get the observable that allows us to monitor the connection health + * + * @returns {Observable} - The observable that emits ConnectionHealthState updates + */ + public get connectionHealthStateObservable(): Observable { + // Translate from connection states to ConnectionHealthStates, then remove any duplicates + let previous: ConnectionHealthState; + return this._connectionStateObservable + .map(value => { + return this.connectionStateToConnectionHealth(value); + }) + .filter(current => { + const toInclude = current !== previous; + previous = current; + return toInclude; + }); + } + + /** + * Tell the monitor that the connection has been disconnected + */ + disconnected() { + this.updateConnectionState({ connectionState: 'disconnected' }); + } + + /** + * Tell the monitor that the connection is opening + */ + openingConnection() { + this.updateConnectionState({ + intendedConnectionState: 'connected', + connectionState: 'connecting', + }); + } + + /** + * Tell the monitor that the connection is disconnecting + */ + disconnecting() { + this.updateConnectionState({ intendedConnectionState: 'disconnected' }); + } + + /** + * Tell the monitor that the connection has been established + */ + connectionEstablished() { + this.updateConnectionState({ connectionState: 'connected' }); + } + + /** + * Tell the monitor that a keep alive has occurred + */ + keepAlive() { + this.updateConnectionState({ keepAliveState: 'healthy' }); + } + + /** + * Tell the monitor that a keep alive has been missed + */ + keepAliveMissed() { + this.updateConnectionState({ keepAliveState: 'unhealthy' }); + } + + /** + * @private + */ + + private updateConnectionState(statusUpdates: Partial) { + // Maintain the socket state + const newSocketStatus = { ...this._connectionState, ...statusUpdates }; + + this._connectionState = { ...newSocketStatus }; + + this._connectionStateObserver.next({ ...this._connectionState }); + } + + /* + * Translate the ConnectionState structure into a specific ConnectionHealthState string literal union + */ + private connectionStateToConnectionHealth({ + connectionState, + networkState, + intendedConnectionState, + keepAliveState, + }: ConnectionState): ConnectionHealthState { + if (connectionState === 'connected' && networkState === 'disconnected') + return 'ConnectedPendingNetwork'; + + if ( + connectionState === 'connected' && + intendedConnectionState === 'disconnected' + ) + return 'ConnectedPendingDisconnect'; + + if ( + connectionState === 'disconnected' && + intendedConnectionState === 'connected' && + networkState === 'disconnected' + ) + return 'ConnectionDisruptedPendingNetwork'; + + if ( + connectionState === 'disconnected' && + intendedConnectionState === 'connected' + ) + return 'ConnectionDisrupted'; + + if (connectionState === 'connected' && keepAliveState === 'unhealthy') + return 'ConnectedPendingKeepAlive'; + + // All remaining states directly correspond to the connection state + if (connectionState === 'connecting') return 'Connecting'; + if (connectionState === 'disconnected') return 'Disconnected'; + return 'Connected'; + } +} From 2b8823cb36c54fd67254c6d73e1fd2dd4b7abfaf Mon Sep 17 00:00:00 2001 From: Aaron S Date: Fri, 1 Jul 2022 12:32:22 -0500 Subject: [PATCH 02/15] feat: AppSync realtime connection monitoring --- .../__tests__/ConnectionStateMonitor.tests.ts | 10 +++--- .../AWSAppSyncRealTimeProvider/constants.ts | 5 +++ .../AWSAppSyncRealTimeProvider/index.ts | 36 +++++++++++++++++-- packages/pubsub/src/index.ts | 4 +++ ...usMonitor.ts => ConnectionStateMonitor.ts} | 2 +- 5 files changed, 49 insertions(+), 8 deletions(-) rename packages/pubsub/src/utils/{ConnectionStatusMonitor.ts => ConnectionStateMonitor.ts} (99%) diff --git a/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts b/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts index bb56b58990a..2593e29423e 100644 --- a/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts +++ b/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts @@ -26,11 +26,11 @@ import Observable from 'zen-observable-ts'; import { Reachability } from '@aws-amplify/core'; import { ConnectionHealthState, - ConnectionStatusMonitor, -} from '../src/utils/ConnectionStatusMonitor'; + ConnectionStateMonitor, +} from '../src/utils/ConnectionStateMonitor'; describe('ConnectionStateMonitor', () => { - let monitor: ConnectionStatusMonitor; + let monitor: ConnectionStateMonitor; let observedStates: ConnectionHealthState[]; let subscription: ZenObservable.Subscription; let reachabilityObserver: ZenObservable.Observer<{ online: boolean }>; @@ -52,7 +52,7 @@ describe('ConnectionStateMonitor', () => { observedStates = []; subscription?.unsubscribe(); - monitor = new ConnectionStatusMonitor(); + monitor = new ConnectionStateMonitor(); subscription = monitor.connectionHealthStateObservable.subscribe( value => { observedStates.push(value); @@ -214,7 +214,7 @@ describe('ConnectionStateMonitor', () => { observedStates = []; subscription?.unsubscribe(); - monitor = new ConnectionStatusMonitor(); + monitor = new ConnectionStateMonitor(); subscription = monitor.connectionHealthStateObservable.subscribe( value => { observedStates.push(value); diff --git a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/constants.ts b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/constants.ts index a529573187e..993dcd95a23 100644 --- a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/constants.ts +++ b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/constants.ts @@ -93,3 +93,8 @@ export const START_ACK_TIMEOUT = 15000; * Default Time in milleseconds to wait for GQL_CONNECTION_KEEP_ALIVE message */ export const DEFAULT_KEEP_ALIVE_TIMEOUT = 5 * 60 * 1000; + +/** + * Default Time in milleseconds to alert for missed GQL_CONNECTION_KEEP_ALIVE message + */ +export const KEEP_ALIVE_ALERT_TIMEOUT = 65 * 1000; diff --git a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts index 47a7e0d389b..b247478a140 100644 --- a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts +++ b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts @@ -30,12 +30,13 @@ import { import Cache from '@aws-amplify/cache'; import Auth, { GRAPHQL_AUTH_MODE } from '@aws-amplify/auth'; import { AbstractPubSubProvider } from '../PubSubProvider'; -import { CONTROL_MSG } from '../../index'; +import { CONNECTION_HEALTH_CHANGE, CONTROL_MSG } from '../../index'; import { AMPLIFY_SYMBOL, AWS_APPSYNC_REALTIME_HEADERS, CONNECTION_INIT_TIMEOUT, DEFAULT_KEEP_ALIVE_TIMEOUT, + KEEP_ALIVE_ALERT_TIMEOUT, MAX_DELAY_MS, MESSAGE_TYPES, NON_RETRYABLE_CODES, @@ -43,6 +44,7 @@ import { START_ACK_TIMEOUT, SUBSCRIPTION_STATUS, } from './constants'; +import { ConnectionStateMonitor } from '../../utils/ConnectionStateMonitor'; const logger = new Logger('AWSAppSyncRealTimeProvider'); @@ -89,8 +91,21 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { private socketStatus: SOCKET_STATUS = SOCKET_STATUS.CLOSED; private keepAliveTimeoutId?: ReturnType; private keepAliveTimeout = DEFAULT_KEEP_ALIVE_TIMEOUT; + private keepAliveAlertTimeoutId?: ReturnType; + private keepAliveAlertTimeout = KEEP_ALIVE_ALERT_TIMEOUT; private subscriptionObserverMap: Map = new Map(); private promiseArray: Array<{ res: Function; rej: Function }> = []; + private readonly connectionStateMonitor = new ConnectionStateMonitor(); + + constructor(options: ProviderOptions = {}) { + super(options); + // Monitor the connection health state and pass changes along to Hub + this.connectionStateMonitor.connectionHealthStateObservable.subscribe( + connectionHealthState => { + dispatchApiEvent(CONNECTION_HEALTH_CHANGE, {}, connectionHealthState); + } + ); + } getNewWebSocket(url, protocol) { return new WebSocket(url, protocol); @@ -252,6 +267,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { const stringToAWSRealTime = JSON.stringify(subscriptionMessage); try { + this.connectionStateMonitor.openingConnection(); await this._initializeWebSocketConnection({ apiKey, appSyncGraphqlEndpoint, @@ -262,6 +278,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { } catch (err) { logger.debug({ err }); const message = err['message'] ?? ''; + this.connectionStateMonitor.disconnected(); observer.error({ errors: [ { @@ -371,7 +388,13 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { setTimeout(this._closeSocketIfRequired.bind(this), 1000); } else { logger.debug('closing WebSocket...'); - if (this.keepAliveTimeoutId) clearTimeout(this.keepAliveTimeoutId); + this.connectionStateMonitor.disconnecting(); + if (this.keepAliveTimeoutId) { + clearTimeout(this.keepAliveTimeoutId); + } + if (this.keepAliveAlertTimeoutId) { + clearTimeout(this.keepAliveAlertTimeoutId); + } const tempSocket = this.awsRealTimeSocket; // Cleaning callbacks to avoid race condition, socket still exists tempSocket.onclose = null; @@ -439,10 +462,17 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { if (type === MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE) { if (this.keepAliveTimeoutId) clearTimeout(this.keepAliveTimeoutId); + if (this.keepAliveAlertTimeoutId) + clearTimeout(this.keepAliveAlertTimeoutId); this.keepAliveTimeoutId = setTimeout( this._errorDisconnect.bind(this, CONTROL_MSG.TIMEOUT_DISCONNECT), this.keepAliveTimeout ); + this.keepAliveAlertTimeoutId = setTimeout( + this.connectionStateMonitor.keepAliveMissed, + this.keepAliveAlertTimeout + ); + this.connectionStateMonitor.keepAlive(); return; } @@ -489,6 +519,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { }); this.subscriptionObserverMap.clear(); if (this.awsRealTimeSocket) { + this.connectionStateMonitor.disconnected(); this.awsRealTimeSocket.close(); } @@ -678,6 +709,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { this._errorDisconnect(CONTROL_MSG.CONNECTION_CLOSED); }; } + this.connectionStateMonitor.connectionEstablished(); res('Cool, connected to AWS AppSyncRealTime'); return; } diff --git a/packages/pubsub/src/index.ts b/packages/pubsub/src/index.ts index d44e2a1f28a..38ff704dec6 100644 --- a/packages/pubsub/src/index.ts +++ b/packages/pubsub/src/index.ts @@ -22,6 +22,10 @@ enum CONTROL_MSG { TIMEOUT_DISCONNECT = 'Timeout disconnect', } +export const CONNECTION_HEALTH_CHANGE = 'ConnectionHealthChange'; + +export { ConnectionHealthState } from './utils/ConnectionStateMonitor'; + export { PubSub, CONTROL_MSG }; /** diff --git a/packages/pubsub/src/utils/ConnectionStatusMonitor.ts b/packages/pubsub/src/utils/ConnectionStateMonitor.ts similarity index 99% rename from packages/pubsub/src/utils/ConnectionStatusMonitor.ts rename to packages/pubsub/src/utils/ConnectionStateMonitor.ts index b88af5536ea..ba831da87d9 100644 --- a/packages/pubsub/src/utils/ConnectionStatusMonitor.ts +++ b/packages/pubsub/src/utils/ConnectionStateMonitor.ts @@ -34,7 +34,7 @@ export type ConnectionHealthState = | 'Disconnected' | 'ConnectedPendingKeepAlive'; -export class ConnectionStatusMonitor { +export class ConnectionStateMonitor { /** * @private */ From 6134eb5d0a0afd03175673009223ba1ddf3eca40 Mon Sep 17 00:00:00 2001 From: Aaron S Date: Wed, 6 Jul 2022 00:56:43 -0500 Subject: [PATCH 03/15] feat: AppSync realtime connection monitor test integration --- .../AWSAppSyncRealTimeProvider.test.ts | 202 +++++++++++++----- packages/pubsub/__tests__/helpers.ts | 106 +++++++-- .../AWSAppSyncRealTimeProvider/index.ts | 17 +- .../src/utils/ConnectionStateMonitor.ts | 10 + 4 files changed, 261 insertions(+), 74 deletions(-) diff --git a/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts b/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts index 0adf05fe301..e253cf36149 100644 --- a/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts +++ b/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts @@ -16,7 +16,9 @@ import Observable from 'zen-observable-ts'; import { AWSAppSyncRealTimeProvider } from '../src/Providers/AWSAppSyncRealTimeProvider'; import Cache from '@aws-amplify/cache'; import { MESSAGE_TYPES } from '../src/Providers/AWSAppSyncRealTimeProvider/constants'; -import { FakeWebSocketInterface, delay, replaceConstant } from './helpers'; +import { delay, FakeWebSocketInterface, replaceConstant } from './helpers'; +import * as constants from '../src/Providers/AWSAppSyncRealTimeProvider/constants'; +import { Reachability } from '@aws-amplify/core'; describe('AWSAppSyncRealTimeProvider', () => { describe('isCustomDomain()', () => { @@ -87,13 +89,74 @@ describe('AWSAppSyncRealTimeProvider', () => { fakeWebSocketInterface.newWebSocket(); return fakeWebSocketInterface.webSocket; }); + + // Reduce retry delay for tests to 100ms + Object.defineProperty(constants, 'MAX_DELAY_MS', { + value: 100, + }); + + const spyon = jest + .spyOn(Reachability.prototype, 'networkMonitor') + .mockImplementationOnce( + () => + new Observable(observer => { + observer.next?.({ online: true }); + }) + ); }); afterEach(async () => { await fakeWebSocketInterface?.closeInterface(); + fakeWebSocketInterface?.teardown(); loggerSpy.mockClear(); }); + test('standard subscription / unsubscription steps through the expected connection states', async () => { + const observer = provider.subscribe('test', { + appSyncGraphqlEndpoint: 'ws://localhost:8080', + }); + + const subscription = observer.subscribe({ + next: () => {}, + error: x => {}, + }); + + // Wait for the socket to be ready + await fakeWebSocketInterface?.standardConnectionHandshake(); + await fakeWebSocketInterface?.sendMessage( + new MessageEvent('start_ack', { + data: JSON.stringify({ + type: MESSAGE_TYPES.GQL_START_ACK, + payload: { connectionTimeoutMs: 100 }, + id: fakeWebSocketInterface?.webSocket.subscriptionId, + }), + }) + ); + + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + 'Connected', + ]); + expect( + fakeWebSocketInterface?.observedConnectionHealthStates + ).toEqual(['Disconnected', 'Connecting', 'Connected']); + + subscription.unsubscribe(); + + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + 'ConnectedPendingDisconnect', + ]); + + expect( + fakeWebSocketInterface?.observedConnectionHealthStates + ).toEqual([ + 'Disconnected', + 'Connecting', + 'Connected', + 'ConnectedPendingDisconnect', + 'Disconnected', + ]); + }); + test('returns error when no appSyncGraphqlEndpoint is provided', async () => { expect.assertions(2); const mockError = jest.fn(); @@ -128,7 +191,7 @@ describe('AWSAppSyncRealTimeProvider', () => { .subscribe('test', { appSyncGraphqlEndpoint: 'ws://localhost:8080', }) - .subscribe({}); + .subscribe({ error: () => {} }); // Wait for the socket to be initialize await fakeWebSocketInterface.readyForUse; @@ -154,7 +217,7 @@ describe('AWSAppSyncRealTimeProvider', () => { .subscribe('test', { appSyncGraphqlEndpoint: 'http://localhost:8080', }) - .subscribe({}); + .subscribe({ error: () => {} }); // Wait for the socket to be initialize await fakeWebSocketInterface.readyForUse; @@ -181,7 +244,7 @@ describe('AWSAppSyncRealTimeProvider', () => { appSyncGraphqlEndpoint: 'https://testaccounturl123456789123.appsync-api.us-east-1.amazonaws.com/graphql', }) - .subscribe({}); + .subscribe({ error: () => {} }); // Wait for the socket to be initialize await fakeWebSocketInterface.readyForUse; @@ -200,7 +263,8 @@ describe('AWSAppSyncRealTimeProvider', () => { .subscribe('test', { appSyncGraphqlEndpoint: 'ws://localhost:8080', }) - .subscribe({}); + .subscribe({ error: () => {} }); + await fakeWebSocketInterface?.readyForUse; await fakeWebSocketInterface?.triggerError(); expect(loggerSpy).toHaveBeenCalledWith( @@ -216,11 +280,14 @@ describe('AWSAppSyncRealTimeProvider', () => { .subscribe('test', { appSyncGraphqlEndpoint: 'ws://localhost:8080', }) - .subscribe({}); + .subscribe({ error: () => {} }); + await fakeWebSocketInterface?.readyForUse; await fakeWebSocketInterface?.triggerClose(); - await delay(50); + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + 'Disconnected', + ]); // Watching for raised exception to be caught and logged expect(loggerSpy).toBeCalledWith( 'DEBUG', @@ -238,11 +305,11 @@ describe('AWSAppSyncRealTimeProvider', () => { .subscribe('test', { appSyncGraphqlEndpoint: 'ws://localhost:8080', }) - .subscribe({}); + .subscribe({ error: () => {} }); + await fakeWebSocketInterface?.readyForUse; await fakeWebSocketInterface?.triggerOpen(); await fakeWebSocketInterface?.triggerError(); - // When the socket throws an error during handshake expect(loggerSpy).toHaveBeenCalledWith( 'DEBUG', @@ -257,7 +324,8 @@ describe('AWSAppSyncRealTimeProvider', () => { .subscribe('test', { appSyncGraphqlEndpoint: 'ws://localhost:8080', }) - .subscribe({}); + .subscribe({ error: () => {} }); + await fakeWebSocketInterface?.readyForUse; await fakeWebSocketInterface?.triggerOpen(); await fakeWebSocketInterface?.triggerClose(); @@ -316,6 +384,7 @@ describe('AWSAppSyncRealTimeProvider', () => { data: JSON.stringify({ type: MESSAGE_TYPES.GQL_START_ACK, payload: { connectionTimeoutMs: 100 }, + id: fakeWebSocketInterface?.webSocket.subscriptionId, }), }) ); @@ -347,6 +416,7 @@ describe('AWSAppSyncRealTimeProvider', () => { data: JSON.stringify({ type: MESSAGE_TYPES.GQL_START_ACK, payload: { connectionTimeoutMs: 100 }, + id: fakeWebSocketInterface?.webSocket.subscriptionId, }), }) ); @@ -484,39 +554,55 @@ describe('AWSAppSyncRealTimeProvider', () => { ); }); - test('subscription observer error is triggered when a connection is formed and an ack data message is received then ack timeout prompts disconnect', async () => { - expect.assertions(1); + test('subscription observer error is triggered when a connection is formed and an ack data message is received then ka timeout prompts disconnect', async () => { + expect.assertions(2); const observer = provider.subscribe('test', { appSyncGraphqlEndpoint: 'ws://localhost:8080', }); - const subscription = observer.subscribe({ - error: () => {}, - }); - - await fakeWebSocketInterface?.readyForUse; - await fakeWebSocketInterface?.triggerOpen(); - + const subscription = observer.subscribe({ error: () => {} }); // Resolve the message delivery actions - await Promise.resolve( - fakeWebSocketInterface?.sendMessage( + await replaceConstant('KEEP_ALIVE_ALERT_TIMEOUT', 5, async () => { + await fakeWebSocketInterface?.readyForUse; + await fakeWebSocketInterface?.triggerOpen(); + await fakeWebSocketInterface?.sendMessage( new MessageEvent('connection_ack', { data: JSON.stringify({ - type: MESSAGE_TYPES.GQL_CONNECTION_ACK, - payload: { connectionTimeoutMs: 20 }, + type: constants.MESSAGE_TYPES.GQL_CONNECTION_ACK, + payload: { connectionTimeoutMs: 100 }, }), }) - ) - ); + ); - await fakeWebSocketInterface?.sendDataMessage({ - type: MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE, - payload: { data: {} }, + await fakeWebSocketInterface?.sendMessage( + new MessageEvent('start_ack', { + data: JSON.stringify({ + type: MESSAGE_TYPES.GQL_START_ACK, + payload: {}, + id: fakeWebSocketInterface?.webSocket.subscriptionId, + }), + }) + ); + + await fakeWebSocketInterface?.sendDataMessage({ + type: MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE, + payload: { data: {} }, + }); }); - // Now wait for the timeout to elapse - await delay(100); + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + 'Connected', + ]); + + // Wait until the socket is automatically disconnected + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + 'ConnectionDisrupted', + ]); + + expect( + fakeWebSocketInterface?.observedConnectionHealthStates + ).toContain('ConnectedPendingKeepAlive'); expect(loggerSpy).toBeCalledWith( 'DEBUG', @@ -531,7 +617,7 @@ describe('AWSAppSyncRealTimeProvider', () => { appSyncGraphqlEndpoint: 'ws://localhost:8080', }); - const subscription = observer.subscribe({}); + const subscription = observer.subscribe({ error: () => {} }); await fakeWebSocketInterface?.standardConnectionHandshake(); await fakeWebSocketInterface?.sendDataMessage({ @@ -547,19 +633,19 @@ describe('AWSAppSyncRealTimeProvider', () => { test('failure to ack before timeout', async () => { expect.assertions(1); - await replaceConstant('START_ACK_TIMEOUT', 20, async () => { + await replaceConstant('START_ACK_TIMEOUT', 30, async () => { const observer = provider.subscribe('test', { appSyncGraphqlEndpoint: 'ws://localhost:8080', }); - const subscription = observer.subscribe({ - error: () => {}, - }); + const subscription = observer.subscribe({ error: () => {} }); await fakeWebSocketInterface?.standardConnectionHandshake(); - // Wait long enough that the shortened timeout will elapse - await delay(100); + // Wait until the socket is automatically disconnected + await fakeWebSocketInterface?.waitForConnectionState([ + 'Disconnected', + ]); expect(loggerSpy).toBeCalledWith( 'DEBUG', @@ -577,15 +663,19 @@ describe('AWSAppSyncRealTimeProvider', () => { appSyncGraphqlEndpoint: 'ws://localhost:8080', }); - const subscription = observer.subscribe({ - error: () => {}, - }); + const subscription = observer.subscribe({ error: () => {} }); await fakeWebSocketInterface?.readyForUse; + Promise.resolve(); await fakeWebSocketInterface?.triggerOpen(); - // Wait long enough that the shortened timeout will elapse - await delay(100); + // Wait no less than 20 ms + await delay(20); + + // Wait until the socket is automatically disconnected + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + 'Disconnected', + ]); // Watching for raised exception to be caught and logged expect(loggerSpy).toBeCalledWith( @@ -597,6 +687,7 @@ describe('AWSAppSyncRealTimeProvider', () => { ), }) ); + console.log('END TEST'); }); }); @@ -609,7 +700,8 @@ describe('AWSAppSyncRealTimeProvider', () => { appSyncGraphqlEndpoint: 'ws://localhost:8080', authenticationType: 'API_KEY', }) - .subscribe({}); + .subscribe({ error: () => {} }); + await fakeWebSocketInterface?.readyForUse; expect(loggerSpy).toBeCalledWith( @@ -637,7 +729,8 @@ describe('AWSAppSyncRealTimeProvider', () => { appSyncGraphqlEndpoint: 'ws://localhost:8080', authenticationType: 'AWS_IAM', }) - .subscribe({}); + .subscribe({ error: () => {} }); + await fakeWebSocketInterface?.readyForUse; expect(loggerSpy).toBeCalledWith( @@ -715,8 +808,10 @@ describe('AWSAppSyncRealTimeProvider', () => { }, }); - // It takes time for the credentials to resolve - await delay(50); + // Wait until the socket is automatically disconnected + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + 'Disconnected', + ]); expect(loggerSpy).toHaveBeenCalledWith( 'WARN', @@ -741,7 +836,8 @@ describe('AWSAppSyncRealTimeProvider', () => { appSyncGraphqlEndpoint: 'ws://localhost:8080', authenticationType: 'OPENID_CONNECT', }) - .subscribe({}); + .subscribe({ error: () => {} }); + await fakeWebSocketInterface?.readyForUse; expect(loggerSpy).toBeCalledWith( @@ -796,9 +892,9 @@ describe('AWSAppSyncRealTimeProvider', () => { appSyncGraphqlEndpoint: 'ws://localhost:8080', authenticationType: 'OPENID_CONNECT', }) - .subscribe({}); - await fakeWebSocketInterface?.readyForUse; + .subscribe({ error: () => {} }); + await fakeWebSocketInterface?.readyForUse; expect(loggerSpy).toBeCalledWith( 'DEBUG', 'Authenticating with OPENID_CONNECT' @@ -825,7 +921,8 @@ describe('AWSAppSyncRealTimeProvider', () => { appSyncGraphqlEndpoint: 'ws://localhost:8080', authenticationType: 'AMAZON_COGNITO_USER_POOLS', }) - .subscribe({}); + .subscribe({ error: () => {} }); + await fakeWebSocketInterface?.readyForUse; expect(loggerSpy).toBeCalledWith( @@ -845,7 +942,8 @@ describe('AWSAppSyncRealTimeProvider', () => { Authorization: 'test', }, }) - .subscribe({}); + .subscribe({ error: () => {} }); + await fakeWebSocketInterface?.readyForUse; expect(loggerSpy).toBeCalledWith( @@ -862,7 +960,7 @@ describe('AWSAppSyncRealTimeProvider', () => { appSyncGraphqlEndpoint: 'ws://localhost:8080', authenticationType: 'AWS_LAMBDA', additionalHeaders: { - Authorization: undefined, + Authorization: '', }, }) .subscribe({ diff --git a/packages/pubsub/__tests__/helpers.ts b/packages/pubsub/__tests__/helpers.ts index 8328d953def..00e0e41ee66 100644 --- a/packages/pubsub/__tests__/helpers.ts +++ b/packages/pubsub/__tests__/helpers.ts @@ -1,3 +1,6 @@ +import { Hub } from '@aws-amplify/core'; +import Observable from 'zen-observable-ts'; +import { ConnectionHealthState, CONNECTION_HEALTH_CHANGE } from '../src'; import * as constants from '../src/Providers/AWSAppSyncRealTimeProvider/constants'; export function delay(timeout) { @@ -10,10 +13,15 @@ export function delay(timeout) { export class FakeWebSocketInterface { readonly webSocket: FakeWebSocket; - readyForUse: Promise; + readyForUse: Promise; hasClosed: Promise; + teardownHubListener: () => void; + observedConnectionHealthStates: ConnectionHealthState[] = []; + currentConnectionHealthState: ConnectionHealthState; private readyResolve: (value: PromiseLike) => void; + private connectionHealthObservers: ZenObservable.Observer[] = + []; constructor() { this.readyForUse = new Promise((res, rej) => { @@ -23,7 +31,41 @@ export class FakeWebSocketInterface { this.hasClosed = new Promise((res, rej) => { closeResolver = res; }); - this.webSocket = new FakeWebSocket(closeResolver); + this.webSocket = new FakeWebSocket(() => closeResolver); + + this.teardownHubListener = Hub.listen('api', (data: any) => { + const { payload } = data; + if (payload.event === CONNECTION_HEALTH_CHANGE) { + const healthState = payload.message as ConnectionHealthState; + this.observedConnectionHealthStates.push(healthState); + this.connectionHealthObservers.forEach(observer => { + observer?.next?.(healthState); + }); + this.currentConnectionHealthState = healthState; + } + }); + } + + allConnectionHealthStateObserver() { + return new Observable(observer => { + this.observedConnectionHealthStates.forEach(state => { + observer.next(state); + }); + this.connectionHealthObservers.push(observer); + }); + } + + connectionHealthStateObserver() { + return new Observable(observer => { + this.connectionHealthObservers.push(observer); + }); + } + + teardown() { + this.teardownHubListener(); + this.connectionHealthObservers.forEach(observer => { + observer?.complete?.(); + }); } async standardConnectionHandshake() { @@ -44,21 +86,25 @@ export class FakeWebSocketInterface { // After a close is triggered, the provider has logic that must execute // which changes the function resolvers assigned to the websocket await this.runAndResolve(() => { - if (this.webSocket.onclose) - this.webSocket.onclose(new CloseEvent('', {})); + if (this.webSocket.onclose) { + try { + this.webSocket.onclose(new CloseEvent('', {})); + } catch {} + } }); } async closeInterface() { await this.triggerClose(); // Wait for either hasClosed or a half second has passed - await new Promise(res => { + await new Promise(async res => { // The interface is closed when the socket "hasClosed" this.hasClosed.then(() => res(undefined)); - - // The provider can get pretty wrapped around itself, - // but its safe to continue after half a second, even if it hasn't closed the socket - delay(500).then(() => res(undefined)); + await this.waitUntilConnectionStateIn([ + 'Disconnected', + 'ConnectionDisrupted', + ]); + res(undefined); }); } @@ -71,7 +117,7 @@ export class FakeWebSocketInterface { } newWebSocket() { - setTimeout(() => this.readyResolve(undefined), 10); + setTimeout(() => this.readyResolve(Promise.resolve()), 10); return this.webSocket; } @@ -80,7 +126,7 @@ export class FakeWebSocketInterface { new MessageEvent('connection_ack', { data: JSON.stringify({ type: constants.MESSAGE_TYPES.GQL_CONNECTION_ACK, - payload: { keepAliveTimeout: 100_000 }, + payload: { connectionTimeoutMs: 100_000 }, }), }) ); @@ -108,11 +154,40 @@ export class FakeWebSocketInterface { fn(); await Promise.resolve(); } + + async observesConnectionState(connectionState: ConnectionHealthState) { + return new Promise((res, rej) => { + this.allConnectionHealthStateObserver().subscribe(value => { + if (value === connectionState) { + res(undefined); + } + }); + }); + } + + async waitForConnectionState(connectionStates: ConnectionHealthState[]) { + return new Promise((res, rej) => { + this.connectionHealthStateObserver().subscribe(value => { + if (connectionStates.includes(String(value) as ConnectionHealthState)) { + res(undefined); + } + }); + }); + } + + async waitUntilConnectionStateIn(connectionStates: ConnectionHealthState[]) { + return new Promise((res, rej) => { + if (connectionStates.includes(this.currentConnectionHealthState)) { + res(undefined); + } + res(this.waitForConnectionState(connectionStates)); + }); + } } class FakeWebSocket implements WebSocket { subscriptionId: string | undefined; - closeResolver?: (value: PromiseLike) => void; + closeResolverFcn: () => (value: PromiseLike) => void; binaryType: BinaryType; bufferedAmount: number; @@ -125,7 +200,8 @@ class FakeWebSocket implements WebSocket { readyState: number; url: string; close(code?: number, reason?: string): void { - if (this.closeResolver) this.closeResolver(undefined); + const closeResolver = this.closeResolverFcn(); + if (closeResolver) closeResolver(Promise.resolve(undefined)); } send(data: string | ArrayBufferLike | Blob | ArrayBufferView): void { const parsedInput = JSON.parse(String(data)); @@ -170,8 +246,8 @@ class FakeWebSocket implements WebSocket { throw new Error('Method not implemented dispatchEvent.'); } - constructor(closeResolver?: (value: PromiseLike) => void) { - this.closeResolver = closeResolver; + constructor(closeResolver: () => (value: PromiseLike) => void) { + this.closeResolverFcn = closeResolver; } } diff --git a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts index b247478a140..0cdbe9f8660 100644 --- a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts +++ b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts @@ -92,7 +92,6 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { private keepAliveTimeoutId?: ReturnType; private keepAliveTimeout = DEFAULT_KEEP_ALIVE_TIMEOUT; private keepAliveAlertTimeoutId?: ReturnType; - private keepAliveAlertTimeout = KEEP_ALIVE_ALERT_TIMEOUT; private subscriptionObserverMap: Map = new Map(); private promiseArray: Array<{ res: Function; rej: Function }> = []; private readonly connectionStateMonitor = new ConnectionStateMonitor(); @@ -162,6 +161,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { }, ], }); + this.connectionStateMonitor.disconnected(); observer.complete(); }); @@ -402,6 +402,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { tempSocket.close(1000); this.awsRealTimeSocket = undefined; this.socketStatus = SOCKET_STATUS.CLOSED; + this.connectionStateMonitor.disconnected(); } } @@ -455,6 +456,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { subscriptionFailedCallback, }); } + this.connectionStateMonitor.connectionEstablished(); // TODO: emit event on hub but it requires to store the id first return; @@ -465,13 +467,12 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { if (this.keepAliveAlertTimeoutId) clearTimeout(this.keepAliveAlertTimeoutId); this.keepAliveTimeoutId = setTimeout( - this._errorDisconnect.bind(this, CONTROL_MSG.TIMEOUT_DISCONNECT), + () => this._errorDisconnect(CONTROL_MSG.TIMEOUT_DISCONNECT), this.keepAliveTimeout ); - this.keepAliveAlertTimeoutId = setTimeout( - this.connectionStateMonitor.keepAliveMissed, - this.keepAliveAlertTimeout - ); + this.keepAliveAlertTimeoutId = setTimeout(() => { + this.connectionStateMonitor.keepAliveMissed(); + }, KEEP_ALIVE_ALERT_TIMEOUT); this.connectionStateMonitor.keepAlive(); return; } @@ -661,6 +662,8 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { logger.debug(`WebSocket connection error`); }; newSocket.onclose = () => { + this.connectionStateMonitor.connectionFailed(); + //throw new Error('Connection handshake error'); rej(new Error('Connection handshake error')); }; newSocket.onopen = () => { @@ -709,7 +712,6 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { this._errorDisconnect(CONTROL_MSG.CONNECTION_CLOSED); }; } - this.connectionStateMonitor.connectionEstablished(); res('Cool, connected to AWS AppSyncRealTime'); return; } @@ -735,6 +737,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { function checkAckOk(ackOk: boolean) { if (!ackOk) { + this.connectionStateMonitor.connectionFailed(); rej( new Error( `Connection timeout: ack from AWSRealTime was not received on ${CONNECTION_INIT_TIMEOUT} ms` diff --git a/packages/pubsub/src/utils/ConnectionStateMonitor.ts b/packages/pubsub/src/utils/ConnectionStateMonitor.ts index ba831da87d9..07d1160a7d2 100644 --- a/packages/pubsub/src/utils/ConnectionStateMonitor.ts +++ b/packages/pubsub/src/utils/ConnectionStateMonitor.ts @@ -108,6 +108,16 @@ export class ConnectionStateMonitor { this.updateConnectionState({ intendedConnectionState: 'disconnected' }); } + /** + * Tell the monitor that the connection has failed + */ + connectionFailed() { + this.updateConnectionState({ + intendedConnectionState: 'disconnected', + connectionState: 'disconnected', + }); + } + /** * Tell the monitor that the connection has been established */ From e8f54a8ee0adbc00dc1d2e5414c3018c03246478 Mon Sep 17 00:00:00 2001 From: Aaron S Date: Wed, 6 Jul 2022 01:01:03 -0500 Subject: [PATCH 04/15] fix: Include missed changes --- .../__tests__/AWSAppSyncRealTimeProvider.test.ts | 13 +++++++------ .../Providers/AWSAppSyncRealTimeProvider/index.ts | 1 - 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts b/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts index e253cf36149..4c868ec4fee 100644 --- a/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts +++ b/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts @@ -9,16 +9,17 @@ jest.mock('@aws-amplify/core', () => ({ }, })); -import { Auth } from '@aws-amplify/auth'; -import { Credentials, Logger, Signer } from '@aws-amplify/core'; -import { GraphQLError, isCompositeType } from 'graphql'; import Observable from 'zen-observable-ts'; -import { AWSAppSyncRealTimeProvider } from '../src/Providers/AWSAppSyncRealTimeProvider'; +import { Reachability, Credentials, Logger, Signer } from '@aws-amplify/core'; +import { Auth } from '@aws-amplify/auth'; import Cache from '@aws-amplify/cache'; + import { MESSAGE_TYPES } from '../src/Providers/AWSAppSyncRealTimeProvider/constants'; -import { delay, FakeWebSocketInterface, replaceConstant } from './helpers'; import * as constants from '../src/Providers/AWSAppSyncRealTimeProvider/constants'; -import { Reachability } from '@aws-amplify/core'; + +import { delay, FakeWebSocketInterface, replaceConstant } from './helpers'; + +import { AWSAppSyncRealTimeProvider } from '../src/Providers/AWSAppSyncRealTimeProvider'; describe('AWSAppSyncRealTimeProvider', () => { describe('isCustomDomain()', () => { diff --git a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts index 0cdbe9f8660..1d133d90492 100644 --- a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts +++ b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts @@ -663,7 +663,6 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { }; newSocket.onclose = () => { this.connectionStateMonitor.connectionFailed(); - //throw new Error('Connection handshake error'); rej(new Error('Connection handshake error')); }; newSocket.onopen = () => { From e677c04cbb059f5ac578e6efc2783eb2d6cef0fe Mon Sep 17 00:00:00 2001 From: Aaron S Date: Wed, 6 Jul 2022 13:51:31 -0500 Subject: [PATCH 05/15] fix: Restructure Hub dispatch --- .../pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts | 1 + packages/pubsub/__tests__/helpers.ts | 3 ++- .../src/Providers/AWSAppSyncRealTimeProvider/index.ts | 9 ++++++++- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts b/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts index 4c868ec4fee..df6955aa897 100644 --- a/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts +++ b/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts @@ -96,6 +96,7 @@ describe('AWSAppSyncRealTimeProvider', () => { value: 100, }); + // Set the network to "online" for these tests const spyon = jest .spyOn(Reachability.prototype, 'networkMonitor') .mockImplementationOnce( diff --git a/packages/pubsub/__tests__/helpers.ts b/packages/pubsub/__tests__/helpers.ts index 00e0e41ee66..a344513c04b 100644 --- a/packages/pubsub/__tests__/helpers.ts +++ b/packages/pubsub/__tests__/helpers.ts @@ -36,7 +36,8 @@ export class FakeWebSocketInterface { this.teardownHubListener = Hub.listen('api', (data: any) => { const { payload } = data; if (payload.event === CONNECTION_HEALTH_CHANGE) { - const healthState = payload.message as ConnectionHealthState; + const healthState = payload.data + .connectionState as ConnectionHealthState; this.observedConnectionHealthStates.push(healthState); this.connectionHealthObservers.forEach(observer => { observer?.next?.(healthState); diff --git a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts index 1d133d90492..9e5a0953cbd 100644 --- a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts +++ b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts @@ -101,7 +101,14 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { // Monitor the connection health state and pass changes along to Hub this.connectionStateMonitor.connectionHealthStateObservable.subscribe( connectionHealthState => { - dispatchApiEvent(CONNECTION_HEALTH_CHANGE, {}, connectionHealthState); + dispatchApiEvent( + CONNECTION_HEALTH_CHANGE, + { + provider: this, + connectionState: connectionHealthState, + }, + `Connection state is ${connectionHealthState}` + ); } ); } From 43dece9655b14b3b4448950b5c8eaa95cf723f76 Mon Sep 17 00:00:00 2001 From: Aaron S Date: Wed, 6 Jul 2022 15:42:15 -0500 Subject: [PATCH 06/15] fix: Remove print statement --- packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts b/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts index df6955aa897..89fc99850db 100644 --- a/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts +++ b/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts @@ -689,7 +689,6 @@ describe('AWSAppSyncRealTimeProvider', () => { ), }) ); - console.log('END TEST'); }); }); From 78b05162295deb055ce46f302d7d047f2f5fb265 Mon Sep 17 00:00:00 2001 From: Aaron S Date: Mon, 11 Jul 2022 16:26:06 -0500 Subject: [PATCH 07/15] fix: PR comment updates --- .../AWSAppSyncRealTimeProvider.test.ts | 56 ++++++++++--------- .../__tests__/ConnectionStateMonitor.tests.ts | 14 ++--- packages/pubsub/__tests__/helpers.ts | 4 +- .../AWSAppSyncRealTimeProvider/constants.ts | 2 +- .../AWSAppSyncRealTimeProvider/index.ts | 21 +++---- packages/pubsub/src/index.ts | 4 +- .../src/utils/ConnectionStateMonitor.ts | 40 ++++++------- 7 files changed, 74 insertions(+), 67 deletions(-) diff --git a/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts b/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts index 89fc99850db..cbec7795a52 100644 --- a/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts +++ b/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts @@ -565,33 +565,37 @@ describe('AWSAppSyncRealTimeProvider', () => { const subscription = observer.subscribe({ error: () => {} }); // Resolve the message delivery actions - await replaceConstant('KEEP_ALIVE_ALERT_TIMEOUT', 5, async () => { - await fakeWebSocketInterface?.readyForUse; - await fakeWebSocketInterface?.triggerOpen(); - await fakeWebSocketInterface?.sendMessage( - new MessageEvent('connection_ack', { - data: JSON.stringify({ - type: constants.MESSAGE_TYPES.GQL_CONNECTION_ACK, - payload: { connectionTimeoutMs: 100 }, - }), - }) - ); - - await fakeWebSocketInterface?.sendMessage( - new MessageEvent('start_ack', { - data: JSON.stringify({ - type: MESSAGE_TYPES.GQL_START_ACK, - payload: {}, - id: fakeWebSocketInterface?.webSocket.subscriptionId, - }), - }) - ); + await replaceConstant( + 'DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT', + 5, + async () => { + await fakeWebSocketInterface?.readyForUse; + await fakeWebSocketInterface?.triggerOpen(); + await fakeWebSocketInterface?.sendMessage( + new MessageEvent('connection_ack', { + data: JSON.stringify({ + type: constants.MESSAGE_TYPES.GQL_CONNECTION_ACK, + payload: { connectionTimeoutMs: 100 }, + }), + }) + ); + + await fakeWebSocketInterface?.sendMessage( + new MessageEvent('start_ack', { + data: JSON.stringify({ + type: MESSAGE_TYPES.GQL_START_ACK, + payload: {}, + id: fakeWebSocketInterface?.webSocket.subscriptionId, + }), + }) + ); - await fakeWebSocketInterface?.sendDataMessage({ - type: MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE, - payload: { data: {} }, - }); - }); + await fakeWebSocketInterface?.sendDataMessage({ + type: MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE, + payload: { data: {} }, + }); + } + ); await fakeWebSocketInterface?.waitUntilConnectionStateIn([ 'Connected', diff --git a/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts b/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts index 2593e29423e..1b1cc413c69 100644 --- a/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts +++ b/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts @@ -25,13 +25,13 @@ jest.mock('@aws-amplify/core', () => ({ import Observable from 'zen-observable-ts'; import { Reachability } from '@aws-amplify/core'; import { - ConnectionHealthState, + ConnectionState, ConnectionStateMonitor, } from '../src/utils/ConnectionStateMonitor'; describe('ConnectionStateMonitor', () => { let monitor: ConnectionStateMonitor; - let observedStates: ConnectionHealthState[]; + let observedStates: ConnectionState[]; let subscription: ZenObservable.Subscription; let reachabilityObserver: ZenObservable.Observer<{ online: boolean }>; @@ -90,7 +90,7 @@ describe('ConnectionStateMonitor', () => { monitor.openingConnection(); monitor.connectionEstablished(); reachabilityObserver?.next?.({ online: false }); - monitor.disconnected(); + monitor.closed(); expect(observedStates).toEqual([ 'Disconnected', 'Connecting', @@ -104,7 +104,7 @@ describe('ConnectionStateMonitor', () => { monitor.openingConnection(); monitor.connectionEstablished(); reachabilityObserver?.next?.({ online: false }); - monitor.disconnected(); + monitor.closed(); reachabilityObserver?.next?.({ online: true }); expect(observedStates).toEqual([ 'Disconnected', @@ -119,7 +119,7 @@ describe('ConnectionStateMonitor', () => { test('connection health states when a connection is no longer needed', () => { monitor.openingConnection(); monitor.connectionEstablished(); - monitor.disconnecting(); + monitor.closing(); expect(observedStates).toEqual([ 'Disconnected', @@ -132,8 +132,8 @@ describe('ConnectionStateMonitor', () => { test('connection health states when a connection is no longer needed closed', () => { monitor.openingConnection(); monitor.connectionEstablished(); - monitor.disconnecting(); - monitor.disconnected(); + monitor.closing(); + monitor.closed(); expect(observedStates).toEqual([ 'Disconnected', diff --git a/packages/pubsub/__tests__/helpers.ts b/packages/pubsub/__tests__/helpers.ts index a344513c04b..a1932501cc1 100644 --- a/packages/pubsub/__tests__/helpers.ts +++ b/packages/pubsub/__tests__/helpers.ts @@ -1,6 +1,6 @@ import { Hub } from '@aws-amplify/core'; import Observable from 'zen-observable-ts'; -import { ConnectionHealthState, CONNECTION_HEALTH_CHANGE } from '../src'; +import { ConnectionHealthState, CONNECTION_STATE_CHANGE } from '../src'; import * as constants from '../src/Providers/AWSAppSyncRealTimeProvider/constants'; export function delay(timeout) { @@ -35,7 +35,7 @@ export class FakeWebSocketInterface { this.teardownHubListener = Hub.listen('api', (data: any) => { const { payload } = data; - if (payload.event === CONNECTION_HEALTH_CHANGE) { + if (payload.event === CONNECTION_STATE_CHANGE) { const healthState = payload.data .connectionState as ConnectionHealthState; this.observedConnectionHealthStates.push(healthState); diff --git a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/constants.ts b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/constants.ts index 993dcd95a23..b8e5c04cdb5 100644 --- a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/constants.ts +++ b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/constants.ts @@ -97,4 +97,4 @@ export const DEFAULT_KEEP_ALIVE_TIMEOUT = 5 * 60 * 1000; /** * Default Time in milleseconds to alert for missed GQL_CONNECTION_KEEP_ALIVE message */ -export const KEEP_ALIVE_ALERT_TIMEOUT = 65 * 1000; +export const DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT = 65 * 1000; diff --git a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts index 9e5a0953cbd..8ddb4a4985e 100644 --- a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts +++ b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts @@ -30,13 +30,13 @@ import { import Cache from '@aws-amplify/cache'; import Auth, { GRAPHQL_AUTH_MODE } from '@aws-amplify/auth'; import { AbstractPubSubProvider } from '../PubSubProvider'; -import { CONNECTION_HEALTH_CHANGE, CONTROL_MSG } from '../../index'; +import { CONNECTION_STATE_CHANGE, CONTROL_MSG } from '../../index'; import { AMPLIFY_SYMBOL, AWS_APPSYNC_REALTIME_HEADERS, CONNECTION_INIT_TIMEOUT, DEFAULT_KEEP_ALIVE_TIMEOUT, - KEEP_ALIVE_ALERT_TIMEOUT, + DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT, MAX_DELAY_MS, MESSAGE_TYPES, NON_RETRYABLE_CODES, @@ -102,7 +102,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { this.connectionStateMonitor.connectionHealthStateObservable.subscribe( connectionHealthState => { dispatchApiEvent( - CONNECTION_HEALTH_CHANGE, + CONNECTION_STATE_CHANGE, { provider: this, connectionState: connectionHealthState, @@ -168,7 +168,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { }, ], }); - this.connectionStateMonitor.disconnected(); + this.connectionStateMonitor.closed(); observer.complete(); }); @@ -285,7 +285,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { } catch (err) { logger.debug({ err }); const message = err['message'] ?? ''; - this.connectionStateMonitor.disconnected(); + this.connectionStateMonitor.closed(); observer.error({ errors: [ { @@ -390,12 +390,14 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { this.socketStatus = SOCKET_STATUS.CLOSED; return; } + + this.connectionStateMonitor.closing(); + if (this.awsRealTimeSocket.bufferedAmount > 0) { // Still data on the WebSocket setTimeout(this._closeSocketIfRequired.bind(this), 1000); } else { logger.debug('closing WebSocket...'); - this.connectionStateMonitor.disconnecting(); if (this.keepAliveTimeoutId) { clearTimeout(this.keepAliveTimeoutId); } @@ -409,7 +411,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { tempSocket.close(1000); this.awsRealTimeSocket = undefined; this.socketStatus = SOCKET_STATUS.CLOSED; - this.connectionStateMonitor.disconnected(); + this.connectionStateMonitor.closed(); } } @@ -465,7 +467,6 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { } this.connectionStateMonitor.connectionEstablished(); - // TODO: emit event on hub but it requires to store the id first return; } @@ -479,7 +480,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { ); this.keepAliveAlertTimeoutId = setTimeout(() => { this.connectionStateMonitor.keepAliveMissed(); - }, KEEP_ALIVE_ALERT_TIMEOUT); + }, DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT); this.connectionStateMonitor.keepAlive(); return; } @@ -527,7 +528,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { }); this.subscriptionObserverMap.clear(); if (this.awsRealTimeSocket) { - this.connectionStateMonitor.disconnected(); + this.connectionStateMonitor.closed(); this.awsRealTimeSocket.close(); } diff --git a/packages/pubsub/src/index.ts b/packages/pubsub/src/index.ts index 38ff704dec6..1e2fa27689c 100644 --- a/packages/pubsub/src/index.ts +++ b/packages/pubsub/src/index.ts @@ -22,9 +22,9 @@ enum CONTROL_MSG { TIMEOUT_DISCONNECT = 'Timeout disconnect', } -export const CONNECTION_HEALTH_CHANGE = 'ConnectionHealthChange'; +export const CONNECTION_STATE_CHANGE = 'ConnectionHealthChange'; -export { ConnectionHealthState } from './utils/ConnectionStateMonitor'; +export { ConnectionState as ConnectionHealthState } from './utils/ConnectionStateMonitor'; export { PubSub, CONTROL_MSG }; diff --git a/packages/pubsub/src/utils/ConnectionStateMonitor.ts b/packages/pubsub/src/utils/ConnectionStateMonitor.ts index 07d1160a7d2..2761fb8726b 100644 --- a/packages/pubsub/src/utils/ConnectionStateMonitor.ts +++ b/packages/pubsub/src/utils/ConnectionStateMonitor.ts @@ -15,16 +15,16 @@ import { Reachability } from '@aws-amplify/core'; import Observable, { ZenObservable } from 'zen-observable-ts'; // Internal types for tracking different connection states -type ConnectivityState = 'connected' | 'disconnected'; -type HealthState = 'healthy' | 'unhealthy'; -type ConnectionState = { - networkState: ConnectivityState; - connectionState: ConnectivityState | 'connecting'; - intendedConnectionState: ConnectivityState; - keepAliveState: HealthState; +type LinkedConnectionState = 'connected' | 'disconnected'; +type LinkedConnectionHealthState = 'healthy' | 'unhealthy'; +type LinkedConnectionStates = { + networkState: LinkedConnectionState; + connectionState: LinkedConnectionState | 'connecting'; + intendedConnectionState: LinkedConnectionState; + keepAliveState: LinkedConnectionHealthState; }; -export type ConnectionHealthState = +export type ConnectionState = | 'Connected' | 'ConnectedPendingNetwork' | 'ConnectionDisrupted' @@ -38,9 +38,9 @@ export class ConnectionStateMonitor { /** * @private */ - private _connectionState: ConnectionState; - private _connectionStateObservable: Observable; - private _connectionStateObserver: ZenObservable.SubscriptionObserver; + private _connectionState: LinkedConnectionStates; + private _connectionStateObservable: Observable; + private _connectionStateObserver: ZenObservable.SubscriptionObserver; constructor() { this._connectionState = { @@ -50,7 +50,7 @@ export class ConnectionStateMonitor { keepAliveState: 'healthy', }; - this._connectionStateObservable = new Observable( + this._connectionStateObservable = new Observable( connectionStateObserver => { connectionStateObserver.next(this._connectionState); this._connectionStateObserver = connectionStateObserver; @@ -68,11 +68,11 @@ export class ConnectionStateMonitor { /** * Get the observable that allows us to monitor the connection health * - * @returns {Observable} - The observable that emits ConnectionHealthState updates + * @returns {Observable} - The observable that emits ConnectionHealthState updates */ - public get connectionHealthStateObservable(): Observable { + public get connectionHealthStateObservable(): Observable { // Translate from connection states to ConnectionHealthStates, then remove any duplicates - let previous: ConnectionHealthState; + let previous: ConnectionState; return this._connectionStateObservable .map(value => { return this.connectionStateToConnectionHealth(value); @@ -87,7 +87,7 @@ export class ConnectionStateMonitor { /** * Tell the monitor that the connection has been disconnected */ - disconnected() { + closed() { this.updateConnectionState({ connectionState: 'disconnected' }); } @@ -104,7 +104,7 @@ export class ConnectionStateMonitor { /** * Tell the monitor that the connection is disconnecting */ - disconnecting() { + closing() { this.updateConnectionState({ intendedConnectionState: 'disconnected' }); } @@ -143,7 +143,9 @@ export class ConnectionStateMonitor { * @private */ - private updateConnectionState(statusUpdates: Partial) { + private updateConnectionState( + statusUpdates: Partial + ) { // Maintain the socket state const newSocketStatus = { ...this._connectionState, ...statusUpdates }; @@ -160,7 +162,7 @@ export class ConnectionStateMonitor { networkState, intendedConnectionState, keepAliveState, - }: ConnectionState): ConnectionHealthState { + }: LinkedConnectionStates): ConnectionState { if (connectionState === 'connected' && networkState === 'disconnected') return 'ConnectedPendingNetwork'; From eb11b572a7db7e925042e207b8a29c3a0031b6df Mon Sep 17 00:00:00 2001 From: Aaron S Date: Wed, 13 Jul 2022 15:24:17 -0500 Subject: [PATCH 08/15] fix: Found more overlooked health state refactoring --- .../AWSAppSyncRealTimeProvider.test.ts | 18 ++++---- .../__tests__/ConnectionStateMonitor.tests.ts | 16 +++---- packages/pubsub/__tests__/helpers.ts | 45 +++++++++---------- .../AWSAppSyncRealTimeProvider/index.ts | 8 ++-- packages/pubsub/src/index.ts | 4 +- .../src/utils/ConnectionStateMonitor.ts | 18 ++++---- 6 files changed, 52 insertions(+), 57 deletions(-) diff --git a/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts b/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts index cbec7795a52..e04bd3fef90 100644 --- a/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts +++ b/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts @@ -138,9 +138,11 @@ describe('AWSAppSyncRealTimeProvider', () => { await fakeWebSocketInterface?.waitUntilConnectionStateIn([ 'Connected', ]); - expect( - fakeWebSocketInterface?.observedConnectionHealthStates - ).toEqual(['Disconnected', 'Connecting', 'Connected']); + expect(fakeWebSocketInterface?.observedConnectionStates).toEqual([ + 'Disconnected', + 'Connecting', + 'Connected', + ]); subscription.unsubscribe(); @@ -148,9 +150,7 @@ describe('AWSAppSyncRealTimeProvider', () => { 'ConnectedPendingDisconnect', ]); - expect( - fakeWebSocketInterface?.observedConnectionHealthStates - ).toEqual([ + expect(fakeWebSocketInterface?.observedConnectionStates).toEqual([ 'Disconnected', 'Connecting', 'Connected', @@ -606,9 +606,9 @@ describe('AWSAppSyncRealTimeProvider', () => { 'ConnectionDisrupted', ]); - expect( - fakeWebSocketInterface?.observedConnectionHealthStates - ).toContain('ConnectedPendingKeepAlive'); + expect(fakeWebSocketInterface?.observedConnectionStates).toContain( + 'ConnectedPendingKeepAlive' + ); expect(loggerSpy).toBeCalledWith( 'DEBUG', diff --git a/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts b/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts index 1b1cc413c69..7a6b1610b62 100644 --- a/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts +++ b/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts @@ -53,11 +53,9 @@ describe('ConnectionStateMonitor', () => { observedStates = []; subscription?.unsubscribe(); monitor = new ConnectionStateMonitor(); - subscription = monitor.connectionHealthStateObservable.subscribe( - value => { - observedStates.push(value); - } - ); + subscription = monitor.ConnectionStateObservable.subscribe(value => { + observedStates.push(value); + }); }); test('connection health states starts out disconnected', () => { @@ -215,11 +213,9 @@ describe('ConnectionStateMonitor', () => { observedStates = []; subscription?.unsubscribe(); monitor = new ConnectionStateMonitor(); - subscription = monitor.connectionHealthStateObservable.subscribe( - value => { - observedStates.push(value); - } - ); + subscription = monitor.ConnectionStateObservable.subscribe(value => { + observedStates.push(value); + }); }); test('starts out disconnected', () => { diff --git a/packages/pubsub/__tests__/helpers.ts b/packages/pubsub/__tests__/helpers.ts index a1932501cc1..0bc07be6024 100644 --- a/packages/pubsub/__tests__/helpers.ts +++ b/packages/pubsub/__tests__/helpers.ts @@ -1,6 +1,6 @@ import { Hub } from '@aws-amplify/core'; import Observable from 'zen-observable-ts'; -import { ConnectionHealthState, CONNECTION_STATE_CHANGE } from '../src'; +import { ConnectionState, CONNECTION_STATE_CHANGE } from '../src'; import * as constants from '../src/Providers/AWSAppSyncRealTimeProvider/constants'; export function delay(timeout) { @@ -16,11 +16,11 @@ export class FakeWebSocketInterface { readyForUse: Promise; hasClosed: Promise; teardownHubListener: () => void; - observedConnectionHealthStates: ConnectionHealthState[] = []; - currentConnectionHealthState: ConnectionHealthState; + observedConnectionStates: ConnectionState[] = []; + currentConnectionState: ConnectionState; private readyResolve: (value: PromiseLike) => void; - private connectionHealthObservers: ZenObservable.Observer[] = + private connectionStateObservers: ZenObservable.Observer[] = []; constructor() { @@ -36,35 +36,34 @@ export class FakeWebSocketInterface { this.teardownHubListener = Hub.listen('api', (data: any) => { const { payload } = data; if (payload.event === CONNECTION_STATE_CHANGE) { - const healthState = payload.data - .connectionState as ConnectionHealthState; - this.observedConnectionHealthStates.push(healthState); - this.connectionHealthObservers.forEach(observer => { - observer?.next?.(healthState); + const connectionState = payload.data.connectionState as ConnectionState; + this.observedConnectionStates.push(connectionState); + this.connectionStateObservers.forEach(observer => { + observer?.next?.(connectionState); }); - this.currentConnectionHealthState = healthState; + this.currentConnectionState = connectionState; } }); } - allConnectionHealthStateObserver() { + allConnectionStateObserver() { return new Observable(observer => { - this.observedConnectionHealthStates.forEach(state => { + this.observedConnectionStates.forEach(state => { observer.next(state); }); - this.connectionHealthObservers.push(observer); + this.connectionStateObservers.push(observer); }); } - connectionHealthStateObserver() { + ConnectionStateObserver() { return new Observable(observer => { - this.connectionHealthObservers.push(observer); + this.connectionStateObservers.push(observer); }); } teardown() { this.teardownHubListener(); - this.connectionHealthObservers.forEach(observer => { + this.connectionStateObservers.forEach(observer => { observer?.complete?.(); }); } @@ -156,9 +155,9 @@ export class FakeWebSocketInterface { await Promise.resolve(); } - async observesConnectionState(connectionState: ConnectionHealthState) { + async observesConnectionState(connectionState: ConnectionState) { return new Promise((res, rej) => { - this.allConnectionHealthStateObserver().subscribe(value => { + this.allConnectionStateObserver().subscribe(value => { if (value === connectionState) { res(undefined); } @@ -166,19 +165,19 @@ export class FakeWebSocketInterface { }); } - async waitForConnectionState(connectionStates: ConnectionHealthState[]) { + async waitForConnectionState(connectionStates: ConnectionState[]) { return new Promise((res, rej) => { - this.connectionHealthStateObserver().subscribe(value => { - if (connectionStates.includes(String(value) as ConnectionHealthState)) { + this.ConnectionStateObserver().subscribe(value => { + if (connectionStates.includes(String(value) as ConnectionState)) { res(undefined); } }); }); } - async waitUntilConnectionStateIn(connectionStates: ConnectionHealthState[]) { + async waitUntilConnectionStateIn(connectionStates: ConnectionState[]) { return new Promise((res, rej) => { - if (connectionStates.includes(this.currentConnectionHealthState)) { + if (connectionStates.includes(this.currentConnectionState)) { res(undefined); } res(this.waitForConnectionState(connectionStates)); diff --git a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts index 8ddb4a4985e..07a6b752f1a 100644 --- a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts +++ b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts @@ -99,15 +99,15 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { constructor(options: ProviderOptions = {}) { super(options); // Monitor the connection health state and pass changes along to Hub - this.connectionStateMonitor.connectionHealthStateObservable.subscribe( - connectionHealthState => { + this.connectionStateMonitor.ConnectionStateObservable.subscribe( + ConnectionState => { dispatchApiEvent( CONNECTION_STATE_CHANGE, { provider: this, - connectionState: connectionHealthState, + connectionState: ConnectionState, }, - `Connection state is ${connectionHealthState}` + `Connection state is ${ConnectionState}` ); } ); diff --git a/packages/pubsub/src/index.ts b/packages/pubsub/src/index.ts index 1e2fa27689c..0d4382e7789 100644 --- a/packages/pubsub/src/index.ts +++ b/packages/pubsub/src/index.ts @@ -22,9 +22,9 @@ enum CONTROL_MSG { TIMEOUT_DISCONNECT = 'Timeout disconnect', } -export const CONNECTION_STATE_CHANGE = 'ConnectionHealthChange'; +export const CONNECTION_STATE_CHANGE = 'ConnectionStateChange'; -export { ConnectionState as ConnectionHealthState } from './utils/ConnectionStateMonitor'; +export { ConnectionState } from './utils/ConnectionStateMonitor'; export { PubSub, CONTROL_MSG }; diff --git a/packages/pubsub/src/utils/ConnectionStateMonitor.ts b/packages/pubsub/src/utils/ConnectionStateMonitor.ts index 2761fb8726b..330848a25cf 100644 --- a/packages/pubsub/src/utils/ConnectionStateMonitor.ts +++ b/packages/pubsub/src/utils/ConnectionStateMonitor.ts @@ -16,12 +16,12 @@ import Observable, { ZenObservable } from 'zen-observable-ts'; // Internal types for tracking different connection states type LinkedConnectionState = 'connected' | 'disconnected'; -type LinkedConnectionHealthState = 'healthy' | 'unhealthy'; +type LinkedHealthState = 'healthy' | 'unhealthy'; type LinkedConnectionStates = { networkState: LinkedConnectionState; connectionState: LinkedConnectionState | 'connecting'; intendedConnectionState: LinkedConnectionState; - keepAliveState: LinkedConnectionHealthState; + keepAliveState: LinkedHealthState; }; export type ConnectionState = @@ -66,16 +66,16 @@ export class ConnectionStateMonitor { } /** - * Get the observable that allows us to monitor the connection health + * Get the observable that allows us to monitor the connection state * - * @returns {Observable} - The observable that emits ConnectionHealthState updates + * @returns {Observable} - The observable that emits ConnectionState updates */ - public get connectionHealthStateObservable(): Observable { - // Translate from connection states to ConnectionHealthStates, then remove any duplicates + public get ConnectionStateObservable(): Observable { + // Translate from connection states to ConnectionStates, then remove any duplicates let previous: ConnectionState; return this._connectionStateObservable .map(value => { - return this.connectionStateToConnectionHealth(value); + return this.linkedConnectionStatesToConnectionState(value); }) .filter(current => { const toInclude = current !== previous; @@ -155,9 +155,9 @@ export class ConnectionStateMonitor { } /* - * Translate the ConnectionState structure into a specific ConnectionHealthState string literal union + * Translate the ConnectionState structure into a specific ConnectionState string literal union */ - private connectionStateToConnectionHealth({ + private linkedConnectionStatesToConnectionState({ connectionState, networkState, intendedConnectionState, From cd04b3a765a6d1b2023b008aef3297d704b9497d Mon Sep 17 00:00:00 2001 From: Aaron S Date: Wed, 13 Jul 2022 15:34:37 -0500 Subject: [PATCH 09/15] fix: Found even more overlooked health state refactoring --- .../__tests__/ConnectionStateMonitor.tests.ts | 20 +++++++++---------- packages/pubsub/__tests__/helpers.ts | 4 ++-- .../AWSAppSyncRealTimeProvider/index.ts | 4 ++-- .../src/utils/ConnectionStateMonitor.ts | 2 +- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts b/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts index 7a6b1610b62..a4a9aa5d79c 100644 --- a/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts +++ b/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts @@ -53,16 +53,16 @@ describe('ConnectionStateMonitor', () => { observedStates = []; subscription?.unsubscribe(); monitor = new ConnectionStateMonitor(); - subscription = monitor.ConnectionStateObservable.subscribe(value => { + subscription = monitor.connectionStateObservable.subscribe(value => { observedStates.push(value); }); }); - test('connection health states starts out disconnected', () => { + test('connection states starts out disconnected', () => { expect(observedStates).toEqual(['Disconnected']); }); - test('standard health states connection pattern', () => { + test('standard states connection pattern', () => { monitor.openingConnection(); monitor.connectionEstablished(); expect(observedStates).toEqual([ @@ -72,7 +72,7 @@ describe('ConnectionStateMonitor', () => { ]); }); - test('connection health states when the network is lost while connected', () => { + test('connection states when the network is lost while connected', () => { monitor.openingConnection(); monitor.connectionEstablished(); reachabilityObserver?.next?.({ online: false }); @@ -84,7 +84,7 @@ describe('ConnectionStateMonitor', () => { ]); }); - test('connection health states when the network is lost and the connection times out', () => { + test('connection states when the network is lost and the connection times out', () => { monitor.openingConnection(); monitor.connectionEstablished(); reachabilityObserver?.next?.({ online: false }); @@ -98,7 +98,7 @@ describe('ConnectionStateMonitor', () => { ]); }); - test('connection health states when the network is lost, the connection times out and then the network recovers', () => { + test('connection states when the network is lost, the connection times out and then the network recovers', () => { monitor.openingConnection(); monitor.connectionEstablished(); reachabilityObserver?.next?.({ online: false }); @@ -114,7 +114,7 @@ describe('ConnectionStateMonitor', () => { ]); }); - test('connection health states when a connection is no longer needed', () => { + test('connection states when a connection is no longer needed', () => { monitor.openingConnection(); monitor.connectionEstablished(); monitor.closing(); @@ -127,7 +127,7 @@ describe('ConnectionStateMonitor', () => { ]); }); - test('connection health states when a connection is no longer needed closed', () => { + test('connection states when a connection is no longer needed closed', () => { monitor.openingConnection(); monitor.connectionEstablished(); monitor.closing(); @@ -142,7 +142,7 @@ describe('ConnectionStateMonitor', () => { ]); }); - test('connection health states when a connection misses a keepalive, and then recovers', () => { + test('connection states when a connection misses a keepalive, and then recovers', () => { monitor.openingConnection(); monitor.connectionEstablished(); monitor.keepAliveMissed(); @@ -213,7 +213,7 @@ describe('ConnectionStateMonitor', () => { observedStates = []; subscription?.unsubscribe(); monitor = new ConnectionStateMonitor(); - subscription = monitor.ConnectionStateObservable.subscribe(value => { + subscription = monitor.connectionStateObservable.subscribe(value => { observedStates.push(value); }); }); diff --git a/packages/pubsub/__tests__/helpers.ts b/packages/pubsub/__tests__/helpers.ts index 0bc07be6024..04ffd86cd6e 100644 --- a/packages/pubsub/__tests__/helpers.ts +++ b/packages/pubsub/__tests__/helpers.ts @@ -55,7 +55,7 @@ export class FakeWebSocketInterface { }); } - ConnectionStateObserver() { + connectionStateObserver() { return new Observable(observer => { this.connectionStateObservers.push(observer); }); @@ -167,7 +167,7 @@ export class FakeWebSocketInterface { async waitForConnectionState(connectionStates: ConnectionState[]) { return new Promise((res, rej) => { - this.ConnectionStateObserver().subscribe(value => { + this.connectionStateObserver().subscribe(value => { if (connectionStates.includes(String(value) as ConnectionState)) { res(undefined); } diff --git a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts index 07a6b752f1a..f099f7122c6 100644 --- a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts +++ b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts @@ -98,8 +98,8 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { constructor(options: ProviderOptions = {}) { super(options); - // Monitor the connection health state and pass changes along to Hub - this.connectionStateMonitor.ConnectionStateObservable.subscribe( + // Monitor the connection state and pass changes along to Hub + this.connectionStateMonitor.connectionStateObservable.subscribe( ConnectionState => { dispatchApiEvent( CONNECTION_STATE_CHANGE, diff --git a/packages/pubsub/src/utils/ConnectionStateMonitor.ts b/packages/pubsub/src/utils/ConnectionStateMonitor.ts index 330848a25cf..b83727089d2 100644 --- a/packages/pubsub/src/utils/ConnectionStateMonitor.ts +++ b/packages/pubsub/src/utils/ConnectionStateMonitor.ts @@ -70,7 +70,7 @@ export class ConnectionStateMonitor { * * @returns {Observable} - The observable that emits ConnectionState updates */ - public get ConnectionStateObservable(): Observable { + public get connectionStateObservable(): Observable { // Translate from connection states to ConnectionStates, then remove any duplicates let previous: ConnectionState; return this._connectionStateObservable From 03295c034739dce5d54c4cc0bb8da5f9f38dc5db Mon Sep 17 00:00:00 2001 From: Aaron S Date: Tue, 19 Jul 2022 15:07:32 -0500 Subject: [PATCH 10/15] fix: Enum connection states, record based changes and other minor changes from comments --- .../AWSAppSyncRealTimeProvider.test.ts | 35 ++--- .../__tests__/ConnectionStateMonitor.tests.ts | 132 +++++++++--------- packages/pubsub/__tests__/helpers.ts | 82 ++++++++--- .../AWSAppSyncRealTimeProvider/index.ts | 34 +++-- packages/pubsub/src/index.ts | 3 +- packages/pubsub/src/types/index.ts | 36 +++++ .../src/utils/ConnectionStateMonitor.ts | 126 ++++++----------- 7 files changed, 251 insertions(+), 197 deletions(-) diff --git a/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts b/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts index e04bd3fef90..3cd152cc32c 100644 --- a/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts +++ b/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts @@ -18,6 +18,7 @@ import { MESSAGE_TYPES } from '../src/Providers/AWSAppSyncRealTimeProvider/const import * as constants from '../src/Providers/AWSAppSyncRealTimeProvider/constants'; import { delay, FakeWebSocketInterface, replaceConstant } from './helpers'; +import { ConnectionState as CS } from '../src'; import { AWSAppSyncRealTimeProvider } from '../src/Providers/AWSAppSyncRealTimeProvider'; @@ -136,26 +137,26 @@ describe('AWSAppSyncRealTimeProvider', () => { ); await fakeWebSocketInterface?.waitUntilConnectionStateIn([ - 'Connected', + CS.Connected, ]); expect(fakeWebSocketInterface?.observedConnectionStates).toEqual([ - 'Disconnected', - 'Connecting', - 'Connected', + CS.Disconnected, + CS.Connecting, + CS.Connected, ]); subscription.unsubscribe(); await fakeWebSocketInterface?.waitUntilConnectionStateIn([ - 'ConnectedPendingDisconnect', + CS.ConnectedPendingDisconnect, ]); expect(fakeWebSocketInterface?.observedConnectionStates).toEqual([ - 'Disconnected', - 'Connecting', - 'Connected', - 'ConnectedPendingDisconnect', - 'Disconnected', + CS.Disconnected, + CS.Connecting, + CS.Connected, + CS.ConnectedPendingDisconnect, + CS.Disconnected, ]); }); @@ -288,7 +289,7 @@ describe('AWSAppSyncRealTimeProvider', () => { await fakeWebSocketInterface?.triggerClose(); await fakeWebSocketInterface?.waitUntilConnectionStateIn([ - 'Disconnected', + CS.Disconnected, ]); // Watching for raised exception to be caught and logged expect(loggerSpy).toBeCalledWith( @@ -598,16 +599,16 @@ describe('AWSAppSyncRealTimeProvider', () => { ); await fakeWebSocketInterface?.waitUntilConnectionStateIn([ - 'Connected', + CS.Connected, ]); // Wait until the socket is automatically disconnected await fakeWebSocketInterface?.waitUntilConnectionStateIn([ - 'ConnectionDisrupted', + CS.ConnectionDisrupted, ]); expect(fakeWebSocketInterface?.observedConnectionStates).toContain( - 'ConnectedPendingKeepAlive' + CS.ConnectedPendingKeepAlive ); expect(loggerSpy).toBeCalledWith( @@ -650,7 +651,7 @@ describe('AWSAppSyncRealTimeProvider', () => { // Wait until the socket is automatically disconnected await fakeWebSocketInterface?.waitForConnectionState([ - 'Disconnected', + CS.Disconnected, ]); expect(loggerSpy).toBeCalledWith( @@ -680,7 +681,7 @@ describe('AWSAppSyncRealTimeProvider', () => { // Wait until the socket is automatically disconnected await fakeWebSocketInterface?.waitUntilConnectionStateIn([ - 'Disconnected', + CS.Disconnected, ]); // Watching for raised exception to be caught and logged @@ -815,7 +816,7 @@ describe('AWSAppSyncRealTimeProvider', () => { // Wait until the socket is automatically disconnected await fakeWebSocketInterface?.waitUntilConnectionStateIn([ - 'Disconnected', + CS.Disconnected, ]); expect(loggerSpy).toHaveBeenCalledWith( diff --git a/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts b/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts index a4a9aa5d79c..4f4b00a281b 100644 --- a/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts +++ b/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts @@ -25,13 +25,14 @@ jest.mock('@aws-amplify/core', () => ({ import Observable from 'zen-observable-ts'; import { Reachability } from '@aws-amplify/core'; import { - ConnectionState, ConnectionStateMonitor, + CONNECTION_CHANGE, } from '../src/utils/ConnectionStateMonitor'; +import { ConnectionState as CS } from '../src'; describe('ConnectionStateMonitor', () => { let monitor: ConnectionStateMonitor; - let observedStates: ConnectionState[]; + let observedStates: CS[]; let subscription: ZenObservable.Subscription; let reachabilityObserver: ZenObservable.Observer<{ online: boolean }>; @@ -63,90 +64,90 @@ describe('ConnectionStateMonitor', () => { }); test('standard states connection pattern', () => { - monitor.openingConnection(); - monitor.connectionEstablished(); + monitor.record(CONNECTION_CHANGE.OPENING_CONNECTION); + monitor.record(CONNECTION_CHANGE.CONNECTION_ESTABLISHED); expect(observedStates).toEqual([ - 'Disconnected', - 'Connecting', - 'Connected', + CS.Disconnected, + CS.Connecting, + CS.Connected, ]); }); test('connection states when the network is lost while connected', () => { - monitor.openingConnection(); - monitor.connectionEstablished(); + monitor.record(CONNECTION_CHANGE.OPENING_CONNECTION); + monitor.record(CONNECTION_CHANGE.CONNECTION_ESTABLISHED); reachabilityObserver?.next?.({ online: false }); expect(observedStates).toEqual([ - 'Disconnected', - 'Connecting', - 'Connected', - 'ConnectedPendingNetwork', + CS.Disconnected, + CS.Connecting, + CS.Connected, + CS.ConnectedPendingNetwork, ]); }); test('connection states when the network is lost and the connection times out', () => { - monitor.openingConnection(); - monitor.connectionEstablished(); + monitor.record(CONNECTION_CHANGE.OPENING_CONNECTION); + monitor.record(CONNECTION_CHANGE.CONNECTION_ESTABLISHED); reachabilityObserver?.next?.({ online: false }); - monitor.closed(); + monitor.record(CONNECTION_CHANGE.CLOSED); expect(observedStates).toEqual([ - 'Disconnected', - 'Connecting', - 'Connected', - 'ConnectedPendingNetwork', - 'ConnectionDisruptedPendingNetwork', + CS.Disconnected, + CS.Connecting, + CS.Connected, + CS.ConnectedPendingNetwork, + CS.ConnectionDisruptedPendingNetwork, ]); }); test('connection states when the network is lost, the connection times out and then the network recovers', () => { - monitor.openingConnection(); - monitor.connectionEstablished(); + monitor.record(CONNECTION_CHANGE.OPENING_CONNECTION); + monitor.record(CONNECTION_CHANGE.CONNECTION_ESTABLISHED); reachabilityObserver?.next?.({ online: false }); - monitor.closed(); + monitor.record(CONNECTION_CHANGE.CLOSED); reachabilityObserver?.next?.({ online: true }); expect(observedStates).toEqual([ - 'Disconnected', - 'Connecting', - 'Connected', - 'ConnectedPendingNetwork', - 'ConnectionDisruptedPendingNetwork', - 'ConnectionDisrupted', + CS.Disconnected, + CS.Connecting, + CS.Connected, + CS.ConnectedPendingNetwork, + CS.ConnectionDisruptedPendingNetwork, + CS.ConnectionDisrupted, ]); }); test('connection states when a connection is no longer needed', () => { - monitor.openingConnection(); - monitor.connectionEstablished(); - monitor.closing(); + monitor.record(CONNECTION_CHANGE.OPENING_CONNECTION); + monitor.record(CONNECTION_CHANGE.CONNECTION_ESTABLISHED); + monitor.record(CONNECTION_CHANGE.CLOSING); expect(observedStates).toEqual([ - 'Disconnected', - 'Connecting', - 'Connected', - 'ConnectedPendingDisconnect', + CS.Disconnected, + CS.Connecting, + CS.Connected, + CS.ConnectedPendingDisconnect, ]); }); test('connection states when a connection is no longer needed closed', () => { - monitor.openingConnection(); - monitor.connectionEstablished(); - monitor.closing(); - monitor.closed(); + monitor.record(CONNECTION_CHANGE.OPENING_CONNECTION); + monitor.record(CONNECTION_CHANGE.CONNECTION_ESTABLISHED); + monitor.record(CONNECTION_CHANGE.CLOSING); + monitor.record(CONNECTION_CHANGE.CLOSED); expect(observedStates).toEqual([ - 'Disconnected', - 'Connecting', - 'Connected', - 'ConnectedPendingDisconnect', - 'Disconnected', + CS.Disconnected, + CS.Connecting, + CS.Connected, + CS.ConnectedPendingDisconnect, + CS.Disconnected, ]); }); test('connection states when a connection misses a keepalive, and then recovers', () => { - monitor.openingConnection(); - monitor.connectionEstablished(); - monitor.keepAliveMissed(); - monitor.keepAlive(); + monitor.record(CONNECTION_CHANGE.OPENING_CONNECTION); + monitor.record(CONNECTION_CHANGE.CONNECTION_ESTABLISHED); + monitor.record(CONNECTION_CHANGE.KEEP_ALIVE_MISSED); + monitor.record(CONNECTION_CHANGE.KEEP_ALIVE); expect(observedStates).toEqual([ 'Disconnected', @@ -158,12 +159,13 @@ describe('ConnectionStateMonitor', () => { }); test('lots of keep alive messages dont add more connection state events', () => { - monitor.openingConnection(); - monitor.keepAlive(); - monitor.connectionEstablished(); - monitor.keepAlive(); - monitor.keepAlive(); - monitor.keepAlive(); + monitor.record(CONNECTION_CHANGE.OPENING_CONNECTION); + monitor.record(CONNECTION_CHANGE.KEEP_ALIVE); + monitor.record(CONNECTION_CHANGE.CONNECTION_ESTABLISHED); + monitor.record(CONNECTION_CHANGE.KEEP_ALIVE); + monitor.record(CONNECTION_CHANGE.KEEP_ALIVE); + monitor.record(CONNECTION_CHANGE.KEEP_ALIVE); + expect(observedStates).toEqual([ 'Disconnected', 'Connecting', @@ -172,11 +174,11 @@ describe('ConnectionStateMonitor', () => { }); test('missed keep alives during a network outage dont add an additional state change', () => { - monitor.openingConnection(); - monitor.connectionEstablished(); + monitor.record(CONNECTION_CHANGE.OPENING_CONNECTION); + monitor.record(CONNECTION_CHANGE.CONNECTION_ESTABLISHED); reachabilityObserver?.next?.({ online: false }); - monitor.keepAliveMissed(); - monitor.keepAliveMissed(); + monitor.record(CONNECTION_CHANGE.KEEP_ALIVE_MISSED); + monitor.record(CONNECTION_CHANGE.KEEP_ALIVE_MISSED); expect(observedStates).toEqual([ 'Disconnected', @@ -187,13 +189,13 @@ describe('ConnectionStateMonitor', () => { }); test('when the network recovers, keep alives become the concern until one is seen', () => { - monitor.openingConnection(); - monitor.connectionEstablished(); + monitor.record(CONNECTION_CHANGE.OPENING_CONNECTION); + monitor.record(CONNECTION_CHANGE.CONNECTION_ESTABLISHED); reachabilityObserver?.next?.({ online: false }); - monitor.keepAliveMissed(); - monitor.keepAliveMissed(); + monitor.record(CONNECTION_CHANGE.KEEP_ALIVE_MISSED); + monitor.record(CONNECTION_CHANGE.KEEP_ALIVE_MISSED); reachabilityObserver?.next?.({ online: true }); - monitor.keepAlive(); + monitor.record(CONNECTION_CHANGE.KEEP_ALIVE); expect(observedStates).toEqual([ 'Disconnected', diff --git a/packages/pubsub/__tests__/helpers.ts b/packages/pubsub/__tests__/helpers.ts index 04ffd86cd6e..959522341be 100644 --- a/packages/pubsub/__tests__/helpers.ts +++ b/packages/pubsub/__tests__/helpers.ts @@ -1,6 +1,6 @@ import { Hub } from '@aws-amplify/core'; import Observable from 'zen-observable-ts'; -import { ConnectionState, CONNECTION_STATE_CHANGE } from '../src'; +import { ConnectionState as CS, CONNECTION_STATE_CHANGE } from '../src'; import * as constants from '../src/Providers/AWSAppSyncRealTimeProvider/constants'; export function delay(timeout) { @@ -16,12 +16,11 @@ export class FakeWebSocketInterface { readyForUse: Promise; hasClosed: Promise; teardownHubListener: () => void; - observedConnectionStates: ConnectionState[] = []; - currentConnectionState: ConnectionState; + observedConnectionStates: CS[] = []; + currentConnectionState: CS; private readyResolve: (value: PromiseLike) => void; - private connectionStateObservers: ZenObservable.Observer[] = - []; + private connectionStateObservers: ZenObservable.Observer[] = []; constructor() { this.readyForUse = new Promise((res, rej) => { @@ -36,7 +35,7 @@ export class FakeWebSocketInterface { this.teardownHubListener = Hub.listen('api', (data: any) => { const { payload } = data; if (payload.event === CONNECTION_STATE_CHANGE) { - const connectionState = payload.data.connectionState as ConnectionState; + const connectionState = payload.data.connectionState as CS; this.observedConnectionStates.push(connectionState); this.connectionStateObservers.forEach(observer => { observer?.next?.(connectionState); @@ -46,6 +45,9 @@ export class FakeWebSocketInterface { }); } + /** + * @returns {Observable} - The observable that emits all ConnectionState updates (past and future) + */ allConnectionStateObserver() { return new Observable(observer => { this.observedConnectionStates.forEach(state => { @@ -55,12 +57,17 @@ export class FakeWebSocketInterface { }); } + /** + * @returns {Observable} - The observable that emits ConnectionState updates (past and future) + */ connectionStateObserver() { return new Observable(observer => { this.connectionStateObservers.push(observer); }); } - + /** + * Tear down the Fake Socket state + */ teardown() { this.teardownHubListener(); this.connectionStateObservers.forEach(observer => { @@ -68,23 +75,30 @@ export class FakeWebSocketInterface { }); } + /** + * Once ready for use, send onOpen and the connection_ack + */ async standardConnectionHandshake() { await this.readyForUse; await this.triggerOpen(); await this.handShakeMessage(); } + /** + * After an open is triggered, the provider has logic that must execute + * which changes the function resolvers assigned to the websocket + */ async triggerOpen() { - // After an open is triggered, the provider has logic that must execute - // which changes the function resolvers assigned to the websocket await this.runAndResolve(() => { this.webSocket.onopen(new Event('', {})); }); } + /** + * After a close is triggered, the provider has logic that must execute + * which changes the function resolvers assigned to the websocket + */ async triggerClose() { - // After a close is triggered, the provider has logic that must execute - // which changes the function resolvers assigned to the websocket await this.runAndResolve(() => { if (this.webSocket.onclose) { try { @@ -94,6 +108,9 @@ export class FakeWebSocketInterface { }); } + /** + * Close the interface and wait until the connection is either disconnected or disrupted + */ async closeInterface() { await this.triggerClose(); // Wait for either hasClosed or a half second has passed @@ -101,26 +118,35 @@ export class FakeWebSocketInterface { // The interface is closed when the socket "hasClosed" this.hasClosed.then(() => res(undefined)); await this.waitUntilConnectionStateIn([ - 'Disconnected', - 'ConnectionDisrupted', + CS.Disconnected, + CS.ConnectionDisrupted, ]); res(undefined); }); } + /** + * After an error is triggered, the provider has logic that must execute + * which changes the function resolvers assigned to the websocket + */ async triggerError() { - // After an error is triggered, the provider has logic that must execute - // which changes the function resolvers assigned to the websocket await this.runAndResolve(() => { this.webSocket.onerror(new Event('TestError', {})); }); } + /** + * Produce a websocket with a short delay to mimic reality + * @returns A websocket + */ newWebSocket() { setTimeout(() => this.readyResolve(Promise.resolve()), 10); return this.webSocket; } + /** + * Send a connection_ack + */ async handShakeMessage() { await this.sendMessage( new MessageEvent('connection_ack', { @@ -132,6 +158,9 @@ export class FakeWebSocketInterface { ); } + /** + * Send a data message + */ async sendDataMessage(data: {}) { await this.sendMessage( new MessageEvent('data', { @@ -143,6 +172,9 @@ export class FakeWebSocketInterface { ); } + /** + * Emit a message on the socket + */ async sendMessage(message: MessageEvent) { // After a message is sent, it takes a few ms for it to enact provider behavior await this.runAndResolve(() => { @@ -150,12 +182,18 @@ export class FakeWebSocketInterface { }); } + /** + * Run a gicommand and resolve to allow internal behavior to execute + */ async runAndResolve(fn) { fn(); await Promise.resolve(); } - async observesConnectionState(connectionState: ConnectionState) { + /** + * DELETE THIS? + */ + async observesConnectionState(connectionState: CS) { return new Promise((res, rej) => { this.allConnectionStateObserver().subscribe(value => { if (value === connectionState) { @@ -165,17 +203,23 @@ export class FakeWebSocketInterface { }); } - async waitForConnectionState(connectionStates: ConnectionState[]) { + /** + * @returns a Promise that will wait for one of the provided states to be observed + */ + async waitForConnectionState(connectionStates: CS[]) { return new Promise((res, rej) => { this.connectionStateObserver().subscribe(value => { - if (connectionStates.includes(String(value) as ConnectionState)) { + if (connectionStates.includes(String(value) as CS)) { res(undefined); } }); }); } - async waitUntilConnectionStateIn(connectionStates: ConnectionState[]) { + /** + * @returns a Promise that will wait until the current state is one of the provided states + */ + async waitUntilConnectionStateIn(connectionStates: CS[]) { return new Promise((res, rej) => { if (connectionStates.includes(this.currentConnectionState)) { res(undefined); diff --git a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts index f099f7122c6..5d1f870121c 100644 --- a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts +++ b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts @@ -31,6 +31,7 @@ import Cache from '@aws-amplify/cache'; import Auth, { GRAPHQL_AUTH_MODE } from '@aws-amplify/auth'; import { AbstractPubSubProvider } from '../PubSubProvider'; import { CONNECTION_STATE_CHANGE, CONTROL_MSG } from '../../index'; + import { AMPLIFY_SYMBOL, AWS_APPSYNC_REALTIME_HEADERS, @@ -44,7 +45,10 @@ import { START_ACK_TIMEOUT, SUBSCRIPTION_STATUS, } from './constants'; -import { ConnectionStateMonitor } from '../../utils/ConnectionStateMonitor'; +import { + ConnectionStateMonitor, + CONNECTION_CHANGE, +} from '../../utils/ConnectionStateMonitor'; const logger = new Logger('AWSAppSyncRealTimeProvider'); @@ -168,7 +172,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { }, ], }); - this.connectionStateMonitor.closed(); + this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); observer.complete(); }); @@ -274,7 +278,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { const stringToAWSRealTime = JSON.stringify(subscriptionMessage); try { - this.connectionStateMonitor.openingConnection(); + this.connectionStateMonitor.record(CONNECTION_CHANGE.OPENING_CONNECTION); await this._initializeWebSocketConnection({ apiKey, appSyncGraphqlEndpoint, @@ -285,7 +289,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { } catch (err) { logger.debug({ err }); const message = err['message'] ?? ''; - this.connectionStateMonitor.closed(); + this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); observer.error({ errors: [ { @@ -391,7 +395,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { return; } - this.connectionStateMonitor.closing(); + this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSING); if (this.awsRealTimeSocket.bufferedAmount > 0) { // Still data on the WebSocket @@ -411,7 +415,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { tempSocket.close(1000); this.awsRealTimeSocket = undefined; this.socketStatus = SOCKET_STATUS.CLOSED; - this.connectionStateMonitor.closed(); + this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); } } @@ -465,7 +469,9 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { subscriptionFailedCallback, }); } - this.connectionStateMonitor.connectionEstablished(); + this.connectionStateMonitor.record( + CONNECTION_CHANGE.CONNECTION_ESTABLISHED + ); return; } @@ -479,9 +485,9 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { this.keepAliveTimeout ); this.keepAliveAlertTimeoutId = setTimeout(() => { - this.connectionStateMonitor.keepAliveMissed(); + this.connectionStateMonitor.record(CONNECTION_CHANGE.KEEP_ALIVE_MISSED); }, DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT); - this.connectionStateMonitor.keepAlive(); + this.connectionStateMonitor.record(CONNECTION_CHANGE.KEEP_ALIVE); return; } @@ -528,7 +534,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { }); this.subscriptionObserverMap.clear(); if (this.awsRealTimeSocket) { - this.connectionStateMonitor.closed(); + this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); this.awsRealTimeSocket.close(); } @@ -670,7 +676,9 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { logger.debug(`WebSocket connection error`); }; newSocket.onclose = () => { - this.connectionStateMonitor.connectionFailed(); + this.connectionStateMonitor.record( + CONNECTION_CHANGE.CONNECTION_FAILED + ); rej(new Error('Connection handshake error')); }; newSocket.onopen = () => { @@ -744,7 +752,9 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { function checkAckOk(ackOk: boolean) { if (!ackOk) { - this.connectionStateMonitor.connectionFailed(); + this.connectionStateMonitor.record( + CONNECTION_CHANGE.CONNECTION_FAILED + ); rej( new Error( `Connection timeout: ack from AWSRealTime was not received on ${CONNECTION_INIT_TIMEOUT} ms` diff --git a/packages/pubsub/src/index.ts b/packages/pubsub/src/index.ts index 0d4382e7789..cf84732d317 100644 --- a/packages/pubsub/src/index.ts +++ b/packages/pubsub/src/index.ts @@ -23,8 +23,7 @@ enum CONTROL_MSG { } export const CONNECTION_STATE_CHANGE = 'ConnectionStateChange'; - -export { ConnectionState } from './utils/ConnectionStateMonitor'; +export { ConnectionState } from './types'; export { PubSub, CONTROL_MSG }; diff --git a/packages/pubsub/src/types/index.ts b/packages/pubsub/src/types/index.ts index 99cfd92756d..bd6450c9a58 100644 --- a/packages/pubsub/src/types/index.ts +++ b/packages/pubsub/src/types/index.ts @@ -19,3 +19,39 @@ export interface SubscriptionObserver { error(errorValue: any): void; complete(): void; } + +/** @enum {string} */ +export enum ConnectionState { + /* + * The connection is alive and healthy + */ + Connected = 'Connected', + /* + * The connection is alive, but the connection is offline + */ + ConnectedPendingNetwork = 'ConnectedPendingNetwork', + /* + * The connection has been disconnected while in use + */ + ConnectionDisrupted = 'ConnectionDisrupted', + /* + * The connection has been disconnected and the network is offline + */ + ConnectionDisruptedPendingNetwork = 'ConnectionDisruptedPendingNetwork', + /* + * The connection is in the process of connecting + */ + Connecting = 'Connecting', + /* + * The connection is not in use and is being disconnected + */ + ConnectedPendingDisconnect = 'ConnectedPendingDisconnect', + /* + * The connection is not in use and has been disconnected + */ + Disconnected = 'Disconnected', + /* + * The connection is alive, but a keep alive message has been missed + */ + ConnectedPendingKeepAlive = 'ConnectedPendingKeepAlive', +} diff --git a/packages/pubsub/src/utils/ConnectionStateMonitor.ts b/packages/pubsub/src/utils/ConnectionStateMonitor.ts index b83727089d2..babdef66f6a 100644 --- a/packages/pubsub/src/utils/ConnectionStateMonitor.ts +++ b/packages/pubsub/src/utils/ConnectionStateMonitor.ts @@ -13,6 +13,7 @@ import { Reachability } from '@aws-amplify/core'; import Observable, { ZenObservable } from 'zen-observable-ts'; +import { ConnectionState } from '../index'; // Internal types for tracking different connection states type LinkedConnectionState = 'connected' | 'disconnected'; @@ -24,15 +25,34 @@ type LinkedConnectionStates = { keepAliveState: LinkedHealthState; }; -export type ConnectionState = - | 'Connected' - | 'ConnectedPendingNetwork' - | 'ConnectionDisrupted' - | 'ConnectionDisruptedPendingNetwork' - | 'Connecting' - | 'ConnectedPendingDisconnect' - | 'Disconnected' - | 'ConnectedPendingKeepAlive'; +export const CONNECTION_CHANGE: { + [key in + | 'KEEP_ALIVE_MISSED' + | 'KEEP_ALIVE' + | 'CONNECTION_ESTABLISHED' + | 'CONNECTION_FAILED' + | 'CLOSING' + | 'OPENING_CONNECTION' + | 'CLOSED' + | 'ONLINE' + | 'OFFLINE']: Partial; +} = { + KEEP_ALIVE_MISSED: { keepAliveState: 'unhealthy' }, + KEEP_ALIVE: { keepAliveState: 'healthy' }, + CONNECTION_ESTABLISHED: { connectionState: 'connected' }, + CONNECTION_FAILED: { + intendedConnectionState: 'disconnected', + connectionState: 'disconnected', + }, + CLOSING: { intendedConnectionState: 'disconnected' }, + OPENING_CONNECTION: { + intendedConnectionState: 'connected', + connectionState: 'connecting', + }, + CLOSED: { connectionState: 'disconnected' }, + ONLINE: { networkState: 'connected' }, + OFFLINE: { networkState: 'disconnected' }, +}; export class ConnectionStateMonitor { /** @@ -59,9 +79,9 @@ export class ConnectionStateMonitor { // Maintain the network state based on the reachability monitor new Reachability().networkMonitor().subscribe(({ online }) => { - this.updateConnectionState({ - networkState: online ? 'connected' : 'disconnected', - }); + this.record( + online ? CONNECTION_CHANGE.ONLINE : CONNECTION_CHANGE.OFFLINE + ); }); } @@ -84,74 +104,16 @@ export class ConnectionStateMonitor { }); } - /** - * Tell the monitor that the connection has been disconnected - */ - closed() { - this.updateConnectionState({ connectionState: 'disconnected' }); - } - - /** - * Tell the monitor that the connection is opening - */ - openingConnection() { - this.updateConnectionState({ - intendedConnectionState: 'connected', - connectionState: 'connecting', - }); - } - - /** - * Tell the monitor that the connection is disconnecting - */ - closing() { - this.updateConnectionState({ intendedConnectionState: 'disconnected' }); - } - - /** - * Tell the monitor that the connection has failed - */ - connectionFailed() { - this.updateConnectionState({ - intendedConnectionState: 'disconnected', - connectionState: 'disconnected', - }); - } - - /** - * Tell the monitor that the connection has been established - */ - connectionEstablished() { - this.updateConnectionState({ connectionState: 'connected' }); - } - - /** - * Tell the monitor that a keep alive has occurred - */ - keepAlive() { - this.updateConnectionState({ keepAliveState: 'healthy' }); - } - - /** - * Tell the monitor that a keep alive has been missed - */ - keepAliveMissed() { - this.updateConnectionState({ keepAliveState: 'unhealthy' }); - } - - /** - * @private + /* + * Updates local connection state and emits the full state to the observer. */ - - private updateConnectionState( - statusUpdates: Partial - ) { + record(statusUpdates: Partial) { // Maintain the socket state const newSocketStatus = { ...this._connectionState, ...statusUpdates }; this._connectionState = { ...newSocketStatus }; - this._connectionStateObserver.next({ ...this._connectionState }); + this._connectionStateObserver.next(this._connectionState); } /* @@ -164,33 +126,33 @@ export class ConnectionStateMonitor { keepAliveState, }: LinkedConnectionStates): ConnectionState { if (connectionState === 'connected' && networkState === 'disconnected') - return 'ConnectedPendingNetwork'; + return ConnectionState.ConnectedPendingNetwork; if ( connectionState === 'connected' && intendedConnectionState === 'disconnected' ) - return 'ConnectedPendingDisconnect'; + return ConnectionState.ConnectedPendingDisconnect; if ( connectionState === 'disconnected' && intendedConnectionState === 'connected' && networkState === 'disconnected' ) - return 'ConnectionDisruptedPendingNetwork'; + return ConnectionState.ConnectionDisruptedPendingNetwork; if ( connectionState === 'disconnected' && intendedConnectionState === 'connected' ) - return 'ConnectionDisrupted'; + return ConnectionState.ConnectionDisrupted; if (connectionState === 'connected' && keepAliveState === 'unhealthy') - return 'ConnectedPendingKeepAlive'; + return ConnectionState.ConnectedPendingKeepAlive; // All remaining states directly correspond to the connection state - if (connectionState === 'connecting') return 'Connecting'; - if (connectionState === 'disconnected') return 'Disconnected'; - return 'Connected'; + if (connectionState === 'connecting') return ConnectionState.Connecting; + if (connectionState === 'disconnected') return ConnectionState.Disconnected; + return ConnectionState.Connected; } } From 0bc54bd13999a22200cea8fa18e35d54894d3253 Mon Sep 17 00:00:00 2001 From: Aaron S Date: Thu, 21 Jul 2022 11:57:50 -0500 Subject: [PATCH 11/15] fix: State variable naming and related comments for clarity --- .../src/utils/ConnectionStateMonitor.ts | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/packages/pubsub/src/utils/ConnectionStateMonitor.ts b/packages/pubsub/src/utils/ConnectionStateMonitor.ts index babdef66f6a..898f4afa6f3 100644 --- a/packages/pubsub/src/utils/ConnectionStateMonitor.ts +++ b/packages/pubsub/src/utils/ConnectionStateMonitor.ts @@ -59,8 +59,8 @@ export class ConnectionStateMonitor { * @private */ private _connectionState: LinkedConnectionStates; - private _connectionStateObservable: Observable; - private _connectionStateObserver: ZenObservable.SubscriptionObserver; + private _linkedConnectionStateObservable: Observable; + private _linkedConnectionStateObserver: ZenObservable.SubscriptionObserver; constructor() { this._connectionState = { @@ -70,12 +70,11 @@ export class ConnectionStateMonitor { keepAliveState: 'healthy', }; - this._connectionStateObservable = new Observable( - connectionStateObserver => { + this._linkedConnectionStateObservable = + new Observable(connectionStateObserver => { connectionStateObserver.next(this._connectionState); - this._connectionStateObserver = connectionStateObserver; - } - ); + this._linkedConnectionStateObserver = connectionStateObserver; + }); // Maintain the network state based on the reachability monitor new Reachability().networkMonitor().subscribe(({ online }) => { @@ -91,9 +90,13 @@ export class ConnectionStateMonitor { * @returns {Observable} - The observable that emits ConnectionState updates */ public get connectionStateObservable(): Observable { - // Translate from connection states to ConnectionStates, then remove any duplicates let previous: ConnectionState; - return this._connectionStateObservable + + // The linked state aggregates state changes to any of the network, connection, intendedConnection and keepAliveHealth. + // Some states will change these independent states without changing the overall connection state. + + // After translating from linked states to ConnectionState, then remove any duplicates + return this._linkedConnectionStateObservable .map(value => { return this.linkedConnectionStatesToConnectionState(value); }) @@ -113,7 +116,7 @@ export class ConnectionStateMonitor { this._connectionState = { ...newSocketStatus }; - this._connectionStateObserver.next(this._connectionState); + this._linkedConnectionStateObserver.next(this._connectionState); } /* From d1e3cc83fd1c4679b6f5efe412b6685756bf4eeb Mon Sep 17 00:00:00 2001 From: Aaron S Date: Thu, 21 Jul 2022 13:18:16 -0500 Subject: [PATCH 12/15] fix: Comment line length --- .../src/utils/ConnectionStateMonitor.ts | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/packages/pubsub/src/utils/ConnectionStateMonitor.ts b/packages/pubsub/src/utils/ConnectionStateMonitor.ts index 898f4afa6f3..5c222f6e411 100644 --- a/packages/pubsub/src/utils/ConnectionStateMonitor.ts +++ b/packages/pubsub/src/utils/ConnectionStateMonitor.ts @@ -58,12 +58,12 @@ export class ConnectionStateMonitor { /** * @private */ - private _connectionState: LinkedConnectionStates; + private _linkedConnectionState: LinkedConnectionStates; private _linkedConnectionStateObservable: Observable; private _linkedConnectionStateObserver: ZenObservable.SubscriptionObserver; constructor() { - this._connectionState = { + this._linkedConnectionState = { networkState: 'connected', connectionState: 'disconnected', intendedConnectionState: 'disconnected', @@ -72,7 +72,7 @@ export class ConnectionStateMonitor { this._linkedConnectionStateObservable = new Observable(connectionStateObserver => { - connectionStateObserver.next(this._connectionState); + connectionStateObserver.next(this._linkedConnectionState); this._linkedConnectionStateObserver = connectionStateObserver; }); @@ -92,8 +92,9 @@ export class ConnectionStateMonitor { public get connectionStateObservable(): Observable { let previous: ConnectionState; - // The linked state aggregates state changes to any of the network, connection, intendedConnection and keepAliveHealth. - // Some states will change these independent states without changing the overall connection state. + // The linked state aggregates state changes to any of the network, connection, + // intendedConnection and keepAliveHealth. Some states will change these independent + // states without changing the overall connection state. // After translating from linked states to ConnectionState, then remove any duplicates return this._linkedConnectionStateObservable @@ -112,11 +113,14 @@ export class ConnectionStateMonitor { */ record(statusUpdates: Partial) { // Maintain the socket state - const newSocketStatus = { ...this._connectionState, ...statusUpdates }; + const newSocketStatus = { + ...this._linkedConnectionState, + ...statusUpdates, + }; - this._connectionState = { ...newSocketStatus }; + this._linkedConnectionState = { ...newSocketStatus }; - this._linkedConnectionStateObserver.next(this._connectionState); + this._linkedConnectionStateObserver.next(this._linkedConnectionState); } /* From 995b67fe3093bf927e439018d986fae8f69c3be3 Mon Sep 17 00:00:00 2001 From: Aaron S Date: Tue, 26 Jul 2022 14:59:58 -0500 Subject: [PATCH 13/15] fix: Update network monitor subscription management and name changes from PR comments --- .../__tests__/ConnectionStateMonitor.tests.ts | 14 ++++++-- .../AWSAppSyncRealTimeProvider/index.ts | 8 ++++- .../src/utils/ConnectionStateMonitor.ts | 33 +++++++++++++++---- 3 files changed, 45 insertions(+), 10 deletions(-) diff --git a/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts b/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts index 4f4b00a281b..ac8963fd5fe 100644 --- a/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts +++ b/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts @@ -57,6 +57,11 @@ describe('ConnectionStateMonitor', () => { subscription = monitor.connectionStateObservable.subscribe(value => { observedStates.push(value); }); + monitor.enableNetworkMonitoring(); + }); + + afterEach(() => { + monitor.disableNetworkMonitoring(); }); test('connection states starts out disconnected', () => { @@ -118,7 +123,7 @@ describe('ConnectionStateMonitor', () => { test('connection states when a connection is no longer needed', () => { monitor.record(CONNECTION_CHANGE.OPENING_CONNECTION); monitor.record(CONNECTION_CHANGE.CONNECTION_ESTABLISHED); - monitor.record(CONNECTION_CHANGE.CLOSING); + monitor.record(CONNECTION_CHANGE.CLOSING_CONNECTION); expect(observedStates).toEqual([ CS.Disconnected, @@ -131,7 +136,7 @@ describe('ConnectionStateMonitor', () => { test('connection states when a connection is no longer needed closed', () => { monitor.record(CONNECTION_CHANGE.OPENING_CONNECTION); monitor.record(CONNECTION_CHANGE.CONNECTION_ESTABLISHED); - monitor.record(CONNECTION_CHANGE.CLOSING); + monitor.record(CONNECTION_CHANGE.CLOSING_CONNECTION); monitor.record(CONNECTION_CHANGE.CLOSED); expect(observedStates).toEqual([ @@ -218,6 +223,11 @@ describe('ConnectionStateMonitor', () => { subscription = monitor.connectionStateObservable.subscribe(value => { observedStates.push(value); }); + monitor.enableNetworkMonitoring(); + }); + + afterEach(() => { + monitor.disableNetworkMonitoring(); }); test('starts out disconnected', () => { diff --git a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts index 5d1f870121c..bdb71438a68 100644 --- a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts +++ b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts @@ -144,6 +144,8 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { ): Observable { const appSyncGraphqlEndpoint = options?.appSyncGraphqlEndpoint; + this.connectionStateMonitor.enableNetworkMonitoring(); + return new Observable(observer => { if (!options || !appSyncGraphqlEndpoint) { observer.error({ @@ -173,6 +175,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { ], }); this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); + this.connectionStateMonitor.disableNetworkMonitoring(); observer.complete(); }); @@ -290,6 +293,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { logger.debug({ err }); const message = err['message'] ?? ''; this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); + this.connectionStateMonitor.disableNetworkMonitoring(); observer.error({ errors: [ { @@ -395,7 +399,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { return; } - this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSING); + this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSING_CONNECTION); if (this.awsRealTimeSocket.bufferedAmount > 0) { // Still data on the WebSocket @@ -416,6 +420,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { this.awsRealTimeSocket = undefined; this.socketStatus = SOCKET_STATUS.CLOSED; this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); + this.connectionStateMonitor.disableNetworkMonitoring(); } } @@ -535,6 +540,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { this.subscriptionObserverMap.clear(); if (this.awsRealTimeSocket) { this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); + this.connectionStateMonitor.disableNetworkMonitoring(); this.awsRealTimeSocket.close(); } diff --git a/packages/pubsub/src/utils/ConnectionStateMonitor.ts b/packages/pubsub/src/utils/ConnectionStateMonitor.ts index 5c222f6e411..b357f88be33 100644 --- a/packages/pubsub/src/utils/ConnectionStateMonitor.ts +++ b/packages/pubsub/src/utils/ConnectionStateMonitor.ts @@ -31,7 +31,7 @@ export const CONNECTION_CHANGE: { | 'KEEP_ALIVE' | 'CONNECTION_ESTABLISHED' | 'CONNECTION_FAILED' - | 'CLOSING' + | 'CLOSING_CONNECTION' | 'OPENING_CONNECTION' | 'CLOSED' | 'ONLINE' @@ -44,7 +44,7 @@ export const CONNECTION_CHANGE: { intendedConnectionState: 'disconnected', connectionState: 'disconnected', }, - CLOSING: { intendedConnectionState: 'disconnected' }, + CLOSING_CONNECTION: { intendedConnectionState: 'disconnected' }, OPENING_CONNECTION: { intendedConnectionState: 'connected', connectionState: 'connecting', @@ -61,8 +61,10 @@ export class ConnectionStateMonitor { private _linkedConnectionState: LinkedConnectionStates; private _linkedConnectionStateObservable: Observable; private _linkedConnectionStateObserver: ZenObservable.SubscriptionObserver; + private _networkMonitoringSubscription?: ZenObservable.Subscription; constructor() { + this._networkMonitoringSubscription = undefined; this._linkedConnectionState = { networkState: 'connected', connectionState: 'disconnected', @@ -75,13 +77,30 @@ export class ConnectionStateMonitor { connectionStateObserver.next(this._linkedConnectionState); this._linkedConnectionStateObserver = connectionStateObserver; }); + } + /** + * Turn network state monitoring on + */ + public enableNetworkMonitoring() { // Maintain the network state based on the reachability monitor - new Reachability().networkMonitor().subscribe(({ online }) => { - this.record( - online ? CONNECTION_CHANGE.ONLINE : CONNECTION_CHANGE.OFFLINE - ); - }); + if (this._networkMonitoringSubscription === undefined) { + this._networkMonitoringSubscription = new Reachability() + .networkMonitor() + .subscribe(({ online }) => { + this.record( + online ? CONNECTION_CHANGE.ONLINE : CONNECTION_CHANGE.OFFLINE + ); + }); + } + } + + /** + * Turn network state monitoring off + */ + public disableNetworkMonitoring() { + this._networkMonitoringSubscription?.unsubscribe(); + this._networkMonitoringSubscription = undefined; } /** From abd7509d80be4cbb698dbba6dfc021f63cf86cf3 Mon Sep 17 00:00:00 2001 From: Aaron S Date: Wed, 27 Jul 2022 11:45:48 -0500 Subject: [PATCH 14/15] fix: Private function name change --- packages/pubsub/src/utils/ConnectionStateMonitor.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/pubsub/src/utils/ConnectionStateMonitor.ts b/packages/pubsub/src/utils/ConnectionStateMonitor.ts index b357f88be33..4191d303d66 100644 --- a/packages/pubsub/src/utils/ConnectionStateMonitor.ts +++ b/packages/pubsub/src/utils/ConnectionStateMonitor.ts @@ -118,7 +118,7 @@ export class ConnectionStateMonitor { // After translating from linked states to ConnectionState, then remove any duplicates return this._linkedConnectionStateObservable .map(value => { - return this.linkedConnectionStatesToConnectionState(value); + return this.connectionStatesTranslator(value); }) .filter(current => { const toInclude = current !== previous; @@ -145,7 +145,7 @@ export class ConnectionStateMonitor { /* * Translate the ConnectionState structure into a specific ConnectionState string literal union */ - private linkedConnectionStatesToConnectionState({ + private connectionStatesTranslator({ connectionState, networkState, intendedConnectionState, From 386564deda549f61f6ba8626935c196b8dc7acad Mon Sep 17 00:00:00 2001 From: Aaron S Date: Fri, 29 Jul 2022 09:32:30 -0500 Subject: [PATCH 15/15] fix: Network monitoring only active when the connection is intended to be open --- .../__tests__/ConnectionStateMonitor.tests.ts | 21 +++++-------------- .../AWSAppSyncRealTimeProvider/index.ts | 6 ------ .../src/utils/ConnectionStateMonitor.ts | 15 +++++++++---- 3 files changed, 16 insertions(+), 26 deletions(-) diff --git a/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts b/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts index ac8963fd5fe..fc5d4f53650 100644 --- a/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts +++ b/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts @@ -39,12 +39,11 @@ describe('ConnectionStateMonitor', () => { beforeEach(() => { const spyon = jest .spyOn(Reachability.prototype, 'networkMonitor') - .mockImplementationOnce( - () => - new Observable(observer => { - reachabilityObserver = observer; - }) - ); + .mockImplementationOnce(() => { + return new Observable(observer => { + reachabilityObserver = observer; + }); + }); }); describe('when the network is connected', () => { @@ -57,11 +56,6 @@ describe('ConnectionStateMonitor', () => { subscription = monitor.connectionStateObservable.subscribe(value => { observedStates.push(value); }); - monitor.enableNetworkMonitoring(); - }); - - afterEach(() => { - monitor.disableNetworkMonitoring(); }); test('connection states starts out disconnected', () => { @@ -223,11 +217,6 @@ describe('ConnectionStateMonitor', () => { subscription = monitor.connectionStateObservable.subscribe(value => { observedStates.push(value); }); - monitor.enableNetworkMonitoring(); - }); - - afterEach(() => { - monitor.disableNetworkMonitoring(); }); test('starts out disconnected', () => { diff --git a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts index bdb71438a68..0049a79c9ac 100644 --- a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts +++ b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts @@ -144,8 +144,6 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { ): Observable { const appSyncGraphqlEndpoint = options?.appSyncGraphqlEndpoint; - this.connectionStateMonitor.enableNetworkMonitoring(); - return new Observable(observer => { if (!options || !appSyncGraphqlEndpoint) { observer.error({ @@ -175,7 +173,6 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { ], }); this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); - this.connectionStateMonitor.disableNetworkMonitoring(); observer.complete(); }); @@ -293,7 +290,6 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { logger.debug({ err }); const message = err['message'] ?? ''; this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); - this.connectionStateMonitor.disableNetworkMonitoring(); observer.error({ errors: [ { @@ -420,7 +416,6 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { this.awsRealTimeSocket = undefined; this.socketStatus = SOCKET_STATUS.CLOSED; this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); - this.connectionStateMonitor.disableNetworkMonitoring(); } } @@ -540,7 +535,6 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { this.subscriptionObserverMap.clear(); if (this.awsRealTimeSocket) { this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); - this.connectionStateMonitor.disableNetworkMonitoring(); this.awsRealTimeSocket.close(); } diff --git a/packages/pubsub/src/utils/ConnectionStateMonitor.ts b/packages/pubsub/src/utils/ConnectionStateMonitor.ts index 4191d303d66..8347571c3ac 100644 --- a/packages/pubsub/src/utils/ConnectionStateMonitor.ts +++ b/packages/pubsub/src/utils/ConnectionStateMonitor.ts @@ -80,9 +80,9 @@ export class ConnectionStateMonitor { } /** - * Turn network state monitoring on + * Turn network state monitoring on if it isn't on already */ - public enableNetworkMonitoring() { + private enableNetworkMonitoring() { // Maintain the network state based on the reachability monitor if (this._networkMonitoringSubscription === undefined) { this._networkMonitoringSubscription = new Reachability() @@ -96,9 +96,9 @@ export class ConnectionStateMonitor { } /** - * Turn network state monitoring off + * Turn network state monitoring off if it isn't off already */ - public disableNetworkMonitoring() { + private disableNetworkMonitoring() { this._networkMonitoringSubscription?.unsubscribe(); this._networkMonitoringSubscription = undefined; } @@ -131,6 +131,13 @@ export class ConnectionStateMonitor { * Updates local connection state and emits the full state to the observer. */ record(statusUpdates: Partial) { + // Maintain the network monitor + if (statusUpdates.intendedConnectionState === 'connected') { + this.enableNetworkMonitoring(); + } else if (statusUpdates.intendedConnectionState === 'disconnected') { + this.disableNetworkMonitoring(); + } + // Maintain the socket state const newSocketStatus = { ...this._linkedConnectionState,