Skip to content

Commit

Permalink
test(collator): add internal queue tail test
Browse files Browse the repository at this point in the history
  • Loading branch information
drmick committed Nov 30, 2024
1 parent 20639d4 commit 6be90f0
Showing 1 changed file with 93 additions and 0 deletions.
93 changes: 93 additions & 0 deletions collator/tests/internal_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,3 +456,96 @@ fn create_dump_msg_envelope(message: Lazy<OwnedMessage>) -> Lazy<MsgEnvelope> {
})
.unwrap()
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_queue_tail() -> anyhow::Result<()> {
let (storage, _tmp_dir) = prepare_test_storage().await.unwrap();

let queue_factory = QueueFactoryStdImpl {
uncommitted_state_factory: UncommittedStateImplFactory {
storage: storage.clone(),
},
committed_state_factory: CommittedStateImplFactory { storage },
config: QueueConfig {
gc_interval: Duration::from_secs(1),
},
};

let queue: QueueImpl<UncommittedStateStdImpl, CommittedStateStdImpl, StoredObject> =
queue_factory.create();
let block_mc1 = BlockIdShort {
shard: ShardIdent::new_full(-1),
seqno: 0,
};

let block_mc2 = BlockIdShort {
shard: ShardIdent::new_full(-1),
seqno: 1,
};
let mut diff_mc1 = QueueDiffWithMessages::new();
let mut diff_mc2 = QueueDiffWithMessages::new();

let stored_objects = vec![
create_stored_object(
1,
"-1:6d6e566da0b322193d90020ff65b9b9e91582c953ed587ffd281d8344a7d5732",
)?,
create_stored_object(
2,
"-1:6d6e566da0b322193d90020ff65b9b9e91582c953ed587ffd281d8344a7d5732",
)?,
create_stored_object(
3,
"0:7d6e566da0b322193d90020ff65b9b9e91582c953ed587ffd281d8344a7d5732",
)?,
create_stored_object(
4,
"-1:6d6e566da0b322193d90020ff65b9b9e91582c953ed587ffd281d8344a7d5732",
)?,
create_stored_object(
5,
"-1:6d6e566da0b322193d90020ff65b9b9e91582c953ed587ffd281d8344a7d5732",
)?,
];

if let Some(stored_object) = stored_objects.iter().next() {
diff_mc1
.messages
.insert(stored_object.key(), stored_object.clone());
}

for stored_object in &stored_objects {
diff_mc2
.messages
.insert(stored_object.key(), stored_object.clone());
}

let end_key_mc1 = *diff_mc1.messages.iter().last().unwrap().0;
let end_key_mc2 = *diff_mc2.messages.iter().last().unwrap().0;

// apply two diffs
queue.apply_diff(diff_mc1, block_mc1, &HashBytes::from([1; 32]), end_key_mc1)?;
queue.apply_diff(diff_mc2, block_mc2, &HashBytes::from([2; 32]), end_key_mc2)?;

let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::new_full(-1));

assert_eq!(diff_len_mc, 2);

// commit first diff
queue.commit_diff(&[(block_mc1, true)])?;
let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::new_full(-1));

assert_eq!(diff_len_mc, 2);

// trim first diff
queue.trim_diffs(&ShardIdent::new_full(-1), &end_key_mc1)?;
let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::new_full(-1));
assert_eq!(diff_len_mc, 1);

// clear uncommitted state with second diff
queue.clear_uncommitted_state()?;
let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::new_full(-1));
assert_eq!(diff_len_mc, 0);

Ok(())
}

0 comments on commit 6be90f0

Please sign in to comment.