Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Caplin: Optimization and Parallelization of processes and reduction of Goroutines #11058

Merged
merged 18 commits into from
Jul 16, 2024
34 changes: 17 additions & 17 deletions cl/phase1/forkchoice/fork_graph/fork_graph_disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func NewForkGraphDisk(anchorState *state.CachingBeaconState, aferoFs afero.Fs, r
f.lowestAvaiableBlock.Store(anchorState.Slot())
f.headers.Store(libcommon.Hash(anchorRoot), &anchorHeader)

f.dumpBeaconStateOnDisk(anchorState, anchorRoot)
f.DumpBeaconStateOnDisk(anchorRoot, anchorState, true)
return f
}

Expand Down Expand Up @@ -313,12 +313,6 @@ func (f *forkGraphDisk) AddChainSegment(signedBlock *cltypes.SignedBeaconBlock,
BodyRoot: bodyRoot,
})

if newState.Slot()%dumpSlotFrequency == 0 {
if err := f.dumpBeaconStateOnDisk(newState, blockRoot); err != nil {
return nil, LogisticError, err
}
}

// Lastly add checkpoints to caches as well.
f.currentJustifiedCheckpoints.Store(libcommon.Hash(blockRoot), newState.CurrentJustifiedCheckpoint().Copy())
f.finalizedCheckpoints.Store(libcommon.Hash(blockRoot), newState.FinalizedCheckpoint().Copy())
Expand Down Expand Up @@ -361,31 +355,37 @@ func (f *forkGraphDisk) GetState(blockRoot libcommon.Hash, alwaysCopy bool) (*st
blocksInTheWay := []*cltypes.SignedBeaconBlock{}
// Use the parent root as a reverse iterator.
currentIteratorRoot := blockRoot
var copyReferencedState *state.CachingBeaconState
var err error
// try and find the point of recconection
for {
for copyReferencedState == nil {
block, isSegmentPresent := f.getBlock(currentIteratorRoot)
if !isSegmentPresent {
// check if it is in the header
bHeader, ok := f.GetHeader(currentIteratorRoot)
if ok && bHeader.Slot%dumpSlotFrequency == 0 {
break
copyReferencedState, err = f.readBeaconStateFromDisk(currentIteratorRoot)
if err != nil {
log.Trace("Could not retrieve state: Missing header", "missing", currentIteratorRoot, "err", err)
copyReferencedState = nil
}
continue
Giulio2002 marked this conversation as resolved.
Show resolved Hide resolved
Giulio2002 marked this conversation as resolved.
Show resolved Hide resolved
}
log.Trace("Could not retrieve state: Missing header", "missing", currentIteratorRoot)
return nil, nil
}
if block.Block.Slot%dumpSlotFrequency == 0 {
break
copyReferencedState, err = f.readBeaconStateFromDisk(currentIteratorRoot)
if err != nil {
log.Trace("Could not retrieve state: Missing header", "missing", currentIteratorRoot, "err", err)
}
if copyReferencedState != nil {
break
}
}
blocksInTheWay = append(blocksInTheWay, block)
currentIteratorRoot = block.Block.ParentRoot
}
copyReferencedState, err := f.readBeaconStateFromDisk(currentIteratorRoot)
if err != nil {
return nil, err
}
if copyReferencedState == nil {
return nil, ErrStateNotFound
}

// Traverse the blocks from top to bottom.
for i := len(blocksInTheWay) - 1; i >= 0; i-- {
Expand Down
5 changes: 4 additions & 1 deletion cl/phase1/forkchoice/fork_graph/fork_graph_disk_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,10 @@ func (f *forkGraphDisk) readBeaconStateFromDisk(blockRoot libcommon.Hash) (bs *s
}

// dumpBeaconStateOnDisk dumps a beacon state on disk in ssz snappy format
func (f *forkGraphDisk) dumpBeaconStateOnDisk(bs *state.CachingBeaconState, blockRoot libcommon.Hash) (err error) {
func (f *forkGraphDisk) DumpBeaconStateOnDisk(blockRoot libcommon.Hash, bs *state.CachingBeaconState, forced bool) (err error) {
if !forced && bs.Slot()%dumpSlotFrequency != 0 {
return
}
// Truncate and then grow the buffer to the size of the state.
encodingSizeSSZ := bs.EncodingSizeSSZ()
f.sszBuffer.Grow(encodingSizeSSZ)
Expand Down
1 change: 1 addition & 0 deletions cl/phase1/forkchoice/fork_graph/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,5 @@ type ForkGraph interface {
GetPreviousPartecipationIndicies(blockRoot libcommon.Hash) (*solid.BitList, error)
GetValidatorSet(blockRoot libcommon.Hash) (*solid.ValidatorSet, error)
GetCurrentPartecipationIndicies(blockRoot libcommon.Hash) (*solid.BitList, error)
DumpBeaconStateOnDisk(blockRoot libcommon.Hash, state *state.CachingBeaconState, forced bool) error
}
8 changes: 8 additions & 0 deletions cl/phase1/forkchoice/forkchoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,3 +540,11 @@ func (f *ForkChoiceStore) IsHeadOptimistic() bool {
latestRoot := headState.LatestBlockHeader().Root
return f.optimisticStore.IsOptimistic(latestRoot)
}

func (f *ForkChoiceStore) DumpBeaconStateOnDisk(bs *state.CachingBeaconState) error {
anchorRoot, err := bs.BlockRoot()
if err != nil {
return err
}
return f.forkGraph.DumpBeaconStateOnDisk(anchorRoot, bs, false)
}
4 changes: 2 additions & 2 deletions cl/phase1/network/services/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ const (
blobJobsIntervalTick = 5 * time.Millisecond
singleAttestationIntervalTick = 10 * time.Millisecond
attestationJobsIntervalTick = 100 * time.Millisecond
blockJobExpiry = 7 * time.Minute
blobJobExpiry = 7 * time.Minute
blockJobExpiry = 24 * time.Second
Giulio2002 marked this conversation as resolved.
Show resolved Hide resolved
blobJobExpiry = 24 * time.Second
attestationJobExpiry = 30 * time.Minute
singleAttestationJobExpiry = 6 * time.Second
)
Expand Down
5 changes: 5 additions & 0 deletions cl/phase1/stages/clstages.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,7 @@ func ConsensusClStages(ctx context.Context,
if _, err = cfg.attestationDataProducer.ProduceAndCacheAttestationData(copiedHeadState, copiedHeadState.Slot(), 0); err != nil {
logger.Warn("failed to produce and cache attestation data", "err", err)
}

// Incement some stuff here
preverifiedValidators := cfg.forkChoice.PreverifiedValidator(headState.FinalizedCheckpoint().BlockRoot())
preverifiedHistoricalSummary := cfg.forkChoice.PreverifiedHistoricalSummaries(headState.FinalizedCheckpoint().BlockRoot())
Expand All @@ -683,6 +684,10 @@ func ConsensusClStages(ctx context.Context,
return fmt.Errorf("failed to hash ssz: %w", err)
}

if err := cfg.forkChoice.DumpBeaconStateOnDisk(headState); err != nil {
return fmt.Errorf("failed to dump beacon state on disk: %w", err)
}

headEpoch := headSlot / cfg.beaconCfg.SlotsPerEpoch
previous_duty_dependent_root, err := headState.GetBlockRootAtSlot((headEpoch-1)*cfg.beaconCfg.SlotsPerEpoch - 1)
if err != nil {
Expand Down
8 changes: 3 additions & 5 deletions cl/sentinel/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,9 @@ func (s *Sentinel) listenForPeers() {
continue
}

go func(peerInfo *peer.AddrInfo) {
if err := s.ConnectWithPeer(s.ctx, *peerInfo); err != nil {
log.Trace("[Sentinel] Could not connect with peer", "err", err)
}
}(peerInfo)
if err := s.ConnectWithPeer(s.ctx, *peerInfo); err != nil {
log.Trace("[Sentinel] Could not connect with peer", "err", err)
}
}
}

Expand Down
81 changes: 44 additions & 37 deletions cl/sentinel/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,10 @@ func (s *Sentinel) forkWatcher() {
s.subManager.subscriptions.Range(func(key, value interface{}) bool {
sub := value.(*GossipSubscription)
s.subManager.unsubscribe(key.(string))
newSub, err := s.SubscribeGossip(sub.gossip_topic, sub.expiration.Load().(time.Time))
_, err := s.SubscribeGossip(sub.gossip_topic, sub.expiration.Load().(time.Time))
domiwei marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
log.Warn("[Gossip] Failed to resubscribe to topic", "err", err)
}
newSub.Listen()
return true
})
prevDigest = digest
Expand Down Expand Up @@ -495,6 +494,24 @@ func (g *GossipManager) Close() {
})
}

func (g *GossipManager) Start(ctx context.Context) {
Giulio2002 marked this conversation as resolved.
Show resolved Hide resolved
go func() {
checkingInterval := time.NewTicker(time.Second)
for {
select {
case <-ctx.Done():
return
case <-checkingInterval.C:
g.subscriptions.Range(func(key, value any) bool {
sub := value.(*GossipSubscription)
sub.checkIfTopicNeedsToEnabledOrDisabled()
return true
})
}
}
}()
}

// GossipSubscription abstracts a gossip subscription to write decoded structs.
type GossipSubscription struct {
gossip_topic GossipTopic
Expand All @@ -516,42 +533,32 @@ type GossipSubscription struct {
closeOnce sync.Once
}

func (sub *GossipSubscription) Listen() {
go func() {
var err error
checkingInterval := time.NewTicker(100 * time.Millisecond)
for {
select {
case <-sub.ctx.Done():
return
case <-checkingInterval.C:
expirationTime := sub.expiration.Load().(time.Time)
if sub.subscribed.Load() && time.Now().After(expirationTime) {
sub.stopCh <- struct{}{}
sub.topic.Close()
sub.subscribed.Store(false)
log.Info("[Gossip] Unsubscribed from topic", "topic", sub.sub.Topic())
sub.s.updateENROnSubscription(sub.sub.Topic(), false)
continue
}
if !sub.subscribed.Load() && time.Now().Before(expirationTime) {
sub.stopCh = make(chan struct{}, 3)
sub.sub, err = sub.topic.Subscribe()
if err != nil {
log.Warn("[Gossip] failed to begin topic subscription", "err", err)
time.Sleep(30 * time.Second)
continue
}
var sctx context.Context
sctx, sub.cf = context.WithCancel(sub.ctx)
go sub.run(sctx, sub.sub, sub.sub.Topic())
sub.subscribed.Store(true)
sub.s.updateENROnSubscription(sub.sub.Topic(), true)
log.Info("[Gossip] Subscribed to topic", "topic", sub.sub.Topic())
}
}
func (sub *GossipSubscription) checkIfTopicNeedsToEnabledOrDisabled() {
var err error
expirationTime := sub.expiration.Load().(time.Time)
if sub.subscribed.Load() && time.Now().After(expirationTime) {
sub.stopCh <- struct{}{}
sub.topic.Close()
sub.subscribed.Store(false)
log.Info("[Gossip] Unsubscribed from topic", "topic", sub.sub.Topic())
sub.s.updateENROnSubscription(sub.sub.Topic(), false)
return
}
if !sub.subscribed.Load() && time.Now().Before(expirationTime) {
sub.stopCh = make(chan struct{}, 3)
sub.sub, err = sub.topic.Subscribe()
if err != nil {
log.Warn("[Gossip] failed to begin topic subscription", "err", err)
return
}
}()
var sctx context.Context
sctx, sub.cf = context.WithCancel(sub.ctx)
go sub.run(sctx, sub.sub, sub.sub.Topic())
sub.subscribed.Store(true)
sub.s.updateENROnSubscription(sub.sub.Topic(), true)
log.Info("[Gossip] Subscribed to topic", "topic", sub.sub.Topic())
}

}

func (sub *GossipSubscription) OverwriteSubscriptionExpiry(expiry time.Time) {
Expand Down
1 change: 1 addition & 0 deletions cl/sentinel/sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ func (s *Sentinel) Start() error {
},
})
s.subManager = NewGossipManager(s.ctx)
s.subManager.Start(s.ctx)

go s.listenForPeers()
go s.forkWatcher()
Expand Down
3 changes: 0 additions & 3 deletions cl/sentinel/sentinel_gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,9 @@ func TestSentinelGossipOnHardFork(t *testing.T) {
require.NoError(t, err)
defer sub1.Close()

sub1.Listen()

sub2, err := sentinel2.SubscribeGossip(BeaconBlockSsz, time.Unix(0, math.MaxInt64))
require.NoError(t, err)
defer sub2.Close()
sub2.Listen()
time.Sleep(200 * time.Millisecond)

err = h.Connect(ctx, peer.AddrInfo{
Expand Down
4 changes: 1 addition & 3 deletions cl/sentinel/service/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,10 @@ func createSentinel(
}

// now lets separately connect to the gossip topics. this joins the room
subscriber, err := sent.SubscribeGossip(v, getExpirationForTopic(v.Name)) // Listen forever.
_, err := sent.SubscribeGossip(v, getExpirationForTopic(v.Name)) // Listen forever.
if err != nil {
logger.Error("[Sentinel] failed to start sentinel", "err", err)
}
// actually start the subscription, aka listening and sending packets to the sentinel recv channel
subscriber.Listen()
}
return sent, nil
}
Expand Down
3 changes: 0 additions & 3 deletions cl/transition/impl/eth2/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,11 +816,8 @@ func verifyAttestations(
attestingIndicies [][]uint64,
) (bool, error) {
indexedAttestations := make([]*cltypes.IndexedAttestation, 0, attestations.Len())
commonBuffer := make([]byte, 8*2048)
attestations.Range(func(idx int, a *solid.Attestation, _ int) bool {
idxAttestations := state.GetIndexedAttestation(a, attestingIndicies[idx])
idxAttestations.AttestingIndices.SetReusableHashBuffer(commonBuffer)
idxAttestations.HashSSZ()
indexedAttestations = append(indexedAttestations, idxAttestations)
return true
})
Expand Down
Loading