Skip to content
This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

Commit

Permalink
Merge pull request paritytech#152 from subspace/archive-reconstructor
Browse files Browse the repository at this point in the history
Add archive reconstructor for archived segments
  • Loading branch information
nazar-pc authored Nov 25, 2021
2 parents 5cab601 + e5d8fde commit 1ff36db
Show file tree
Hide file tree
Showing 7 changed files with 633 additions and 9 deletions.
10 changes: 2 additions & 8 deletions crates/subspace-archiving/src/archiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// limitations under the License.

use crate::merkle_tree::{MerkleTree, Witness};
use crate::utils;
use parity_scale_codec::{Compact, CompactLen, Decode, Encode};
use reed_solomon_erasure::galois_16::ReedSolomon;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -598,16 +599,9 @@ impl Archiver {
segment
};

fn slice_to_arrays(slice: &[u8]) -> Vec<[u8; 2]> {
slice
.chunks_exact(2)
.map(|s| s.try_into().unwrap())
.collect()
}

let data_shards: Vec<Vec<[u8; 2]>> = segment
.chunks_exact(self.record_size)
.map(slice_to_arrays)
.map(utils::slice_to_arrays)
.collect();

drop(segment);
Expand Down
4 changes: 4 additions & 0 deletions crates/subspace-archiving/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,7 @@
pub mod archiver;
#[cfg(feature = "std")]
pub mod merkle_tree;
#[cfg(feature = "std")]
pub mod reconstructor;
#[cfg(feature = "std")]
mod utils;
246 changes: 246 additions & 0 deletions crates/subspace-archiving/src/reconstructor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
use crate::archiver::{Segment, SegmentItem};
use crate::utils;
use parity_scale_codec::Decode;
use reed_solomon_erasure::galois_16::ReedSolomon;
use std::mem;
use subspace_core_primitives::{
ArchivedBlockProgress, LastArchivedBlock, Piece, RootBlock, PIECE_SIZE, SHA256_HASH_SIZE,
};
use thiserror::Error;

/// Reconstructor-related instantiation error.
#[derive(Debug, Error, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum ReconstructorInstantiationError {
/// Segment size is not bigger than record size
#[error("Segment size is not bigger than record size")]
SegmentSizeTooSmall,
/// Segment size is not a multiple of record size
#[error("Segment size is not a multiple of record size")]
SegmentSizesNotMultipleOfRecordSize,
/// Wrong record and segment size, it will not be possible to produce pieces
#[error("Wrong record and segment size, it will not be possible to produce pieces")]
WrongRecordAndSegmentCombination,
}

/// Reconstructor-related instantiation error
#[derive(Debug, Error, Clone, PartialEq)]
pub enum ReconstructorError {
/// Segment size is not bigger than record size
#[error("Error during data shards reconstruction: {0}")]
DataShardsReconstruction(reed_solomon_erasure::Error),
/// Segment size is not bigger than record size
#[error("Error during segment decoding: {0}")]
SegmentDecoding(parity_scale_codec::Error),
/// Incorrect segment order, each next segment must have monotonically increasing segment index
#[error("Incorrect segment order, expected index {expected_segment_index}, actual {actual_segment_index}")]
IncorrectSegmentOrder {
expected_segment_index: u64,
actual_segment_index: u64,
},
}

/// Data structure that contains information reconstructed from given segment (potentially using
/// information from segments that were added previously)
#[derive(Debug, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct ReconstructedContents {
/// Root block stored in a segment
pub root_block: Option<RootBlock>,
/// Reconstructed encoded blocks with their block numbers
pub blocks: Vec<(u32, Vec<u8>)>,
}

/// Reconstructor helps to retrieve blocks from archived pieces.
#[derive(Debug)]
pub struct Reconstructor {
/// Configuration parameter defining the size of one record (data in one piece excluding witness
/// size)
record_size: usize,
/// Configuration parameter defining the size of one recorded history segment
segment_size: usize,
/// Erasure coding data structure
reed_solomon: ReedSolomon,
/// Index of last segment added to reconstructor
last_segment_index: Option<u64>,
/// Partially reconstructed block waiting for more data
partial_block: Option<Vec<u8>>,
}

impl Reconstructor {
pub fn new(
record_size: usize,
segment_size: usize,
) -> Result<Self, ReconstructorInstantiationError> {
if segment_size <= record_size {
return Err(ReconstructorInstantiationError::SegmentSizeTooSmall);
}
if segment_size % record_size != 0 {
return Err(ReconstructorInstantiationError::SegmentSizesNotMultipleOfRecordSize);
}

// We take N data records and will creates the same number of parity records, hence `*2`
let merkle_num_leaves = segment_size / record_size * 2;
let witness_size = SHA256_HASH_SIZE * merkle_num_leaves.log2() as usize;
if record_size + witness_size != PIECE_SIZE {
return Err(ReconstructorInstantiationError::WrongRecordAndSegmentCombination);
}

let shards = segment_size / record_size;
let reed_solomon = ReedSolomon::new(shards, shards)
.expect("ReedSolomon should always be correctly instantiated");

Ok(Self {
record_size,
segment_size,
reed_solomon,
last_segment_index: None,
partial_block: None,
})
}

/// Given a set of pieces of a segment of the archived history (any half of all pieces are
/// required to be present, the rest will be recovered automatically due to use of erasure
/// coding if needed), reconstructs and returns root block and a list of encoded blocks with
/// corresponding block numbers.
///
/// It is possible to start with any segment, but when next segment is pushed, it needs to
/// follow the previous one or else error will be returned.
pub fn add_segment(
&mut self,
segment_pieces: &[Option<Piece>],
) -> Result<ReconstructedContents, ReconstructorError> {
let mut segment_data = Vec::with_capacity(self.segment_size);
if !segment_pieces
.iter()
.take(self.reed_solomon.data_shard_count())
.all(|maybe_piece| {
if let Some(piece) = maybe_piece {
segment_data.extend_from_slice(&piece[..self.record_size]);
true
} else {
false
}
})
{
// If not all data pieces are available, need to reconstruct data shards using erasure
// coding.
let mut shards = segment_pieces
.iter()
.map(|maybe_piece| maybe_piece.as_ref().map(utils::slice_to_arrays))
.collect::<Vec<_>>();

self.reed_solomon
.reconstruct_data(&mut shards)
.map_err(ReconstructorError::DataShardsReconstruction)?;

segment_data.clear();
shards
.into_iter()
.take(self.reed_solomon.data_shard_count())
.for_each(|maybe_piece| {
let piece = maybe_piece.expect(
"All data shards are available after successful reconstruction; qed",
);

for chunk in piece.iter().take(self.record_size / 2) {
segment_data.extend_from_slice(chunk.as_ref());
}
});
}

let Segment::V0 { items } = Segment::decode(&mut segment_data.as_ref())
.map_err(ReconstructorError::SegmentDecoding)?;

let mut reconstructed_contents = ReconstructedContents::default();
let mut next_block_number = 0;
let mut partial_block = self.partial_block.take().unwrap_or_default();

for segment_item in items {
match segment_item {
SegmentItem::Block { bytes, .. } => {
if !partial_block.is_empty() {
reconstructed_contents
.blocks
.push((next_block_number, mem::take(&mut partial_block)));

next_block_number += 1;
}

reconstructed_contents
.blocks
.push((next_block_number, bytes));

next_block_number += 1;
}
SegmentItem::BlockStart { bytes, .. } => {
if !partial_block.is_empty() {
reconstructed_contents
.blocks
.push((next_block_number, mem::take(&mut partial_block)));

next_block_number += 1;
}

partial_block = bytes;
}
SegmentItem::BlockContinuation { bytes, .. } => {
if partial_block.is_empty() {
// This is continuation from previous segment, we don't have the beginning
// of the block to continue.
continue;
}

partial_block.extend_from_slice(&bytes);
}
SegmentItem::RootBlock(root_block) => {
let segment_index = root_block.segment_index();

if let Some(last_segment_index) = self.last_segment_index {
if last_segment_index != segment_index {
return Err(ReconstructorError::IncorrectSegmentOrder {
expected_segment_index: last_segment_index + 1,
actual_segment_index: segment_index + 1,
});
}
}

self.last_segment_index.replace(segment_index + 1);

let LastArchivedBlock {
number,
archived_progress,
} = root_block.last_archived_block();

reconstructed_contents.root_block.replace(root_block);

match archived_progress {
ArchivedBlockProgress::Complete => {
reconstructed_contents
.blocks
.push((next_block_number, mem::take(&mut partial_block)));

next_block_number = number + 1;
}
ArchivedBlockProgress::Partial(_bytes) => {
next_block_number = number;

if partial_block.is_empty() {
// Will not be able to recover full block, bump right away.
next_block_number += 1;
}
}
}
}
}
}

if !partial_block.is_empty() {
self.partial_block.replace(partial_block);
}

if self.last_segment_index.is_none() {
self.last_segment_index.replace(0);
}

Ok(reconstructed_contents)
}
}
15 changes: 15 additions & 0 deletions crates/subspace-archiving/src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use reed_solomon_erasure::galois_16::Field as Galois16Field;
use reed_solomon_erasure::Field;
use std::mem;

type Elem = <Galois16Field as Field>::Elem;
const ELEM_BYTES: usize = mem::size_of::<Elem>();

/// Convert slice to a vector of arrays for `reed_solomon_erasure` library.
pub(crate) fn slice_to_arrays<S: AsRef<[u8]> + ?Sized>(slice: &S) -> Vec<Elem> {
slice
.as_ref()
.chunks_exact(ELEM_BYTES)
.map(|s| s.try_into().unwrap())
.collect()
}
2 changes: 1 addition & 1 deletion crates/subspace-archiving/tests/integration/archiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ fn archiver() {
}

#[test]
fn archiver_invalid_usage() {
fn invalid_usage() {
assert_matches!(
Archiver::new(5, SEGMENT_SIZE),
Err(ArchiverInstantiationError::RecordSizeTooSmall),
Expand Down
1 change: 1 addition & 0 deletions crates/subspace-archiving/tests/integration/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@

mod archiver;
mod merkle_tree;
mod reconstructor;
Loading

0 comments on commit 1ff36db

Please sign in to comment.