Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

account manager: avoid taking locks for long period of time #3717

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions components/mocks/mockParticipationRegistry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright (C) 2019-2022 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package mocks

import (
"time"

"github.com/algorand/go-algorand/data/account"
"github.com/algorand/go-algorand/data/basics"
)

// MockParticipationRegistry is a dummy ParticipationRegistry that doesn't do anything
type MockParticipationRegistry struct {
}

// Insert adds a record to storage and computes the ParticipationID
func (m *MockParticipationRegistry) Insert(record account.Participation) (account.ParticipationID, error) {
return account.ParticipationID{}, nil
}

// AppendKeys appends state proof keys to an existing Participation record. Keys can only be appended
// once, an error will occur when the data is flushed when inserting a duplicate key.
func (m *MockParticipationRegistry) AppendKeys(id account.ParticipationID, keys account.StateProofKeys) error {
return nil
}

// Delete removes a record from storage.
func (m *MockParticipationRegistry) Delete(id account.ParticipationID) error {
return nil
}

// DeleteExpired removes all records from storage which are expired on the given round.
func (m *MockParticipationRegistry) DeleteExpired(round basics.Round) error {
return nil
}

// Get a participation record.
func (m *MockParticipationRegistry) Get(id account.ParticipationID) account.ParticipationRecord {
return account.ParticipationRecord{}
}

// GetAll of the participation records.
func (m *MockParticipationRegistry) GetAll() []account.ParticipationRecord {
return []account.ParticipationRecord{}
}

// GetForRound fetches a record with voting secrets for a particular round.
func (m *MockParticipationRegistry) GetForRound(id account.ParticipationID, round basics.Round) (account.ParticipationRecordForRound, error) {
return account.ParticipationRecordForRound{}, nil
}

// GetStateProofForRound fetches a record with stateproof secrets for a particular round.
func (m *MockParticipationRegistry) GetStateProofForRound(id account.ParticipationID, round basics.Round) (account.StateProofRecordForRound, error) {
return account.StateProofRecordForRound{}, nil
}

// Register updates the EffectiveFirst and EffectiveLast fields. If there are multiple records for the account
// then it is possible for multiple records to be updated.
func (m *MockParticipationRegistry) Register(id account.ParticipationID, on basics.Round) error {
return nil
}

// Record sets the Last* field for the active ParticipationID for the given account.
func (m *MockParticipationRegistry) Record(account basics.Address, round basics.Round, participationType account.ParticipationAction) error {
return nil
}

// Flush ensures that all changes have been written to the underlying data store.
func (m *MockParticipationRegistry) Flush(timeout time.Duration) error {
return nil
}

// Close any resources used to implement the interface.
func (m *MockParticipationRegistry) Close() {

}
2 changes: 1 addition & 1 deletion crypto/falconWrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (d *FalconVerifier) GetSignatureFixedLengthHashableRepresentation(signature
return ctSignature[:], err
}

// NewFalconSigner creates a falconSinger that is used to sign and verify falcon signatures
// NewFalconSigner creates a falconSigner that is used to sign and verify falcon signatures
func NewFalconSigner() (*FalconSigner, error) {
var seed FalconSeed
RandBytes(seed[:])
Expand Down
61 changes: 33 additions & 28 deletions data/accountManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ import (
type AccountManager struct {
mu deadlock.Mutex

// syncronized by mu
partKeys map[account.ParticipationKeyIdentity]account.PersistedParticipation

// Map to keep track of accounts for which we've sent
// AccountRegistered telemetry events
// syncronized by mu
registeredAccounts map[string]bool

registry account.ParticipationRegistry
tsachiherman marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -168,37 +170,40 @@ func (manager *AccountManager) DeleteOldKeys(latestHdr bookkeeping.BlockHeader,

manager.mu.Lock()
pendingItems := make(map[string]<-chan error, len(manager.partKeys))
func() {
defer manager.mu.Unlock()
for _, part := range manager.partKeys {
// We need a key for round r+1 for agreement.
nextRound := latestHdr.Round + 1

if latestHdr.CompactCert[protocol.CompactCertBasic].CompactCertNextRound > 0 {
// We need a key for the next compact cert round.
// This would be CompactCertNextRound+1 (+1 because compact
// cert code uses the next round's ephemeral key), except
// if we already used that key to produce a signature (as
// reported in ccSigs).
nextCC := latestHdr.CompactCert[protocol.CompactCertBasic].CompactCertNextRound + 1
if ccSigs[part.Parent] >= nextCC {
nextCC = ccSigs[part.Parent] + basics.Round(latestProto.CompactCertRounds) + 1
}

if nextCC < nextRound {
nextRound = nextCC
}
}

// we pre-create the reported error string here, so that we won't need to have the participation key object if error is detected.
first, last := part.ValidInterval()
errString := fmt.Sprintf("AccountManager.DeleteOldKeys(): key for %s (%d-%d), nextRound %d",
part.Address().String(), first, last, nextRound)
errCh := part.DeleteOldKeys(nextRound, agreementProto)
partKeys := make([]account.PersistedParticipation, 0, len(manager.partKeys))
for _, part := range manager.partKeys {
partKeys = append(partKeys, part)
}
manager.mu.Unlock()
for _, part := range partKeys {
// We need a key for round r+1 for agreement.
nextRound := latestHdr.Round + 1

if latestHdr.CompactCert[protocol.CompactCertBasic].CompactCertNextRound > 0 {
// We need a key for the next compact cert round.
// This would be CompactCertNextRound+1 (+1 because compact
// cert code uses the next round's ephemeral key), except
// if we already used that key to produce a signature (as
// reported in ccSigs).
nextCC := latestHdr.CompactCert[protocol.CompactCertBasic].CompactCertNextRound + 1
if ccSigs[part.Parent] >= nextCC {
nextCC = ccSigs[part.Parent] + basics.Round(latestProto.CompactCertRounds) + 1
}

pendingItems[errString] = errCh
if nextCC < nextRound {
nextRound = nextCC
}
}
}()

// we pre-create the reported error string here, so that we won't need to have the participation key object if error is detected.
first, last := part.ValidInterval()
errString := fmt.Sprintf("AccountManager.DeleteOldKeys(): key for %s (%d-%d), nextRound %d",
part.Address().String(), first, last, nextRound)
errCh := part.DeleteOldKeys(nextRound, agreementProto)

pendingItems[errString] = errCh
}

// wait for all disk flushes, and report errors as they appear.
for errString, errCh := range pendingItems {
Expand Down
110 changes: 110 additions & 0 deletions data/accountManager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright (C) 2019-2022 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package data

import (
"fmt"
"os"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand/components/mocks"
"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/data/account"
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
"github.com/algorand/go-algorand/util/db"
)

func TestAccountManagerKeys(t *testing.T) {
partitiontest.PartitionTest(t)
log := logging.TestingLog(t)
log.SetLevel(logging.Error)
registry := &mocks.MockParticipationRegistry{}

acctManager := MakeAccountManager(log, registry)

databaseFiles := make([]string, 0)
defer func() {
for _, fileName := range databaseFiles {
os.Remove(fileName)
os.Remove(fileName + "-shm")
os.Remove(fileName + "-wal")
}
}()

// create participation keys
const numPartKeys = 10
for partKeyIdx := 0; partKeyIdx < numPartKeys; partKeyIdx++ {
rootFilename := t.Name() + "_root_" + strconv.Itoa(partKeyIdx) + ".sqlite"
partFilename := t.Name() + "_part_" + strconv.Itoa(partKeyIdx) + ".sqlite"
os.Remove(rootFilename)
os.Remove(partFilename)
rootAccessor, err := db.MakeAccessor(rootFilename, false, true)
require.NoError(t, err)

root, err := account.GenerateRoot(rootAccessor)
require.NoError(t, err)

accessor, err := db.MakeErasableAccessor(partFilename)
require.NoError(t, err)
accessor.SetLogger(log)

part, err := account.FillDBWithParticipationKeys(accessor, root.Address(), 0, 100, 10000)
require.NoError(t, err)

rootAccessor.Close()
databaseFiles = append(databaseFiles, rootFilename)
databaseFiles = append(databaseFiles, partFilename)

acctManager.AddParticipation(part)
}

keyDeletionDone := make(chan struct{}, 1)
nextRoundCh := make(chan basics.Round, 2)
// kick off key deletion thread.
go func() {
defer close(keyDeletionDone)
ccSigs := make(map[basics.Address]basics.Round)
agreementProto := config.Consensus[protocol.ConsensusCurrentVersion]
header := bookkeeping.BlockHeader{}
for rnd := range nextRoundCh {
header.Round = rnd
acctManager.DeleteOldKeys(header, ccSigs, agreementProto)
}
}()

testStartTime := time.Now()
keysTotalDuration := time.Duration(0)
for i := 1; i < 10; i++ {
nextRoundCh <- basics.Round(i)
startTime := time.Now()
acctManager.Keys(basics.Round(i))
keysTotalDuration += time.Since(startTime)
}
close(nextRoundCh)
<-keyDeletionDone
testDuration := time.Since(testStartTime)
require.Lessf(t, keysTotalDuration, testDuration/100, fmt.Sprintf("the time to aquire the keys via Keys() was %v whereas blocking on keys deletion took %v", keysTotalDuration, testDuration))
t.Logf("Calling AccountManager.Keys() while AccountManager.DeleteOldKeys() was busy, 10 times in a row, resulted in accumulated delay of %v\n", keysTotalDuration)
}
6 changes: 3 additions & 3 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,11 +1011,11 @@ func insertStateProofToRegistry(part account.PersistedParticipation, node *Algor
return nil
}
keys := part.StateProofSecrets.GetAllKeys()
keysSinger := make(account.StateProofKeys, len(keys))
keysSigner := make(account.StateProofKeys, len(keys))
for i := uint64(0); i < uint64(len(keys)); i++ {
keysSinger[i] = keys[i]
keysSigner[i] = keys[i]
}
return node.accountManager.Registry().AppendKeys(partID, keysSinger)
return node.accountManager.Registry().AppendKeys(partID, keysSigner)

}

Expand Down