Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 128 additions & 4 deletions datafusion-cli/src/object_storage/instrumented.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,54 @@ impl InstrumentedObjectStore {
!= InstrumentedObjectStoreMode::Disabled as u8
}

/// Forwards a `put_opts` call to the wrapped store and, when it succeeds, appends a
/// [`RequestDetails`] entry for the request to `self.requests`.
///
/// The entry holds the UTC time at which the call started, the elapsed time of the inner
/// `put_opts` call, and the payload's content length. If the inner store returns an error,
/// the error is propagated and nothing is recorded.
async fn instrumented_put_opts(
    &self,
    location: &Path,
    payload: PutPayload,
    opts: PutOptions,
) -> Result<PutResult> {
    let timestamp = Utc::now();
    let stopwatch = Instant::now();
    // Read the length up front: `payload` is moved into the inner call below.
    let payload_len = payload.content_length();
    let put_result = self.inner.put_opts(location, payload, opts).await?;
    let duration = Some(stopwatch.elapsed());

    let details = RequestDetails {
        op: Operation::Put,
        path: location.clone(),
        timestamp,
        duration,
        size: Some(payload_len),
        range: None,
        extra_display: None,
    };
    self.requests.lock().push(details);

    Ok(put_result)
}

/// Forwards a `put_multipart_opts` call to the wrapped store and, when it succeeds, appends a
/// [`RequestDetails`] entry with `Operation::Put` to `self.requests`.
///
/// Only the call that creates the multipart upload is timed. The returned
/// `MultipartUpload` is handed back to the caller as-is, so whatever the caller later does
/// with it (uploading parts, completing the upload) is not included in the recorded
/// duration, and no size is recorded. If the inner store returns an error, the error is
/// propagated and nothing is recorded.
async fn instrumented_put_multipart(
&self,
location: &Path,
opts: PutMultipartOptions,
) -> Result<Box<dyn MultipartUpload>> {
let timestamp = Utc::now();
let start = Instant::now();
let ret = self.inner.put_multipart_opts(location, opts).await?;
let elapsed = start.elapsed();

self.requests.lock().push(RequestDetails {
op: Operation::Put,
path: location.clone(),
timestamp,
// Covers only the `put_multipart_opts` await above, not any later use of the
// returned upload handle.
duration: Some(elapsed),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm feeling a bit torn on using a duration here. Unlike list() this duration is accurate for what's happening, however, I fear it may be misleading. The duration for a multipart put will just be the duration spent initiating a multipart put session with the backing store. It won't be able to capture the true duration of uploading any data, which is what I think a user would expect.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we log a ticket to try and make the accounting more accurate.

In general I think trying to get the level of timing might be a better case for https://github.com/datafusion-contrib/datafusion-tracing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's probably a good way to address this. I noted that you made one for tracking duration for list and this probably falls into a similar category.

Prior to closing the overarching issue for these PRs should document some of the current caveats for duration metrics?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prior to closing the overarching issue for these PRs should document some of the current caveats for duration metrics?

That sounds good to me -- I suggest putting it in the code (not just the docs) so it is more discoverable -- maybe a note after the summary output?


Object Store Profiling
....
Put
count: 1
duration min: 0.000249s
duration max: 0.000249s
duration avg: 0.000249s
size min: 815 B
size max: 815 B
size avg: 815 B
size sum: 815 B

*** NEW ***
Note: Duration for multipart PUT is time spent initiating a multipart PUT session with the backing store. It does not include the time to actually upload data.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb I've opened an issue related to this comment thread here:

// No payload is passed to this call, so there is no size to record here.
size: None,
range: None,
extra_display: None,
});

Ok(ret)
}

async fn instrumented_get_opts(
&self,
location: &Path,
Expand Down Expand Up @@ -209,6 +257,10 @@ impl ObjectStore for InstrumentedObjectStore {
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult> {
if self.enabled() {
return self.instrumented_put_opts(location, payload, opts).await;
}

self.inner.put_opts(location, payload, opts).await
}

Expand All @@ -217,6 +269,10 @@ impl ObjectStore for InstrumentedObjectStore {
location: &Path,
opts: PutMultipartOptions,
) -> Result<Box<dyn MultipartUpload>> {
if self.enabled() {
return self.instrumented_put_multipart(location, opts).await;
}

self.inner.put_multipart_opts(location, opts).await
}

Expand Down Expand Up @@ -269,7 +325,7 @@ pub enum Operation {
Get,
_Head,
List,
_Put,
Put,
}

impl fmt::Display for Operation {
Expand Down Expand Up @@ -599,6 +655,8 @@ impl ObjectStoreRegistry for InstrumentedObjectStoreRegistry {

#[cfg(test)]
mod tests {
use object_store::WriteMultipart;

use super::*;
use insta::assert_snapshot;

Expand Down Expand Up @@ -741,6 +799,72 @@ mod tests {
assert!(request.extra_display.is_none());
}

#[tokio::test]
async fn instrumented_store_put_opts() {
    // `setup_test_store()` hands back a store that already had data `put` into it, so this
    // test builds its own empty store instead.
    let inner = Arc::new(object_store::memory::InMemory::new());
    let initial_mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
    let wrapped = InstrumentedObjectStore::new(inner, initial_mode);

    let location = Path::from("test/data");
    let body = PutPayload::from_static(b"test_data");
    let expected_len = body.content_length();

    // In the default mode a put must not leave any recorded request behind.
    assert!(wrapped.requests.lock().is_empty());
    wrapped.put(&location, body.clone()).await.unwrap();
    assert!(wrapped.requests.lock().is_empty());

    // Once tracing is switched on, a single put yields exactly one recorded request.
    wrapped.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
    assert!(wrapped.requests.lock().is_empty());
    wrapped.put(&location, body).await.unwrap();
    assert_eq!(wrapped.requests.lock().len(), 1);

    // The recorded entry describes the put: operation, path, timing and payload size.
    let recorded = wrapped.take_requests().pop().unwrap();
    assert_eq!(recorded.op, Operation::Put);
    assert_eq!(recorded.path, location);
    assert!(recorded.duration.is_some());
    assert_eq!(recorded.size.unwrap(), expected_len);
    assert!(recorded.range.is_none());
    assert!(recorded.extra_display.is_none());
}

#[tokio::test]
async fn instrumented_store_put_multipart() {
// The `setup_test_store()` method comes with data already `put` into it, so we'll setup
// manually for this test
let store = Arc::new(object_store::memory::InMemory::new());
let mode = AtomicU8::new(InstrumentedObjectStoreMode::default() as u8);
let instrumented = InstrumentedObjectStore::new(store, mode);

let path = Path::from("test/data");

// By default no requests should be instrumented/stored
assert!(instrumented.requests.lock().is_empty());
let mp = instrumented.put_multipart(&path).await.unwrap();
let mut write = WriteMultipart::new(mp);
write.write(b"test_data");
write.finish().await.unwrap();
assert!(instrumented.requests.lock().is_empty());

instrumented.set_instrument_mode(InstrumentedObjectStoreMode::Trace);
assert!(instrumented.requests.lock().is_empty());
let mp = instrumented.put_multipart(&path).await.unwrap();
let mut write = WriteMultipart::new(mp);
write.write(b"test_data");
write.finish().await.unwrap();
assert_eq!(instrumented.requests.lock().len(), 1);

let request = instrumented.take_requests().pop().unwrap();
assert_eq!(request.op, Operation::Put);
assert_eq!(request.path, path);
assert!(request.duration.is_some());
assert!(request.size.is_none());
assert!(request.range.is_none());
assert!(request.extra_display.is_none());
}

#[test]
fn request_details() {
let rd = RequestDetails {
Expand Down Expand Up @@ -819,7 +943,7 @@ mod tests {

// Add Put requests to test grouping
requests.push(RequestDetails {
op: Operation::_Put,
op: Operation::Put,
path: Path::from("test4"),
timestamp: chrono::DateTime::from_timestamp(3, 0).unwrap(),
duration: Some(Duration::from_millis(200)),
Expand All @@ -834,8 +958,8 @@ mod tests {
+-----------+----------+-----------+-----------+-----------+------------+-------+
| Get | duration | 2.000000s | 8.000000s | 5.000000s | 15.000000s | 3 |
| Get | size | 50 B | 150 B | 100 B | 300 B | 3 |
| _Put | duration | 0.200000s | 0.200000s | 0.200000s | 0.200000s | 1 |
| _Put | size | 75 B | 75 B | 75 B | 75 B | 1 |
| Put | duration | 0.200000s | 0.200000s | 0.200000s | 0.200000s | 1 |
| Put | size | 75 B | 75 B | 75 B | 75 B | 1 |
+-----------+----------+-----------+-----------+-----------+------------+-------+
");
}
Expand Down