diff --git a/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts b/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts index 029278c20f3..3cd152cc32c 100644 --- a/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts +++ b/packages/pubsub/__tests__/AWSAppSyncRealTimeProvider.test.ts @@ -1,11 +1,26 @@ -import { Auth } from '@aws-amplify/auth'; -import { Credentials, Logger, Signer } from '@aws-amplify/core'; -import { GraphQLError, isCompositeType } from 'graphql'; +jest.mock('@aws-amplify/core', () => ({ + __esModule: true, + ...jest.requireActual('@aws-amplify/core'), + browserOrNode() { + return { + isBrowser: true, + isNode: false, + }; + }, +})); + import Observable from 'zen-observable-ts'; -import { AWSAppSyncRealTimeProvider } from '../src/Providers/AWSAppSyncRealTimeProvider'; +import { Reachability, Credentials, Logger, Signer } from '@aws-amplify/core'; +import { Auth } from '@aws-amplify/auth'; import Cache from '@aws-amplify/cache'; + import { MESSAGE_TYPES } from '../src/Providers/AWSAppSyncRealTimeProvider/constants'; -import { FakeWebSocketInterface, delay, replaceConstant } from './helpers'; +import * as constants from '../src/Providers/AWSAppSyncRealTimeProvider/constants'; + +import { delay, FakeWebSocketInterface, replaceConstant } from './helpers'; +import { ConnectionState as CS } from '../src'; + +import { AWSAppSyncRealTimeProvider } from '../src/Providers/AWSAppSyncRealTimeProvider'; describe('AWSAppSyncRealTimeProvider', () => { describe('isCustomDomain()', () => { @@ -76,13 +91,75 @@ describe('AWSAppSyncRealTimeProvider', () => { fakeWebSocketInterface.newWebSocket(); return fakeWebSocketInterface.webSocket; }); + + // Reduce retry delay for tests to 100ms + Object.defineProperty(constants, 'MAX_DELAY_MS', { + value: 100, + }); + + // Set the network to "online" for these tests + const spyon = jest + .spyOn(Reachability.prototype, 'networkMonitor') + .mockImplementationOnce( + () => + new Observable(observer => { + observer.next?.({ online: true }); + }) + ); }); afterEach(async () => { await fakeWebSocketInterface?.closeInterface(); + fakeWebSocketInterface?.teardown(); loggerSpy.mockClear(); }); + test('standard subscription / unsubscription steps through the expected connection states', async () => { + const observer = provider.subscribe('test', { + appSyncGraphqlEndpoint: 'ws://localhost:8080', + }); + + const subscription = observer.subscribe({ + next: () => {}, + error: x => {}, + }); + + // Wait for the socket to be ready + await fakeWebSocketInterface?.standardConnectionHandshake(); + await fakeWebSocketInterface?.sendMessage( + new MessageEvent('start_ack', { + data: JSON.stringify({ + type: MESSAGE_TYPES.GQL_START_ACK, + payload: { connectionTimeoutMs: 100 }, + id: fakeWebSocketInterface?.webSocket.subscriptionId, + }), + }) + ); + + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.Connected, + ]); + expect(fakeWebSocketInterface?.observedConnectionStates).toEqual([ + CS.Disconnected, + CS.Connecting, + CS.Connected, + ]); + + subscription.unsubscribe(); + + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.ConnectedPendingDisconnect, + ]); + + expect(fakeWebSocketInterface?.observedConnectionStates).toEqual([ + CS.Disconnected, + CS.Connecting, + CS.Connected, + CS.ConnectedPendingDisconnect, + CS.Disconnected, + ]); + }); + test('returns error when no appSyncGraphqlEndpoint is provided', async () => { expect.assertions(2); const mockError = jest.fn(); @@ -117,7 +194,7 @@ describe('AWSAppSyncRealTimeProvider', () => { .subscribe('test', { appSyncGraphqlEndpoint: 'ws://localhost:8080', }) - .subscribe({}); + .subscribe({ error: () => {} }); // Wait for the socket to be initialize await fakeWebSocketInterface.readyForUse; @@ -143,7 +220,7 @@ describe('AWSAppSyncRealTimeProvider', () => { .subscribe('test', { appSyncGraphqlEndpoint: 'http://localhost:8080', }) - .subscribe({}); + .subscribe({ error: () => {} }); // Wait for the socket to be initialize await fakeWebSocketInterface.readyForUse; @@ -170,7 +247,7 @@ describe('AWSAppSyncRealTimeProvider', () => { appSyncGraphqlEndpoint: 'https://testaccounturl123456789123.appsync-api.us-east-1.amazonaws.com/graphql', }) - .subscribe({}); + .subscribe({ error: () => {} }); // Wait for the socket to be initialize await fakeWebSocketInterface.readyForUse; @@ -189,7 +266,8 @@ describe('AWSAppSyncRealTimeProvider', () => { .subscribe('test', { appSyncGraphqlEndpoint: 'ws://localhost:8080', }) - .subscribe({}); + .subscribe({ error: () => {} }); + await fakeWebSocketInterface?.readyForUse; await fakeWebSocketInterface?.triggerError(); expect(loggerSpy).toHaveBeenCalledWith( @@ -205,11 +283,14 @@ describe('AWSAppSyncRealTimeProvider', () => { .subscribe('test', { appSyncGraphqlEndpoint: 'ws://localhost:8080', }) - .subscribe({}); + .subscribe({ error: () => {} }); + await fakeWebSocketInterface?.readyForUse; await fakeWebSocketInterface?.triggerClose(); - await delay(50); + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.Disconnected, + ]); // Watching for raised exception to be caught and logged expect(loggerSpy).toBeCalledWith( 'DEBUG', @@ -227,11 +308,11 @@ describe('AWSAppSyncRealTimeProvider', () => { .subscribe('test', { appSyncGraphqlEndpoint: 'ws://localhost:8080', }) - .subscribe({}); + .subscribe({ error: () => {} }); + await fakeWebSocketInterface?.readyForUse; await fakeWebSocketInterface?.triggerOpen(); await fakeWebSocketInterface?.triggerError(); - // When the socket throws an error during handshake expect(loggerSpy).toHaveBeenCalledWith( 'DEBUG', @@ -246,7 +327,8 @@ describe('AWSAppSyncRealTimeProvider', () => { .subscribe('test', { appSyncGraphqlEndpoint: 'ws://localhost:8080', }) - .subscribe({}); + .subscribe({ error: () => {} }); + await fakeWebSocketInterface?.readyForUse; await fakeWebSocketInterface?.triggerOpen(); await fakeWebSocketInterface?.triggerClose(); @@ -305,6 +387,7 @@ describe('AWSAppSyncRealTimeProvider', () => { data: JSON.stringify({ type: MESSAGE_TYPES.GQL_START_ACK, payload: { connectionTimeoutMs: 100 }, + id: fakeWebSocketInterface?.webSocket.subscriptionId, }), }) ); @@ -336,6 +419,7 @@ describe('AWSAppSyncRealTimeProvider', () => { data: JSON.stringify({ type: MESSAGE_TYPES.GQL_START_ACK, payload: { connectionTimeoutMs: 100 }, + id: fakeWebSocketInterface?.webSocket.subscriptionId, }), }) ); @@ -473,39 +557,59 @@ describe('AWSAppSyncRealTimeProvider', () => { ); }); - test('subscription observer error is triggered when a connection is formed and an ack data message is received then ack timeout prompts disconnect', async () => { - expect.assertions(1); + test('subscription observer error is triggered when a connection is formed and an ack data message is received then ka timeout prompts disconnect', async () => { + expect.assertions(2); const observer = provider.subscribe('test', { appSyncGraphqlEndpoint: 'ws://localhost:8080', }); - const subscription = observer.subscribe({ - error: () => {}, - }); - - await fakeWebSocketInterface?.readyForUse; - await fakeWebSocketInterface?.triggerOpen(); - + const subscription = observer.subscribe({ error: () => {} }); // Resolve the message delivery actions - await Promise.resolve( - fakeWebSocketInterface?.sendMessage( - new MessageEvent('connection_ack', { - data: JSON.stringify({ - type: MESSAGE_TYPES.GQL_CONNECTION_ACK, - payload: { connectionTimeoutMs: 20 }, - }), - }) - ) + await replaceConstant( + 'DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT', + 5, + async () => { + await fakeWebSocketInterface?.readyForUse; + await fakeWebSocketInterface?.triggerOpen(); + await fakeWebSocketInterface?.sendMessage( + new MessageEvent('connection_ack', { + data: JSON.stringify({ + type: constants.MESSAGE_TYPES.GQL_CONNECTION_ACK, + payload: { connectionTimeoutMs: 100 }, + }), + }) + ); + + await fakeWebSocketInterface?.sendMessage( + new MessageEvent('start_ack', { + data: JSON.stringify({ + type: MESSAGE_TYPES.GQL_START_ACK, + payload: {}, + id: fakeWebSocketInterface?.webSocket.subscriptionId, + }), + }) + ); + + await fakeWebSocketInterface?.sendDataMessage({ + type: MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE, + payload: { data: {} }, + }); + } ); - await fakeWebSocketInterface?.sendDataMessage({ - type: MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE, - payload: { data: {} }, - }); + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.Connected, + ]); - // Now wait for the timeout to elapse - await delay(100); + // Wait until the socket is automatically disconnected + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.ConnectionDisrupted, + ]); + + expect(fakeWebSocketInterface?.observedConnectionStates).toContain( + CS.ConnectedPendingKeepAlive + ); expect(loggerSpy).toBeCalledWith( 'DEBUG', @@ -520,7 +624,7 @@ describe('AWSAppSyncRealTimeProvider', () => { appSyncGraphqlEndpoint: 'ws://localhost:8080', }); - const subscription = observer.subscribe({}); + const subscription = observer.subscribe({ error: () => {} }); await fakeWebSocketInterface?.standardConnectionHandshake(); await fakeWebSocketInterface?.sendDataMessage({ @@ -536,19 +640,19 @@ describe('AWSAppSyncRealTimeProvider', () => { test('failure to ack before timeout', async () => { expect.assertions(1); - await replaceConstant('START_ACK_TIMEOUT', 20, async () => { + await replaceConstant('START_ACK_TIMEOUT', 30, async () => { const observer = provider.subscribe('test', { appSyncGraphqlEndpoint: 'ws://localhost:8080', }); - const subscription = observer.subscribe({ - error: () => {}, - }); + const subscription = observer.subscribe({ error: () => {} }); await fakeWebSocketInterface?.standardConnectionHandshake(); - // Wait long enough that the shortened timeout will elapse - await delay(100); + // Wait until the socket is automatically disconnected + await fakeWebSocketInterface?.waitForConnectionState([ + CS.Disconnected, + ]); expect(loggerSpy).toBeCalledWith( 'DEBUG', @@ -566,15 +670,19 @@ describe('AWSAppSyncRealTimeProvider', () => { appSyncGraphqlEndpoint: 'ws://localhost:8080', }); - const subscription = observer.subscribe({ - error: () => {}, - }); + const subscription = observer.subscribe({ error: () => {} }); await fakeWebSocketInterface?.readyForUse; + Promise.resolve(); await fakeWebSocketInterface?.triggerOpen(); - // Wait long enough that the shortened timeout will elapse - await delay(100); + // Wait no less than 20 ms + await delay(20); + + // Wait until the socket is automatically disconnected + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.Disconnected, + ]); // Watching for raised exception to be caught and logged expect(loggerSpy).toBeCalledWith( @@ -598,7 +706,8 @@ describe('AWSAppSyncRealTimeProvider', () => { appSyncGraphqlEndpoint: 'ws://localhost:8080', authenticationType: 'API_KEY', }) - .subscribe({}); + .subscribe({ error: () => {} }); + await fakeWebSocketInterface?.readyForUse; expect(loggerSpy).toBeCalledWith( @@ -626,7 +735,8 @@ describe('AWSAppSyncRealTimeProvider', () => { appSyncGraphqlEndpoint: 'ws://localhost:8080', authenticationType: 'AWS_IAM', }) - .subscribe({}); + .subscribe({ error: () => {} }); + await fakeWebSocketInterface?.readyForUse; expect(loggerSpy).toBeCalledWith( @@ -704,8 +814,10 @@ describe('AWSAppSyncRealTimeProvider', () => { }, }); - // It takes time for the credentials to resolve - await delay(50); + // Wait until the socket is automatically disconnected + await fakeWebSocketInterface?.waitUntilConnectionStateIn([ + CS.Disconnected, + ]); expect(loggerSpy).toHaveBeenCalledWith( 'WARN', @@ -730,7 +842,8 @@ describe('AWSAppSyncRealTimeProvider', () => { appSyncGraphqlEndpoint: 'ws://localhost:8080', authenticationType: 'OPENID_CONNECT', }) - .subscribe({}); + .subscribe({ error: () => {} }); + await fakeWebSocketInterface?.readyForUse; expect(loggerSpy).toBeCalledWith( @@ -785,9 +898,9 @@ describe('AWSAppSyncRealTimeProvider', () => { appSyncGraphqlEndpoint: 'ws://localhost:8080', authenticationType: 'OPENID_CONNECT', }) - .subscribe({}); - await fakeWebSocketInterface?.readyForUse; + .subscribe({ error: () => {} }); + await fakeWebSocketInterface?.readyForUse; expect(loggerSpy).toBeCalledWith( 'DEBUG', 'Authenticating with OPENID_CONNECT' @@ -814,7 +927,8 @@ describe('AWSAppSyncRealTimeProvider', () => { appSyncGraphqlEndpoint: 'ws://localhost:8080', authenticationType: 'AMAZON_COGNITO_USER_POOLS', }) - .subscribe({}); + .subscribe({ error: () => {} }); + await fakeWebSocketInterface?.readyForUse; expect(loggerSpy).toBeCalledWith( @@ -834,7 +948,8 @@ describe('AWSAppSyncRealTimeProvider', () => { Authorization: 'test', }, }) - .subscribe({}); + .subscribe({ error: () => {} }); + await fakeWebSocketInterface?.readyForUse; expect(loggerSpy).toBeCalledWith( @@ -851,7 +966,7 @@ describe('AWSAppSyncRealTimeProvider', () => { appSyncGraphqlEndpoint: 'ws://localhost:8080', authenticationType: 'AWS_LAMBDA', additionalHeaders: { - Authorization: undefined, + Authorization: '', }, }) .subscribe({ diff --git a/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts b/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts new file mode 100644 index 00000000000..fc5d4f53650 --- /dev/null +++ b/packages/pubsub/__tests__/ConnectionStateMonitor.tests.ts @@ -0,0 +1,226 @@ +/* + * Copyright 2017-2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with + * the License. A copy of the License is located at + * + * http://aws.amazon.com/apache2.0/ + * + * or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ + +jest.mock('@aws-amplify/core', () => ({ + __esModule: true, + ...jest.requireActual('@aws-amplify/core'), + browserOrNode() { + return { + isBrowser: true, + isNode: false, + }; + }, +})); + +import Observable from 'zen-observable-ts'; +import { Reachability } from '@aws-amplify/core'; +import { + ConnectionStateMonitor, + CONNECTION_CHANGE, +} from '../src/utils/ConnectionStateMonitor'; +import { ConnectionState as CS } from '../src'; + +describe('ConnectionStateMonitor', () => { + let monitor: ConnectionStateMonitor; + let observedStates: CS[]; + let subscription: ZenObservable.Subscription; + let reachabilityObserver: ZenObservable.Observer<{ online: boolean }>; + + beforeEach(() => { + const spyon = jest + .spyOn(Reachability.prototype, 'networkMonitor') + .mockImplementationOnce(() => { + return new Observable(observer => { + reachabilityObserver = observer; + }); + }); + }); + + describe('when the network is connected', () => { + beforeEach(() => { + reachabilityObserver?.next?.({ online: true }); + + observedStates = []; + subscription?.unsubscribe(); + monitor = new ConnectionStateMonitor(); + subscription = monitor.connectionStateObservable.subscribe(value => { + observedStates.push(value); + }); + }); + + test('connection states starts out disconnected', () => { + expect(observedStates).toEqual(['Disconnected']); + }); + + test('standard states connection pattern', () => { + monitor.record(CONNECTION_CHANGE.OPENING_CONNECTION); + monitor.record(CONNECTION_CHANGE.CONNECTION_ESTABLISHED); + expect(observedStates).toEqual([ + CS.Disconnected, + CS.Connecting, + CS.Connected, + ]); + }); + + test('connection states when the network is lost while connected', () => { + monitor.record(CONNECTION_CHANGE.OPENING_CONNECTION); + monitor.record(CONNECTION_CHANGE.CONNECTION_ESTABLISHED); + reachabilityObserver?.next?.({ online: false }); + expect(observedStates).toEqual([ + CS.Disconnected, + CS.Connecting, + CS.Connected, + CS.ConnectedPendingNetwork, + ]); + }); + + test('connection states when the network is lost and the connection times out', () => { + monitor.record(CONNECTION_CHANGE.OPENING_CONNECTION); + monitor.record(CONNECTION_CHANGE.CONNECTION_ESTABLISHED); + reachabilityObserver?.next?.({ online: false }); + monitor.record(CONNECTION_CHANGE.CLOSED); + expect(observedStates).toEqual([ + CS.Disconnected, + CS.Connecting, + CS.Connected, + CS.ConnectedPendingNetwork, + CS.ConnectionDisruptedPendingNetwork, + ]); + }); + + test('connection states when the network is lost, the connection times out and then the network recovers', () => { + monitor.record(CONNECTION_CHANGE.OPENING_CONNECTION); + monitor.record(CONNECTION_CHANGE.CONNECTION_ESTABLISHED); + reachabilityObserver?.next?.({ online: false }); + monitor.record(CONNECTION_CHANGE.CLOSED); + reachabilityObserver?.next?.({ online: true }); + expect(observedStates).toEqual([ + CS.Disconnected, + CS.Connecting, + CS.Connected, + CS.ConnectedPendingNetwork, + CS.ConnectionDisruptedPendingNetwork, + CS.ConnectionDisrupted, + ]); + }); + + test('connection states when a connection is no longer needed', () => { + monitor.record(CONNECTION_CHANGE.OPENING_CONNECTION); + monitor.record(CONNECTION_CHANGE.CONNECTION_ESTABLISHED); + monitor.record(CONNECTION_CHANGE.CLOSING_CONNECTION); + + expect(observedStates).toEqual([ + CS.Disconnected, + CS.Connecting, + CS.Connected, + CS.ConnectedPendingDisconnect, + ]); + }); + + test('connection states when a connection is no longer needed closed', () => { + monitor.record(CONNECTION_CHANGE.OPENING_CONNECTION); + monitor.record(CONNECTION_CHANGE.CONNECTION_ESTABLISHED); + monitor.record(CONNECTION_CHANGE.CLOSING_CONNECTION); + monitor.record(CONNECTION_CHANGE.CLOSED); + + expect(observedStates).toEqual([ + CS.Disconnected, + CS.Connecting, + CS.Connected, + CS.ConnectedPendingDisconnect, + CS.Disconnected, + ]); + }); + + test('connection states when a connection misses a keepalive, and then recovers', () => { + monitor.record(CONNECTION_CHANGE.OPENING_CONNECTION); + monitor.record(CONNECTION_CHANGE.CONNECTION_ESTABLISHED); + monitor.record(CONNECTION_CHANGE.KEEP_ALIVE_MISSED); + monitor.record(CONNECTION_CHANGE.KEEP_ALIVE); + + expect(observedStates).toEqual([ + 'Disconnected', + 'Connecting', + 'Connected', + 'ConnectedPendingKeepAlive', + 'Connected', + ]); + }); + + test('lots of keep alive messages dont add more connection state events', () => { + monitor.record(CONNECTION_CHANGE.OPENING_CONNECTION); + monitor.record(CONNECTION_CHANGE.KEEP_ALIVE); + monitor.record(CONNECTION_CHANGE.CONNECTION_ESTABLISHED); + monitor.record(CONNECTION_CHANGE.KEEP_ALIVE); + monitor.record(CONNECTION_CHANGE.KEEP_ALIVE); + monitor.record(CONNECTION_CHANGE.KEEP_ALIVE); + + expect(observedStates).toEqual([ + 'Disconnected', + 'Connecting', + 'Connected', + ]); + }); + + test('missed keep alives during a network outage dont add an additional state change', () => { + monitor.record(CONNECTION_CHANGE.OPENING_CONNECTION); + monitor.record(CONNECTION_CHANGE.CONNECTION_ESTABLISHED); + reachabilityObserver?.next?.({ online: false }); + monitor.record(CONNECTION_CHANGE.KEEP_ALIVE_MISSED); + monitor.record(CONNECTION_CHANGE.KEEP_ALIVE_MISSED); + + expect(observedStates).toEqual([ + 'Disconnected', + 'Connecting', + 'Connected', + 'ConnectedPendingNetwork', + ]); + }); + + test('when the network recovers, keep alives become the concern until one is seen', () => { + monitor.record(CONNECTION_CHANGE.OPENING_CONNECTION); + monitor.record(CONNECTION_CHANGE.CONNECTION_ESTABLISHED); + reachabilityObserver?.next?.({ online: false }); + monitor.record(CONNECTION_CHANGE.KEEP_ALIVE_MISSED); + monitor.record(CONNECTION_CHANGE.KEEP_ALIVE_MISSED); + reachabilityObserver?.next?.({ online: true }); + monitor.record(CONNECTION_CHANGE.KEEP_ALIVE); + + expect(observedStates).toEqual([ + 'Disconnected', + 'Connecting', + 'Connected', + 'ConnectedPendingNetwork', + 'ConnectedPendingKeepAlive', + 'Connected', + ]); + }); + }); + + describe('when the network is disconnected', () => { + beforeEach(() => { + reachabilityObserver?.next?.({ online: false }); + + observedStates = []; + subscription?.unsubscribe(); + monitor = new ConnectionStateMonitor(); + subscription = monitor.connectionStateObservable.subscribe(value => { + observedStates.push(value); + }); + }); + + test('starts out disconnected', () => { + expect(observedStates).toEqual(['Disconnected']); + }); + }); +}); diff --git a/packages/pubsub/__tests__/PubSub-unit-test.ts b/packages/pubsub/__tests__/PubSub-unit-test.ts index 969e42d520b..622ceefcdf9 100644 --- a/packages/pubsub/__tests__/PubSub-unit-test.ts +++ b/packages/pubsub/__tests__/PubSub-unit-test.ts @@ -1,3 +1,14 @@ +jest.mock('@aws-amplify/core', () => ({ + __esModule: true, + ...jest.requireActual('@aws-amplify/core'), + browserOrNode() { + return { + isBrowser: true, + isNode: false, + }; + }, +})); + import { PubSubClass as PubSub } from '../src/PubSub'; import { MqttOverWSProvider, diff --git a/packages/pubsub/__tests__/helpers.ts b/packages/pubsub/__tests__/helpers.ts index 8328d953def..959522341be 100644 --- a/packages/pubsub/__tests__/helpers.ts +++ b/packages/pubsub/__tests__/helpers.ts @@ -1,3 +1,6 @@ +import { Hub } from '@aws-amplify/core'; +import Observable from 'zen-observable-ts'; +import { ConnectionState as CS, CONNECTION_STATE_CHANGE } from '../src'; import * as constants from '../src/Providers/AWSAppSyncRealTimeProvider/constants'; export function delay(timeout) { @@ -10,10 +13,14 @@ export function delay(timeout) { export class FakeWebSocketInterface { readonly webSocket: FakeWebSocket; - readyForUse: Promise; + readyForUse: Promise; hasClosed: Promise; + teardownHubListener: () => void; + observedConnectionStates: CS[] = []; + currentConnectionState: CS; private readyResolve: (value: PromiseLike) => void; + private connectionStateObservers: ZenObservable.Observer[] = []; constructor() { this.readyForUse = new Promise((res, rej) => { @@ -23,69 +30,137 @@ export class FakeWebSocketInterface { this.hasClosed = new Promise((res, rej) => { closeResolver = res; }); - this.webSocket = new FakeWebSocket(closeResolver); + this.webSocket = new FakeWebSocket(() => closeResolver); + + this.teardownHubListener = Hub.listen('api', (data: any) => { + const { payload } = data; + if (payload.event === CONNECTION_STATE_CHANGE) { + const connectionState = payload.data.connectionState as CS; + this.observedConnectionStates.push(connectionState); + this.connectionStateObservers.forEach(observer => { + observer?.next?.(connectionState); + }); + this.currentConnectionState = connectionState; + } + }); + } + + /** + * @returns {Observable} - The observable that emits all ConnectionState updates (past and future) + */ + allConnectionStateObserver() { + return new Observable(observer => { + this.observedConnectionStates.forEach(state => { + observer.next(state); + }); + this.connectionStateObservers.push(observer); + }); + } + + /** + * @returns {Observable} - The observable that emits ConnectionState updates (past and future) + */ + connectionStateObserver() { + return new Observable(observer => { + this.connectionStateObservers.push(observer); + }); + } + /** + * Tear down the Fake Socket state + */ + teardown() { + this.teardownHubListener(); + this.connectionStateObservers.forEach(observer => { + observer?.complete?.(); + }); } + /** + * Once ready for use, send onOpen and the connection_ack + */ async standardConnectionHandshake() { await this.readyForUse; await this.triggerOpen(); await this.handShakeMessage(); } + /** + * After an open is triggered, the provider has logic that must execute + * which changes the function resolvers assigned to the websocket + */ async triggerOpen() { - // After an open is triggered, the provider has logic that must execute - // which changes the function resolvers assigned to the websocket await this.runAndResolve(() => { this.webSocket.onopen(new Event('', {})); }); } + /** + * After a close is triggered, the provider has logic that must execute + * which changes the function resolvers assigned to the websocket + */ async triggerClose() { - // After a close is triggered, the provider has logic that must execute - // which changes the function resolvers assigned to the websocket await this.runAndResolve(() => { - if (this.webSocket.onclose) - this.webSocket.onclose(new CloseEvent('', {})); + if (this.webSocket.onclose) { + try { + this.webSocket.onclose(new CloseEvent('', {})); + } catch {} + } }); } + /** + * Close the interface and wait until the connection is either disconnected or disrupted + */ async closeInterface() { await this.triggerClose(); // Wait for either hasClosed or a half second has passed - await new Promise(res => { + await new Promise(async res => { // The interface is closed when the socket "hasClosed" this.hasClosed.then(() => res(undefined)); - - // The provider can get pretty wrapped around itself, - // but its safe to continue after half a second, even if it hasn't closed the socket - delay(500).then(() => res(undefined)); + await this.waitUntilConnectionStateIn([ + CS.Disconnected, + CS.ConnectionDisrupted, + ]); + res(undefined); }); } + /** + * After an error is triggered, the provider has logic that must execute + * which changes the function resolvers assigned to the websocket + */ async triggerError() { - // After an error is triggered, the provider has logic that must execute - // which changes the function resolvers assigned to the websocket await this.runAndResolve(() => { this.webSocket.onerror(new Event('TestError', {})); }); } + /** + * Produce a websocket with a short delay to mimic reality + * @returns A websocket + */ newWebSocket() { - setTimeout(() => this.readyResolve(undefined), 10); + setTimeout(() => this.readyResolve(Promise.resolve()), 10); return this.webSocket; } + /** + * Send a connection_ack + */ async handShakeMessage() { await this.sendMessage( new MessageEvent('connection_ack', { data: JSON.stringify({ type: constants.MESSAGE_TYPES.GQL_CONNECTION_ACK, - payload: { keepAliveTimeout: 100_000 }, + payload: { connectionTimeoutMs: 100_000 }, }), }) ); } + /** + * Send a data message + */ async sendDataMessage(data: {}) { await this.sendMessage( new MessageEvent('data', { @@ -97,6 +172,9 @@ export class FakeWebSocketInterface { ); } + /** + * Emit a message on the socket + */ async sendMessage(message: MessageEvent) { // After a message is sent, it takes a few ms for it to enact provider behavior await this.runAndResolve(() => { @@ -104,15 +182,56 @@ export class FakeWebSocketInterface { }); } + /** + * Run a gicommand and resolve to allow internal behavior to execute + */ async runAndResolve(fn) { fn(); await Promise.resolve(); } + + /** + * DELETE THIS? + */ + async observesConnectionState(connectionState: CS) { + return new Promise((res, rej) => { + this.allConnectionStateObserver().subscribe(value => { + if (value === connectionState) { + res(undefined); + } + }); + }); + } + + /** + * @returns a Promise that will wait for one of the provided states to be observed + */ + async waitForConnectionState(connectionStates: CS[]) { + return new Promise((res, rej) => { + this.connectionStateObserver().subscribe(value => { + if (connectionStates.includes(String(value) as CS)) { + res(undefined); + } + }); + }); + } + + /** + * @returns a Promise that will wait until the current state is one of the provided states + */ + async waitUntilConnectionStateIn(connectionStates: CS[]) { + return new Promise((res, rej) => { + if (connectionStates.includes(this.currentConnectionState)) { + res(undefined); + } + res(this.waitForConnectionState(connectionStates)); + }); + } } class FakeWebSocket implements WebSocket { subscriptionId: string | undefined; - closeResolver?: (value: PromiseLike) => void; + closeResolverFcn: () => (value: PromiseLike) => void; binaryType: BinaryType; bufferedAmount: number; @@ -125,7 +244,8 @@ class FakeWebSocket implements WebSocket { readyState: number; url: string; close(code?: number, reason?: string): void { - if (this.closeResolver) this.closeResolver(undefined); + const closeResolver = this.closeResolverFcn(); + if (closeResolver) closeResolver(Promise.resolve(undefined)); } send(data: string | ArrayBufferLike | Blob | ArrayBufferView): void { const parsedInput = JSON.parse(String(data)); @@ -170,8 +290,8 @@ class FakeWebSocket implements WebSocket { throw new Error('Method not implemented dispatchEvent.'); } - constructor(closeResolver?: (value: PromiseLike) => void) { - this.closeResolver = closeResolver; + constructor(closeResolver: () => (value: PromiseLike) => void) { + this.closeResolverFcn = closeResolver; } } diff --git a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/constants.ts b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/constants.ts index a529573187e..b8e5c04cdb5 100644 --- a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/constants.ts +++ b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/constants.ts @@ -93,3 +93,8 @@ export const START_ACK_TIMEOUT = 15000; * Default Time in milleseconds to wait for GQL_CONNECTION_KEEP_ALIVE message */ export const DEFAULT_KEEP_ALIVE_TIMEOUT = 5 * 60 * 1000; + +/** + * Default Time in milleseconds to alert for missed GQL_CONNECTION_KEEP_ALIVE message + */ +export const DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT = 65 * 1000; diff --git a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts index 47a7e0d389b..0049a79c9ac 100644 --- a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts +++ b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider/index.ts @@ -30,12 +30,14 @@ import { import Cache from '@aws-amplify/cache'; import Auth, { GRAPHQL_AUTH_MODE } from '@aws-amplify/auth'; import { AbstractPubSubProvider } from '../PubSubProvider'; -import { CONTROL_MSG } from '../../index'; +import { CONNECTION_STATE_CHANGE, CONTROL_MSG } from '../../index'; + import { AMPLIFY_SYMBOL, AWS_APPSYNC_REALTIME_HEADERS, CONNECTION_INIT_TIMEOUT, DEFAULT_KEEP_ALIVE_TIMEOUT, + DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT, MAX_DELAY_MS, MESSAGE_TYPES, NON_RETRYABLE_CODES, @@ -43,6 +45,10 @@ import { START_ACK_TIMEOUT, SUBSCRIPTION_STATUS, } from './constants'; +import { + ConnectionStateMonitor, + CONNECTION_CHANGE, +} from '../../utils/ConnectionStateMonitor'; const logger = new Logger('AWSAppSyncRealTimeProvider'); @@ -89,8 +95,27 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { private socketStatus: SOCKET_STATUS = SOCKET_STATUS.CLOSED; private keepAliveTimeoutId?: ReturnType; private keepAliveTimeout = DEFAULT_KEEP_ALIVE_TIMEOUT; + private keepAliveAlertTimeoutId?: ReturnType; private subscriptionObserverMap: Map = new Map(); private promiseArray: Array<{ res: Function; rej: Function }> = []; + private readonly connectionStateMonitor = new ConnectionStateMonitor(); + + constructor(options: ProviderOptions = {}) { + super(options); + // Monitor the connection state and pass changes along to Hub + this.connectionStateMonitor.connectionStateObservable.subscribe( + ConnectionState => { + dispatchApiEvent( + CONNECTION_STATE_CHANGE, + { + provider: this, + connectionState: ConnectionState, + }, + `Connection state is ${ConnectionState}` + ); + } + ); + } getNewWebSocket(url, protocol) { return new WebSocket(url, protocol); @@ -147,6 +172,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { }, ], }); + this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); observer.complete(); }); @@ -252,6 +278,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { const stringToAWSRealTime = JSON.stringify(subscriptionMessage); try { + this.connectionStateMonitor.record(CONNECTION_CHANGE.OPENING_CONNECTION); await this._initializeWebSocketConnection({ apiKey, appSyncGraphqlEndpoint, @@ -262,6 +289,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { } catch (err) { logger.debug({ err }); const message = err['message'] ?? ''; + this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); observer.error({ errors: [ { @@ -366,12 +394,20 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { this.socketStatus = SOCKET_STATUS.CLOSED; return; } + + this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSING_CONNECTION); + if (this.awsRealTimeSocket.bufferedAmount > 0) { // Still data on the WebSocket setTimeout(this._closeSocketIfRequired.bind(this), 1000); } else { logger.debug('closing WebSocket...'); - if (this.keepAliveTimeoutId) clearTimeout(this.keepAliveTimeoutId); + if (this.keepAliveTimeoutId) { + clearTimeout(this.keepAliveTimeoutId); + } + if (this.keepAliveAlertTimeoutId) { + clearTimeout(this.keepAliveAlertTimeoutId); + } const tempSocket = this.awsRealTimeSocket; // Cleaning callbacks to avoid race condition, socket still exists tempSocket.onclose = null; @@ -379,6 +415,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { tempSocket.close(1000); this.awsRealTimeSocket = undefined; this.socketStatus = SOCKET_STATUS.CLOSED; + this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); } } @@ -432,17 +469,25 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { subscriptionFailedCallback, }); } + this.connectionStateMonitor.record( + CONNECTION_CHANGE.CONNECTION_ESTABLISHED + ); - // TODO: emit event on hub but it requires to store the id first return; } if (type === MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE) { if (this.keepAliveTimeoutId) clearTimeout(this.keepAliveTimeoutId); + if (this.keepAliveAlertTimeoutId) + clearTimeout(this.keepAliveAlertTimeoutId); this.keepAliveTimeoutId = setTimeout( - this._errorDisconnect.bind(this, CONTROL_MSG.TIMEOUT_DISCONNECT), + () => this._errorDisconnect(CONTROL_MSG.TIMEOUT_DISCONNECT), this.keepAliveTimeout ); + this.keepAliveAlertTimeoutId = setTimeout(() => { + this.connectionStateMonitor.record(CONNECTION_CHANGE.KEEP_ALIVE_MISSED); + }, DEFAULT_KEEP_ALIVE_ALERT_TIMEOUT); + this.connectionStateMonitor.record(CONNECTION_CHANGE.KEEP_ALIVE); return; } @@ -489,6 +534,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { }); this.subscriptionObserverMap.clear(); if (this.awsRealTimeSocket) { + this.connectionStateMonitor.record(CONNECTION_CHANGE.CLOSED); this.awsRealTimeSocket.close(); } @@ -630,6 +676,9 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { logger.debug(`WebSocket connection error`); }; newSocket.onclose = () => { + this.connectionStateMonitor.record( + CONNECTION_CHANGE.CONNECTION_FAILED + ); rej(new Error('Connection handshake error')); }; newSocket.onopen = () => { @@ -703,6 +752,9 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { function checkAckOk(ackOk: boolean) { if (!ackOk) { + this.connectionStateMonitor.record( + CONNECTION_CHANGE.CONNECTION_FAILED + ); rej( new Error( `Connection timeout: ack from AWSRealTime was not received on ${CONNECTION_INIT_TIMEOUT} ms` diff --git a/packages/pubsub/src/index.ts b/packages/pubsub/src/index.ts index d44e2a1f28a..cf84732d317 100644 --- a/packages/pubsub/src/index.ts +++ b/packages/pubsub/src/index.ts @@ -22,6 +22,9 @@ enum CONTROL_MSG { TIMEOUT_DISCONNECT = 'Timeout disconnect', } +export const CONNECTION_STATE_CHANGE = 'ConnectionStateChange'; +export { ConnectionState } from './types'; + export { PubSub, CONTROL_MSG }; /** diff --git a/packages/pubsub/src/types/index.ts b/packages/pubsub/src/types/index.ts index 99cfd92756d..bd6450c9a58 100644 --- a/packages/pubsub/src/types/index.ts +++ b/packages/pubsub/src/types/index.ts @@ -19,3 +19,39 @@ export interface SubscriptionObserver { error(errorValue: any): void; complete(): void; } + +/** @enum {string} */ +export enum ConnectionState { + /* + * The connection is alive and healthy + */ + Connected = 'Connected', + /* + * The connection is alive, but the connection is offline + */ + ConnectedPendingNetwork = 'ConnectedPendingNetwork', + /* + * The connection has been disconnected while in use + */ + ConnectionDisrupted = 'ConnectionDisrupted', + /* + * The connection has been disconnected and the network is offline + */ + ConnectionDisruptedPendingNetwork = 'ConnectionDisruptedPendingNetwork', + /* + * The connection is in the process of connecting + */ + Connecting = 'Connecting', + /* + * The connection is not in use and is being disconnected + */ + ConnectedPendingDisconnect = 'ConnectedPendingDisconnect', + /* + * The connection is not in use and has been disconnected + */ + Disconnected = 'Disconnected', + /* + * The connection is alive, but a keep alive message has been missed + */ + ConnectedPendingKeepAlive = 'ConnectedPendingKeepAlive', +} diff --git a/packages/pubsub/src/utils/ConnectionStateMonitor.ts b/packages/pubsub/src/utils/ConnectionStateMonitor.ts new file mode 100644 index 00000000000..8347571c3ac --- /dev/null +++ b/packages/pubsub/src/utils/ConnectionStateMonitor.ts @@ -0,0 +1,191 @@ +/* + * Copyright 2017-2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with + * the License. A copy of the License is located at + * + * http://aws.amazon.com/apache2.0/ + * + * or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ + +import { Reachability } from '@aws-amplify/core'; +import Observable, { ZenObservable } from 'zen-observable-ts'; +import { ConnectionState } from '../index'; + +// Internal types for tracking different connection states +type LinkedConnectionState = 'connected' | 'disconnected'; +type LinkedHealthState = 'healthy' | 'unhealthy'; +type LinkedConnectionStates = { + networkState: LinkedConnectionState; + connectionState: LinkedConnectionState | 'connecting'; + intendedConnectionState: LinkedConnectionState; + keepAliveState: LinkedHealthState; +}; + +export const CONNECTION_CHANGE: { + [key in + | 'KEEP_ALIVE_MISSED' + | 'KEEP_ALIVE' + | 'CONNECTION_ESTABLISHED' + | 'CONNECTION_FAILED' + | 'CLOSING_CONNECTION' + | 'OPENING_CONNECTION' + | 'CLOSED' + | 'ONLINE' + | 'OFFLINE']: Partial; +} = { + KEEP_ALIVE_MISSED: { keepAliveState: 'unhealthy' }, + KEEP_ALIVE: { keepAliveState: 'healthy' }, + CONNECTION_ESTABLISHED: { connectionState: 'connected' }, + CONNECTION_FAILED: { + intendedConnectionState: 'disconnected', + connectionState: 'disconnected', + }, + CLOSING_CONNECTION: { intendedConnectionState: 'disconnected' }, + OPENING_CONNECTION: { + intendedConnectionState: 'connected', + connectionState: 'connecting', + }, + CLOSED: { connectionState: 'disconnected' }, + ONLINE: { networkState: 'connected' }, + OFFLINE: { networkState: 'disconnected' }, +}; + +export class ConnectionStateMonitor { + /** + * @private + */ + private _linkedConnectionState: LinkedConnectionStates; + private _linkedConnectionStateObservable: Observable; + private _linkedConnectionStateObserver: ZenObservable.SubscriptionObserver; + private _networkMonitoringSubscription?: ZenObservable.Subscription; + + constructor() { + this._networkMonitoringSubscription = undefined; + this._linkedConnectionState = { + networkState: 'connected', + connectionState: 'disconnected', + intendedConnectionState: 'disconnected', + keepAliveState: 'healthy', + }; + + this._linkedConnectionStateObservable = + new Observable(connectionStateObserver => { + connectionStateObserver.next(this._linkedConnectionState); + this._linkedConnectionStateObserver = connectionStateObserver; + }); + } + + /** + * Turn network state monitoring on if it isn't on already + */ + private enableNetworkMonitoring() { + // Maintain the network state based on the reachability monitor + if (this._networkMonitoringSubscription === undefined) { + this._networkMonitoringSubscription = new Reachability() + .networkMonitor() + .subscribe(({ online }) => { + this.record( + online ? CONNECTION_CHANGE.ONLINE : CONNECTION_CHANGE.OFFLINE + ); + }); + } + } + + /** + * Turn network state monitoring off if it isn't off already + */ + private disableNetworkMonitoring() { + this._networkMonitoringSubscription?.unsubscribe(); + this._networkMonitoringSubscription = undefined; + } + + /** + * Get the observable that allows us to monitor the connection state + * + * @returns {Observable} - The observable that emits ConnectionState updates + */ + public get connectionStateObservable(): Observable { + let previous: ConnectionState; + + // The linked state aggregates state changes to any of the network, connection, + // intendedConnection and keepAliveHealth. Some states will change these independent + // states without changing the overall connection state. + + // After translating from linked states to ConnectionState, then remove any duplicates + return this._linkedConnectionStateObservable + .map(value => { + return this.connectionStatesTranslator(value); + }) + .filter(current => { + const toInclude = current !== previous; + previous = current; + return toInclude; + }); + } + + /* + * Updates local connection state and emits the full state to the observer. + */ + record(statusUpdates: Partial) { + // Maintain the network monitor + if (statusUpdates.intendedConnectionState === 'connected') { + this.enableNetworkMonitoring(); + } else if (statusUpdates.intendedConnectionState === 'disconnected') { + this.disableNetworkMonitoring(); + } + + // Maintain the socket state + const newSocketStatus = { + ...this._linkedConnectionState, + ...statusUpdates, + }; + + this._linkedConnectionState = { ...newSocketStatus }; + + this._linkedConnectionStateObserver.next(this._linkedConnectionState); + } + + /* + * Translate the ConnectionState structure into a specific ConnectionState string literal union + */ + private connectionStatesTranslator({ + connectionState, + networkState, + intendedConnectionState, + keepAliveState, + }: LinkedConnectionStates): ConnectionState { + if (connectionState === 'connected' && networkState === 'disconnected') + return ConnectionState.ConnectedPendingNetwork; + + if ( + connectionState === 'connected' && + intendedConnectionState === 'disconnected' + ) + return ConnectionState.ConnectedPendingDisconnect; + + if ( + connectionState === 'disconnected' && + intendedConnectionState === 'connected' && + networkState === 'disconnected' + ) + return ConnectionState.ConnectionDisruptedPendingNetwork; + + if ( + connectionState === 'disconnected' && + intendedConnectionState === 'connected' + ) + return ConnectionState.ConnectionDisrupted; + + if (connectionState === 'connected' && keepAliveState === 'unhealthy') + return ConnectionState.ConnectedPendingKeepAlive; + + // All remaining states directly correspond to the connection state + if (connectionState === 'connecting') return ConnectionState.Connecting; + if (connectionState === 'disconnected') return ConnectionState.Disconnected; + return ConnectionState.Connected; + } +}