Skip to content

Commit

Permalink
refactor(collator): improve internal queue commit performance
Browse files Browse the repository at this point in the history
  • Loading branch information
drmick committed Dec 9, 2024
1 parent 28996f3 commit 964a1a7
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 109 deletions.
7 changes: 7 additions & 0 deletions collator/src/queue_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,17 @@ impl<V: InternalMessageValue> MessageQueueAdapter<V> for MessageQueueAdapterStdI
}

fn clear_uncommitted_state(&self) -> Result<()> {
tracing::trace!(target: tracing_targets::MQ_ADAPTER, "Clearing uncommitted state");
self.queue.clear_uncommitted_state()
}

fn trim_diffs(&self, source_shard: &ShardIdent, inclusive_until: &QueueKey) -> Result<()> {
tracing::trace!(
target: tracing_targets::MQ_ADAPTER,
source_shard = %source_shard,
inclusive_until = %inclusive_until,
"Trimming diffs"
);
self.queue.trim_diffs(source_shard, inclusive_until)
}

Expand Down
2 changes: 1 addition & 1 deletion collator/tests/internal_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ async fn test_queue_tail() -> anyhow::Result<()> {
)?,
];

if let Some(stored_object) = stored_objects.iter().next() {
if let Some(stored_object) = stored_objects.first() {
diff_mc1
.messages
.insert(stored_object.key(), stored_object.clone());
Expand Down
204 changes: 96 additions & 108 deletions storage/src/store/internal_queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,90 @@ impl InternalQueueStorage {
.await?
}

pub fn delete_messages(
&self,
source_shard: ShardIdent,
from: &QueueKey,
to: &QueueKey,
) -> Result<()> {
let start_key = ShardsInternalMessagesKey::new(source_shard, *from);
let end_key = ShardsInternalMessagesKey::new(source_shard, *to);

let shards_internal_messages_cf = self.db.shards_internal_messages.cf();

let mut batch = WriteBatch::default();

let start_key = start_key.to_vec();
let end_key = end_key.to_vec();

batch.delete_range_cf(&shards_internal_messages_cf, &start_key, &end_key);
batch.delete_cf(&shards_internal_messages_cf, &end_key);

self.db.rocksdb().write(batch)?;
self.db.rocksdb().compact_range_cf(
&shards_internal_messages_cf,
Some(&start_key),
Some(&end_key),
);

Ok(())
}

pub fn commit(&self, ranges: FastHashMap<ShardIdent, QueueKey>) -> Result<()> {
let snapshot = self.snapshot();

let mut batch = WriteBatch::default();

for range in ranges {
let from = ShardsInternalMessagesKey {
shard_ident: range.0,
internal_message_key: QueueKey::MIN,
};
let to = ShardsInternalMessagesKey {
shard_ident: range.0,
internal_message_key: range.1,
};

let mut readopts = self
.db
.shards_internal_messages_uncommitted
.new_read_config();
readopts.set_snapshot(&snapshot);

let internal_messages_uncommitted_cf =
self.db.shards_internal_messages_uncommitted.cf();
let internal_messages_cf = self.db.shards_internal_messages.cf();
let mut iter = self
.db
.rocksdb()
.raw_iterator_cf_opt(&internal_messages_uncommitted_cf, readopts);

iter.seek(from.to_vec().as_slice());

while iter.valid() {
let (mut key, value) = match (iter.key(), iter.value()) {
(Some(key), Some(value)) => (key, value),
_ => break,
};

let current_position = ShardsInternalMessagesKey::deserialize(&mut key);

if current_position > to || current_position < from {
break;
}
let current_position_vec = current_position.to_vec();
batch.delete_cf(&internal_messages_uncommitted_cf, &current_position_vec);
batch.put_cf(&internal_messages_cf, &current_position_vec, value);

iter.next();
}
}

self.db.rocksdb().write(batch)?;

Ok(())
}

pub fn snapshot(&self) -> OwnedSnapshot {
self.db.owned_snapshot()
}
Expand All @@ -102,19 +186,6 @@ impl InternalQueueStorage {
snapshot,
)
}

fn clear_queue(&self, cf: &BoundedCfHandle<'_>) -> Result<()> {
let start_key = [0x00; ShardsInternalMessagesKey::SIZE_HINT];
let end_key = [0xFF; ShardsInternalMessagesKey::SIZE_HINT];
self.db
.rocksdb()
.delete_range_cf(cf, &start_key, &end_key)?;
self.db
.rocksdb()
.compact_range_cf(cf, Some(start_key), Some(end_key));
Ok(())
}

pub fn clear_uncommitted_queue(&self) -> Result<()> {
let cf = self.db.shards_internal_messages_uncommitted.cf();
self.clear_queue(&cf)
Expand Down Expand Up @@ -145,6 +216,18 @@ impl InternalQueueStorage {
Self::insert_message(batch, cf, key, dest.workchain() as i8, dest.prefix(), value)
}

fn clear_queue(&self, cf: &BoundedCfHandle<'_>) -> Result<()> {
let start_key = [0x00; ShardsInternalMessagesKey::SIZE_HINT];
let end_key = [0xFF; ShardsInternalMessagesKey::SIZE_HINT];
self.db
.rocksdb()
.delete_range_cf(cf, &start_key, &end_key)?;
self.db
.rocksdb()
.compact_range_cf(cf, Some(start_key), Some(end_key));
Ok(())
}

fn build_iterator(
&self,
cf: BoundedCfHandle<'_>,
Expand Down Expand Up @@ -183,99 +266,4 @@ impl InternalQueueStorage {

Ok(())
}

pub fn delete_messages(
&self,
source_shard: ShardIdent,
from: &QueueKey,
to: &QueueKey,
) -> Result<()> {
let start_key = ShardsInternalMessagesKey::new(source_shard, *from);
let end_key = ShardsInternalMessagesKey::new(source_shard, *to);

let shards_internal_messages_cf = self.db.shards_internal_messages.cf();

let mut batch = WriteBatch::default();

let start_key = start_key.to_vec();
let end_key = end_key.to_vec();

batch.delete_range_cf(&shards_internal_messages_cf, &start_key, &end_key);
batch.delete_cf(&shards_internal_messages_cf, &end_key);

self.db.rocksdb().write(batch)?;
self.db.rocksdb().compact_range_cf(
&shards_internal_messages_cf,
Some(&start_key),
Some(&end_key),
);

Ok(())
}

pub fn commit(&self, ranges: FastHashMap<ShardIdent, QueueKey>) -> Result<()> {
let snapshot = self.snapshot();

let mut batch = WriteBatch::default();

for range in ranges {
let from = ShardsInternalMessagesKey {
shard_ident: range.0,
internal_message_key: QueueKey::MIN,
};
let to = ShardsInternalMessagesKey {
shard_ident: range.0,
internal_message_key: range.1,
};

let mut readopts = self
.db
.shards_internal_messages_uncommitted
.new_read_config();
readopts.set_snapshot(&snapshot);

let internal_messages_uncommitted_cf =
self.db.shards_internal_messages_uncommitted.cf();
let internal_messages_cf = self.db.shards_internal_messages.cf();
let mut iter = self
.db
.rocksdb()
.raw_iterator_cf_opt(&internal_messages_uncommitted_cf, readopts);

iter.seek(from.to_vec().as_slice());

while iter.valid() {
let (mut key, value) = match (iter.key(), iter.value()) {
(Some(key), Some(value)) => (key, value),
_ => break,
};

let current_position = ShardsInternalMessagesKey::deserialize(&mut key);

if current_position > to || current_position < from {
break;
}

let dest_workchain = value[0] as i8;
let dest_prefix = u64::from_be_bytes(value[1..9].try_into().unwrap());
let cell_bytes = &value[9..];

batch.delete_cf(&internal_messages_uncommitted_cf, current_position.to_vec());
Self::insert_message(
&mut batch,
internal_messages_cf,
current_position,
dest_workchain,
dest_prefix,
cell_bytes,
)?;

iter.next();
}
}

self.db.rocksdb().write(batch)?;

Ok(())
}
}

0 comments on commit 964a1a7

Please sign in to comment.