
Conversation

@nikhilsinhaparseable (Contributor) commented on Oct 20, 2025

  • fetch billing metrics from all live nodes
  • convert them to events
  • ingest into the pbilling stream
  • like cluster metrics, fetch every minute in the same scheduled task

Summary by CodeRabbit

  • New Features

    • Added a dedicated billing metrics pipeline that collects metrics from all cluster nodes in parallel and consolidates them into a separate billing event stream for reporting.
  • Chores

    • Reorganized internal stream handling and initialization to support the new billing stream and improve startup resilience and synchronization.
    • Adjusted internal ingestion metadata format and added an early-return optimization when streams already exist.

coderabbitai bot (Contributor) commented on Oct 20, 2025

Walkthrough

Adds a billing metrics subsystem that collects, normalizes, and ingests per-node billing events into a new pbilling stream; renames the internal stream constant to PMETA_STREAM_NAME (pmeta) and adjusts stream-creation and ingestion flows accordingly.

Changes

| Cohort / File(s) | Summary |
|---|---|
| **Billing metrics core**<br>`src/handlers/http/cluster/mod.rs` | Adds `PMETA_STREAM_NAME` and `BILLING_METRICS_STREAM_NAME`; introduces `BillingMetricEvent` and `BillingMetricsCollector`; implements parsing of Prometheus samples (simple, object-store, LLM) into billing events; adds `fetch_node_billing_metrics`, `fetch_nodes_billing_metrics`, `fetch_cluster_billing_metrics`, `extract_billing_metrics_from_samples`; ingests billing events to `pbilling`. |
| **Internal stream ingest change**<br>`src/handlers/http/ingest.rs` | When creating internal-stream events, sets `FORMAT_KEY`/`origin_format` to `LogSource::Json` (was `LogSource::Pmeta`). |
| **Stream name refactor**<br>`src/hottier.rs`, `src/migration/stream_metadata_migration.rs` | Replaces uses of the removed `INTERNAL_STREAM_NAME` with `PMETA_STREAM_NAME` and updates migration logic that assigns the internal stream type based on name. |
| **Internal stream creation & sync**<br>`src/parseable/mod.rs` | Exports `PMETA_STREAM_NAME` and `BILLING_METRICS_STREAM_NAME`; `create_internal_stream_if_not_exists` now ensures both `pmeta` (`LogSource::Pmeta`) and `pbilling` (`LogSource::Json`) exist and syncs each with ingestors, handling existing-stream short-circuits and per-stream errors. |
| **Object storage short-circuit**<br>`src/storage/object_storage.rs` | `create_stream_from_ingestor` short-circuits and returns the existing metastore JSON if the in-memory `PARSEABLE` already contains the stream, skipping further ingestor-merge work. |
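For orientation, a minimal sketch of what such an event type could look like; apart from `event_type` and `event_time` (both discussed in the review below), the field names here are illustrative assumptions, not the merged definition:

```rust
use chrono::{DateTime, Utc};
use serde::Serialize;

// Illustrative sketch only: fields beyond event_type/event_time are
// assumptions about what a per-node billing sample could carry.
#[derive(Serialize)]
struct BillingMetricEvent {
    event_type: String,        // e.g. "billing-metrics"
    event_time: DateTime<Utc>, // timezone-aware, as suggested in the review
    node_type: String,         // e.g. "ingestor" or "querier"
    metric_name: String,       // Prometheus sample name
    value: f64,                // sample value
}
```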

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Scheduler as Cluster Scheduler
    participant Cluster as cluster/mod.rs
    participant Nodes as Cluster Nodes
    participant Prom as Prometheus
    participant Collector as BillingMetricsCollector
    participant Ingest as Ingest
    participant PMETA as "pmeta (PMETA)"
    participant PBILL as "pbilling (PBILLING)"

    Scheduler->>Cluster: collect_cluster_metrics()
    Cluster->>Nodes: fetch metrics (parallel)
    Nodes-->>Cluster: cluster metrics
    Cluster->>PMETA: ingest cluster metrics

    Note over Cluster: billing metrics flow (new)
    Cluster->>Nodes: fetch_nodes_billing_metrics() (parallel)
    Nodes->>Prom: query Prometheus samples
    Prom-->>Nodes: samples (labels & values)
    Nodes-->>Cluster: samples
    Cluster->>Collector: extract_billing_metrics_from_samples()
    Collector-->>Cluster: BillingMetricEvent[]
    Cluster->>Ingest: ingest billing events
    Ingest->>PBILL: write billing events
    PBILL-->>Ingest: success
```

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested labels

for next release

Suggested reviewers

  • parmesant

Poem

🐰 I hopped through nodes and sampled every line,
I counted carrots by label, timestamp, and time.
Pmeta hums softly while pbilling sings,
Events hop in neat rows—ledgered little things. 🥕

Pre-merge checks and finishing touches

❌ Failed checks (2 warnings)

**Description Check** ⚠️ Warning
Explanation: The provided description is significantly incomplete compared to the required template. While the author provides brief bullet points about the functionality (fetch billing metrics, convert to events, ingest to pbilling stream, scheduled task), the description lacks the structured sections defined in the template. Most critically, the checklist items with checkboxes are entirely missing—the template explicitly requires marking boxes for testing, code comments, and documentation. The description also lacks elaboration on the goal, rationale, and key changes beyond the bullet points, and provides no indication that any of the checklist items have been addressed.
Resolution: The pull request description should be expanded to include: a proper Description section with goal and rationale, elaboration on the key changes beyond the bullet points, and completion of the required checklist items (testing, code comments, and documentation). Even if some items don't apply, they should be explicitly marked in the checklist to indicate they were considered. This ensures the description follows the repository's template and provides reviewers with necessary context about testing and documentation completeness.

**Docstring Coverage** ⚠️ Warning
Docstring coverage is 0.00%, which is insufficient; the required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.

✅ Passed checks (1 passed)

**Title Check** ✅ Passed
The title "ingest billing metrics in internal stream" clearly and concisely captures the main objective of the pull request. Based on the raw summary and PR objectives, the primary change is the implementation of billing metrics collection and ingestion into an internal stream (specifically the "pbilling" stream). The title is specific enough to convey the key change without unnecessary verbosity or vagueness. While it doesn't mention the specific stream name "pbilling," the core intent—billing metrics ingestion—is unmistakably communicated.

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/migration/stream_metadata_migration.rs (1)

161-174: v4_v5 misclassifies pbilling as UserDefined when stream_type is absent.

Stream type is set to Internal only for PMETA_STREAM_NAME; BILLING_METRICS_STREAM_NAME must be included.

```diff
-    if stream_type.is_none() {
-        if stream_name.eq(PMETA_STREAM_NAME) {
+    if stream_type.is_none() {
+        if stream_name.eq(PMETA_STREAM_NAME) || stream_name.eq(BILLING_METRICS_STREAM_NAME) {
             stream_metadata_map.insert(
                 "stream_type".to_owned(),
                 Value::String(storage::StreamType::Internal.to_string()),
             );
         } else {
```

Add unit tests that assert Internal for both "pmeta" and "pbilling" in v4_v5. Based on learnings.
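A hedged sketch of such tests, assuming a `fn v4_v5(meta: Value, stream_name: &str) -> Value` signature (the actual signature in `stream_metadata_migration.rs` may differ):

```rust
#[cfg(test)]
mod billing_migration_tests {
    use super::*;
    use serde_json::{json, Value};

    // Run v4_v5 on metadata with no "stream_type" key and return the
    // stream_type it assigns (Value::Null if none was set).
    fn stream_type_after_migration(stream_name: &str) -> Value {
        let metadata = json!({});
        let migrated = v4_v5(metadata, stream_name);
        migrated["stream_type"].clone()
    }

    #[test]
    fn v4_v5_marks_pmeta_internal() {
        assert_eq!(
            stream_type_after_migration("pmeta"),
            json!(storage::StreamType::Internal.to_string())
        );
    }

    #[test]
    fn v4_v5_marks_pbilling_internal() {
        assert_eq!(
            stream_type_after_migration("pbilling"),
            json!(storage::StreamType::Internal.to_string())
        );
    }
}
```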

🧹 Nitpick comments (9)
src/hottier.rs (1)

699-712: Only PMETA gets a hot tier; clarify pbilling intent.

If pbilling also needs hot tier in some deployments, mirror this block for BILLING_METRICS_STREAM_NAME; else add a brief comment stating pbilling is intentionally excluded.

src/parseable/mod.rs (2)

396-407: Align PMETA log_source with internal ingestion change.

Internal ingestion writes FORMAT_KEY=json; PMETA here is created with LogSource::Pmeta. Prefer consistency.

```diff
-        let log_source_entry = LogSourceEntry::new(LogSource::Pmeta, HashSet::new());
+        let log_source_entry = LogSourceEntry::new(LogSource::Json, HashSet::new());
```

Legacy stores remain covered by v5→v6 mapping from "Pmeta"→"pmeta". Add a brief note in release docs.


435-437: Sync order/headers OK; optionally collapse duplicate calls.

You can DRY this by looping over [PMETA_STREAM_NAME, BILLING_METRICS_STREAM_NAME].
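A minimal sketch of that loop, assuming shared `headers`/`body` arguments for `sync_streams_with_ingestors` (its real signature at lines 320-372 is not reproduced here):

```rust
// Sketch only: argument names are placeholders for whatever the two
// existing call sites already pass.
for stream_name in [PMETA_STREAM_NAME, BILLING_METRICS_STREAM_NAME] {
    if let Err(err) =
        sync_streams_with_ingestors(headers.clone(), body.clone(), stream_name).await
    {
        // Per the recorded learning, log and continue rather than abort startup.
        tracing::error!("failed to sync internal stream {stream_name}: {err}");
    }
}
```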

src/storage/object_storage.rs (1)

607-615: Early return is unprotected by upstream guards; consider clarifying invariant or adding trace logs.

Verification found that the early return bypassing merge semantics is NOT protected as the review assumed:

  • In migration/mod.rs (line 248): called unconditionally as a fallback with no upstream guards
  • In parseable/mod.rs (line 333): caller only checks metastore.list_streams(), not in-memory state, so the early return can still execute

The different code paths (early return fetches metastore JSON; normal path merges ingestor sources) represent different behavior that could mask races or redundant calls. Suggestions remain valid:

  • Add a trace log documenting the early-return scenario and where it's expected (see the sketch after this list)
  • Or enforce the invariant at callers: ensure they check PARSEABLE.get_stream() before calling this method
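A sketch of the trace-log option, assuming the early-return site checks `PARSEABLE.streams.contains` and that `stream_metadata` names the metastore JSON being returned (both are placeholders, not the actual code):

```rust
// Sketch only: document the early-return path so races or redundant
// calls are visible in traces.
if PARSEABLE.streams.contains(stream_name) {
    tracing::trace!(
        "stream {stream_name} already in memory; returning metastore JSON without ingestor merge"
    );
    return Ok(stream_metadata);
}
```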
src/handlers/http/cluster/mod.rs (5)

72-87: Event schema: prefer timezone-aware timestamp; dedupe literal

Consider:

  • Use `DateTime<Utc>` for event_time to avoid timezone ambiguity and cross-language parsing issues.
  • Hoist "billing-metrics" into a constant to avoid magic strings.

Example constant and usage:

```diff
+const BILLING_EVENT_TYPE: &str = "billing-metrics";
```

Then replace event_type assignments with:

```diff
-    event_type: "billing-metrics".to_string(),
+    event_type: BILLING_EVENT_TYPE.to_string(),
```

Also verify chrono's serde feature is enabled so event_time serializes correctly.
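A self-contained check of that assumption: with `chrono = { version = "0.4", features = ["serde"] }` in Cargo.toml, a `DateTime<Utc>` field serializes as an RFC 3339 string:

```rust
use chrono::{DateTime, Utc};
use serde::Serialize;

#[derive(Serialize)]
struct Probe {
    event_time: DateTime<Utc>,
}

fn main() {
    let probe = Probe { event_time: Utc::now() };
    // Prints e.g. {"event_time":"2025-10-20T12:00:00.123456789Z"}
    println!("{}", serde_json::to_string(&probe).unwrap());
}
```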


1239-1257: Robustness: guard non-finite/negative counter values

Before dispatching by metric kind, skip NaN/∞ (and optionally negatives) to avoid bogus events from scraped samples.

```diff
 fn process_sample(
     collector: &mut BillingMetricsCollector,
     sample: &prometheus_parse::Sample,
     val: f64,
 ) {
+    if !val.is_finite() {
+        return;
+    }
     match sample.metric.as_str() {
```

1402-1455: Potential compile break: NodeType::to_string requires Display

If NodeType doesn't implement Display, this won't compile. Safer default is Debug formatting.

```diff
-                node.node_type().to_string(),
+                format!("{:?}", node.node_type()),
```

Would you prefer I open a quick follow-up to implement Display for NodeType instead?
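For reference, a hedged sketch of what that follow-up could look like; the variants below are illustrative, not the actual `NodeType` enum from `src/handlers/http/modal/mod.rs`:

```rust
use std::fmt;

// Placeholder variants for illustration only.
enum NodeType {
    Ingestor,
    Querier,
}

impl fmt::Display for NodeType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            NodeType::Ingestor => "ingestor",
            NodeType::Querier => "querier",
        })
    }
}
```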


1457-1486: Unbounded concurrency on node fetches; cap to protect the cluster

Large clusters could create a burst of concurrent requests. Mirror the limited concurrency pattern used elsewhere.

```diff
-        .buffer_unordered(nodes_len) // No concurrency limit
+        .buffer_unordered(MAX_CONCURRENT_BILLING_FETCHES)
```

Add near other consts:

```rust
const MAX_CONCURRENT_BILLING_FETCHES: usize = 16;
```

1563-1617: Scheduler ingestion flow: bootstrap pbilling and add context

  • If pbilling stream is missing, ingestion will fail every minute. Ensure stream bootstrap (create if absent) before first ingestion.
  • Consider adding anyhow::Context on failures to aid ops triage.

Would you like me to add a small bootstrap step (create internal streams pmeta/pbilling if not found) in initialization?
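A possible shape for that bootstrap step; `create_internal_stream_if_not_exists` is the function this PR touches in `src/parseable/mod.rs`, but calling it here before the first scheduled ingestion (and `PARSEABLE` as the receiver) is an assumption:

```rust
// Sketch only: ensure pmeta/pbilling exist before the scheduler's first
// billing ingestion; failures are logged, not fatal, per the recorded learning.
if let Err(err) = PARSEABLE.create_internal_stream_if_not_exists().await {
    tracing::warn!("internal stream bootstrap failed; billing ingestion may error until retried: {err}");
}
```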

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5f6da2e and 9f89d30.

📒 Files selected for processing (6)
  • src/handlers/http/cluster/mod.rs (4 hunks)
  • src/handlers/http/ingest.rs (1 hunks)
  • src/hottier.rs (2 hunks)
  • src/migration/stream_metadata_migration.rs (2 hunks)
  • src/parseable/mod.rs (3 hunks)
  • src/storage/object_storage.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-07-28T17:10:39.448Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1392
File: src/migration/stream_metadata_migration.rs:303-322
Timestamp: 2025-07-28T17:10:39.448Z
Learning: In Parseable's migration system (src/migration/stream_metadata_migration.rs), each migration function updates the metadata to the current latest format using CURRENT_OBJECT_STORE_VERSION and CURRENT_SCHEMA_VERSION constants, rather than producing incremental versions. For example, v5_v6 function produces v7 format output when these constants are set to "v7", not v6 format.

Applied to files:

  • src/migration/stream_metadata_migration.rs
🧬 Code graph analysis (4)
src/storage/object_storage.rs (1)
src/parseable/mod.rs (1)
  • new (180-194)
src/migration/stream_metadata_migration.rs (1)
src/validator.rs (1)
  • stream_name (36-71)
src/parseable/mod.rs (2)
src/handlers/http/cluster/mod.rs (2)
  • sync_streams_with_ingestors (320-372)
  • new (110-117)
src/event/format/mod.rs (1)
  • new (126-131)
src/handlers/http/cluster/mod.rs (3)
src/handlers/http/modal/mod.rs (7)
  • new (288-310)
  • node_type (569-569)
  • node_type (582-584)
  • domain_name (567-567)
  • domain_name (574-576)
  • serde_json (372-372)
  • serde_json (620-620)
src/handlers/http/mod.rs (1)
  • base_path_without_preceding_slash (77-79)
src/handlers/http/ingest.rs (1)
  • ingest_internal_stream (133-156)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
🔇 Additional comments (3)
src/handlers/http/ingest.rs (1)

139-140: Metadata mismatch confirmed but appears intentional—verify design rationale.

The mismatch is real: PMETA stream created with LogSource::Pmeta (line 397) but events ingested with FORMAT_KEY="json" (line 139). However, the comment "For internal streams, use old schema" combined with explicit SchemaVersion::V0 usage suggests this is intentional backward compatibility design.

The p_format field is not validated downstream—FORMAT_VERIFY_KEY is the active validation field. No functional breakage occurs, but the metadata inconsistency could confuse operators or future maintainers.

Verification needed: Confirm whether setting FORMAT_KEY to LogSource::Json for PMETA is the correct design choice, or if it should use LogSource::Pmeta to align with stream creation metadata.

src/handlers/http/cluster/mod.rs (2)

61-62: Streams: ensure pbilling exists and PMETA rename is complete

Looks good. Please confirm:

  • pbilling internal stream is created at startup (schema matches BillingMetricEvent).
  • All INTERNAL_STREAM_NAME references are updated to PMETA_STREAM_NAME across the repo.

1221-1237: LGTM: scrape-to-events pipeline shape

Clean separation: scrape -> collect -> flatten. Good foundation for future metric types.

coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/parseable/mod.rs (1)

396-456: Excellent implementation of resilient dual-stream initialization!

The refactored logic correctly handles all scenarios for creating pmeta and pbilling internal streams:

  • ✅ Individual creation failures are logged but don't abort server initialization
  • ✅ Successfully created streams are properly synced with ingestors
  • ✅ Early return optimization when both streams already exist
  • ✅ Sync errors are logged but don't abort initialization
  • ✅ Always returns Ok(()) for initialization resilience

The implementation aligns perfectly with the recorded learning and addresses the past review discussion. Based on learnings.

Optional refinement:

The comment at line 419 could be more precise:

```diff
-        // Check if either stream creation failed
+        // Log any stream creation failures
```
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c240104 and 2d47aad.

📒 Files selected for processing (1)
  • src/parseable/mod.rs (3 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-10-21T02:22:24.392Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1448
File: src/parseable/mod.rs:419-432
Timestamp: 2025-10-21T02:22:24.392Z
Learning: In Parseable's internal stream creation (`create_internal_stream_if_not_exists` in `src/parseable/mod.rs`), errors should not propagate to fail server initialization. The function creates both pmeta and pbilling internal streams, and failures are logged but the function always returns `Ok(())` to ensure server startup resilience. Individual stream creation failures should not prevent syncing of successfully created streams.

Applied to files:

  • src/parseable/mod.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/parseable/mod.rs
🧬 Code graph analysis (1)
src/parseable/mod.rs (1)
src/handlers/http/cluster/mod.rs (2)
  • sync_streams_with_ingestors (320-372)
  • new (110-117)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (1)
src/parseable/mod.rs (1)

52-54: LGTM! Clean import additions.

The new imports for BILLING_METRICS_STREAM_NAME, PMETA_STREAM_NAME, and sync_streams_with_ingestors are appropriately scoped and necessary for the dual internal stream creation functionality.

@nitisht merged commit bc87257 into parseablehq:main on Oct 21, 2025. 13 checks passed.