diff --git a/src/client.ts b/src/client.ts index 0e6ea38a..d2fa4042 100644 --- a/src/client.ts +++ b/src/client.ts @@ -9,6 +9,7 @@ export interface ClientOptions { client_secret: string; developer_token: string; disable_parsing?: boolean; + max_reporting_rows?: number; } export class Client { diff --git a/src/customer.ts b/src/customer.ts index 36b9bded..7b8a5fed 100644 --- a/src/customer.ts +++ b/src/customer.ts @@ -1,7 +1,8 @@ import { CancellableStream } from "google-gax"; import axios from "axios"; import { chain } from "stream-chain"; -import { parser } from "stream-json"; +import { parser, Parser } from "stream-json"; + import { streamArray } from "stream-json/streamers/StreamArray"; import { ClientOptions } from "./client"; @@ -25,9 +26,11 @@ import { ReportOptions, RequestOptions, } from "./types"; -import { createNextChunkArrivedPromise } from "./utils"; + import { googleAdsVersion } from "./version"; +const ROWS_PER_STREAMED_CHUNK = 10_000; // From experience, this is what can be expected from the API. + /** * TODO: * - rename querierRest to streamRest. Add hooks support to it. OK @@ -40,7 +43,7 @@ import { googleAdsVersion } from "./version"; * - adapt reportCount() * - adapt reportStream() * - reportStreamRaw() ? - * - bonus: use queryStream when the params allow it + * - bonus: use queryStream when the params allow it. Note: do it with native async generator to make it as fast as possible. * - bonus: make it possible to cancel a stream or pagination when row count exceeds specified max * */ @@ -213,13 +216,12 @@ export class Customer extends ServiceFactory { const searchResponse = rawResponse.data as any; const results = searchResponse.results ?? []; - // console.time("parsing"); - const response: any[] = this.clientOptions.disable_parsing - ? results - : results.map((row: any) => decamelizeKeys(row)); - // console.timeEnd("parsing"); - const summaryRow = decamelizeKeys(searchResponse.summaryRow); + const response: any[] = results.map((row: any) => + this.decamelizeKeysIfNeeded(row) + ); + + const summaryRow = this.decamelizeKeysIfNeeded(searchResponse.summaryRow); const nextPageToken = searchResponse.nextPageToken; const totalResultsCount = searchResponse.totalResultsCount ? +searchResponse.totalResultsCount @@ -230,7 +232,7 @@ export class Customer extends ServiceFactory { console.dir({ e: e.response.data }, { depth: null }); if (e.response.data.error.details[0]) { throw new errors.GoogleAdsFailure( - decamelizeKeys(e.response.data.error.details[0]) + this.decamelizeKeysIfNeeded(e.response.data.error.details[0]) ); } throw "bag"; @@ -244,6 +246,23 @@ export class Customer extends ServiceFactory { response: services.IGoogleAdsRow[]; totalResultsCount?: number; }> { + /* + When possible, use the searchStream method to avoid the overhead of pagination. + */ + if ( + requestOptions.page_size === undefined && + // @ts-expect-error we do not allow this field in reportOptions, however it is still a valid request option + requestOptions.return_total_results_count === undefined + ) { + // If no pagination or summary options are set, we can use the non-paginated search method. + const { response } = await this.useStreamToImitateRegularSearch( + gaqlQuery, + requestOptions + ); + + return { response }; + } + const response: services.IGoogleAdsRow[] = []; let nextPageToken: PageToken = undefined; const initialSearch = await this.search(gaqlQuery, requestOptions); @@ -272,6 +291,87 @@ export class Customer extends ServiceFactory { return { response, totalResultsCount }; } + // Google's searchStream method is faster than search, but it does not support all features. + // When report() is called, we use searchStream if possible, otherwise we use paginatedSearch. + // Note that just like `paginatedSearch`, this method accumulates results in memory. Use + // `reportStream` for a more memory-efficient alternative (at the cost of more CPU usage). + private async useStreamToImitateRegularSearch( + gaqlQuery: string, + requestOptions: Readonly + ): Promise<{ + response: services.IGoogleAdsRow[]; + }> { + const accessToken = await this.getAccessToken(); + + try { + const args = this.prepareGoogleAdsServicePostRequestArgs( + "searchStream", + accessToken, + { + responseType: "stream", + data: { + query: gaqlQuery, + ...requestOptions, + }, + } + ); + + const response = await axios(args); + + const stream = response.data as any; + + const buffers = []; + + let rowCount = -ROWS_PER_STREAMED_CHUNK; + for await (const data of stream) { + if ( + this.clientOptions.max_reporting_rows && + !this.gaqlQueryStringIncludesLimit(gaqlQuery) + ) { + // this is a quick-and-dirty way to count rows, but it's good enough for our purposes. + // We want to avoid using a proper JSON streamer here for performance reasons. + if (data.toString("utf-8").includes(`results":`)) { + rowCount += ROWS_PER_STREAMED_CHUNK; + } + + if (rowCount > this.clientOptions.max_reporting_rows) { + throw this.generateTooManyRowsError(); + } + } + + buffers.push(data); + } + + const asString = Buffer.concat(buffers).toString("utf-8"); + + const accumulator: services.IGoogleAdsRow[] = []; + let foundSummaryRow: services.IGoogleAdsRow | undefined; + console.time("total parsing"); + for (const { results, summaryRow } of JSON.parse(asString)) { + if (summaryRow) { + foundSummaryRow = this.decamelizeKeysIfNeeded(summaryRow); + } + console.time("parsing"); + accumulator.push( + ...(results ?? []).map((row: any) => { + return this.decamelizeKeysIfNeeded(row); + }) + ); + console.timeEnd("parsing"); + + if (foundSummaryRow) { + accumulator.unshift(foundSummaryRow); + } + } + console.timeEnd("total parsing"); + + return { response: accumulator }; + } catch (e: any) { + await this.handleStreamError(e); + throw e; // The line above should always throw. + } + } + private async querier( gaqlQuery: string, requestOptions: RequestOptions = {}, @@ -372,13 +472,6 @@ export class Customer extends ServiceFactory { const accessToken = await this.getAccessToken(); - // console.time("request"); - - let streamFinished = false; - const accumulator: T[] = []; - - let nextChunk = createNextChunkArrivedPromise(); - try { const args = this.prepareGoogleAdsServicePostRequestArgs( "searchStream", @@ -391,80 +484,70 @@ export class Customer extends ServiceFactory { }, } ); + const abortController = new AbortController(); - const response = await axios(args); + const response = await axios({ ...args, signal: abortController.signal }); const stream = response.data as any; - const pipeline = chain([stream, parser(), streamArray()]); - - stream.once("data", () => { - // console.timeEnd("request"); + // The options below help to make the stream less CPU intensive. + const parser = new Parser({ + streamValues: false, + streamKeys: false, + packValues: true, + packKeys: true, }); - pipeline.on("data", (data) => { - // console.time("chunk parsing"); + const pipeline = chain([stream, parser, streamArray()]); + let count = 0; + for await (const data of pipeline) { const results = data.value.results ?? [data.value.summaryRow]; - const parsed = this.clientOptions.disable_parsing - ? results - : results.map((row: any) => decamelizeKeys(row)); - - // console.timeEnd("chunk parsing"); - - accumulator.push(...(parsed as T[])); - - nextChunk.resolve(); - nextChunk = createNextChunkArrivedPromise(); - }); - - pipeline.on("error", (searchError: Error) => { - nextChunk.reject(searchError); - }); - - stream.on("end", () => { - streamFinished = true; - nextChunk.resolve(); - }); + count += results.length; + if ( + this.clientOptions.max_reporting_rows && + count > this.clientOptions.max_reporting_rows && + !this.gaqlQueryStringIncludesLimit(gaqlQuery) + ) { + throw this.generateTooManyRowsError(); + } - while (!streamFinished || accumulator.length) { - if (accumulator.length > 0) { - const item = accumulator.shift(); - if (item === undefined) { - throw new Error("UNDEFINED_STREAM_ERROR"); - } - yield item; - } else { - await nextChunk.newPromise; + for (const row of results) { + const parsed = this.decamelizeKeysIfNeeded(row); + yield parsed as T; } } + + return; } catch (e: any) { - // The error is also a stream, so some effort is required to parse it. - const stream = e.response.data as any; + await this.handleStreamError(e); + } + } - const pipeline = chain([stream, parser(), streamArray()]); + private async handleStreamError(e: any) { + if (!e?.response?.data) { + throw e; + } + // The error is a stream, so some effort is required to parse it. + const stream = e.response.data as any; - stream.once("data", () => { - console.timeEnd("request"); - console.time("parsing"); - }); + const pipeline = chain([stream, parser(), streamArray()]); - let googleAdsFailure: errors.GoogleAdsFailure | undefined; + let googleAdsFailure: errors.GoogleAdsFailure | undefined; - // Only throw the first error. - pipeline.once("data", (data) => { - googleAdsFailure = new errors.GoogleAdsFailure( - decamelizeKeys(data.value.error.details[0]) - ); - }); + // Only throw the first error. + pipeline.once("data", (data) => { + googleAdsFailure = new errors.GoogleAdsFailure( + this.decamelizeKeysIfNeeded(data.value.error.details[0]) + ); + }); - // Must always reject. - await new Promise((_, reject) => { - pipeline.on("end", () => reject(googleAdsFailure)); - pipeline.on("error", (err) => reject(err)); - }); - } + // Must always reject. + await new Promise((_, reject) => { + pipeline.on("end", () => reject(googleAdsFailure)); + pipeline.on("error", (err) => reject(err)); + }); } /** @@ -579,6 +662,24 @@ export class Customer extends ServiceFactory { "login-customer-id": this.credentials.login_customer_id ?? "", }, ...extra, + maxContentLength: 2000, }; } + + private decamelizeKeysIfNeeded(input: any) { + if (this.clientOptions.disable_parsing) { + return input; + } + return decamelizeKeys(input); + } + + private gaqlQueryStringIncludesLimit(gaqlQuery: string) { + return gaqlQuery.toLowerCase().includes("limit "); + } + + private generateTooManyRowsError() { + return new Error( + `Exceeded the maximum number of rows set by "max_reporting_rows" (${this.clientOptions.max_reporting_rows}).` + ); + } } diff --git a/src/utils.ts b/src/utils.ts index f0b0ccfb..5e8d5336 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -99,25 +99,3 @@ export function getFieldMask(data: Record): protobuf.FieldMask { paths, }); } - -export function createNextChunkArrivedPromise(): { - newPromise: Promise; - resolve: () => void; - reject: (error: Error) => void; -} { - let resolvePromise = (): void => { - return; - }; - - let rejectPromise = (error: Error): void => { - throw error; - }; - - const newPromise = new Promise((resolve, reject) => { - // @ts-ignore - resolvePromise = resolve; - rejectPromise = reject; - }); - - return { newPromise, resolve: resolvePromise, reject: rejectPromise }; -}