Skip to content

Commit

Permalink
Aggregation implementation and test improvements (#7170)
Browse files Browse the repository at this point in the history
* Updating sum/avg tests.
* Back-porting: return aggregation results as a map from the remote layer instead of as an ObjectValue.
* Add long-alias support for aggregations.
  • Loading branch information
MarkDuckworth authored Apr 25, 2023
1 parent 756db42 commit 510c9b5
Show file tree
Hide file tree
Showing 11 changed files with 128 additions and 108 deletions.
5 changes: 5 additions & 0 deletions .changeset/olive-goats-greet.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@firebase/firestore": patch
---

Simplified the internal handling of aggregation results.
7 changes: 3 additions & 4 deletions packages/firestore/src/api/aggregate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import { AggregateImpl } from '../core/aggregate';
import { firestoreClientRunAggregateQuery } from '../core/firestore_client';
import { count } from '../lite-api/aggregate';
import { AggregateQuerySnapshot } from '../lite-api/aggregate_types';
import { AggregateAlias } from '../model/aggregate_alias';
import { ObjectValue } from '../model/object_value';
import { ApiClientObjectMap, Value } from '../protos/firestore_proto_api';
import { cast } from '../util/input_validation';
import { mapToArray } from '../util/obj';

Expand Down Expand Up @@ -110,7 +109,7 @@ export function getAggregateFromServer<T extends AggregateSpec>(

const internalAggregates = mapToArray(aggregateSpec, (aggregate, alias) => {
return new AggregateImpl(
new AggregateAlias(alias),
alias,
aggregate._aggregateType,
aggregate._internalFieldPath
);
Expand All @@ -136,7 +135,7 @@ export function getAggregateFromServer<T extends AggregateSpec>(
function convertToAggregateQuerySnapshot<T extends AggregateSpec>(
firestore: Firestore,
query: Query<unknown>,
aggregateResult: ObjectValue
aggregateResult: ApiClientObjectMap<Value>
): AggregateQuerySnapshot<T> {
const userDataWriter = new ExpUserDataWriter(firestore);
const querySnapshot = new AggregateQuerySnapshot<T>(
Expand Down
5 changes: 2 additions & 3 deletions packages/firestore/src/core/aggregate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
* limitations under the License.
*/

import { AggregateAlias } from '../model/aggregate_alias';
import { FieldPath } from '../model/path';

/**
Expand All @@ -29,7 +28,7 @@ export type AggregateType = 'count' | 'avg' | 'sum';
*/
export interface Aggregate {
readonly fieldPath?: FieldPath;
readonly alias: AggregateAlias;
readonly alias: string;
readonly aggregateType: AggregateType;
}

Expand All @@ -38,7 +37,7 @@ export interface Aggregate {
*/
export class AggregateImpl implements Aggregate {
constructor(
readonly alias: AggregateAlias,
readonly alias: string,
readonly aggregateType: AggregateType,
readonly fieldPath?: FieldPath
) {}
Expand Down
6 changes: 3 additions & 3 deletions packages/firestore/src/core/firestore_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ import { Document } from '../model/document';
import { DocumentKey } from '../model/document_key';
import { FieldIndex } from '../model/field_index';
import { Mutation } from '../model/mutation';
import { ObjectValue } from '../model/object_value';
import { toByteStreamReader } from '../platform/byte_stream_reader';
import { newSerializer } from '../platform/serializer';
import { newTextEncoder } from '../platform/text_serializer';
import { ApiClientObjectMap, Value } from '../protos/firestore_proto_api';
import { Datastore, invokeRunAggregationQueryRpc } from '../remote/datastore';
import {
RemoteStore,
Expand Down Expand Up @@ -526,8 +526,8 @@ export function firestoreClientRunAggregateQuery(
client: FirestoreClient,
query: Query,
aggregates: Aggregate[]
): Promise<ObjectValue> {
const deferred = new Deferred<ObjectValue>();
): Promise<ApiClientObjectMap<Value>> {
const deferred = new Deferred<ApiClientObjectMap<Value>>();

client.asyncQueue.enqueueAndForget(async () => {
// TODO (sum/avg) should we update this to use the event manager?
Expand Down
7 changes: 3 additions & 4 deletions packages/firestore/src/lite-api/aggregate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
import { deepEqual } from '@firebase/util';

import { AggregateImpl } from '../core/aggregate';
import { AggregateAlias } from '../model/aggregate_alias';
import { ObjectValue } from '../model/object_value';
import { ApiClientObjectMap, Value } from '../protos/firestore_proto_api';
import { invokeRunAggregationQueryRpc } from '../remote/datastore';
import { cast } from '../util/input_validation';
import { mapToArray } from '../util/obj';
Expand Down Expand Up @@ -96,7 +95,7 @@ export function getAggregate<T extends AggregateSpec>(

const internalAggregates = mapToArray(aggregateSpec, (aggregate, alias) => {
return new AggregateImpl(
new AggregateAlias(alias),
alias,
aggregate._aggregateType,
aggregate._internalFieldPath
);
Expand All @@ -115,7 +114,7 @@ export function getAggregate<T extends AggregateSpec>(
function convertToAggregateQuerySnapshot<T extends AggregateSpec>(
firestore: Firestore,
query: Query<unknown>,
aggregateResult: ObjectValue
aggregateResult: ApiClientObjectMap<Value>
): AggregateQuerySnapshot<T> {
const userDataWriter = new LiteUserDataWriter(firestore);
const querySnapshot = new AggregateQuerySnapshot<T>(
Expand Down
8 changes: 4 additions & 4 deletions packages/firestore/src/lite-api/aggregate_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/

import { AggregateType } from '../core/aggregate';
import { ObjectValue } from '../model/object_value';
import { FieldPath as InternalFieldPath } from '../model/path';
import { ApiClientObjectMap, Value } from '../protos/firestore_proto_api';

import { Query } from './reference';
import { AbstractUserDataWriter } from './user_data_writer';
Expand Down Expand Up @@ -85,7 +85,7 @@ export class AggregateQuerySnapshot<T extends AggregateSpec> {
constructor(
query: Query<unknown>,
private readonly _userDataWriter: AbstractUserDataWriter,
private readonly _data: ObjectValue
private readonly _data: ApiClientObjectMap<Value>
) {
this.query = query;
}
Expand All @@ -102,8 +102,8 @@ export class AggregateQuerySnapshot<T extends AggregateSpec> {
* query.
*/
data(): AggregateSpecData<T> {
return this._userDataWriter.convertValue(
this._data.value
return this._userDataWriter.convertObjectMap(
this._data
) as AggregateSpecData<T>;
}
}
14 changes: 13 additions & 1 deletion packages/firestore/src/lite-api/user_data_writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ import {
import { TypeOrder } from '../model/type_order';
import { typeOrder } from '../model/values';
import {
ApiClientObjectMap,
ArrayValue as ProtoArrayValue,
LatLng as ProtoLatLng,
MapValue as ProtoMapValue,
Timestamp as ProtoTimestamp,
Value,
Value as ProtoValue
} from '../protos/firestore_proto_api';
import { isValidResourceName } from '../remote/serializer';
Expand Down Expand Up @@ -91,9 +93,19 @@ export abstract class AbstractUserDataWriter {
private convertObject(
mapValue: ProtoMapValue,
serverTimestampBehavior: ServerTimestampBehavior
): DocumentData {
return this.convertObjectMap(mapValue.fields, serverTimestampBehavior);
}

/**
* @internal
*/
convertObjectMap(
fields: ApiClientObjectMap<Value> | undefined,
serverTimestampBehavior: ServerTimestampBehavior = 'none'
): DocumentData {
const result: DocumentData = {};
forEach(mapValue.fields, (key, value) => {
forEach(fields, (key, value) => {
result[key] = this.convertValue(value, serverTimestampBehavior);
});
return result;
Expand Down
48 changes: 0 additions & 48 deletions packages/firestore/src/model/aggregate_alias.ts

This file was deleted.

38 changes: 31 additions & 7 deletions packages/firestore/src/remote/datastore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,20 @@ import { Query, queryToTarget } from '../core/query';
import { Document } from '../model/document';
import { DocumentKey } from '../model/document_key';
import { Mutation } from '../model/mutation';
import { ObjectValue } from '../model/object_value';
import {
ApiClientObjectMap,
BatchGetDocumentsRequest as ProtoBatchGetDocumentsRequest,
BatchGetDocumentsResponse as ProtoBatchGetDocumentsResponse,
RunAggregationQueryRequest as ProtoRunAggregationQueryRequest,
RunAggregationQueryResponse as ProtoRunAggregationQueryResponse,
RunQueryRequest as ProtoRunQueryRequest,
RunQueryResponse as ProtoRunQueryResponse
RunQueryResponse as ProtoRunQueryResponse,
Value
} from '../protos/firestore_proto_api';
import { debugAssert, debugCast, hardAssert } from '../util/assert';
import { AsyncQueue } from '../util/async_queue';
import { Code, FirestoreError } from '../util/error';
import { isNullOrUndefined } from '../util/types';

import { Connection } from './connection';
import {
Expand All @@ -50,8 +52,7 @@ import {
toMutation,
toName,
toQueryTarget,
toRunAggregationQueryRequest,
fromAggregationResult
toRunAggregationQueryRequest
} from './serializer';

/**
Expand Down Expand Up @@ -243,9 +244,9 @@ export async function invokeRunAggregationQueryRpc(
datastore: Datastore,
query: Query,
aggregates: Aggregate[]
): Promise<ObjectValue> {
): Promise<ApiClientObjectMap<Value>> {
const datastoreImpl = debugCast(datastore, DatastoreImpl);
const request = toRunAggregationQueryRequest(
const { request, aliasMap } = toRunAggregationQueryRequest(
datastoreImpl.serializer,
queryToTarget(query),
aggregates
Expand All @@ -267,8 +268,31 @@ export async function invokeRunAggregationQueryRpc(
filteredResult.length === 1,
'Aggregation fields are missing from result.'
);
debugAssert(
!isNullOrUndefined(filteredResult[0].result),
'aggregationQueryResponse.result'
);
debugAssert(
!isNullOrUndefined(filteredResult[0].result.aggregateFields),
'aggregationQueryResponse.result.aggregateFields'
);

// Remap the short-form aliases that were sent to the server
// to the client-side aliases. Users will access the results
// using the client-side alias.
const unmappedAggregateFields = filteredResult[0].result?.aggregateFields;
const remappedFields = Object.keys(unmappedAggregateFields).reduce<
ApiClientObjectMap<Value>
>((accumulator, key) => {
debugAssert(
!isNullOrUndefined(aliasMap[key]),
`'${key}' not present in aliasMap result`
);
accumulator[aliasMap[key]] = unmappedAggregateFields[key]!;
return accumulator;
}, {});

return fromAggregationResult(filteredResult[0]);
return remappedFields;
}

export function newPersistentWriteStream(
Expand Down
48 changes: 23 additions & 25 deletions packages/firestore/src/remote/serializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ import {
Precondition as ProtoPrecondition,
QueryTarget as ProtoQueryTarget,
RunAggregationQueryRequest as ProtoRunAggregationQueryRequest,
RunAggregationQueryResponse as ProtoRunAggregationQueryResponse,
Aggregation as ProtoAggregation,
Status as ProtoStatus,
Target as ProtoTarget,
Expand Down Expand Up @@ -438,22 +437,6 @@ export function fromDocument(
return hasCommittedMutations ? result.setHasCommittedMutations() : result;
}

export function fromAggregationResult(
aggregationQueryResponse: ProtoRunAggregationQueryResponse
): ObjectValue {
assertPresent(
aggregationQueryResponse.result,
'aggregationQueryResponse.result'
);
assertPresent(
aggregationQueryResponse.result.aggregateFields,
'aggregationQueryResponse.result.aggregateFields'
);
return new ObjectValue({
mapValue: { fields: aggregationQueryResponse.result?.aggregateFields }
});
}

function fromFound(
serializer: JsonProtoSerializer,
doc: ProtoBatchGetDocumentsResponse
Expand Down Expand Up @@ -908,26 +891,38 @@ export function toRunAggregationQueryRequest(
serializer: JsonProtoSerializer,
target: Target,
aggregates: Aggregate[]
): ProtoRunAggregationQueryRequest {
): {
request: ProtoRunAggregationQueryRequest;
aliasMap: Record<string, string>;
} {
const queryTarget = toQueryTarget(serializer, target);
const aliasMap: Record<string, string> = {};

const aggregations: ProtoAggregation[] = [];
let aggregationNum = 0;

aggregates.forEach(aggregate => {
// Map all client-side aliases to a unique short-form
// alias. This avoids issues with client-side aliases that
// exceed the 1500-byte string size limit.
const serverAlias = `aggregate_${aggregationNum++}`;
aliasMap[serverAlias] = aggregate.alias;

if (aggregate.aggregateType === 'count') {
aggregations.push({
alias: aggregate.alias.canonicalString(),
alias: serverAlias,
count: {}
});
} else if (aggregate.aggregateType === 'avg') {
aggregations.push({
alias: aggregate.alias.canonicalString(),
alias: serverAlias,
avg: {
field: toFieldPathReference(aggregate.fieldPath!)
}
});
} else if (aggregate.aggregateType === 'sum') {
aggregations.push({
alias: aggregate.alias.canonicalString(),
alias: serverAlias,
sum: {
field: toFieldPathReference(aggregate.fieldPath!)
}
Expand All @@ -936,11 +931,14 @@ export function toRunAggregationQueryRequest(
});

return {
structuredAggregationQuery: {
aggregations,
structuredQuery: queryTarget.structuredQuery
request: {
structuredAggregationQuery: {
aggregations,
structuredQuery: queryTarget.structuredQuery
},
parent: queryTarget.parent
},
parent: queryTarget.parent
aliasMap
};
}

Expand Down
Loading

0 comments on commit 510c9b5

Please sign in to comment.