Skip to content

Commit ca2585e

Browse files
authored
Adds summary output to CLI instrumented object stores (#18045)
* Adds summary output to CLI instrumented object stores
  - Adds a `RequestSummary` type for the instrumented object store to display summary statistics about instrumented requests
  - Adds a generic `Stats` type to track the statistics for the summary
  - Adds tests for the new code
  - Adds a basic summary output to the user-facing display when profiling is enabled
  - Adds docs for new and newly exported public items
* Updates integration test and validation snapshot
1 parent a4c95aa commit ca2585e

File tree

4 files changed

+284
-4
lines changed

4 files changed

+284
-4
lines changed

datafusion-cli/src/object_storage/instrumented.rs

Lines changed: 256 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616
// under the License.
1717

1818
use std::{
19-
fmt,
19+
cmp, fmt,
20+
ops::AddAssign,
2021
str::FromStr,
2122
sync::{
2223
atomic::{AtomicU8, Ordering},
@@ -28,7 +29,7 @@ use std::{
2829
use async_trait::async_trait;
2930
use chrono::Utc;
3031
use datafusion::{
31-
common::instant::Instant,
32+
common::{instant::Instant, HashMap},
3233
error::DataFusionError,
3334
execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
3435
};
@@ -201,8 +202,9 @@ impl ObjectStore for InstrumentedObjectStore {
201202
}
202203
}
203204

205+
/// Object store operation types tracked by [`InstrumentedObjectStore`]
204206
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
205-
enum Operation {
207+
pub enum Operation {
206208
_Copy,
207209
_Delete,
208210
Get,
@@ -250,6 +252,103 @@ impl fmt::Display for RequestDetails {
250252
}
251253
}
252254

255+
/// Summary statistics for an [`InstrumentedObjectStore`]'s [`RequestDetails`]
256+
#[derive(Default)]
257+
pub struct RequestSummary {
258+
count: usize,
259+
duration_stats: Option<Stats<Duration>>,
260+
size_stats: Option<Stats<usize>>,
261+
}
262+
263+
impl RequestSummary {
264+
/// Generates a set of [RequestSummaries](RequestSummary) from the input [`RequestDetails`]
265+
/// grouped by the input's [`Operation`]
266+
pub fn summarize_by_operation(
267+
requests: &[RequestDetails],
268+
) -> HashMap<Operation, Self> {
269+
let mut summaries: HashMap<Operation, Self> = HashMap::new();
270+
for rd in requests {
271+
match summaries.get_mut(&rd.op) {
272+
Some(rs) => rs.push(rd),
273+
None => {
274+
let mut rs = RequestSummary::default();
275+
rs.push(rd);
276+
summaries.insert(rd.op, rs);
277+
}
278+
}
279+
}
280+
281+
summaries
282+
}
283+
284+
fn push(&mut self, request: &RequestDetails) {
285+
self.count += 1;
286+
if let Some(dur) = request.duration {
287+
self.duration_stats.get_or_insert_default().push(dur)
288+
}
289+
if let Some(size) = request.size {
290+
self.size_stats.get_or_insert_default().push(size)
291+
}
292+
}
293+
}
294+
295+
impl fmt::Display for RequestSummary {
296+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
297+
writeln!(f, "count: {}", self.count)?;
298+
299+
if let Some(dur_stats) = &self.duration_stats {
300+
writeln!(f, "duration min: {:.6}s", dur_stats.min.as_secs_f32())?;
301+
writeln!(f, "duration max: {:.6}s", dur_stats.max.as_secs_f32())?;
302+
let avg = dur_stats.sum.as_secs_f32() / (self.count as f32);
303+
writeln!(f, "duration avg: {:.6}s", avg)?;
304+
}
305+
306+
if let Some(size_stats) = &self.size_stats {
307+
writeln!(f, "size min: {} B", size_stats.min)?;
308+
writeln!(f, "size max: {} B", size_stats.max)?;
309+
let avg = size_stats.sum / self.count;
310+
writeln!(f, "size avg: {} B", avg)?;
311+
writeln!(f, "size sum: {} B", size_stats.sum)?;
312+
}
313+
314+
Ok(())
315+
}
316+
}
317+
318+
/// Running minimum, maximum, and sum over the values passed to [`Stats::push`]
struct Stats<T>
where
    T: Copy + Ord + AddAssign<T>,
{
    min: T,
    max: T,
    sum: T,
}

impl<T> Stats<T>
where
    T: Copy + Ord + AddAssign<T>,
{
    /// Folds `val` into the running minimum, maximum, and sum
    fn push(&mut self, val: T) {
        let Self { min, max, sum } = self;
        *min = cmp::min(val, *min);
        *max = cmp::max(val, *max);
        *sum += val;
    }
}

impl Default for Stats<Duration> {
    /// Starts `min` at the largest duration and `max` at the smallest, so the first pushed
    /// value replaces both
    fn default() -> Self {
        let (min, max, sum) = (Duration::MAX, Duration::ZERO, Duration::ZERO);
        Self { min, max, sum }
    }
}

impl Default for Stats<usize> {
    /// Starts `min` at the largest `usize` and `max` at the smallest, so the first pushed
    /// value replaces both
    fn default() -> Self {
        let (min, max, sum) = (usize::MAX, usize::MIN, 0);
        Self { min, max, sum }
    }
}
351+
253352
/// Provides access to [`InstrumentedObjectStore`] instances that record requests for reporting
254353
#[derive(Debug)]
255354
pub struct InstrumentedObjectStoreRegistry {
@@ -420,4 +519,158 @@ mod tests {
420519
"1970-01-01T00:00:00+00:00 operation=Get duration=5.000000s size=10 range: bytes=0-9 path=test extra info"
421520
);
422521
}
522+
523+
#[test]
fn request_summary() {
    // Every request in this test leaves `range` and `extra_display` unset, so the builder
    // only takes the fields that vary between cases
    fn request(
        op: Operation,
        path: &str,
        epoch_secs: i64,
        duration: Option<Duration>,
        size: Option<usize>,
    ) -> RequestDetails {
        RequestDetails {
            op,
            path: Path::from(path),
            timestamp: chrono::DateTime::from_timestamp(epoch_secs, 0).unwrap(),
            duration,
            size,
            range: None,
            extra_display: None,
        }
    }

    // An empty request list produces no summaries
    let mut requests: Vec<RequestDetails> = Vec::new();
    let summaries = RequestSummary::summarize_by_operation(&requests);
    assert!(summaries.is_empty());

    // A single Get request: min, max, and sum all equal that request's values
    requests.push(request(
        Operation::Get,
        "test1",
        0,
        Some(Duration::from_secs(5)),
        Some(100),
    ));

    let summaries = RequestSummary::summarize_by_operation(&requests);
    assert_eq!(summaries.len(), 1);

    let summary = summaries.get(&Operation::Get).unwrap();
    assert_eq!(summary.count, 1);
    let durations = summary.duration_stats.as_ref().unwrap();
    assert_eq!(durations.min, Duration::from_secs(5));
    assert_eq!(durations.max, Duration::from_secs(5));
    assert_eq!(durations.sum, Duration::from_secs(5));
    let sizes = summary.size_stats.as_ref().unwrap();
    assert_eq!(sizes.min, 100);
    assert_eq!(sizes.max, 100);
    assert_eq!(sizes.sum, 100);

    // Further Get requests are aggregated into the same summary
    requests.push(request(
        Operation::Get,
        "test2",
        1,
        Some(Duration::from_secs(8)),
        Some(150),
    ));
    requests.push(request(
        Operation::Get,
        "test3",
        2,
        Some(Duration::from_secs(2)),
        Some(50),
    ));

    let summaries = RequestSummary::summarize_by_operation(&requests);
    assert_eq!(summaries.len(), 1);

    let summary = summaries.get(&Operation::Get).unwrap();
    assert_eq!(summary.count, 3);
    let durations = summary.duration_stats.as_ref().unwrap();
    assert_eq!(durations.min, Duration::from_secs(2));
    assert_eq!(durations.max, Duration::from_secs(8));
    assert_eq!(durations.sum, Duration::from_secs(15));
    let sizes = summary.size_stats.as_ref().unwrap();
    assert_eq!(sizes.min, 50);
    assert_eq!(sizes.max, 150);
    assert_eq!(sizes.sum, 300);

    // A Put request is grouped separately from the Get requests
    requests.push(request(
        Operation::_Put,
        "test4",
        3,
        Some(Duration::from_millis(200)),
        Some(75),
    ));

    let summaries = RequestSummary::summarize_by_operation(&requests);
    assert_eq!(summaries.len(), 2);

    let get_summary = summaries.get(&Operation::Get).unwrap();
    assert_eq!(get_summary.count, 3);

    let put_summary = summaries.get(&Operation::_Put).unwrap();
    assert_eq!(put_summary.count, 1);
    assert_eq!(
        put_summary.duration_stats.as_ref().unwrap().min,
        Duration::from_millis(200)
    );
    assert_eq!(put_summary.size_stats.as_ref().unwrap().sum, 75);

    // Request with only a duration (no size)
    let only_duration = vec![request(
        Operation::Get,
        "test1",
        0,
        Some(Duration::from_secs(3)),
        None,
    )];
    let summaries = RequestSummary::summarize_by_operation(&only_duration);
    let summary = summaries.get(&Operation::Get).unwrap();
    assert_eq!(summary.count, 1);
    assert!(summary.duration_stats.is_some());
    assert!(summary.size_stats.is_none());

    // Request with only a size (no duration)
    let only_size = vec![request(Operation::Get, "test1", 0, None, Some(200))];
    let summaries = RequestSummary::summarize_by_operation(&only_size);
    let summary = summaries.get(&Operation::Get).unwrap();
    assert_eq!(summary.count, 1);
    assert!(summary.duration_stats.is_none());
    assert!(summary.size_stats.is_some());
    assert_eq!(summary.size_stats.as_ref().unwrap().sum, 200);

    // Request with neither duration nor size is still counted
    let no_stats = vec![request(Operation::Get, "test1", 0, None, None)];
    let summaries = RequestSummary::summarize_by_operation(&no_stats);
    let summary = summaries.get(&Operation::Get).unwrap();
    assert_eq!(summary.count, 1);
    assert!(summary.duration_stats.is_none());
    assert!(summary.size_stats.is_none());
}
423676
}

datafusion-cli/src/print_options.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use std::str::FromStr;
2222
use std::sync::Arc;
2323

2424
use crate::object_storage::instrumented::{
25-
InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry,
25+
InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry, RequestSummary,
2626
};
2727
use crate::print_format::PrintFormat;
2828

@@ -202,6 +202,13 @@ impl PrintOptions {
202202
}
203203
// Add an extra blank line to help visually organize the output
204204
writeln!(writer)?;
205+
206+
writeln!(writer, "Summaries:")?;
207+
let summaries = RequestSummary::summarize_by_operation(&requests);
208+
for (op, summary) in summaries {
209+
writeln!(writer, "{op:?}")?;
210+
writeln!(writer, "{summary}")?;
211+
}
205212
}
206213
}
207214
}

datafusion-cli/tests/cli_integration.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,15 @@ async fn test_object_store_profiling() {
416416
"<TIMESTAMP> operation=$1 duration=[DURATION] size=$2 path=$3",
417417
);
418418

419+
// We also need to filter out the durations reported in the summary output
420+
//
421+
// Example line(s) to filter:
422+
//
423+
// duration min: 0.000729s
424+
// duration max: 0.000729s
425+
// duration avg: 0.000729s
426+
settings.add_filter(r"duration (min|max|avg): \d+\.\d{6}s", "[SUMMARY_DURATION]");
427+
419428
let _bound = settings.bind_to_scope();
420429

421430
let input = r#"

datafusion-cli/tests/snapshots/object_store_profiling@s3_url_fallback.snap

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,17 @@ Object Store Profiling
3939
Instrumented Object Store: instrument_mode: Enabled, inner: AmazonS3(data)
4040
<TIMESTAMP> operation=Get duration=[DURATION] size=1006 path=cars.csv
4141

42+
Summaries:
43+
Get
44+
count: 1
45+
[SUMMARY_DURATION]
46+
[SUMMARY_DURATION]
47+
[SUMMARY_DURATION]
48+
size min: 1006 B
49+
size max: 1006 B
50+
size avg: 1006 B
51+
size sum: 1006 B
52+
4253
ObjectStore Profile mode set to Disabled
4354
+-----+-------+---------------------+
4455
| car | speed | time |

0 commit comments

Comments
 (0)