add support for max_reporting_rows
avermeil committed Feb 16, 2024
1 parent ea18c81 commit 17179ea
Showing 3 changed files with 175 additions and 95 deletions.
1 change: 1 addition & 0 deletions src/client.ts
@@ -9,6 +9,7 @@ export interface ClientOptions {
client_secret: string;
developer_token: string;
disable_parsing?: boolean;
max_reporting_rows?: number;
}

export class Client {
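For context, a minimal usage sketch of the new option. The GoogleAdsApi export and constructor shape are assumed from the package's public API rather than shown in this diff:

import { GoogleAdsApi } from "google-ads-api";

// Hypothetical cap: fail any single report that would return more than
// 500,000 rows instead of buffering them all in memory.
const client = new GoogleAdsApi({
  client_id: "<CLIENT_ID>",
  client_secret: "<CLIENT_SECRET>",
  developer_token: "<DEVELOPER_TOKEN>",
  max_reporting_rows: 500_000,
});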
247 changes: 174 additions & 73 deletions src/customer.ts
@@ -1,7 +1,8 @@
import { CancellableStream } from "google-gax";
import axios from "axios";
import { chain } from "stream-chain";
import { parser } from "stream-json";
import { parser, Parser } from "stream-json";

import { streamArray } from "stream-json/streamers/StreamArray";

import { ClientOptions } from "./client";
@@ -25,9 +26,11 @@ import {
ReportOptions,
RequestOptions,
} from "./types";
import { createNextChunkArrivedPromise } from "./utils";

import { googleAdsVersion } from "./version";

const ROWS_PER_STREAMED_CHUNK = 10_000; // From experience, this is what can be expected from the API.

/**
* TODO:
* - rename querierRest to streamRest. Add hooks support to it. OK
@@ -40,7 +43,7 @@ import { googleAdsVersion } from "./version";
* - adapt reportCount()
* - adapt reportStream()
* - reportStreamRaw() ?
* - bonus: use queryStream when the params allow it
* - bonus: use queryStream when the params allow it. Note: do it with native async generator to make it as fast as possible.
* - bonus: make it possible to cancel a stream or pagination when row count exceeds specified max
*
*/
@@ -213,13 +216,12 @@ export class Customer extends ServiceFactory {
const searchResponse = rawResponse.data as any;

const results = searchResponse.results ?? [];
// console.time("parsing");
const response: any[] = this.clientOptions.disable_parsing
? results
: results.map((row: any) => decamelizeKeys(row));
// console.timeEnd("parsing");

const summaryRow = decamelizeKeys(searchResponse.summaryRow);
const response: any[] = results.map((row: any) =>
this.decamelizeKeysIfNeeded(row)
);

const summaryRow = this.decamelizeKeysIfNeeded(searchResponse.summaryRow);
const nextPageToken = searchResponse.nextPageToken;
const totalResultsCount = searchResponse.totalResultsCount
? +searchResponse.totalResultsCount
@@ -230,7 +232,7 @@ export class Customer extends ServiceFactory {
console.dir({ e: e.response.data }, { depth: null });
if (e.response.data.error.details[0]) {
throw new errors.GoogleAdsFailure(
decamelizeKeys(e.response.data.error.details[0])
this.decamelizeKeysIfNeeded(e.response.data.error.details[0])
);
}
throw "bag";
@@ -244,6 +246,23 @@ export class Customer extends ServiceFactory {
response: services.IGoogleAdsRow[];
totalResultsCount?: number;
}> {
/*
When possible, use the searchStream method to avoid the overhead of pagination.
*/
if (
requestOptions.page_size === undefined &&
// @ts-expect-error we do not allow this field in reportOptions, however it is still a valid request option
requestOptions.return_total_results_count === undefined
) {
// If no pagination or summary options are set, we can use the non-paginated search method.
const { response } = await this.useStreamToImitateRegularSearch(
gaqlQuery,
requestOptions
);

return { response };
}

const response: services.IGoogleAdsRow[] = [];
let nextPageToken: PageToken = undefined;
const initialSearch = await this.search(gaqlQuery, requestOptions);
@@ -272,6 +291,87 @@ export class Customer extends ServiceFactory {
return { response, totalResultsCount };
}

// Google's searchStream method is faster than search, but it does not support all features.
// When report() is called, we use searchStream if possible, otherwise we use paginatedSearch.
// Note that just like `paginatedSearch`, this method accumulates results in memory. Use
// `reportStream` for a more memory-efficient alternative (at the cost of more CPU usage).
private async useStreamToImitateRegularSearch(
gaqlQuery: string,
requestOptions: Readonly<RequestOptions>
): Promise<{
response: services.IGoogleAdsRow[];
}> {
const accessToken = await this.getAccessToken();

try {
const args = this.prepareGoogleAdsServicePostRequestArgs(
"searchStream",
accessToken,
{
responseType: "stream",
data: {
query: gaqlQuery,
...requestOptions,
},
}
);

const response = await axios(args);

const stream = response.data as any;

const buffers = [];

let rowCount = -ROWS_PER_STREAMED_CHUNK;
for await (const data of stream) {
if (
this.clientOptions.max_reporting_rows &&
!this.gaqlQueryStringIncludesLimit(gaqlQuery)
) {
// this is a quick-and-dirty way to count rows, but it's good enough for our purposes.
// We want to avoid using a proper JSON streamer here for performance reasons.
if (data.toString("utf-8").includes(`results":`)) {
rowCount += ROWS_PER_STREAMED_CHUNK;
}

if (rowCount > this.clientOptions.max_reporting_rows) {
throw this.generateTooManyRowsError();
}
}

buffers.push(data);
}

const asString = Buffer.concat(buffers).toString("utf-8");

const accumulator: services.IGoogleAdsRow[] = [];
let foundSummaryRow: services.IGoogleAdsRow | undefined;
console.time("total parsing");
for (const { results, summaryRow } of JSON.parse(asString)) {
if (summaryRow) {
foundSummaryRow = this.decamelizeKeysIfNeeded(summaryRow);
}
console.time("parsing");
accumulator.push(
...(results ?? []).map((row: any) => {
return this.decamelizeKeysIfNeeded(row);
})
);
console.timeEnd("parsing");

if (foundSummaryRow) {
accumulator.unshift(foundSummaryRow);
}
}
console.timeEnd("total parsing");

return { response: accumulator };
} catch (e: any) {
await this.handleStreamError(e);
throw e; // The line above should always throw.
}
}

private async querier<T = services.IGoogleAdsRow[]>(
gaqlQuery: string,
requestOptions: RequestOptions = {},
@@ -372,13 +472,6 @@ export class Customer extends ServiceFactory {

const accessToken = await this.getAccessToken();

// console.time("request");

let streamFinished = false;
const accumulator: T[] = [];

let nextChunk = createNextChunkArrivedPromise();

try {
const args = this.prepareGoogleAdsServicePostRequestArgs(
"searchStream",
@@ -391,80 +484,70 @@
},
}
);
const abortController = new AbortController();

const response = await axios(args);
const response = await axios({ ...args, signal: abortController.signal });

const stream = response.data as any;

const pipeline = chain([stream, parser(), streamArray()]);

stream.once("data", () => {
// console.timeEnd("request");
// The options below help to make the stream less CPU intensive.
const parser = new Parser({
streamValues: false,
streamKeys: false,
packValues: true,
packKeys: true,
});

pipeline.on("data", (data) => {
// console.time("chunk parsing");
const pipeline = chain([stream, parser, streamArray()]);
let count = 0;

for await (const data of pipeline) {
const results = data.value.results ?? [data.value.summaryRow];

const parsed = this.clientOptions.disable_parsing
? results
: results.map((row: any) => decamelizeKeys(row));

// console.timeEnd("chunk parsing");

accumulator.push(...(parsed as T[]));

nextChunk.resolve();
nextChunk = createNextChunkArrivedPromise();
});

pipeline.on("error", (searchError: Error) => {
nextChunk.reject(searchError);
});

stream.on("end", () => {
streamFinished = true;
nextChunk.resolve();
});
count += results.length;
if (
this.clientOptions.max_reporting_rows &&
count > this.clientOptions.max_reporting_rows &&
!this.gaqlQueryStringIncludesLimit(gaqlQuery)
) {
throw this.generateTooManyRowsError();
}

while (!streamFinished || accumulator.length) {
if (accumulator.length > 0) {
const item = accumulator.shift();
if (item === undefined) {
throw new Error("UNDEFINED_STREAM_ERROR");
}
yield item;
} else {
await nextChunk.newPromise;
for (const row of results) {
const parsed = this.decamelizeKeysIfNeeded(row);
yield parsed as T;
}
}

return;
} catch (e: any) {
// The error is also a stream, so some effort is required to parse it.
const stream = e.response.data as any;
await this.handleStreamError(e);
}
}

const pipeline = chain([stream, parser(), streamArray()]);
private async handleStreamError(e: any) {
if (!e?.response?.data) {
throw e;
}
// The error is a stream, so some effort is required to parse it.
const stream = e.response.data as any;

stream.once("data", () => {
console.timeEnd("request");
console.time("parsing");
});
const pipeline = chain([stream, parser(), streamArray()]);

let googleAdsFailure: errors.GoogleAdsFailure | undefined;
let googleAdsFailure: errors.GoogleAdsFailure | undefined;

// Only throw the first error.
pipeline.once("data", (data) => {
googleAdsFailure = new errors.GoogleAdsFailure(
decamelizeKeys(data.value.error.details[0])
);
});
// Only throw the first error.
pipeline.once("data", (data) => {
googleAdsFailure = new errors.GoogleAdsFailure(
this.decamelizeKeysIfNeeded(data.value.error.details[0])
);
});

// Must always reject.
await new Promise<void>((_, reject) => {
pipeline.on("end", () => reject(googleAdsFailure));
pipeline.on("error", (err) => reject(err));
});
}
// Must always reject.
await new Promise<void>((_, reject) => {
pipeline.on("end", () => reject(googleAdsFailure));
pipeline.on("error", (err) => reject(err));
});
}

/**
@@ -579,6 +662,24 @@ export class Customer extends ServiceFactory {
"login-customer-id": this.credentials.login_customer_id ?? "",
},
...extra,
maxContentLength: 2000,
};
}

private decamelizeKeysIfNeeded(input: any) {
if (this.clientOptions.disable_parsing) {
return input;
}
return decamelizeKeys(input);
}

private gaqlQueryStringIncludesLimit(gaqlQuery: string) {
return gaqlQuery.toLowerCase().includes("limit ");
}

private generateTooManyRowsError() {
return new Error(
`Exceeded the maximum number of rows set by "max_reporting_rows" (${this.clientOptions.max_reporting_rows}).`
);
}
}
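Taken together, report() now takes the streaming path when no page_size or total-results-count option is set, and the row cap is only enforced when the GAQL query has no LIMIT clause. A rough consumer-side sketch of handling the new error; the Customer/report call shapes are assumed from the package's public API, and the message check simply mirrors generateTooManyRowsError above:

const customer = client.Customer({
  customer_id: "<CUSTOMER_ID>",
  refresh_token: "<REFRESH_TOKEN>",
});

try {
  // No LIMIT clause here, so max_reporting_rows (if configured) applies.
  const rows = await customer.report({
    entity: "ad_group",
    attributes: ["ad_group.id", "ad_group.name"],
  });
  console.log(rows.length);
} catch (err) {
  if (err instanceof Error && err.message.includes("max_reporting_rows")) {
    // The query matched more rows than allowed: tighten the WHERE clause or add a LIMIT.
  } else {
    throw err;
  }
}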
22 changes: 0 additions & 22 deletions src/utils.ts
@@ -99,25 +99,3 @@ export function getFieldMask(data: Record<string, any>): protobuf.FieldMask {
paths,
});
}

export function createNextChunkArrivedPromise(): {
newPromise: Promise<unknown>;
resolve: () => void;
reject: (error: Error) => void;
} {
let resolvePromise = (): void => {
return;
};

let rejectPromise = (error: Error): void => {
throw error;
};

const newPromise = new Promise((resolve, reject) => {
// @ts-ignore
resolvePromise = resolve;
rejectPromise = reject;
});

return { newPromise, resolve: resolvePromise, reject: rejectPromise };
}
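The removed helper existed only to bridge the pipeline's "data" events to the async generator in customer.ts; now that the pipeline is iterated directly, that handshake is unnecessary. A generic sketch of the replacement pattern (illustrative only, not code from this repository):

import { Readable } from "stream";
import { chain } from "stream-chain";
import { parser } from "stream-json";
import { streamArray } from "stream-json/streamers/StreamArray";

// A stream-chain pipeline is a Node stream and therefore async-iterable:
// each iteration yields one parsed top-level array element, with
// backpressure handled automatically and no manually resolved
// "next chunk arrived" promises.
async function* parsedRows(source: Readable): AsyncGenerator<unknown> {
  const pipeline = chain([source, parser(), streamArray()]);
  for await (const chunk of pipeline) {
    yield chunk.value;
  }
}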
