Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dependency Injection Revisited #7637

Closed
wants to merge 41 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
42a7748
allow service cleanup on node shutdown
rkapka Oct 21, 2020
2517374
revert error to fatal
rkapka Oct 21, 2020
f23568f
Merge branch 'master' into shutdown-cleanup
rkapka Oct 21, 2020
ea80748
remove unused struct field
rkapka Oct 21, 2020
7b69421
remove context param from powchain service
rkapka Oct 22, 2020
06cb5e0
add context to Start
rkapka Oct 22, 2020
1297c5b
add context to more Start functions
rkapka Oct 22, 2020
23d4ac1
add context map to registry
rkapka Oct 22, 2020
3446bc8
replace used context inside Start
rkapka Oct 22, 2020
b7cac6c
implement common context pattern in Stop
rkapka Oct 22, 2020
5ae20ff
move cancel to registry
rkapka Oct 22, 2020
bfb0765
clean up blockchain service
rkapka Oct 22, 2020
663e09f
remove todos
rkapka Oct 22, 2020
9a4ae15
clean up beacon chain gateway
rkapka Oct 22, 2020
bdbd1a2
clean up interop-cold-start
rkapka Oct 22, 2020
58595dd
clean up attestations
rkapka Oct 22, 2020
2512db8
clean up p2p
rkapka Oct 23, 2020
b846cee
clean up initial-sync and sync
rkapka Oct 23, 2020
46b14a6
more sync cleanup
rkapka Oct 23, 2020
46b7905
clean up powchain
rkapka Oct 23, 2020
630c75b
fix test build errors
rkapka Oct 23, 2020
50ba8e0
more test fixes
rkapka Oct 23, 2020
b61c660
another fuzz test fix
rkapka Oct 23, 2020
33d1b63
clean up rpc
rkapka Oct 23, 2020
6435212
clean up slasher's beaconclient
rkapka Oct 23, 2020
da9c3ee
clean up slasher's detectopn
rkapka Oct 23, 2020
1a529ff
clean up slasher's rpc
rkapka Oct 23, 2020
4c528ac
clean up validator's client
rkapka Oct 23, 2020
c5f9a55
clean up validator's rpc
rkapka Oct 23, 2020
9ed1266
clean up validator's rpc gateway
rkapka Oct 23, 2020
fd00a0c
clean up validator's slasher protection
rkapka Oct 23, 2020
efe739c
Merge branch 'master' into shutdown-cleanup
rkapka Oct 23, 2020
f6c2ca4
test fixes after merging with master
rkapka Oct 23, 2020
7a45c3b
a little more cleanup
rkapka Oct 23, 2020
712c197
Introduce ServiceContext
rkapka Oct 24, 2020
c8f32ac
remove todos
rkapka Oct 23, 2020
24903f5
Remove name of unused Start context param
rkapka Oct 24, 2020
ee4181b
removed unused dependencies
rkapka Oct 25, 2020
1c7f389
Merge branch 'master' into service-context-redesign
rkapka Oct 25, 2020
e93f2d7
bring back node cancelling
rkapka Oct 25, 2020
709255b
remove changes to test files
rkapka Nov 4, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 0 additions & 30 deletions beacon-chain/blockchain/process_attestation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,36 +545,6 @@ func TestVerifyFinalizedConsistency_OK(t *testing.T) {
require.NoError(t, err)
}

func TestVerifyFinalizedConsistency_IsCanonical(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)

cfg := &Config{BeaconDB: db, ForkChoiceStore: protoarray.New(0, 0, [32]byte{})}
service, err := NewService(ctx, cfg)
require.NoError(t, err)

b32 := testutil.NewBeaconBlock()
b32.Block.Slot = 32
r32, err := b32.Block.HashTreeRoot()
require.NoError(t, err)

service.finalizedCheckpt = &ethpb.Checkpoint{Epoch: 1, Root: r32[:]}

b33 := testutil.NewBeaconBlock()
b33.Block.Slot = 33
b33.Block.ParentRoot = r32[:]
r33, err := b33.Block.HashTreeRoot()
require.NoError(t, err)

require.NoError(t, service.forkChoiceStore.ProcessBlock(ctx, b32.Block.Slot, r32, [32]byte{}, [32]byte{}, 0, 0))
require.NoError(t, service.forkChoiceStore.ProcessBlock(ctx, b33.Block.Slot, r33, r32, [32]byte{}, 0, 0))

_, err = service.forkChoiceStore.Head(ctx, 0, r32, []uint64{}, 0)
require.NoError(t, err)
err = service.VerifyFinalizedConsistency(context.Background(), r33[:])
require.NoError(t, err)
}

func TestGetAttCheckptInfo(t *testing.T) {
ctx := context.Background()
db, _ := testDB.SetupDB(t)
Expand Down
5 changes: 2 additions & 3 deletions beacon-chain/blockchain/receive_attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (s *Service) VerifyFinalizedConsistency(ctx context.Context, root []byte) e
}

// This processes attestations from the attestation pool to account for validator votes and fork choice.
func (s *Service) processAttestation(subscribedToStateEvents chan struct{}) {
func (s *Service) processAttestation(ctx context.Context, subscribedToStateEvents chan struct{}) {
// Wait for state to be initialized.
stateChannel := make(chan *feed.Event, 1)
stateSub := s.stateNotifier.StateFeed().Subscribe(stateChannel)
Expand All @@ -136,10 +136,9 @@ func (s *Service) processAttestation(subscribedToStateEvents chan struct{}) {
st := slotutil.GetSlotTicker(s.genesisTime, params.BeaconConfig().SecondsPerSlot)
for {
select {
case <-s.ctx.Done():
case <-ctx.Done():
return
case <-st.C():
ctx := s.ctx
atts := s.attPool.ForkchoiceAttestations()
for _, a := range atts {
// Based on the spec, don't process the attestation until the subsequent slot.
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/blockchain/receive_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []*ethpb.SignedB
reportSlotMetrics(blockCopy.Block.Slot, s.HeadSlot(), s.CurrentSlot(), s.finalizedCheckpt)
}

if err := s.VerifyWeakSubjectivityRoot(s.ctx); err != nil {
if err := s.VerifyWeakSubjectivityRoot(ctx); err != nil {
// log.Fatalf will prevent defer from being called
span.End()
// Exit run time if the node failed to verify weak subjectivity checkpoint.
Expand Down
40 changes: 0 additions & 40 deletions beacon-chain/blockchain/receive_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"sync"
"testing"
"time"

ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
blockchainTesting "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
Expand All @@ -18,7 +17,6 @@ import (
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
logTest "github.com/sirupsen/logrus/hooks/test"
)

func TestService_ReceiveBlock(t *testing.T) {
Expand Down Expand Up @@ -375,41 +373,3 @@ func TestService_HasInitSyncBlock(t *testing.T) {
t.Error("Should have block")
}
}

func TestCheckSaveHotStateDB_Enabling(t *testing.T) {
db, stateSummaryCache := testDB.SetupDB(t)
hook := logTest.NewGlobal()
s, err := NewService(context.Background(), &Config{StateGen: stategen.New(db, stateSummaryCache)})
require.NoError(t, err)
st := params.BeaconConfig().SlotsPerEpoch * uint64(epochsSinceFinalitySaveHotStateDB)
s.genesisTime = time.Now().Add(time.Duration(-1*int64(st)*int64(params.BeaconConfig().SecondsPerSlot)) * time.Second)
s.finalizedCheckpt = &ethpb.Checkpoint{}

require.NoError(t, s.checkSaveHotStateDB(context.Background()))
assert.LogsContain(t, hook, "Entering mode to save hot states in DB")
}

func TestCheckSaveHotStateDB_Disabling(t *testing.T) {
db, stateSummaryCache := testDB.SetupDB(t)
hook := logTest.NewGlobal()
s, err := NewService(context.Background(), &Config{StateGen: stategen.New(db, stateSummaryCache)})
require.NoError(t, err)
s.finalizedCheckpt = &ethpb.Checkpoint{}
require.NoError(t, s.checkSaveHotStateDB(context.Background()))
s.genesisTime = time.Now()

require.NoError(t, s.checkSaveHotStateDB(context.Background()))
assert.LogsContain(t, hook, "Exiting mode to save hot states in DB")
}

func TestCheckSaveHotStateDB_Overflow(t *testing.T) {
db, stateSummaryCache := testDB.SetupDB(t)
hook := logTest.NewGlobal()
s, err := NewService(context.Background(), &Config{StateGen: stategen.New(db, stateSummaryCache)})
require.NoError(t, err)
s.finalizedCheckpt = &ethpb.Checkpoint{Epoch: 10000000}
s.genesisTime = time.Now()

require.NoError(t, s.checkSaveHotStateDB(context.Background()))
assert.LogsDoNotContain(t, hook, "Entering mode to save hot states in DB")
}
49 changes: 19 additions & 30 deletions beacon-chain/blockchain/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/slashings"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/voluntaryexits"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/powchain"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stategen"
Expand All @@ -40,16 +39,13 @@ import (
// Service represents a service that handles the internal
// logic of managing the full PoS beacon chain.
type Service struct {
ctx context.Context
cancel context.CancelFunc
beaconDB db.HeadAccessDatabase
depositCache *depositcache.DepositCache
chainStartFetcher powchain.ChainStartFetcher
attPool attestations.Pool
slashingPool *slashings.Pool
exitPool *voluntaryexits.Pool
genesisTime time.Time
p2p p2p.Broadcaster
maxRoutines int
head *head
headLock sync.RWMutex
Expand Down Expand Up @@ -87,7 +83,6 @@ type Config struct {
AttPool attestations.Pool
ExitPool *voluntaryexits.Pool
SlashingPool *slashings.Pool
P2p p2p.Broadcaster
MaxRoutines int
StateNotifier statefeed.Notifier
ForkChoiceStore f.ForkChoicer
Expand All @@ -99,18 +94,14 @@ type Config struct {

// NewService instantiates a new block service instance that will
// be registered into a running beacon node.
func NewService(ctx context.Context, cfg *Config) (*Service, error) {
ctx, cancel := context.WithCancel(ctx)
func NewService(_ context.Context, cfg *Config) (*Service, error) {
return &Service{
ctx: ctx,
cancel: cancel,
beaconDB: cfg.BeaconDB,
depositCache: cfg.DepositCache,
chainStartFetcher: cfg.ChainStartFetcher,
attPool: cfg.AttPool,
exitPool: cfg.ExitPool,
slashingPool: cfg.SlashingPool,
p2p: cfg.P2p,
maxRoutines: cfg.MaxRoutines,
stateNotifier: cfg.StateNotifier,
forkChoiceStore: cfg.ForkChoiceStore,
Expand All @@ -128,16 +119,16 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
}

// Start a blockchain service's main event loop.
func (s *Service) Start() {
beaconState, err := s.beaconDB.HeadState(s.ctx)
func (s *Service) Start(ctx context.Context) {
beaconState, err := s.beaconDB.HeadState(ctx)
if err != nil {
log.Fatalf("Could not fetch beacon state: %v", err)
}

// For running initial sync with state cache, in an event of restart, we use
// last finalized check point as start point to sync instead of head
// state. This is because we no longer save state every slot during sync.
cp, err := s.beaconDB.FinalizedCheckpoint(s.ctx)
cp, err := s.beaconDB.FinalizedCheckpoint(ctx)
if err != nil {
log.Fatalf("Could not fetch finalized cp: %v", err)
}
Expand All @@ -148,7 +139,7 @@ func (s *Service) Start() {
// the finalized root is defined as zero hashes instead of genesis root hash.
// We want to use genesis root to retrieve for state.
if r == params.BeaconConfig().ZeroHash {
genesisBlock, err := s.beaconDB.GenesisBlock(s.ctx)
genesisBlock, err := s.beaconDB.GenesisBlock(ctx)
if err != nil {
log.Fatalf("Could not fetch finalized cp: %v", err)
}
Expand All @@ -159,7 +150,7 @@ func (s *Service) Start() {
}
}
}
beaconState, err = s.stateGen.StateByRoot(s.ctx, r)
beaconState, err = s.stateGen.StateByRoot(ctx, r)
if err != nil {
log.Fatalf("Could not fetch beacon state by root: %v", err)
}
Expand All @@ -173,29 +164,29 @@ func (s *Service) Start() {
log.Info("Blockchain data already exists in DB, initializing...")
s.genesisTime = time.Unix(int64(beaconState.GenesisTime()), 0)
s.opsService.SetGenesisTime(beaconState.GenesisTime())
if err := s.initializeChainInfo(s.ctx); err != nil {
if err := s.initializeChainInfo(ctx); err != nil {
log.Fatalf("Could not set up chain info: %v", err)
}

// We start a counter to genesis, if needed.
gState, err := s.beaconDB.GenesisState(s.ctx)
gState, err := s.beaconDB.GenesisState(ctx)
if err != nil {
log.Fatalf("Could not retrieve genesis state: %v", err)
}
go slotutil.CountdownToGenesis(s.ctx, s.genesisTime, uint64(gState.NumValidators()))
go slotutil.CountdownToGenesis(ctx, s.genesisTime, uint64(gState.NumValidators()))

justifiedCheckpoint, err := s.beaconDB.JustifiedCheckpoint(s.ctx)
justifiedCheckpoint, err := s.beaconDB.JustifiedCheckpoint(ctx)
if err != nil {
log.Fatalf("Could not get justified checkpoint: %v", err)
}
finalizedCheckpoint, err := s.beaconDB.FinalizedCheckpoint(s.ctx)
finalizedCheckpoint, err := s.beaconDB.FinalizedCheckpoint(ctx)
if err != nil {
log.Fatalf("Could not get finalized checkpoint: %v", err)
}

// Resume fork choice.
s.justifiedCheckpt = stateTrie.CopyCheckpoint(justifiedCheckpoint)
if err := s.cacheJustifiedStateBalances(s.ctx, s.ensureRootNotZeros(bytesutil.ToBytes32(s.justifiedCheckpt.Root))); err != nil {
if err := s.cacheJustifiedStateBalances(ctx, s.ensureRootNotZeros(bytesutil.ToBytes32(s.justifiedCheckpt.Root))); err != nil {
log.Fatalf("Could not cache justified state balances: %v", err)
}
s.prevJustifiedCheckpt = stateTrie.CopyCheckpoint(justifiedCheckpoint)
Expand All @@ -204,7 +195,7 @@ func (s *Service) Start() {
s.prevFinalizedCheckpt = stateTrie.CopyCheckpoint(finalizedCheckpoint)
s.resumeForkChoice(justifiedCheckpoint, finalizedCheckpoint)

if err := s.VerifyWeakSubjectivityRoot(s.ctx); err != nil {
if err := s.VerifyWeakSubjectivityRoot(ctx); err != nil {
// Exit run time if the node failed to verify weak subjectivity checkpoint.
log.Fatalf("Could not verify weak subjectivity checkpoint: %v", err)
}
Expand Down Expand Up @@ -237,10 +228,10 @@ func (s *Service) Start() {
return
}
log.WithField("starttime", data.StartTime).Debug("Received chain start event")
s.processChainStartTime(s.ctx, data.StartTime)
s.processChainStartTime(ctx, data.StartTime)
return
}
case <-s.ctx.Done():
case <-ctx.Done():
log.Debug("Context closed, exiting goroutine")
return
case err := <-stateSub.Err():
Expand All @@ -251,7 +242,7 @@ func (s *Service) Start() {
}()
}

go s.processAttestation(attestationProcessorSubscribed)
go s.processAttestation(ctx, attestationProcessorSubscribed)
}

// processChainStartTime initializes a series of deposits from the ChainStart deposits in the eth1
Expand Down Expand Up @@ -317,17 +308,15 @@ func (s *Service) initializeBeaconChain(
}

// Stop the blockchain service's main event loop and associated goroutines.
func (s *Service) Stop() error {
defer s.cancel()

func (s *Service) Stop(ctx context.Context) error {
if s.stateGen != nil && s.head != nil && s.head.state != nil {
if err := s.stateGen.ForceCheckpoint(s.ctx, s.head.state.FinalizedCheckpoint().Root); err != nil {
if err := s.stateGen.ForceCheckpoint(ctx, s.head.state.FinalizedCheckpoint().Root); err != nil {
return err
}
}

// Save initial sync cached blocks to the DB before stop.
return s.beaconDB.SaveBlocks(s.ctx, s.getInitSyncBlocks())
return s.beaconDB.SaveBlocks(ctx, s.getInitSyncBlocks())
}

// Status always returns nil unless there is an error condition that causes
Expand Down
18 changes: 4 additions & 14 deletions beacon-chain/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ var _ shared.Service = (*Gateway)(nil)
// it to the beacon-chain gRPC server.
type Gateway struct {
conn *grpc.ClientConn
ctx context.Context
cancel context.CancelFunc
gatewayAddr string
remoteAddr string
server *http.Server
Expand All @@ -37,10 +35,7 @@ type Gateway struct {

// Start the gateway service. This serves the HTTP JSON traffic on the specified
// port.
func (g *Gateway) Start() {
ctx, cancel := context.WithCancel(g.ctx)
g.cancel = cancel

func (g *Gateway) Start(ctx context.Context) {
log.WithField("address", g.gatewayAddr).Info("Starting JSON-HTTP API")

conn, err := g.dial(ctx, "tcp", g.remoteAddr)
Expand Down Expand Up @@ -103,24 +98,20 @@ func (g *Gateway) Status() error {
}

// Stop the gateway with a graceful shutdown.
func (g *Gateway) Stop() error {
func (g *Gateway) Stop(ctx context.Context) error {
if g.server != nil {
if err := g.server.Shutdown(g.ctx); err != nil {
if err := g.server.Shutdown(ctx); err != nil {
log.WithError(err).Error("Failed to shut down server")
}
}

if g.cancel != nil {
g.cancel()
}

return nil
}

// New returns a new gateway server which translates HTTP into gRPC.
// Accepts a context and optional http.ServeMux.
func New(
ctx context.Context,
_ context.Context,
remoteAddress,
gatewayAddress string,
mux *http.ServeMux,
Expand All @@ -135,7 +126,6 @@ func New(
return &Gateway{
remoteAddr: remoteAddress,
gatewayAddr: gatewayAddress,
ctx: ctx,
mux: mux,
allowedOrigins: allowedOrigins,
enableDebugRPCEndpoints: enableDebugRPCEndpoints,
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/gateway/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func main() {
)
mux.HandleFunc("/swagger/", gateway.SwaggerServer())
mux.HandleFunc("/healthz", healthzServer(gw))
gw.Start()
gw.Start(context.Background())

select {}
}
Expand Down
Loading