Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add expectedCount to Target in listen request #6854

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions packages/firestore/src/core/sync_engine_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,6 @@ export async function syncEngineListen(
syncEngineImpl.localStore,
queryToTarget(query)
);
if (syncEngineImpl.isPrimaryClient) {
remoteStoreListen(syncEngineImpl.remoteStore, targetData);
}

const status = syncEngineImpl.sharedClientState.addLocalQueryTarget(
targetData.targetId
Expand All @@ -331,6 +328,10 @@ export async function syncEngineListen(
status === 'current',
targetData.resumeToken
);

dconeybe marked this conversation as resolved.
Show resolved Hide resolved
if (syncEngineImpl.isPrimaryClient) {
dconeybe marked this conversation as resolved.
Show resolved Hide resolved
remoteStoreListen(syncEngineImpl.remoteStore, targetData);
}
}

return viewSnapshot;
Expand Down
33 changes: 29 additions & 4 deletions packages/firestore/src/local/target_data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,13 @@ export class TargetData {
* matches the target. The resume token essentially identifies a point in
* time from which the server should resume sending results.
*/
readonly resumeToken: ByteString = ByteString.EMPTY_BYTE_STRING
readonly resumeToken: ByteString = ByteString.EMPTY_BYTE_STRING,
/**
* The number of documents that last matched the query at the resume token or
* read time. Documents are counted only when making a listen request with
* resume token or read time, otherwise, keep it null.
*/
readonly expectedCount: number | null = null
) {}

/** Creates a new target data instance with an updated sequence number. */
Expand All @@ -78,7 +84,8 @@ export class TargetData {
sequenceNumber,
this.snapshotVersion,
this.lastLimboFreeSnapshotVersion,
this.resumeToken
this.resumeToken,
this.expectedCount
);
}

Expand All @@ -97,7 +104,24 @@ export class TargetData {
this.sequenceNumber,
snapshotVersion,
this.lastLimboFreeSnapshotVersion,
resumeToken
resumeToken,
dconeybe marked this conversation as resolved.
Show resolved Hide resolved
/* expectedCount= */ null
);
}

/**
* Creates a new target data instance with an updated expected count.
*/
withExpectedCount(expectedCount: number): TargetData {
return new TargetData(
this.target,
this.targetId,
this.purpose,
this.sequenceNumber,
this.snapshotVersion,
this.lastLimboFreeSnapshotVersion,
this.resumeToken,
expectedCount
);
}

Expand All @@ -115,7 +139,8 @@ export class TargetData {
this.sequenceNumber,
this.snapshotVersion,
lastLimboFreeSnapshotVersion,
this.resumeToken
this.resumeToken,
this.expectedCount
);
}
}
11 changes: 11 additions & 0 deletions packages/firestore/src/remote/remote_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,17 @@ function sendWatchRequest(
remoteStoreImpl.watchChangeAggregator!.recordPendingTargetRequest(
targetData.targetId
);

if (
targetData.resumeToken.approximateByteSize() > 0 ||
targetData.snapshotVersion.compareTo(SnapshotVersion.min()) > 0
) {
const expectedCount = remoteStoreImpl.remoteSyncer.getRemoteKeysForTarget!(
targetData.targetId
).size;
targetData = targetData.withExpectedCount(expectedCount);
}

ensureWatchStream(remoteStoreImpl).watch(targetData);
}

Expand Down
2 changes: 2 additions & 0 deletions packages/firestore/src/remote/serializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,7 @@ export function toTarget(

if (targetData.resumeToken.approximateByteSize() > 0) {
result.resumeToken = toBytes(serializer, targetData.resumeToken);
result.expectedCount = targetData.expectedCount ?? undefined;
} else if (targetData.snapshotVersion.compareTo(SnapshotVersion.min()) > 0) {
// TODO(wuandy): Consider removing above check because it is most likely true.
// Right now, many tests depend on this behaviour though (leaving min() out
Expand All @@ -1020,6 +1021,7 @@ export function toTarget(
serializer,
targetData.snapshotVersion.toTimestamp()
);
result.expectedCount = targetData.expectedCount ?? undefined;
}

return result;
Expand Down
3 changes: 2 additions & 1 deletion packages/firestore/test/unit/remote/serializer.helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1778,7 +1778,8 @@ export function serializerTest(
}
},
resumeToken: new Uint8Array([1, 2, 3]),
targetId: 1
targetId: 1,
expectedCount: undefined
};
expect(result).to.deep.equal(expected);
});
Expand Down
82 changes: 82 additions & 0 deletions packages/firestore/test/unit/specs/listen_spec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1809,4 +1809,86 @@ describeSpec('Listens:', [], () => {
);
}
);

specTest(
'Resuming a query should specify expectedCount when adding the target',
[],
() => {
const query1 = query('collection');
const docA = doc('collection/a', 1000, { key: 'a' });
const docB = doc('collection/b', 1000, { key: 'b' });

return (
spec()
.withGCEnabled(false)
.userListens(query1)
.watchAcksFull(query1, 1000)
.expectEvents(query1, {})
.userUnlistens(query1)
.watchRemoves(query1)
// There are 0 remote documents from previous listen.
.userListens(query1, {
resumeToken: 'resume-token-1000',
expectedCount: 0
})
.expectEvents(query1, { fromCache: true })
.watchAcksFull(query1, 2000, docA, docB)
.expectEvents(query1, { added: [docA, docB] })
.userUnlistens(query1)
.userListens(query1, {
resumeToken: 'resume-token-2000',
expectedCount: 2
})
.expectEvents(query1, { added: [docA, docB], fromCache: true })
);
}
);

specTest(
'Resuming a query should specify expectedCount that does not include pending mutations',
[],
() => {
const query1 = query('collection');
const docA = doc('collection/a', 1000, { key: 'a' });
const docBLocal = doc('collection/b', 1000, {
key: 'b'
}).setHasLocalMutations();

return spec()
.withGCEnabled(false)
.userListens(query1)
.watchAcksFull(query1, 1000, docA)
.expectEvents(query1, { added: [docA] })
.userUnlistens(query1)
.userSets('collection/b', { key: 'b' })
.userListens(query1, {
resumeToken: 'resume-token-1000',
expectedCount: 1
})
.expectEvents(query1, {
added: [docA, docBLocal],
fromCache: true,
hasPendingWrites: true
});
}
);

specTest(
'ExpectedCount in listen request should work after coming back online',
[],
() => {
const query1 = query('collection');
const docA = doc('collection/a', 1000, { key: 'a' });

return spec()
.withGCEnabled(false)
.userListens(query1)
.watchAcksFull(query1, 1000, docA)
.expectEvents(query1, { added: [docA] })
.disableNetwork()
.expectEvents(query1, { fromCache: true })
.enableNetwork()
.restoreListen(query1, 'resume-token-1000', /* expectedCount= */ 1);
}
);
});
81 changes: 43 additions & 38 deletions packages/firestore/test/unit/specs/spec_builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,19 @@ export interface ActiveTargetSpec {
queries: SpecQuery[];
resumeToken?: string;
readTime?: TestSnapshotVersion;
expectedCount?: number;
}

export interface ActiveTargetMap {
[targetId: string]: ActiveTargetSpec;
}

export interface ResumeSpec {
resumeToken?: string;
readTime?: TestSnapshotVersion;
expectedCount?: number;
}

/**
* Tracks the expected memory state of a client (e.g. the expected active watch
* targets based on userListens(), userUnlistens(), and watchRemoves()
Expand Down Expand Up @@ -256,10 +263,7 @@ export class SpecBuilder {
return this;
}

userListens(
query: Query,
resume?: { resumeToken?: string; readTime?: TestSnapshotVersion }
): this {
userListens(query: Query, resume?: ResumeSpec): this {
this.nextStep();

const target = queryToTarget(query);
Expand All @@ -278,12 +282,7 @@ export class SpecBuilder {
}

this.queryMapping.set(target, targetId);
this.addQueryToActiveTargets(
targetId,
query,
resume?.resumeToken,
resume?.readTime
);
this.addQueryToActiveTargets(targetId, query, resume);
this.currentStep = {
userListen: { targetId, query: SpecBuilder.queryToSpec(query) },
expectedState: { activeTargets: { ...this.activeTargets } }
Expand All @@ -296,14 +295,21 @@ export class SpecBuilder {
* Registers a previously active target with the test expectations after a
* stream disconnect.
*/
restoreListen(query: Query, resumeToken: string): this {
restoreListen(
query: Query,
resumeToken: string,
expectedCount?: number
): this {
const targetId = this.queryMapping.get(queryToTarget(query));

if (isNullOrUndefined(targetId)) {
throw new Error("Can't restore an unknown query: " + query);
}

this.addQueryToActiveTargets(targetId!, query, resumeToken);
this.addQueryToActiveTargets(targetId!, query, {
resumeToken,
expectedCount
});

const currentStep = this.currentStep!;
currentStep.expectedState = currentStep.expectedState || {};
Expand Down Expand Up @@ -531,18 +537,18 @@ export class SpecBuilder {
query: Query;
resumeToken?: string;
readTime?: TestSnapshotVersion;
expectedCount?: number;
}>
): this {
this.assertStep('Active target expectation requires previous step');
const currentStep = this.currentStep!;
this.clientState.activeTargets = {};
targets.forEach(({ query, resumeToken, readTime }) => {
this.addQueryToActiveTargets(
this.getTargetId(query),
query,
targets.forEach(({ query, resumeToken, readTime, expectedCount }) => {
this.addQueryToActiveTargets(this.getTargetId(query), query, {
resumeToken,
readTime
);
readTime,
expectedCount
});
});
currentStep.expectedState = currentStep.expectedState || {};
currentStep.expectedState.activeTargets = { ...this.activeTargets };
Expand Down Expand Up @@ -573,7 +579,7 @@ export class SpecBuilder {
this.addQueryToActiveTargets(
this.limboMapping[path],
newQueryForPath(key.path),
''
{ resumeToken: '' }
);
});

Expand Down Expand Up @@ -912,22 +918,14 @@ export class SpecBuilder {
}

/** Registers a query that is active in another tab. */
expectListen(
query: Query,
resume?: { resumeToken?: string; readTime?: TestSnapshotVersion }
): this {
expectListen(query: Query, resume?: ResumeSpec): this {
this.assertStep('Expectations require previous step');

const target = queryToTarget(query);
const targetId = this.queryIdGenerator.cachedId(target);
this.queryMapping.set(target, targetId);

this.addQueryToActiveTargets(
targetId,
query,
resume?.resumeToken,
resume?.readTime
);
this.addQueryToActiveTargets(targetId, query, resume);

const currentStep = this.currentStep!;
currentStep.expectedState = currentStep.expectedState || {};
Expand Down Expand Up @@ -1095,9 +1093,12 @@ export class SpecBuilder {
private addQueryToActiveTargets(
targetId: number,
query: Query,
resumeToken?: string,
readTime?: TestSnapshotVersion
resume?: ResumeSpec
): void {
if (!(resume?.resumeToken || resume?.readTime) && resume?.expectedCount) {
fail('Expected count is present without a resume token or read time.');
}

if (this.activeTargets[targetId]) {
const activeQueries = this.activeTargets[targetId].queries;
if (
Expand All @@ -1108,21 +1109,24 @@ export class SpecBuilder {
// `query` is not added yet.
this.activeTargets[targetId] = {
queries: [SpecBuilder.queryToSpec(query), ...activeQueries],
resumeToken: resumeToken || '',
readTime
resumeToken: resume?.resumeToken || '',
readTime: resume?.readTime,
expectedCount: resume?.expectedCount
};
} else {
this.activeTargets[targetId] = {
queries: activeQueries,
resumeToken: resumeToken || '',
readTime
resumeToken: resume?.resumeToken || '',
readTime: resume?.readTime,
expectedCount: resume?.expectedCount
};
}
} else {
this.activeTargets[targetId] = {
queries: [SpecBuilder.queryToSpec(query)],
resumeToken: resumeToken || '',
readTime
resumeToken: resume?.resumeToken || '',
readTime: resume?.readTime,
expectedCount: resume?.expectedCount
};
}
}
Expand All @@ -1134,7 +1138,8 @@ export class SpecBuilder {
if (queriesAfterRemoval.length > 0) {
this.activeTargets[targetId] = {
queries: queriesAfterRemoval,
resumeToken: this.activeTargets[targetId].resumeToken
resumeToken: this.activeTargets[targetId].resumeToken,
expectedCount: this.activeTargets[targetId].expectedCount
};
} else {
delete this.activeTargets[targetId];
Expand Down
Loading