forked from erigontech/erigon
-
Notifications
You must be signed in to change notification settings - Fork 8
/
stage_blockhashes.go
127 lines (114 loc) · 2.77 KB
/
stage_blockhashes.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package stagedsync
import (
"context"
"encoding/binary"
"fmt"
"github.com/ledgerwatch/erigon-lib/chain"
libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/etl"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon/common/dbutils"
"github.com/ledgerwatch/erigon/eth/stagedsync/stages"
)
// extractHeaders is the extract callback handed to etl.Transform by
// SpawnBlockHashStage. For a 40-byte key (8-byte block number followed by the
// header hash) it emits the pair hash -> block number through next; every other
// key is skipped without error. The value v is not used.
func extractHeaders(k []byte, v []byte, next etl.ExtractNextFunc) error {
	// Only keys shaped as block number + header hash are forwarded.
	if len(k) == 40 {
		hashPart := libcommon.Copy(k[8:])
		numberPart := libcommon.Copy(k[:8])
		return next(k, hashPart, numberPart)
	}
	return nil
}
// BlockHashesCfg carries the dependencies of the block-hashes stage functions
// in this file (SpawnBlockHashStage, UnwindBlockHashStage, PruneBlockHashStage).
type BlockHashesCfg struct {
// db is used to open a read-write transaction when a stage function is called
// without one.
db kv.RwDB
// tmpDir is passed to etl.Transform by SpawnBlockHashStage.
tmpDir string
// cc is stored by StageBlockHashesCfg; the stage functions visible in this
// file do not read it.
cc *chain.Config
}
// StageBlockHashesCfg builds a BlockHashesCfg from the given database handle,
// temporary directory and chain configuration. The arguments are stored as-is.
func StageBlockHashesCfg(db kv.RwDB, tmpDir string, cc *chain.Config) BlockHashesCfg {
	var cfg BlockHashesCfg
	cfg.db = db
	cfg.tmpDir = tmpDir
	cfg.cc = cc
	return cfg
}
// SpawnBlockHashStage runs the forward pass of the block-hashes stage: it feeds
// kv.Headers entries through extractHeaders into kv.HeaderNumber via
// etl.Transform, covering keys from the stage's current block number up to the
// progress of the Headers stage, and then records that progress with s.Update.
//
// When tx is nil the function opens its own read-write transaction on cfg.db,
// commits it on success and rolls it back otherwise. When tx is supplied by the
// caller it is neither committed nor rolled back here.
//
// It returns nil without doing any work if the stage is already at the Headers
// stage progress.
func SpawnBlockHashStage(s *StageState, tx kv.RwTx, cfg BlockHashesCfg, ctx context.Context) (err error) {
	ownsTx := tx == nil
	if ownsTx {
		if tx, err = cfg.db.BeginRw(ctx); err != nil {
			return err
		}
		// Rollback stays deferred even on the success path, where Commit runs first.
		defer tx.Rollback()
	}
	done := ctx.Done()

	headersProgress, err := stages.GetStageProgress(tx, stages.Headers)
	if err != nil {
		return fmt.Errorf("getting headers progress: %w", err)
	}
	if headersProgress == s.BlockNumber {
		// Already caught up with the Headers stage.
		return nil
	}

	// Extraction starts at the 8-byte big-endian encoding of the current progress.
	fromKey := make([]byte, 8)
	binary.BigEndian.PutUint64(fromKey, s.BlockNumber)
	// etl.Transform uses ExtractEndKey as an exclusive bound, therefore +1.
	toKey := dbutils.HeaderKey(headersProgress+1, libcommon.Hash{})

	// TODO: do we need non-canonical headers?
	transformArgs := etl.TransformArgs{
		ExtractStartKey: fromKey,
		ExtractEndKey:   toKey,
		Quit:            done,
	}
	err = etl.Transform(
		s.LogPrefix(),
		tx,
		kv.Headers,
		kv.HeaderNumber,
		cfg.tmpDir,
		extractHeaders,
		etl.IdentityLoadFunc,
		transformArgs,
	)
	if err != nil {
		return err
	}

	if err = s.Update(tx, headersProgress); err != nil {
		return err
	}
	if ownsTx {
		return tx.Commit()
	}
	return nil
}
// UnwindBlockHashStage handles an unwind of the block-hashes stage. The only
// work it performs is calling u.Done(tx); it does not touch any table itself.
//
// When tx is nil the function opens its own read-write transaction on cfg.db,
// commits it on success and rolls it back otherwise. A caller-supplied tx is
// left for the caller to commit.
func UnwindBlockHashStage(u *UnwindState, tx kv.RwTx, cfg BlockHashesCfg, ctx context.Context) (err error) {
	ownsTx := tx == nil
	if ownsTx {
		if tx, err = cfg.db.BeginRw(ctx); err != nil {
			return err
		}
		defer tx.Rollback()
	}

	err = u.Done(tx)
	if err != nil {
		// NOTE(review): the message keeps its leading space verbatim; it looks like
		// a leftover from a removed prefix — confirm before normalizing.
		return fmt.Errorf(" reset: %w", err)
	}

	if !ownsTx {
		return nil
	}
	err = tx.Commit()
	if err != nil {
		return fmt.Errorf("failed to write db commit: %w", err)
	}
	return nil
}
// PruneBlockHashStage is the prune hook of the block-hashes stage. It performs
// no pruning work: with a caller-supplied tx it returns nil immediately, and
// with a nil tx it opens a read-write transaction on cfg.db and commits it
// without writing anything. The p argument is not used.
func PruneBlockHashStage(p *PruneState, tx kv.RwTx, cfg BlockHashesCfg, ctx context.Context) (err error) {
	if tx != nil {
		// External transaction: nothing to do here, the caller owns the commit.
		return nil
	}

	tx, err = cfg.db.BeginRw(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	return tx.Commit()
}