Skip to content

Commit

Permalink
Update gsc_client for better version control
Browse files Browse the repository at this point in the history
  • Loading branch information
kasugamirai committed Oct 4, 2024
1 parent 0f1a807 commit 414c0ad
Showing 1 changed file with 102 additions and 0 deletions.
102 changes: 102 additions & 0 deletions websocket/crates/infra/src/persistence/gcs/gcs_client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use chrono::{DateTime, Utc};
use google_cloud_storage::client::{Client, ClientConfig};
use google_cloud_storage::http::objects::delete::DeleteObjectRequest;
use google_cloud_storage::http::objects::download::Range;
use google_cloud_storage::http::objects::get::GetObjectRequest;
use google_cloud_storage::http::objects::upload::{Media, UploadObjectRequest, UploadType};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use thiserror::Error;

#[derive(Error, Debug)]
Expand Down Expand Up @@ -78,4 +80,104 @@ impl GcsClient {
.await?;
Ok(())
}

pub async fn upload_versioned<T: Serialize>(
&self,
path: String,
data: &T,
) -> Result<String, GcsError> {
let timestamp = Utc::now().timestamp_millis();
let versioned_path = format!("{}_v{}", path, timestamp);

// Upload the data
self.upload(versioned_path.clone(), data).await?;

// Update metadata
let metadata_path = format!("{}_metadata", path);
let mut metadata = match self
.download::<VersionMetadata>(metadata_path.clone())
.await
{
Ok(existing_metadata) => existing_metadata,
Err(_) => VersionMetadata {
latest_version: versioned_path.clone(),
version_history: BTreeMap::new(),
},
};

metadata.latest_version = versioned_path.clone();
metadata
.version_history
.insert(timestamp, versioned_path.clone());

// Limit version history to last 100 versions
if metadata.version_history.len() > 100 {
let oldest = *metadata.version_history.keys().next().unwrap();
metadata.version_history.remove(&oldest);
}

self.upload(metadata_path, &metadata).await?;

Ok(versioned_path)
}

pub async fn get_latest_version(&self, path_prefix: &str) -> Result<Option<String>, GcsError> {
let metadata_path = format!("{}_metadata", path_prefix);
match self.download::<VersionMetadata>(metadata_path).await {
Ok(metadata) => Ok(Some(metadata.latest_version)),
Err(_) => Ok(None),
}
}

pub async fn get_version_at(
&self,
path_prefix: &str,
timestamp: DateTime<Utc>,
) -> Result<Option<String>, GcsError> {
let metadata_path = format!("{}_metadata", path_prefix);
match self.download::<VersionMetadata>(metadata_path).await {
Ok(metadata) => {
let target_timestamp = timestamp.timestamp_millis();
Ok(metadata
.version_history
.range(..=target_timestamp)
.next_back()
.map(|(_, path)| path.clone()))
}
Err(_) => Ok(None),
}
}

pub async fn list_versions(
&self,
path_prefix: &str,
limit: Option<usize>,
) -> Result<Vec<(DateTime<Utc>, String)>, GcsError> {
let metadata_path = format!("{}_metadata", path_prefix);
match self.download::<VersionMetadata>(metadata_path).await {
Ok(metadata) => {
let mut versions: Vec<_> = metadata
.version_history
.iter()
.rev()
.take(limit.unwrap_or(usize::MAX))
.map(|(&timestamp, path)| {
(
DateTime::<Utc>::from_timestamp_millis(timestamp).unwrap(),
path.clone(),
)
})
.collect();
versions.sort_by_key(|&(timestamp, _)| timestamp);
Ok(versions)
}
Err(_) => Ok(vec![]),
}
}
}

#[derive(Serialize, Deserialize)]
struct VersionMetadata {
latest_version: String,
version_history: BTreeMap<i64, String>, // Timestamp to version path
}

0 comments on commit 414c0ad

Please sign in to comment.