Skip to content

Commit

Permalink
Fix potential infinite recursion in push_update
Browse files Browse the repository at this point in the history
  • Loading branch information
kasugamirai committed Oct 11, 2024
1 parent e04c30c commit d011525
Showing 1 changed file with 68 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,25 +217,6 @@ impl FlowProjectRedisDataManager {
.map_err(FlowProjectRedisDataManagerError::from)
}

pub async fn get_current_state_update(
&self,
) -> Result<Option<Vec<u8>>, FlowProjectRedisDataManagerError> {
let state_update = self.get_state_update_in_redis().await?;
if state_update.is_none() {
return Ok(None);
}
let merged_stream_update = self.get_merged_update_from_stream().await?;
if let Some((merged_update, _, _)) = merged_stream_update {
let doc = Doc::new();
let mut txn = doc.transact_mut();
txn.apply_update(Update::decode_v2(&state_update.unwrap()).unwrap());
txn.apply_update(Update::decode_v2(&merged_update).unwrap());
Ok(Some(txn.encode_update_v2()))
} else {
Ok(state_update)
}
}

pub async fn get_current_state_updated_by(
&self,
) -> Result<Vec<String>, FlowProjectRedisDataManagerError> {
Expand Down Expand Up @@ -326,61 +307,6 @@ impl FlowProjectRedisDataManager {
Ok(())
}

pub async fn push_update(
&self,
update: Vec<u8>,
updated_by: String,
) -> Result<(), FlowProjectRedisDataManagerError> {
let update_data = FlowEncodedUpdate {
update: Self::encode_state_data(update),
updated_by: Some(updated_by),
};

let value = serde_json::to_string(&update_data)?;
let fields = &[("value", value.as_str()), ("format", "json")];

let connection = self.redis_client.connection();
let mut connection_guard = connection.lock().await;

let _: () = connection_guard
.xadd(self.state_updates_key(), "*", fields)
.await?;

let timestamp = chrono::Utc::now().timestamp().to_string();
let _: () = connection_guard
.set(self.last_updated_at_key(), &timestamp)
.await?;

Ok(())
}

pub async fn clear_data(&self) -> Result<(), FlowProjectRedisDataManagerError> {
let connection = self.redis_client.connection();
let mut connection_guard = connection.lock().await;

let keys_to_delete = vec![
self.state_key(),
self.state_updated_by_key(),
self.state_updates_key(),
self.last_updated_at_key(),
];

let _: () = connection_guard.del(&keys_to_delete).await?;

if let Some(active_session_id) = self.active_editing_session_id().await? {
let editing_session = self.editing_session.lock().await;
if let Some(current_session_id) = editing_session.session_id.as_ref() {
if active_session_id == *current_session_id {
let _: () = connection_guard
.del(&[self.active_editing_session_id_key()])
.await?;
}
}
}

Ok(())
}

pub async fn last_updated_at(&self) -> Result<i64, FlowProjectRedisDataManagerError> {
let last_updated_at: Option<String> =
self.redis_client.get(&self.last_updated_at_key()).await?;
Expand Down Expand Up @@ -467,17 +393,6 @@ impl FlowProjectRedisDataManager {
Ok((optimized_merged_state, merged_state_updated_by))
}

pub async fn merge_updates(
&self,
skip_lock: bool,
) -> Result<(Vec<u8>, Vec<String>), FlowProjectRedisDataManagerError> {
if skip_lock {
self.execute_merge_updates().await
} else {
self.lock_and_execute_merge_updates().await
}
}

async fn lock_and_execute_merge_updates(
&self,
) -> Result<(Vec<u8>, Vec<String>), FlowProjectRedisDataManagerError> {
Expand All @@ -497,19 +412,82 @@ impl RedisDataManager for FlowProjectRedisDataManager {
type Error = FlowProjectRedisDataManagerError;

async fn get_current_state(&self) -> Result<Option<Vec<u8>>, Self::Error> {
self.get_current_state_update().await
let state_update = self.get_state_update_in_redis().await?;
if state_update.is_none() {
return Ok(None);
}
let merged_stream_update = self.get_merged_update_from_stream().await?;
if let Some((merged_update, _, _)) = merged_stream_update {
let doc = Doc::new();
let mut txn = doc.transact_mut();
txn.apply_update(Update::decode_v2(&state_update.unwrap()).unwrap());
txn.apply_update(Update::decode_v2(&merged_update).unwrap());
Ok(Some(txn.encode_update_v2()))
} else {
Ok(state_update)
}
}

async fn push_update(&self, update: Vec<u8>, updated_by: String) -> Result<(), Self::Error> {
self.push_update(update, updated_by).await
let update_data: FlowEncodedUpdate = FlowEncodedUpdate {
update: Self::encode_state_data(update),
updated_by: Some(updated_by),
};

let value = serde_json::to_string(&update_data)?;
let fields = &[("value", value.as_str()), ("format", "json")];

let connection = self.redis_client.connection();
let mut connection_guard = connection.lock().await;

let _: () = connection_guard
.xadd(self.state_updates_key(), "*", fields)
.await?;

let timestamp = chrono::Utc::now().timestamp().to_string();
let _: () = connection_guard
.set(self.last_updated_at_key(), &timestamp)
.await?;

Ok(())
}

async fn clear_data(&self) -> Result<(), Self::Error> {
self.clear_data().await
let connection = self.redis_client.connection();
let mut connection_guard = connection.lock().await;

let keys_to_delete = vec![
self.state_key(),
self.state_updated_by_key(),
self.state_updates_key(),
self.last_updated_at_key(),
];

let _: () = connection_guard.del(&keys_to_delete).await?;

if let Some(active_session_id) = self.active_editing_session_id().await? {
let editing_session = self.editing_session.lock().await;
if let Some(current_session_id) = editing_session.session_id.as_ref() {
if active_session_id == *current_session_id {
let _: () = connection_guard
.del(&[self.active_editing_session_id_key()])
.await?;
}
}
}

Ok(())
}

async fn merge_updates(&self, skip_lock: bool) -> Result<(Vec<u8>, Vec<String>), Self::Error> {
self.merge_updates(skip_lock).await
async fn merge_updates(
&self,
skip_lock: bool,
) -> Result<(Vec<u8>, Vec<String>), FlowProjectRedisDataManagerError> {
if skip_lock {
self.execute_merge_updates().await
} else {
self.lock_and_execute_merge_updates().await
}
}
}

Expand Down

0 comments on commit d011525

Please sign in to comment.