Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make InMemory object store track last modified time for each entry #3796

Merged
merged 2 commits into from
Mar 4, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 33 additions & 28 deletions object_store/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::MultipartId;
use crate::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result};
use async_trait::async_trait;
use bytes::Bytes;
use chrono::Utc;
use chrono::{DateTime, Utc};
use futures::{stream::BoxStream, StreamExt};
use parking_lot::RwLock;
use snafu::{ensure, OptionExt, Snafu};
Expand All @@ -33,6 +33,9 @@ use std::sync::Arc;
use std::task::Poll;
use tokio::io::AsyncWrite;

type Entry = (Bytes, DateTime<Utc>);
type StorageType = Arc<RwLock<BTreeMap<Path, Entry>>>;

/// A specialized `Error` for in-memory object store-related errors
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
Expand Down Expand Up @@ -73,7 +76,7 @@ impl From<Error> for super::Error {
/// storage provider.
#[derive(Debug, Default)]
pub struct InMemory {
storage: Arc<RwLock<BTreeMap<Path, Bytes>>>,
storage: StorageType,
}

impl std::fmt::Display for InMemory {
Expand All @@ -85,7 +88,9 @@ impl std::fmt::Display for InMemory {
#[async_trait]
impl ObjectStore for InMemory {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
self.storage.write().insert(location.clone(), bytes);
self.storage
.write()
.insert(location.clone(), (bytes, Utc::now()));
Ok(())
}

Expand Down Expand Up @@ -113,44 +118,43 @@ impl ObjectStore for InMemory {
}

async fn get(&self, location: &Path) -> Result<GetResult> {
let data = self.get_bytes(location).await?;
let data = self.entry(location).await?;

Ok(GetResult::Stream(
futures::stream::once(async move { Ok(data) }).boxed(),
futures::stream::once(async move { Ok(data.0) }).boxed(),
))
}

async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
let data = self.get_bytes(location).await?;
ensure!(range.end <= data.len(), OutOfRangeSnafu);
let data = self.entry(location).await?;
ensure!(range.end <= data.0.len(), OutOfRangeSnafu);
ensure!(range.start <= range.end, BadRangeSnafu);

Ok(data.slice(range))
Ok(data.0.slice(range))
}

async fn get_ranges(
&self,
location: &Path,
ranges: &[Range<usize>],
) -> Result<Vec<Bytes>> {
let data = self.get_bytes(location).await?;
let data = self.entry(location).await?;
ranges
.iter()
.map(|range| {
ensure!(range.end <= data.len(), OutOfRangeSnafu);
ensure!(range.end <= data.0.len(), OutOfRangeSnafu);
ensure!(range.start <= range.end, BadRangeSnafu);
Ok(data.slice(range.clone()))
Ok(data.0.slice(range.clone()))
})
.collect()
}

async fn head(&self, location: &Path) -> Result<ObjectMeta> {
let last_modified = Utc::now();
let bytes = self.get_bytes(location).await?;
let entry = self.entry(location).await?;
Ok(ObjectMeta {
location: location.clone(),
last_modified,
size: bytes.len(),
last_modified: entry.1,
size: entry.0.len(),
})
}

Expand All @@ -165,7 +169,6 @@ impl ObjectStore for InMemory {
) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
let root = Path::default();
let prefix = prefix.unwrap_or(&root);
let last_modified = Utc::now();

let storage = self.storage.read();
let values: Vec<_> = storage
Expand All @@ -180,8 +183,8 @@ impl ObjectStore for InMemory {
.map(|(key, value)| {
Ok(ObjectMeta {
location: key.clone(),
last_modified,
size: value.len(),
last_modified: value.1,
size: value.0.len(),
})
})
.collect();
Expand All @@ -197,7 +200,6 @@ impl ObjectStore for InMemory {
let prefix = prefix.unwrap_or(&root);

let mut common_prefixes = BTreeSet::new();
let last_modified = Utc::now();

// Only objects in this base level should be returned in the
// response. Otherwise, we just collect the common prefixes.
Expand All @@ -224,8 +226,8 @@ impl ObjectStore for InMemory {
} else {
let object = ObjectMeta {
location: k.clone(),
last_modified,
size: v.len(),
last_modified: v.1,
size: v.0.len(),
};
objects.push(object);
}
Expand All @@ -238,13 +240,13 @@ impl ObjectStore for InMemory {
}

async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
let data = self.get_bytes(from).await?;
let data = self.entry(from).await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should probably generate a new last modified timestamp

Copy link
Member Author

@Weijun-H Weijun-H Mar 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

like this self.storage.write().insert(to.clone(), (data.0, Utc::now()));?

self.storage.write().insert(to.clone(), data);
Ok(())
}

async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
let data = self.get_bytes(from).await?;
let data = self.entry(from).await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto here

let mut storage = self.storage.write();
if storage.contains_key(to) {
return Err(Error::AlreadyExists {
Expand Down Expand Up @@ -273,22 +275,23 @@ impl InMemory {
}
}

async fn get_bytes(&self, location: &Path) -> Result<Bytes> {
async fn entry(&self, location: &Path) -> Result<(Bytes, DateTime<Utc>)> {
let storage = self.storage.read();
let bytes = storage
let value = storage
.get(location)
.cloned()
.context(NoDataInMemorySnafu {
path: location.to_string(),
})?;
Ok(bytes)

Ok(value)
}
}

struct InMemoryUpload {
location: Path,
data: Vec<u8>,
storage: Arc<RwLock<BTreeMap<Path, Bytes>>>,
storage: StorageType,
}

impl AsyncWrite for InMemoryUpload {
Expand All @@ -313,7 +316,9 @@ impl AsyncWrite for InMemoryUpload {
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), io::Error>> {
let data = Bytes::from(std::mem::take(&mut self.data));
self.storage.write().insert(self.location.clone(), data);
self.storage
.write()
.insert(self.location.clone(), (data, Utc::now()));
Poll::Ready(Ok(()))
}
}
Expand Down