Skip to content

Commit

Permalink
Add support for LIMIT and OFFSET in queries (Azure#306)
Browse files Browse the repository at this point in the history
  • Loading branch information
southpolesteve authored May 3, 2019
1 parent 47fcaa4 commit 8ad3d5c
Show file tree
Hide file tree
Showing 20 changed files with 262 additions and 357 deletions.
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import { Response } from "../../request";
import { AverageAggregator, CountAggregator, MaxAggregator, MinAggregator, SumAggregator } from "../Aggregators";
import { ExecutionContext } from "../ExecutionContext";
import { getInitialHeader } from "../headerUtils";
import { IExecutionContext } from "../IExecutionContext";
import { CosmosHeaders } from "../index";
import { IEndpointComponent } from "./IEndpointComponent";

/** @hidden */
export class AggregateEndpointComponent implements IEndpointComponent {
export class AggregateEndpointComponent implements ExecutionContext {
private toArrayTempResources: any[];
private aggregateValues: any[];
private aggregateValuesIndex: number;
Expand All @@ -18,7 +17,7 @@ export class AggregateEndpointComponent implements IEndpointComponent {
* @param { object } executionContext - Underlying Execution Context
* @ignore
*/
constructor(private executionContext: IExecutionContext, aggregateOperators: string[]) {
constructor(private executionContext: ExecutionContext, aggregateOperators: string[]) {
// TODO: any
this.executionContext = executionContext;
this.localAggregators = [];
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { Response } from "../../request";
import { ExecutionContext } from "../ExecutionContext";
import { getInitialHeader } from "../headerUtils";

/** @hidden */
export class OffsetLimitEndpointComponent implements ExecutionContext {
constructor(private executionContext: ExecutionContext, private offset: number, private limit: number) {}

public async nextItem(): Promise<Response<any>> {
if (this.offset > 0) {
// Grab next item but ignore the result. We only need the headers
const { headers } = await this.executionContext.nextItem();
this.offset--;
return { result: undefined, headers };
}
if (this.limit > 0) {
const response = await this.executionContext.nextItem();
this.limit--;
return response;
}
// If both limit and offset are 0, return nothing
return { result: undefined, headers: getInitialHeader() };
}

public async current(): Promise<Response<any>> {
if (this.offset > 0) {
const current = await this.executionContext.current();
return { result: undefined, headers: current.headers };
}
return this.executionContext.current();
}

public hasMoreResults() {
return (this.offset > 0 || this.limit > 0) && this.executionContext.hasMoreResults();
}
}
Original file line number Diff line number Diff line change
@@ -1,53 +1,40 @@
import { Response } from "../../request";
import { IExecutionContext } from "../IExecutionContext";
import { IEndpointComponent } from "./IEndpointComponent";
import { ExecutionContext } from "../ExecutionContext";

/** @hidden */
export class OrderByEndpointComponent implements IEndpointComponent {
export class OrderByEndpointComponent implements ExecutionContext {
/**
* Represents an endpoint in handling an order by query. For each processed orderby \
* result it returns 'payload' item of the result
* @constructor OrderByEndpointComponent
* @param {object} executionContext - Underlying Execution Context
* @ignore
*/
constructor(private executionContext: IExecutionContext) {}
constructor(private executionContext: ExecutionContext) {}
/**
* Execute a provided function on the next element in the OrderByEndpointComponent.
* @memberof OrderByEndpointComponent
* @instance
* @param {callback} callback - Function to execute for each element. the function \
* takes two parameters error, element.
*/
public async nextItem(): Promise<Response<any>> {
try {
const { result: item, headers } = await this.executionContext.nextItem();
return {
result: item !== undefined ? item.payload : undefined,
headers
};
} catch (err) {
throw err;
}
const { result: item, headers } = await this.executionContext.nextItem();
return {
result: item !== undefined ? item.payload : undefined,
headers
};
}

/**
* Retrieve the current element on the OrderByEndpointComponent.
* @memberof OrderByEndpointComponent
* @instance
* @param {callback} callback - Function to execute for the current element. \
* the function takes two parameters error, element.
*/
public async current(): Promise<Response<any>> {
try {
const { result: item, headers } = await this.executionContext.current();
return {
result: item !== undefined ? item.payload : undefined,
headers
};
} catch (err) {
throw err;
}
const { result: item, headers } = await this.executionContext.current();
return {
result: item !== undefined ? item.payload : undefined,
headers
};
}

/**
Expand Down

This file was deleted.

4 changes: 0 additions & 4 deletions src/queryExecutionContext/EndpointComponent/index.ts

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Response } from "../request";

/** @hidden */
export interface IExecutionContext {
export interface ExecutionContext {
nextItem: () => Promise<Response<any>>;
current: () => Promise<Response<any>>;
hasMoreResults: () => boolean;
Expand Down
4 changes: 2 additions & 2 deletions src/queryExecutionContext/defaultQueryExecutionContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Constants } from "../common";
import { ClientSideMetrics, QueryMetrics } from "../queryMetrics";
import { Response } from "../request";
import { getInitialHeader } from "./headerUtils";
import { IExecutionContext } from "./index";
import { ExecutionContext } from "./index";

/** @hidden */
export type FetchFunctionCallback = (options: any) => Promise<Response<any>>;
Expand All @@ -15,7 +15,7 @@ enum STATES {
}

/** @hidden */
export class DefaultQueryExecutionContext implements IExecutionContext {
export class DefaultQueryExecutionContext implements ExecutionContext {
private static readonly STATES = STATES;
private resources: any; // TODO: any resources
private currentIndex: number;
Expand Down
3 changes: 1 addition & 2 deletions src/queryExecutionContext/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ export * from "./headerUtils";
export * from "./SqlQuerySpec";
export * from "./defaultQueryExecutionContext";
export * from "./Aggregators";
export * from "./EndpointComponent";
export * from "./documentProducer";
export * from "./FetchResult";
export * from "./orderByDocumentProducerComparator";
export * from "./IExecutionContext";
export * from "./ExecutionContext";
export * from "./parallelQueryExecutionContextBase";
export * from "./parallelQueryExecutionContext";
export * from "./orderByQueryExecutionContext";
Expand Down
4 changes: 2 additions & 2 deletions src/queryExecutionContext/orderByQueryExecutionContext.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { ClientContext } from "../ClientContext";
import { PartitionedQueryExecutionInfo } from "../request/ErrorResponse";
import { DocumentProducer } from "./documentProducer";
import { IExecutionContext } from "./IExecutionContext";
import { ExecutionContext } from "./ExecutionContext";
import { OrderByDocumentProducerComparator } from "./orderByDocumentProducerComparator";
import { ParallelQueryExecutionContextBase } from "./parallelQueryExecutionContextBase";

/** @hidden */
export class OrderByQueryExecutionContext extends ParallelQueryExecutionContextBase implements IExecutionContext {
export class OrderByQueryExecutionContext extends ParallelQueryExecutionContextBase implements ExecutionContext {
private orderByComparator: any;
/**
* Provides the OrderByQueryExecutionContext.
Expand Down
4 changes: 2 additions & 2 deletions src/queryExecutionContext/parallelQueryExecutionContext.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { ClientContext } from "../ClientContext";
import { PartitionedQueryExecutionInfo } from "../request/ErrorResponse";
import { DocumentProducer } from "./documentProducer";
import { IExecutionContext } from "./IExecutionContext";
import { ExecutionContext } from "./ExecutionContext";
import { ParallelQueryExecutionContextBase } from "./parallelQueryExecutionContextBase";

/** @hidden */
export class ParallelQueryExecutionContext extends ParallelQueryExecutionContextBase implements IExecutionContext {
export class ParallelQueryExecutionContext extends ParallelQueryExecutionContextBase implements ExecutionContext {
/**
* Provides the ParallelQueryExecutionContext.
* This class is capable of handling parallelized queries and dervives from ParallelQueryExecutionContextBase.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import { QueryRange } from "../routing/QueryRange";
import { PARITIONKEYRANGE, SmartRoutingMapProvider } from "../routing/smartRoutingMapProvider";
import { CosmosHeaders } from "./CosmosHeaders";
import { DocumentProducer } from "./documentProducer";
import { ExecutionContext } from "./ExecutionContext";
import { getInitialHeader, mergeHeaders } from "./headerUtils";
import { IExecutionContext } from "./IExecutionContext";

/** @hidden */
export enum ParallelQueryExecutionContextBaseStates {
Expand All @@ -20,7 +20,7 @@ export enum ParallelQueryExecutionContextBaseStates {
}

/** @hidden */
export abstract class ParallelQueryExecutionContextBase implements IExecutionContext {
export abstract class ParallelQueryExecutionContextBase implements ExecutionContext {
private static readonly DEFAULT_PAGE_SIZE = 10;

private err: any;
Expand Down
26 changes: 15 additions & 11 deletions src/queryExecutionContext/pipelinedQueryExecutionContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,19 @@ import { ClientContext } from "../ClientContext";
import { Response } from "../request";
import { PartitionedQueryExecutionInfo } from "../request/ErrorResponse";
import { CosmosHeaders } from "./CosmosHeaders";
import {
AggregateEndpointComponent,
IEndpointComponent,
OrderByEndpointComponent,
TopEndpointComponent
} from "./EndpointComponent";
import { AggregateEndpointComponent } from "./EndpointComponent/AggregateEndpointComponent";
import { OffsetLimitEndpointComponent } from "./EndpointComponent/OffsetLimitEndpointComponent";
import { OrderByEndpointComponent } from "./EndpointComponent/OrderByEndpointComponent";
import { ExecutionContext } from "./ExecutionContext";
import { getInitialHeader, mergeHeaders } from "./headerUtils";
import { IExecutionContext } from "./IExecutionContext";
import { OrderByQueryExecutionContext } from "./orderByQueryExecutionContext";
import { ParallelQueryExecutionContext } from "./parallelQueryExecutionContext";

/** @hidden */
export class PipelinedQueryExecutionContext implements IExecutionContext {
export class PipelinedQueryExecutionContext implements ExecutionContext {
private fetchBuffer: any[];
private fetchMoreRespHeaders: CosmosHeaders;
private endpoint: IEndpointComponent;
private endpoint: ExecutionContext;
private pageSize: number;
private static DEFAULT_PAGE_SIZE = 10;
constructor(
Expand Down Expand Up @@ -63,10 +60,17 @@ export class PipelinedQueryExecutionContext implements IExecutionContext {
this.endpoint = new AggregateEndpointComponent(this.endpoint, aggregates);
}

// If top then add that to the pipeline
// If top then add that to the pipeline. TOP N is effectively OFFSET 0 LIMIT N
const top = partitionedQueryExecutionInfo.queryInfo.top;
if (typeof top === "number") {
this.endpoint = new TopEndpointComponent(this.endpoint, top);
this.endpoint = new OffsetLimitEndpointComponent(this.endpoint, 0, top);
}

// If offset+limit then add that to the pipeline
const limit = partitionedQueryExecutionInfo.queryInfo.limit;
const offset = partitionedQueryExecutionInfo.queryInfo.offset;
if (typeof limit === "number" && typeof offset === "number") {
this.endpoint = new OffsetLimitEndpointComponent(this.endpoint, offset, limit);
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/queryExecutionContext/proxyQueryExecutionContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ import { StatusCodes, SubStatusCodes } from "../common/statusCodes";
import { ErrorResponse, Response } from "../request";
import { PartitionedQueryExecutionInfo } from "../request/ErrorResponse";
import { DefaultQueryExecutionContext, FetchFunctionCallback } from "./defaultQueryExecutionContext";
import { IExecutionContext } from "./IExecutionContext";
import { ExecutionContext } from "./ExecutionContext";
import { PipelinedQueryExecutionContext } from "./pipelinedQueryExecutionContext";
import { SqlQuerySpec } from "./SqlQuerySpec";

/** @hidden */
export class ProxyQueryExecutionContext implements IExecutionContext {
private queryExecutionContext: IExecutionContext;
export class ProxyQueryExecutionContext implements ExecutionContext {
private queryExecutionContext: ExecutionContext;

constructor(
private clientContext: ClientContext,
Expand Down
Loading

0 comments on commit 8ad3d5c

Please sign in to comment.