From d4f088d1f75473e1ac16ac60af63941385b5012d Mon Sep 17 00:00:00 2001 From: xy Date: Tue, 1 Oct 2024 02:56:46 +0900 Subject: [PATCH] Fix threads safely --- .../src/persistence/project_repository.rs | 11 +--- .../src/manage_project_edit_session.rs | 4 +- websocket/crates/services/src/project.rs | 56 +++++++++++++------ 3 files changed, 42 insertions(+), 29 deletions(-) diff --git a/websocket/crates/infra/src/persistence/project_repository.rs b/websocket/crates/infra/src/persistence/project_repository.rs index d2134dd80..802dda64e 100644 --- a/websocket/crates/infra/src/persistence/project_repository.rs +++ b/websocket/crates/infra/src/persistence/project_repository.rs @@ -166,10 +166,6 @@ impl ProjectSnapshotRepository for ProjectLocalRepositor ) -> Result<(), ProjectRepositoryError> { let path = format!("snapshots/{}", snapshot.metadata.id); self.client.upload(path, &snapshot, true).await?; - - // Update latest snapshot - let latest_path = format!("latest_snapshots/{}", snapshot.metadata.project_id); - self.client.upload(latest_path, &snapshot, true).await?; Ok(()) } @@ -178,11 +174,8 @@ impl ProjectSnapshotRepository for ProjectLocalRepositor project_id: &str, ) -> Result, ProjectRepositoryError> { let path = format!("snapshot/{}:latest_snapshot", project_id); - match self.client.download::(path).await { - Ok(snapshot) => Ok(Some(snapshot)), - Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None), - Err(e) => Err(ProjectRepositoryError::Io(e)), - } + let snapshot = self.client.download::(path).await?; + Ok(Some(snapshot)) } async fn get_latest_snapshot_state( diff --git a/websocket/crates/services/src/manage_project_edit_session.rs b/websocket/crates/services/src/manage_project_edit_session.rs index a11762350..954ddc0c7 100644 --- a/websocket/crates/services/src/manage_project_edit_session.rs +++ b/websocket/crates/services/src/manage_project_edit_session.rs @@ -3,7 +3,7 @@ use flow_websocket_domain::project::ProjectEditingSession; use flow_websocket_domain::repository::{ ProjectEditingSessionRepository, ProjectSnapshotRepository, }; -use flow_websocket_domain::snapshot::Metadata; +use flow_websocket_domain::snapshot::{Metadata, SnapshotInfo}; use flow_websocket_domain::utils::generate_id; use std::error::Error; use std::sync::Arc; @@ -104,7 +104,7 @@ impl ManageEditSessionService { String::new(), ); - let snapshot_state = SnapshotState::new( + let snapshot_state = SnapshotInfo::new( None, // created_by vec![], // changes_by ObjectTenant { diff --git a/websocket/crates/services/src/project.rs b/websocket/crates/services/src/project.rs index 6d61874f0..aebc8b040 100644 --- a/websocket/crates/services/src/project.rs +++ b/websocket/crates/services/src/project.rs @@ -3,20 +3,25 @@ use flow_websocket_domain::project::{Project, ProjectAllowedActions, ProjectEdit use flow_websocket_domain::repository::{ ProjectEditingSessionRepository, ProjectRepository, ProjectSnapshotRepository, }; -use std::error::Error; +use flow_websocket_infra::persistence::project_repository::ProjectRepositoryError; use std::sync::Arc; -pub struct ProjectService { - project_repository: Arc>, - session_repository: Arc>, - snapshot_repository: Arc>, +pub struct ProjectService { + project_repository: Arc + Send + Sync>, + session_repository: + Arc + Send + Sync>, + snapshot_repository: Arc + Send + Sync>, } -impl ProjectService { +impl ProjectService { pub fn new( - project_repository: Arc>, - session_repository: Arc>, - snapshot_repository: Arc>, + project_repository: Arc + Send + Sync>, + session_repository: Arc< + dyn ProjectEditingSessionRepository + Send + Sync, + >, + snapshot_repository: Arc< + dyn ProjectSnapshotRepository + Send + Sync, + >, ) -> Self { Self { project_repository, @@ -25,14 +30,17 @@ impl ProjectService { } } - pub async fn get_project(&self, project_id: &str) -> Result, Box> { + pub async fn get_project( + &self, + project_id: &str, + ) -> Result, ProjectRepositoryError> { self.project_repository.get_project(project_id).await } pub async fn get_or_create_editing_session( &self, project_id: &str, - ) -> Result> { + ) -> Result { let mut session = match self .session_repository .get_active_session(project_id) @@ -58,7 +66,7 @@ impl ProjectService { &self, project_id: &str, actions: Vec, - ) -> Result> { + ) -> Result { // This is a placeholder implementation. You should implement the actual logic // to determine allowed actions based on certain requirements we'll have decide @pyshx Ok(ProjectAllowedActions { @@ -75,26 +83,38 @@ impl ProjectService { } #[async_trait] -impl ProjectRepository for ProjectService { - async fn get_project(&self, project_id: &str) -> Result, Box> { +impl ProjectRepository for ProjectService +where + Self: Send + Sync, +{ + async fn get_project( + &self, + project_id: &str, + ) -> Result, ProjectRepositoryError> { self.project_repository.get_project(project_id).await } } #[async_trait] -impl ProjectEditingSessionRepository for ProjectService { - async fn create_session(&self, session: ProjectEditingSession) -> Result<(), Box> { +impl ProjectEditingSessionRepository for ProjectService { + async fn create_session( + &self, + session: ProjectEditingSession, + ) -> Result<(), ProjectRepositoryError> { self.session_repository.create_session(session).await } async fn get_active_session( &self, project_id: &str, - ) -> Result, Box> { + ) -> Result, ProjectRepositoryError> { self.session_repository.get_active_session(project_id).await } - async fn update_session(&self, session: ProjectEditingSession) -> Result<(), Box> { + async fn update_session( + &self, + session: ProjectEditingSession, + ) -> Result<(), ProjectRepositoryError> { self.session_repository.update_session(session).await } }