From 68d5ba977ecca7c2cb070bd1e9a5cf42fb2ea5fc Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Tue, 9 Apr 2024 16:00:12 -0400 Subject: [PATCH 1/8] stream_bridge.ts: Add onConnected to StreamBridge --- .../src/platform/browser/webchannel_connection.ts | 1 + packages/firestore/src/remote/stream_bridge.ts | 14 ++++++++++++++ 2 files changed, 15 insertions(+) diff --git a/packages/firestore/src/platform/browser/webchannel_connection.ts b/packages/firestore/src/platform/browser/webchannel_connection.ts index 05cd79ecf9e..38d78996b6e 100644 --- a/packages/firestore/src/platform/browser/webchannel_connection.ts +++ b/packages/firestore/src/platform/browser/webchannel_connection.ts @@ -306,6 +306,7 @@ export class WebChannelConnection extends RestConnection { LOG_TAG, `RPC '${rpcName}' stream ${streamId} transport opened.` ); + streamBridge.callOnConnected(); } }); diff --git a/packages/firestore/src/remote/stream_bridge.ts b/packages/firestore/src/remote/stream_bridge.ts index bfecb707b8a..3acd31fc5d3 100644 --- a/packages/firestore/src/remote/stream_bridge.ts +++ b/packages/firestore/src/remote/stream_bridge.ts @@ -26,6 +26,7 @@ import { Stream } from './connection'; * interface. The stream callbacks are invoked with the callOn... methods. */ export class StreamBridge implements Stream { + private wrappedOnConnected: (() => void) | undefined; private wrappedOnOpen: (() => void) | undefined; private wrappedOnClose: ((err?: FirestoreError) => void) | undefined; private wrappedOnMessage: ((msg: O) => void) | undefined; @@ -38,6 +39,11 @@ export class StreamBridge implements Stream { this.closeFn = args.closeFn; } + onConnected(callback: () => void): void { + debugAssert(!this.wrappedOnConnected, 'Called onConnected on stream twice!'); + this.wrappedOnConnected = callback; + } + onOpen(callback: () => void): void { debugAssert(!this.wrappedOnOpen, 'Called onOpen on stream twice!'); this.wrappedOnOpen = callback; @@ -61,6 +67,14 @@ export class StreamBridge implements Stream { this.sendFn(msg); } + callOnConnected(): void { + debugAssert( + this.wrappedOnConnected !== undefined, + 'Cannot call onConnected because no callback was set' + ); + this.wrappedOnConnected(); + } + callOnOpen(): void { debugAssert( this.wrappedOnOpen !== undefined, From 5ebe4135ebbbf3810a6198bfecd186a58bda9055 Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Tue, 9 Apr 2024 16:31:36 -0400 Subject: [PATCH 2/8] deliver connected notification --- packages/firestore/src/remote/connection.ts | 1 + packages/firestore/src/remote/persistent_stream.ts | 8 ++++++++ packages/firestore/src/remote/remote_store.ts | 9 +++++++++ 3 files changed, 18 insertions(+) diff --git a/packages/firestore/src/remote/connection.ts b/packages/firestore/src/remote/connection.ts index b727bc63c17..33c40d51415 100644 --- a/packages/firestore/src/remote/connection.ts +++ b/packages/firestore/src/remote/connection.ts @@ -113,6 +113,7 @@ export interface Connection { * be called if the stream successfully established a connection. */ export interface Stream { + onConnected(callback: () => void): void; onOpen(callback: () => void): void; onClose(callback: (err?: FirestoreError) => void): void; onMessage(callback: (msg: O) => void): void; diff --git a/packages/firestore/src/remote/persistent_stream.ts b/packages/firestore/src/remote/persistent_stream.ts index b7f657876b4..21c35aace28 100644 --- a/packages/firestore/src/remote/persistent_stream.ts +++ b/packages/firestore/src/remote/persistent_stream.ts @@ -125,6 +125,11 @@ const enum PersistentStreamState { * events by the concrete implementation classes. */ export interface PersistentStreamListener { + /** + * Called after receiving an acknowledgement from the server, confirming that + * we are able to connect to it. + */ + onConnected: () => Promise; /** * Called after the stream was established and can accept outgoing * messages @@ -483,6 +488,9 @@ export abstract class PersistentStream< const dispatchIfNotClosed = this.getCloseGuardedDispatcher(this.closeCount); this.stream = this.startRpc(authToken, appCheckToken); + this.stream.onConnected(() => { + dispatchIfNotClosed(() => this.listener!.onConnected()); + }); this.stream.onOpen(() => { dispatchIfNotClosed(() => { debugAssert( diff --git a/packages/firestore/src/remote/remote_store.ts b/packages/firestore/src/remote/remote_store.ts index 8cb361bfe09..2e9cb1546e3 100644 --- a/packages/firestore/src/remote/remote_store.ts +++ b/packages/firestore/src/remote/remote_store.ts @@ -403,6 +403,13 @@ function cleanUpWatchStreamState(remoteStoreImpl: RemoteStoreImpl): void { remoteStoreImpl.watchChangeAggregator = undefined; } +async function onWatchStreamConnected( + remoteStoreImpl: RemoteStoreImpl +): Promise { + // Mark the client as online since we got a "connected" notification. + remoteStoreImpl.onlineStateTracker.set(OnlineState.Online); +} + async function onWatchStreamOpen( remoteStoreImpl: RemoteStoreImpl ): Promise { @@ -923,6 +930,7 @@ function ensureWatchStream( remoteStoreImpl.datastore, remoteStoreImpl.asyncQueue, { + onConnected: onWatchStreamConnected.bind(null, remoteStoreImpl), onOpen: onWatchStreamOpen.bind(null, remoteStoreImpl), onClose: onWatchStreamClose.bind(null, remoteStoreImpl), onWatchChange: onWatchStreamChange.bind(null, remoteStoreImpl) @@ -969,6 +977,7 @@ function ensureWriteStream( remoteStoreImpl.datastore, remoteStoreImpl.asyncQueue, { + onConnected: () => Promise.resolve(), onOpen: onWriteStreamOpen.bind(null, remoteStoreImpl), onClose: onWriteStreamClose.bind(null, remoteStoreImpl), onHandshakeComplete: onWriteHandshakeComplete.bind( From fe7341239c21188acfca964e55dbbd80c06037d9 Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Tue, 9 Apr 2024 16:31:56 -0400 Subject: [PATCH 3/8] yarn prettier --- packages/firestore/src/remote/stream_bridge.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/firestore/src/remote/stream_bridge.ts b/packages/firestore/src/remote/stream_bridge.ts index 3acd31fc5d3..78ae01116ac 100644 --- a/packages/firestore/src/remote/stream_bridge.ts +++ b/packages/firestore/src/remote/stream_bridge.ts @@ -40,7 +40,10 @@ export class StreamBridge implements Stream { } onConnected(callback: () => void): void { - debugAssert(!this.wrappedOnConnected, 'Called onConnected on stream twice!'); + debugAssert( + !this.wrappedOnConnected, + 'Called onConnected on stream twice!' + ); this.wrappedOnConnected = callback; } From 553a55c7ecb9498d7059c7a81b0c4fc9d62f47ce Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Tue, 9 Apr 2024 16:35:08 -0400 Subject: [PATCH 4/8] yarn changeset --- .changeset/early-tomatoes-occur.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changeset/early-tomatoes-occur.md diff --git a/.changeset/early-tomatoes-occur.md b/.changeset/early-tomatoes-occur.md new file mode 100644 index 00000000000..b57ec9cd8b5 --- /dev/null +++ b/.changeset/early-tomatoes-occur.md @@ -0,0 +1,6 @@ +--- +'@firebase/firestore': patch +'firebase': patch +--- + +Prevent spurious "Backend didn't respond within 10 seconds" errors when network is indeed responding, just slowly. From 0edcf6395cacf193a90f51c4948e9dac063e2be2 Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Wed, 17 Apr 2024 16:15:27 -0400 Subject: [PATCH 5/8] grpc_connection.ts: synthesize the 'onConnected' event that WebChannelConnection sends --- packages/firestore/src/platform/node/grpc_connection.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/packages/firestore/src/platform/node/grpc_connection.ts b/packages/firestore/src/platform/node/grpc_connection.ts index ab9e44792ca..dec3137af76 100644 --- a/packages/firestore/src/platform/node/grpc_connection.ts +++ b/packages/firestore/src/platform/node/grpc_connection.ts @@ -286,9 +286,15 @@ export class GrpcConnection implements Connection { } }); + let onConnectedSent = false; grpcStream.on('data', (msg: Resp) => { if (!closed) { logDebug(LOG_TAG, `RPC '${rpcName}' stream ${streamId} received:`, msg); + // Emulate the "onConnected" event that WebChannelConnection sends. + if (!onConnectedSent) { + stream.callOnConnected(); + onConnectedSent = true; + } stream.callOnMessage(msg); } }); From b6494833402766b1bc8b0f109be6136c4931633c Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Wed, 17 Apr 2024 16:15:47 -0400 Subject: [PATCH 6/8] stream.test.ts: fix tests to accommodate the new "onConnected" event. --- .../test/integration/remote/stream.test.ts | 38 +++++++++++++++++-- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/packages/firestore/test/integration/remote/stream.test.ts b/packages/firestore/test/integration/remote/stream.test.ts index 6726f92a772..53819ae21d5 100644 --- a/packages/firestore/test/integration/remote/stream.test.ts +++ b/packages/firestore/test/integration/remote/stream.test.ts @@ -22,7 +22,10 @@ import { Token } from '../../../src/api/credentials'; import { SnapshotVersion } from '../../../src/core/snapshot_version'; +import { Target } from '../../../src/core/target'; +import { TargetData, TargetPurpose } from '../../../src/local/target_data'; import { MutationResult } from '../../../src/model/mutation'; +import { ResourcePath } from '../../../src/model/path'; import { newPersistentWatchStream, newPersistentWriteStream @@ -57,7 +60,8 @@ type StreamEventType = | 'mutationResult' | 'watchChange' | 'open' - | 'close'; + | 'close' + | 'connected'; const SINGLE_MUTATION = [setMutation('docs/1', { foo: 'bar' })]; @@ -117,6 +121,10 @@ class StreamStatusListener implements WatchStreamListener, WriteStreamListener { return this.resolvePending('watchChange'); } + onConnected(): Promise { + return this.resolvePending('connected'); + } + onOpen(): Promise { return this.resolvePending('open'); } @@ -148,6 +156,14 @@ describe('Watch Stream', () => { }); }); }); + + it('gets connected event before first message', () => { + return withTestWatchStream(async (watchStream, streamListener) => { + await streamListener.awaitCallback('open'); + watchStream.watch(sampleTargetData()); + await streamListener.awaitCallback('connected'); + }); + }); }); class MockAuthCredentialsProvider extends EmptyAuthCredentialsProvider { @@ -190,6 +206,7 @@ describe('Write Stream', () => { 'Handshake must be complete before writing mutations' ); writeStream.writeHandshake(); + await streamListener.awaitCallback('connected'); await streamListener.awaitCallback('handshakeComplete'); // Now writes should succeed @@ -205,9 +222,10 @@ describe('Write Stream', () => { return withTestWriteStream((writeStream, streamListener, queue) => { return streamListener .awaitCallback('open') - .then(() => { + .then(async () => { writeStream.writeHandshake(); - return streamListener.awaitCallback('handshakeComplete'); + await streamListener.awaitCallback('connected'); + await streamListener.awaitCallback('handshakeComplete'); }) .then(() => { writeStream.markIdle(); @@ -228,6 +246,7 @@ describe('Write Stream', () => { return withTestWriteStream(async (writeStream, streamListener, queue) => { await streamListener.awaitCallback('open'); writeStream.writeHandshake(); + await streamListener.awaitCallback('connected'); await streamListener.awaitCallback('handshakeComplete'); // Mark the stream idle, but immediately cancel the idle timer by issuing another write. @@ -336,3 +355,16 @@ export async function withTestWatchStream( streamListener.verifyNoPendingCallbacks(); }); } + +function sampleTargetData(): TargetData { + const target: Target = { + path: ResourcePath.emptyPath(), + collectionGroup: null, + orderBy: [], + filters: [], + limit: null, + startAt: null, + endAt: null + }; + return new TargetData(target, 1, TargetPurpose.Listen, 1); +} From 590d5afd89d207bbddb5ac69bb6e2fedd0e7629d Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Wed, 17 Apr 2024 16:41:39 -0400 Subject: [PATCH 7/8] webchannel.test.ts: add missing onConnected callback --- .../firestore/test/integration/browser/webchannel.test.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/firestore/test/integration/browser/webchannel.test.ts b/packages/firestore/test/integration/browser/webchannel.test.ts index 9d9847fc665..622bc67ade2 100644 --- a/packages/firestore/test/integration/browser/webchannel.test.ts +++ b/packages/firestore/test/integration/browser/webchannel.test.ts @@ -63,6 +63,10 @@ describeFn('WebChannel', () => { } }; + // Register an "onConnected" callback since it's required, even though we + // don't care about this event. + stream.onConnected(() => {}); + // Once the stream is open, send an "add_target" request stream.onOpen(() => { stream.send(payload); From d309eaf846986524f7dd35d0c7a49df21380dab5 Mon Sep 17 00:00:00 2001 From: Denver Coneybeare Date: Thu, 18 Apr 2024 12:44:47 -0400 Subject: [PATCH 8/8] remote/connection.ts: update docs to clarify the difference between onOpen and onConnected --- packages/firestore/src/remote/connection.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/packages/firestore/src/remote/connection.ts b/packages/firestore/src/remote/connection.ts index 33c40d51415..2bf982eb4d1 100644 --- a/packages/firestore/src/remote/connection.ts +++ b/packages/firestore/src/remote/connection.ts @@ -109,8 +109,11 @@ export interface Connection { * A bidirectional stream that can be used to send an receive messages. * * A stream can be closed locally with close() or can be closed remotely or - * through network errors. onClose is guaranteed to be called. onOpen will only - * be called if the stream successfully established a connection. + * through network errors. onClose is guaranteed to be called. onOpen will be + * called once the stream is ready to send messages (which may or may not be + * before an actual connection to the backend has been established). The + * onConnected event is called when an actual, physical connection with the + * backend has been established, and may occur before or after the onOpen event. */ export interface Stream { onConnected(callback: () => void): void;