Skip to content

Commit

Permalink
Fix threads safely
Browse files Browse the repository at this point in the history
  • Loading branch information
kasugamirai committed Sep 30, 2024
1 parent 03bd36c commit d4f088d
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 29 deletions.
11 changes: 2 additions & 9 deletions websocket/crates/infra/src/persistence/project_repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,6 @@ impl ProjectSnapshotRepository<ProjectRepositoryError> for ProjectLocalRepositor
) -> Result<(), ProjectRepositoryError> {
let path = format!("snapshots/{}", snapshot.metadata.id);
self.client.upload(path, &snapshot, true).await?;

// Update latest snapshot
let latest_path = format!("latest_snapshots/{}", snapshot.metadata.project_id);
self.client.upload(latest_path, &snapshot, true).await?;
Ok(())
}

Expand All @@ -178,11 +174,8 @@ impl ProjectSnapshotRepository<ProjectRepositoryError> for ProjectLocalRepositor
project_id: &str,
) -> Result<Option<ProjectSnapshot>, ProjectRepositoryError> {
let path = format!("snapshot/{}:latest_snapshot", project_id);
match self.client.download::<ProjectSnapshot>(path).await {
Ok(snapshot) => Ok(Some(snapshot)),
Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
Err(e) => Err(ProjectRepositoryError::Io(e)),
}
let snapshot = self.client.download::<ProjectSnapshot>(path).await?;
Ok(Some(snapshot))
}

async fn get_latest_snapshot_state(
Expand Down
4 changes: 2 additions & 2 deletions websocket/crates/services/src/manage_project_edit_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use flow_websocket_domain::project::ProjectEditingSession;
use flow_websocket_domain::repository::{
ProjectEditingSessionRepository, ProjectSnapshotRepository,
};
use flow_websocket_domain::snapshot::Metadata;
use flow_websocket_domain::snapshot::{Metadata, SnapshotInfo};
use flow_websocket_domain::utils::generate_id;
use std::error::Error;
use std::sync::Arc;
Expand Down Expand Up @@ -104,7 +104,7 @@ impl<E: Error + Send + Sync> ManageEditSessionService<E> {
String::new(),
);

let snapshot_state = SnapshotState::new(
let snapshot_state = SnapshotInfo::new(
None, // created_by
vec![], // changes_by
ObjectTenant {
Expand Down
56 changes: 38 additions & 18 deletions websocket/crates/services/src/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,25 @@ use flow_websocket_domain::project::{Project, ProjectAllowedActions, ProjectEdit
use flow_websocket_domain::repository::{
ProjectEditingSessionRepository, ProjectRepository, ProjectSnapshotRepository,
};
use std::error::Error;
use flow_websocket_infra::persistence::project_repository::ProjectRepositoryError;
use std::sync::Arc;

pub struct ProjectService<E: Error + Send + Sync> {
project_repository: Arc<dyn ProjectRepository<E>>,
session_repository: Arc<dyn ProjectEditingSessionRepository<E>>,
snapshot_repository: Arc<dyn ProjectSnapshotRepository<E>>,
pub struct ProjectService {
project_repository: Arc<dyn ProjectRepository<ProjectRepositoryError> + Send + Sync>,
session_repository:
Arc<dyn ProjectEditingSessionRepository<ProjectRepositoryError> + Send + Sync>,
snapshot_repository: Arc<dyn ProjectSnapshotRepository<ProjectRepositoryError> + Send + Sync>,
}

impl<E: Error + Send + Sync> ProjectService<E> {
impl ProjectService {
pub fn new(
project_repository: Arc<dyn ProjectRepository<E>>,
session_repository: Arc<dyn ProjectEditingSessionRepository<E>>,
snapshot_repository: Arc<dyn ProjectSnapshotRepository<E>>,
project_repository: Arc<dyn ProjectRepository<ProjectRepositoryError> + Send + Sync>,
session_repository: Arc<
dyn ProjectEditingSessionRepository<ProjectRepositoryError> + Send + Sync,
>,
snapshot_repository: Arc<
dyn ProjectSnapshotRepository<ProjectRepositoryError> + Send + Sync,
>,
) -> Self {
Self {
project_repository,
Expand All @@ -25,14 +30,17 @@ impl<E: Error + Send + Sync> ProjectService<E> {
}
}

pub async fn get_project(&self, project_id: &str) -> Result<Option<Project>, Box<dyn Error>> {
pub async fn get_project(
&self,
project_id: &str,
) -> Result<Option<Project>, ProjectRepositoryError> {
self.project_repository.get_project(project_id).await
}

pub async fn get_or_create_editing_session(
&self,
project_id: &str,
) -> Result<ProjectEditingSession, Box<dyn Error>> {
) -> Result<ProjectEditingSession, ProjectRepositoryError> {
let mut session = match self
.session_repository
.get_active_session(project_id)
Expand All @@ -58,7 +66,7 @@ impl<E: Error + Send + Sync> ProjectService<E> {
&self,
project_id: &str,
actions: Vec<String>,
) -> Result<ProjectAllowedActions, Box<dyn Error>> {
) -> Result<ProjectAllowedActions, ProjectRepositoryError> {
// This is a placeholder implementation. You should implement the actual logic
// to determine allowed actions based on certain requirements we'll have decide @pyshx
Ok(ProjectAllowedActions {
Expand All @@ -75,26 +83,38 @@ impl<E: Error + Send + Sync> ProjectService<E> {
}

#[async_trait]
impl<E: Error + Send + Sync> ProjectRepository<E> for ProjectService<E> {
async fn get_project(&self, project_id: &str) -> Result<Option<Project>, Box<dyn Error>> {
impl ProjectRepository<ProjectRepositoryError> for ProjectService
where
Self: Send + Sync,
{
async fn get_project(
&self,
project_id: &str,
) -> Result<Option<Project>, ProjectRepositoryError> {
self.project_repository.get_project(project_id).await
}
}

#[async_trait]
impl<E: Error + Send + Sync> ProjectEditingSessionRepository<E> for ProjectService<E> {
async fn create_session(&self, session: ProjectEditingSession) -> Result<(), Box<dyn Error>> {
impl ProjectEditingSessionRepository<ProjectRepositoryError> for ProjectService {
async fn create_session(
&self,
session: ProjectEditingSession,
) -> Result<(), ProjectRepositoryError> {
self.session_repository.create_session(session).await
}

async fn get_active_session(
&self,
project_id: &str,
) -> Result<Option<ProjectEditingSession>, Box<dyn Error>> {
) -> Result<Option<ProjectEditingSession>, ProjectRepositoryError> {
self.session_repository.get_active_session(project_id).await
}

async fn update_session(&self, session: ProjectEditingSession) -> Result<(), Box<dyn Error>> {
async fn update_session(
&self,
session: ProjectEditingSession,
) -> Result<(), ProjectRepositoryError> {
self.session_repository.update_session(session).await
}
}

0 comments on commit d4f088d

Please sign in to comment.