Instrument `put_opts` and `put_multipart_opts` in datafusion-cli's
`InstrumentedObjectStore`.

When instrumentation is enabled, each successful call appends a
`RequestDetails` entry with `op: Operation::Put`, the target path, the start
timestamp and the elapsed duration. `put_opts` also records the payload's
content length as `size`; `put_multipart_opts` records `size: None`, and its
duration covers only the `put_multipart_opts` call itself, not the part
uploads made through the returned handle. A call that fails is propagated with
`?` before anything is recorded. `Operation::_Put` is renamed to
`Operation::Put`, and the existing summary snapshot is updated to match.

diff --git a/datafusion-cli/src/object_storage/instrumented.rs b/datafusion-cli/src/object_storage/instrumented.rs
index 4486b84a90ea..722d4e1ce7a8 100644
--- a/datafusion-cli/src/object_storage/instrumented.rs
+++ b/datafusion-cli/src/object_storage/instrumented.rs
@@ -121,6 +121,54 @@ impl InstrumentedObjectStore {
             != InstrumentedObjectStoreMode::Disabled as u8
     }
 
+    async fn instrumented_put_opts(
+        &self,
+        location: &Path,
+        payload: PutPayload,
+        opts: PutOptions,
+    ) -> Result<PutResult> {
+        let timestamp = Utc::now();
+        let start = Instant::now();
+        let size = payload.content_length();
+        let ret = self.inner.put_opts(location, payload, opts).await?;
+        let elapsed = start.elapsed();
+
+        self.requests.lock().push(RequestDetails {
+            op: Operation::Put,
+            path: location.clone(),
+            timestamp,
+            duration: Some(elapsed),
+            size: Some(size),
+            range: None,
+            extra_display: None,
+        });
+
+        Ok(ret)
+    }
+
+    async fn instrumented_put_multipart(
+        &self,
+        location: &Path,
+        opts: PutMultipartOptions,
+    ) -> Result<Box<dyn MultipartUpload>> {
+        let timestamp = Utc::now();
+        let start = Instant::now();
+        let ret = self.inner.put_multipart_opts(location, opts).await?;
+        let elapsed = start.elapsed();
+
+        self.requests.lock().push(RequestDetails {
+            op: Operation::Put,
+            path: location.clone(),
+            timestamp,
+            duration: Some(elapsed),
+            size: None,
+            range: None,
+            extra_display: None,
+        });
+
+        Ok(ret)
+    }
+
     async fn instrumented_get_opts(
         &self,
         location: &Path,
@@ -209,6 +257,10 @@ impl ObjectStore for InstrumentedObjectStore {
         payload: PutPayload,
         opts: PutOptions,
     ) -> Result<PutResult> {
+        if self.enabled() {
+            return self.instrumented_put_opts(location, payload, opts).await;
+        }
+
         self.inner.put_opts(location, payload, opts).await
     }
 
@@ -217,6 +269,10 @@ impl ObjectStore for InstrumentedObjectStore {
         location: &Path,
         opts: PutMultipartOptions,
     ) -> Result<Box<dyn MultipartUpload>> {
+        if self.enabled() {
+            return self.instrumented_put_multipart(location, opts).await;
+        }
+
         self.inner.put_multipart_opts(location, opts).await
     }
 
@@ -269,7 +325,7 @@ pub enum Operation {
     Get,
     _Head,
     List,
-    _Put,
+    Put,
 }
 
 impl fmt::Display for Operation {
@@ -599,6 +655,8 @@ impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry {
 
 #[cfg(test)]
 mod tests {
+    use object_store::WriteMultipart;
+
     use super::*;
     use insta::assert_snapshot;
 
@@ -741,6 +799,72 @@ mod tests {
         assert!(request.extra_display.is_none());
     }
 
+    #[tokio::test]
+    async fn instrumented_store_put_opts() {
+        // The `setup_test_store()` method comes with data already `put` into it, so we'll setup
+        // manually for this test
+        let store = Arc::new(object_store::memory::InMemory::new());
+        let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
+        let instrumented = InstrumentedObjectStore::new(store, mode);
+
+        let path = Path::from("test/data");
+        let payload = PutPayload::from_static(b"test_data");
+        let size = payload.content_length();
+
+        // By default no requests should be instrumented/stored
+        assert!(instrumented.requests.lock().is_empty());
+        instrumented.put(&path, payload.clone()).await.unwrap();
+        assert!(instrumented.requests.lock().is_empty());
+
+        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
+        assert!(instrumented.requests.lock().is_empty());
+        instrumented.put(&path, payload).await.unwrap();
+        assert_eq!(instrumented.requests.lock().len(), 1);
+
+        let request = instrumented.take_requests().pop().unwrap();
+        assert_eq!(request.op, Operation::Put);
+        assert_eq!(request.path, path);
+        assert!(request.duration.is_some());
+        assert_eq!(request.size.unwrap(), size);
+        assert!(request.range.is_none());
+        assert!(request.extra_display.is_none());
+    }
+
+    #[tokio::test]
+    async fn instrumented_store_put_multipart() {
+        // The `setup_test_store()` method comes with data already `put` into it, so we'll setup
+        // manually for this test
+        let store = Arc::new(object_store::memory::InMemory::new());
+        let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
+        let instrumented = InstrumentedObjectStore::new(store, mode);
+
+        let path = Path::from("test/data");
+
+        // By default no requests should be instrumented/stored
+        assert!(instrumented.requests.lock().is_empty());
+        let mp = instrumented.put_multipart(&path).await.unwrap();
+        let mut write = WriteMultipart::new(mp);
+        write.write(b"test_data");
+        write.finish().await.unwrap();
+        assert!(instrumented.requests.lock().is_empty());
+
+        instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
+        assert!(instrumented.requests.lock().is_empty());
+        let mp = instrumented.put_multipart(&path).await.unwrap();
+        let mut write = WriteMultipart::new(mp);
+        write.write(b"test_data");
+        write.finish().await.unwrap();
+        assert_eq!(instrumented.requests.lock().len(), 1);
+
+        let request = instrumented.take_requests().pop().unwrap();
+        assert_eq!(request.op, Operation::Put);
+        assert_eq!(request.path, path);
+        assert!(request.duration.is_some());
+        assert!(request.size.is_none());
+        assert!(request.range.is_none());
+        assert!(request.extra_display.is_none());
+    }
+
     #[test]
     fn request_details() {
         let rd = RequestDetails {
@@ -819,7 +943,7 @@ mod tests {
 
         // Add Put requests to test grouping
         requests.push(RequestDetails {
-            op: Operation::_Put,
+            op: Operation::Put,
             path: Path::from("test4"),
             timestamp: chrono::DateTime::from_timestamp(3, 0).unwrap(),
             duration: Some(Duration::from_millis(200)),
@@ -834,8 +958,8 @@ mod tests {
         +-----------+----------+-----------+-----------+-----------+------------+-------+
         | Get       | duration | 2.000000s | 8.000000s | 5.000000s | 15.000000s | 3     |
         | Get       | size     | 50 B      | 150 B     | 100 B     | 300 B      | 3     |
-        | _Put      | duration | 0.200000s | 0.200000s | 0.200000s | 0.200000s  | 1     |
-        | _Put      | size     | 75 B      | 75 B      | 75 B      | 75 B       | 1     |
+        | Put       | duration | 0.200000s | 0.200000s | 0.200000s | 0.200000s  | 1     |
+        | Put       | size     | 75 B      | 75 B      | 75 B      | 75 B       | 1     |
         +-----------+----------+-----------+-----------+-----------+------------+-------+
         ");
     }