Skip to content

Feat: Prism APIs #1188

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Mar 4, 2025
Merged

Feat: Prism APIs #1188

merged 14 commits into from
Mar 4, 2025

Conversation

parmesant
Copy link
Contributor

@parmesant parmesant commented Feb 13, 2025

Fixes #XXXX.

This PR adds the GET /home API and the GET /logstream/{logstream}/info endpoints required for Prism

Sample response for GET api/prism/v1/home-

{
    "alert_titles": [
        {
            "title": "AlertTitle",
            "id": "01JKZGGG2SGB4YTTJQQ3DGYBPC"
        }
    ],
    "alerts_info": {
        "total": 1,
        "silenced": 0,
        "resolved": 1,
        "triggered": 0,
        "low": 0,
        "medium": 1,
        "high": 0
    },
    "correlation_titles": [],
    "stream_info": {
        "stats_summary": {
            "events": 23650,
            "ingestion": 9125399,
            "storage": 3257758
        }
    },
    "stats_details": [
        {
            "date": "2025-02-13",
            "events": 23650,
            "ingestion_size": 9125399,
            "storage_size": 3257758
        },
        {
            "date": "2025-02-12",
            "events": 0,
            "ingestion_size": 0,
            "storage_size": 0
        },
        {
            "date": "2025-02-11",
            "events": 0,
            "ingestion_size": 0,
            "storage_size": 0
        },
        {
            "date": "2025-02-10",
            "events": 0,
            "ingestion_size": 0,
            "storage_size": 0
        },
        {
            "date": "2025-02-09",
            "events": 0,
            "ingestion_size": 0,
            "storage_size": 0
        },
        {
            "date": "2025-02-08",
            "events": 0,
            "ingestion_size": 0,
            "storage_size": 0
        },
        {
            "date": "2025-02-07",
            "events": 0,
            "ingestion_size": 0,
            "storage_size": 0
        }
    ],
    "stream_titles": [
        "teststream",
        "teststream2"
    ],
    "dashboard_titles": [],
    "filter_titles": []
}

Sample response for GET api/prism/v1/logstream/{logstream}/info -

{
    "info": {
        "created-at": "2025-02-18T17:41:48.090809553+05:30",
        "first-event-at": "2025-02-18T17:41:58.784+05:30",
        "stream_type": "UserDefined",
        "log_source": "Json"
    },
    "schema": {
        "fields": [
            {
                "name": "app_meta",
                "data_type": "Utf8",
                "nullable": true,
                "dict_id": 0,
                "dict_is_ordered": false,
                "metadata": {}
            },
            {
                "name": "device_id",
                "data_type": "Float64",
                "nullable": true,
                "dict_id": 0,
                "dict_is_ordered": false,
                "metadata": {}
            },
            {
                "name": "host",
                "data_type": "Utf8",
                "nullable": true,
                "dict_id": 0,
                "dict_is_ordered": false,
                "metadata": {}
            },
            {
                "name": "level",
                "data_type": "Utf8",
                "nullable": true,
                "dict_id": 0,
                "dict_is_ordered": false,
                "metadata": {}
            },
            {
                "name": "location",
                "data_type": "Utf8",
                "nullable": true,
                "dict_id": 0,
                "dict_is_ordered": false,
                "metadata": {}
            },
            {
                "name": "message",
                "data_type": "Utf8",
                "nullable": true,
                "dict_id": 0,
                "dict_is_ordered": false,
                "metadata": {}
            },
            {
                "name": "os",
                "data_type": "Utf8",
                "nullable": true,
                "dict_id": 0,
                "dict_is_ordered": false,
                "metadata": {}
            },
            {
                "name": "p_timestamp",
                "data_type": {
                    "Timestamp": [
                        "Millisecond",
                        null
                    ]
                },
                "nullable": true,
                "dict_id": 0,
                "dict_is_ordered": false,
                "metadata": {}
            },
            {
                "name": "process_id",
                "data_type": "Float64",
                "nullable": true,
                "dict_id": 0,
                "dict_is_ordered": false,
                "metadata": {}
            },
            {
                "name": "request_body",
                "data_type": "Utf8",
                "nullable": true,
                "dict_id": 0,
                "dict_is_ordered": false,
                "metadata": {}
            },
            {
                "name": "response_time",
                "data_type": "Float64",
                "nullable": true,
                "dict_id": 0,
                "dict_is_ordered": false,
                "metadata": {}
            },
            {
                "name": "runtime",
                "data_type": "Utf8",
                "nullable": true,
                "dict_id": 0,
                "dict_is_ordered": false,
                "metadata": {}
            },
            {
                "name": "session_id",
                "data_type": "Utf8",
                "nullable": true,
                "dict_id": 0,
                "dict_is_ordered": false,
                "metadata": {}
            },
            {
                "name": "source_time",
                "data_type": {
                    "Timestamp": [
                        "Millisecond",
                        null
                    ]
                },
                "nullable": true,
                "dict_id": 0,
                "dict_is_ordered": false,
                "metadata": {}
            },
            {
                "name": "status_code",
                "data_type": "Float64",
                "nullable": true,
                "dict_id": 0,
                "dict_is_ordered": false,
                "metadata": {}
            },
            {
                "name": "timezone",
                "data_type": "Utf8",
                "nullable": true,
                "dict_id": 0,
                "dict_is_ordered": false,
                "metadata": {}
            },
            {
                "name": "user_agent",
                "data_type": "Utf8",
                "nullable": true,
                "dict_id": 0,
                "dict_is_ordered": false,
                "metadata": {}
            },
            {
                "name": "user_id",
                "data_type": "Float64",
                "nullable": true,
                "dict_id": 0,
                "dict_is_ordered": false,
                "metadata": {}
            },
            {
                "name": "uuid",
                "data_type": "Utf8",
                "nullable": true,
                "dict_id": 0,
                "dict_is_ordered": false,
                "metadata": {}
            },
            {
                "name": "version",
                "data_type": "Utf8",
                "nullable": true,
                "dict_id": 0,
                "dict_is_ordered": false,
                "metadata": {}
            }
        ],
        "metadata": {}
    }
}

Description


This PR has:

  • been tested to ensure log ingestion and log query works.
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • New Features

    • Introduced new API endpoints for enhanced Prism functionality, including dedicated routes for home and logstream data.
    • Added an endpoint providing aggregated alerts information, with counts by state and severity to aid monitoring.
    • Implemented user-related API functions to retrieve user data and roles.
    • Added a new UsersPrism struct for encapsulating user-related information.
    • Introduced new asynchronous functions for handling logstream information and statistics.
    • Added a new HomeResponse struct and a function to generate comprehensive home data for users.
  • Enhancements

    • Standardized timestamp display to use UTC for consistent time reporting.
    • Updated dashboard data to improve visibility of key naming details.
    • Simplified logic for creating queried statistics in logstream handling.

Copy link
Contributor

coderabbitai bot commented Feb 13, 2025

Walkthrough

The pull request introduces new API endpoints and modules to support Prism functionality and alert monitoring. It adds structures and asynchronous functions for aggregating alert data and handling HTTP requests for Prism home and logstream endpoints. New modules and helper functions are implemented to retrieve log stream information and schema in a distributed setup. Additionally, timestamp generation is standardized to UTC, and a Dashboard field is made public to improve data accessibility.

Changes

File(s) Change Summary
src/alerts/mod.rs Added AlertsInfo struct and get_alerts_info async function for aggregating alert counts by state and severity.
src/handlers/http/mod.rs, src/handlers/http/modal/query_server.rs, src/handlers/http/modal/server.rs Introduced Prism endpoints by adding new modules (prism_home, prism_logstream), constant PRISM_BASE_PATH, and functions (prism_base_path, get_prism_home, get_prism_logstream) to route Prism-related HTTP requests.
src/handlers/http/prism_home.rs, src/handlers/http/prism_logstream.rs Added asynchronous functions home_api and get_info to process requests for Prism home and logstream, returning JSON responses with proper error handling.
src/prism/home/mod.rs, src/prism/logstream/mod.rs, src/prism/mod.rs Created new Prism modules with components for home and logstream, including response structs, API generators, and custom error enums.
src/logstream/mod.rs Introduced helper functions (get_stream_schema_helper and get_stream_info_helper) to fetch schema and stream information in distributed mode with error handling.
src/metadata.rs, src/parseable/mod.rs, src/storage/mod.rs Updated timestamp generation from local time to UTC by replacing Local::now() with Utc::now().
src/users/dashboards.rs Modified the Dashboard struct to make the name field public.
src/handlers/http/rbac.rs Added new asynchronous functions list_users_prism and get_prism_user for user management in Prism.
src/rbac/mod.rs Introduced UsersPrism struct to encapsulate user-related information for Prism.
src/rbac/utils.rs Added to_prism_user function to convert User objects into UsersPrism.
src/handlers/http/role.rs Added new asynchronous function list_roles to retrieve roles as a JSON response.

Sequence Diagram(s)

sequenceDiagram
  participant Client as User
  participant Handler as Home API Handler
  participant Extractor as Session Key Extractor
  participant ResponseGen as Home Response Generator

  Client->>Handler: GET /api/prism/v1/home
  Handler->>Extractor: Extract session key
  Extractor-->>Handler: session key
  Handler->>ResponseGen: generate_home_response(session key)
  ResponseGen-->>Handler: HomeResponse data
  Handler->>Client: Return JSON response
Loading
sequenceDiagram
  participant Client as User
  participant Handler as Logstream API Handler
  participant PrismService as Prism Logstream Service
  participant SchemaHelper as Schema Helper
  participant InfoHelper as Info Helper

  Client->>Handler: GET /api/prism/v1/logstream/{stream_name}/info
  Handler->>PrismService: Call get_prism_logstream_info(stream_name)
  PrismService->>SchemaHelper: Retrieve stream schema
  SchemaHelper-->>PrismService: Return schema
  PrismService->>InfoHelper: Retrieve stream info
  InfoHelper-->>PrismService: Return stream info
  PrismService-->>Handler: PrismLogstreamInfo
  Handler->>Client: Return JSON response
Loading

Suggested reviewers

  • de-sh
  • nikhilsinhaparseable

Poem

I’m a little rabbit, hopping with delight,
New routes and modules make my code world bright.
Alerts and Prism endpoints dance in the light,
UTC time stamps keep our sessions just right.
With carrots and code, I cheer with pure might,
Celebrating changes that make our system take flight!
🥕🐇 Happy coding!


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c683b96 and 28fed09.

📒 Files selected for processing (3)
  • src/handlers/http/logstream.rs (2 hunks)
  • src/handlers/http/modal/query/querier_logstream.rs (1 hunks)
  • src/logstream/mod.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/handlers/http/logstream.rs
  • src/handlers/http/modal/query/querier_logstream.rs
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: coverage
🔇 Additional comments (4)
src/logstream/mod.rs (4)

1-24: License and imports look good!

The license header is complete and appropriate. The imports bring in all necessary components for the functionality being implemented.


34-45: Good error handling for distributed schema updates.

I like how you're handling schema updates in distributed mode with proper error conversion. The use of match with a custom error message is a good pattern here.


55-69: Well-structured approach to first event timestamp retrieval.

The implementation effectively checks both memory and storage for the first event timestamp, with appropriate fallbacks and updates. This is a good pattern for ensuring data consistency.


76-90: Clear and thorough StreamInfo construction.

The construction of the StreamInfo object is thorough and includes all the necessary fields. The metadata is properly mapped to the appropriate fields in the resulting object.

✨ Finishing Touches
  • 📝 Generate Docstrings

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@parmesant parmesant changed the title init commit for Prism GET /home Feat: Prism home API Feb 19, 2025
- refactored code for clippy
- modified the base path for prism
@parmesant parmesant changed the title Feat: Prism home API Feat: Prism APIs Feb 19, 2025
@parmesant parmesant marked this pull request as ready for review February 19, 2025 09:46
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🧹 Nitpick comments (7)
src/prism/logstream/mod.rs (1)

125-138: Use a structured error response for API endpoints.

You’re returning plaintext error messages via ContentType::plaintext(), which may complicate error handling for API consumers. Consider returning JSON with additional context (e.g., error codes) for easier parsing and standardized error responses.

 fn error_response(&self) -> actix_web::HttpResponse<actix_web::body::BoxBody> {
     actix_web::HttpResponse::build(self.status_code())
-        .insert_header(ContentType::plaintext())
-        .body(self.to_string())
+        .insert_header(ContentType::json())
+        .json(json!({
+            "error": self.to_string(),
+            "kind": format!("{:?}", self)
+        }))
 }
src/prism/home/mod.rs (3)

78-88: Handle unauthorized streams more uniformly.

Users who are not authorized to access certain streams have them filtered out. This is helpful, but be sure to log or trace any unauthorized attempts for auditing. Optionally, consider returning partial results with a note indicating which streams were omitted.


89-101: Enhance RBAC checks for alerts.

The TODO comment mentions future RBAC logic for alerts. Implementation of the same filtering approach used for streams (line 84) could be applied here as well. Additionally, consider removing or refactoring this comment once the RBAC logic is in place to keep the code clean.

Would you like help creating an RBAC-enabled filtering approach for alerts?


147-170: Optimize statistics retrieval for larger stream counts.

Querying stats for each stream over 7 days in a nested loop may become slow for large numbers of streams. Consider strategies like batching, parallelization (e.g., using FuturesUnordered to process stats concurrently), or retrieving aggregated stats at once.

 for stream in stream_titles.iter() {
     let stats = get_stats_date(stream, date).await?;
     ...
 }
+// Example: gather calls concurrently using FuturesUnordered:
+/*
+use futures::stream::FuturesUnordered;
+use futures::StreamExt;
+
+let stash = stream_titles
+    .iter()
+    .map(|stream| get_stats_date(stream, date))
+    .collect::<FuturesUnordered<_>>();
+
+while let Some(stats_result) = stash.next().await {
+    let stats = stats_result?;
+    ...
+}
+*/
src/handlers/http/prism_logstream.rs (1)

27-31: Check for request authorization and log potential errors.

Currently, get_info directly retrieves the logstream info. Consider adding an authorization check, as well as logging any errors before returning. This helps maintain consistent access control checks and provides better observability.

 pub async fn get_info(stream_name: Path<String>) -> Result<impl Responder, PrismLogstreamError> {
+    // e.g., check authorization here for the current user
+    // if unauthorized => return some 403 error or custom error
     let prism_logstream_info = get_prism_logstream_info(&stream_name).await?;
     Ok(web::Json(prism_logstream_info))
 }
src/handlers/http/prism_home.rs (1)

26-31: Enhance function documentation.

The documentation is incomplete. Add details about:

  • The session key requirement
  • Possible error cases (PrismHomeError)
  • The structure of the JSON response
 /// Fetches the data to populate Prism's home
 ///
+/// # Arguments
+/// * `req` - The HTTP request containing the session key
 ///
 /// # Returns
+/// A JSON response containing:
+/// - On success: The `HomeResponse` struct
+/// - On error: A `PrismHomeError` indicating the failure reason
+///   - `Anyhow`: Session key extraction failed
+///   - Other variants from `generate_home_response`
 ///
 /// A JSONified version of the `HomeResponse` struct.
src/alerts/mod.rs (1)

878-887: Consider adding documentation for the AlertsInfo struct.

The struct is well-designed but lacks documentation explaining its purpose and fields.

Add documentation:

+/// Represents aggregated information about alerts in the system.
+///
+/// This struct provides counts of alerts by their state (silenced, resolved, triggered)
+/// and severity levels (low, medium, high).
 #[derive(Debug, Serialize)]
 pub struct AlertsInfo {
+    /// Total number of alerts in the system
     total: u64,
+    /// Number of silenced alerts
     silenced: u64,
+    /// Number of resolved alerts
     resolved: u64,
+    /// Number of triggered alerts
     triggered: u64,
+    /// Number of low severity alerts
     low: u64,
+    /// Number of medium severity alerts
     medium: u64,
+    /// Number of high severity alerts
     high: u64,
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e1e2f2a and 9d4f25f.

📒 Files selected for processing (15)
  • src/alerts/mod.rs (2 hunks)
  • src/handlers/http/mod.rs (1 hunks)
  • src/handlers/http/modal/query_server.rs (2 hunks)
  • src/handlers/http/modal/server.rs (3 hunks)
  • src/handlers/http/prism_home.rs (1 hunks)
  • src/handlers/http/prism_logstream.rs (1 hunks)
  • src/lib.rs (1 hunks)
  • src/logstream/mod.rs (1 hunks)
  • src/metadata.rs (2 hunks)
  • src/parseable/mod.rs (2 hunks)
  • src/prism/home/mod.rs (1 hunks)
  • src/prism/logstream/mod.rs (1 hunks)
  • src/prism/mod.rs (1 hunks)
  • src/storage/mod.rs (2 hunks)
  • src/users/dashboards.rs (1 hunks)
✅ Files skipped from review due to trivial changes (2)
  • src/prism/mod.rs
  • src/parseable/mod.rs
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: coverage
🔇 Additional comments (9)
src/prism/logstream/mod.rs (1)

57-67: Consider errors in schema retrieval.

The schema retrieval may fail if the underlying storage or distribution update times out or partially fails. Currently, high-level errors are returned without telling the user whether partial data might be available. Providing more granular error context in the response (e.g., which node or region is unavailable) could help debugging.

src/handlers/http/prism_home.rs (1)

32-39: LGTM! Well-structured error handling.

The implementation correctly:

  • Extracts and validates the session key
  • Maps extraction errors to PrismHomeError
  • Generates and returns the response as JSON
src/lib.rs (1)

40-40: LGTM! Module declaration aligns with PR objectives.

The addition of the prism module is consistent with the introduction of the Prism feature endpoints.

src/handlers/http/mod.rs (1)

43-44: LGTM! Well-structured API path handling.

The changes follow the existing patterns for:

  • Module organization
  • Constant definitions
  • Path generation functions

Also applies to: 52-52, 58-60

src/metadata.rs (1)

20-20: LGTM! Standardizing timestamp generation to UTC.

The change from Local to Utc for timestamp generation is a good practice as it ensures consistent timestamps across different timezones.

Also applies to: 108-108

src/storage/mod.rs (1)

33-34: LGTM! Standardizing timestamp generation to UTC.

The change from Local to Utc for timestamp generation is consistent with the changes in metadata.rs and ensures consistent timestamps across different timezones.

Also applies to: 217-217

src/users/dashboards.rs (1)

101-101: LGTM! Making the name field public.

Making the name field public is appropriate for the new Prism feature to access dashboard names.

src/handlers/http/modal/server.rs (2)

93-93: LGTM! Service configuration for Prism endpoints.

The service configuration correctly integrates the new Prism endpoints into the existing routing structure.


159-161: LGTM! Implementation of get_prism_home endpoint.

The implementation follows the routing pattern used throughout the codebase and correctly maps to the home API handler.

Comment on lines +74 to +78
.service(
web::scope(&prism_base_path())
.service(Server::get_prism_home())
.service(Server::get_prism_logstream()),
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add authorization middleware to Prism endpoints.

The new Prism endpoints are missing authorization middleware, which could lead to unauthorized access to sensitive data. Consider adding appropriate authorization checks using the .authorize() middleware, similar to how it's used in other endpoints.

Apply this diff to add authorization:

 .service(
     web::scope(&prism_base_path())
-        .service(Server::get_prism_home())
-        .service(Server::get_prism_logstream()),
+        .service(Server::get_prism_home().authorize(Action::ListStream))
+        .service(Server::get_prism_logstream().authorize_for_stream(Action::GetStreamInfo)),
 )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
.service(
web::scope(&prism_base_path())
.service(Server::get_prism_home())
.service(Server::get_prism_logstream()),
)
.service(
web::scope(&prism_base_path())
.service(Server::get_prism_home().authorize(Action::ListStream))
.service(Server::get_prism_logstream().authorize_for_stream(Action::GetStreamInfo)),
)

Comment on lines 163 to 169
pub fn get_prism_logstream() -> Scope {
web::scope("/logstream").service(
web::scope("/{logstream}").service(
web::resource("/info").route(web::get().to(http::prism_logstream::get_info)),
),
)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider adding authorization middleware to Prism endpoints.

The implementation follows the routing pattern but lacks authorization middleware that is present in other similar endpoints (e.g., alerts, metrics).

Apply this diff to add authorization:

 pub fn get_prism_logstream() -> Scope {
     web::scope("/logstream").service(
         web::scope("/{logstream}").service(
-            web::resource("/info").route(web::get().to(http::prism_logstream::get_info)),
+            web::resource("/info").route(
+                web::get()
+                    .to(http::prism_logstream::get_info)
+                    .authorize_for_stream(Action::GetStreamInfo)
+            ),
         ),
     )
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub fn get_prism_logstream() -> Scope {
web::scope("/logstream").service(
web::scope("/{logstream}").service(
web::resource("/info").route(web::get().to(http::prism_logstream::get_info)),
),
)
}
pub fn get_prism_logstream() -> Scope {
web::scope("/logstream").service(
web::scope("/{logstream}").service(
web::resource("/info").route(
web::get()
.to(http::prism_logstream::get_info)
.authorize_for_stream(Action::GetStreamInfo)
),
),
)
}

Comment on lines +889 to +925
// TODO: add RBAC
pub async fn get_alerts_info() -> Result<AlertsInfo, AlertError> {
let alerts = ALERTS.alerts.read().await;
let mut total = 0;
let mut silenced = 0;
let mut resolved = 0;
let mut triggered = 0;
let mut low = 0;
let mut medium = 0;
let mut high = 0;

for (_, alert) in alerts.iter() {
total += 1;
match alert.state {
AlertState::Silenced => silenced += 1,
AlertState::Resolved => resolved += 1,
AlertState::Triggered => triggered += 1,
}

match alert.severity {
Severity::Low => low += 1,
Severity::Medium => medium += 1,
Severity::High => high += 1,
_ => {}
}
}

Ok(AlertsInfo {
total,
silenced,
resolved,
triggered,
low,
medium,
high,
})
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

❓ Verification inconclusive

Add RBAC implementation as noted in TODO comment.

The function needs Role-Based Access Control implementation for security.

Let's verify the RBAC implementation in other similar functions:

Additionally:

  1. The function should handle the Critical severity case explicitly instead of silently ignoring it.
  2. Consider using a more efficient counting method.

Apply this diff to improve the implementation:

-// TODO: add RBAC
-pub async fn get_alerts_info() -> Result<AlertsInfo, AlertError> {
+pub async fn get_alerts_info(session: SessionKey) -> Result<AlertsInfo, AlertError> {
     let alerts = ALERTS.alerts.read().await;
-    let mut total = 0;
-    let mut silenced = 0;
-    let mut resolved = 0;
-    let mut triggered = 0;
-    let mut low = 0;
-    let mut medium = 0;
-    let mut high = 0;
+    let mut info = AlertsInfo {
+        total: 0,
+        silenced: 0,
+        resolved: 0,
+        triggered: 0,
+        low: 0,
+        medium: 0,
+        high: 0,
+        critical: 0,
+    };

     for (_, alert) in alerts.iter() {
-        total += 1;
+        // Check if user has permission to view this alert
+        if user_auth_for_query(&session, &alert.query).await.is_err() {
+            continue;
+        }
+
+        info.total += 1;
         match alert.state {
-            AlertState::Silenced => silenced += 1,
-            AlertState::Resolved => resolved += 1,
-            AlertState::Triggered => triggered += 1,
+            AlertState::Silenced => info.silenced += 1,
+            AlertState::Resolved => info.resolved += 1,
+            AlertState::Triggered => info.triggered += 1,
         }

         match alert.severity {
-            Severity::Low => low += 1,
-            Severity::Medium => medium += 1,
-            Severity::High => high += 1,
-            _ => {}
+            Severity::Low => info.low += 1,
+            Severity::Medium => info.medium += 1,
+            Severity::High => info.high += 1,
+            Severity::Critical => info.critical += 1,
         }
     }

-    Ok(AlertsInfo {
-        total,
-        silenced,
-        resolved,
-        triggered,
-        low,
-        medium,
-        high,
-    })
+    Ok(info)
 }

🏁 Script executed:

#!/bin/bash
# Search for RBAC patterns in alert-related functions
rg -A 5 "pub async fn.*alert.*session.*SessionKey" src/

Length of output: 57


Action Required: Verify RBAC Integration and Critical Severity Handling

The suggested changes introduce an RBAC check via a new SessionKey parameter and update the alerts counting logic to account for a new critical field in the AlertsInfo structure. However, the initial search for RBAC patterns in similar functions did not produce any output. Please manually verify that the RBAC approach (i.e. calling user_auth_for_query to filter alerts based on permissions) is consistent with the rest of the codebase and that adding a critical counter aligns with the existing definition of AlertsInfo.

  • Confirm that the new parameter session: SessionKey is appropriate for accessing and validating alerts.
  • Verify that the function user_auth_for_query exists and is used elsewhere for RBAC enforcement.
  • Check that the AlertsInfo struct definition is updated to include a critical field.
  • Ensure that the refactoring to use a single mutable structure for accumulating counts does not affect performance or introduce unwanted side effects.

Please ensure these modifications are in line with your security model and that the alerts API is thoroughly tested before merging.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// TODO: add RBAC
pub async fn get_alerts_info() -> Result<AlertsInfo, AlertError> {
let alerts = ALERTS.alerts.read().await;
let mut total = 0;
let mut silenced = 0;
let mut resolved = 0;
let mut triggered = 0;
let mut low = 0;
let mut medium = 0;
let mut high = 0;
for (_, alert) in alerts.iter() {
total += 1;
match alert.state {
AlertState::Silenced => silenced += 1,
AlertState::Resolved => resolved += 1,
AlertState::Triggered => triggered += 1,
}
match alert.severity {
Severity::Low => low += 1,
Severity::Medium => medium += 1,
Severity::High => high += 1,
_ => {}
}
}
Ok(AlertsInfo {
total,
silenced,
resolved,
triggered,
low,
medium,
high,
})
}
pub async fn get_alerts_info(session: SessionKey) -> Result<AlertsInfo, AlertError> {
let alerts = ALERTS.alerts.read().await;
let mut info = AlertsInfo {
total: 0,
silenced: 0,
resolved: 0,
triggered: 0,
low: 0,
medium: 0,
high: 0,
critical: 0,
};
for (_, alert) in alerts.iter() {
// Check if user has permission to view this alert
if user_auth_for_query(&session, &alert.query).await.is_err() {
continue;
}
info.total += 1;
match alert.state {
AlertState::Silenced => info.silenced += 1,
AlertState::Resolved => info.resolved += 1,
AlertState::Triggered => info.triggered += 1,
}
match alert.severity {
Severity::Low => info.low += 1,
Severity::Medium => info.medium += 1,
Severity::High => info.high += 1,
Severity::Critical => info.critical += 1,
}
}
Ok(info)
}

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
src/handlers/http/modal/server.rs (1)

167-173: 🛠️ Refactor suggestion

Consider adding authorization middleware to Prism logstream endpoint.

The implementation follows the routing pattern but lacks authorization middleware that is present in other similar endpoints (e.g., alerts, metrics).

Apply this diff to add authorization:

pub fn get_prism_logstream() -> Scope {
    web::scope("/logstream").service(
        web::scope("/{logstream}").service(
-            web::resource("/info").route(web::get().to(http::prism_logstream::get_info)),
+            web::resource("/info").route(
+                web::get()
+                    .to(http::prism_logstream::get_info)
+                    .authorize_for_stream(Action::GetStreamInfo)
+            ),
        ),
    )
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9d4f25f and ed4c1e0.

📒 Files selected for processing (4)
  • src/handlers/http/modal/server.rs (3 hunks)
  • src/lib.rs (1 hunks)
  • src/parseable/mod.rs (2 hunks)
  • src/storage/mod.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • src/lib.rs
  • src/storage/mod.rs
  • src/parseable/mod.rs
🔇 Additional comments (1)
src/handlers/http/modal/server.rs (1)

93-97: LGTM! Well-structured integration of Prism API endpoints.

The implementation correctly integrates the new Prism endpoints into the existing routing configuration by creating a dedicated scope with the prism base path.

Comment on lines +163 to +165
pub fn get_prism_home() -> Resource {
web::resource("/home").route(web::get().to(http::prism_home::home_api))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add authorization middleware to the Prism home endpoint.

The implementation follows the routing pattern, but lacks authorization middleware that is present in other similar endpoints (like alerts, metrics, etc.). This is needed to maintain consistent security across all API endpoints.

Apply this diff to add authorization:

pub fn get_prism_home() -> Resource {
-    web::resource("/home").route(web::get().to(http::prism_home::home_api))
+    web::resource("/home").route(web::get().to(http::prism_home::home_api).authorize(Action::GetStreamInfo))
}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub fn get_prism_home() -> Resource {
web::resource("/home").route(web::get().to(http::prism_home::home_api))
}
pub fn get_prism_home() -> Resource {
web::resource("/home").route(web::get().to(http::prism_home::home_api).authorize(Action::GetStreamInfo))
}

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

♻️ Duplicate comments (1)
src/handlers/http/modal/server.rs (1)

163-165: ⚠️ Potential issue

Add authorization middleware to the Prism home endpoint.

The implementation follows the routing pattern but lacks authorization middleware that is present in other similar endpoints (like alerts, metrics, etc.). This is needed to maintain consistent security across all API endpoints.

Apply this diff to add authorization:

pub fn get_prism_home() -> Resource {
-    web::resource("/home").route(web::get().to(http::prism_home::home_api))
+    web::resource("/home").route(web::get().to(http::prism_home::home_api).authorize(Action::GetStreamInfo))
}
🧹 Nitpick comments (4)
src/prism/home/mod.rs (4)

35-40: Consider removing commented out code or implementing the features.

The struct contains commented out fields which might indicate incomplete implementation or future plans. Either implement these fields or remove the comments to keep the code clean.

struct StreamInfo {
-    // stream_count: u32,
-    // log_source_count: u32,
    stats_summary: Stats,
}

69-76: Improve error handling with a more specific error message.

The error message "User does not exist" is generic. Consider providing a more descriptive message that indicates this is a session validation failure.

    } else {
        return Err(PrismHomeError::Anyhow(anyhow::Error::msg(
-            "User does not exist",
+            "Invalid session: User not found or session expired",
        )));
    };

149-173: Optimize the loop structure to reduce database calls.

The current implementation makes a separate database call for each stream and date combination, which could result in a high number of queries. Consider batching these requests or optimizing the query structure.

You could restructure this to batch stats requests by date or stream to minimize the number of database operations.


188-215: Implement proper content type for JSON responses in error handling.

The error_response method sets the content type to plaintext, but since the API is likely supposed to return JSON responses, consider using JSON for error responses as well for consistency.

    fn error_response(&self) -> actix_web::HttpResponse<actix_web::body::BoxBody> {
+        use serde_json::json;
        actix_web::HttpResponse::build(self.status_code())
-            .insert_header(ContentType::plaintext())
-            .body(self.to_string())
+            .insert_header(ContentType::json())
+            .json(json!({
+                "error": self.to_string()
+            }))
    }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ed4c1e0 and 37ec1f2.

📒 Files selected for processing (2)
  • src/handlers/http/modal/server.rs (3 hunks)
  • src/prism/home/mod.rs (1 hunks)
🔇 Additional comments (1)
src/handlers/http/modal/server.rs (1)

167-177: Looks good, authorization is properly implemented.

The implementation properly includes authorization for the logstream info endpoint using the authorize_for_stream middleware with the appropriate action.

/logstream/{logstream}/info
- added stats and retention to the response body

GET /users
GET /users/{username}
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

♻️ Duplicate comments (1)
src/handlers/http/modal/server.rs (1)

164-166: 🛠️ Refactor suggestion

Add authorization middleware to the Prism home endpoint.

The implementation follows the routing pattern, but lacks authorization middleware that is present in other similar endpoints (like alerts, metrics, etc.). This is needed to maintain consistent security across all API endpoints.

Apply this diff to add authorization:

pub fn get_prism_home() -> Resource {
-    web::resource("/home").route(web::get().to(http::prism_home::home_api))
+    web::resource("/home").route(web::get().to(http::prism_home::home_api).authorize(Action::GetStreamInfo))
}
🧹 Nitpick comments (4)
src/handlers/http/rbac.rs (1)

66-103: Consider refactoring to reduce code duplication.

The list_users_prism function contains logic that is duplicated in the get_prism_user function. Both functions map users to UsersPrism objects with identical transformation logic.

Instead of duplicating the user transformation logic, consider extracting it into a helper function:

+/// Helper function to convert a user to UsersPrism
+fn user_to_prism(u: &user::User) -> UsersPrism {
+    let (id, method, email, picture) = match &u.ty {
+        user::UserType::Native(_) => (u.username(), "native", None, None),
+        user::UserType::OAuth(oauth) => (
+            u.username(),
+            "oauth",
+            oauth.user_info.email.clone(),
+            oauth.user_info.picture.clone(),
+        ),
+    };
+    let roles: HashMap<String, Vec<DefaultPrivilege>> = Users
+        .get_role(id)
+        .iter()
+        .filter_map(|role_name| {
+            roles()
+                .get(role_name)
+                .map(|role| (role_name.to_owned(), role.clone()))
+        })
+        .collect();
+
+    UsersPrism {
+        id: id.into(),
+        method: method.into(),
+        email,
+        picture,
+        roles,
+    }
+}
+
 /// Handler for GET /api/v1/users
 /// returns list of all registerd users along with their roles and other info
 pub async fn list_users_prism() -> impl Responder {
     // get all users
     let prism_users = rbac::map::users()
         .values()
-        .map(|u| {
-            let (id, method, email, picture) = match &u.ty {
-                user::UserType::Native(_) => (u.username(), "native", None, None),
-                user::UserType::OAuth(oauth) => (
-                    u.username(),
-                    "oauth",
-                    oauth.user_info.email.clone(),
-                    oauth.user_info.picture.clone(),
-                ),
-            };
-            let roles: HashMap<String, Vec<DefaultPrivilege>> = Users
-                .get_role(id)
-                .iter()
-                .filter_map(|role_name| {
-                    roles()
-                        .get(role_name)
-                        .map(|role| (role_name.to_owned(), role.clone()))
-                })
-                .collect();
-
-            UsersPrism {
-                id: id.into(),
-                method: method.into(),
-                email,
-                picture,
-                roles,
-            }
-        })
+        .map(user_to_prism)
         .collect_vec();

     web::Json(prism_users)
 }
src/rbac/mod.rs (1)

172-187: Fix documentation typos and clarify field descriptions.

The struct documentation has some typos and could be improved for clarity.

 /// This struct represents a user along with their roles, email, etc
 ///
 /// TODO: rename this after deprecating the older struct
 #[derive(Debug, Serialize, Clone)]
 pub struct UsersPrism {
-    // username
+    /// Username identifier
     pub id: String,
-    // oaith or native
+    /// Authentication method ("oauth" or "native")
     pub method: String,
-    // email only if method is oauth
+    /// Email address (only present if method is oauth)
     pub email: Option<String>,
-    // picture only if oauth
+    /// Profile picture URL (only present if method is oauth)
     pub picture: Option<Url>,
-    // roles for the user
+    /// Map of role names to their associated privileges
     pub roles: HashMap<String, Vec<DefaultPrivilege>>,
 }
src/prism/logstream/mod.rs (2)

39-46: Consider adding documentation for PrismLogstreamInfo struct fields.

The struct lacks field-level documentation which would help API consumers understand the data better.

 #[derive(Serialize)]
 pub struct PrismLogstreamInfo {
+    /// Basic information about the stream (creation date, type, etc.)
     info: StreamInfo,
+    /// The schema describing the structure of the logs in this stream
     schema: Arc<Schema>,
+    /// Current statistics about the stream (events count, storage usage, etc.)
     stats: QueriedStats,
+    /// Retention policy configuration for this stream
     retention: Retention,
 }

159-205: Consider renaming the function to clarify its behavior.

Similar to the get_stream_schema_helper function, this function also checks if a stream exists and returns an error if check_or_load_stream returns true.

Also, consider adding more descriptive error messages to help with debugging.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3e5077a and 6397b71.

📒 Files selected for processing (5)
  • src/handlers/http/modal/query_server.rs (3 hunks)
  • src/handlers/http/modal/server.rs (5 hunks)
  • src/handlers/http/rbac.rs (2 hunks)
  • src/prism/logstream/mod.rs (1 hunks)
  • src/rbac/mod.rs (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/handlers/http/modal/query_server.rs
⏰ Context from checks skipped due to timeout of 90000ms (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
🔇 Additional comments (3)
src/handlers/http/modal/server.rs (2)

168-180: Good implementation with proper authorization.

The endpoint correctly includes multiple authorization requirements to ensure the user has all the necessary permissions to access the logstream information.


503-523: Well-structured implementation with proper authorization.

The users webscope is implemented consistently with other endpoints and includes appropriate authorization middleware for each route.

src/prism/logstream/mod.rs (1)

73-90: Clarify the return value of check_or_load_stream.

Currently, the code returns a StreamNotFound error if check_or_load_stream returns true, which can be confusing.

Consider renaming the function or inverting its return value to make the logic clearer:

-if PARSEABLE.check_or_load_stream(stream_name).await {
-    return Err(StreamNotFound(stream_name.to_owned()).into());
-}
+// Option 1: rename the function to something more explicit about 'missing'
+if PARSEABLE.stream_missing(stream_name).await {
+    return Err(StreamNotFound(stream_name.to_owned()).into());
+}

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
src/prism/logstream/mod.rs (1)

92-128: 🛠️ Refactor suggestion

Reduce code duplication and simplify logic in the get_stats function.

The function creates ingestor_stats as None and later checks if it's Some, which is unnecessary since it will always be None. This pattern appears in multiple files and should be consistently refactored.

 async fn get_stats(stream_name: &str) -> Result<QueriedStats, PrismLogstreamError> {
     let stats = stats::get_current_stats(stream_name, "json")
         .ok_or_else(|| StreamNotFound(stream_name.to_owned()))?;
 
-    let ingestor_stats: Option<Vec<QueriedStats>> = None;
+    let time = Utc::now();

     let ingestion_stats = IngestionStats::new(
         stats.current_stats.events,
         format!("{} {}", stats.current_stats.ingestion, "Bytes"),
         stats.lifetime_stats.events,
         format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"),
         stats.deleted_stats.events,
         format!("{} {}", stats.deleted_stats.ingestion, "Bytes"),
         "json",
     );
     let storage_stats = StorageStats::new(
         format!("{} {}", stats.current_stats.storage, "Bytes"),
         format!("{} {}", stats.lifetime_stats.storage, "Bytes"),
         format!("{} {}", stats.deleted_stats.storage, "Bytes"),
         "parquet",
     );

-    let time = Utc::now();
-
-    let stats = {
-        let ingestion_stats = IngestionStats::new(
-            stats.current_stats.events,
-            format!("{} {}", stats.current_stats.ingestion, "Bytes"),
-            stats.lifetime_stats.events,
-            format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"),
-            stats.deleted_stats.events,
-            format!("{} {}", stats.deleted_stats.ingestion, "Bytes"),
-            "json",
-        );
-        let storage_stats = StorageStats::new(
-            format!("{} {}", stats.current_stats.storage, "Bytes"),
-            format!("{} {}", stats.lifetime_stats.storage, "Bytes"),
-            format!("{} {}", stats.deleted_stats.storage, "Bytes"),
-            "parquet",
-        );
-
-        QueriedStats::new(stream_name, time, ingestion_stats, storage_stats)
-    };
-
-    let stats = if let Some(mut ingestor_stats) = ingestor_stats {
-        ingestor_stats.push(stats);
-        merge_quried_stats(ingestor_stats)
-    } else {
-        stats
-    };
+    // Create the QueriedStats directly
+    let stats = QueriedStats::new(stream_name, time, ingestion_stats, storage_stats);

     Ok(stats)
 }
🧹 Nitpick comments (1)
src/handlers/http/logstream.rs (1)

264-282: Excellent code optimization!

The refactoring eliminates redundant conditional branching that was previously creating QueriedStats with identical parameters. This simplification improves readability and maintainability by consolidating the logic into a single code block.

The code could be further improved by extracting the creation of IngestionStats and StorageStats into separate helper functions to enhance reusability and readability:

- let stats = {
-     let ingestion_stats = IngestionStats::new(
-         stats.current_stats.events,
-         format!("{} {}", stats.current_stats.ingestion, "Bytes"),
-         stats.lifetime_stats.events,
-         format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"),
-         stats.deleted_stats.events,
-         format!("{} {}", stats.deleted_stats.ingestion, "Bytes"),
-         "json",
-     );
-     let storage_stats = StorageStats::new(
-         format!("{} {}", stats.current_stats.storage, "Bytes"),
-         format!("{} {}", stats.lifetime_stats.storage, "Bytes"),
-         format!("{} {}", stats.deleted_stats.storage, "Bytes"),
-         "parquet",
-     );
- 
-     QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats)
- };
+ let ingestion_stats = create_ingestion_stats(&stats, "json");
+ let storage_stats = create_storage_stats(&stats, "parquet");
+ let stats = QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats);

Then add these helper functions at the end of the file:

fn create_ingestion_stats(stats: &stats::Stats, format: &str) -> IngestionStats {
    IngestionStats::new(
        stats.current_stats.events,
        format!("{} {}", stats.current_stats.ingestion, "Bytes"),
        stats.lifetime_stats.events,
        format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"),
        stats.deleted_stats.events,
        format!("{} {}", stats.deleted_stats.ingestion, "Bytes"),
        format,
    )
}

fn create_storage_stats(stats: &stats::Stats, format: &str) -> StorageStats {
    StorageStats::new(
        format!("{} {}", stats.current_stats.storage, "Bytes"),
        format!("{} {}", stats.lifetime_stats.storage, "Bytes"),
        format!("{} {}", stats.deleted_stats.storage, "Bytes"),
        format,
    )
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6397b71 and 4cb0073.

📒 Files selected for processing (6)
  • src/handlers/http/logstream.rs (1 hunks)
  • src/handlers/http/modal/query/querier_logstream.rs (1 hunks)
  • src/handlers/http/rbac.rs (2 hunks)
  • src/prism/logstream/mod.rs (1 hunks)
  • src/rbac/mod.rs (2 hunks)
  • src/rbac/utils.rs (1 hunks)
🔇 Additional comments (13)
src/prism/logstream/mod.rs (5)

39-45: Good use of Serialize for the PrismLogstreamInfo struct.

The struct is well-organized and includes all the necessary components for log stream information: basic info, schema, stats, and retention policy. Making it Serialize enables easy JSON conversion for API responses.


47-71: Well-structured async function with clear responsibilities.

The get_prism_logstream_info function has a clean implementation that follows a logical sequence of retrieving each component of stream information. The error handling is appropriate with the ? operator to bubble up errors.


73-90: Clarify the return value of check_or_load_stream.

Currently, the code returns a StreamNotFound error if check_or_load_stream returns true, which can be confusing. Consider renaming the function or inverting its return value to make the logic clearer.

-if PARSEABLE.check_or_load_stream(stream_name).await {
-    return Err(StreamNotFound(stream_name.to_owned()).into());
-}
+// Option 1: rename the function to something more explicit about 'missing'
+if PARSEABLE.stream_missing(stream_name).await {
+    return Err(StreamNotFound(stream_name.to_owned()).into());
+}

130-176: Good error handling and storage fallback mechanism.

The implementation correctly handles the case when a stream is not found in memory by checking the storage. The code for updating first_event_at when it's found in storage but not in memory is well-implemented.


178-202: Well-implemented error handling with appropriate HTTP status codes.

The PrismLogstreamError enum and its implementation of actix_web::ResponseError provide clear error messages and appropriate HTTP status codes. Notably, the StreamNotFound error correctly returns a 404 status code.

src/rbac/utils.rs (1)

27-54: Well-implemented utility function with proper error handling.

The to_prism_user function efficiently extracts user information based on type and collects roles with proper error handling. This utility function helps avoid code duplication in handler functions.

The implementation correctly:

  1. Destructures the user type to extract relevant fields
  2. Uses filter_map to handle potential missing roles
  3. Constructs a clean UsersPrism object with all required fields
src/handlers/http/modal/query/querier_logstream.rs (1)

180-198: Good refactoring to eliminate redundant code.

The refactored code now eliminates unnecessary conditional branching and redundant code. The ingestion and storage stats are now calculated once in a block, which improves readability and maintainability.

src/handlers/http/rbac.rs (3)

22-32: Clean import organization.

The imports are well-organized, with related imports grouped together. The addition of the to_prism_user utility function import is properly placed with other rbac-related imports.


67-73: Efficiently implemented user listing function.

The list_users_prism function efficiently maps all users to their UsersPrism representation using the utility function and collects them into a vector for JSON serialization.


76-87: Optimized user lookup implementation.

The implementation efficiently checks if a user exists before constructing the UsersPrism object, which is a more optimized approach than mapping all users and then filtering by username.

src/rbac/mod.rs (3)

22-22: Added utils module for the Prism feature

This new module likely contains utility functions for the Prism feature, which aligns with the PR objectives of introducing Prism API endpoints.


24-30: Updated imports for new UsersPrism struct

The imports have been properly updated to include the necessary types for the new UsersPrism struct, including HashMap, DefaultPrivilege, Serialize, and Url.


173-188:

✅ Verification successful

New UsersPrism struct for representing user information in the Prism feature

The new struct provides a comprehensive representation of user information with clear documentation. This is well aligned with the PR objectives to introduce Prism API endpoints.

A few observations:

  1. The TODO comment indicates this is a transitional structure that will replace an older one in the future.
  2. The struct includes all necessary fields for representing user information in the Prism context.
  3. The Serialize derive enables JSON serialization for API responses.

Let's check for the related implementation in the utils module:


🏁 Script executed:

#!/bin/bash
# Description: Check if there's a to_prism_user function in the utils module

# Find the utils.rs file in the rbac directory
fd "utils.rs" --exec grep -l "to_prism_user" {}

Length of output: 67


Review Approved – New UsersPrism struct is correctly implemented
The new UsersPrism struct in src/rbac/mod.rs (lines 173–188) now clearly documents and encapsulates the user information needed for the Prism feature. The transitional TODO is appropriately placed to signal future deprecation of the older structure, and the use of the Serialize derive ensures smooth JSON serialization for API responses.

  • Verified that the related utility function to_prism_user exists in src/rbac/utils.rs, confirming integration with the Prism API endpoints.

Overall, the changes are aligned with the PR objectives.

Signed-off-by: parmesant <anant.v09@proton.me>
Copy link
Contributor

@nikhilsinhaparseable nikhilsinhaparseable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good

Co-authored-by: Devdutt Shenoi <devdutt@outlook.in>
Signed-off-by: parmesant <anant.v09@proton.me>
Co-authored-by: Devdutt Shenoi <devdutt@outlook.in>
Signed-off-by: parmesant <anant.v09@proton.me>
@nitisht nitisht merged commit 59ba6a8 into parseablehq:main Mar 4, 2025
14 checks passed
This was referenced Apr 7, 2025
@coderabbitai coderabbitai bot mentioned this pull request May 6, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants