From 57b9e5c93ab60343f47c247d55fc168ab88914cd Mon Sep 17 00:00:00 2001
From: Mais Alheraki
Date: Tue, 26 Mar 2024 21:07:12 +0300
Subject: [PATCH] fix: no resource found for `fsimportexistingdocs` (#2018)

---
 firestore-bigquery-export/CHANGELOG.md   |   4 +
 firestore-bigquery-export/extension.yaml |   2 +-
 .../functions/src/index.ts               | 138 +++++++++---------
 3 files changed, 77 insertions(+), 67 deletions(-)

diff --git a/firestore-bigquery-export/CHANGELOG.md b/firestore-bigquery-export/CHANGELOG.md
index d8ccac375..0c594ecb9 100644
--- a/firestore-bigquery-export/CHANGELOG.md
+++ b/firestore-bigquery-export/CHANGELOG.md
@@ -1,3 +1,7 @@
+## Version 0.1.48
+
+fix - fix the error "no resource found for `fsimportexistingdocs`"
+
 ## Version 0.1.47
 
 fix - temporarily disable backfill feature
diff --git a/firestore-bigquery-export/extension.yaml b/firestore-bigquery-export/extension.yaml
index 018d6ec20..5bc8ff0fc 100644
--- a/firestore-bigquery-export/extension.yaml
+++ b/firestore-bigquery-export/extension.yaml
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 name: firestore-bigquery-export
-version: 0.1.47
+version: 0.1.48
 specVersion: v1beta
 
 displayName: Stream Firestore to BigQuery
diff --git a/firestore-bigquery-export/functions/src/index.ts b/firestore-bigquery-export/functions/src/index.ts
index a4a49010b..c51834abf 100644
--- a/firestore-bigquery-export/functions/src/index.ts
+++ b/firestore-bigquery-export/functions/src/index.ts
@@ -207,69 +207,75 @@ export const initBigQuerySync = functions.tasks
     return;
   });
 
-// exports.fsimportexistingdocs = functions.tasks
-//   .taskQueue()
-//   .onDispatch(async (data, context) => {
-//     const runtime = getExtensions().runtime();
-//     if (!config.doBackfill || !config.importCollectionPath) {
-//       await runtime.setProcessingState(
-//         "PROCESSING_COMPLETE",
-//         "Completed. No existing documents imported into BigQuery."
-//       );
-//       return;
-//     }
-
-//     const offset = (data["offset"] as number) ?? 0;
-//     const docsCount = (data["docsCount"] as number) ?? 0;
-
-//     const query = config.useCollectionGroupQuery
-//       ? getFirestore(config.databaseId).collectionGroup(
-//           config.importCollectionPath.split("/")[
-//             config.importCollectionPath.split("/").length - 1
-//           ]
-//         )
-//       : getFirestore(config.databaseId).collection(config.importCollectionPath);
-
-//     const snapshot = await query
-//       .offset(offset)
-//       .limit(config.docsPerBackfill)
-//       .get();
-
-//     const rows = snapshot.docs.map((d) => {
-//       return {
-//         timestamp: new Date().toISOString(),
-//         operation: ChangeType.IMPORT,
-//         documentName: `projects/${config.bqProjectId}/databases/(default)/documents/${d.ref.path}`,
-//         documentId: d.id,
-//         eventId: "",
-//         pathParams: resolveWildcardIds(config.importCollectionPath, d.ref.path),
-//         data: eventTracker.serializeData(d.data()),
-//       };
-//     });
-//     try {
-//       await eventTracker.record(rows);
-//     } catch (err: any) {
-//       /** If configured, event tracker will handle failed rows in a backup collection */
-//       functions.logger.log(err);
-//     }
-//     if (rows.length == config.docsPerBackfill) {
-//       // There are more documents to import - enqueue another task to continue the backfill.
-//       const queue = getFunctions().taskQueue(
-//         `locations/${config.location}/functions/fsimportexistingdocs`,
-//         config.instanceId
-//       );
-//       await queue.enqueue({
-//         offset: offset + config.docsPerBackfill,
-//         docsCount: docsCount + rows.length,
-//       });
-//     } else {
-//       // We are finished, set the processing state to report back how many docs were imported.
-//       runtime.setProcessingState(
-//         "PROCESSING_COMPLETE",
-//         `Successfully imported ${
-//           docsCount + rows.length
-//         } documents into BigQuery`
-//       );
-//     }
-//     await events.recordCompletionEvent({ context });
-//   });
+exports.fsimportexistingdocs = functions.tasks
+  .taskQueue()
+  .onDispatch(async (data, context) => {
+    const runtime = getExtensions().runtime();
+    await runtime.setProcessingState(
+      "PROCESSING_COMPLETE",
+      "Completed. No existing documents imported into BigQuery."
+    );
+    return;
+
+    // if (!config.doBackfill || !config.importCollectionPath) {
+    //   await runtime.setProcessingState(
+    //     "PROCESSING_COMPLETE",
+    //     "Completed. No existing documents imported into BigQuery."
+    //   );
+    //   return;
+    // }
+
+    // const offset = (data["offset"] as number) ?? 0;
+    // const docsCount = (data["docsCount"] as number) ?? 0;
+
+    // const query = config.useCollectionGroupQuery
+    //   ? getFirestore(config.databaseId).collectionGroup(
+    //       config.importCollectionPath.split("/")[
+    //         config.importCollectionPath.split("/").length - 1
+    //       ]
+    //     )
+    //   : getFirestore(config.databaseId).collection(config.importCollectionPath);
+
+    // const snapshot = await query
+    //   .offset(offset)
+    //   .limit(config.docsPerBackfill)
+    //   .get();
+
+    // const rows = snapshot.docs.map((d) => {
+    //   return {
+    //     timestamp: new Date().toISOString(),
+    //     operation: ChangeType.IMPORT,
+    //     documentName: `projects/${config.bqProjectId}/databases/(default)/documents/${d.ref.path}`,
+    //     documentId: d.id,
+    //     eventId: "",
+    //     pathParams: resolveWildcardIds(config.importCollectionPath, d.ref.path),
+    //     data: eventTracker.serializeData(d.data()),
+    //   };
+    // });
+    // try {
+    //   await eventTracker.record(rows);
+    // } catch (err: any) {
+    //   /** If configured, event tracker will handle failed rows in a backup collection */
+    //   functions.logger.log(err);
+    // }
+    // if (rows.length == config.docsPerBackfill) {
+    //   // There are more documents to import - enqueue another task to continue the backfill.
+    //   const queue = getFunctions().taskQueue(
+    //     `locations/${config.location}/functions/fsimportexistingdocs`,
+    //     config.instanceId
+    //   );
+    //   await queue.enqueue({
+    //     offset: offset + config.docsPerBackfill,
+    //     docsCount: docsCount + rows.length,
+    //   });
+    // } else {
+    //   // We are finished, set the processing state to report back how many docs were imported.
+    //   runtime.setProcessingState(
+    //     "PROCESSING_COMPLETE",
+    //     `Successfully imported ${
+    //       docsCount + rows.length
+    //     } documents into BigQuery`
+    //   );
+    // }
+    // await events.recordCompletionEvent({ context });
+  });
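The disabled body above pages through the source collection with offset/limit queries and re-enqueues its own task queue until a short page signals that the backfill is done. A minimal standalone sketch of that task-queue pagination pattern, assuming the firebase-admin and firebase-functions v1 APIs; the collection path, queue name, and page size below are illustrative placeholders, not values from the extension:

    import { initializeApp } from "firebase-admin/app";
    import { getFirestore } from "firebase-admin/firestore";
    import { getFunctions } from "firebase-admin/functions";
    import * as functions from "firebase-functions";

    initializeApp();

    // Illustrative page size; the extension reads this from config.docsPerBackfill.
    const PAGE_SIZE = 200;

    export const backfillPage = functions.tasks
      .taskQueue()
      .onDispatch(async (data) => {
        const offset = (data["offset"] as number) ?? 0;

        // Read one page of documents at the current offset.
        const snapshot = await getFirestore()
          .collection("widgets") // illustrative collection path
          .offset(offset)
          .limit(PAGE_SIZE)
          .get();

        // ...process snapshot.docs here, e.g. stream one row per document...

        if (snapshot.docs.length === PAGE_SIZE) {
          // A full page means more documents may remain: enqueue the next page.
          await getFunctions()
            .taskQueue("backfillPage") // illustrative queue name
            .enqueue({ offset: offset + PAGE_SIZE });
        }
        // A short page ends the loop; the extension reports completion at this
        // point via runtime.setProcessingState(...).
      });

Each task stays small and restartable, which is why the extension caps a run at config.docsPerBackfill documents and hands the next offset to a fresh task instead of looping in one invocation.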