Skip to content

Commit

Permalink
feat(websocket): enhance ManageEditSessionService with dynamic sessio…
Browse files Browse the repository at this point in the history
…n management
  • Loading branch information
kasugamirai committed Oct 29, 2024
1 parent e6d17f4 commit 28d0608
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 57 deletions.
2 changes: 1 addition & 1 deletion websocket/crates/services/examples/edit_session_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
trace!("Session repository created");

let project_id = "project_123".to_string();
let tenant = ObjectTenant::new(generate_id(14, "tenant"), "tenant".to_owned());
let tenant = ObjectTenant::new(generate_id!("tenant"), "tenant".to_owned());
let mut session = ProjectEditingSession::new(project_id.clone(), tenant);
session.session_id = Some(generate_id(14, "session"));

Expand Down
205 changes: 151 additions & 54 deletions websocket/crates/services/src/manage_project_edit_session.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use chrono::{DateTime, Utc};
use chrono::Utc;
use flow_websocket_domain::{
editing_session::ProjectEditingSession,
generate_id,
Expand All @@ -12,8 +12,9 @@ use flow_websocket_infra::persistence::{
redis::flow_project_redis_data_manager::FlowProjectRedisDataManagerError,
};
use mockall::automock;
use std::sync::Arc;
use std::time::Duration;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{mpsc, Mutex};
use tokio::time::sleep;
use tracing::debug;

Expand All @@ -32,6 +33,32 @@ where
pub session_repository: Arc<R>,
pub snapshot_repository: Arc<S>,
pub redis_data_manager: Arc<M>,
tasks: Arc<Mutex<HashMap<String, ManageProjectEditSessionTaskData>>>,
}

#[derive(Debug)]
pub enum SessionCommand {
Start {
project_id: String,
user: User,
},
End {
project_id: String,
user: User,
},
Complete {
project_id: String,
user: User,
},
CheckStatus {
project_id: String,
},
AddTask {
task_data: ManageProjectEditSessionTaskData,
},
RemoveTask {
project_id: String,
},
}

#[automock]
Expand All @@ -50,43 +77,121 @@ where
session_repository,
snapshot_repository,
redis_data_manager,
tasks: Arc::new(Mutex::new(HashMap::new())),
}
}

pub async fn process(
&self,
mut data: ManageProjectEditSessionTaskData,
user: &User,
mut command_rx: mpsc::Receiver<SessionCommand>,
) -> Result<(), ProjectServiceError> {
let mut session = ProjectEditingSession::new(data.project_id.clone());

session
.start_or_join_session(&*self.snapshot_repository, &*self.redis_data_manager, user)
.await?;
let session_repository = Arc::clone(&self.session_repository);
let snapshot_repository = Arc::clone(&self.snapshot_repository);
let redis_data_manager = Arc::clone(&self.redis_data_manager);

loop {
tokio::select! {
Some(command) = command_rx.recv() => {
match command {
SessionCommand::Start { project_id, user } => {
if let Some(mut session) = self.get_latest_session(&project_id).await? {
debug!("Session already exists for project: {}", project_id);
} else {
let mut new_session = ProjectEditingSession::new(project_id.clone());
new_session
.start_or_join_session(&*snapshot_repository, &*redis_data_manager, &user)
.await?;
debug!("Session started by user: {} for project: {}", user.name, project_id);
}
},
SessionCommand::End { project_id, user } => {
if let Some(data) = self.get_task_data(&project_id).await {
if let Some(mut session) = self.get_latest_session(&project_id).await? {
if let Ok(()) = self.end_editing_session_if_conditions_met(&mut session, &data).await {
debug!("Session ended by user: {} for project: {}", user.name, project_id);
break;
}
}
}
},
SessionCommand::Complete { project_id, user } => {
if let Some(mut session) = self.get_latest_session(&project_id).await? {
if let Ok(()) = self.complete_job_if_met_requirements(&mut session).await {
debug!("Job completed by user: {} for project: {}", user.name, project_id);
break;
}
}
},
SessionCommand::CheckStatus { project_id } => {
debug!("Checking session status for project: {}", project_id);
},
SessionCommand::AddTask { task_data } => {
if let Some(project_id) = &task_data.project_id {
let mut tasks = self.tasks.lock().await;
tasks.insert(project_id.clone(), task_data.clone());
debug!("Added task for project: {}", project_id);
}
},
SessionCommand::RemoveTask { project_id } => {
let mut tasks = self.tasks.lock().await;
tasks.remove(&project_id);
debug!("Removed task for project: {}", project_id);
}
}
},
_ = tokio::time::sleep(Duration::from_secs(1)) => {
let tasks = self.tasks.lock().await;
for (project_id, data) in tasks.iter() {
if let Some(mut session) = self.get_latest_session(project_id).await? {
if let Ok(()) = self.end_editing_session_if_conditions_met(&mut session, data).await {
debug!("Session ended by condition check for project: {}", project_id);
}
}
}
}
}
}

let client_count = self.session_repository.get_client_count().await?;
Ok(())
}

if self
.end_editing_session_if_conditions_met(&mut session, &data, client_count)
.await?
{
return Ok(());
}
async fn get_task_data(&self, project_id: &str) -> Option<ManageProjectEditSessionTaskData> {
let tasks = self.tasks.lock().await;
tasks.get(project_id).cloned()
}

self.session_repository
.update_session(session.clone())
.await?;
pub async fn get_latest_session(
&self,
project_id: &str,
) -> Result<Option<ProjectEditingSession>, ProjectServiceError> {
Ok(self
.session_repository
.get_active_session(project_id)
.await?)
}

Ok(())
pub async fn update_session(
&self,
session: &mut ProjectEditingSession,
) -> Result<(), ProjectServiceError> {
unimplemented!()
}

pub async fn end_editing_session_if_conditions_met(
&self,
session: &mut ProjectEditingSession,
data: &ManageProjectEditSessionTaskData,
client_count: usize,
end_session: bool,
) -> Result<bool, ProjectServiceError> {
) -> Result<(), ProjectServiceError> {
let client_count_future = async {
loop {
let count = self.session_repository.get_client_count().await?;
if count == 0 {
return Ok::<_, ProjectServiceError>(0);
}
sleep(Duration::from_secs(1)).await;
}
};

if let Some(clients_disconnected_at) = data.clients_disconnected_at {
let current_time = Utc::now();
let clients_disconnection_elapsed_time = current_time - clients_disconnected_at;
Expand All @@ -95,47 +200,39 @@ where
.to_std()
.map_err(ProjectServiceError::ChronoDurationConversionError)?
> MAX_EMPTY_SESSION_DURATION
&& client_count == 1
{
session
.end_session(&*self.redis_data_manager, &*self.snapshot_repository)
.await?;
return Ok(true);
if let Ok(Ok(0)) =
tokio::time::timeout(MAX_EMPTY_SESSION_DURATION, client_count_future).await
{
session
.end_session(
&*self.redis_data_manager,
&*self.snapshot_repository,
"system".to_string(),
true,
)
.await?;
}
}
}

Ok(false)
Ok(())
}

pub async fn complete_job_if_met_requirements(
&self,
session: &ProjectEditingSession,
session: &mut ProjectEditingSession,
) -> Result<(), ProjectServiceError> {
if (session.active_editing_session().await?).is_some() {
let (final_state, _) = session.merge_updates(&*self.redis_data_manager).await?;

if let Some(snapshot) = self
.snapshot_repository
.get_latest_snapshot(&session.project_id)
.await?
{
let snapshot_data =
SnapshotData::new(session.project_id.clone(), final_state, None, None);

self.snapshot_repository
.update_latest_snapshot(snapshot)
.await?;
self.snapshot_repository
.update_latest_snapshot_state(&session.project_id, snapshot_data)
.await?;
}

sleep(JOB_COMPLETION_DELAY).await;
session
.end_session(
&*self.redis_data_manager,
&*self.snapshot_repository,
"system".to_string(),
true,
)
.await?;

debug!("Task completed successfully");
} else {
debug!("Task skipped - no active session");
}
sleep(JOB_COMPLETION_DELAY).await;

Ok(())
}
Expand Down
25 changes: 23 additions & 2 deletions websocket/crates/services/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,30 @@ pub struct CreateSnapshotData {

#[derive(Clone, Debug)]
pub struct ManageProjectEditSessionTaskData {
pub project_id: String,
pub clients_count: Option<usize>,
pub project_id: Option<String>,
pub last_merged_at: Option<DateTime<Utc>>,
pub last_snapshot_at: Option<DateTime<Utc>>,
pub clients_disconnected_at: Option<DateTime<Utc>>,
}

impl ManageProjectEditSessionTaskData {
pub fn new(
project_id: Option<String>,
last_merged_at: Option<DateTime<Utc>>,
last_snapshot_at: Option<DateTime<Utc>>,
clients_disconnected_at: Option<DateTime<Utc>>,
) -> Self {
Self {
project_id,
last_merged_at,
last_snapshot_at,
clients_disconnected_at,
}
}
}

impl Default for ManageProjectEditSessionTaskData {
fn default() -> Self {
Self::new(None, None, None, None)
}
}

0 comments on commit 28d0608

Please sign in to comment.