Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🚑 Scaling fetching datta from gdrive #710

Merged
merged 3 commits into from
Oct 8, 2024
Merged

Conversation

naelob
Copy link
Contributor

@naelob naelob commented Oct 8, 2024

Summary by CodeRabbit

  • New Features

    • Introduced a new queue for third-party data ingestion, enhancing data processing capabilities.
    • Added a new method for batch processing of Google Drive files, improving synchronization efficiency.
    • Implemented a sleep function to manage execution timing within the application.
    • Enhanced the FileService and GoogleDriveService with new integrations for improved data handling.
  • Bug Fixes

    • Restructured error handling in the data ingestion process for improved reliability.
  • Documentation

    • Updated interfaces and types to reflect new properties and methods for better clarity on data ingestion processes.

@naelob
Copy link
Contributor Author

naelob commented Oct 8, 2024

Qovery Preview

Qovery can create a Preview Environment for this PR.
To trigger its creation, please post a comment with one of the following command.

Command Blueprint environment
/qovery preview 783d0240-ec38-4387-a9a9-8e225f1bd3e1 dev
/qovery preview {all|UUID1,UUID2,...} To preview multiple environments

This comment has been generated from Qovery AI 🤖.
Below, a word from its wisdom :

Don’t Sacrifice Readability, you are going to debug it in 6 months

Copy link

changeset-bot bot commented Oct 8, 2024

⚠️ No Changeset found

Latest commit: fc854e1

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

@CLAassistant
Copy link

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

Copy link
Contributor

coderabbitai bot commented Oct 8, 2024

Caution

Review failed

The pull request is closed.

📝 Walkthrough
📝 Walkthrough

Walkthrough

The pull request introduces several changes to the queue management and data ingestion processes within the application. A new queue for third-party data ingestion is added, along with corresponding updates to the service and type definitions to accommodate this functionality. Modifications to the IngestDataService and GoogleDriveService enhance data handling capabilities, while a new processor class is introduced for managing Google Drive synchronization jobs. Additionally, a utility function for pausing execution is added, and changes to the application bootstrap process are made to exclude certain initializations.

Changes

File Path Change Summary
packages/api/src/@core/@core-services/queues/queue.module.ts Added new queue: { name: Queues.THIRD_PARTY_DATA_INGESTION } in BullQueueModule.
packages/api/src/@core/@core-services/queues/shared.service.ts Added property: public readonly thirdPartyDataIngestionQueue: Queue.
Added method: getThirdPartyDataIngestionQueue().
packages/api/src/@core/@core-services/queues/types.ts Updated Queues enum with new entry THIRD_PARTY_DATA_INGESTION.
packages/api/src/@core/@core-services/unification/ingest-data.service.ts Updated syncForLinkedUser method to include new ingestParams variable.
Removed commented-out code for sourceObject.
packages/api/src/@core/utils/helpers.ts Added function: export function sleep(ms: number): Promise<void>.
packages/api/src/@core/utils/types/interface.ts Updated SyncParam type with new optional properties.
Added method: ingestData in IBaseObjectService.
packages/api/src/filestorage/file/services/googledrive/index.ts Added method: async ingestData(...).
Updated method signature for sync.
Added methods: processBatch, getLastSyncTime.
packages/api/src/filestorage/file/services/googledrive/processor.ts Added class: GoogleDriveQueueProcessor.
Added method: async handleGoogleDriveSync(job: Job).
packages/api/src/main.ts Commented out the call to generatePanoraParamsSpec(document) in the bootstrap function.

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant GoogleDriveService
    participant GoogleDriveQueueProcessor
    participant BullQueueService

    User->>BullQueueService: Add job to Queues.THIRD_PARTY_DATA_INGESTION
    BullQueueService->>GoogleDriveQueueProcessor: Process job
    GoogleDriveQueueProcessor->>GoogleDriveService: handleGoogleDriveSync(job)
    GoogleDriveService->>GoogleDriveService: ingestData(sourceData, connectionId, customFieldMappings, extraParams)
    GoogleDriveService-->>GoogleDriveQueueProcessor: Return processed data
    GoogleDriveQueueProcessor-->>BullQueueService: Job completed
Loading

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai or @coderabbitai title anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

zeropath-ai bot commented Oct 8, 2024

Security

  • No security or compliance vulnerabilities detected
  • Scanned 8 changed file(s)

Changes Detected

  • [Feature] Add third-party data ingestion queue [packages/api/src/@core/@core-services/queues/shared.service.ts]

    • Implemented in BullQueueService class
    • Added thirdPartyDataIngestionQueue and getter method
    • Improves scalability for handling third-party data ingestion
  • [Feature] Implement batch processing for Google Drive sync [packages/api/src/filestorage/file/services/googledrive/index.ts]

    • Added new sync method with pagination and queue-based processing
    • Introduced BATCH_SIZE and API_RATE_LIMIT constants
    • Improves handling of large datasets and respects API rate limits
  • [Feature] Add Google Drive queue processor [packages/api/src/filestorage/file/services/googledrive/processor.ts]

    • Created new GoogleDriveQueueProcessor class
    • Handles 'fs_file_googledrive' jobs from THIRD_PARTY_DATA_INGESTION queue
    • Enables asynchronous processing of Google Drive sync tasks
  • [Refactor] Update SyncParam interface [packages/api/src/@core/utils/types/interface.ts]

    • Added custom_field_mappings and ingestParams to SyncParam
    • Updated IBaseObjectService interface with ingestData method
    • Improves flexibility for custom field mapping and ingestion
  • [Feature] Add sleep helper function [packages/api/src/@core/utils/helpers.ts]

    • Implemented sleep function for introducing delays
    • Useful for rate limiting and managing API requests
  • [Bug Fix] Comment out generatePanoraParamsSpec call [packages/api/src/main.ts]

    • Temporarily disabled generatePanoraParamsSpec function call
    • Prevents potential issues related to Panora params spec generation

Copy link

zeropath-ai bot commented Oct 8, 2024

Security

  • No security or compliance vulnerabilities detected
  • Scanned 9 changed file(s)

Changes Detected

  • [Feature] Add new queue for third-party data ingestion [queue.module.ts, shared.service.ts, types.ts]

    • Implemented in the queue module and shared service
    • Added THIRD_PARTY_DATA_INGESTION to Queues enum
    • Introduced thirdPartyDataIngestionQueue in BullQueueService
  • [Refactor] Update IngestDataService for improved data handling [ingest-data.service.ts]

    • Modified sync method in IngestDataService
    • Added custom_field_mappings and ingestParams to SyncParam
    • Commented out previous ingestData call
  • [Feature] Implement sleep function for rate limiting [helpers.ts]

    • Added new utility function in helpers.ts
    • Allows for introducing delays in asynchronous operations
  • [Refactor] Enhance IBaseObjectService interface [interface.ts]

    • Added ingestData method to IBaseObjectService interface
    • Updated SyncParam type with new properties
  • [Feature] Implement batch processing for Google Drive sync [googledrive/index.ts]

    • Added batch processing logic in GoogleDriveService
    • Introduced rate limiting and pagination for API requests
    • Implemented processBatch method for handling individual batches
  • [Feature] Add Google Drive queue processor [googledrive/processor.ts]

    • Created new GoogleDriveQueueProcessor class
    • Handles 'fs_file_googledrive' jobs in THIRD_PARTY_DATA_INGESTION queue
  • [Bug Fix] Comment out generatePanoraParamsSpec call [main.ts]

    • Temporarily disabled generatePanoraParamsSpec function call
    • Added TODO comment for future reference

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 14

🧹 Outside diff range comments (1)
packages/api/src/@core/@core-services/unification/ingest-data.service.ts (1)

Line range hint 1-324: Summary of changes and recommendations

The modifications to IngestDataService appear to be part of a larger refactoring effort to enhance the data ingestion process. Key points to address:

  1. Optimize the creation of ingestParams for better performance.
  2. Clarify the new data ingestion strategy, given the removal of the direct ingestData call.
  3. Consider removing commented-out code if it's no longer needed.

These changes have the potential to improve the overall efficiency and maintainability of the service. Please review the specific comments for detailed recommendations and ensure that the new data flow is well-documented for future maintenance.

🧰 Tools
🪛 Biome

[error] 90-90: Avoid the use of spread (...) syntax on accumulators.

Spread syntax should be avoided on accumulators (like those in .reduce) because it causes a time complexity of O(n^2).
Consider methods such as .splice or .push instead.

(lint/performance/noAccumulatingSpread)

📜 Review details

Configuration used: CodeRabbit UI
Review profile: ASSERTIVE

📥 Commits

Files that changed from the base of the PR and between a12f389 and a775926.

📒 Files selected for processing (9)
  • packages/api/src/@core/@core-services/queues/queue.module.ts (1 hunks)
  • packages/api/src/@core/@core-services/queues/shared.service.ts (2 hunks)
  • packages/api/src/@core/@core-services/queues/types.ts (1 hunks)
  • packages/api/src/@core/@core-services/unification/ingest-data.service.ts (3 hunks)
  • packages/api/src/@core/utils/helpers.ts (1 hunks)
  • packages/api/src/@core/utils/types/interface.ts (1 hunks)
  • packages/api/src/filestorage/file/services/googledrive/index.ts (2 hunks)
  • packages/api/src/filestorage/file/services/googledrive/processor.ts (1 hunks)
  • packages/api/src/main.ts (1 hunks)
🧰 Additional context used
🪛 Biome
packages/api/src/@core/@core-services/unification/ingest-data.service.ts

[error] 90-90: Avoid the use of spread (...) syntax on accumulators.

Spread syntax should be avoided on accumulators (like those in .reduce) because it causes a time complexity of O(n^2).
Consider methods such as .splice or .push instead.

(lint/performance/noAccumulatingSpread)

packages/api/src/filestorage/file/services/googledrive/index.ts

[error] 31-31: Template literals are preferred over string concatenation.

Unsafe fix: Use a template literal.

(lint/style/useTemplate)


[error] 151-151: Forbidden non-null assertion.

(lint/style/noNonNullAssertion)


[error] 152-152: Forbidden non-null assertion.

(lint/style/noNonNullAssertion)


[error] 153-153: Forbidden non-null assertion.

(lint/style/noNonNullAssertion)


[error] 154-154: Forbidden non-null assertion.

(lint/style/noNonNullAssertion)


[error] 155-155: Forbidden non-null assertion.

(lint/style/noNonNullAssertion)

🔇 Additional comments (10)
packages/api/src/filestorage/file/services/googledrive/processor.ts (3)

7-9: LGTM! Class declaration and decorators are well-structured.

The class declaration and decorators are correctly implemented:

  • @Injectable() is properly used for dependency injection in NestJS.
  • @Processor(Queues.THIRD_PARTY_DATA_INGESTION) correctly specifies the queue this processor will handle.
  • The class name GoogleDriveQueueProcessor is descriptive and follows PascalCase convention.

10-10: LGTM! Constructor follows dependency injection best practices.

The constructor is well-implemented:

  • It correctly injects an instance of GoogleDriveService.
  • The use of private readonly ensures the service cannot be modified after initialization, promoting immutability.

1-23: Overall, the implementation is solid. Verify integration with the broader system.

The GoogleDriveQueueProcessor is well-implemented, following NestJS and Bull best practices. It correctly sets up a processor for handling Google Drive synchronization jobs.

To ensure full functionality:

Verify that this processor is properly registered in the module. Run the following command to check the module file:

This will search for references to GoogleDriveQueueProcessor in files named "module.ts" within the file storage context. Ensure it's included in the providers array of the relevant module.

packages/api/src/@core/@core-services/queues/shared.service.ts (2)

40-42: LGTM! The new getter method is well-implemented.

The getThirdPartyDataIngestionQueue() method is correctly implemented and follows the existing patterns in the class. It provides a clean way to access the thirdPartyDataIngestionQueue property.


17-18: Verify the usage of the new queue and getter method.

The new thirdPartyDataIngestionQueue and its getter method getThirdPartyDataIngestionQueue() have been correctly implemented. To ensure these changes are fully integrated:

  1. Verify that Queues.THIRD_PARTY_DATA_INGESTION is properly defined in the Queues enum.
  2. Check if the new queue and getter method are being used appropriately in other parts of the codebase.

Run the following script to verify the integration:

This script will help ensure that the new queue is properly defined and used throughout the codebase.

Also applies to: 40-42

✅ Verification successful

Verification Successful: The new queue and getter method are properly integrated.

  • THIRD_PARTY_DATA_INGESTION is defined in the Queues enum.
  • thirdPartyDataIngestionQueue is declared and used within shared.service.ts.
  • getThirdPartyDataIngestionQueue() method is implemented and utilized in googledrive/index.ts.
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the integration of the new third-party data ingestion queue

# Test 1: Check if THIRD_PARTY_DATA_INGESTION is defined in Queues enum
echo "Checking Queues enum definition:"
rg --type typescript "enum\s+Queues" -A 10 | rg "THIRD_PARTY_DATA_INGESTION"

# Test 2: Search for usage of thirdPartyDataIngestionQueue
echo "\nChecking usage of thirdPartyDataIngestionQueue:"
rg --type typescript "thirdPartyDataIngestionQueue"

# Test 3: Search for usage of getThirdPartyDataIngestionQueue method
echo "\nChecking usage of getThirdPartyDataIngestionQueue method:"
rg --type typescript "getThirdPartyDataIngestionQueue\(\)"

Length of output: 607


Script:

#!/bin/bash
# Description: Verify the integration of the new third-party data ingestion queue

# Test 1: Check if THIRD_PARTY_DATA_INGESTION is defined in Queues enum
echo "Checking Queues enum definition:"
rg --type ts "enum\s+Queues" -A 10 | rg "THIRD_PARTY_DATA_INGESTION"

# Test 2: Search for usage of thirdPartyDataIngestionQueue
echo "\nChecking usage of thirdPartyDataIngestionQueue:"
rg --type ts "thirdPartyDataIngestionQueue"

# Test 3: Search for usage of getThirdPartyDataIngestionQueue method
echo "\nChecking usage of getThirdPartyDataIngestionQueue method:"
rg --type ts "getThirdPartyDataIngestionQueue\(\)"

Length of output: 1030

packages/api/src/@core/utils/types/interface.ts (1)

Line range hint 1-88: Final thoughts and verification step

The changes to this file enhance the flexibility of data synchronization and ingestion, which aligns well with the PR's objective of scaling data fetching from Google Drive. The additions to SyncParam and IBaseObjectService provide a good foundation for handling custom field mappings and additional parameters during the ingestion process.

To ensure these changes are properly integrated and don't introduce any breaking changes, please run the following verification script:

This script will help identify any areas of the codebase that might need updates to accommodate these new changes, as well as any remaining tasks related to the implementation.

packages/api/src/main.ts (1)

98-98: ⚠️ Potential issue

Clarify the impact and plans for the commented-out generatePanoraParamsSpec call.

The generatePanoraParamsSpec(document) call has been commented out and replaced with a TODO comment. This change could potentially affect the completeness of the API documentation. Please address the following points:

  1. What is the impact of removing this function call? Are there any parts of the API documentation that will be missing or incomplete as a result?
  2. Why was this change made? Is there a specific issue that necessitated commenting out this line?
  3. What are the plans to address this TODO? Is there a timeline for re-implementing or replacing this functionality?

To help assess the impact, let's check for any references to generatePanoraParamsSpec in the codebase:

To ensure this TODO is not forgotten, consider creating a JIRA ticket or GitHub issue to track this task. This will help maintain visibility on this pending work and ensure it's addressed in future development cycles.

✅ Verification successful

Verify the impact of commenting out generatePanoraParamsSpec

The generatePanoraParamsSpec(document) call has been commented out in packages/api/src/main.ts and is not referenced elsewhere in the codebase. This change may lead to incomplete API documentation generation. Please ensure that:

  • The TODO is tracked in your issue management system to address re-enabling or replacing this functionality in the future.
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for references to generatePanoraParamsSpec
rg "generatePanoraParamsSpec" --type ts

Length of output: 325

packages/api/src/@core/@core-services/unification/ingest-data.service.ts (2)

96-97: LGTM: Proper inclusion of ingestParams in syncParam.

The addition of ingestParams to the syncParam object ensures that the filtered and processed parameters are correctly passed to the sync operation. This change is consistent with the introduction of ingestParams and improves the flexibility of the sync process.


Line range hint 133-143: Clarify the removal of direct ingestData call and update comments.

The commented-out code block removes the direct call to ingestData after the sync operation. This change suggests a potential shift in the data ingestion strategy.

Please clarify:

  1. Has the data ingestion process been moved to a different part of the application?
  2. If so, where is it now handled?
  3. If this code is no longer needed, consider removing it entirely rather than leaving it commented out.

To help verify the current usage of ingestData, you can run the following script:

This will help identify if and where the ingestData method is now being called, providing context for the removed code.

✅ Verification successful

Remove the commented-out ingestData code block.

The ingestData method is still utilized in other parts of the codebase:

  • packages/api/src/filestorage/file/services/googledrive/index.ts
  • packages/api/src/@core/utils/types/interface.ts

Since data ingestion is handled elsewhere, the commented-out code in ingest-data.service.ts is unnecessary and should be removed to enhance code clarity and maintainability.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for calls to ingestData method
rg "ingestData\(" --type typescript

Length of output: 72


Script:

#!/bin/bash
# Search for calls to ingestData method using the correct TypeScript file type
rg "ingestData\(" --type ts

Length of output: 266

packages/api/src/filestorage/file/services/googledrive/index.ts (1)

93-105: Ensure proper pagination and processing of all files

The current implementation schedules jobs for processing batches but may miss processing the last page if nextPageToken is not properly handled.

Run the following script to check whether all pages are being processed:

Ensure that SCHEDULED_JOBS matches EXPECTED_PAGES.

Comment on lines +1 to +3
export function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

LGTM! Consider adding a brief JSDoc comment.

The sleep function is well-implemented and follows good practices. It provides a useful utility for introducing delays in asynchronous operations, which could be beneficial for rate limiting or managing API requests when fetching data from Google Drive.

Consider adding a brief JSDoc comment to improve documentation:

+/**
+ * Pauses execution for the specified number of milliseconds.
+ * @param ms The number of milliseconds to sleep.
+ * @returns A Promise that resolves after the specified delay.
+ */
export function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
export function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
/**
* Pauses execution for the specified number of milliseconds.
* @param ms The number of milliseconds to sleep.
* @returns A Promise that resolves after the specified delay.
*/
export function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

@@ -4,4 +4,5 @@ export enum Queues {
SYNC_JOBS_WORKER = 'SYNC_JOBS_WORKER', // Queue which syncs data from remote 3rd parties
FAILED_PASSTHROUGH_REQUESTS_HANDLER = 'FAILED_PASSTHROUGH_REQUESTS_HANDLER', // Queue which handles failed passthrough request due to rate limit and retries it with backOff
RAG_DOCUMENT_PROCESSING = 'RAG_DOCUMENT_PROCESSING',
THIRD_PARTY_DATA_INGESTION = 'THIRD_PARTY_DATA_INGESTION',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

LGTM! Consider adding a descriptive comment.

The addition of THIRD_PARTY_DATA_INGESTION to the Queues enum is appropriate and aligns with the PR's objective of scaling data fetching from Google Drive. The naming convention is consistent with other entries.

Consider adding a descriptive comment for this new queue type, similar to the other entries. For example:

-  THIRD_PARTY_DATA_INGESTION = 'THIRD_PARTY_DATA_INGESTION',
+  THIRD_PARTY_DATA_INGESTION = 'THIRD_PARTY_DATA_INGESTION', // Queue for ingesting data from third-party sources like Google Drive
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
THIRD_PARTY_DATA_INGESTION = 'THIRD_PARTY_DATA_INGESTION',
THIRD_PARTY_DATA_INGESTION = 'THIRD_PARTY_DATA_INGESTION', // Queue for ingesting data from third-party sources like Google Drive

Comment on lines +24 to +26
{
name: Queues.THIRD_PARTY_DATA_INGESTION,
},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

LGTM! Consider adding a comment for the new queue.

The addition of the THIRD_PARTY_DATA_INGESTION queue is correct and aligns with the PR objective of scaling data fetching from Google Drive. The implementation is consistent with the existing code structure.

Consider adding a brief comment above this queue configuration to explain its purpose, similar to:

// Queue for handling third-party data ingestion tasks, such as fetching data from Google Drive
{
  name: Queues.THIRD_PARTY_DATA_INGESTION,
},

This would improve code readability and make it easier for other developers to understand the purpose of this queue.

Comment on lines +1 to +5
import { Process, Processor } from '@nestjs/bull';
import { Injectable } from '@nestjs/common';
import { Job } from 'bull';
import { Queues } from '@@core/@core-services/queues/types';
import { GoogleDriveService } from '.';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

LGTM! Consider using named imports for clarity.

The imports are appropriate for a NestJS application using Bull for queue processing. However, for better clarity and maintainability, consider using named imports for the Queues type.

Apply this change to improve import clarity:

-import { Queues } from '@@core/@core-services/queues/types';
+import { Queues } from '@@core/@core-services/queues/types';

Committable suggestion was skipped due to low confidence.

Comment on lines +12 to +22
@Process('fs_file_googledrive')
async handleGoogleDriveSync(job: Job) {
try {
await this.googleDriveService.processBatch(job);
} catch (error) {
console.error(
`Failed to process Google Drive sync job: ${error.message}`,
);
throw error;
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

LGTM! Consider enhancing error logging for better debugging.

The handleGoogleDriveSync method is well-implemented:

  • The @Process('fs_file_googledrive') decorator correctly specifies the job type.
  • The method is properly declared as async and takes a Job parameter.
  • Error handling is implemented with a try-catch block.

However, there's room for improvement in error logging:

Consider enhancing the error logging to include more context:

 } catch (error) {
   console.error(
-    `Failed to process Google Drive sync job: ${error.message}`,
+    `Failed to process Google Drive sync job: ${error.message}`,
+    `Job ID: ${job.id}, Data: ${JSON.stringify(job.data)}`,
+    error.stack
   );
   throw error;
 }

This change will provide more context for debugging, including the job ID and data.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@Process('fs_file_googledrive')
async handleGoogleDriveSync(job: Job) {
try {
await this.googleDriveService.processBatch(job);
} catch (error) {
console.error(
`Failed to process Google Drive sync job: ${error.message}`,
);
throw error;
}
}
@Process('fs_file_googledrive')
async handleGoogleDriveSync(job: Job) {
try {
await this.googleDriveService.processBatch(job);
} catch (error) {
console.error(
`Failed to process Google Drive sync job: ${error.message}`,
`Job ID: ${job.id}, Data: ${JSON.stringify(job.data)}`,
error.stack
);
throw error;
}
}

Comment on lines +114 to +122
async processBatch(job: any) {
const {
linkedUserId,
query,
pageToken,
connectionId,
custom_field_mappings,
ingestParams,
} = job.data;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Define a specific type for the job parameter in processBatch

Using any for the job parameter reduces type safety and may introduce runtime errors. Defining a specific type enhances maintainability and readability.

Consider defining an interface for the job data:

interface GoogleDriveJobData {
  linkedUserId: string;
  query: string;
  pageToken?: string;
  connectionId: string;
  custom_field_mappings?: { slug: string; remote_id: string }[];
  ingestParams?: { [key: string]: any };
}

Then update the method signature:

- async processBatch(job: any) {
+ async processBatch(job: { data: GoogleDriveJobData }) {

Comment on lines +248 to +253
private async getLastSyncTime(connectionId: string): Promise<Date | null> {
const lastSync = await this.prisma.fs_files.findFirst({
where: { id_connection: connectionId },
orderBy: { modified_at: 'desc' },
});
return lastSync ? lastSync.modified_at : null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Optimize getLastSyncTime query performance

Retrieving the latest modified time without proper indexing can lead to performance issues on large datasets.

Ensure that there's an index on id_connection and modified_at fields in the fs_files table to optimize the query.

Consider updating the database schema to include the necessary indexes.

@Injectable()
export class GoogleDriveService implements IFileService {
constructor(
private prisma: PrismaService,
private logger: LoggerService,
private cryptoService: EncryptionService,
private registry: ServiceRegistry,
private ingestService: IngestDataService,
private bullQueueService: BullQueueService,
) {
this.logger.setContext(
FileStorageObject.file.toUpperCase() + ':' + GoogleDriveService.name,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Use template literals instead of string concatenation

The current string concatenation can be replaced with a template literal for improved readability and consistency.

Apply this diff to update the code:

- this.logger.setContext(
-   FileStorageObject.file.toUpperCase() + ':' + GoogleDriveService.name,
- );
+ this.logger.setContext(
+   `${FileStorageObject.file.toUpperCase()}:${GoogleDriveService.name}`,
+ );
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
FileStorageObject.file.toUpperCase() + ':' + GoogleDriveService.name,
`${FileStorageObject.file.toUpperCase()}:${GoogleDriveService.name}`,
🧰 Tools
🪛 Biome

[error] 31-31: Template literals are preferred over string concatenation.

Unsafe fix: Use a template literal.

(lint/style/useTemplate)

Comment on lines 151 to 155
id: file.id!,
name: file.name!,
mimeType: file.mimeType!,
modifiedTime: file.modifiedTime!,
size: file.size!,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Avoid using non-null assertions

Using the non-null assertion operator ! can lead to runtime errors if any of the properties are null or undefined. It's safer to handle possible undefined values appropriately.

Consider handling the potential undefined values or ensuring that these properties are always defined before usage. You can use optional chaining or provide default values.

Apply this diff to remove the non-null assertions and handle potential undefined values:

- id: file.id!,
+ id: file.id ?? '',
- name: file.name!,
+ name: file.name ?? '',
- mimeType: file.mimeType!,
+ mimeType: file.mimeType ?? '',
- modifiedTime: file.modifiedTime!,
+ modifiedTime: file.modifiedTime ?? '',
- size: file.size!,
+ size: file.size ?? '0',
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
id: file.id!,
name: file.name!,
mimeType: file.mimeType!,
modifiedTime: file.modifiedTime!,
size: file.size!,
id: file.id ?? '',
name: file.name ?? '',
mimeType: file.mimeType ?? '',
modifiedTime: file.modifiedTime ?? '',
size: file.size ?? '0',
🧰 Tools
🪛 Biome

[error] 151-151: Forbidden non-null assertion.

(lint/style/noNonNullAssertion)


[error] 152-152: Forbidden non-null assertion.

(lint/style/noNonNullAssertion)


[error] 153-153: Forbidden non-null assertion.

(lint/style/noNonNullAssertion)


[error] 154-154: Forbidden non-null assertion.

(lint/style/noNonNullAssertion)


[error] 155-155: Forbidden non-null assertion.

(lint/style/noNonNullAssertion)

Comment on lines 168 to 175
private async rateLimitedRequest<T>(request: () => Promise<T>): Promise<T> {
return new Promise((resolve) => {
setTimeout(async () => {
const result = await request();
resolve(result);
}, 1000 / API_RATE_LIMIT);
});
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle errors in rateLimitedRequest to prevent unhandled rejections

If the request() throws an error, the promise may neither resolve nor reject, leading to unhandled promise rejections. It's important to handle exceptions within the promise.

Wrap the request() call in a try-catch block and reject the promise on error.

Apply this diff to handle errors appropriately:

 private async rateLimitedRequest<T>(request: () => Promise<T>): Promise<T> {
   return new Promise((resolve, reject) => {
     setTimeout(async () => {
+      try {
         const result = await request();
         resolve(result);
+      } catch (error) {
+        reject(error);
+      }
     }, 1000 / API_RATE_LIMIT);
   });
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
private async rateLimitedRequest<T>(request: () => Promise<T>): Promise<T> {
return new Promise((resolve) => {
setTimeout(async () => {
const result = await request();
resolve(result);
}, 1000 / API_RATE_LIMIT);
});
}
private async rateLimitedRequest<T>(request: () => Promise<T>): Promise<T> {
return new Promise((resolve, reject) => {
setTimeout(async () => {
try {
const result = await request();
resolve(result);
} catch (error) {
reject(error);
}
}, 1000 / API_RATE_LIMIT);
});
}

Copy link

zeropath-ai bot commented Oct 8, 2024

Security

  • No security or compliance vulnerabilities detected
  • Scanned 12 changed file(s)

Changes Detected

  • [Feature] Add new queue for third-party data ingestion [queue.module.ts, shared.service.ts, types.ts]

    • Implemented in @core/@core-services/queues
    • Added THIRD_PARTY_DATA_INGESTION to Queues enum
    • Updated BullQueueService to include thirdPartyDataIngestionQueue
  • [Refactor] Modify IngestDataService for improved data handling [ingest-data.service.ts]

    • Updated syncData method to include custom_field_mappings and ingestParams
    • Commented out file processing for RAG
  • [Feature] Add sleep helper function [helpers.ts]

    • Implemented in @core/utils/helpers.ts
    • Allows for adding delays in code execution
  • [Refactor] Update IBaseObjectService interface [interface.ts]

    • Added optional ingestData method to interface
    • Includes parameters for sourceData, connectionId, customFieldMappings, and extraParams
  • [Refactor] Modify GoogleDriveService for batch processing [index.ts]

    • Implemented batch processing of files
    • Added rate limiting for API requests
    • Utilizes BullQueue for processing file batches
  • [Feature] Add GoogleDriveQueueProcessor [processor.ts]

    • New processor for handling Google Drive sync jobs
    • Processes jobs from THIRD_PARTY_DATA_INGESTION queue
  • [Bug Fix] Comment out generatePanoraParamsSpec in main.ts [main.ts]

    • Temporarily disabled generatePanoraParamsSpec function call

@naelob naelob merged commit 462c514 into main Oct 8, 2024
11 of 15 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants