Skip to content

Commit

Permalink
Feature/query pipeline rewrite (#32497)
Browse files Browse the repository at this point in the history
### Packages impacted by this PR


### Issues associated with this PR


### Describe the problem that is addressed by this PR


### What are the possible designs available to address the problem? If
there is more than one possible design, why was the one in this PR
chosen?


### Are there test cases added in this PR? _(If not, why?)_


### Provide a list of related PRs _(if any)_


### Command used to generate this PR: _(Applicable only to SDK release
request PRs)_

### Checklists
- [ ] Added impacted package name to the issue description
- [ ] Does this PR need any fixes in the SDK Generator? _(If so,
create an Issue in the
[Autorest/typescript](https://github.com/Azure/autorest.typescript)
repository and link it here)_
- [ ] Added a changelog (if necessary)

---------

Co-authored-by: Manik Khandelwal <mkhandelwal@microsoft.com>
Co-authored-by: Aman Rao <amanrao@microsoft.com>
  • Loading branch information
3 people authored Jan 10, 2025
1 parent 859569e commit c3660e6
Show file tree
Hide file tree
Showing 19 changed files with 167 additions and 740 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@ import { getInitialHeader, mergeHeaders } from "../headerUtils";
import { emptyGroup, extractAggregateResult } from "./emptyGroup";
import type { DiagnosticNodeInternal } from "../../diagnostics/DiagnosticNodeInternal";

/**
 * Shape that the value returned by the underlying execution context's
 * `nextItem` call is cast to by the group-by component in this file.
 */
interface GroupByResponse {
// A single group-by row (see GroupByResult).
result: GroupByResult;
// Headers that accompanied the row; merged into the aggregate headers by the consumer.
headers: CosmosHeaders;
}

interface GroupByResult {
groupByItems: any[];
Expand All @@ -32,80 +28,6 @@ export class GroupByEndpointComponent implements ExecutionContext {
private readonly aggregateResultArray: any[] = [];
private completed: boolean = false;

/**
 * Returns the next aggregated GROUP BY row.
 *
 * The first call that finds nothing buffered drains the underlying execution
 * context completely, feeding every row into the per-group aggregators, then
 * materialises one result object per group. Later calls pop the remaining
 * buffered rows one at a time.
 *
 * @param diagnosticNode - diagnostic node forwarded to the underlying execution context.
 * @returns the next grouped row (or `undefined` when nothing is left) together with headers.
 */
public async nextItem(diagnosticNode: DiagnosticNodeInternal): Promise<Response<any>> {
  // Rows materialised by an earlier call are served first, one per call.
  if (this.aggregateResultArray.length > 0) {
    const bufferedRow = this.aggregateResultArray.pop();
    return { result: bufferedRow, headers: getInitialHeader() };
  }

  // The underlying context was already drained and every row has been handed out.
  if (this.completed) {
    return { result: undefined, headers: getInitialHeader() };
  }

  const aggregateHeaders = getInitialHeader();

  while (this.executionContext.hasMoreResults()) {
    // Pull the next row from the underlying context and fold its headers in.
    const upstream = (await this.executionContext.nextItem(diagnosticNode)) as GroupByResponse;
    mergeHeaders(aggregateHeaders, upstream.headers);

    const row = upstream.result;
    if (!row) {
      continue;
    }

    const groupKey = row.groupByItems ? await hashObject(row.groupByItems) : emptyGroup;
    const knownAggregators = this.groupings.get(groupKey);
    const rowPayload = row.payload;

    if (knownAggregators) {
      // Group seen before: feed every payload field into its existing aggregator.
      for (const alias of Object.keys(rowPayload)) {
        // A null/absent group value is replaced by a dummy payload whose item2 is null.
        const effectiveValue = rowPayload[alias] || new Map().set("item2", null);
        knownAggregators.get(alias).aggregate(extractAggregateResult(effectiveValue));
      }
      continue;
    }

    // First sighting of this group: create one aggregator per payload field.
    const freshAggregators = new Map();
    this.groupings.set(groupKey, freshAggregators);
    for (const alias of Object.keys(rowPayload)) {
      const aggregateType = this.queryInfo.groupByAliasToAggregateType[alias];
      const aggregator = createAggregator(aggregateType);
      freshAggregators.set(alias, aggregator);
      // Fields with an aggregate type carry a wrapped value; plain fields are passed through.
      const contribution = aggregateType
        ? extractAggregateResult(rowPayload[alias])
        : rowPayload[alias];
      aggregator.aggregate(contribution);
    }
  }

  // Turn each group's aggregators into a plain result object.
  for (const groupAggregators of this.groupings.values()) {
    const groupRow: any = {};
    for (const [alias, aggregator] of groupAggregators.entries()) {
      groupRow[alias] = aggregator.getResult();
    }
    this.aggregateResultArray.push(groupRow);
  }

  this.completed = true;
  const firstRow = this.aggregateResultArray.pop();
  return { result: firstRow, headers: aggregateHeaders };
}

public hasMoreResults(): boolean {
return this.executionContext.hasMoreResults();
Expand All @@ -130,7 +52,7 @@ export class GroupByEndpointComponent implements ExecutionContext {
return { result: undefined, headers: aggregateHeaders };
}

for (const item of response.result) {
for (const item of response.result as GroupByResult[]) {
// If it exists, process it via aggregators
if (item) {
const group = item.groupByItems ? await hashObject(item.groupByItems) : emptyGroup;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,6 @@ import { getInitialHeader, mergeHeaders } from "../headerUtils";
import { emptyGroup, extractAggregateResult } from "./emptyGroup";
import type { DiagnosticNodeInternal } from "../../diagnostics/DiagnosticNodeInternal";

/**
 * Shape that the value returned by the underlying execution context's
 * `nextItem` call is cast to by the group-by-value component in this file.
 */
interface GroupByResponse {
// A single row from the underlying context (see GroupByResult).
result: GroupByResult;
// Headers that accompanied the row; merged into the aggregate headers by the consumer.
headers: CosmosHeaders;
}

interface GroupByResult {
groupByItems: any[];
payload: any;
Expand All @@ -36,80 +31,6 @@ export class GroupByValueEndpointComponent implements ExecutionContext {
this.aggregateType = this.queryInfo.aggregates[0];
}

/**
 * Returns the next aggregated value for a `SELECT VALUE ... GROUP BY` style query.
 *
 * The first call that finds nothing buffered drains the underlying execution
 * context, aggregating every row into one aggregator per group, and then
 * buffers one value per group. Later calls pop the buffered values.
 *
 * @param diagnosticNode - diagnostic node forwarded to the underlying execution context.
 * @returns the next aggregated value (or `undefined` when nothing is left) together with headers.
 */
public async nextItem(diagnosticNode: DiagnosticNodeInternal): Promise<Response<any>> {
  // Values buffered by an earlier call are served first, one per call.
  if (this.aggregateResultArray.length > 0) {
    const bufferedValue = this.aggregateResultArray.pop();
    return { result: bufferedValue, headers: getInitialHeader() };
  }

  if (this.completed) {
    return { result: undefined, headers: getInitialHeader() };
  }

  const aggregateHeaders = getInitialHeader();

  while (this.executionContext.hasMoreResults()) {
    // Pull the next row from the underlying context and fold its headers in.
    const upstream = (await this.executionContext.nextItem(diagnosticNode)) as GroupByResponse;
    mergeHeaders(aggregateHeaders, upstream.headers);

    const row = upstream.result;
    if (!row) {
      continue;
    }

    // Rows from a query with a GROUP BY clause carry groupByItems plus a payload;
    // otherwise the row itself is the payload and everything lands in the empty group.
    let rowPayload: any = row;
    let groupKey: string = emptyGroup;
    if (row.groupByItems) {
      rowPayload = row.payload;
      groupKey = await hashObject(row.groupByItems);
    }

    // Lazily create the aggregator the first time a group shows up.
    if (!this.aggregators.get(groupKey)) {
      this.aggregators.set(groupKey, createAggregator(this.aggregateType));
    }

    if (!this.aggregateType) {
      // Queries with no aggregates pass the payload directly to the aggregator
      // Example: SELECT VALUE c.team FROM c GROUP BY c.team
      this.aggregators.get(groupKey).aggregate(rowPayload);
      continue;
    }

    const extracted = extractAggregateResult(rowPayload[0]);
    // A null aggregate result marks the whole query as finished with an undefined result.
    if (extracted === null) {
      this.completed = true;
    }
    this.aggregators.get(groupKey).aggregate(extracted);
  }

  // A null aggregate was seen while draining: report an undefined result.
  if (this.completed) {
    return { result: undefined, headers: aggregateHeaders };
  }

  // Nothing is left upstream, so convert every aggregator into its final value.
  for (const groupAggregator of this.aggregators.values()) {
    this.aggregateResultArray.push(groupAggregator.getResult());
  }
  this.completed = true;

  const firstValue = this.aggregateResultArray.pop();
  return { result: firstValue, headers: aggregateHeaders };
}

/**
 * Reports whether the wrapped execution context still has results to offer.
 * @returns the value of `hasMoreResults()` on the underlying execution context.
 */
public hasMoreResults(): boolean {
  const upstreamHasMore = this.executionContext.hasMoreResults();
  return upstreamHasMore;
}
Expand All @@ -132,7 +53,7 @@ export class GroupByValueEndpointComponent implements ExecutionContext {
return { result: undefined, headers: aggregateHeaders };
}

for (const item of response.result) {
for (const item of (response.result as GroupByResult[])) {
if (item) {
let grouping: string = emptyGroup;
let payload: any = item;
Expand Down Expand Up @@ -176,7 +97,6 @@ export class GroupByValueEndpointComponent implements ExecutionContext {
} else {
// If no results are left in the underlying execution context, convert our aggregate results to an array
return this.generateAggregateResponse(aggregateHeaders);

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import { getInitialHeader } from "../headerUtils";
import type { DiagnosticNodeInternal } from "../../diagnostics/DiagnosticNodeInternal";
import { hashObject } from "../../utils/hashObject";
import type { NonStreamingOrderByResult } from "../nonStreamingOrderByResult";
import type { NonStreamingOrderByResponse } from "../nonStreamingOrderByResponse";
import { FixedSizePriorityQueue } from "../../utils/fixedSizePriorityQueue";
import { NonStreamingOrderByMap } from "../../utils/nonStreamingOrderByMap";
import { OrderByComparator } from "../orderByComparator";
Expand Down Expand Up @@ -56,58 +55,6 @@ export class NonStreamingOrderByDistinctEndpointComponent implements ExecutionCo
);
}

/**
 * Advances the non-streaming ORDER BY + DISTINCT pipeline by one step.
 *
 * While the underlying context still has results, each call pulls one row into
 * the de-duplicating map and returns `{}` as a "keep calling" marker. Once the
 * underlying context is exhausted the final result array is built once, and
 * each call then returns one entry from it.
 *
 * @param diagnosticNode - diagnostic node forwarded to the underlying execution context.
 * @returns `{}` while still buffering, the next final result afterwards, or `undefined` when done.
 */
public async nextItem(diagnosticNode: DiagnosticNodeInternal): Promise<Response<any>> {
  let resHeaders = getInitialHeader();

  // A buffer size of 0 (or less) means there is nothing to return at all,
  // which is valid for a TOP 0 or LIMIT 0 query.
  if (this.priorityQueueBufferSize <= 0) {
    return { result: undefined, headers: resHeaders };
  }

  // Phase 1: keep filling the map while the backend still has results.
  if (this.executionContext.hasMoreResults()) {
    const upstream = (await this.executionContext.nextItem(
      diagnosticNode,
    )) as NonStreamingOrderByResponse;
    resHeaders = upstream.headers;

    const row = upstream.result;
    if (row) {
      // Rows are keyed by the hash of their payload; the map decides whether to keep them.
      const payloadHash = await hashObject(row.payload);
      this.aggregateMap.set(payloadHash, row);
    }

    // Return {} to tell the caller that more results remain to be fetched.
    if (this.executionContext.hasMoreResults()) {
      return { result: {}, headers: resHeaders };
    }
  }

  // Phase 2: once everything has been fetched, build the final array exactly once.
  if (!this.executionContext.hasMoreResults() && !this.isCompleted) {
    this.isCompleted = true;
    await this.buildFinalResultArray();
  }

  // Phase 3: serve from the final array, or signal the end with undefined.
  const nextResult = this.finalResultArray.length > 0 ? this.finalResultArray.shift() : undefined;
  return { result: nextResult, headers: resHeaders };
}

/**
* Build final sorted result array from which responses will be served.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import type { ExecutionContext } from "../ExecutionContext";
import { OrderByComparator } from "../orderByComparator";
import type { NonStreamingOrderByResult } from "../nonStreamingOrderByResult";
import { FixedSizePriorityQueue } from "../../utils/fixedSizePriorityQueue";
import { CosmosHeaders, getInitialHeader } from "../headerUtils";
import type { CosmosHeaders } from "../headerUtils";
import {getInitialHeader} from "../headerUtils";

/**
* @hidden
Expand Down Expand Up @@ -44,69 +45,6 @@ export class NonStreamingOrderByEndpointComponent implements ExecutionContext {
);
}

/**
 * Advances the non-streaming ORDER BY pipeline by one step.
 *
 * While the underlying context still has results, each call enqueues one item
 * into the priority queue and returns `{}` as a "keep calling" marker. Once the
 * underlying context is exhausted the queue is reversed, the offset is drained
 * once, and each call then dequeues one item.
 *
 * @param diagnosticNode - diagnostic node forwarded to the underlying execution context.
 * @returns `{}` while still buffering, the next ordered item afterwards, or `undefined` when done.
 */
public async nextItem(diagnosticNode: DiagnosticNodeInternal): Promise<Response<any>> {
  let resHeaders = getInitialHeader();

  // A buffer size of 0 (or less) means there is nothing to return at all,
  // which is valid for a TOP 0 or LIMIT 0 query.
  if (this.priorityQueueBufferSize <= 0) {
    return { result: undefined, headers: resHeaders };
  }

  // Phase 1: keep filling the priority queue while the backend still has results.
  if (this.executionContext.hasMoreResults()) {
    const upstream = await this.executionContext.nextItem(diagnosticNode);
    resHeaders = upstream.headers;
    if (upstream.result !== undefined) {
      this.nonStreamingOrderByPQ.enqueue(upstream.result);
    }

    // Return {} to tell the caller that more results remain to be fetched.
    if (this.executionContext.hasMoreResults()) {
      return { result: {}, headers: resHeaders };
    }
  }

  // Phase 2: once everything has been fetched, finalise the queue exactly once.
  if (!this.executionContext.hasMoreResults() && !this.isCompleted) {
    this.isCompleted = true;
    // Reverse the priority queue so items come out in the requested order.
    this.nonStreamingOrderByPQ = this.nonStreamingOrderByPQ.reverse();
    // For OFFSET/LIMIT the queue was sized to offset + limit, so the first
    // `offset` items are dropped here.
    while (
      this.offset > 0 &&
      this.offset < this.priorityQueueBufferSize &&
      !this.nonStreamingOrderByPQ.isEmpty()
    ) {
      this.nonStreamingOrderByPQ.dequeue();
      this.offset--;
    }
  }

  // Phase 3: an empty queue means there is nothing more to return.
  if (this.nonStreamingOrderByPQ.isEmpty()) {
    return { result: undefined, headers: resHeaders };
  }

  // Either hand back the raw queue entry or only its payload, depending on the flag.
  const head = this.nonStreamingOrderByPQ.dequeue();
  const item = this.emitRawOrderByPayload ? head : head?.payload;
  return { result: item, headers: resHeaders };
}

/**
* Determine if there are still remaining resources to process.
* @returns true if there is other elements to process in the NonStreamingOrderByEndpointComponent.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,6 @@ export class OffsetLimitEndpointComponent implements ExecutionContext {
private limit: number,
) {}

/**
 * Returns the next item after applying OFFSET and LIMIT.
 *
 * On each call any remaining offset is consumed first by pulling items from the
 * underlying execution context and discarding them; then, while the limit is
 * not exhausted, one item is pulled and returned.
 *
 * The headers of every underlying call made here, including those for skipped
 * items, are merged into the headers returned to the caller. Previously the
 * final "nothing to return" branch answered with a fresh initial header, which
 * dropped whatever had been merged while skipping offset items (the case where
 * offset is greater than 0 and limit is 0).
 * NOTE(review): this assumes `mergeHeaders` accumulates into its first argument,
 * as its use in the limit branch suggests; confirm against headerUtils.
 *
 * @param diagnosticNode - diagnostic node forwarded to the underlying execution context.
 * @returns the next item within the limit, or `undefined` once the limit is used up.
 */
public async nextItem(diagnosticNode: DiagnosticNodeInternal): Promise<Response<any>> {
  const aggregateHeaders = getInitialHeader();

  while (this.offset > 0) {
    // Skipped items are discarded; only their headers are kept.
    const { headers } = await this.executionContext.nextItem(diagnosticNode);
    this.offset--;
    mergeHeaders(aggregateHeaders, headers);
  }

  if (this.limit > 0) {
    const { result, headers } = await this.executionContext.nextItem(diagnosticNode);
    this.limit--;
    mergeHeaders(aggregateHeaders, headers);
    return { result, headers: aggregateHeaders };
  }

  // Limit exhausted: return no item, but keep any headers gathered while skipping.
  return {
    result: undefined,
    headers: aggregateHeaders,
  };
}

/**
 * Reports whether another call to `nextItem` can still do useful work.
 * @returns true only while some offset or limit remains and the underlying
 * execution context still has results; the underlying context is not
 * consulted once both counters have reached zero.
 */
public hasMoreResults(): boolean {
  const quotaRemaining = this.offset > 0 || this.limit > 0;
  return quotaRemaining && this.executionContext.hasMoreResults();
}
Expand Down
Loading

0 comments on commit c3660e6

Please sign in to comment.