Skip to content

Commit

Permalink
fix(firestore-bigquery-export): initialization fix
Browse files Browse the repository at this point in the history
  • Loading branch information
cabljac committed Apr 18, 2024
1 parent 9969223 commit 213a2fd
Show file tree
Hide file tree
Showing 6 changed files with 291 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"url": "github.com/firebase/extensions.git",
"directory": "firestore-bigquery-export/firestore-bigquery-change-tracker"
},
"version": "1.1.32",
"version": "1.1.33",
"description": "Core change-tracker library for Cloud Firestore Collection BigQuery Exports",
"main": "./lib/index.js",
"scripts": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,31 @@ let dataset: Dataset;
let table: Table;
let view: Table;
describe("e2e", () => {
// Checks that recording an event leaves a raw changelog table whose
// metadata can be fetched.
describe("initialization", () => {
  // Generate randomly suffixed dataset/table identifiers before every test.
  beforeEach(async () => {
    randomID = (Math.random() + 1).toString(36).substring(7);
    datasetId = `dataset_${randomID}`;
    tableId = `table_${randomID}`;
    tableId_raw = `${tableId}_raw_changelog`;
    dataset = bq.dataset(datasetId);
  });

  // Clean up through the deleteTable helper (called with the dataset id only).
  afterEach(async () => {
    await deleteTable({ datasetId });
  });

  test("successfully creates a dataset and table", async () => {
    const tracker = changeTracker({ datasetId, tableId });
    await tracker.record([event]);

    const rawChangelog = dataset.table(tableId_raw);
    const [metadata] = await rawChangelog.getMetadata();

    expect(metadata).toBeDefined();
  });
});
describe("Partitioning", () => {
beforeEach(async () => {
randomID = (Math.random() + 1).toString(36).substring(7);
Expand Down Expand Up @@ -108,13 +133,14 @@ describe("e2e", () => {

const [metadata] = await dataset.table(`${tableId_raw}`).getMetadata();

expect(metadata.timePartitioning).toBeDefined();

const [changeLogRows] = await getBigQueryTableData(
process.env.PROJECT_ID,
datasetId,
tableId
);

expect(metadata.timePartitioning).toBeDefined();
expect(changeLogRows[0].created.value).toBe(
BigQuery.timestamp(created.toDate()).value
);
Expand Down Expand Up @@ -369,6 +395,10 @@ describe("e2e", () => {
});

test("does not update an existing non partitioned table, that has a valid schema with timePartitioning only", async () => {
const tableExists = await dataset.table(tableId_raw).exists();

expect(tableExists[0]).toBe(true);

await changeTracker({
datasetId,
tableId,
Expand All @@ -380,7 +410,7 @@ describe("e2e", () => {
expect(metadata.timePartitioning).toBeUndefined();

expect(consoleLogSpyWarn).toBeCalledWith(
`Cannot partition an existing table ${datasetId}_${tableId_raw}`
`Did not add partitioning to schema: Partition field not provided`
);
expect(consoleLogSpy).toBeCalledWith(
`BigQuery dataset already exists: ${datasetId}`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import {
import { Partitioning } from "./partitioning";
import { Clustering } from "./clustering";
import { tableRequiresUpdate, viewRequiresUpdate } from "./checkUpdates";
import { parseErrorMessage, waitForInitialization } from "./utils";

export { RawChangelogSchema, RawChangelogViewSchema } from "./schema";

Expand Down Expand Up @@ -203,27 +204,10 @@ export class FirestoreBigQueryEventHistoryTracker
* A delay is added between checks while the function
* repeatedly re-checks until the referenced dataset and table become available.
*/
private async waitForInitialization() {
return new Promise((resolve) => {
let handle = setInterval(async () => {
try {
const dataset = this.bigqueryDataset();
const changelogName = this.rawChangeLogTableName();
const table = dataset.table(changelogName);

const [datasetExists] = await dataset.exists();
const [tableExists] = await table.exists();

if (datasetExists && tableExists) {
clearInterval(handle);
return resolve(table);
}
} catch (ex) {
clearInterval(handle);
logs.failedToInitializeWait(ex.message);
}
}, 5000);
});
private async _waitForInitialization() {
  // Hand this tracker's dataset handle and raw changelog table name to the
  // shared waitForInitialization helper and return whatever it resolves to.
  const options = {
    dataset: this.bigqueryDataset(),
    changelogName: this.rawChangeLogTableName(),
  };
  return waitForInitialization(options);
}

/**
Expand Down Expand Up @@ -279,17 +263,39 @@ export class FirestoreBigQueryEventHistoryTracker
if (this._initialized) {
return;
}
try {
await this.initializeDataset();
} catch (error) {
const message = parseErrorMessage(error, "initializing dataset");
throw new Error(`Error initializing dataset: ${message}`);
}

await this.initializeDataset();

await this.initializeRawChangeLogTable();
try {
await this.initializeRawChangeLogTable();
} catch (error) {
const message = parseErrorMessage(
error,
"initializing raw change log table"
);
throw new Error(`Error initializing raw change log table: ${message}`);
}

await this.initializeLatestView();
try {
await this.initializeLatestView();
} catch (error) {
const message = parseErrorMessage(error, "initializing latest view");
throw new Error(`Error initializing latest view: ${message}`);
}
await this._waitForInitialization();

this._initialized = true;
} catch (ex) {
await this.waitForInitialization();
this._initialized = true;
} catch (error) {
const message = parseErrorMessage(
error,
"initializing BigQuery resources"
);
console.error("Error initializing BigQuery resources: ", message);
throw error;
}
}

Expand Down Expand Up @@ -396,7 +402,6 @@ export class FirestoreBigQueryEventHistoryTracker
kmsKeyName: this.config.kmsKeyName,
};
}

//Add partitioning
await partitioning.addPartitioningToSchema(schema.fields);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import * as firebase from "firebase-admin";

import * as logs from "../logs";
import * as bigquery from "@google-cloud/bigquery";

import * as functions from "firebase-functions";
import { getNewPartitionField } from "./schema";
import { BigQuery, TableMetadata } from "@google-cloud/bigquery";

Expand Down Expand Up @@ -135,18 +135,22 @@ export class Partitioning {
}

private async isTablePartitioned() {
const [tableExists] = await this.table.exists();

if (!this.table || !tableExists) return false;

/* Return true if partition metadata already exists */
const [metadata] = await this.table.getMetadata();
if (!!metadata.timePartitioning) {
if (metadata.timePartitioning) {
logs.cannotPartitionExistingTable(this.table);
return Promise.resolve(true);
return true;
}

/** Find schema fields **/
const schemaFields = await this.metaDataSchemaFields();

/** Return false if no schema exists */
if (!schemaFields) return Promise.resolve(false);
if (!schemaFields) return false;

/* Return false if time partition field not found */
return schemaFields.some(
Expand All @@ -156,11 +160,11 @@ export class Partitioning {

/**
 * Decides whether partitioning may be applied to a table that already exists.
 *
 * @returns `false` when partitioning is not enabled or when
 * `isTablePartitioned()` reports true; otherwise the result of
 * `hasValidCustomPartitionConfig()`.
 */
async isValidPartitionForExistingTable(): Promise<boolean> {
  /** Return false if partition type option has not been set */
  if (!this.isPartitioningEnabled()) return false;

  /* Return false if table is already partitioned */
  if (await this.isTablePartitioned()) return false;

  return this.hasValidCustomPartitionConfig();
}
Expand Down Expand Up @@ -242,44 +246,54 @@ export class Partitioning {
return fields.map(($) => $.name).includes(timePartitioningField);
}

/**
 * Runs every precondition for adding a partition column, in order, and
 * reports the first one that fails.
 *
 * @param fields Schema fields the partition column would be appended to.
 * They are forwarded to `customFieldExists` so that a column already present
 * on the schema is detected instead of being added a second time.
 * @returns `{ proceed: true, message: "" }` when all checks pass, otherwise
 * `proceed: false` together with a short reason.
 */
private async shouldAddPartitioningToSchema(fields = []): Promise<{
  proceed: boolean;
  message: string;
}> {
  if (!this.isPartitioningEnabled()) {
    return { proceed: false, message: "Partitioning not enabled" };
  }
  if (!this.hasValidTableReference()) {
    return { proceed: false, message: "Invalid table reference" };
  }
  if (!this.hasValidCustomPartitionConfig()) {
    return { proceed: false, message: "Invalid partition config" };
  }
  if (!this.hasValidTimePartitionType()) {
    return { proceed: false, message: "Invalid partition type" };
  }
  if (!this.hasValidTimePartitionOption()) {
    return { proceed: false, message: "Invalid partition option" };
  }
  if (this.hasHourAndDatePartitionConfig()) {
    return {
      proceed: false,
      message: "Invalid partitioning and field type combination",
    };
  }
  // Pass the caller's schema through; without it this check could never
  // see an existing partition column.
  if (this.customFieldExists(fields)) {
    return { proceed: false, message: "Field already exists on schema" };
  }
  // The awaited table check runs only after the synchronous checks pass.
  if (await this.isTablePartitioned()) {
    return { proceed: false, message: "Table is already partitioned" };
  }
  if (!this.config.timePartitioningField) {
    return { proceed: false, message: "Partition field not provided" };
  }
  return { proceed: true, message: "" };
}

/**
 * Appends the partition column to `fields` when every precondition holds.
 *
 * @param fields Schema field list; mutated in place when the partition
 * column is added.
 * @returns Resolves once the decision has been made and logged. When a
 * precondition fails, a warning with the reason is logged and `fields` is
 * left untouched.
 */
async addPartitioningToSchema(fields = []): Promise<void> {
  const { proceed, message } = await this.shouldAddPartitioningToSchema(
    fields
  );
  if (!proceed) {
    functions.logger.warn(`Did not add partitioning to schema: ${message}`);
    return;
  }
  // Add new partitioning field
  fields.push(getNewPartitionField(this.config));

  functions.logger.log(
    `Added new partition field: ${this.config.timePartitioningField} to table ID: ${this.table.id}`
  );
}

async updateTableMetadata(options: bigquery.TableMetadata): Promise<void> {
Expand Down
Loading

0 comments on commit 213a2fd

Please sign in to comment.