forked from erigontech/erigon
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stage_blockhashes.go
128 lines (115 loc) · 2.9 KB
/
stage_blockhashes.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package stagedsync
import (
"context"
"encoding/binary"
"fmt"
"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/common/etl"
"github.com/ledgerwatch/erigon/core/rawdb"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
"github.com/ledgerwatch/erigon/ethdb"
)
func extractHeaders(k []byte, v []byte, next etl.ExtractNextFunc) error {
// We only want to extract entries composed by Block Number + Header Hash
if len(k) != 40 {
return nil
}
return next(k, common.CopyBytes(k[8:]), common.CopyBytes(k[:8]))
}
type BlockHashesCfg struct {
db ethdb.RwKV
tmpDir string
}
func StageBlockHashesCfg(db ethdb.RwKV, tmpDir string) BlockHashesCfg {
return BlockHashesCfg{
db: db,
tmpDir: tmpDir,
}
}
func SpawnBlockHashStage(s *StageState, tx ethdb.RwTx, cfg BlockHashesCfg, ctx context.Context) (err error) {
useExternalTx := tx != nil
if !useExternalTx {
tx, err = cfg.db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
}
quit := ctx.Done()
headNumber, err := stages.GetStageProgress(tx, stages.Headers)
if err != nil {
return fmt.Errorf("getting headers progress: %w", err)
}
headHash := rawdb.ReadHeaderByNumber(tx, headNumber).Hash()
if s.BlockNumber == headNumber {
return nil
}
startKey := make([]byte, 8)
binary.BigEndian.PutUint64(startKey, s.BlockNumber)
endKey := dbutils.HeaderKey(headNumber, headHash) // Make sure we stop at head
//todo do we need non canonical headers ?
logPrefix := s.LogPrefix()
if err := etl.Transform(
logPrefix,
tx,
dbutils.HeadersBucket,
dbutils.HeaderNumberBucket,
cfg.tmpDir,
extractHeaders,
etl.IdentityLoadFunc,
etl.TransformArgs{
ExtractStartKey: startKey,
ExtractEndKey: endKey,
Quit: quit,
},
); err != nil {
return err
}
if err = s.Update(tx, headNumber); err != nil {
return err
}
if !useExternalTx {
if err = tx.Commit(); err != nil {
return err
}
}
return nil
}
func UnwindBlockHashStage(u *UnwindState, tx ethdb.RwTx, cfg BlockHashesCfg, ctx context.Context) (err error) {
useExternalTx := tx != nil
if !useExternalTx {
tx, err = cfg.db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
}
logPrefix := u.LogPrefix()
if err = u.Done(tx); err != nil {
return fmt.Errorf("%s: reset: %v", logPrefix, err)
}
if !useExternalTx {
if err = tx.Commit(); err != nil {
return fmt.Errorf("%s: failed to write db commit: %v", logPrefix, err)
}
}
return nil
}
func PruneBlockHashStage(p *PruneState, tx ethdb.RwTx, cfg BlockHashesCfg, ctx context.Context) (err error) {
useExternalTx := tx != nil
if !useExternalTx {
tx, err = cfg.db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
}
logPrefix := p.LogPrefix()
if !useExternalTx {
if err = tx.Commit(); err != nil {
return fmt.Errorf("%s: failed to write db commit: %v", logPrefix, err)
}
}
return nil
}