Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion resources/formats.json
Original file line number Diff line number Diff line change
Expand Up @@ -1466,8 +1466,22 @@
]
},
{
"name": "rust_server_logs",
"name": "parseable_server_logs",
"regex": [
{
"pattern": "^(?P<customer_id>\\S+)\\s+(?P<deployment_id>\\S+)\\s+(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d+Z?)\\s+(?P<level>\\w+)\\s+(?P<logger_context>\\S+)\\s+(?P<thread_id>ThreadId\\(\\d+\\))\\s+(?P<module>.*?):(?P<line_number>\\d+):\\s+(?P<body>.*)",
"fields": [
"customer_id",
"deployment_id",
"timestamp",
"level",
"logger_context",
"thread_id",
"module",
"line_number",
"body"
]
},
{
"pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d+Z)\\s+(?P<level>\\w+)\\s+(?P<logger_context>\\S+)\\s+(?P<thread_id>ThreadId\\(\\d+\\))\\s+(?P<module>.*?):(?P<line_number>\\d+):\\s+(?P<body>.*)",
"fields": [
Expand Down
6 changes: 3 additions & 3 deletions src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,17 +193,17 @@ fn extract_partition_metrics(stream_name: &str, partition_lower: DateTime<Utc>)

let events_ingested = EVENTS_INGESTED_DATE
.get_metric_with_label_values(&event_labels)
.map(|metric| metric.get() as u64)
.map(|metric| metric.get())
.unwrap_or(0);

let ingestion_size = EVENTS_INGESTED_SIZE_DATE
.get_metric_with_label_values(&event_labels)
.map(|metric| metric.get() as u64)
.map(|metric| metric.get())
.unwrap_or(0);

let storage_size = EVENTS_STORAGE_SIZE_DATE
.get_metric_with_label_values(&storage_labels)
.map(|metric| metric.get() as u64)
.map(|metric| metric.get())
.unwrap_or(0);

(events_ingested, ingestion_size, storage_size)
Expand Down
18 changes: 6 additions & 12 deletions src/event/format/known_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,25 +515,19 @@ mod tests {
}

#[test]
fn test_rust_server_logs() {
fn test_parseable_server_logs() {
let processor = EventProcessor::new(FORMATS_JSON);
let schema = processor
.schema_definitions
.get("rust_server_logs")
.get("parseable_server_logs")
.unwrap();

let test_logs = vec![
// Current parseable format with ThreadId
"2025-09-06T10:43:01.628980875Z WARN main ThreadId(01) parseable::handlers::http::cluster:919: node http://0.0.0.0:8010/ is not live",
"2025-09-06T10:44:12.62276265Z ERROR actix-rt|system:0|arbiter:17 ThreadId(163) parseable_enterprise::http::handlers::query:43: JsonParse(\"Datafusion Error: Schema error: No field named a. Valid fields are serverlogs.log\")",
"2025-09-06T05:16:46.092071318Z ERROR actix-rt|system:0|arbiter:21 ThreadId(167) parseable_enterprise::http::handlers::query:43: JsonParse(\"Datafusion Error: Schema error: No field named ansible.host.ip\")",
"2025-09-06T11:22:07.500864363Z WARN main ThreadId(01) parseable_enterprise:70: Received shutdown signal, notifying server to shut down...",
// env_logger format
"[2025-09-06T10:43:01.628980875Z INFO parseable::storage] Initializing storage backend",
"[2025-09-06T10:43:01.628980875Z ERROR parseable::http::ingest] Failed to parse JSON",
// Simple tracing format (no ThreadId)
"2025-09-06T10:43:01.628980875Z INFO parseable::storage::s3: Storage configured successfully",
"2025-09-06T10:43:01.628980875Z DEBUG parseable::query::engine: Query executed in 45ms",
"01K4SHM6VQASBJ7G8V0STZN6N1 01K4SHM6VQASBJ7G8V0STZN6N1 2025-09-06T10:43:01.628980875Z WARN main ThreadId(01) parseable::handlers::http::cluster:919: node http://0.0.0.0:8010/ is not live",
"01K4SHM6VQASBJ7G8V0STZN6N1 01K4SHM6VQASBJ7G8V0STZN6N1 2025-09-06T10:44:12.62276265Z ERROR actix-rt|system:0|arbiter:17 ThreadId(163) parseable_enterprise::http::handlers::query:43: JsonParse(\"Datafusion Error: Schema error: No field named a. Valid fields are serverlogs.log\")",
"01K4SHM6VQASBJ7G8V0STZN6N1 01K4SHM6VQASBJ7G8V0STZN6N1 2025-09-06T05:16:46.092071318Z ERROR actix-rt|system:0|arbiter:21 ThreadId(167) parseable_enterprise::http::handlers::query:43: JsonParse(\"Datafusion Error: Schema error: No field named ansible.host.ip\")",
"01K4SHM6VQASBJ7G8V0STZN6N1 01K4SHM6VQASBJ7G8V0STZN6N1 2025-09-06T11:22:07.500864363Z WARN main ThreadId(01) parseable_enterprise:70: Received shutdown signal, notifying server to shut down...",
];

for (i, log_text) in test_logs.iter().enumerate() {
Expand Down
6 changes: 6 additions & 0 deletions src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use self::error::EventError;
use crate::{
LOCK_EXPECT,
metadata::update_stats,
metrics::{increment_events_ingested_by_date, increment_events_ingested_size_by_date},
parseable::{PARSEABLE, StagingError},
storage::StreamType,
};
Expand Down Expand Up @@ -88,6 +89,11 @@ impl Event {
self.parsed_timestamp.date(),
);

// Track billing metrics for event ingestion
let date_string = self.parsed_timestamp.date().to_string();
increment_events_ingested_by_date(self.rb.num_rows() as u64, &date_string);
increment_events_ingested_size_by_date(self.origin_size, &date_string);

crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb);

Ok(())
Expand Down
2 changes: 0 additions & 2 deletions src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,6 @@ impl ParseableServer for IngestServer {
})
.await;

PARSEABLE.storage.register_store_metrics(prometheus);

migration::run_migration(&PARSEABLE).await?;

// local sync on init
Expand Down
1 change: 0 additions & 1 deletion src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ impl ParseableServer for QueryServer {
prometheus: &PrometheusMetrics,
shutdown_rx: oneshot::Receiver<()>,
) -> anyhow::Result<()> {
PARSEABLE.storage.register_store_metrics(prometheus);
// write the ingestor metadata to storage
QUERIER_META
.get_or_init(|| async {
Expand Down
2 changes: 0 additions & 2 deletions src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,6 @@ impl ParseableServer for Server {
prometheus: &PrometheusMetrics,
shutdown_rx: oneshot::Receiver<()>,
) -> anyhow::Result<()> {
PARSEABLE.storage.register_store_metrics(prometheus);

migration::run_migration(&PARSEABLE).await?;

// load on init
Expand Down
10 changes: 8 additions & 2 deletions src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use tokio::task::JoinSet;
use tracing::{error, warn};

use crate::event::{DEFAULT_TIMESTAMP_KEY, commit_schema};
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::metrics::{QUERY_EXECUTE_TIME, increment_query_calls_by_date};
use crate::parseable::{PARSEABLE, StreamNotFound};
use crate::query::error::ExecuteError;
use crate::query::{CountsRequest, Query as LogicalQuery, execute};
Expand Down Expand Up @@ -123,6 +123,10 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
user_auth_for_datasets(&permissions, &tables).await?;
let time = Instant::now();

// Track billing metrics for query calls
let current_date = chrono::Utc::now().date_naive().to_string();
increment_query_calls_by_date(&current_date);

// if the query is `select count(*) from <dataset>`
// we use the `get_bin_density` method to get the count of records in the dataset
// instead of executing the query using datafusion
Expand Down Expand Up @@ -348,7 +352,9 @@ pub async fn get_counts(

// does user have access to table?
user_auth_for_datasets(&permissions, std::slice::from_ref(&body.stream)).await?;

// Track billing metrics for query calls
let current_date = chrono::Utc::now().date_naive().to_string();
increment_query_calls_by_date(&current_date);
// if the user has given a sql query (counts call with filters applied), then use this flow
// this could include filters or group by
if body.conditions.is_some() {
Expand Down
10 changes: 5 additions & 5 deletions src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ pub fn update_stats(
.add(num_rows as i64);
EVENTS_INGESTED_DATE
.with_label_values(&[stream_name, origin, &parsed_date])
.add(num_rows as i64);
.inc_by(num_rows as u64);
EVENTS_INGESTED_SIZE
.with_label_values(&[stream_name, origin])
.add(size as i64);
EVENTS_INGESTED_SIZE_DATE
.with_label_values(&[stream_name, origin, &parsed_date])
.add(size as i64);
.inc_by(size);
LIFETIME_EVENTS_INGESTED
.with_label_values(&[stream_name, origin])
.add(num_rows as i64);
Expand Down Expand Up @@ -173,12 +173,12 @@ pub fn load_daily_metrics(manifests: &Vec<ManifestItem>, stream_name: &str) {
let storage_size = manifest.storage_size;
EVENTS_INGESTED_DATE
.with_label_values(&[stream_name, "json", &manifest_date])
.set(events_ingested as i64);
.inc_by(events_ingested);
EVENTS_INGESTED_SIZE_DATE
.with_label_values(&[stream_name, "json", &manifest_date])
.set(ingestion_size as i64);
.inc_by(ingestion_size);
EVENTS_STORAGE_SIZE_DATE
.with_label_values(&["data", stream_name, "parquet", &manifest_date])
.set(storage_size as i64);
.inc_by(storage_size);
}
}
Loading
Loading