Skip to content

Commit

Permalink
catchpoints: Add onlineaccounts and onlineroundparamstail tables to s…
Browse files Browse the repository at this point in the history
…napshot files (#6177)

Co-authored-by: Pavel Zbitskiy <65323360+algorandskiy@users.noreply.github.com>
  • Loading branch information
cce and algorandskiy authored Dec 20, 2024
1 parent 2e043df commit a10b6b3
Show file tree
Hide file tree
Showing 35 changed files with 1,928 additions and 291 deletions.
4 changes: 2 additions & 2 deletions catchup/catchpointService_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ func (m *catchpointCatchupAccessorMock) Ledger() (l ledger.CatchupAccessorClient
}

// GetVerifyData returns the balances hash, spver hash and totals used by VerifyCatchpoint
func (m *catchpointCatchupAccessorMock) GetVerifyData(ctx context.Context) (balancesHash crypto.Digest, spverHash crypto.Digest, totals ledgercore.AccountTotals, err error) {
return crypto.Digest{}, crypto.Digest{}, ledgercore.AccountTotals{}, nil
func (m *catchpointCatchupAccessorMock) GetVerifyData(ctx context.Context) (balancesHash, spverHash, onlineAccountsHash, onlineRoundParamsHash crypto.Digest, totals ledgercore.AccountTotals, err error) {
return crypto.Digest{}, crypto.Digest{}, crypto.Digest{}, crypto.Digest{}, ledgercore.AccountTotals{}, nil
}

// TestCatchpointServicePeerRank ensures CatchpointService does not crash when a block fetched
Expand Down
7 changes: 4 additions & 3 deletions cmd/catchpointdump/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,13 @@ func loadCatchpointIntoDatabase(ctx context.Context, catchupAccessor ledger.Catc
if err != nil {
return fileHeader, err
}
var balanceHash, spverHash crypto.Digest
balanceHash, spverHash, _, err = catchupAccessor.GetVerifyData(ctx)
var balanceHash, spverHash, onlineAccountsHash, onlineRoundParamsHash crypto.Digest
balanceHash, spverHash, onlineAccountsHash, onlineRoundParamsHash, _, err = catchupAccessor.GetVerifyData(ctx)
if err != nil {
return fileHeader, err
}
fmt.Printf("accounts digest=%s, spver digest=%s\n\n", balanceHash, spverHash)
fmt.Printf("accounts digest=%s, spver digest=%s, onlineaccounts digest=%s onlineroundparams digest=%s\n\n",
balanceHash, spverHash, onlineAccountsHash, onlineRoundParamsHash)
}
return fileHeader, nil
}
Expand Down
4 changes: 2 additions & 2 deletions components/mocks/mockCatchpointCatchupAccessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ func (m *MockCatchpointCatchupAccessor) GetCatchupBlockRound(ctx context.Context
}

// GetVerifyData returns the balances hash, spver hash and totals used by VerifyCatchpoint
func (m *MockCatchpointCatchupAccessor) GetVerifyData(ctx context.Context) (balancesHash crypto.Digest, spverHash crypto.Digest, totals ledgercore.AccountTotals, err error) {
return crypto.Digest{}, crypto.Digest{}, ledgercore.AccountTotals{}, nil
func (m *MockCatchpointCatchupAccessor) GetVerifyData(ctx context.Context) (balancesHash, spverHash, onlineAccountsHash, onlineRoundParams crypto.Digest, totals ledgercore.AccountTotals, err error) {
return crypto.Digest{}, crypto.Digest{}, crypto.Digest{}, crypto.Digest{}, ledgercore.AccountTotals{}, nil
}

// VerifyCatchpoint verifies that the catchpoint is valid by reconstructing the label.
Expand Down
6 changes: 6 additions & 0 deletions config/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,10 @@ type ConsensusParams struct {
// Version 7 includes state proof verification contexts
EnableCatchpointsWithSPContexts bool

// EnableCatchpointsWithOnlineAccounts specifies when to enable version 8 catchpoints.
// Version 8 includes onlineaccounts and onlineroundparams amounts, for historical stake lookups.
EnableCatchpointsWithOnlineAccounts bool

// AppForbidLowResources enforces a rule that prevents apps from accessing
// asas and apps below 256, in an effort to decrease the ambiguity of
// opcodes that accept IDs or slot indexes. Simultaneously, the first ID
Expand Down Expand Up @@ -1537,6 +1541,8 @@ func initConsensusProtocols() {

vFuture.Heartbeat = true

vFuture.EnableCatchpointsWithOnlineAccounts = true

Consensus[protocol.ConsensusFuture] = vFuture

// vAlphaX versions are an separate series of consensus parameters and versions for alphanet
Expand Down
20 changes: 11 additions & 9 deletions ledger/catchpointfileheader.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ import (
type CatchpointFileHeader struct {
_struct struct{} `codec:",omitempty,omitemptyarray"`

Version uint64 `codec:"version"`
BalancesRound basics.Round `codec:"balancesRound"`
BlocksRound basics.Round `codec:"blocksRound"`
Totals ledgercore.AccountTotals `codec:"accountTotals"`
TotalAccounts uint64 `codec:"accountsCount"`
TotalChunks uint64 `codec:"chunksCount"`
TotalKVs uint64 `codec:"kvsCount"`
Catchpoint string `codec:"catchpoint"`
BlockHeaderDigest crypto.Digest `codec:"blockHeaderDigest"`
Version uint64 `codec:"version"`
BalancesRound basics.Round `codec:"balancesRound"`
BlocksRound basics.Round `codec:"blocksRound"`
Totals ledgercore.AccountTotals `codec:"accountTotals"`
TotalAccounts uint64 `codec:"accountsCount"`
TotalChunks uint64 `codec:"chunksCount"`
TotalKVs uint64 `codec:"kvsCount"`
TotalOnlineAccounts uint64 `codec:"onlineAccountsCount"`
TotalOnlineRoundParams uint64 `codec:"onlineRoundParamsCount"`
Catchpoint string `codec:"catchpoint"`
BlockHeaderDigest crypto.Digest `codec:"blockHeaderDigest"`
}
197 changes: 141 additions & 56 deletions ledger/catchpointfilewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,33 +51,29 @@ const (
// the writing is complete. It might take multiple steps until the operation is over, and the caller
// has the option of throttling the CPU utilization in between the calls.
type catchpointFileWriter struct {
ctx context.Context
tx trackerdb.SnapshotScope
filePath string
totalAccounts uint64
totalKVs uint64
file *os.File
tar *tar.Writer
compressor io.WriteCloser
chunk catchpointFileChunkV6
chunkNum uint64
writtenBytes int64
biggestChunkLen uint64
accountsIterator accountsBatchIter
maxResourcesPerChunk int
accountsDone bool
kvRows kvIter
}

type kvIter interface {
Next() bool
KeyValue() ([]byte, []byte, error)
Close()
}

type accountsBatchIter interface {
Next(ctx context.Context, accountCount int, resourceCount int) ([]encoded.BalanceRecordV6, uint64, error)
Close()
ctx context.Context
tx trackerdb.SnapshotScope
filePath string
totalAccounts uint64
totalKVs uint64
totalOnlineAccounts uint64
totalOnlineRoundParams uint64
file *os.File
tar *tar.Writer
compressor io.WriteCloser
chunk catchpointFileChunkV6
chunkNum uint64
writtenBytes int64
biggestChunkLen uint64
accountsIterator trackerdb.EncodedAccountsBatchIter
maxResourcesPerChunk int
accountsDone bool
kvRows trackerdb.KVsIter
kvDone bool
onlineAccountRows trackerdb.TableIterator[*encoded.OnlineAccountRecordV6]
onlineAccountsDone bool
onlineRoundParamsRows trackerdb.TableIterator[*encoded.OnlineRoundParamsRecordV6]
onlineRoundParamsDone bool
}

type catchpointFileBalancesChunkV5 struct {
Expand All @@ -88,13 +84,15 @@ type catchpointFileBalancesChunkV5 struct {
type catchpointFileChunkV6 struct {
_struct struct{} `codec:",omitempty,omitemptyarray"`

Balances []encoded.BalanceRecordV6 `codec:"bl,allocbound=BalancesPerCatchpointFileChunk"`
numAccounts uint64
KVs []encoded.KVRecordV6 `codec:"kv,allocbound=BalancesPerCatchpointFileChunk"`
Balances []encoded.BalanceRecordV6 `codec:"bl,allocbound=BalancesPerCatchpointFileChunk"`
numAccounts uint64
KVs []encoded.KVRecordV6 `codec:"kv,allocbound=BalancesPerCatchpointFileChunk"`
OnlineAccounts []encoded.OnlineAccountRecordV6 `codec:"oa,allocbound=BalancesPerCatchpointFileChunk"`
OnlineRoundParams []encoded.OnlineRoundParamsRecordV6 `codec:"orp,allocbound=BalancesPerCatchpointFileChunk"`
}

func (chunk catchpointFileChunkV6) empty() bool {
return len(chunk.Balances) == 0 && len(chunk.KVs) == 0
return len(chunk.Balances) == 0 && len(chunk.KVs) == 0 && len(chunk.OnlineAccounts) == 0 && len(chunk.OnlineRoundParams) == 0
}

type catchpointStateProofVerificationContext struct {
Expand Down Expand Up @@ -122,6 +120,16 @@ func makeCatchpointFileWriter(ctx context.Context, filePath string, tx trackerdb
return nil, err
}

totalOnlineAccounts, err := aw.TotalOnlineAccountRows(ctx)
if err != nil {
return nil, err
}

totalOnlineRoundParams, err := aw.TotalOnlineRoundParams(ctx)
if err != nil {
return nil, err
}

err = os.MkdirAll(filepath.Dir(filePath), 0700)
if err != nil {
return nil, err
Expand All @@ -137,16 +145,18 @@ func makeCatchpointFileWriter(ctx context.Context, filePath string, tx trackerdb
tar := tar.NewWriter(compressor)

res := &catchpointFileWriter{
ctx: ctx,
tx: tx,
filePath: filePath,
totalAccounts: totalAccounts,
totalKVs: totalKVs,
file: file,
compressor: compressor,
tar: tar,
accountsIterator: tx.MakeEncodedAccoutsBatchIter(),
maxResourcesPerChunk: maxResourcesPerChunk,
ctx: ctx,
tx: tx,
filePath: filePath,
totalAccounts: totalAccounts,
totalKVs: totalKVs,
totalOnlineAccounts: totalOnlineAccounts,
totalOnlineRoundParams: totalOnlineRoundParams,
file: file,
compressor: compressor,
tar: tar,
accountsIterator: tx.MakeEncodedAccountsBatchIter(),
maxResourcesPerChunk: maxResourcesPerChunk,
}
return res, nil
}
Expand Down Expand Up @@ -233,6 +243,14 @@ func (cw *catchpointFileWriter) FileWriteStep(stepCtx context.Context) (more boo
cw.kvRows.Close()
cw.kvRows = nil
}
if cw.onlineAccountRows != nil {
cw.onlineAccountRows.Close()
cw.onlineAccountRows = nil
}
if cw.onlineRoundParamsRows != nil {
cw.onlineRoundParamsRows.Close()
cw.onlineRoundParamsRows = nil
}
}
}()

Expand Down Expand Up @@ -323,27 +341,94 @@ func (cw *catchpointFileWriter) readDatabaseStep(ctx context.Context) error {
cw.accountsDone = true
}

// Create the *Rows iterator JIT
if cw.kvRows == nil {
rows, err := cw.tx.MakeKVsIter(ctx)
if err != nil {
return err
// Create the kvRows iterator JIT
if !cw.kvDone {
if cw.kvRows == nil {
rows, err := cw.tx.MakeKVsIter(ctx)
if err != nil {
return err
}
cw.kvRows = rows
}

kvrs := make([]encoded.KVRecordV6, 0, BalancesPerCatchpointFileChunk)
for cw.kvRows.Next() {
k, v, err := cw.kvRows.KeyValue()
if err != nil {
return err
}
kvrs = append(kvrs, encoded.KVRecordV6{Key: k, Value: v})
if len(kvrs) == BalancesPerCatchpointFileChunk {
break
}
}
if len(kvrs) > 0 {
cw.chunk = catchpointFileChunkV6{KVs: kvrs}
return nil
}
cw.kvRows = rows
// Do not close kvRows here, or it will start over on the next iteration
cw.kvDone = true
}

kvrs := make([]encoded.KVRecordV6, 0, BalancesPerCatchpointFileChunk)
for cw.kvRows.Next() {
k, v, err := cw.kvRows.KeyValue()
if err != nil {
return err
if !cw.onlineAccountsDone {
// Create the OnlineAccounts iterator JIT
if cw.onlineAccountRows == nil {
rows, err := cw.tx.MakeOnlineAccountsIter(ctx)
if err != nil {
return err
}
cw.onlineAccountRows = rows
}
kvrs = append(kvrs, encoded.KVRecordV6{Key: k, Value: v})
if len(kvrs) == BalancesPerCatchpointFileChunk {
break

onlineAccts := make([]encoded.OnlineAccountRecordV6, 0, BalancesPerCatchpointFileChunk)
for cw.onlineAccountRows.Next() {
oa, err := cw.onlineAccountRows.GetItem()
if err != nil {
return err
}
onlineAccts = append(onlineAccts, *oa)
if len(onlineAccts) == BalancesPerCatchpointFileChunk {
break
}
}
if len(onlineAccts) > 0 {
cw.chunk = catchpointFileChunkV6{OnlineAccounts: onlineAccts}
return nil
}
// Do not close onlineAccountRows here, or it will start over on the next iteration
cw.onlineAccountsDone = true
}

if !cw.onlineRoundParamsDone {
// Create the OnlineRoundParams iterator JIT
if cw.onlineRoundParamsRows == nil {
rows, err := cw.tx.MakeOnlineRoundParamsIter(ctx)
if err != nil {
return err
}
cw.onlineRoundParamsRows = rows
}

onlineRndParams := make([]encoded.OnlineRoundParamsRecordV6, 0, BalancesPerCatchpointFileChunk)
for cw.onlineRoundParamsRows.Next() {
or, err := cw.onlineRoundParamsRows.GetItem()
if err != nil {
return err
}
onlineRndParams = append(onlineRndParams, *or)
if len(onlineRndParams) == BalancesPerCatchpointFileChunk {
break
}
}
if len(onlineRndParams) > 0 {
cw.chunk = catchpointFileChunkV6{OnlineRoundParams: onlineRndParams}
return nil
}
// Do not close onlineRndParamsRows here, or it will start over on the next iteration
cw.onlineRoundParamsDone = true
}
cw.chunk = catchpointFileChunkV6{KVs: kvrs}

// Finished the last chunk
return nil
}

Expand Down
Loading

0 comments on commit a10b6b3

Please sign in to comment.