Skip to content

Commit

Permalink
Merge pull request #590 from subspace/background-recommitment
Browse files Browse the repository at this point in the history
Make recommitment not block other operations
  • Loading branch information
nazar-pc authored Jun 13, 2022
2 parents f49009f + 2029fe4 commit 3c3a48f
Showing 1 changed file with 67 additions and 54 deletions.
121 changes: 67 additions & 54 deletions crates/subspace-farmer/src/commitments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,73 +91,78 @@ impl Commitments {

/// Create commitments for all pieces for a given salt
pub fn create(&self, salt: Salt, plot: Plot) -> Result<(), CommitmentError> {
let mut commitment_databases = self.inner.commitment_databases.lock();

let db_entry = match commitment_databases.create_db_entry(salt)? {
Some(CreateDbEntryResult {
db_entry,
removed_entry_salt,
}) => {
if let Some(salt) = removed_entry_salt {
{
let mut commitment_databases = self.inner.commitment_databases.lock();

let db_entry = match commitment_databases.create_db_entry(salt)? {
Some(CreateDbEntryResult {
db_entry,
removed_entry_salt,
}) => {
if let Some(salt) = removed_entry_salt {
self.inner
.handlers
.status_change
.call_simple(&CommitmentStatusChange::Removed { salt });
}
self.inner
.handlers
.status_change
.call_simple(&CommitmentStatusChange::Removed { salt });
.call_simple(&CommitmentStatusChange::Creating { salt });
db_entry
}
self.inner
.handlers
.status_change
.call_simple(&CommitmentStatusChange::Creating { salt });
db_entry
}
None => {
return Ok(());
}
};
let (current, next) = commitment_databases.get_db_entries();
self.inner.current.swap(current);
self.inner.next.swap(next);

let mut db_guard = db_entry.lock();
// Release lock to allow working with other databases, but hold lock for `db_entry.db` such
// that nothing else can modify it.
drop(commitment_databases);

let db_path = self.inner.base_directory.join(hex::encode(salt));

let db = {
let db = DB::open_default(db_path).map_err(CommitmentError::CommitmentDb)?;
let piece_count = plot.piece_count();
for batch_start in (0..piece_count).step_by(BATCH_SIZE as usize) {
let pieces_to_process = (batch_start + BATCH_SIZE).min(piece_count) - batch_start;
// TODO: Read next batch while creating tags for the previous one for faster
// recommitment.
let pieces = plot
.read_pieces(batch_start, pieces_to_process)
.map_err(CommitmentError::Plot)?;

let tags: Vec<Tag> = pieces
.par_chunks_exact(PIECE_SIZE)
.map(|piece| create_tag(piece, salt))
.collect();
None => {
return Ok(());
}
};
let (current, next) = commitment_databases.get_db_entries();
self.inner.current.swap(current);
self.inner.next.swap(next);

let db_path = self.inner.base_directory.join(hex::encode(salt));
db_entry.lock().replace(Arc::new(
DB::open_default(db_path).map_err(CommitmentError::CommitmentDb)?,
));
}

let piece_count = plot.piece_count();
for batch_start in (0..piece_count).step_by(BATCH_SIZE as usize) {
let pieces_to_process = (batch_start + BATCH_SIZE).min(piece_count) - batch_start;
// TODO: Read next batch while creating tags for the previous one for faster
// recommitment.
let pieces = plot
.read_pieces(batch_start, pieces_to_process)
.map_err(CommitmentError::Plot)?;

let tags: Vec<Tag> = pieces
.par_chunks_exact(PIECE_SIZE)
.map(|piece| create_tag(piece, salt))
.collect();

let db_entry = match self.get_db_entry(salt) {
Some(db_entry) => db_entry,
None => {
// Database was already removed, no need to continue
break;
}
};

let db_guard = db_entry.lock();

if let Some(db) = db_guard.as_ref() {
for (tag, offset) in tags.iter().zip(batch_start..) {
db.put(tag, offset.to_le_bytes())
.map_err(CommitmentError::CommitmentDb)?;
}
} else {
// Database was already removed, no need to continue
break;
}

db
};

db_guard.replace(Arc::new(db));
// Drop guard because locks need to be taken in a specific order or else will result in a
// deadlock
drop(db_guard);
}

let mut commitment_databases = self.inner.commitment_databases.lock();

// Check if database was already been removed
// Check if database was already removed
if commitment_databases
.get_db_entry(&salt)
.map(|db_entry| db_entry.lock().is_some())
Expand All @@ -180,6 +185,10 @@ impl Commitments {
}

pub(crate) fn remove_pieces(&self, pieces: &[Piece]) -> Result<(), CommitmentError> {
if pieces.is_empty() {
return Ok(());
}

for db_entry in self.get_db_entries() {
let salt = db_entry.salt();
let db_guard = db_entry.lock();
Expand All @@ -204,6 +213,10 @@ impl Commitments {
F: Fn() -> Iter,
Iter: Iterator<Item = (PieceOffset, &'iter [u8])>,
{
if pieces_with_offsets().next().is_none() {
return Ok(());
}

for db_entry in self.get_db_entries() {
let salt = db_entry.salt();
let db_guard = db_entry.lock();
Expand Down

0 comments on commit 3c3a48f

Please sign in to comment.