
Commit b106106

Authored by Devdutt Shenoi
Merge branch 'main' into main
Signed-off-by: Devdutt Shenoi <devdutt@parseable.com>
2 parents e15b0d2 + 10aef7b

File tree

15 files changed: +306 −260 lines


src/cli.rs
Lines changed: 3 additions & 2 deletions

@@ -260,11 +260,12 @@ pub struct Options {
         help = "Set a fixed memory limit for query in GiB"
     )]
     pub query_memory_pool_size: Option<usize>,
-
+    // reduced the max row group size from 1048576
+    // smaller row groups help in faster query performance in multi threaded query
     #[arg(
         long,
         env = "P_PARQUET_ROW_GROUP_SIZE",
-        default_value = "1048576",
+        default_value = "262144",
         help = "Number of rows in a row group"
    )]
    pub row_group_size: usize,
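The default row group size drops from 1048576 to 262144 rows; smaller row groups give multi-threaded queries more groups to parallelize over. The value can still be overridden per deployment. A minimal clap sketch (assuming clap v4 with the derive and env features; the struct and binary here are illustrative, not Parseable's full Options) showing how the flag, the P_PARQUET_ROW_GROUP_SIZE variable, and the new default resolve:

```rust
use clap::Parser;

/// Illustrative subset of the options struct; only the row group size is shown.
#[derive(Parser, Debug)]
struct Opts {
    /// Number of rows in a row group
    #[arg(long, env = "P_PARQUET_ROW_GROUP_SIZE", default_value = "262144")]
    row_group_size: usize,
}

fn main() {
    // Resolution order: `--row-group-size 524288` on the command line wins over
    // `P_PARQUET_ROW_GROUP_SIZE=131072` in the environment, which wins over the
    // new 262144 default.
    let opts = Opts::parse();
    println!("row_group_size = {}", opts.row_group_size);
}
```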

src/handlers/airplane.rs
Lines changed: 8 additions & 4 deletions

@@ -215,10 +215,14 @@ impl FlightService for AirServiceImpl {
             Status::permission_denied("User Does not have permission to access this")
         })?;
         let time = Instant::now();
-        let (records, _) = query
-            .execute(stream_name.clone())
-            .await
-            .map_err(|err| Status::internal(err.to_string()))?;
+
+        let stream_name_clone = stream_name.clone();
+        let (records, _) =
+            match tokio::task::spawn_blocking(move || query.execute(stream_name_clone)).await {
+                Ok(Ok((records, fields))) => (records, fields),
+                Ok(Err(e)) => return Err(Status::internal(e.to_string())),
+                Err(err) => return Err(Status::internal(err.to_string())),
+            };

         /*
          * INFO: No returning the schema with the data.
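The Flight handler now runs the query on Tokio's blocking thread pool instead of awaiting it directly on the async executor, so a heavy query no longer stalls other in-flight RPCs, and both failure modes (the spawned task failing and the query itself erroring) are surfaced as a gRPC Status. A minimal sketch of the pattern, assuming tokio and tonic as dependencies; run_query and its record/field types are illustrative stand-ins for query.execute:

```rust
use tonic::Status;

// Stand-in for the blocking query execution; in the real handler this is
// `query.execute(stream_name)`.
fn run_query(stream: String) -> Result<(Vec<String>, Vec<String>), String> {
    Ok((vec![format!("records for {stream}")], vec!["field_a".to_string()]))
}

async fn execute_off_runtime(stream_name: String) -> Result<Vec<String>, Status> {
    let stream_name_clone = stream_name.clone();
    // spawn_blocking returns Result<T, JoinError>; the closure returns its own
    // Result, so two layers of errors are unwrapped in the match.
    let (records, _fields) =
        match tokio::task::spawn_blocking(move || run_query(stream_name_clone)).await {
            Ok(Ok((records, fields))) => (records, fields),
            Ok(Err(e)) => return Err(Status::internal(e)),
            Err(join_err) => return Err(Status::internal(join_err.to_string())),
        };
    Ok(records)
}
```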

src/handlers/http/cluster/mod.rs
Lines changed: 23 additions & 58 deletions

@@ -368,64 +368,29 @@ pub async fn sync_role_update_with_ingestors(
     Ok(())
 }

-pub async fn fetch_daily_stats_from_ingestors(
-    stream_name: &str,
+pub fn fetch_daily_stats_from_ingestors(
     date: &str,
+    stream_meta_list: &[ObjectStoreFormat],
 ) -> Result<Stats, StreamError> {
-    let mut total_events_ingested: u64 = 0;
-    let mut total_ingestion_size: u64 = 0;
-    let mut total_storage_size: u64 = 0;
-
-    let ingestor_infos = get_ingestor_info().await.map_err(|err| {
-        error!("Fatal: failed to get ingestor info: {:?}", err);
-        StreamError::Anyhow(err)
-    })?;
-    for ingestor in ingestor_infos.iter() {
-        let uri = Url::parse(&format!(
-            "{}{}/metrics",
-            &ingestor.domain_name,
-            base_path_without_preceding_slash()
-        ))
-        .map_err(|err| {
-            StreamError::Anyhow(anyhow::anyhow!("Invalid URL in Ingestor Metadata: {}", err))
-        })?;
-
-        let res = HTTP_CLIENT
-            .get(uri)
-            .header(header::AUTHORIZATION, &ingestor.token)
-            .header(header::CONTENT_TYPE, "application/json")
-            .send()
-            .await;
-
-        if let Ok(res) = res {
-            let text = res
-                .text()
-                .await
-                .map_err(|err| StreamError::Anyhow(anyhow::anyhow!("Request failed: {}", err)))?;
-            let lines: Vec<Result<String, std::io::Error>> =
-                text.lines().map(|line| Ok(line.to_owned())).collect_vec();
-
-            let sample = prometheus_parse::Scrape::parse(lines.into_iter())
-                .map_err(|err| {
-                    StreamError::Anyhow(anyhow::anyhow!(
-                        "Invalid URL in Ingestor Metadata: {}",
-                        err
-                    ))
-                })?
-                .samples;
-
-            let (events_ingested, ingestion_size, storage_size) =
-                Metrics::get_daily_stats_from_samples(sample, stream_name, date);
-            total_events_ingested += events_ingested;
-            total_ingestion_size += ingestion_size;
-            total_storage_size += storage_size;
+    // for the given date, get the stats from the ingestors
+    let mut events_ingested = 0;
+    let mut ingestion_size = 0;
+    let mut storage_size = 0;
+
+    for meta in stream_meta_list.iter() {
+        for manifest in meta.snapshot.manifest_list.iter() {
+            if manifest.time_lower_bound.date_naive().to_string() == date {
+                events_ingested += manifest.events_ingested;
+                ingestion_size += manifest.ingestion_size;
+                storage_size += manifest.storage_size;
+            }
         }
     }

     let stats = Stats {
-        events: total_events_ingested,
-        ingestion: total_ingestion_size,
-        storage: total_storage_size,
+        events: events_ingested,
+        ingestion: ingestion_size,
+        storage: storage_size,
     };
     Ok(stats)
 }

@@ -475,17 +440,17 @@ pub async fn fetch_stats_from_ingestors(
         Utc::now(),
         IngestionStats::new(
             count,
-            format!("{} Bytes", ingestion_size),
+            ingestion_size,
             lifetime_count,
-            format!("{} Bytes", lifetime_ingestion_size),
+            lifetime_ingestion_size,
             deleted_count,
-            format!("{} Bytes", deleted_ingestion_size),
+            deleted_ingestion_size,
             "json",
         ),
         StorageStats::new(
-            format!("{} Bytes", storage_size),
-            format!("{} Bytes", lifetime_storage_size),
-            format!("{} Bytes", deleted_storage_size),
+            storage_size,
+            lifetime_storage_size,
+            deleted_storage_size,
            "parquet",
        ),
    );
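fetch_daily_stats_from_ingestors is no longer async and no longer scrapes each ingestor's /metrics endpoint over HTTP; it sums the per-manifest counters already recorded in the ingestors' stream metadata for the requested date. A minimal sketch of that aggregation, using simplified stand-ins for ObjectStoreFormat and its manifest entries (field and type names here are assumptions for illustration):

```rust
use chrono::{DateTime, TimeZone, Utc};

// Simplified stand-ins for the snapshot / manifest metadata.
struct ManifestItem {
    time_lower_bound: DateTime<Utc>,
    events_ingested: u64,
    ingestion_size: u64,
    storage_size: u64,
}

struct StreamMeta {
    manifest_list: Vec<ManifestItem>,
}

/// Sum events, ingestion bytes, and storage bytes across all ingestors'
/// manifests whose lower time bound falls on `date` (formatted as YYYY-MM-DD).
fn daily_totals(date: &str, metas: &[StreamMeta]) -> (u64, u64, u64) {
    let (mut events, mut ingestion, mut storage) = (0, 0, 0);
    for meta in metas {
        for m in &meta.manifest_list {
            if m.time_lower_bound.date_naive().to_string() == date {
                events += m.events_ingested;
                ingestion += m.ingestion_size;
                storage += m.storage_size;
            }
        }
    }
    (events, ingestion, storage)
}

fn main() {
    let metas = vec![StreamMeta {
        manifest_list: vec![ManifestItem {
            time_lower_bound: Utc.with_ymd_and_hms(2025, 2, 1, 0, 0, 0).unwrap(),
            events_ingested: 100,
            ingestion_size: 4096,
            storage_size: 1024,
        }],
    }];
    assert_eq!(daily_totals("2025-02-01", &metas), (100, 4096, 1024));
}
```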

src/handlers/http/cluster/utils.rs
Lines changed: 16 additions & 64 deletions

@@ -93,22 +93,22 @@ impl ClusterInfo {
 #[derive(Debug, Default, Serialize, Deserialize)]
 pub struct IngestionStats {
     pub count: u64,
-    pub size: String,
+    pub size: u64,
     pub format: String,
     pub lifetime_count: u64,
-    pub lifetime_size: String,
+    pub lifetime_size: u64,
     pub deleted_count: u64,
-    pub deleted_size: String,
+    pub deleted_size: u64,
 }

 impl IngestionStats {
     pub fn new(
         count: u64,
-        size: String,
+        size: u64,
         lifetime_count: u64,
-        lifetime_size: String,
+        lifetime_size: u64,
         deleted_count: u64,
-        deleted_size: String,
+        deleted_size: u64,
         format: &str,
     ) -> Self {
         Self {

@@ -125,14 +125,14 @@ impl IngestionStats {

 #[derive(Debug, Default, Serialize, Deserialize)]
 pub struct StorageStats {
-    pub size: String,
+    pub size: u64,
     pub format: String,
-    pub lifetime_size: String,
-    pub deleted_size: String,
+    pub lifetime_size: u64,
+    pub deleted_size: u64,
 }

 impl StorageStats {
-    pub fn new(size: String, lifetime_size: String, deleted_size: String, format: &str) -> Self {
+    pub fn new(size: u64, lifetime_size: u64, deleted_size: u64, format: &str) -> Self {
         Self {
             size,
             format: format.to_string(),

@@ -155,71 +155,23 @@ pub fn merge_quried_stats(stats: Vec<QueriedStats>) -> QueriedStats {
         .fold(IngestionStats::default(), |acc, x| IngestionStats {
             count: acc.count + x.count,

-            size: format!(
-                "{} Bytes",
-                acc.size.split(' ').collect_vec()[0]
-                    .parse::<u64>()
-                    .unwrap_or_default()
-                    + x.size.split(' ').collect_vec()[0]
-                        .parse::<u64>()
-                        .unwrap_or_default()
-            ),
+            size: acc.size + x.size,
             format: x.format.clone(),
             lifetime_count: acc.lifetime_count + x.lifetime_count,
-            lifetime_size: format!(
-                "{} Bytes",
-                acc.lifetime_size.split(' ').collect_vec()[0]
-                    .parse::<u64>()
-                    .unwrap_or_default()
-                    + x.lifetime_size.split(' ').collect_vec()[0]
-                        .parse::<u64>()
-                        .unwrap_or_default()
-            ),
+            lifetime_size: acc.lifetime_size + x.lifetime_size,
             deleted_count: acc.deleted_count + x.deleted_count,
-            deleted_size: format!(
-                "{} Bytes",
-                acc.deleted_size.split(' ').collect_vec()[0]
-                    .parse::<u64>()
-                    .unwrap_or_default()
-                    + x.deleted_size.split(' ').collect_vec()[0]
-                        .parse::<u64>()
-                        .unwrap_or_default()
-            ),
+            deleted_size: acc.deleted_size + x.deleted_size,
         });

     let cumulative_storage =
         stats
             .iter()
             .map(|x| &x.storage)
             .fold(StorageStats::default(), |acc, x| StorageStats {
-                size: format!(
-                    "{} Bytes",
-                    acc.size.split(' ').collect_vec()[0]
-                        .parse::<u64>()
-                        .unwrap_or_default()
-                        + x.size.split(' ').collect_vec()[0]
-                            .parse::<u64>()
-                            .unwrap_or_default()
-                ),
+                size: acc.size + x.size,
                 format: x.format.clone(),
-                lifetime_size: format!(
-                    "{} Bytes",
-                    acc.lifetime_size.split(' ').collect_vec()[0]
-                        .parse::<u64>()
-                        .unwrap_or_default()
-                        + x.lifetime_size.split(' ').collect_vec()[0]
-                            .parse::<u64>()
-                            .unwrap_or_default()
-                ),
-                deleted_size: format!(
-                    "{} Bytes",
-                    acc.deleted_size.split(' ').collect_vec()[0]
-                        .parse::<u64>()
-                        .unwrap_or_default()
-                        + x.deleted_size.split(' ').collect_vec()[0]
-                            .parse::<u64>()
-                            .unwrap_or_default()
-                ),
+                lifetime_size: acc.lifetime_size + x.lifetime_size,
+                deleted_size: acc.deleted_size + x.deleted_size,
             });

     QueriedStats::new(
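With the size fields switched from pre-formatted "<n> Bytes" strings to plain u64 byte counts, merging per-node stats in merge_quried_stats reduces to integer addition (no more splitting and re-parsing strings), and the serialized stats carry numbers. A hedged sketch of the effect, using a trimmed stand-in for IngestionStats rather than the full type:

```rust
use serde::Serialize;

// Trimmed stand-in for IngestionStats with only the summed fields.
#[derive(Serialize, Default)]
struct IngestionStatsLike {
    count: u64,
    size: u64,          // previously a String such as "1024 Bytes"
    lifetime_size: u64, // previously a String
    deleted_size: u64,  // previously a String
}

fn main() {
    let per_node = vec![
        IngestionStatsLike { count: 2, size: 1024, lifetime_size: 1024, deleted_size: 0 },
        IngestionStatsLike { count: 3, size: 2048, lifetime_size: 4096, deleted_size: 512 },
    ];

    // Same fold shape as merge_quried_stats, but with no string splitting or parsing.
    let merged = per_node
        .iter()
        .fold(IngestionStatsLike::default(), |acc, x| IngestionStatsLike {
            count: acc.count + x.count,
            size: acc.size + x.size,
            lifetime_size: acc.lifetime_size + x.lifetime_size,
            deleted_size: acc.deleted_size + x.deleted_size,
        });

    // Serializes as {"count":5,"size":3072,"lifetime_size":5120,"deleted_size":512}
    println!("{}", serde_json::to_string(&merged).unwrap());
}
```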

src/handlers/http/logstream.rs
Lines changed: 6 additions & 6 deletions

@@ -268,17 +268,17 @@ pub async fn get_stats(
     let stats = {
         let ingestion_stats = IngestionStats::new(
             stats.current_stats.events,
-            format!("{} Bytes", stats.current_stats.ingestion),
+            stats.current_stats.ingestion,
             stats.lifetime_stats.events,
-            format!("{} Bytes", stats.lifetime_stats.ingestion),
+            stats.lifetime_stats.ingestion,
             stats.deleted_stats.events,
-            format!("{} Bytes", stats.deleted_stats.ingestion),
+            stats.deleted_stats.ingestion,
             "json",
         );
         let storage_stats = StorageStats::new(
-            format!("{} Bytes", stats.current_stats.storage),
-            format!("{} Bytes", stats.lifetime_stats.storage),
-            format!("{} Bytes", stats.deleted_stats.storage),
+            stats.current_stats.storage,
+            stats.lifetime_stats.storage,
+            stats.deleted_stats.storage,
            "parquet",
        );

src/handlers/http/modal/query/querier_logstream.rs
Lines changed: 37 additions & 8 deletions

@@ -26,6 +26,7 @@ use actix_web::{
 use bytes::Bytes;
 use chrono::Utc;
 use http::StatusCode;
+use relative_path::RelativePathBuf;
 use tokio::sync::Mutex;
 use tracing::{error, warn};

@@ -44,7 +45,7 @@ use crate::{
     hottier::HotTierManager,
     parseable::{StreamNotFound, PARSEABLE},
     stats::{self, Stats},
-    storage::StreamType,
+    storage::{ObjectStoreFormat, StreamType, STREAM_ROOT_DIRECTORY},
 };

 pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {

@@ -151,7 +152,35 @@ pub async fn get_stats(

     if !date_value.is_empty() {
         let querier_stats = get_stats_date(&stream_name, date_value).await?;
-        let ingestor_stats = fetch_daily_stats_from_ingestors(&stream_name, date_value).await?;
+
+        // this function requires all the ingestor stream jsons
+        let path = RelativePathBuf::from_iter([&stream_name, STREAM_ROOT_DIRECTORY]);
+        let obs = PARSEABLE
+            .storage
+            .get_object_store()
+            .get_objects(
+                Some(&path),
+                Box::new(|file_name| {
+                    file_name.starts_with(".ingestor") && file_name.ends_with("stream.json")
+                }),
+            )
+            .await?;
+
+        let mut ingestor_stream_jsons = Vec::new();
+        for ob in obs {
+            let stream_metadata: ObjectStoreFormat = match serde_json::from_slice(&ob) {
+                Ok(d) => d,
+                Err(e) => {
+                    error!("Failed to parse stream metadata: {:?}", e);
+                    continue;
+                }
+            };
+            ingestor_stream_jsons.push(stream_metadata);
+        }
+
+        let ingestor_stats =
+            fetch_daily_stats_from_ingestors(date_value, &ingestor_stream_jsons)?;
+
         let total_stats = Stats {
             events: querier_stats.events + ingestor_stats.events,
             ingestion: querier_stats.ingestion + ingestor_stats.ingestion,

@@ -180,17 +209,17 @@ pub async fn get_stats(
     let stats = {
         let ingestion_stats = IngestionStats::new(
             stats.current_stats.events,
-            format!("{} Bytes", stats.current_stats.ingestion),
+            stats.current_stats.ingestion,
             stats.lifetime_stats.events,
-            format!("{} Bytes", stats.lifetime_stats.ingestion),
+            stats.lifetime_stats.ingestion,
             stats.deleted_stats.events,
-            format!("{} Bytes", stats.deleted_stats.ingestion),
+            stats.deleted_stats.ingestion,
             "json",
         );
         let storage_stats = StorageStats::new(
-            format!("{} Bytes", stats.current_stats.storage),
-            format!("{} Bytes", stats.lifetime_stats.storage),
-            format!("{} Bytes", stats.deleted_stats.storage),
+            stats.current_stats.storage,
+            stats.lifetime_stats.storage,
+            stats.deleted_stats.storage,
            "parquet",
        );
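For date-scoped stats, the querier now lists every ingestor's stream.json under the stream root in object storage, deserializes each one (skipping files that fail to parse), and hands the collected metadata to fetch_daily_stats_from_ingestors. A small self-contained sketch of the filename filter and the parse-and-skip loop; StreamMetaLike and the sample object names are illustrative stand-ins, not the real ObjectStoreFormat or layout:

```rust
use serde::Deserialize;

// Simplified stand-in for storage::ObjectStoreFormat.
#[derive(Deserialize, Debug)]
struct StreamMetaLike {
    #[serde(default)]
    objects_count: u64,
}

fn is_ingestor_stream_json(file_name: &str) -> bool {
    // same predicate shape as the boxed filter passed to get_objects
    file_name.starts_with(".ingestor") && file_name.ends_with("stream.json")
}

fn main() {
    // Hypothetical file names; only ingestor-specific stream manifests pass.
    for name in [".ingestor.abc123.stream.json", ".stream.json", "data.parquet"] {
        println!("{name}: {}", is_ingestor_stream_json(name));
    }

    // Parse-and-skip: malformed JSON is logged (here: printed) and ignored,
    // mirroring the `continue` in the handler.
    let blobs = vec![br#"{"objects_count": 3}"# as &[u8], b"not json"];
    let mut metas = Vec::new();
    for blob in blobs {
        match serde_json::from_slice::<StreamMetaLike>(blob) {
            Ok(m) => metas.push(m),
            Err(e) => eprintln!("Failed to parse stream metadata: {e:?}"),
        }
    }
    println!("parsed {} metadata objects: {:?}", metas.len(), metas);
}
```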