From 213a2fd8b046fbd88042eea6bd7f37a441d34f6c Mon Sep 17 00:00:00 2001 From: Jacob Cable Date: Thu, 18 Apr 2024 17:07:01 +0100 Subject: [PATCH] fix(firestore-bigquery-export): initialization fix --- .../package.json | 2 +- .../src/__tests__/bigquery/e2e.test.ts | 34 ++++++- .../src/bigquery/index.ts | 63 ++++++------ .../src/bigquery/partitioning.ts | 96 +++++++++++-------- .../src/bigquery/utils.test.ts | 91 ++++++++++++++++++ .../src/bigquery/utils.ts | 78 +++++++++++++++ 6 files changed, 291 insertions(+), 73 deletions(-) create mode 100644 firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/utils.test.ts create mode 100644 firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/utils.ts diff --git a/firestore-bigquery-export/firestore-bigquery-change-tracker/package.json b/firestore-bigquery-export/firestore-bigquery-change-tracker/package.json index 868d90862..de3bfa5f4 100644 --- a/firestore-bigquery-export/firestore-bigquery-change-tracker/package.json +++ b/firestore-bigquery-export/firestore-bigquery-change-tracker/package.json @@ -5,7 +5,7 @@ "url": "github.com/firebase/extensions.git", "directory": "firestore-bigquery-export/firestore-bigquery-change-tracker" }, - "version": "1.1.32", + "version": "1.1.33", "description": "Core change-tracker library for Cloud Firestore Collection BigQuery Exports", "main": "./lib/index.js", "scripts": { diff --git a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/e2e.test.ts b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/e2e.test.ts index eb9ab2664..bfb6fbf67 100644 --- a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/e2e.test.ts +++ b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/__tests__/bigquery/e2e.test.ts @@ -28,6 +28,31 @@ let dataset: Dataset; let table: Table; let view: Table; describe("e2e", () => { + describe("initialization", () => { + 
beforeEach(async () => { + randomID = (Math.random() + 1).toString(36).substring(7); + datasetId = `dataset_${randomID}`; + tableId = `table_${randomID}`; + tableId_raw = `${tableId}_raw_changelog`; + dataset = bq.dataset(datasetId); + }); + + afterEach(async () => { + await deleteTable({ + datasetId, + }); + }); + test("successfully creates a dataset and table", async () => { + await changeTracker({ + datasetId, + tableId, + }).record([event]); + + const [metadata] = await dataset.table(tableId_raw).getMetadata(); + + expect(metadata).toBeDefined(); + }); + }); describe("Partitioning", () => { beforeEach(async () => { randomID = (Math.random() + 1).toString(36).substring(7); @@ -108,13 +133,14 @@ describe("e2e", () => { const [metadata] = await dataset.table(`${tableId_raw}`).getMetadata(); + expect(metadata.timePartitioning).toBeDefined(); + const [changeLogRows] = await getBigQueryTableData( process.env.PROJECT_ID, datasetId, tableId ); - expect(metadata.timePartitioning).toBeDefined(); expect(changeLogRows[0].created.value).toBe( BigQuery.timestamp(created.toDate()).value ); @@ -369,6 +395,10 @@ describe("e2e", () => { }); test("does not update an existing non partitioned table, that has a valid schema with timePartitioning only", async () => { + const tableExists = await dataset.table(tableId_raw).exists(); + + expect(tableExists[0]).toBe(true); + await changeTracker({ datasetId, tableId, @@ -380,7 +410,7 @@ describe("e2e", () => { expect(metadata.timePartitioning).toBeUndefined(); expect(consoleLogSpyWarn).toBeCalledWith( - `Cannot partition an existing table ${datasetId}_${tableId_raw}` + `Did not add partitioning to schema: Partition field not provided` ); expect(consoleLogSpy).toBeCalledWith( `BigQuery dataset already exists: ${datasetId}` diff --git a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/index.ts b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/index.ts index 7ca9b05c5..cc2c8d772 100644 --- 
a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/index.ts +++ b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/index.ts @@ -42,6 +42,7 @@ import { import { Partitioning } from "./partitioning"; import { Clustering } from "./clustering"; import { tableRequiresUpdate, viewRequiresUpdate } from "./checkUpdates"; +import { parseErrorMessage, waitForInitialization } from "./utils"; export { RawChangelogSchema, RawChangelogViewSchema } from "./schema"; @@ -203,27 +204,10 @@ export class FirestoreBigQueryEventHistoryTracker * A half a second delay is added per check while the function * continually re-checks until the referenced dataset and table become available. */ - private async waitForInitialization() { - return new Promise((resolve) => { - let handle = setInterval(async () => { - try { - const dataset = this.bigqueryDataset(); - const changelogName = this.rawChangeLogTableName(); - const table = dataset.table(changelogName); - - const [datasetExists] = await dataset.exists(); - const [tableExists] = await table.exists(); - - if (datasetExists && tableExists) { - clearInterval(handle); - return resolve(table); - } - } catch (ex) { - clearInterval(handle); - logs.failedToInitializeWait(ex.message); - } - }, 5000); - }); + private async _waitForInitialization() { + const dataset = this.bigqueryDataset(); + const changelogName = this.rawChangeLogTableName(); + return waitForInitialization({ dataset, changelogName }); } /** @@ -279,17 +263,39 @@ export class FirestoreBigQueryEventHistoryTracker if (this._initialized) { return; } + try { + await this.initializeDataset(); + } catch (error) { + const message = parseErrorMessage(error, "initializing dataset"); + throw new Error(`Error initializing dataset: ${message}`); + } - await this.initializeDataset(); - - await this.initializeRawChangeLogTable(); + try { + await this.initializeRawChangeLogTable(); + } catch (error) { + const message = parseErrorMessage( + error, + 
"initializing raw change log table" + ); + throw new Error(`Error initializing raw change log table: ${message}`); + } - await this.initializeLatestView(); + try { + await this.initializeLatestView(); + } catch (error) { + const message = parseErrorMessage(error, "initializing latest view"); + throw new Error(`Error initializing latest view: ${message}`); + } + await this._waitForInitialization(); this._initialized = true; - } catch (ex) { - await this.waitForInitialization(); - this._initialized = true; + } catch (error) { + const message = parseErrorMessage( + error, + "initializing BigQuery resources" + ); + console.error("Error initializing BigQuery resources: ", message); + throw error; } } @@ -396,7 +402,6 @@ export class FirestoreBigQueryEventHistoryTracker kmsKeyName: this.config.kmsKeyName, }; } - //Add partitioning await partitioning.addPartitioningToSchema(schema.fields); diff --git a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/partitioning.ts b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/partitioning.ts index 8788f5f28..d5fe08327 100644 --- a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/partitioning.ts +++ b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/partitioning.ts @@ -4,7 +4,7 @@ import * as firebase from "firebase-admin"; import * as logs from "../logs"; import * as bigquery from "@google-cloud/bigquery"; - +import * as functions from "firebase-functions"; import { getNewPartitionField } from "./schema"; import { BigQuery, TableMetadata } from "@google-cloud/bigquery"; @@ -135,18 +135,22 @@ export class Partitioning { } private async isTablePartitioned() { + const [tableExists] = await this.table.exists(); + + if (!this.table || !tableExists) return false; + /* Return true if partition metadata already exists */ const [metadata] = await this.table.getMetadata(); - if (!!metadata.timePartitioning) { + if (metadata.timePartitioning) { 
logs.cannotPartitionExistingTable(this.table); - return Promise.resolve(true); + return true; } /** Find schema fields **/ const schemaFields = await this.metaDataSchemaFields(); /** Return false if no schema exists */ - if (!schemaFields) return Promise.resolve(false); + if (!schemaFields) return false; /* Return false if time partition field not found */ return schemaFields.some( @@ -156,11 +160,11 @@ export class Partitioning { async isValidPartitionForExistingTable(): Promise { /** Return false if partition type option has not been set */ - if (!this.isPartitioningEnabled()) return Promise.resolve(false); + if (!this.isPartitioningEnabled()) return false; /* Return false if table is already partitioned */ const isPartitioned = await this.isTablePartitioned(); - if (isPartitioned) return Promise.resolve(false); + if (isPartitioned) return false; return this.hasValidCustomPartitionConfig(); } @@ -242,44 +246,54 @@ export class Partitioning { return fields.map(($) => $.name).includes(timePartitioningField); } - async addPartitioningToSchema(fields = []): Promise { - /** Return if partition type option has not been set */ - if (!this.isPartitioningEnabled()) return; - - /** Return if class has invalid table reference */ - if (!this.hasValidTableReference()) return; - - /** Return if table is already partitioned **/ - if (await this.isTablePartitioned()) return; - - /** Return if partition config is invalid */ - if (!this.hasValidCustomPartitionConfig()) return; - - /** Return if an invalid partition type has been requested */ - if (!this.hasValidTimePartitionType()) return; - - /** Return if an invalid partition option has been requested */ - if (!this.hasValidTimePartitionOption()) return; - - /** Return if invalid partitioning and field type combination */ - if (this.hasHourAndDatePartitionConfig()) return; - - /** Return if partition field has not been provided */ - if (!this.config.timePartitioningField) return; - - /** Return if field already exists on schema 
*/ - if (this.customFieldExists(fields)) return; + private async shouldAddPartitioningToSchema(): Promise<{ + proceed: boolean; + message: string; + }> { + if (!this.isPartitioningEnabled()) { + return { proceed: false, message: "Partitioning not enabled" }; + } + if (!this.hasValidTableReference()) { + return { proceed: false, message: "Invalid table reference" }; + } + if (!this.hasValidCustomPartitionConfig()) { + return { proceed: false, message: "Invalid partition config" }; + } + if (!this.hasValidTimePartitionType()) { + return { proceed: false, message: "Invalid partition type" }; + } + if (!this.hasValidTimePartitionOption()) { + return { proceed: false, message: "Invalid partition option" }; + } + if (this.hasHourAndDatePartitionConfig()) { + return { + proceed: false, + message: "Invalid partitioning and field type combination", + }; + } + if (this.customFieldExists()) { + return { proceed: false, message: "Field already exists on schema" }; + } + if (await this.isTablePartitioned()) { + return { proceed: false, message: "Table is already partitioned" }; + } + if (!this.config.timePartitioningField) { + return { proceed: false, message: "Partition field not provided" }; + } + return { proceed: true, message: "" }; + } - /** Add new partitioning field **/ + async addPartitioningToSchema(fields = []): Promise { + const { proceed, message } = await this.shouldAddPartitioningToSchema(); + if (!proceed) { + functions.logger.warn(`Did not add partitioning to schema: ${message}`); + return; + } + // Add new partitioning field fields.push(getNewPartitionField(this.config)); - - /** Log successful addition of partition column */ - logs.addPartitionFieldColumn( - this.table.id, - this.config.timePartitioningField + functions.logger.log( + `Added new partition field: ${this.config.timePartitioningField} to table ID: ${this.table.id}` ); - - return; } async updateTableMetadata(options: bigquery.TableMetadata): Promise { diff --git 
a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/utils.test.ts b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/utils.test.ts new file mode 100644 index 000000000..f4cf8045f --- /dev/null +++ b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/utils.test.ts @@ -0,0 +1,91 @@ +import { waitForInitialization } from "./utils"; +import { Dataset, Table } from "@google-cloud/bigquery"; +import * as logs from "../logs"; + +jest.mock("@google-cloud/bigquery"); +jest.mock("../../logs"); +const dataset = { + exists: jest.fn(), + table: jest.fn(), +}; +const table = { + exists: jest.fn(), +}; +const changelogName = "testTable"; + +describe("waitForInitialization", () => { + beforeEach(() => { + jest.clearAllMocks(); + dataset.table.mockReturnValue(table); + }); + + test("should successfully find the dataset and table", async () => { + dataset.exists.mockResolvedValue([true]); + table.exists.mockResolvedValue([true]); + + const result = await waitForInitialization({ + dataset: dataset as unknown as Dataset, + changelogName, + }); + expect(result).toBe(table); + expect(dataset.exists).toHaveBeenCalledTimes(1); + expect(table.exists).toHaveBeenCalledTimes(1); + }); + + test("should fail after max attempts if table does not exist", async () => { + dataset.exists.mockResolvedValue([true]); + table.exists.mockResolvedValue([false]); + + await expect( + waitForInitialization( + { dataset: dataset as unknown as Dataset, changelogName }, + 3 + ) + ).rejects.toThrow( + "Initialization timed out. Dataset or table could not be verified to exist after multiple attempts." 
+ ); + expect(dataset.exists).toHaveBeenCalledTimes(3); + expect(table.exists).toHaveBeenCalledTimes(3); + }); + + test("should handle and throw an error if dataset.exists throws", async () => { + const error = new Error("Access denied"); + dataset.exists.mockRejectedValue(error); + + await expect( + waitForInitialization({ + dataset: dataset as unknown as Dataset, + changelogName, + }) + ).rejects.toThrow("Access denied"); + expect(logs.failedToInitializeWait).toHaveBeenCalledWith(error.message); + }); + + test("should handle and throw an error if table.exists throws", async () => { + dataset.exists.mockResolvedValue([true]); + const error = new Error("Table error"); + table.exists.mockRejectedValue(error); + + await expect( + waitForInitialization({ + dataset: dataset as unknown as Dataset, + changelogName, + }) + ).rejects.toThrow("Table error"); + expect(logs.failedToInitializeWait).toHaveBeenCalledWith(error.message); + }); + + test("should handle unexpected error types gracefully", async () => { + dataset.exists.mockRejectedValue("String error"); + + await expect( + waitForInitialization({ + dataset: dataset as unknown as Dataset, + changelogName, + }) + ).rejects.toThrow("An unexpected error occurred"); + expect(logs.failedToInitializeWait).toHaveBeenCalledWith( + "An unexpected error occurred" + ); + }); +}); diff --git a/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/utils.ts b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/utils.ts new file mode 100644 index 000000000..e930c43f2 --- /dev/null +++ b/firestore-bigquery-export/firestore-bigquery-change-tracker/src/bigquery/utils.ts @@ -0,0 +1,78 @@ +import { Dataset, Table } from "@google-cloud/bigquery"; +import * as logs from "../logs"; + +interface WaitForInitializationParams { + dataset: Dataset; + changelogName: string; +} + +/** + * Periodically checks for the existence of a dataset and table until both are found or a maximum number of attempts is 
/**
 * Polls BigQuery until both the dataset and its raw changelog table exist.
 *
 * Checks run strictly one after another: the next check is only started once
 * the previous one has finished, so a slow `exists()` call can never overlap
 * with (or be counted twice against) a later one. The first check is made
 * immediately; subsequent checks are separated by `intervalMs`.
 *
 * @param params - The dataset to inspect and the name of the changelog table
 *   expected inside it.
 * @param maxAttempts - Maximum number of checks before giving up (defaults to
 *   12). At least one check is always made.
 * @param intervalMs - Delay between two consecutive checks, in milliseconds
 *   (defaults to 500).
 * @returns The table handle obtained from `dataset.table(changelogName)` once
 *   both the dataset and the table report that they exist.
 * @throws Error with a timeout message when the resources still do not exist
 *   after `maxAttempts` checks, or an Error carrying the underlying message
 *   when one of the `exists()` calls fails (that failure is also logged via
 *   `logs.failedToInitializeWait`).
 */
export async function waitForInitialization(
  { dataset, changelogName }: WaitForInitializationParams,
  maxAttempts = 12,
  intervalMs = 500
): Promise<Table> {
  for (let attempt = 1; ; attempt++) {
    try {
      const [datasetExists] = await dataset.exists();
      const table = dataset.table(changelogName);
      const [tableExists] = await table.exists();

      if (datasetExists && tableExists) {
        return table;
      }
    } catch (error) {
      // A failing existence check is not retried: log it and surface it.
      const message =
        error instanceof Error ? error.message : "An unexpected error occurred";
      logs.failedToInitializeWait(message);
      throw new Error(message);
    }

    // Deliberately outside the try block so the timeout itself is not logged
    // and re-wrapped as if it were an API failure.
    if (attempt >= maxAttempts) {
      throw new Error(
        "Initialization timed out. Dataset or table could not be verified to exist after multiple attempts."
      );
    }

    await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
  }
}

/**
 * Extracts a human-readable message from a value caught in a `catch` clause.
 *
 * @param error - The caught value; may be an `Error`, an error-like object
 *   with a `message` property, or anything else.
 * @param process - Optional description of the step that failed; it is only
 *   used in the generic fallback message.
 * @returns `error.message` for `Error` instances and for objects whose
 *   `message` is a string; otherwise a generic
 *   "An unexpected error occurred[ during <process>]." message.
 */
export function parseErrorMessage(
  error: unknown,
  process: string = ""
): string {
  if (error instanceof Error) {
    return error.message;
  }

  if (typeof error === "object" && error !== null && "message" in error) {
    // API errors are not always native Error objects. Only trust `message`
    // when it really is a string so the declared return type holds.
    const { message } = error as { message: unknown };
    if (typeof message === "string") {
      return message;
    }
  }

  return `An unexpected error occurred${process ? ` during ${process}` : ""}.`;
}