Skip to content

Commit

Permalink
Simplify err handling
Browse files Browse the repository at this point in the history
  • Loading branch information
kasugamirai committed Oct 13, 2024
1 parent 06a4c65 commit 4093c6f
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 81 deletions.
118 changes: 66 additions & 52 deletions websocket/crates/domain/src/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,29 @@ pub struct ProjectEditingSession {
}

#[derive(Error, Debug)]
pub enum ProjectEditingSessionError<S, R> {
pub enum ProjectEditingSessionError {
#[error("Session not setup")]
SessionNotSetup,
#[error("Snapshot not found")]
SnapshotNotFound,
#[error(transparent)]
Snapshot(#[from] S),
#[error(transparent)]
Redis(R),
#[error("Snapshot error: {0}")]
Snapshot(String),
#[error("Redis error: {0}")]
Redis(String),
#[error("{0}")]
Custom(String),
}

impl ProjectEditingSessionError {
pub fn snapshot<E: std::fmt::Display>(err: E) -> Self {
Self::Snapshot(err.to_string())
}

pub fn redis<E: std::fmt::Display>(err: E) -> Self {
Self::Redis(err.to_string())
}
}

impl Default for ProjectEditingSession {
fn default() -> Self {
Self {
Expand Down Expand Up @@ -61,24 +71,22 @@ impl ProjectEditingSession {
&mut self,
snapshot_repo: &S,
redis_data_manager: &R,
) -> Result<String, ProjectEditingSessionError<S::Error, R::Error>>
) -> Result<String, ProjectEditingSessionError>
where
S: ProjectSnapshotRepository + ?Sized,
R: RedisDataManager,
{
// Logic to start or join a session
let session_id = generate_id(14, "editor-session");
self.session_id = Some(session_id.clone());
if !self.session_setup_complete {
// get latest snapshot state
let latest_snapshot_state = snapshot_repo
.get_latest_snapshot_state(&self.project_id)
.await?;
// Initialize Redis with latest snapshot state
.await
.map_err(ProjectEditingSessionError::snapshot)?;
redis_data_manager
.push_update(latest_snapshot_state, None)
.await
.map_err(ProjectEditingSessionError::Redis)?;
.map_err(ProjectEditingSessionError::redis)?;
}
self.session_setup_complete = true;
Ok(session_id)
Expand All @@ -88,15 +96,18 @@ impl ProjectEditingSession {
&self,
state_vector: Vec<u8>,
redis_data_manager: &R,
) -> Result<(Vec<u8>, Vec<u8>), ProjectEditingSessionError<R::Error, ()>>
) -> Result<(Vec<u8>, Vec<u8>), ProjectEditingSessionError>
where
R: RedisDataManager,
{
if !self.session_setup_complete {
return Err(ProjectEditingSessionError::SessionNotSetup);
}

let current_state = redis_data_manager.get_current_state().await?;
let current_state = redis_data_manager
.get_current_state()
.await
.map_err(ProjectEditingSessionError::redis)?;
if let Some(current_state) = current_state {
if current_state == state_vector {
return Ok((vec![], current_state));
Expand All @@ -112,7 +123,7 @@ impl ProjectEditingSession {
&self,
redis_data_manager: &R,
skip_lock: bool,
) -> Result<(Vec<u8>, Vec<String>), ProjectEditingSessionError<R::Error, ()>>
) -> Result<(Vec<u8>, Vec<String>), ProjectEditingSessionError>
where
R: RedisDataManager,
{
Expand All @@ -121,10 +132,16 @@ impl ProjectEditingSession {
}

let result = if skip_lock {
redis_data_manager.merge_updates(false).await?
redis_data_manager
.merge_updates(false)
.await
.map_err(ProjectEditingSessionError::redis)?
} else {
let _lock = self.session_lock.lock().await;
redis_data_manager.merge_updates(false).await?
redis_data_manager
.merge_updates(false)
.await
.map_err(ProjectEditingSessionError::redis)?
};

Ok(result)
Expand All @@ -133,14 +150,17 @@ impl ProjectEditingSession {
pub async fn get_state_update<R>(
&self,
redis_data_manager: &R,
) -> Result<Vec<u8>, ProjectEditingSessionError<R::Error, ()>>
) -> Result<Vec<u8>, ProjectEditingSessionError>
where
R: RedisDataManager,
{
if !self.session_setup_complete {
return Err(ProjectEditingSessionError::SessionNotSetup);
}
let current_state = redis_data_manager.get_current_state().await?;
let current_state = redis_data_manager
.get_current_state()
.await
.map_err(ProjectEditingSessionError::redis)?;

match current_state {
Some(state) => Ok(state),
Expand All @@ -154,7 +174,7 @@ impl ProjectEditingSession {
updated_by: String,
redis_data_manager: &R,
skip_lock: bool,
) -> Result<(), ProjectEditingSessionError<R::Error, ()>>
) -> Result<(), ProjectEditingSessionError>
where
R: RedisDataManager,
{
Expand All @@ -165,12 +185,14 @@ impl ProjectEditingSession {
if skip_lock {
redis_data_manager
.push_update(update, Some(updated_by))
.await?;
.await
.map_err(ProjectEditingSessionError::redis)?;
} else {
let _lock = self.session_lock.lock().await;
redis_data_manager
.push_update(update, Some(updated_by))
.await?;
.await
.map_err(ProjectEditingSessionError::redis)?;
}

Ok(())
Expand All @@ -182,7 +204,7 @@ impl ProjectEditingSession {
redis_data_manager: &R,
data: SnapshotData,
skip_lock: bool,
) -> Result<(), ProjectEditingSessionError<S::Error, R::Error>>
) -> Result<(), ProjectEditingSessionError>
where
S: ProjectSnapshotRepository,
R: RedisDataManager,
Expand All @@ -205,14 +227,12 @@ impl ProjectEditingSession {
snapshot_repo: &S,
redis_data_manager: &R,
data: SnapshotData,
) -> Result<(), ProjectEditingSessionError<S::Error, R::Error>>
) -> Result<(), ProjectEditingSessionError>
where
S: ProjectSnapshotRepository,
R: RedisDataManager,
{
self.merge_updates(redis_data_manager, false)
.await
.map_err(|_| ProjectEditingSessionError::SessionNotSetup)?;
self.merge_updates(redis_data_manager, false).await?;

let now = Utc::now();

Expand All @@ -221,32 +241,35 @@ impl ProjectEditingSession {
self.project_id.clone(),
self.session_id.clone(),
data.name.unwrap_or_default(),
String::new(), // path
String::new(),
);

let snapshot_info = SnapshotInfo::new(
data.created_by,
vec![],
self.tenant.clone(), // use tenant from project
self.tenant.clone(),
ObjectDelete {
deleted: false,
delete_after: None,
},
Some(now), // created_at
None, // updated_at
Some(now),
None,
);

let snapshot = ProjectSnapshot::new(metadata, snapshot_info);

snapshot_repo.create_snapshot(snapshot).await?;
snapshot_repo
.create_snapshot(snapshot)
.await
.map_err(ProjectEditingSessionError::snapshot)?;
Ok(())
}

pub async fn end_session<R, S>(
&mut self,
redis_data_manager: &R,
snapshot_repo: &S,
) -> Result<(), ProjectEditingSessionError<S::Error, R::Error>>
) -> Result<(), ProjectEditingSessionError>
where
R: RedisDataManager,
S: ProjectSnapshotRepository,
Expand All @@ -255,33 +278,25 @@ impl ProjectEditingSession {
return Err(ProjectEditingSessionError::SessionNotSetup);
}

// Acquire the session lock
let _lock = self.session_lock.lock().await;

// Merge any pending updates
let (state, _) = redis_data_manager
.merge_updates(true)
.await
.map_err(ProjectEditingSessionError::Redis)?;
.map_err(ProjectEditingSessionError::redis)?;

let snapshot_data = SnapshotData::new(
self.project_id.clone(), // project_id
state,
None, // name (Optional)
None, // created_by (Optional)
);
let snapshot_data = SnapshotData::new(self.project_id.clone(), state, None, None);

snapshot_repo
.update_latest_snapshot_data(&snapshot_data.project_id, snapshot_data.clone())
.await?;
.await
.map_err(ProjectEditingSessionError::snapshot)?;

// Clear the session data from Redis
redis_data_manager
.clear_data()
.await
.map_err(ProjectEditingSessionError::Redis)?;
.map_err(ProjectEditingSessionError::redis)?;

// Reset session state
self.session_id = None;
self.session_setup_complete = false;

Expand All @@ -292,28 +307,27 @@ impl ProjectEditingSession {
&mut self,
snapshot_repo: &S,
session_id: &str,
) -> Result<(), ProjectEditingSessionError<S::Error, ()>>
) -> Result<(), ProjectEditingSessionError>
where
S: ProjectSnapshotRepository,
{
// Get the latest snapshot for the given session_id
let snapshot = snapshot_repo.get_latest_snapshot(session_id).await?;
// If a snapshot is found, update the project_id
let snapshot = snapshot_repo
.get_latest_snapshot(session_id)
.await
.map_err(ProjectEditingSessionError::snapshot)?;
if let Some(snapshot) = snapshot {
self.project_id = snapshot.metadata.project_id;
} else {
// If no snapshot is found, return an error
return Err(ProjectEditingSessionError::SnapshotNotFound);
}
// Set the session_id and mark the session as set up
self.session_id = Some(session_id.to_string());
self.session_setup_complete = true;
Ok(())
}

pub async fn active_editing_session(
&self,
) -> Result<Option<String>, ProjectEditingSessionError<(), ()>> {
) -> Result<Option<String>, ProjectEditingSessionError> {
if !self.session_setup_complete {
Err(ProjectEditingSessionError::SessionNotSetup)
} else {
Expand Down
32 changes: 3 additions & 29 deletions websocket/crates/services/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,41 +1,15 @@
use flow_websocket_domain::project::ProjectEditingSessionError;
use flow_websocket_infra::persistence::project_repository::ProjectRepositoryError;
use std::fmt;
use thiserror::Error;

#[derive(Error, Debug)]
pub enum ProjectServiceError {
#[error(transparent)]
Repository(#[from] ProjectRepositoryError),

#[error(transparent)]
EditingSession(#[from] ProjectEditingSessionError),

#[error("Session not setup")]
SessionNotSetup,

#[error("Snapshot repository error: {0}")]
SnapshotRepository(String),

#[error("Unexpected error: {0}")]
Unexpected(String),
}

impl<S, R> From<ProjectEditingSessionError<S, R>> for ProjectServiceError
where
S: fmt::Debug,
R: fmt::Debug,
{
fn from(err: ProjectEditingSessionError<S, R>) -> Self {
match err {
ProjectEditingSessionError::SessionNotSetup => ProjectServiceError::SessionNotSetup,
ProjectEditingSessionError::Snapshot(repo_err) => {
ProjectServiceError::SnapshotRepository(format!("{:?}", repo_err))
}
ProjectEditingSessionError::Redis(repo_err) => {
ProjectServiceError::SnapshotRepository(format!("{:?}", repo_err))
}
ProjectEditingSessionError::Custom(err) => ProjectServiceError::Unexpected(err),
ProjectEditingSessionError::SnapshotNotFound => {
ProjectServiceError::SnapshotRepository("Snapshot not found".to_string())
}
}
}
}

0 comments on commit 4093c6f

Please sign in to comment.