Skip to content

Commit

Permalink
Add bloom filter to existence filter and watchFilters spec builder (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
milaGGL authored Nov 30, 2022
1 parent c17af51 commit a3fb711
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 31 deletions.
1 change: 1 addition & 0 deletions packages/firestore/src/protos/firestore_proto_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ export declare type BeginTransactionRequest =
firestoreV1ApiClientInterfaces.BeginTransactionRequest;
export declare type BeginTransactionResponse =
firestoreV1ApiClientInterfaces.BeginTransactionResponse;
export declare type BloomFilter = firestoreV1ApiClientInterfaces.BloomFilter;
export declare type CollectionSelector =
firestoreV1ApiClientInterfaces.CollectionSelector;
export declare type CommitRequest =
Expand Down
5 changes: 3 additions & 2 deletions packages/firestore/src/remote/existence_filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
* limitations under the License.
*/

import { BloomFilter as ProtoBloomFilter } from '../protos/firestore_proto_api';

export class ExistenceFilter {
// TODO(b/33078163): just use simplest form of existence filter for now
constructor(public count: number) {}
constructor(public count: number, public unchangedNames?: ProtoBloomFilter) {}
}
4 changes: 2 additions & 2 deletions packages/firestore/src/remote/serializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -537,8 +537,8 @@ export function fromWatchChange(
assertPresent(change.filter, 'filter');
const filter = change.filter;
assertPresent(filter.targetId, 'filter.targetId');
const count = filter.count || 0;
const existenceFilter = new ExistenceFilter(count);
const { count = 0, unchangedNames } = filter;
const existenceFilter = new ExistenceFilter(count, unchangedNames);
const targetId = filter.targetId;
watchChange = new ExistenceFilterChange(targetId, existenceFilter);
} else {
Expand Down
62 changes: 54 additions & 8 deletions packages/firestore/test/unit/specs/existence_filter_spec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,23 @@ describeSpec('Existence Filters:', [], () => {
.userListens(query1)
.watchAcksFull(query1, 1000, doc1)
.expectEvents(query1, { added: [doc1] })
.watchFilters([query1], doc1.key)
.watchFilters([query1], [doc1.key])
.watchSnapshots(2000);
});

// This test is only to make sure watchFilters can accept bloom filter.
// TODO:(mila) update the tests when bloom filter logic is implemented.
specTest('Existence filter with bloom filter match', [], () => {
const query1 = query('collection');
const doc1 = doc('collection/1', 1000, { v: 1 });
return spec()
.userListens(query1)
.watchAcksFull(query1, 1000, doc1)
.expectEvents(query1, { added: [doc1] })
.watchFilters([query1], [doc1.key], {
bits: { bitmap: 'a', padding: 1 },
hashCount: 1
})
.watchSnapshots(2000);
});

Expand All @@ -45,7 +61,7 @@ describeSpec('Existence Filters:', [], () => {
.watchSnapshots(2000)
.expectEvents(query1, {})
.watchSends({ affects: [query1] }, doc1)
.watchFilters([query1], doc1.key)
.watchFilters([query1], [doc1.key])
.watchSnapshots(2000)
.expectEvents(query1, { added: [doc1] });
});
Expand All @@ -59,7 +75,7 @@ describeSpec('Existence Filters:', [], () => {
.watchCurrents(query1, 'resume-token-1000')
.watchSnapshots(2000)
.expectEvents(query1, {})
.watchFilters([query1], doc1.key)
.watchFilters([query1], [doc1.key])
.watchSnapshots(2000)
.expectEvents(query1, { fromCache: true });
});
Expand Down Expand Up @@ -96,7 +112,37 @@ describeSpec('Existence Filters:', [], () => {
.userListens(query1)
.watchAcksFull(query1, 1000, doc1, doc2)
.expectEvents(query1, { added: [doc1, doc2] })
.watchFilters([query1], doc1.key) // in the next sync doc2 was deleted
.watchFilters([query1], [doc1.key]) // in the next sync doc2 was deleted
.watchSnapshots(2000)
// query is now marked as "inconsistent" because of filter mismatch
.expectEvents(query1, { fromCache: true })
.expectActiveTargets({ query: query1, resumeToken: '' })
.watchRemoves(query1) // Acks removal of query
.watchAcksFull(query1, 2000, doc1)
.expectLimboDocs(doc2.key) // doc2 is now in limbo
.ackLimbo(2000, deletedDoc('collection/2', 2000))
.expectLimboDocs() // doc2 is no longer in limbo
.expectEvents(query1, {
removed: [doc2]
})
);
});

// This test is only to make sure watchFilters can accept bloom filter.
// TODO:(mila) update the tests when bloom filter logic is implemented.
specTest('Existence filter mismatch triggers bloom filter', [], () => {
const query1 = query('collection');
const doc1 = doc('collection/1', 1000, { v: 1 });
const doc2 = doc('collection/2', 1000, { v: 2 });
return (
spec()
.userListens(query1)
.watchAcksFull(query1, 1000, doc1, doc2)
.expectEvents(query1, { added: [doc1, doc2] })
.watchFilters([query1], [doc1.key], {
bits: { bitmap: 'a', padding: 1 },
hashCount: 3
}) // in the next sync doc2 was deleted
.watchSnapshots(2000)
// query is now marked as "inconsistent" because of filter mismatch
.expectEvents(query1, { fromCache: true })
Expand Down Expand Up @@ -130,7 +176,7 @@ describeSpec('Existence Filters:', [], () => {
resumeToken: 'existence-filter-resume-token'
})
.watchAcks(query1)
.watchFilters([query1], doc1.key) // in the next sync doc2 was deleted
.watchFilters([query1], [doc1.key]) // in the next sync doc2 was deleted
.watchSnapshots(2000)
// query is now marked as "inconsistent" because of filter mismatch
.expectEvents(query1, { fromCache: true })
Expand Down Expand Up @@ -159,7 +205,7 @@ describeSpec('Existence Filters:', [], () => {
// Send a mismatching existence filter with two documents, but don't
// send a new global snapshot. We should not see an event until we
// receive the snapshot.
.watchFilters([query1], doc1.key, doc2.key)
.watchFilters([query1], [doc1.key, doc2.key])
.watchSends({ affects: [query1] }, doc3)
.watchSnapshots(2000)
// The query result includes doc3, but is marked as "inconsistent"
Expand Down Expand Up @@ -193,7 +239,7 @@ describeSpec('Existence Filters:', [], () => {
.userListens(query1)
.watchAcksFull(query1, 1000, doc1, doc2)
.expectEvents(query1, { added: [doc1, doc2] })
.watchFilters([query1], doc1.key) // in the next sync doc2 was deleted
.watchFilters([query1], [doc1.key]) // in the next sync doc2 was deleted
.watchSnapshots(2000)
// query is now marked as "inconsistent" because of filter mismatch
.expectEvents(query1, { fromCache: true })
Expand Down Expand Up @@ -229,7 +275,7 @@ describeSpec('Existence Filters:', [], () => {
.userListens(query1)
.watchAcksFull(query1, 1000, doc1, doc2)
.expectEvents(query1, { added: [doc1, doc2] })
.watchFilters([query1], doc1.key) // doc2 was deleted
.watchFilters([query1], [doc1.key]) // doc2 was deleted
.watchSnapshots(2000)
.expectEvents(query1, { fromCache: true })
// The SDK is unable to re-run the query, and does not remove doc2
Expand Down
2 changes: 1 addition & 1 deletion packages/firestore/test/unit/specs/limbo_spec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,7 @@ describeSpec('Limbo Documents:', [], () => {
// documents that changed since the resume token. This will cause it
// to just send the docBs with an existence filter with a count of 3.
.watchSends({ affects: [query1] }, docB1, docB2, docB3)
.watchFilters([query1], docB1.key, docB2.key, docB3.key)
.watchFilters([query1], [docB1.key, docB2.key, docB3.key])
.watchSnapshots(1001)
.expectEvents(query1, {
added: [docB1, docB2, docB3],
Expand Down
2 changes: 1 addition & 1 deletion packages/firestore/test/unit/specs/limit_spec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ describeSpec('Limits:', [], () => {
// we receive an existence filter, which indicates that our view is
// out of sync.
.watchSends({ affects: [limitQuery] }, secondDocument)
.watchFilters([limitQuery], secondDocument.key)
.watchFilters([limitQuery], [secondDocument.key])
.watchSnapshots(1004)
.expectActiveTargets({ query: limitQuery, resumeToken: '' })
.watchRemoves(limitQuery)
Expand Down
12 changes: 7 additions & 5 deletions packages/firestore/test/unit/specs/spec_builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import { DocumentKey } from '../../../src/model/document_key';
import { FieldIndex } from '../../../src/model/field_index';
import { JsonObject } from '../../../src/model/object_value';
import { ResourcePath } from '../../../src/model/path';
import { BloomFilter as ProtoBloomFilter } from '../../../src/protos/firestore_proto_api';
import {
isPermanentWriteError,
mapCodeFromRpcCode,
Expand Down Expand Up @@ -769,18 +770,19 @@ export class SpecBuilder {
return this;
}

watchFilters(queries: Query[], ...docs: DocumentKey[]): this {
watchFilters(
queries: Query[],
docs: DocumentKey[] = [],
bloomFilter?: ProtoBloomFilter
): this {
this.nextStep();
const targetIds = queries.map(query => {
return this.getTargetId(query);
});
const keys = docs.map(key => {
return key.path.canonicalString();
});
const filter: SpecWatchFilter = [targetIds] as SpecWatchFilter;
for (const key of keys) {
filter.push(key);
}
const filter = { targetIds, keys, bloomFilter } as SpecWatchFilter;
this.currentStep = {
watchFilter: filter
};
Expand Down
15 changes: 6 additions & 9 deletions packages/firestore/test/unit/specs/spec_test_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -693,13 +693,12 @@ abstract class TestRunner {
}

private doWatchFilter(watchFilter: SpecWatchFilter): Promise<void> {
const targetIds: TargetId[] = watchFilter[0];
const { targetIds, keys, bloomFilter } = watchFilter;
debugAssert(
targetIds.length === 1,
'ExistenceFilters currently support exactly one target only.'
);
const keys = watchFilter.slice(1);
const filter = new ExistenceFilter(keys.length);
const filter = new ExistenceFilter(keys.length, bloomFilter);
const change = new ExistenceFilterChange(targetIds[0], filter);
return this.doWatchEvent(change);
}
Expand Down Expand Up @@ -1577,14 +1576,12 @@ export interface SpecClientState {
}

/**
* [[<target-id>, ...], <key>, ...]
* Note that the last parameter is really of type ...string (spread operator)
* The filter is based of a list of keys to match in the existence filter
*/
export interface SpecWatchFilter
extends Array<TargetId[] | string | undefined> {
'0': TargetId[];
'1': string | undefined;
export interface SpecWatchFilter {
targetIds: TargetId[];
keys: string[];
bloomFilter?: api.BloomFilter;
}

export type SpecLimitType = 'LimitToFirst' | 'LimitToLast';
Expand Down
8 changes: 6 additions & 2 deletions packages/firestore/test/util/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -429,15 +429,19 @@ export function existenceFilterEvent(
targetId: number,
syncedKeys: DocumentKeySet,
remoteCount: number,
snapshotVersion: number
snapshotVersion: number,
bloomFilter?: api.BloomFilter
): RemoteEvent {
const aggregator = new WatchChangeAggregator({
getRemoteKeysForTarget: () => syncedKeys,
getTargetDataForTarget: targetId =>
targetData(targetId, TargetPurpose.Listen, 'foo')
});
aggregator.handleExistenceFilter(
new ExistenceFilterChange(targetId, new ExistenceFilter(remoteCount))
new ExistenceFilterChange(
targetId,
new ExistenceFilter(remoteCount, bloomFilter)
)
);
return aggregator.createRemoteEvent(version(snapshotVersion));
}
Expand Down
3 changes: 2 additions & 1 deletion packages/firestore/test/util/spec_test_helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ export function encodeWatchChange(
if (watchChange instanceof ExistenceFilterChange) {
return {
filter: {
targetId: watchChange.targetId,
count: watchChange.existenceFilter.count,
targetId: watchChange.targetId
unchangedNames: watchChange.existenceFilter.unchangedNames
}
};
}
Expand Down

0 comments on commit a3fb711

Please sign in to comment.