Skip to content

Commit 93f136c

Browse files
BlakeOrth and alamb authored
Adds instrumentation to PUT ops in the CLI (#18139)
## Which issue does this PR close? This does not fully close, but is an incremental building block component for: - #17207 The full context of how this code is likely to progress can be seen in the POC for this effort: - #17266 ## Rationale for this change Further fills out the missing methods that have yet to be instrumented in the instrumented object store. ## What changes are included in this PR? - Adds instrumentation around put_opts - Adds instrumentation around put_multipart - Adds tests for newly instrumented methods ## Are these changes tested? Yes. Unit tests have been added for the new methods Example output: ```sql DataFusion CLI v50.2.0 > CREATE EXTERNAL TABLE test(a bigint, b bigint) STORED AS parquet LOCATION '../../test_table/'; 0 row(s) fetched. Elapsed 0.003 seconds. > \object_store_profiling trace ObjectStore Profile mode set to Trace > INSERT INTO test values (1, 2), (3, 4); +-------+ | count | +-------+ | 2 | +-------+ 1 row(s) fetched. Elapsed 0.007 seconds. Object Store Profiling Instrumented Object Store: instrument_mode: Trace, inner: LocalFileSystem(file:///) 2025-10-17T19:02:15.440246215+00:00 operation=List path=home/blake/open_source_src/datafusion-BlakeOrth/test_table 2025-10-17T19:02:15.444096012+00:00 operation=Put duration=0.000249s size=815 path=home/blake/open_source_src/datafusion-BlakeOrth/test_table/a9pjKBxSOtXZobJO_0.parquet Summaries: List count: 1 Put count: 1 duration min: 0.000249s duration max: 0.000249s duration avg: 0.000249s size min: 815 B size max: 815 B size avg: 815 B size sum: 815 B > ``` (note: I have no idea how to exercise/show a multi-part put operation, or if DataFusion even utilizes multipart puts for large files) ## Are there any user-facing changes? No-ish cc @alamb --------- Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 0ddc82e commit 93f136c

File tree

1 file changed

+128
-4
lines changed

1 file changed

+128
-4
lines changed

datafusion-cli/src/object_storage/instrumented.rs

Lines changed: 128 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,54 @@ impl InstrumentedObjectStore {
121121
!= InstrumentedObjectStoreMode::Disabled as u8
122122
}
123123

124+
async fn instrumented_put_opts(
125+
&self,
126+
location: &Path,
127+
payload: PutPayload,
128+
opts: PutOptions,
129+
) -> Result<PutResult> {
130+
let timestamp = Utc::now();
131+
let start = Instant::now();
132+
let size = payload.content_length();
133+
let ret = self.inner.put_opts(location, payload, opts).await?;
134+
let elapsed = start.elapsed();
135+
136+
self.requests.lock().push(RequestDetails {
137+
op: Operation::Put,
138+
path: location.clone(),
139+
timestamp,
140+
duration: Some(elapsed),
141+
size: Some(size),
142+
range: None,
143+
extra_display: None,
144+
});
145+
146+
Ok(ret)
147+
}
148+
149+
async fn instrumented_put_multipart(
150+
&self,
151+
location: &Path,
152+
opts: PutMultipartOptions,
153+
) -> Result<Box<dyn MultipartUpload>> {
154+
let timestamp = Utc::now();
155+
let start = Instant::now();
156+
let ret = self.inner.put_multipart_opts(location, opts).await?;
157+
let elapsed = start.elapsed();
158+
159+
self.requests.lock().push(RequestDetails {
160+
op: Operation::Put,
161+
path: location.clone(),
162+
timestamp,
163+
duration: Some(elapsed),
164+
size: None,
165+
range: None,
166+
extra_display: None,
167+
});
168+
169+
Ok(ret)
170+
}
171+
124172
async fn instrumented_get_opts(
125173
&self,
126174
location: &Path,
@@ -209,6 +257,10 @@ impl ObjectStore for InstrumentedObjectStore {
209257
payload: PutPayload,
210258
opts: PutOptions,
211259
) -> Result<PutResult> {
260+
if self.enabled() {
261+
return self.instrumented_put_opts(location, payload, opts).await;
262+
}
263+
212264
self.inner.put_opts(location, payload, opts).await
213265
}
214266

@@ -217,6 +269,10 @@ impl ObjectStore for InstrumentedObjectStore {
217269
location: &Path,
218270
opts: PutMultipartOptions,
219271
) -> Result<Box<dyn MultipartUpload>> {
272+
if self.enabled() {
273+
return self.instrumented_put_multipart(location, opts).await;
274+
}
275+
220276
self.inner.put_multipart_opts(location, opts).await
221277
}
222278

@@ -269,7 +325,7 @@ pub enum Operation {
269325
Get,
270326
_Head,
271327
List,
272-
_Put,
328+
Put,
273329
}
274330

275331
impl fmt::Display for Operation {
@@ -599,6 +655,8 @@ impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry {
599655

600656
#[cfg(test)]
601657
mod tests {
658+
use object_store::WriteMultipart;
659+
602660
use super::*;
603661
use insta::assert_snapshot;
604662

@@ -741,6 +799,72 @@ mod tests {
741799
assert!(request.extra_display.is_none());
742800
}
743801

802+
#[tokio::test]
803+
async fn instrumented_store_put_opts() {
804+
// The `setup_test_store()` method comes with data already `put` into it, so we'll setup
805+
// manually for this test
806+
let store = Arc::new(object_store::memory::InMemory::new());
807+
let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
808+
let instrumented = InstrumentedObjectStore::new(store, mode);
809+
810+
let path = Path::from("test/data");
811+
let payload = PutPayload::from_static(b"test_data");
812+
let size = payload.content_length();
813+
814+
// By default no requests should be instrumented/stored
815+
assert!(instrumented.requests.lock().is_empty());
816+
instrumented.put(&path, payload.clone()).await.unwrap();
817+
assert!(instrumented.requests.lock().is_empty());
818+
819+
instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
820+
assert!(instrumented.requests.lock().is_empty());
821+
instrumented.put(&path, payload).await.unwrap();
822+
assert_eq!(instrumented.requests.lock().len(), 1);
823+
824+
let request = instrumented.take_requests().pop().unwrap();
825+
assert_eq!(request.op, Operation::Put);
826+
assert_eq!(request.path, path);
827+
assert!(request.duration.is_some());
828+
assert_eq!(request.size.unwrap(), size);
829+
assert!(request.range.is_none());
830+
assert!(request.extra_display.is_none());
831+
}
832+
833+
#[tokio::test]
834+
async fn instrumented_store_put_multipart() {
835+
// The `setup_test_store()` method comes with data already `put` into it, so we'll setup
836+
// manually for this test
837+
let store = Arc::new(object_store::memory::InMemory::new());
838+
let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
839+
let instrumented = InstrumentedObjectStore::new(store, mode);
840+
841+
let path = Path::from("test/data");
842+
843+
// By default no requests should be instrumented/stored
844+
assert!(instrumented.requests.lock().is_empty());
845+
let mp = instrumented.put_multipart(&path).await.unwrap();
846+
let mut write = WriteMultipart::new(mp);
847+
write.write(b"test_data");
848+
write.finish().await.unwrap();
849+
assert!(instrumented.requests.lock().is_empty());
850+
851+
instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
852+
assert!(instrumented.requests.lock().is_empty());
853+
let mp = instrumented.put_multipart(&path).await.unwrap();
854+
let mut write = WriteMultipart::new(mp);
855+
write.write(b"test_data");
856+
write.finish().await.unwrap();
857+
assert_eq!(instrumented.requests.lock().len(), 1);
858+
859+
let request = instrumented.take_requests().pop().unwrap();
860+
assert_eq!(request.op, Operation::Put);
861+
assert_eq!(request.path, path);
862+
assert!(request.duration.is_some());
863+
assert!(request.size.is_none());
864+
assert!(request.range.is_none());
865+
assert!(request.extra_display.is_none());
866+
}
867+
744868
#[test]
745869
fn request_details() {
746870
let rd = RequestDetails {
@@ -819,7 +943,7 @@ mod tests {
819943

820944
// Add Put requests to test grouping
821945
requests.push(RequestDetails {
822-
op: Operation::_Put,
946+
op: Operation::Put,
823947
path: Path::from("test4"),
824948
timestamp: chrono::DateTime::from_timestamp(3, 0).unwrap(),
825949
duration: Some(Duration::from_millis(200)),
@@ -834,8 +958,8 @@ mod tests {
834958
+-----------+----------+-----------+-----------+-----------+------------+-------+
835959
| Get | duration | 2.000000s | 8.000000s | 5.000000s | 15.000000s | 3 |
836960
| Get | size | 50 B | 150 B | 100 B | 300 B | 3 |
837-
| _Put | duration | 0.200000s | 0.200000s | 0.200000s | 0.200000s | 1 |
838-
| _Put | size | 75 B | 75 B | 75 B | 75 B | 1 |
961+
| Put | duration | 0.200000s | 0.200000s | 0.200000s | 0.200000s | 1 |
962+
| Put | size | 75 B | 75 B | 75 B | 75 B | 1 |
839963
+-----------+----------+-----------+-----------+-----------+------------+-------+
840964
");
841965
}

0 commit comments

Comments
 (0)