Skip to content

Commit

Permalink
Support for Aggregate Queries (#4207)
Browse files Browse the repository at this point in the history
* Support for Aggregate Queries

* improve pg and coverage

* Mongo 3.4 aggregates and tests

* replace _id with objectId

* improve tests for objectId

* project with group query

* typo
  • Loading branch information
dplewis authored and flovilmart committed Nov 12, 2017
1 parent 4e207d3 commit 7223add
Show file tree
Hide file tree
Showing 8 changed files with 675 additions and 1 deletion.
412 changes: 412 additions & 0 deletions spec/ParseQuery.Aggregate.spec.js

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions src/Adapters/Storage/Mongo/MongoCollection.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ export default class MongoCollection {
return countOperation;
}

distinct(field, query) {
return this._mongoCollection.distinct(field, query);
}

aggregate(pipeline, { maxTimeMS, readPreference } = {}) {
return this._mongoCollection.aggregate(pipeline, { maxTimeMS, readPreference }).toArray();
}

insertOne(object) {
return this._mongoCollection.insertOne(object);
}
Expand Down
21 changes: 21 additions & 0 deletions src/Adapters/Storage/Mongo/MongoStorageAdapter.js
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,27 @@ export class MongoStorageAdapter {
}));
}

distinct(className, schema, query, fieldName) {
schema = convertParseSchemaToMongoSchema(schema);
return this._adaptiveCollection(className)
.then(collection => collection.distinct(fieldName, transformWhere(className, query, schema)));
}

aggregate(className, pipeline, readPreference) {
readPreference = this._parseReadPreference(readPreference);
return this._adaptiveCollection(className)
.then(collection => collection.aggregate(pipeline, { readPreference, maxTimeMS: this._maxTimeMS }))
.then(results => {
results.forEach(result => {
if (result.hasOwnProperty('_id')) {
result.objectId = result._id;
delete result._id;
}
});
return results;
});
}

_parseReadPreference(readPreference) {
if (readPreference) {
switch (readPreference) {
Expand Down
138 changes: 138 additions & 0 deletions src/Adapters/Storage/Postgres/PostgresStorageAdapter.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ const transformDotField = (fieldName) => {
return name;
}

const transformAggregateField = (fieldName) => {
return fieldName.substr(1);
}

const validateKeys = (object) => {
if (typeof object == 'object') {
for (const key in object) {
Expand Down Expand Up @@ -1366,6 +1370,140 @@ export class PostgresStorageAdapter {
});
}

distinct(className, schema, query, fieldName) {
debug('distinct', className, query);
let field = fieldName;
let column = fieldName;
if (fieldName.indexOf('.') >= 0) {
field = transformDotFieldToComponents(fieldName).join('->');
column = fieldName.split('.')[0];
}
const isArrayField = schema.fields
&& schema.fields[fieldName]
&& schema.fields[fieldName].type === 'Array';
const values = [field, column, className];
const where = buildWhereClause({ schema, query, index: 4 });
values.push(...where.values);

const wherePattern = where.pattern.length > 0 ? `WHERE ${where.pattern}` : '';
let qs = `SELECT DISTINCT ON ($1:raw) $2:raw FROM $3:name ${wherePattern}`;
if (isArrayField) {
qs = `SELECT distinct jsonb_array_elements($1:raw) as $2:raw FROM $3:name ${wherePattern}`;
}
debug(qs, values);
return this._client.any(qs, values)
.catch(() => [])
.then((results) => {
if (fieldName.indexOf('.') === -1) {
return results.map(object => object[field]);
}
const child = fieldName.split('.')[1];
return results.map(object => object[column][child]);
});
}

aggregate(className, pipeline) {
debug('aggregate', className, pipeline);
const values = [className];
let columns = [];
let countField = null;
let wherePattern = '';
let limitPattern = '';
let skipPattern = '';
let sortPattern = '';
let groupPattern = '';
for (let i = 0; i < pipeline.length; i += 1) {
const stage = pipeline[i];
if (stage.$group) {
for (const field in stage.$group) {
const value = stage.$group[field];
if (value === null || value === undefined) {
continue;
}
if (field === '_id') {
columns.push(`${transformAggregateField(value)} AS "objectId"`);
groupPattern = `GROUP BY ${transformAggregateField(value)}`;
continue;
}
if (value.$sum) {
if (typeof value.$sum === 'string') {
columns.push(`SUM(${transformAggregateField(value.$sum)}) AS "${field}"`);
} else {
countField = field;
columns.push(`COUNT(*) AS "${field}"`);
}
}
if (value.$max) {
columns.push(`MAX(${transformAggregateField(value.$max)}) AS "${field}"`);
}
if (value.$min) {
columns.push(`MIN(${transformAggregateField(value.$min)}) AS "${field}"`);
}
if (value.$avg) {
columns.push(`AVG(${transformAggregateField(value.$avg)}) AS "${field}"`);
}
}
columns.join(',');
} else {
columns.push('*');
}
if (stage.$project) {
if (columns.includes('*')) {
columns = [];
}
for (const field in stage.$project) {
const value = stage.$project[field];
if ((value === 1 || value === true)) {
columns.push(field);
}
}
}
if (stage.$match) {
const patterns = [];
for (const field in stage.$match) {
const value = stage.$match[field];
Object.keys(ParseToPosgresComparator).forEach(cmp => {
if (value[cmp]) {
const pgComparator = ParseToPosgresComparator[cmp];
patterns.push(`${field} ${pgComparator} ${value[cmp]}`);
}
});
}
wherePattern = patterns.length > 0 ? `WHERE ${patterns.join(' ')}` : '';
}
if (stage.$limit) {
limitPattern = `LIMIT ${stage.$limit}`;
}
if (stage.$skip) {
skipPattern = `OFFSET ${stage.$skip}`;
}
if (stage.$sort) {
const sort = stage.$sort;
const sorting = Object.keys(sort).map((key) => {
if (sort[key] === 1) {
return `"${key}" ASC`;
}
return `"${key}" DESC`;
}).join(',');
sortPattern = sort !== undefined && Object.keys(sort).length > 0 ? `ORDER BY ${sorting}` : '';
}
}

const qs = `SELECT ${columns} FROM $1:name ${wherePattern} ${sortPattern} ${limitPattern} ${skipPattern} ${groupPattern}`;
debug(qs, values);
return this._client.any(qs, values).then(results => {
if (countField) {
results[0][countField] = parseInt(results[0][countField], 10);
}
results.forEach(result => {
if (!result.hasOwnProperty('objectId')) {
result.objectId = null;
}
});
return results;
});
}

performInitialization({ VolatileClassesSchemas }) {
debug('performInitialization');
const promises = VolatileClassesSchemas.map((schema) => {
Expand Down
14 changes: 14 additions & 0 deletions src/Controllers/DatabaseController.js
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,8 @@ DatabaseController.prototype.find = function(className, query, {
count,
keys,
op,
distinct,
pipeline,
readPreference
} = {}) {
const isMaster = acl === undefined;
Expand Down Expand Up @@ -853,6 +855,18 @@ DatabaseController.prototype.find = function(className, query, {
} else {
return this.adapter.count(className, schema, query, readPreference);
}
} else if (distinct) {
if (!classExists) {
return [];
} else {
return this.adapter.distinct(className, schema, query, distinct);
}
} else if (pipeline) {
if (!classExists) {
return [];
} else {
return this.adapter.aggregate(className, pipeline, readPreference);
}
} else {
if (!classExists) {
return [];
Expand Down
4 changes: 3 additions & 1 deletion src/ParseServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import { SessionsRouter } from './Routers/SessionsRouter';
import { UsersRouter } from './Routers/UsersRouter';
import { PurgeRouter } from './Routers/PurgeRouter';
import { AudiencesRouter } from './Routers/AudiencesRouter';
import { AggregateRouter } from './Routers/AggregateRouter';

import { ParseServerRESTController } from './ParseServerRESTController';
import * as controllers from './Controllers';
Expand Down Expand Up @@ -197,7 +198,8 @@ class ParseServer {
new PurgeRouter(),
new HooksRouter(),
new CloudCodeRouter(),
new AudiencesRouter()
new AudiencesRouter(),
new AggregateRouter()
];

const routes = routers.reduce((memo, router) => {
Expand Down
2 changes: 2 additions & 0 deletions src/RestQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ function RestQuery(config, auth, className, restWhere = {}, restOptions = {}, cl
case 'count':
this.doCount = true;
break;
case 'distinct':
case 'pipeline':
case 'skip':
case 'limit':
case 'readPreference':
Expand Down
77 changes: 77 additions & 0 deletions src/Routers/AggregateRouter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import ClassesRouter from './ClassesRouter';
import rest from '../rest';
import * as middleware from '../middlewares';
import Parse from 'parse/node';

const ALLOWED_KEYS = [
'where',
'distinct',
'project',
'match',
'redact',
'limit',
'skip',
'unwind',
'group',
'sample',
'sort',
'geoNear',
'lookup',
'out',
'indexStats',
'facet',
'bucket',
'bucketAuto',
'sortByCount',
'addFields',
'replaceRoot',
'count',
'graphLookup',
];

export class AggregateRouter extends ClassesRouter {

handleFind(req) {
const body = Object.assign(req.body, ClassesRouter.JSONFromQuery(req.query));
const options = {};
const pipeline = [];

for (const key in body) {
if (ALLOWED_KEYS.indexOf(key) === -1) {
throw new Parse.Error(Parse.Error.INVALID_QUERY, `Invalid parameter for query: ${key}`);
}
if (key === 'group') {
if (body[key].hasOwnProperty('_id')) {
throw new Parse.Error(
Parse.Error.INVALID_QUERY,
`Invalid parameter for query: group. Please use objectId instead of _id`
);
}
if (!body[key].hasOwnProperty('objectId')) {
throw new Parse.Error(
Parse.Error.INVALID_QUERY,
`Invalid parameter for query: group. objectId is required`
);
}
body[key]._id = body[key].objectId;
delete body[key].objectId;
}
pipeline.push({ [`$${key}`]: body[key] });
}
if (body.distinct) {
options.distinct = String(body.distinct);
}
options.pipeline = pipeline;
if (typeof body.where === 'string') {
body.where = JSON.parse(body.where);
}
return rest.find(req.config, req.auth, this.className(req), body.where, options, req.info.clientSDK)
.then((response) => { return { response }; });
}

mountRoutes() {
this.route('GET','/aggregate/:className', middleware.promiseEnforceMasterKeyAccess, req => { return this.handleFind(req); });
}
}

export default AggregateRouter;

0 comments on commit 7223add

Please sign in to comment.