Skip to content

Commit

Permalink
always send last ts during sync
Browse files Browse the repository at this point in the history
  • Loading branch information
somtochiama committed Jul 25, 2024
1 parent 37ac4d1 commit aa48dca
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 8 deletions.
1 change: 1 addition & 0 deletions crates/corro-agent/src/agent/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ pub async fn handle_emptyset(
},
maybe_change_src = rx_emptysets.recv() => match maybe_change_src {
Some(change) => {
info!("received emptyset changes in emptyset channel from {}", change.actor_id);
if let Changeset::EmptySet { versions, ts } = change.changeset {
buf.entry(change.actor_id).or_insert(VecDeque::new()).push_back((versions.clone(), ts));
} else {
Expand Down
12 changes: 4 additions & 8 deletions crates/corro-agent/src/api/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,8 +717,8 @@ fn handle_need(
if last_cleared_ts.is_none() {
return Ok(());
}
info!("processing empty versions to {actor_id} with ts: {:?}", ts );
let ts = ts.unwrap_or(Default::default());
debug!("processing empty versions to {actor_id}");
let mut stmt = tx.prepare_cached(
"
SELECT start_version, end_version, ts FROM __corro_bookkeeping
Expand Down Expand Up @@ -1134,13 +1134,9 @@ pub async fn parallel_sync(
let cleared_ts = their_sync_state.last_cleared_ts;

info!(%actor_id, "got last cleared ts {cleared_ts:?}");
if let Some(ts) = cleared_ts {
if let Some(last_seen) = our_empty_ts.get(&actor_id) {
if last_seen.is_none() || last_seen.unwrap() < ts {
needs.entry(actor_id).or_default().push( SyncNeedV1::Empty { ts: *last_seen });
}
}
}
let last_seen = our_empty_ts.get(&actor_id).unwrap_or(&None);
needs.entry(actor_id).or_default().push( SyncNeedV1::Empty { ts: *last_seen });

Ok::<_, SyncError>((needs, tx, read))
}.await
)
Expand Down

0 comments on commit aa48dca

Please sign in to comment.