-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
headers.rs
130 lines (115 loc) · 4.52 KB
/
headers.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
use crate::segments::{dataset_for_compression, prepare_jar, Segment, SegmentHeader};
use reth_db::{
cursor::DbCursorRO, database::Database, snapshot::create_snapshot_T1_T2_T3, tables,
transaction::DbTx, RawKey, RawTable,
};
use reth_interfaces::provider::ProviderResult;
use reth_primitives::{snapshot::SegmentConfig, BlockNumber, SnapshotSegment};
use reth_provider::{
providers::{SnapshotProvider, SnapshotWriter},
DatabaseProviderRO,
};
use std::{ops::RangeInclusive, path::Path, sync::Arc};
/// Snapshot segment responsible for the [SnapshotSegment::Headers] part of data.
///
/// Stateless marker type: all behavior lives in its [`Segment`] implementation below,
/// which streams the `Headers`, `HeaderTD` and `CanonicalHeaders` tables into
/// snapshot files.
#[derive(Debug, Default)]
pub struct Headers;
impl<DB: Database> Segment<DB> for Headers {
    /// Identifies this segment as [`SnapshotSegment::Headers`].
    fn segment(&self) -> SnapshotSegment {
        SnapshotSegment::Headers
    }

    /// Copies every header, total-difficulty and canonical-hash entry in
    /// `block_range` from the database into the snapshot writer, one block at a
    /// time.
    ///
    /// The three source tables are walked in lockstep; debug assertions verify
    /// that all three walkers stay aligned on the same block number.
    fn snapshot(
        &self,
        provider: DatabaseProviderRO<DB>,
        snapshot_provider: Arc<SnapshotProvider>,
        block_range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<()> {
        // Writer positioned at the first block of the requested range.
        let mut writer =
            snapshot_provider.get_writer(*block_range.start(), SnapshotSegment::Headers)?;

        // One read cursor (and range walker) per source table.
        let mut header_cursor = provider.tx_ref().cursor_read::<tables::Headers>()?;
        let header_walker = header_cursor.walk_range(block_range.clone())?;

        let mut td_cursor = provider.tx_ref().cursor_read::<tables::HeaderTD>()?;
        let td_walker = td_cursor.walk_range(block_range.clone())?;

        let mut canonical_cursor = provider.tx_ref().cursor_read::<tables::CanonicalHeaders>()?;
        let canonical_walker = canonical_cursor.walk_range(block_range)?;

        for zipped in header_walker.zip(td_walker).zip(canonical_walker) {
            let ((header_entry, td_entry), canonical_entry) = zipped;
            let (block, header) = header_entry?;
            let (td_block, td) = td_entry?;
            let (canonical_block, canonical_hash) = canonical_entry?;

            // All three tables must agree on which block this row belongs to.
            debug_assert_eq!(block, td_block);
            debug_assert_eq!(td_block, canonical_block);

            // The writer already sits on the range's first block, so only
            // advance it for blocks after the genesis block.
            if block > 0 {
                let _advanced_to = writer.increment_block(SnapshotSegment::Headers)?;
                debug_assert_eq!(_advanced_to, block);
            }
            writer.append_header(header, td.0, canonical_hash)?;
        }

        Ok(())
    }

    /// Creates a standalone snapshot (jar) file for `block_range` under
    /// `directory`, optionally building compression dictionaries and
    /// filter/PHF hash lists according to `config`.
    fn create_snapshot_file(
        &self,
        provider: &DatabaseProviderRO<DB>,
        directory: &Path,
        config: SegmentConfig,
        block_range: RangeInclusive<BlockNumber>,
    ) -> ProviderResult<()> {
        let range_len = block_range.clone().count();

        // Prepare the jar; the closure lazily samples each of the three tables
        // for dictionary training only when the config asks for compression.
        let mut jar = prepare_jar::<DB, 3>(
            provider,
            directory,
            SnapshotSegment::Headers,
            config,
            block_range.clone(),
            range_len,
            || {
                Ok([
                    dataset_for_compression::<DB, tables::Headers>(
                        provider,
                        &block_range,
                        range_len,
                    )?,
                    dataset_for_compression::<DB, tables::HeaderTD>(
                        provider,
                        &block_range,
                        range_len,
                    )?,
                    dataset_for_compression::<DB, tables::CanonicalHeaders>(
                        provider,
                        &block_range,
                        range_len,
                    )?,
                ])
            },
        )?;

        // Generate list of hashes for filters & PHF
        let mut cursor = provider.tx_ref().cursor_read::<RawTable<tables::CanonicalHeaders>>()?;
        let hashes = if config.filters.has_filters() {
            Some(
                cursor
                    .walk(Some(RawKey::from(*block_range.start())))?
                    .take(range_len)
                    .map(|row| row.map(|(_key, value)| value.into_value()).map_err(|e| e.into())),
            )
        } else {
            None
        };

        create_snapshot_T1_T2_T3::<
            tables::Headers,
            tables::HeaderTD,
            tables::CanonicalHeaders,
            BlockNumber,
            SegmentHeader,
        >(
            provider.tx_ref(),
            block_range,
            None,
            // We already prepared the dictionary beforehand
            None::<Vec<std::vec::IntoIter<Vec<u8>>>>,
            hashes,
            range_len,
            &mut jar,
        )?;

        Ok(())
    }
}