
updates for enterprise #1247


Merged
merged 10 commits on Mar 21, 2025

Conversation

@parmesant (Contributor) commented Mar 18, 2025

Adds multiple updates for Parseable Enterprise

Description


This PR has:

  • been tested to ensure log ingestion and log query works.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • New Features

    • Added flexible configuration options to specify custom connection endpoints for improved integration with indexing services.
    • Introduced enhanced metadata management to clearly separate indexing from ingestion data.
    • Enabled multipart upload support for efficient handling of large file transfers across storage platforms.
    • Added a new function to generate unique indexer IDs.
  • Bug Fixes

    • Improved error handling for endpoint validation based on operational mode.
  • Refactor

    • Streamlined file processing with improved date-based filtering.
    • Simplified endpoint access control.
    • Updated metadata handling logic to accommodate new indexer structures.
    • Enhanced the TimeRange struct to support cloning.
  • Chores

    • Increased HTTP client timeout settings for better performance.
    • Added a constant to define the minimum size for multipart uploads.


coderabbitai bot commented Mar 18, 2025

Walkthrough

This pull request introduces several new and enhanced functionalities. It updates the CLI options to include a configurable indexer endpoint and modifies URL generation based on operation mode. Changes in metadata handling include new structures and methods for both ingestors and indexers, plus exposed retrieval of indexer metadata via HTTP handlers. Additionally, the PR adds multipart upload methods to various storage implementations and adjusts utility functions and client configurations. Overall, the changes extend the system’s flexibility in endpoint configuration, metadata management, and file uploading.

Changes

  • src/cli.rs: Added the new public field indexer_endpoint to the Options struct; updated get_url to accept a mode parameter and select the appropriate endpoint (a simplified sketch follows this list).
  • src/enterprise/utils.rs: Updated fetch_parquet_file_paths to use filter_map with date validation; added the chrono crate for date manipulation.
  • src/handlers/http/cluster/mod.rs, src/handlers/http/modal/mod.rs: Added the type alias IndexerMetadataArr, a new async function get_indexer_info(), and a new struct IndexerMetadata with associated methods; modified metadata storage logic.
  • src/handlers/http/middleware.rs: Simplified request handling logic in ModeFilterMiddleware.
  • src/handlers/http/modal/ingest_server.rs: Made the check_querier_state function public.
  • src/parseable/mod.rs: Added the new field indexer_metadata and renamed store_ingestor_metadata to store_metadata, which now takes a mode parameter to handle both ingestor and indexer metadata.
  • src/lib.rs, src/metrics/prom_utils.rs: Made HTTP_CLIENT public and increased its timeout from 10 to 30 seconds; updated URL retrieval calls to include a mode parameter.
  • src/storage/azure_blob.rs, src/storage/localfs.rs, src/storage/object_storage.rs, src/storage/s3.rs: Introduced the new async method upload_multipart for multipart uploads; in S3, defined multipart logic with a new size constant and adjusted method signatures.
  • src/utils/mod.rs, src/utils/time.rs: Added the new public function get_indexer_id for indexer ID generation; updated TimeRange to derive the Clone trait.
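As a rough, self-contained illustration of the mode-based endpoint selection described for src/cli.rs above (the real get_url returns a url::Url, validates schemes, and reads clap/env configuration; the bodies below are simplified and invented):

// Toy model of the behaviour summarised for src/cli.rs: pick the ingestor or
// indexer endpoint by mode, falling back to the server address when unset.
#[derive(Clone, Copy)]
enum Mode {
    Ingest,
    Index,
    Query,
    All,
}

struct Options {
    address: String,
    ingestor_endpoint: String,
    indexer_endpoint: String,
}

impl Options {
    fn get_url(&self, mode: Mode) -> String {
        let endpoint = match mode {
            Mode::Ingest => &self.ingestor_endpoint, // P_INGESTOR_ENDPOINT
            Mode::Index => &self.indexer_endpoint,   // P_INDEXER_ENDPOINT
            Mode::Query | Mode::All => return format!("http://{}", self.address),
        };
        if endpoint.is_empty() {
            format!("http://{}", self.address)
        } else {
            format!("http://{endpoint}")
        }
    }
}

fn main() {
    let opts = Options {
        address: "0.0.0.0:8000".into(),
        ingestor_endpoint: String::new(),
        indexer_endpoint: "indexer.local:8000".into(),
    };
    println!("{}", opts.get_url(Mode::Ingest)); // falls back to the address
    println!("{}", opts.get_url(Mode::Index));  // uses the configured endpoint
}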

Possibly related PRs

  • #1185 (fix: bugs introduced in #1143): That PR also modifies the get_url method of the Options struct, indicating a direct connection at the code level with the changes in this PR.

Suggested labels

for next release

Suggested reviewers

  • nikhilsinhaparseable

🐰 In the meadow, changes bloom,
New endpoints rise, dispelling gloom.
Metadata dances, both new and old,
With multipart uploads, stories unfold.
A hop, a skip, in code we play,
Celebrating changes, hip-hip-hooray! 🐇✨


@parmesant force-pushed the enterprise-changes branch from 8d3c740 to b98106b on March 18, 2025 10:07
@parmesant marked this pull request as ready for review on March 18, 2025 10:09
@parmesant force-pushed the enterprise-changes branch from b98106b to 8699ce8 on March 18, 2025 10:09
@nitisht requested review from nikhilsinhaparseable and de-sh and removed the review request for nikhilsinhaparseable on March 18, 2025 10:10
@coderabbitai bot left a comment


Actionable comments posted: 5

🧹 Nitpick comments (4)
src/cli.rs (1)

407-485: Refactor Duplicate Logic in get_url

Although adding Mode::Index is correct, the blocks for Mode::Ingest and Mode::Index largely duplicate logic. Consider extracting the shared logic into a helper function for maintainability.

pub fn get_url(&self, mode: Mode) -> Url {
    let (endpoint, env_var) = match mode {
        Mode::Ingest => (&self.ingestor_endpoint, "P_INGESTOR_ENDPOINT"),
        Mode::Index => (&self.indexer_endpoint, "P_INDEXER_ENDPOINT"),
        _ => panic!("Invalid mode"),
    };

    if endpoint.is_empty() {
-       // Duplicate code for returning default self.address-based URL ...
+       return self.build_default_url(); // Example helper
    }

    // ...
}
src/parseable/mod.rs (2)

132-133: Consider consolidating metadata fields with a generic approach.
This new indexer_metadata field closely mirrors the pattern of ingestor_metadata. Using a generic or unified structure could reduce code duplication and make maintenance simpler.


268-329: Refactor to eliminate duplication in store_metadata.
The branches for Mode::Ingest and Mode::Index share repeated logic. Extracting common steps into a helper function could reduce duplication and simplify maintenance.

src/handlers/http/modal/mod.rs (1)

337-510: Duplicated struct logic between IndexerMetadata and IngestorMetadata.
The new IndexerMetadata is nearly identical to IngestorMetadata. Consider extracting common fields into a shared struct or trait to reduce duplication and simplify maintenance.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 160dec4 and 8699ce8.

📒 Files selected for processing (15)
  • src/cli.rs (2 hunks)
  • src/enterprise/utils.rs (2 hunks)
  • src/handlers/http/cluster/mod.rs (2 hunks)
  • src/handlers/http/middleware.rs (1 hunks)
  • src/handlers/http/modal/ingest_server.rs (3 hunks)
  • src/handlers/http/modal/mod.rs (3 hunks)
  • src/lib.rs (1 hunks)
  • src/metrics/prom_utils.rs (2 hunks)
  • src/parseable/mod.rs (4 hunks)
  • src/storage/azure_blob.rs (1 hunks)
  • src/storage/localfs.rs (1 hunks)
  • src/storage/object_storage.rs (1 hunks)
  • src/storage/s3.rs (4 hunks)
  • src/utils/mod.rs (1 hunks)
  • src/utils/time.rs (1 hunks)
🧰 Additional context used
🧬 Code Definitions (5)
src/storage/azure_blob.rs (3)
src/storage/s3.rs (1)
  • upload_multipart (589:595)
src/storage/object_storage.rs (1)
  • upload_multipart (93:97)
src/storage/localfs.rs (1)
  • upload_multipart (107:113)
src/metrics/prom_utils.rs (1)
src/option.rs (1)
  • url (123:125)
src/enterprise/utils.rs (2)
src/storage/s3.rs (1)
  • s (181:181)
src/parseable/streams.rs (1)
  • parquet_files (239:251)
src/cli.rs (1)
src/option.rs (1)
  • mode (127:135)
src/utils/mod.rs (1)
src/handlers/http/modal/mod.rs (1)
  • get_indexer_id (452:454)
🔇 Additional comments (22)
src/utils/time.rs (1)

52-52: Good addition of Clone trait to TimeRange

Adding the Clone trait to the TimeRange struct allows for more flexible usage patterns when you need to duplicate the time range data, which is likely needed for the new indexer functionality being added.
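As a minimal, self-contained sketch of what the derive enables (the field names below are assumptions for illustration, not the actual TimeRange definition):

use chrono::{DateTime, Utc};

// Hypothetical stand-in for the TimeRange in src/utils/time.rs.
#[derive(Debug, Clone)]
struct TimeRange {
    start: DateTime<Utc>,
    end: DateTime<Utc>,
}

fn main() {
    let range = TimeRange { start: Utc::now(), end: Utc::now() };
    // With Clone derived, callers can hand an independent copy to another code
    // path (e.g. the indexer flow) without giving up the original.
    let copy = range.clone();
    println!("{range:?} / {copy:?}");
}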

src/utils/mod.rs (1)

45-50: Good pattern reuse for indexer ID generation

The implementation follows the same pattern as get_ingestor_id(), which provides consistency in how different component identifiers are generated in the system.
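Purely as an illustration of that pattern (this thread does not show how the ID is actually generated, so the UUID scheme here is an assumption):

// Hypothetical sketch only: the real get_indexer_id in src/utils/mod.rs mirrors
// get_ingestor_id, whose ID scheme is not shown in this review.
pub fn get_indexer_id() -> String {
    uuid::Uuid::new_v4().simple().to_string()
}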

src/metrics/prom_utils.rs (2)

22-22: Added Mode enum import for URL determination

This import supports the URL generation changes based on the operation mode.


65-66: URL generation now mode-specific

The change specifies that the URL should be generated for the ingest mode, which aligns with the enterprise updates for differentiating between ingestor and indexer endpoints.

The comment "for now it is only for ingestor" is helpful for documenting the current state, but consider whether metrics should eventually be collected for indexers as well.

src/handlers/http/middleware.rs (1)

362-367:

❓ Verification inconclusive

Security concern: All endpoints are now accessible in Index mode.

The previous endpoint restriction logic for Index mode has been removed. This means all API endpoints are now accessible when running in Index mode, which could potentially expose sensitive operations that were previously restricted.

Please verify whether this change is intentional and aligned with the security model for enterprise deployments. If some endpoints should still be restricted in Index mode, consider adding appropriate filtering logic.


🏁 Script executed:

#!/bin/bash
# Check if there are any references to endpoint restrictions for Index mode elsewhere in the codebase
grep -r "Mode::Index" --include="*.rs" .

Length of output: 970


Attention: Verify Endpoint Filtering for Index Mode

It appears that the logic which previously restricted certain endpoints in Index mode has been removed from the middleware, leaving all endpoints accessible when running in this mode. The grep output shows that while Mode::Index is still referenced in several parts of the codebase (e.g., in src/catalog/mod.rs, src/main.rs, and others), there’s no evidence of any updated filtering logic in the middleware (specifically around lines 362–367 in src/handlers/http/middleware.rs).

  • Affected File: src/handlers/http/middleware.rs (lines 362–367)
  • Observation: The current implementation wraps the service call without any conditional checks or filtering based on the operating mode.
  • Impact: When running in Index mode, all API endpoints become accessible, potentially exposing sensitive operations that were previously restricted.

Please verify if this removal is intentional for the enterprise deployment security model. If endpoint restrictions should still apply in Index mode, consider reintroducing appropriate filtering logic.

src/storage/object_storage.rs (1)

93-97: New multipart upload interface added to ObjectStorage trait.

The upload_multipart method adds support for multipart file uploads across different storage backends. This is a good addition for handling large files more efficiently. However, ensure that all implementations (S3, Azure, LocalFS) properly implement this method, as the LocalFS implementation currently uses unimplemented!().
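A sketch of what that trait surface looks like, with the signature inferred from the _upload_multipart implementations quoted later in this thread (the error type here is a placeholder, not the crate's real ObjectStorageError):

use std::path::Path;

use relative_path::RelativePath;

// Placeholder standing in for the project's ObjectStorageError.
#[derive(Debug)]
pub struct ObjectStorageError(pub String);

#[async_trait::async_trait]
pub trait ObjectStorage {
    // New in this PR: upload the local file at `path` to object storage under
    // `key`, switching to multipart uploads for large files. The review notes
    // that the LocalFS backend currently leaves this unimplemented!().
    async fn upload_multipart(
        &self,
        key: &RelativePath,
        path: &Path,
    ) -> Result<(), ObjectStorageError>;

    // ...existing methods elided...
}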

src/lib.rs (1)

75-79:

✅ Verification successful

HTTP client visibility and timeout changes.

Two changes have been made to the HTTP_CLIENT:

  1. The client is now publicly accessible (pub static) rather than private
  2. The request timeout has been increased from 10 to 30 seconds

The increased timeout may help with handling larger requests or slower network conditions, which is beneficial for enterprise deployments. However, making the client public exposes implementation details that might be better encapsulated.

Verify whether making the HTTP_CLIENT public is necessary and used appropriately throughout the codebase:


🏁 Script executed:

#!/bin/bash
# Find all places where HTTP_CLIENT is now used outside this module
grep -r "HTTP_CLIENT" --include="*.rs" . | grep -v "src/lib.rs"

Length of output: 1517


HTTP Client Visibility: Confirmed Usage Across the Codebase

The verification shows that HTTP_CLIENT is used extensively in modules outside src/lib.rs (e.g., in src/analytics.rs, src/audit.rs, src/handlers/http/cluster/*, and others). Given this widespread usage, making the HTTP client public appears to be a deliberate design decision. Additionally, increasing the request timeout from 10 to 30 seconds aligns well with handling larger requests or slower network conditions in enterprise deployments.

  • Public Exposure Justified: Multiple modules rely on HTTP_CLIENT, so its public visibility is necessary.
  • Timeout Increase Acceptable: The raised timeout supports more resilient network conditions.

Overall, the changes are appropriate, and no further adjustments are required.
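For orientation, a static client along these lines would match the description above; the exact builder options in src/lib.rs are not shown in this thread, so treat everything beyond the 30-second timeout as an assumption:

use std::time::Duration;

use once_cell::sync::Lazy;
use reqwest::Client;

// Sketch of a shared client with the 30s request timeout mentioned above.
pub static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
    Client::builder()
        .timeout(Duration::from_secs(30)) // raised from 10s in this PR
        .build()
        .expect("HTTP client construction should not fail")
});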

src/handlers/http/cluster/mod.rs (2)

54-54: Consistent Import Usage

Bringing IndexerMetadata and IngestorMetadata into scope here is straightforward and consistent with the existing structure.


60-61: Maintain Naming Consistency

Defining IndexerMetadataArr in parallel with IngestorMetadataArr ensures consistent naming conventions for collection types. No issues here.

src/cli.rs (1)

298-305: Indexer Endpoint Added

Introducing the indexer_endpoint field aligns with the existing style and expands configuration for indexing services. It's good that a default value is provided, though consider validating non-empty values if indexing is mandatory.
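A hypothetical clap sketch of such a field (attribute names, help text, and the default are assumptions; only the P_INDEXER_ENDPOINT environment variable appears elsewhere in this review):

use clap::Parser;

#[derive(Parser)]
pub struct Options {
    /// Address advertised for the indexer, as `<host>:<port>` without a scheme.
    #[arg(long, env = "P_INDEXER_ENDPOINT", default_value = "")]
    pub indexer_endpoint: String,
    // ...other options elided...
}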

src/handlers/http/modal/ingest_server.rs (3)

31-31: Importing Mode Enum

Using Mode here is a natural extension if the ingest server needs mode-specific logic. Nothing concerning spotted.


112-112: Storing Metadata with Explicit Mode

Calling store_metadata(Mode::Ingest) is consistent with the broader shift towards mode-based metadata handling. Looks fine.


255-255: Confirm Public Access to check_querier_state

Changing visibility to pub makes this function callable from other modules. Verify that external callers cannot misuse this to bypass any internal workflows, especially around system readiness or security checks.

src/enterprise/utils.rs (1)

3-3: Chrono Import for Date Handling

Bringing in chrono::{TimeZone, Utc} is appropriate for robust date/time operations. No immediate issues here.

src/parseable/mod.rs (3)

50-50: No issues found for the new imports.
The import statement for IndexerMetadata seems correct and consistent with the existing structure.


149-152: Check whether Mode::All also needs indexer metadata.
The logic only loads metadata for Mode::Index. Please verify if running in Mode::All should also initialize indexer_metadata.


158-158: No concerns with storing the new field.
Passing indexer_metadata to the struct constructor looks straightforward and is consistent with the existing pattern for ingestor metadata.

src/storage/s3.rs (3)

46-46: Import statement needed for async file I/O.
Using tokio’s OpenOptions and AsyncReadExt is appropriate for streaming file reads.


66-66: Defines the minimum part size for multipart uploads.
This constant (5MB) aligns with AWS S3’s minimum valid chunk size for multipart operations.
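For intuition on the part-count arithmetic used by the _upload_multipart diffs later in this thread, a tiny self-contained example (the constant matches this revision; later commits raise it to 25 MB):

// Worked example of splitting a file into multipart chunks of at least 5 MiB.
const MIN_MULTIPART_UPLOAD_SIZE: usize = 5 * 1024 * 1024;

fn total_parts(total_size: usize) -> usize {
    let num_full_parts = total_size / MIN_MULTIPART_UPLOAD_SIZE;
    let has_final_partial_part = total_size % MIN_MULTIPART_UPLOAD_SIZE > 0;
    num_full_parts + usize::from(has_final_partial_part)
}

fn main() {
    // A 12 MiB file becomes two full 5 MiB parts plus one 2 MiB tail part.
    assert_eq!(total_parts(12 * 1024 * 1024), 3);
    // Files below the threshold are sent with a single put instead.
    assert_eq!(total_parts(3 * 1024 * 1024), 1);
}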


589-595: Public wrapper for _upload_multipart.
This method succinctly exposes the multipart upload functionality. No concerns identified here.

src/handlers/http/modal/mod.rs (2)

37-37: Importing Mode for indexing logic.
Referencing option::Mode is consistent with the rest of the file’s approach to handling server modes.


40-40: New utility imports for ID retrieval.
Using get_indexer_id and get_ingestor_id will help differentiate between these roles. No issues here.

Comment on lines +639 to +655
pub async fn get_indexer_info() -> anyhow::Result<IndexerMetadataArr> {
let store = PARSEABLE.storage.get_object_store();

let root_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
let arr = store
.get_objects(
Some(&root_path),
Box::new(|file_name| file_name.starts_with("indexer")),
)
.await?
.iter()
// this unwrap will most definateley shoot me in the foot later
.map(|x| serde_json::from_slice::<IndexerMetadata>(x).unwrap_or_default())
.collect_vec();

Ok(arr)
}


🛠️ Refactor suggestion

Improve Error Handling in Metadata Deserialization

Using unwrap_or_default() can silently swallow parsing errors and may hinder debugging if the metadata is malformed. Prefer propagating errors or logging them for better visibility.

Consider changing to:

 .map(|x| {
-    serde_json::from_slice::<IndexerMetadata>(x).unwrap_or_default()
+    serde_json::from_slice::<IndexerMetadata>(x).unwrap_or_else(|e| {
+        error!("Failed to parse indexer metadata: {:?}", e);
+        IndexerMetadata::default()
+    })
 })

to detect and log failures, or fully propagate the error if data integrity is critical.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub async fn get_indexer_info() -> anyhow::Result<IndexerMetadataArr> {
    let store = PARSEABLE.storage.get_object_store();
    let root_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
    let arr = store
        .get_objects(
            Some(&root_path),
            Box::new(|file_name| file_name.starts_with("indexer")),
        )
        .await?
        .iter()
        // this unwrap will most definateley shoot me in the foot later
        .map(|x| serde_json::from_slice::<IndexerMetadata>(x).unwrap_or_default())
        .collect_vec();
    Ok(arr)
}
pub async fn get_indexer_info() -> anyhow::Result<IndexerMetadataArr> {
    let store = PARSEABLE.storage.get_object_store();
    let root_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY);
    let arr = store
        .get_objects(
            Some(&root_path),
            Box::new(|file_name| file_name.starts_with("indexer")),
        )
        .await?
        .iter()
        // this unwrap will most definateley shoot me in the foot later
        .map(|x| {
            serde_json::from_slice::<IndexerMetadata>(x).unwrap_or_else(|e| {
                error!("Failed to parse indexer metadata: {:?}", e);
                IndexerMetadata::default()
            })
        })
        .collect_vec();
    Ok(arr)
}
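A standalone sketch of the "fully propagate" option mentioned above; the struct fields are guesses made only so the snippet compiles on its own:

use serde::Deserialize;

#[derive(Debug, Default, Deserialize)]
struct IndexerMetadata {
    domain_name: String,
    port: String,
}

// Collecting into Result<Vec<_>, _> surfaces the first malformed entry as an
// error instead of silently substituting IndexerMetadata::default().
fn parse_all(raw: &[Vec<u8>]) -> anyhow::Result<Vec<IndexerMetadata>> {
    raw.iter()
        .map(|bytes| serde_json::from_slice::<IndexerMetadata>(bytes))
        .collect::<Result<Vec<_>, _>>()
        .map_err(Into::into)
}

fn main() -> anyhow::Result<()> {
    let good = br#"{"domain_name":"indexer.local","port":"8000"}"#.to_vec();
    let bad = b"not json".to_vec();
    assert!(parse_all(&[good.clone()]).is_ok());
    assert!(parse_all(&[good, bad]).is_err());
    Ok(())
}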

Comment on lines +123 to 153
.filter_map(|file| {
let date = file.file_path.split("/").collect_vec();

let date = date.as_slice()[1..4].iter().map(|s| s.to_string());

let date = RelativePathBuf::from_iter(date);

parquet_files.entry(date).or_default().push(file);
let year = &date[1][5..9];
let month = &date[1][10..12];
let day = &date[1][13..15];
let hour = &date[2][5..7];
let min = &date[3][7..9];
let file_date = Utc
.with_ymd_and_hms(
year.parse::<i32>().unwrap(),
month.parse::<u32>().unwrap(),
day.parse::<u32>().unwrap(),
hour.parse::<u32>().unwrap(),
min.parse::<u32>().unwrap(),
0,
)
.unwrap();

if file_date < time_range.start {
None
} else {
let date = date.as_slice()[1..4].iter().map(|s| s.to_string());

let date = RelativePathBuf::from_iter(date);

parquet_files.entry(date).or_default().push(file);
Some("")
}
})
.for_each(|_| {});


🛠️ Refactor suggestion

Validate Path String Sub-slices

Extracting substrings like &date[1][5..9] risks panics if date[1] is shorter than expected. Consider verifying path segment lengths to guard against malformed or unexpected file paths.

- let year = &date[1][5..9];
+ if date[1].len() < 9 {
+     warn!("Unexpected file path format for: {:?}", date);
+     return None;
+ }
+ let year = &date[1][5..9];
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
.filter_map(|file| {
let date = file.file_path.split("/").collect_vec();
let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
let date = RelativePathBuf::from_iter(date);
parquet_files.entry(date).or_default().push(file);
let year = &date[1][5..9];
let month = &date[1][10..12];
let day = &date[1][13..15];
let hour = &date[2][5..7];
let min = &date[3][7..9];
let file_date = Utc
.with_ymd_and_hms(
year.parse::<i32>().unwrap(),
month.parse::<u32>().unwrap(),
day.parse::<u32>().unwrap(),
hour.parse::<u32>().unwrap(),
min.parse::<u32>().unwrap(),
0,
)
.unwrap();
if file_date < time_range.start {
None
} else {
let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
let date = RelativePathBuf::from_iter(date);
parquet_files.entry(date).or_default().push(file);
Some("")
}
})
.for_each(|_| {});
.filter_map(|file| {
let date = file.file_path.split("/").collect_vec();
if date[1].len() < 9 {
warn!("Unexpected file path format for: {:?}", date);
return None;
}
let year = &date[1][5..9];
let month = &date[1][10..12];
let day = &date[1][13..15];
let hour = &date[2][5..7];
let min = &date[3][7..9];
let file_date = Utc
.with_ymd_and_hms(
year.parse::<i32>().unwrap(),
month.parse::<u32>().unwrap(),
day.parse::<u32>().unwrap(),
hour.parse::<u32>().unwrap(),
min.parse::<u32>().unwrap(),
0,
)
.unwrap();
if file_date < time_range.start {
None
} else {
let date = date.as_slice()[1..4].iter().map(|s| s.to_string());
let date = RelativePathBuf::from_iter(date);
parquet_files.entry(date).or_default().push(file);
Some("")
}
})
.for_each(|_| {});

@coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (4)
src/storage/object_storage.rs (2)

93-97: Add documentation for the new trait method.
Although this new trait method upload_multipart is a valuable addition, it would be beneficial to include a doc comment explaining usage details and any assumptions made about file sizes or concurrency constraints.


847-851: Ensure robust error handling for partial uploads.
While it is good that the loop continues upon upload errors, consider whether you want to provide any retry logic for partial file uploads. In high-latency or failure scenarios, having granular retries for each chunk could ensure more resilient uploads.

src/storage/s3.rs (2)

66-66: Consider making the threshold configurable.
Defining a 5 MB threshold for MIN_MULTIPART_UPLOAD_SIZE is reasonable, but it might be even more robust to allow a user or environment variable to configure this value for edge cases or variable bandwidth constraints.


514-565: Check concurrency and finalization logic in _upload_multipart.
This implementation executes part-uploads in parallel with tokio::spawn, which can improve speed but may also raise memory usage for very large files. Examine whether a bounded concurrency strategy or streaming approach is more suitable. Additionally, you may want to handle failures in async_writer.complete() by aborting the multipart upload to avoid leaving stale partials.

Do you want a verification script to scan for any usage of abort_multipart calls or relevant error handling in other files that might be triggered upon failure?
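Sketch of one possible bounded-concurrency shape, kept generic rather than tied to the object_store writer API used in this PR (the closure, types, and limit are illustrative assumptions):

use std::sync::Arc;

use tokio::sync::Semaphore;

// Run at most `limit` part uploads at a time; `upload` stands in for whatever
// performs a single part upload in the real implementation.
async fn upload_parts_bounded<F, Fut>(
    parts: Vec<Vec<u8>>,
    limit: usize,
    upload: F,
) -> anyhow::Result<()>
where
    F: Fn(Vec<u8>) -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
{
    let semaphore = Arc::new(Semaphore::new(limit));
    let mut handles = Vec::new();
    for part in parts {
        // Waiting for a permit here caps how many spawned uploads run at once.
        let permit = semaphore.clone().acquire_owned().await?;
        let fut = upload(part);
        handles.push(tokio::spawn(async move {
            let _permit = permit; // released when this task completes
            fut.await
        }));
    }
    for handle in handles {
        handle.await??;
    }
    Ok(())
}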

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8699ce8 and cd4f854.

📒 Files selected for processing (2)
  • src/storage/object_storage.rs (2 hunks)
  • src/storage/s3.rs (4 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
src/storage/s3.rs (3)
src/storage/mod.rs (1)
  • to_object_store_path (266:268)
src/storage/object_storage.rs (1)
  • upload_multipart (93:97)
src/storage/azure_blob.rs (1)
  • upload_multipart (427:433)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (2)
src/storage/s3.rs (2)

46-46: Necessary import for asynchronous file I/O.
This import of OpenOptions and AsyncReadExt is required to support non-blocking reads, which is essential for multipart uploads.


581-587: Minimal pass-through implementation.
Forwarding upload_multipart to the private _upload_multipart method looks consistent. No further concerns.

@parmesant force-pushed the enterprise-changes branch from cd4f854 to 7e2cbb9 on March 18, 2025 12:42
@coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (3)
src/storage/object_storage.rs (2)

93-97: Add a short docstring to the new method for clarity.
It would be helpful to document the behavior, use cases, and expected error scenarios of upload_multipart, so future contributors can easily understand its role and how it integrates with the rest of the trait methods.


847-851: Consider parallel uploading and improved error handling.
Currently, the upload is performed sequentially within this loop, and if an error persists, the code logs it and proceeds. For large-scale deployments, consider performing parallel uploads for multiple files to reduce overall time. Also, evaluate whether a retry or rollback mechanism is necessary to handle partial failures in multipart uploads.

src/storage/s3.rs (1)

65-65: Make the minimum multipart upload size configurable if future needs demand it.
The current 5 MB threshold is fairly standard. However, if you anticipate different file sizes or have memory constraints, consider making it user-configurable for flexibility.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cd4f854 and 7e2cbb9.

📒 Files selected for processing (2)
  • src/storage/object_storage.rs (2 hunks)
  • src/storage/s3.rs (4 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: coverage
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
🔇 Additional comments (2)
src/storage/s3.rs (2)

46-46: Import statement looks good.
The addition of OpenOptions and AsyncReadExt is appropriate for the asynchronous file reads.


578-584: Straightforward delegation to _upload_multipart.
This public method is cleanly forwarding the call to the private _upload_multipart function. The implementation is consistent with the trait requirement.

@parmesant force-pushed the enterprise-changes branch from 7e2cbb9 to 29597f5 on March 18, 2025 13:06
@parmesant force-pushed the enterprise-changes branch from 29597f5 to 701ad50 on March 19, 2025 06:29
@parmesant force-pushed the enterprise-changes branch from 7022ad5 to 7c97750 on March 19, 2025 06:40
@coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (1)
src/storage/s3.rs (1)

514-571: 🛠️ Refactor suggestion

⚠️ Potential issue

Optimize memory usage in multipart upload method

The current implementation loads the entire file into memory with file.read_to_end(&mut data) before splitting it into parts. This contradicts the primary purpose of multipart uploads, which is to handle large files efficiently without memory constraints. For very large files, this approach could cause out-of-memory errors.

Consider using a streaming approach to read and upload each part sequentially:

async fn _upload_multipart(
    &self,
    key: &RelativePath,
    path: &Path,
) -> Result<(), ObjectStorageError> {
    let mut file = OpenOptions::new().read(true).open(path).await?;
    let location = &to_object_store_path(key);

    let mut async_writer = self.client.put_multipart(location).await?;

    let meta = file.metadata().await?;
    let total_size = meta.len() as usize;
    if total_size < MIN_MULTIPART_UPLOAD_SIZE {
        let mut data = Vec::new();
        file.read_to_end(&mut data).await?;
        self.client.put(location, data.into()).await?;
-       // async_writer.put_part(data.into()).await?;
-       // async_writer.complete().await?;
        return Ok(());
    } else {
-       let mut data = Vec::new();
-       file.read_to_end(&mut data).await?;
-
-       // let mut upload_parts = Vec::new();
+       // Calculate number of parts
        let has_final_partial_part = total_size % MIN_MULTIPART_UPLOAD_SIZE > 0;
        let num_full_parts = total_size / MIN_MULTIPART_UPLOAD_SIZE;
        let total_parts = num_full_parts + if has_final_partial_part { 1 } else { 0 };
+       let mut part_number = 0;
+       let mut buffer = vec![0; MIN_MULTIPART_UPLOAD_SIZE];
+       
+       // Stream through the file, reading and uploading each part
+       while part_number < total_parts {
+           let bytes_read = file.read(&mut buffer).await?;
+           if bytes_read == 0 {
+               break;
+           }
+           
+           // Only use the actual bytes read
+           let part_data = buffer[..bytes_read].to_vec();
+           async_writer.put_part(part_data.into()).await?;
+           part_number += 1;
+       }

-       // Upload each part
-       for part_number in 0..(total_parts) {
-           let start_pos = part_number * MIN_MULTIPART_UPLOAD_SIZE;
-           let end_pos = if part_number == num_full_parts && has_final_partial_part {
-               // Last part might be smaller than 5MB (which is allowed)
-               total_size
-           } else {
-               // All other parts must be at least 5MB
-               start_pos + MIN_MULTIPART_UPLOAD_SIZE
-           };
-
-           // Extract this part's data
-           let part_data = data[start_pos..end_pos].to_vec();
-
-           // Upload the part
-           async_writer.put_part(part_data.into()).await?;
-
-           // upload_parts.push(part_number as u64 + 1);
-       }
        match async_writer.complete().await {
            Ok(_) => {},
            Err(err) => {
                error!("Failed to complete multipart upload. {:?}", err);
                async_writer.abort().await?;
            }
        };
    }
    Ok(())
}

This approach avoids loading the entire file into memory and only buffers one part at a time.

🧹 Nitpick comments (3)
src/storage/s3.rs (3)

530-532: Remove commented-out code

These commented lines appear to be left from development experiments and should be removed if not needed.

-       // async_writer.put_part(data.into()).await?;
-       // async_writer.complete().await?;

537-538: Remove unused commented-out code

This line is not being used and should be removed.

-       // let mut upload_parts = Vec::new();

560-561: Remove unused commented-out code

This commented line should be removed if it's not serving any purpose.

-           // upload_parts.push(part_number as u64 + 1);
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7022ad5 and 7c97750.

📒 Files selected for processing (1)
  • src/storage/s3.rs (4 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
src/storage/s3.rs (3)
src/storage/localfs.rs (1)
  • upload_multipart (107-113)
src/storage/object_storage.rs (1)
  • upload_multipart (93-97)
src/storage/azure_blob.rs (1)
  • upload_multipart (427-433)
⏰ Context from checks skipped due to timeout of 90000ms (3)
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-apple-darwin
🔇 Additional comments (3)
src/storage/s3.rs (3)

46-46: LGTM - Appropriate imports added for async file operations

The imports for async file operations are correctly added to support the new multipart upload functionality.


65-65: LGTM - Appropriate constant defined for multipart upload size

The constant MIN_MULTIPART_UPLOAD_SIZE is set to 25 MB, which is well above AWS S3's minimum requirement of 5 MB per part.


587-593: LGTM - Appropriate public method for the ObjectStorage trait

The public method correctly delegates to the private implementation. This allows external components to utilize the multipart upload functionality as part of the ObjectStorage trait.

@nikhilsinhaparseable (Contributor) left a comment


looks good

@coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
src/storage/s3.rs (1)

513-570: 🛠️ Refactor suggestion

Memory usage concern in multipart upload implementation.

The S3 implementation has the same memory efficiency issue as the Azure Blob implementation - it reads the entire file into memory (lines 527 and 534) before processing, which could lead to excessive memory consumption for large files.

Apply the same streaming approach suggested for the Azure Blob implementation to read and upload chunks sequentially:

async fn _upload_multipart(
    &self,
    key: &RelativePath,
    path: &Path,
) -> Result<(), ObjectStorageError> {
    let mut file = OpenOptions::new().read(true).open(path).await?;
    let location = &to_object_store_path(key);

    let meta = file.metadata().await?;
    let total_size = meta.len() as usize;
    if total_size < MIN_MULTIPART_UPLOAD_SIZE {
        let mut data = Vec::new();
        file.read_to_end(&mut data).await?;
        self.client.put(location, data.into()).await?;
        return Ok(());
    } else {
-       let mut data = Vec::new();
-       file.read_to_end(&mut data).await?;
+       let mut async_writer = self.client.put_multipart(location).await?;
+       let mut buffer = vec![0u8; MIN_MULTIPART_UPLOAD_SIZE];
+       let mut part_number = 0;

-       let has_final_partial_part = total_size % MIN_MULTIPART_UPLOAD_SIZE > 0;
-       let num_full_parts = total_size / MIN_MULTIPART_UPLOAD_SIZE;
-       let total_parts = num_full_parts + if has_final_partial_part { 1 } else { 0 };

-       // Upload each part
-       for part_number in 0..(total_parts) {
-           let start_pos = part_number * MIN_MULTIPART_UPLOAD_SIZE;
-           let end_pos = if part_number == num_full_parts && has_final_partial_part {
-               // Last part might be smaller than 5MB (which is allowed)
-               total_size
-           } else {
-               // All other parts must be at least 5MB
-               start_pos + MIN_MULTIPART_UPLOAD_SIZE
-           };
-
-           // Extract this part's data
-           let part_data = data[start_pos..end_pos].to_vec();
-
-           // Upload the part
-           async_writer.put_part(part_data.into()).await?;
+       // Read and upload chunks sequentially
+       loop {
+           let bytes_read = file.read(&mut buffer).await?;
+           if bytes_read == 0 {
+               break;
+           }
+           
+           part_number += 1;
+           let part_data = if bytes_read == buffer.len() {
+               buffer.clone()
+           } else {
+               buffer[..bytes_read].to_vec()
+           };
+           
+           async_writer.put_part(part_data.into()).await?;
        }

        match async_writer.complete().await {
            Ok(_) => {}
            Err(err) => {
                error!("Failed to complete multipart upload. {:?}", err);
                async_writer.abort().await?;
            }
        };
    }
    Ok(())
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7c97750 and 1531215.

📒 Files selected for processing (4)
  • src/storage/azure_blob.rs (4 hunks)
  • src/storage/localfs.rs (2 hunks)
  • src/storage/mod.rs (1 hunks)
  • src/storage/s3.rs (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/storage/localfs.rs
🧰 Additional context used
🧬 Code Definitions (2)
src/storage/azure_blob.rs (3)
src/storage/s3.rs (2)
  • _upload_multipart (513-570)
  • upload_multipart (586-592)
src/storage/localfs.rs (2)
  • new (99-101)
  • upload_multipart (110-119)
src/storage/object_storage.rs (1)
  • upload_multipart (93-97)
src/storage/s3.rs (3)
src/storage/azure_blob.rs (2)
  • _upload_multipart (382-439)
  • upload_multipart (487-493)
src/storage/localfs.rs (2)
  • new (99-101)
  • upload_multipart (110-119)
src/storage/object_storage.rs (1)
  • upload_multipart (93-97)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: coverage
🔇 Additional comments (7)
src/storage/mod.rs (1)

80-80: Added MIN_MULTIPART_UPLOAD_SIZE constant for multipart upload threshold.

The new constant defines 25 MB as the minimum file size for multipart uploads, which is a reasonable threshold to balance efficiency and performance. Files smaller than this will be uploaded in a single operation.
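The constant described above would look roughly like this (the exact expression and visibility in src/storage/mod.rs may differ):

// Minimum file size before upload_multipart switches to multipart parts.
pub const MIN_MULTIPART_UPLOAD_SIZE: usize = 25 * 1024 * 1024; // 25 MiB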

src/storage/azure_blob.rs (3)

44-44: Added necessary imports for async file operations.

The inclusion of OpenOptions and AsyncReadExt from tokio enables asynchronous file handling for the multipart upload implementation.


57-58: Updated import list to include MIN_MULTIPART_UPLOAD_SIZE constant.

This addition makes the multipart upload size threshold available for the Azure Blob implementation.


487-493: Implementation of upload_multipart in ObjectStorage trait.

Good implementation that delegates to the private method, providing a clean API interface for multipart uploads.

src/storage/s3.rs (3)

46-46: Added necessary imports for async file operations.

The inclusion of OpenOptions and AsyncReadExt from tokio enables asynchronous file handling for the multipart upload implementation.


58-59: Updated import list to include MIN_MULTIPART_UPLOAD_SIZE constant.

This addition makes the multipart upload size threshold available for the S3 implementation.


586-592: Implementation of upload_multipart in ObjectStorage trait.

Good implementation that delegates to the private method, providing a clean API interface for multipart uploads.

@coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (2)
src/storage/azure_blob.rs (1)

382-436: 🛠️ Refactor suggestion

Memory usage concern in multipart upload implementation.

The current implementation reads the entire file into memory before processing, which could lead to excessive memory consumption for large files. This approach works but isn't memory-efficient for production environments with large file uploads.

Consider implementing a streaming approach that reads and uploads chunks sequentially:

async fn _upload_multipart(
    &self,
    key: &RelativePath,
    path: &Path,
) -> Result<(), ObjectStorageError> {
    let mut file = OpenOptions::new().read(true).open(path).await?;
    let location = &to_object_store_path(key);

    let mut async_writer = self.client.put_multipart(location).await?;

    let meta = file.metadata().await?;
    let total_size = meta.len() as usize;
    if total_size < MIN_MULTIPART_UPLOAD_SIZE {
        let mut data = Vec::new();
        file.read_to_end(&mut data).await?;
        self.client.put(location, data.into()).await?;
        return Ok(());
    } else {
-       let mut data = Vec::new();
-       file.read_to_end(&mut data).await?;
+       let mut buffer = vec![0u8; MIN_MULTIPART_UPLOAD_SIZE];
+       let mut part_number = 0;

-       let has_final_partial_part = total_size % MIN_MULTIPART_UPLOAD_SIZE > 0;
-       let num_full_parts = total_size / MIN_MULTIPART_UPLOAD_SIZE;
-       let total_parts = num_full_parts + if has_final_partial_part { 1 } else { 0 };

-       // Upload each part
-       for part_number in 0..(total_parts) {
-           let start_pos = part_number * MIN_MULTIPART_UPLOAD_SIZE;
-           let end_pos = if part_number == num_full_parts && has_final_partial_part {
-               // Last part might be smaller than 5MB (which is allowed)
-               total_size
-           } else {
-               // All other parts must be at least 5MB
-               start_pos + MIN_MULTIPART_UPLOAD_SIZE
-           };
-
-           // Extract this part's data
-           let part_data = data[start_pos..end_pos].to_vec();
-
-           // Upload the part
-           async_writer.put_part(part_data.into()).await?;
+       // Read and upload chunks sequentially
+       loop {
+           let bytes_read = file.read(&mut buffer).await?;
+           if bytes_read == 0 {
+               break;
+           }
+           
+           part_number += 1;
+           let part_data = if bytes_read == buffer.len() {
+               buffer.clone()
+           } else {
+               buffer[..bytes_read].to_vec()
+           };
+           
+           async_writer.put_part(part_data.into()).await?;
        }

        if let Err(err) = async_writer.complete().await {
            error!("Failed to complete multipart upload. {:?}", err);
            async_writer.abort().await?;
        };
    }
    Ok(())
}
src/storage/s3.rs (1)

513-567: 🛠️ Refactor suggestion

Memory usage concern in S3 multipart upload implementation.

Similar to the Azure Blob implementation, this code reads the entire file into memory before processing it, which could cause memory issues with large files in production.

Consider implementing a streaming approach that reads and uploads chunks sequentially, similar to the recommendation for Azure Blob storage:

async fn _upload_multipart(
    &self,
    key: &RelativePath,
    path: &Path,
) -> Result<(), ObjectStorageError> {
    let mut file = OpenOptions::new().read(true).open(path).await?;
    let location = &to_object_store_path(key);

    let mut async_writer = self.client.put_multipart(location).await?;

    let meta = file.metadata().await?;
    let total_size = meta.len() as usize;
    if total_size < MIN_MULTIPART_UPLOAD_SIZE {
        let mut data = Vec::new();
        file.read_to_end(&mut data).await?;
        self.client.put(location, data.into()).await?;
        return Ok(());
    } else {
-       let mut data = Vec::new();
-       file.read_to_end(&mut data).await?;
+       let mut buffer = vec![0u8; MIN_MULTIPART_UPLOAD_SIZE];
+       let mut part_number = 0;

-       // let mut upload_parts = Vec::new();
-       
-       let has_final_partial_part = total_size % MIN_MULTIPART_UPLOAD_SIZE > 0;
-       let num_full_parts = total_size / MIN_MULTIPART_UPLOAD_SIZE;
-       let total_parts = num_full_parts + if has_final_partial_part { 1 } else { 0 };
-
-       // Upload each part
-       for part_number in 0..(total_parts) {
-           let start_pos = part_number * MIN_MULTIPART_UPLOAD_SIZE;
-           let end_pos = if part_number == num_full_parts && has_final_partial_part {
-               // Last part might be smaller than 5MB (which is allowed)
-               total_size
-           } else {
-               // All other parts must be at least 5MB
-               start_pos + MIN_MULTIPART_UPLOAD_SIZE
-           };
-
-           // Extract this part's data
-           let part_data = data[start_pos..end_pos].to_vec();
-
-           // Upload the part
-           async_writer.put_part(part_data.into()).await?;
-
-           // upload_parts.push(part_number as u64 + 1);
+       // Read and upload chunks sequentially
+       loop {
+           let bytes_read = file.read(&mut buffer).await?;
+           if bytes_read == 0 {
+               break;
+           }
+           
+           part_number += 1;
+           let part_data = if bytes_read == buffer.len() {
+               buffer.clone()
+           } else {
+               buffer[..bytes_read].to_vec()
+           };
+           
+           async_writer.put_part(part_data.into()).await?;
        }

        if let Err(err) = async_writer.complete().await {
            error!("Failed to complete multipart upload. {:?}", err);
            async_writer.abort().await?;
        };
    }
    Ok(())
}
🧹 Nitpick comments (3)
src/storage/azure_blob.rs (2)

398-399: Remove commented-out code.

There are commented-out lines that appear to be alternative approaches that were considered but not used. Either implement these approaches or remove them to keep the codebase clean.

let mut data = Vec::new();
file.read_to_end(&mut data).await?;
self.client.put(location, data.into()).await?;
- // async_writer.put_part(data.into()).await?;
- // async_writer.complete().await?;
return Ok(());

430-434: Improve error handling in async_writer.complete().

The current error handling approach uses a proper pattern but can be simplified slightly for better readability and error propagation.

- if let Err(err) = async_writer.complete().await {
-     error!("Failed to complete multipart upload. {:?}", err);
-     async_writer.abort().await?;
- };
+ match async_writer.complete().await {
+     Ok(_) => {}
+     Err(err) => {
+         error!("Failed to complete multipart upload. {:?}", err);
+         async_writer.abort().await?;
+     }
+ };
src/storage/s3.rs (1)

536-537: Remove commented-out upload_parts vector.

There's a commented-out declaration of an upload_parts vector and later a commented-out line that would push to this vector (line 559). This suggests vestigial code from an earlier implementation approach.

let mut data = Vec::new();
file.read_to_end(&mut data).await?;

- // let mut upload_parts = Vec::new();
-
let has_final_partial_part = total_size % MIN_MULTIPART_UPLOAD_SIZE > 0;
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1531215 and 03cfbc9.

📒 Files selected for processing (2)
  • src/storage/azure_blob.rs (4 hunks)
  • src/storage/s3.rs (4 hunks)
🧰 Additional context used
🧬 Code Definitions (2)
src/storage/azure_blob.rs (3)
src/storage/s3.rs (2)
  • _upload_multipart (513-567)
  • upload_multipart (583-589)
src/storage/localfs.rs (2)
  • new (99-101)
  • upload_multipart (110-119)
src/storage/object_storage.rs (1)
  • upload_multipart (93-97)
src/storage/s3.rs (3)
src/storage/azure_blob.rs (2)
  • _upload_multipart (382-436)
  • upload_multipart (484-490)
src/storage/localfs.rs (2)
  • new (99-101)
  • upload_multipart (110-119)
src/storage/object_storage.rs (1)
  • upload_multipart (93-97)
🔇 Additional comments (7)
src/storage/azure_blob.rs (3)

44-44: Appropriate import for async file operations.

The addition of tokio::fs::OpenOptions and tokio::io::AsyncReadExt imports is necessary for implementing the asynchronous file operations in the new multipart upload functionality.


57-58: Good import update for multipart upload constants.

The addition of MIN_MULTIPART_UPLOAD_SIZE to the import list is appropriate since it's used in the multipart upload implementation to determine chunk sizes.


484-490: Clean interface implementation for upload_multipart.

The public upload_multipart method cleanly delegates to the private implementation, which is a good practice for separation of concerns.

src/storage/s3.rs (4)

46-46: Appropriate import for async file operations.

The addition of tokio::fs::OpenOptions and tokio::io::AsyncReadExt imports enables the asynchronous file operations needed for multipart uploads.


58-59: Consistent imports across storage implementations.

The import of MIN_MULTIPART_UPLOAD_SIZE and other constants ensures consistency in how different storage backends handle multipart uploads.


561-564: Improve error handling in async_writer.complete().

Same issue as in the Azure Blob implementation - the error handling can be simplified for better readability.

- if let Err(err) = async_writer.complete().await {
-     error!("Failed to complete multipart upload. {:?}", err);
-     async_writer.abort().await?;
- };
+ match async_writer.complete().await {
+     Ok(_) => {}
+     Err(err) => {
+         error!("Failed to complete multipart upload. {:?}", err);
+         async_writer.abort().await?;
+     }
+ };

583-589: Clean interface implementation for upload_multipart.

The public upload_multipart method provides a clean interface by delegating to the private implementation.

@coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (2)
src/cli.rs (2)

448-456: Fix typo in error messages.

There's a consistent typo in the error messages: "environement" should be "environment".

- panic!("Invalid value `{}`, please set the environement variable `{env_var}` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", endpoint);
+ panic!("Invalid value `{}`, please set the environment variable `{env_var}` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", endpoint);

The same typo appears in other error messages within this function (lines 468, 471, 484).


414-492: Consider refactoring duplicate code in get_url method.

There's significant duplicate code in the get_url method that could be extracted to improve readability and maintainability. The TODO comment on line 414 ("TODO: refactor and document") also suggests this was planned.

Consider extracting the common URL generation logic to a private helper method:

// Helper method to generate default URL based on server address
fn default_url(&self) -> Url {
    format!(
        "{}://{}",
        self.get_scheme(),
        self.address
    )
    .parse::<Url>()
    .unwrap_or_else(|err| {
        panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
    })
}

pub fn get_url(&self, mode: Mode) -> Url {
    match mode {
        Mode::Ingest => {
            if self.ingestor_endpoint.is_empty() {
                return self.default_url();
            }
            self.parse_endpoint(&self.ingestor_endpoint, "P_INGESTOR_ENDPOINT")
        }
        Mode::Index => {
            if self.indexer_endpoint.is_empty() {
                return self.default_url();
            }
            self.parse_endpoint(&self.indexer_endpoint, "P_INDEXER_ENDPOINT")
        }
        Mode::Query | Mode::All => self.default_url(),
    }
}

// Helper method to parse endpoint with proper error handling
fn parse_endpoint(&self, endpoint: &str, env_var: &str) -> Url {
    // ... existing parsing logic ...
}

This would make the method more maintainable and address the TODO comment.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 03cfbc9 and 763571c.

📒 Files selected for processing (1)
  • src/cli.rs (2 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
src/cli.rs (1)
src/option.rs (1)
  • mode (127-135)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
🔇 Additional comments (1)
src/cli.rs (1)

306-312: Good addition of indexer_endpoint configuration.

This change enables connecting to a dedicated indexer, which aligns with the PR objectives for enterprise updates. The implementation follows the same pattern as the existing ingestor_endpoint.
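As a rough illustration of that pattern (not the exact code from this PR), a clap-style field like the following would mirror the existing ingestor_endpoint; the attribute names and empty default are assumptions, and it presumes clap with the derive and env features enabled. Only the P_INDEXER_ENDPOINT variable name is taken from the review above.

use clap::Parser;

#[derive(Parser, Debug)]
struct Options {
    /// Address of the dedicated indexer as `<host>:<port>`, without a scheme.
    #[arg(long, env = "P_INDEXER_ENDPOINT", default_value = "")]
    pub indexer_endpoint: String,
}

fn main() {
    let options = Options::parse();
    println!("indexer endpoint: {:?}", options.indexer_endpoint);
}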

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/parseable/mod.rs (1)

268-329: Consider reducing duplication in metadata store logic.

The method has been appropriately renamed from store_ingestor_metadata to store_metadata and modified to handle both ingestor and indexer metadata. However, there's significant code duplication between the Mode::Ingest and Mode::Index cases.

Consider refactoring to extract the common logic into a separate helper function. For example:

pub async fn store_metadata(&self, mode: Mode) -> anyhow::Result<()> {
    match mode {
        Mode::Ingest => {
-           let Some(meta) = self.ingestor_metadata.as_ref() else {
-               return Ok(());
-           };
-           let storage_ingestor_metadata = meta.migrate().await?;
-           let store = self.storage.get_object_store();
-
-           // use the id that was generated/found in the staging and
-           // generate the path for the object store
-           let path = meta.file_path();
-
-           // we are considering that we can always get from object store
-           if let Some(mut store_data) = storage_ingestor_metadata {
-               if store_data.domain_name != meta.domain_name {
-                   store_data.domain_name.clone_from(&meta.domain_name);
-                   store_data.port.clone_from(&meta.port);
-
-                   let resource = Bytes::from(serde_json::to_vec(&store_data)?);
-
-                   // if pushing to object store fails propagate the error
-                   store.put_object(&path, resource).await?;
-               }
-           } else {
-               let resource = serde_json::to_vec(&meta)?.into();
-               store.put_object(&path, resource).await?;
-           }
-           Ok(())
+           self.store_metadata_internal(self.ingestor_metadata.as_ref()).await
        }
        Mode::Index => {
-           let Some(meta) = self.indexer_metadata.as_ref() else {
-               return Ok(());
-           };
-           let storage_indexer_metadata = meta.migrate().await?;
-           let store = self.storage.get_object_store();
-
-           // use the id that was generated/found in the staging and
-           // generate the path for the object store
-           let path = meta.file_path();
-
-           // we are considering that we can always get from object store
-           if let Some(mut store_data) = storage_indexer_metadata {
-               if store_data.domain_name != meta.domain_name {
-                   store_data.domain_name.clone_from(&meta.domain_name);
-                   store_data.port.clone_from(&meta.port);
-
-                   let resource = Bytes::from(serde_json::to_vec(&store_data)?);
-
-                   // if pushing to object store fails propagate the error
-                   store.put_object(&path, resource).await?;
-               }
-           } else {
-               let resource = serde_json::to_vec(&meta)?.into();
-               store.put_object(&path, resource).await?;
-           }
-           Ok(())
+           self.store_metadata_internal(self.indexer_metadata.as_ref()).await
        }
        _ => Err(anyhow::anyhow!("Invalid mode")),
    }
}

// Add a new helper method
async fn store_metadata_internal<T>(&self, meta_opt: Option<&Arc<T>>) -> anyhow::Result<()>
where
    T: serde::Serialize + std::fmt::Debug,
    T: std::ops::Deref<Target = dyn MetadataOps>,
{
    let Some(meta) = meta_opt else {
        return Ok(());
    };
    let storage_metadata = meta.migrate().await?;
    let store = self.storage.get_object_store();

    // use the id that was generated/found in the staging and
    // generate the path for the object store
    let path = meta.file_path();

    // we are considering that we can always get from object store
    if let Some(mut store_data) = storage_metadata {
        if store_data.domain_name != meta.domain_name {
            store_data.domain_name.clone_from(&meta.domain_name);
            store_data.port.clone_from(&meta.port);

            let resource = Bytes::from(serde_json::to_vec(&store_data)?);

            // if pushing to object store fails propagate the error
            store.put_object(&path, resource).await?;
        }
    } else {
        let resource = serde_json::to_vec(&meta)?.into();
        store.put_object(&path, resource).await?;
    }
    Ok(())
}

Note: This would require introducing a common trait (e.g., MetadataOps) for both IngestorMetadata and IndexerMetadata that provides methods like migrate and file_path.
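A minimal sketch of what such a unifying trait could look like, assuming the async-trait crate and deliberately simplified return types (a String path, a generic stored form); the method names follow the helper above but are illustrative only and do not exist in the codebase.

use async_trait::async_trait;
use serde::Serialize;

// Hypothetical trait unifying IngestorMetadata and IndexerMetadata for the
// purposes of store_metadata; not part of the current code.
#[async_trait]
pub trait MetadataOps: Serialize + Send + Sync {
    /// Shape of the copy already persisted in the object store.
    type Stored: Serialize + Send;

    /// Load (and migrate, if needed) any previously stored copy.
    async fn migrate(&self) -> anyhow::Result<Option<Self::Stored>>;

    /// Relative path of this node's metadata file in the object store.
    fn file_path(&self) -> String;

    /// Domain name and port advertised by this node.
    fn domain_name(&self) -> &str;
    fn port(&self) -> &str;
}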

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 763571c and de1e559.

📒 Files selected for processing (1)
  • src/parseable/mod.rs (4 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
src/parseable/mod.rs (1)
src/handlers/http/modal/mod.rs (4)
  • load (197-275)
  • load (372-450)
  • new (174-194)
  • new (349-369)
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (3)
src/parseable/mod.rs (3)

132-133: Appropriate parallel structure for indexer metadata.

Good addition of the indexer_metadata field to match the existing pattern used for ingestor_metadata. The use of Option<Arc<IndexerMetadata>> maintains consistency with the existing approach.


149-152: LGTM: Proper initialization of indexer metadata.

The initialization logic for indexer_metadata follows the same pattern as ingestor_metadata, correctly handling the Mode::Index case by loading metadata from storage.
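To make the pattern concrete, here is a self-contained sketch of mode-gated initialization; the stub types and load constructors below stand in for the real ones in src/handlers/http/modal/mod.rs and are not the actual implementation.

use std::sync::Arc;

#[derive(Clone, Copy, PartialEq)]
enum Mode { Query, Ingest, Index, All }

struct IngestorMetadata;
struct IndexerMetadata;

impl IngestorMetadata { fn load() -> Self { IngestorMetadata } }
impl IndexerMetadata { fn load() -> Self { IndexerMetadata } }

struct Parseable {
    ingestor_metadata: Option<Arc<IngestorMetadata>>,
    indexer_metadata: Option<Arc<IndexerMetadata>>,
}

impl Parseable {
    fn new(mode: Mode) -> Self {
        // Only the role that matches the current mode loads its metadata;
        // the other field stays None, keeping the two paths symmetric.
        Parseable {
            ingestor_metadata: (mode == Mode::Ingest).then(|| Arc::new(IngestorMetadata::load())),
            indexer_metadata: (mode == Mode::Index).then(|| Arc::new(IndexerMetadata::load())),
        }
    }
}

fn main() {
    let p = Parseable::new(Mode::Index);
    assert!(p.indexer_metadata.is_some() && p.ingestor_metadata.is_none());
}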


385-392:

❓ Verification inconclusive

Consider updating create_stream_and_schema_from_storage to use mode-specific metadata.

The method currently only uses ingestor_metadata when creating streams. Consider updating it to use either ingestor_metadata or indexer_metadata based on the current mode to maintain consistent behavior.


🏁 Script executed:

#!/bin/bash
# Check how stream creation behaves with the indexer mode
rg -A 5 "create_stream.*mode" src/

Length of output: 36


Action required: Verify mode-specific metadata usage in stream creation

The current rg search did not yield evidence of mode-specific handling in the stream creation logic. It appears that the implementation in create_stream_and_schema_from_storage continues to use only ingestor_metadata, but we couldn’t automatically confirm whether indexer_metadata is required for indexer mode. Please manually verify that:

  • The function in src/parseable/mod.rs (lines 385-392) indeed doesn’t handle mode-specific metadata.
  • Other parts of the codebase don’t already conditionally use indexer_metadata where appropriate.
  • Consistent behavior is maintained based on the current mode (ingestor vs. indexer).

Once you confirm the intended behavior, update the function implementation accordingly if needed.

@parmesant parmesant force-pushed the enterprise-changes branch from de1e559 to fcb35da on March 21, 2025 09:16
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/parseable/mod.rs (1)

268-329: Method renamed and extended to handle both metadata types.

The method was renamed from store_ingestor_metadata to store_metadata and now handles both ingestor and indexer metadata with nearly identical logic paths. This refactoring improves maintainability while extending functionality.

Two observations:

  1. Both the ingestor and indexer branches have similar code patterns that could potentially be refactored to reduce duplication.
  2. The error case returns a generic message "Invalid mode" which could be more specific.

You could consider refactoring the duplicate logic into a helper method:

pub async fn store_metadata(&self, mode: Mode) -> anyhow::Result<()> {
    match mode {
        Mode::Ingest => {
-            let Some(meta) = self.ingestor_metadata.as_ref() else {
-                return Ok(());
-            };
-            let storage_ingestor_metadata = meta.migrate().await?;
-            let store = self.storage.get_object_store();
-
-            // use the id that was generated/found in the staging and
-            // generate the path for the object store
-            let path = meta.file_path();
-
-            // we are considering that we can always get from object store
-            if let Some(mut store_data) = storage_ingestor_metadata {
-                if store_data.domain_name != meta.domain_name {
-                    store_data.domain_name.clone_from(&meta.domain_name);
-                    store_data.port.clone_from(&meta.port);
-
-                    let resource = Bytes::from(serde_json::to_vec(&store_data)?);
-
-                    // if pushing to object store fails propagate the error
-                    store.put_object(&path, resource).await?;
-                }
-            } else {
-                let resource = serde_json::to_vec(&meta)?.into();
-
-                store.put_object(&path, resource).await?;
-            }
-            Ok(())
+            self.store_specific_metadata(self.ingestor_metadata.as_ref()).await
        }
        Mode::Index => {
-            let Some(meta) = self.indexer_metadata.as_ref() else {
-                return Ok(());
-            };
-            let storage_indexer_metadata = meta.migrate().await?;
-            let store = self.storage.get_object_store();
-
-            // use the id that was generated/found in the staging and
-            // generate the path for the object store
-            let path = meta.file_path();
-
-            // we are considering that we can always get from object store
-            if let Some(mut store_data) = storage_indexer_metadata {
-                if store_data.domain_name != meta.domain_name {
-                    store_data.domain_name.clone_from(&meta.domain_name);
-                    store_data.port.clone_from(&meta.port);
-
-                    let resource = Bytes::from(serde_json::to_vec(&store_data)?);
-
-                    // if pushing to object store fails propagate the error
-                    store.put_object(&path, resource).await?;
-                }
-            } else {
-                let resource = serde_json::to_vec(&meta)?.into();
-
-                store.put_object(&path, resource).await?;
-            }
-            Ok(())
+            self.store_specific_metadata(self.indexer_metadata.as_ref()).await
        }
-        _ => Err(anyhow::anyhow!("Invalid mode")),
+        _ => Err(anyhow::anyhow!("Can only store metadata for Ingest or Index mode")),
    }
}

async fn store_specific_metadata<T>(&self, meta_opt: Option<&Arc<T>>) -> anyhow::Result<()> 
where 
    T: HasMigrate + HasFilePath + Serialize + HasDomainAndPort,
{
    let Some(meta) = meta_opt else {
        return Ok(());
    };
    let storage_metadata = meta.migrate().await?;
    let store = self.storage.get_object_store();

    // use the id that was generated/found in the staging and
    // generate the path for the object store
    let path = meta.file_path();

    // we are considering that we can always get from object store
    if let Some(mut store_data) = storage_metadata {
        if store_data.domain_name() != meta.domain_name() {
            store_data.update_domain_and_port(meta);

            let resource = Bytes::from(serde_json::to_vec(&store_data)?);

            // if pushing to object store fails propagate the error
            store.put_object(&path, resource).await?;
        }
    } else {
        let resource = serde_json::to_vec(&meta)?.into();
        store.put_object(&path, resource).await?;
    }
    Ok(())
}

// You would need to define appropriate traits for the metadata types:
trait HasMigrate {
    type Output;
    async fn migrate(&self) -> anyhow::Result<Option<Self::Output>>;
}

trait HasFilePath {
    fn file_path(&self) -> String;
}

trait HasDomainAndPort {
    fn domain_name(&self) -> &String;
    fn port(&self) -> &String;
}

trait DomainPortUpdate {
    fn update_domain_and_port<T: HasDomainAndPort>(&mut self, source: &T);
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between de1e559 and fcb35da.

📒 Files selected for processing (2)
  • src/lib.rs (2 hunks)
  • src/parseable/mod.rs (4 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
src/parseable/mod.rs (1)
src/handlers/http/modal/mod.rs (4)
  • load (197-275)
  • load (372-450)
  • new (174-194)
  • new (349-369)
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: coverage
🔇 Additional comments (7)
src/parseable/mod.rs (4)

50-51: Import for IndexerMetadata added to support new functionality.

The import has been expanded to include IndexerMetadata, aligning with the new enterprise functionality where both ingestors and indexers can co-exist in a distributed deployment model.


132-133: New field for indexer metadata extends Parseable's capabilities.

Adding the indexer_metadata field parallel to the existing ingestor_metadata field properly supports the distributed mode where a Parseable instance can operate as an indexer.


149-152: Logical initialization of indexer_metadata based on mode.

The initialization approach for indexer_metadata mirrors that of ingestor_metadata, providing consistent handling of both types of metadata based on the operating mode.


158-159: Proper inclusion of indexer_metadata in struct initialization.

The indexer_metadata field is correctly initialized in the Parseable struct constructor, ensuring that the instance has access to the indexer metadata when operating in Index mode.

src/lib.rs (3)

30-30: Module event changed from private to public.

Making the event module public allows it to be imported and used by external crates or modules, which is necessary for integrating with enterprise features. This change is in line with the overall goal of enhancing enterprise functionality.


75-75: HTTP_CLIENT is now publicly accessible.

Making the HTTP client public allows it to be reused across the codebase, which promotes consistency in how HTTP requests are handled and configured. This change supports the extended functionality for enterprise operations.


78-79:

✅ Verification successful

HTTP request timeout increased from 10 to 30 seconds.

Increasing the HTTP request timeout is appropriate for enterprise environments where operations between ingestors and indexers may take longer than in a standalone setup. This change improves reliability for distributed deployments.


🏁 Script executed:

#!/bin/bash
# Verify where this client is used to ensure the timeout increase is appropriate

echo "Searching for references to HTTP_CLIENT in the codebase..."
rg "HTTP_CLIENT" -A 3 -B 3

Length of output: 9809


Verification Passed: HTTP Request Timeout Update

The search confirmed that the updated HTTP timeout in src/lib.rs is consistently used across the codebase (e.g., in prom_utils.rs, various HTTP handlers, and cluster modules). Increasing the HTTP request timeout from 10 to 30 seconds is well justified for enterprise environments where longer processing times are expected.
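For context, a shared client with that timeout might be declared roughly like this; a sketch assuming reqwest and once_cell, since the exact builder options in src/lib.rs may differ.

use std::time::Duration;

use once_cell::sync::Lazy;
use reqwest::Client;

// Shared across handlers so every inter-node call picks up the same
// configuration; the 30-second timeout bounds each full request.
pub static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
    Client::builder()
        .timeout(Duration::from_secs(30))
        .build()
        .expect("building the shared HTTP client should not fail")
});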

@nitisht nitisht merged commit 526907c into parseablehq:main Mar 21, 2025
14 checks passed