This repository has been archived by the owner on Feb 21, 2024. It is now read-only.

Commit 26990db

Introduce FlatPieces for more efficient work with memory, refactor archiver to use it, refactor plotter to use batch plotting

nazar-pc committed Nov 29, 2021 · 1 parent 4ff1024 · commit 26990db
Showing 13 changed files with 205 additions and 120 deletions.
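The FlatPieces type itself is defined in subspace-core-primitives, which is not among the files shown on this page. Below is a minimal sketch of its likely shape, inferred from how the call sites in this diff use it (count(), deref to [u8] for chunks_exact(PIECE_SIZE), and TryFrom<Vec<u8>>); the real definition may differ in detail, and the PIECE_SIZE value is assumed for illustration.

// Hypothetical sketch of FlatPieces, reconstructed from usage in this diff.
use std::ops::{Deref, DerefMut};

pub const PIECE_SIZE: usize = 4096; // assumed value for illustration

/// All pieces of an archived segment in one contiguous allocation, instead of
/// a Vec<Piece> with a separate heap allocation per piece.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FlatPieces(Vec<u8>);

impl FlatPieces {
    /// Number of pieces contained in this buffer.
    pub fn count(&self) -> usize {
        self.0.len() / PIECE_SIZE
    }
}

impl TryFrom<Vec<u8>> for FlatPieces {
    type Error = Vec<u8>;

    fn try_from(vec: Vec<u8>) -> Result<Self, Self::Error> {
        // Accept only buffers holding a whole number of pieces
        if vec.len() % PIECE_SIZE == 0 {
            Ok(Self(vec))
        } else {
            Err(vec)
        }
    }
}

// Deref to [u8] is what lets call sites iterate pieces with
// pieces.chunks_exact(PIECE_SIZE) and fill them with chunks_exact_mut.
impl Deref for FlatPieces {
    type Target = [u8];

    fn deref(&self) -> &[u8] {
        &self.0
    }
}

impl DerefMut for FlatPieces {
    fn deref_mut(&mut self) -> &mut [u8] {
        &mut self.0
    }
}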
11 changes: 6 additions & 5 deletions crates/sc-consensus-subspace/src/tests.rs
@@ -73,7 +73,7 @@ use std::sync::Arc;
 use std::{cell::RefCell, task::Poll, time::Duration};
 use subspace_archiving::archiver::Archiver;
 use subspace_core_primitives::objects::BlockObjectMapping;
-use subspace_core_primitives::{LocalChallenge, Piece, Signature, Tag};
+use subspace_core_primitives::{FlatPieces, LocalChallenge, Piece, Signature, Tag, PIECE_SIZE};
 use subspace_solving::{SubspaceCodec, SOLUTION_SIGNING_CONTEXT};
 use substrate_test_runtime::{Block as TestBlock, Hash};

@@ -465,7 +465,7 @@ fn rejects_empty_block() {
     })
 }
 
-fn get_archived_pieces(client: &TestClient) -> Vec<Piece> {
+fn get_archived_pieces(client: &TestClient) -> Vec<FlatPieces> {
     let genesis_block_id = BlockId::Number(Zero::zero());
     let runtime_api = client.runtime_api();

@@ -481,7 +481,7 @@ fn get_archived_pieces(client: &TestClient) -> Vec<Piece> {
     archiver
         .add_block(genesis_block.encode(), BlockObjectMapping::default())
         .into_iter()
-        .flat_map(|archived_segment| archived_segment.pieces.into_iter())
+        .map(|archived_segment| archived_segment.pieces)
         .collect()
 }

@@ -599,10 +599,11 @@ fn run_one_test(mutator: impl Fn(&mut TestHeader, Stage) + Send + Sync + 'static
         let (piece_index, mut encoding) = archived_pieces_receiver
             .await
             .unwrap()
-            .into_iter()
+            .iter()
+            .flat_map(|flat_pieces| flat_pieces.chunks_exact(PIECE_SIZE))
             .enumerate()
             .choose(&mut rand::thread_rng())
-            .map(|(piece_index, piece)| (piece_index as u64, piece))
+            .map(|(piece_index, piece)| (piece_index as u64, Piece::try_from(piece).unwrap()))
             .unwrap();
         subspace_solving.encode(&mut encoding, piece_index).unwrap();

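For reference, the selection pattern the updated test uses, lifted out as a standalone sketch. It builds on the FlatPieces sketch after the commit header; Piece is assumed to be a fixed-size newtype from subspace-core-primitives implementing TryFrom<&[u8]>.

// Sketch: picking the n-th piece out of a batch of flat buffers, mirroring the
// updated test above. FlatPieces, Piece and PIECE_SIZE are assumed as sketched
// earlier on this page.
fn nth_piece(batches: &[FlatPieces], n: usize) -> Option<Piece> {
    batches
        .iter()
        .flat_map(|flat_pieces| flat_pieces.chunks_exact(PIECE_SIZE))
        .nth(n)
        .map(|bytes| {
            Piece::try_from(bytes).expect("chunks_exact yields exactly PIECE_SIZE bytes; qed")
        })
}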
79 changes: 39 additions & 40 deletions crates/subspace-archiving/src/archiver.rs
@@ -15,6 +15,7 @@
 
 use crate::merkle_tree::{MerkleTree, Witness};
 use crate::utils;
+use crate::utils::{Gf16Element, GF_16_ELEMENT_BYTES};
 use parity_scale_codec::{Compact, CompactLen, Decode, Encode};
 use reed_solomon_erasure::galois_16::ReedSolomon;
 use serde::{Deserialize, Serialize};
@@ -27,8 +28,8 @@ use subspace_core_primitives::objects::{
     BlockObject, BlockObjectMapping, PieceObject, PieceObjectMapping,
 };
 use subspace_core_primitives::{
-    crypto, ArchivedBlockProgress, LastArchivedBlock, Piece, RootBlock, Sha256Hash, PIECE_SIZE,
-    SHA256_HASH_SIZE,
+    crypto, ArchivedBlockProgress, FlatPieces, LastArchivedBlock, RootBlock, Sha256Hash,
+    PIECE_SIZE, SHA256_HASH_SIZE,
 };
 use thiserror::Error;

@@ -110,7 +111,7 @@ pub struct ArchivedSegment {
     /// Root block of the segment
     pub root_block: RootBlock,
     /// Pieces that correspond to this segment
-    pub pieces: Vec<Piece>,
+    pub pieces: FlatPieces,
     /// Mappings for objects stored in corresponding pieces.
     ///
     /// NOTE: Only first half (data pieces) will have corresponding mapping item in this `Vec`.
@@ -599,15 +600,15 @@ impl Archiver {
             segment
         };
 
-        let data_shards: Vec<Vec<[u8; 2]>> = segment
+        let data_shards: Vec<Vec<Gf16Element>> = segment
             .chunks_exact(self.record_size)
             .map(utils::slice_to_arrays)
             .collect();
 
         drop(segment);
 
-        let mut parity_shards: Vec<Vec<[u8; 2]>> =
-            iter::repeat(vec![[0u8; 2]; self.record_size / 2])
+        let mut parity_shards: Vec<Vec<Gf16Element>> =
+            iter::repeat(vec![Gf16Element::default(); self.record_size / 2])
                 .take(data_shards.len())
                 .collect();
 
         self.reed_solomon
             .encode_sep(&data_shards, &mut parity_shards)
             .expect("Encoding is running with fixed parameters and should never fail");
 
-        // Combine data and parity records back into vectors for further processing
-        let records: Vec<Vec<u8>> = data_shards
-            .into_iter()
-            .chain(parity_shards)
-            .map(|shard| {
-                let mut record = Vec::with_capacity(self.record_size);
-
-                for chunk in shard {
-                    record.extend_from_slice(&chunk);
-                }
-
-                record
+        let mut pieces = vec![0u8; (data_shards.len() + parity_shards.len()) * PIECE_SIZE];
+        // Combine data and parity records back into flat vector of pieces (witness part of piece is
+        // still zeroes after this and is filled later)
+        pieces
+            .chunks_exact_mut(PIECE_SIZE)
+            .zip(data_shards.into_iter().chain(parity_shards))
+            .flat_map(|(piece, shard)| {
+                piece
+                    .chunks_exact_mut(GF_16_ELEMENT_BYTES)
+                    .zip(shard.into_iter())
             })
-            .collect();
-
-        // Build a Merkle tree over data and parity records
-        let merkle_tree = MerkleTree::from_data(records.iter());
-
-        // Take records, combine them with witnesses (Merkle proofs) to produce data and parity
-        // pieces
-        let pieces: Vec<Piece> = records
-            .into_iter()
+            .for_each(|(piece_chunk, shard_chunk)| {
+                piece_chunk
+                    .as_mut()
+                    .write_all(&shard_chunk)
+                    .expect("Both source and target are exactly the same size; qed");
+            });
+
+        // Build a Merkle tree over all records
+        let merkle_tree = MerkleTree::from_data(
+            pieces
+                .chunks_exact(PIECE_SIZE)
+                .map(|piece| &piece[..self.record_size]),
+        );
+
+        // Fill witnesses (Merkle proofs) in pieces created earlier
+        pieces
+            .chunks_exact_mut(PIECE_SIZE)
             .enumerate()
-            .map(|(position, record)| {
+            .for_each(|(position, piece)| {
                 let witness = merkle_tree
                     .get_witness(position)
                     .expect("We use the same indexes as during Merkle tree creation; qed");
-                let mut piece = Piece::default();
-
-                piece.as_mut().write_all(&record).expect(
-                    "With correct archiver parameters record is always smaller than \
-                    piece size; qed",
-                );
-                drop(record);
 
                 (&mut piece[self.record_size..]).write_all(&witness).expect(
                     "Parameters are verified in the archiver constructor to make sure this \
                     never happens; qed",
                 );
-
-                piece
-            })
-            .collect();
+            });

         // Now produce root block
         let root_block = RootBlock::V0 {
@@ -678,7 +675,9 @@
 
         ArchivedSegment {
             root_block,
-            pieces,
+            pieces: pieces
+                .try_into()
+                .expect("Pieces length is correct as created above; qed"),
             object_mapping,
         }
     }
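The refactored code above writes every piece in place inside a single flat allocation in two passes: shard bytes first, then witness bytes once the Merkle tree is built. The resulting layout of each PIECE_SIZE chunk, with a small hypothetical helper that is not part of the diff (PIECE_SIZE as assumed in the sketch near the top of this page, record_size corresponding to self.record_size in the archiver):

// Each PIECE_SIZE chunk of the flat buffer ends up as:
//
//   [ record: record_size bytes | Merkle witness: PIECE_SIZE - record_size bytes ]
//
// Hypothetical helper splitting a piece back into those two parts.
fn split_piece(piece: &[u8], record_size: usize) -> (&[u8], &[u8]) {
    assert_eq!(piece.len(), PIECE_SIZE, "expects exactly one whole piece");
    piece.split_at(record_size)
}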
8 changes: 4 additions & 4 deletions crates/subspace-archiving/src/utils.rs
@@ -2,14 +2,14 @@ use reed_solomon_erasure::galois_16::Field as Galois16Field;
 use reed_solomon_erasure::Field;
 use std::mem;
 
-type Elem = <Galois16Field as Field>::Elem;
-const ELEM_BYTES: usize = mem::size_of::<Elem>();
+pub(crate) type Gf16Element = <Galois16Field as Field>::Elem;
+pub(crate) const GF_16_ELEMENT_BYTES: usize = mem::size_of::<Gf16Element>();
 
 /// Convert slice to a vector of arrays for `reed_solomon_erasure` library.
-pub(crate) fn slice_to_arrays<S: AsRef<[u8]> + ?Sized>(slice: &S) -> Vec<Elem> {
+pub(crate) fn slice_to_arrays<S: AsRef<[u8]> + ?Sized>(slice: &S) -> Vec<Gf16Element> {
     slice
         .as_ref()
-        .chunks_exact(ELEM_BYTES)
+        .chunks_exact(GF_16_ELEMENT_BYTES)
         .map(|s| s.try_into().unwrap())
         .collect()
 }
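To make the renamed helper concrete: for the galois_16 codec, Elem is [u8; 2], so GF_16_ELEMENT_BYTES is 2 and slice_to_arrays regroups a byte slice into 16-bit symbols. A self-contained illustration of the same logic (slice_to_arrays itself is pub(crate), so this reimplements it for demonstration):

// Reimplementation of the slice_to_arrays logic, for demonstration only.
fn bytes_to_gf16_symbols(bytes: &[u8]) -> Vec<[u8; 2]> {
    bytes
        .chunks_exact(2) // GF_16_ELEMENT_BYTES
        .map(|pair| pair.try_into().unwrap())
        .collect()
}

fn main() {
    let record = [0xAB, 0xCD, 0x01, 0x02];
    // Two 16-bit symbols: [0xAB, 0xCD] and [0x01, 0x02]
    assert_eq!(bytes_to_gf16_symbols(&record), vec![[0xAB, 0xCD], [0x01, 0x02]]);
}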
18 changes: 11 additions & 7 deletions crates/subspace-archiving/tests/integration/archiver.rs
@@ -106,7 +106,7 @@ fn archiver() {
     assert_eq!(archived_segments.len(), 1);
 
     let first_archived_segment = archived_segments.into_iter().next().unwrap();
-    assert_eq!(first_archived_segment.pieces.len(), MERKLE_NUM_LEAVES);
+    assert_eq!(first_archived_segment.pieces.count(), MERKLE_NUM_LEAVES);
     assert_eq!(first_archived_segment.root_block.segment_index(), 0);
     assert_eq!(
         first_archived_segment.root_block.prev_root_block_hash(),
@@ -130,7 +130,7 @@ fn archiver() {
         .chain(iter::repeat(block_1.as_ref()).zip(block_1_object_mapping.objects.iter()));
     let piece_objects = first_archived_segment
         .pieces
-        .iter()
+        .chunks_exact(PIECE_SIZE)
         .zip(&first_archived_segment.object_mapping)
         .flat_map(|(piece, object_mapping)| {
             iter::repeat(piece.as_ref()).zip(&object_mapping.objects)
 }

     // Check that all pieces are valid
-    for (position, piece) in first_archived_segment.pieces.iter().enumerate() {
+    for (position, piece) in first_archived_segment
+        .pieces
+        .chunks_exact(PIECE_SIZE)
+        .enumerate()
+    {
         assert!(archiver::is_piece_valid(
             piece,
             first_archived_segment.root_block.records_root(),
@@ -189,7 +193,7 @@ fn archiver() {
         iter::repeat(block_1.as_ref()).zip(block_1_object_mapping.objects.iter().skip(2));
     let piece_objects = archived_segments[0]
         .pieces
-        .iter()
+        .chunks_exact(PIECE_SIZE)
         .zip(&archived_segments[0].object_mapping)
         .flat_map(|(piece, object_mapping)| {
             iter::repeat(piece.as_ref()).zip(&object_mapping.objects)
@@ -217,7 +221,7 @@
     let mut previous_root_block_hash = first_archived_segment.root_block.hash();
     let last_root_block = archived_segments.iter().last().unwrap().root_block;
     for archived_segment in archived_segments {
-        assert_eq!(archived_segment.pieces.len(), MERKLE_NUM_LEAVES);
+        assert_eq!(archived_segment.pieces.count(), MERKLE_NUM_LEAVES);
         assert_eq!(
             archived_segment.root_block.segment_index(),
             expected_segment_index
@@ -227,7 +231,7 @@
             previous_root_block_hash
         );
 
-        for (position, piece) in archived_segment.pieces.iter().enumerate() {
+        for (position, piece) in archived_segment.pieces.chunks_exact(PIECE_SIZE).enumerate() {
             assert!(archiver::is_piece_valid(
                 piece,
                 archived_segment.root_block.records_root(),
@@ -270,7 +274,7 @@ fn archiver() {
     assert_eq!(last_archived_block.number, 3);
     assert_eq!(last_archived_block.partial_archived(), None);
 
-    for (position, piece) in archived_segment.pieces.iter().enumerate() {
+    for (position, piece) in archived_segment.pieces.chunks_exact(PIECE_SIZE).enumerate() {
         assert!(archiver::is_piece_valid(
             piece,
             archived_segment.root_block.records_root(),
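One subtlety behind the len() to count() change in these assertions, assuming the FlatPieces shape sketched near the top of this page: len() now resolves through the [u8] deref and reports bytes, not pieces, so a dedicated count() is needed.

// Sketch of the distinction; FlatPieces and PIECE_SIZE as assumed earlier.
fn demo(pieces: &FlatPieces) {
    // len() counts bytes via the [u8] deref; count() counts whole pieces
    assert_eq!(pieces.len(), pieces.count() * PIECE_SIZE);

    // Iterating pieces yields &[u8] slices of exactly PIECE_SIZE bytes each
    for (position, piece) in pieces.chunks_exact(PIECE_SIZE).enumerate() {
        debug_assert_eq!(piece.len(), PIECE_SIZE);
        let _ = position;
    }
}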
