State sync fixes: Remove Bad Witnesses and Don't Wait on All Witnesses for ConsensusParams (#156)

* Remove unresponsive witnesses + consensusParams return on first resp

* Add Witness blacklist

* Default TTL

* add comment

* Fix indentation

* Add jitter to retry

* Update tests

* blacklistTTL configurable

* Update tests with blacklist TTL

* Update test

* Update blacklist tests

* gofmt

* Update mapstructure

* Remove unused struct

* Update light/client.go

Co-authored-by: Steven Landers <steven.landers@gmail.com>

* Add a cancel when lb is received

* Child Context to cancel inner witness goroutines

* Update comments

* defer childCancel

* childCancel

* remove def

* Remove witness when removing provider for p2p-down (#159)

* remove witness when removing provider for p2p-down

* add tests

* Add comments + split up blacklist function

* fmt

* Only blacklist in detect divergence

---------

Co-authored-by: Kartik Bhat <kartikbhat@kartiks-mbp-2.mynetworksettings.com>
Co-authored-by: Steven Landers <steven.landers@gmail.com>
3 people authored Oct 3, 2023
1 parent b3eec97 commit 51802ea
Showing 15 changed files with 338 additions and 95 deletions.
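The "return on first resp" and child-context commits above land in hunks that are not expanded on this page. As an illustrative Go sketch only — Witness, ConsensusParams, and firstConsensusParams are hypothetical names, not the actual provider API — the pattern they describe looks roughly like this: query every witness concurrently, return the first successful consensus-params reply, and cancel the goroutines still in flight through a child context.

package lightsketch

import (
	"context"
	"errors"
)

// Witness stands in for a light-client provider; the real interface differs.
type Witness interface {
	ConsensusParams(ctx context.Context, height int64) ([]byte, error)
}

// firstConsensusParams asks every witness concurrently and returns the first
// successful response, cancelling the remaining requests via a child context.
func firstConsensusParams(ctx context.Context, height int64, witnesses []Witness) ([]byte, error) {
	if len(witnesses) == 0 {
		return nil, errors.New("no witnesses to query")
	}

	childCtx, childCancel := context.WithCancel(ctx)
	defer childCancel() // stops any witness requests still in flight once we return

	results := make(chan []byte, len(witnesses))
	failures := make(chan error, len(witnesses))
	for _, w := range witnesses {
		w := w
		go func() {
			params, err := w.ConsensusParams(childCtx, height)
			if err != nil {
				failures <- err
				return
			}
			results <- params
		}()
	}

	var failed int
	for {
		select {
		case params := <-results:
			return params, nil // first responder wins
		case <-failures:
			failed++
			if failed == len(witnesses) {
				return nil, errors.New("all witnesses failed to return consensus params")
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
}

The buffered channels let late-finishing goroutines exit without blocking after the winner returns, which is why the deferred childCancel alone is not enough in this sketch.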
2 changes: 2 additions & 0 deletions cmd/tendermint/commands/light.go
@@ -39,6 +39,7 @@ func MakeLightCommand(conf *config.Config, logger log.Logger) *cobra.Command {
trustedHeight int64
trustedHash []byte
trustLevelStr string
blacklistTTL time.Duration

logLevel string
logFormat string
@@ -158,6 +159,7 @@ for applications built w/ Cosmos SDK).
primaryAddr,
witnessesAddrs,
dbs.New(db),
blacklistTTL,
options...,
)
if err != nil {
6 changes: 6 additions & 0 deletions config/config.go
@@ -912,6 +912,9 @@ type StateSyncConfig struct {

// Timeout before considering light block verification failed
VerifyLightBlockTimeout time.Duration `mapstructure:"verify-light-block-timeout"`

// Time before which a blacklisted witness cannot be added back as a provider
BlacklistTTL time.Duration `mapstructure:"blacklist-ttl"`
}

func (cfg *StateSyncConfig) TrustHashBytes() []byte {
@@ -933,6 +936,7 @@ func DefaultStateSyncConfig() *StateSyncConfig {
BackfillBlocks: 0,
BackfillDuration: 0 * time.Second,
VerifyLightBlockTimeout: 60 * time.Second,
BlacklistTTL: 5 * time.Minute,
}
}

@@ -1292,6 +1296,7 @@ type DBSyncConfig struct {
TrustHash string `mapstructure:"trust-hash"`
TrustPeriod time.Duration `mapstructure:"trust-period"`
VerifyLightBlockTimeout time.Duration `mapstructure:"verify-light-block-timeout"`
BlacklistTTL time.Duration `mapstructure:"blacklist-ttl"`
}

func DefaultDBSyncConfig() *DBSyncConfig {
@@ -1308,6 +1313,7 @@ func DefaultDBSyncConfig() *DBSyncConfig {
TrustHash: "",
TrustPeriod: 86400 * time.Second,
VerifyLightBlockTimeout: 60 * time.Second,
BlacklistTTL: 5 * time.Minute,
}
}

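For orientation, a minimal sketch of overriding the new setting from Go, using only the constructors and field added in this diff; the import path is assumed from the upstream Tendermint layout and may differ in this fork.

package main

import (
	"fmt"
	"time"

	"github.com/tendermint/tendermint/config" // assumed import path; adjust for this fork
)

func main() {
	// The defaults added above set BlacklistTTL to 5 minutes; keep a
	// misbehaving witness blacklisted for 15 minutes instead.
	ss := config.DefaultStateSyncConfig()
	ss.BlacklistTTL = 15 * time.Minute

	db := config.DefaultDBSyncConfig()
	db.BlacklistTTL = 15 * time.Minute

	fmt.Println(ss.BlacklistTTL, db.BlacklistTTL)
}

The StateSync value is what the statesync and dbsync reactors pass through to NewP2PStateProvider in the reactor hunks below.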
3 changes: 3 additions & 0 deletions config/toml.go
@@ -460,6 +460,8 @@ fetchers = "{{ .StateSync.Fetchers }}"
verify-light-block-timeout = "{{ .StateSync.VerifyLightBlockTimeout }}"
blacklist-ttl = "{{ .StateSync.BlacklistTTL }}"
#######################################################
### Consensus Configuration Options ###
#######################################################
@@ -608,6 +610,7 @@ trust-height = "{{ .DBSync.TrustHeight }}"
trust-hash = "{{ .DBSync.TrustHash }}"
trust-period = "{{ .DBSync.TrustPeriod }}"
verify-light-block-timeout = "{{ .DBSync.VerifyLightBlockTimeout }}"
blacklist-ttl = "{{ .DBSync.BlacklistTTL }}"
`

/****** these are for test settings ***********/
2 changes: 1 addition & 1 deletion internal/dbsync/reactor.go
@@ -208,7 +208,7 @@ func (r *Reactor) OnStart(ctx context.Context) error {
providers[idx] = light.NewBlockProvider(p, r.chainID, r.dispatcher)
}

stateProvider, err := light.NewP2PStateProvider(ctx, r.chainID, r.initialHeight, r.config.VerifyLightBlockTimeout, providers, to, r.paramsChannel, r.logger.With("module", "stateprovider"), func(height uint64) proto.Message {
stateProvider, err := light.NewP2PStateProvider(ctx, r.chainID, r.initialHeight, r.config.VerifyLightBlockTimeout, providers, to, r.paramsChannel, r.logger.With("module", "stateprovider"), r.config.BlacklistTTL, func(height uint64) proto.Message {
return &dstypes.ParamsRequest{
Height: height,
}
9 changes: 7 additions & 2 deletions internal/statesync/reactor.go
@@ -307,7 +307,7 @@ func (r *Reactor) OnStart(ctx context.Context) error {
providers[idx] = light.NewBlockProvider(p, chainID, r.dispatcher)
}

stateProvider, err := light.NewP2PStateProvider(ctx, chainID, initialHeight, r.cfg.VerifyLightBlockTimeout, providers, to, r.paramsChannel, r.logger.With("module", "stateprovider"), func(height uint64) proto.Message {
stateProvider, err := light.NewP2PStateProvider(ctx, chainID, initialHeight, r.cfg.VerifyLightBlockTimeout, providers, to, r.paramsChannel, r.logger.With("module", "stateprovider"), r.cfg.BlacklistTTL, func(height uint64) proto.Message {
return &ssproto.ParamsRequest{
Height: height,
}
@@ -319,7 +319,7 @@ func (r *Reactor) OnStart(ctx context.Context) error {
return nil
}

stateProvider, err := light.NewRPCStateProvider(ctx, chainID, initialHeight, r.cfg.VerifyLightBlockTimeout, r.cfg.RPCServers, to, spLogger)
stateProvider, err := light.NewRPCStateProvider(ctx, chainID, initialHeight, r.cfg.VerifyLightBlockTimeout, r.cfg.RPCServers, to, spLogger, r.cfg.BlacklistTTL)
if err != nil {
return fmt.Errorf("failed to initialize RPC state provider: %w", err)
}
@@ -1032,6 +1032,11 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
case p2p.PeerStatusDown:
delete(r.providers, peerUpdate.NodeID)
r.syncer.RemovePeer(peerUpdate.NodeID)
if sp, ok := r.stateProvider.(*light.StateProviderP2P); ok {
if err := sp.RemoveProviderByID(peerUpdate.NodeID); err != nil {
r.logger.Error("failed to remove peer witness", "peer", peerUpdate.NodeID, "error", err)
}
}
}
r.logger.Debug("processed peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
}
32 changes: 30 additions & 2 deletions internal/statesync/reactor_test.go
@@ -540,11 +540,13 @@ func TestReactor_StateProviderP2P(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

rts := setup(ctx, t, nil, nil, 2)
rts := setup(ctx, t, nil, nil, 3)
// make syncer non-nil, else the test won't think we are state syncing
rts.reactor.syncer = rts.syncer
peerA := types.NodeID(strings.Repeat("a", 2*types.NodeIDByteLength))
peerB := types.NodeID(strings.Repeat("b", 2*types.NodeIDByteLength))
peerC := types.NodeID(strings.Repeat("c", 2*types.NodeIDByteLength))

rts.peerUpdateCh <- p2p.PeerUpdate{
NodeID: peerA,
Status: p2p.PeerStatusUp,
@@ -553,6 +555,10 @@ func TestReactor_StateProviderP2P(t *testing.T) {
NodeID: peerB,
Status: p2p.PeerStatusUp,
}
rts.peerUpdateCh <- p2p.PeerUpdate{
NodeID: peerC,
Status: p2p.PeerStatusUp,
}

closeCh := make(chan struct{})
defer close(closeCh)
@@ -565,7 +571,7 @@ func TestReactor_StateProviderP2P(t *testing.T) {
rts.reactor.cfg.TrustHeight = 1
rts.reactor.cfg.TrustHash = fmt.Sprintf("%X", chain[1].Hash())

for _, p := range []types.NodeID{peerA, peerB} {
for _, p := range []types.NodeID{peerA, peerB, peerC} {
if !rts.reactor.peers.Contains(p) {
rts.reactor.peers.Append(p)
}
@@ -602,6 +608,28 @@ })
})
require.NoError(t, err)
require.True(t, added)

// verify that the state provider is a p2p provider
sp, ok := rts.reactor.stateProvider.(*light.StateProviderP2P)
require.True(t, ok)

// verify that the peer we will later mark down starts out in the witness list
require.Len(t, sp.Providers(), 2)

// notify that peer C is down
rts.peerUpdateCh <- p2p.PeerUpdate{
NodeID: peerC,
Status: p2p.PeerStatusDown,
}

// removal is async, so we need to wait for the reactor to update
require.Eventually(t, func() bool {
return len(sp.Providers()) == 1
}, 5*time.Second, 100*time.Millisecond)

// should now have 1 witness (peer B)
require.Len(t, sp.Providers(), 1)
require.Equal(t, string(peerB), sp.Providers()[0].ID())
}

func TestReactor_Backfill(t *testing.T) {
86 changes: 85 additions & 1 deletion light/client.go
@@ -123,6 +123,7 @@ type Client struct {
trustLevel tmmath.Fraction
maxClockDrift time.Duration
maxBlockLag time.Duration
blacklistTTL time.Duration

// Mutex for locking during changes of the light client's providers
providerMutex sync.Mutex
@@ -131,6 +132,11 @@
// Providers used to "witness" new headers.
witnesses []provider.Provider

// Map of witnesses that have been removed and are not
// allowed to be added back as providers, keyed by the
// time they were added to the blacklist
blacklist map[string]time.Time

// Where trusted light blocks are stored.
trustedStore store.Store
// Highest trusted light block from the store (height=H).
@@ -160,6 +166,7 @@ func NewClient(
primary provider.Provider,
witnesses []provider.Provider,
trustedStore store.Store,
blacklistTTL time.Duration,
options ...Option,
) (*Client, error) {

@@ -171,7 +178,7 @@
}
if lastHeight > 0 {
return NewClientFromTrustedStore(
chainID, trustOptions.Period, primary, witnesses, trustedStore, options...,
chainID, trustOptions.Period, primary, witnesses, trustedStore, blacklistTTL, options...,
)
}

@@ -191,10 +198,12 @@
verificationMode: skipping,
primary: primary,
witnesses: witnesses,
blacklist: make(map[string]time.Time),
trustedStore: trustedStore,
trustLevel: DefaultTrustLevel,
maxClockDrift: defaultMaxClockDrift,
maxBlockLag: defaultMaxBlockLag,
blacklistTTL: blacklistTTL,
pruningSize: defaultPruningSize,
logger: log.NewNopLogger(),
}
@@ -225,6 +234,7 @@ func NewClientFromTrustedStore(
primary provider.Provider,
witnesses []provider.Provider,
trustedStore store.Store,
blacklistTTL time.Duration,
options ...Option) (*Client, error) {

c := &Client{
@@ -234,6 +244,7 @@
trustLevel: DefaultTrustLevel,
maxClockDrift: defaultMaxClockDrift,
maxBlockLag: defaultMaxBlockLag,
blacklistTTL: blacklistTTL,
primary: primary,
witnesses: witnesses,
trustedStore: trustedStore,
@@ -258,6 +269,24 @@
return c, nil
}

// isBlacklisted checks whether the provider is blacklisted
// NOTE: requires a providerMutex lock
func (c *Client) isBlacklisted(p provider.Provider) bool {
timestamp, exists := c.blacklist[p.ID()]
if !exists {
return false
}

// If the provider is found, check the TTL
if time.Since(timestamp) > c.blacklistTTL {
// Remove from blacklist if TTL expired
delete(c.blacklist, p.ID())
return false
}

return true
}

// restoreTrustedLightBlock loads the latest trusted light block from the store
func (c *Client) restoreTrustedLightBlock() error {
lastHeight, err := c.trustedStore.LastLightBlockHeight()
@@ -821,12 +850,35 @@ func (c *Client) Witnesses() []provider.Provider {
return c.witnesses
}

// BlacklistedWitnessIDs returns the blacklisted witness IDs.
//
// NOTE: providers may not be safe for concurrent access.
func (c *Client) BlacklistedWitnessIDs() []string {
c.providerMutex.Lock()
defer c.providerMutex.Unlock()

witnessIds := make([]string, 0, len(c.blacklist))
for w := range c.blacklist {
witnessIds = append(witnessIds, w)
}

sort.Strings(witnessIds)

return witnessIds
}

// AddProvider adds a provider to the light client's set
//
// NOTE: The light client does not check for uniqueness
func (c *Client) AddProvider(p provider.Provider) {
c.providerMutex.Lock()
defer c.providerMutex.Unlock()

// If the provider is blacklisted, don't add it
if c.isBlacklisted(p) {
return
}

c.witnesses = append(c.witnesses, p)
}

@@ -956,6 +1008,38 @@ func (c *Client) getLightBlock(ctx context.Context, p provider.Provider, height
return l, err
}

// addWitnessesToBlacklist adds the given witnesses to the blacklist
// NOTE: requires a providerMutex lock
func (c *Client) addWitnessesToBlacklist(providers []provider.Provider) {
if len(providers) == 0 {
return
}

for _, provider := range providers {
c.blacklist[provider.ID()] = time.Now()
}
}

func (c *Client) findIndexForWitness(ID types.NodeID) (int, bool) {
for i, w := range c.witnesses {
if w.ID() == string(ID) {
return i, true
}
}
return 0, false
}

// RemoveProviderByID removes a witness from the light client.
func (c *Client) RemoveProviderByID(ID types.NodeID) error {
c.providerMutex.Lock()
defer c.providerMutex.Unlock()

if idx, ok := c.findIndexForWitness(ID); ok {
return c.removeWitnesses([]int{idx})
}
return nil
}

// NOTE: requires a providerMutex lock
func (c *Client) removeWitnesses(indexes []int) error {
if len(c.witnesses) <= len(indexes) {
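The "Add jitter to retry" commit also lands in a hunk that is not expanded here. Purely as an illustrative sketch — retryWithJitter and its parameters are hypothetical, not taken from the codebase — jittered retry generally looks like this, so all witnesses are not re-queried at exactly the same instant:

package lightsketch

import (
	"context"
	"math/rand"
	"time"
)

// retryWithJitter retries fn up to attempts times, sleeping a base delay plus
// a random jitter in [0, base) between attempts. base must be greater than zero.
func retryWithJitter(ctx context.Context, attempts int, base time.Duration, fn func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		if i == attempts-1 {
			break // no point sleeping after the final attempt
		}
		jitter := time.Duration(rand.Int63n(int64(base)))
		select {
		case <-time.After(base + jitter):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return err
}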
3 changes: 3 additions & 0 deletions light/client_benchmark_test.go
@@ -86,6 +86,7 @@ func BenchmarkSequence(b *testing.B) {
benchmarkFullNode,
[]provider.Provider{benchmarkFullNode},
dbs.New(dbm.NewMemDB()),
5*time.Minute,
light.Logger(logger),
light.SequentialVerification(),
)
@@ -123,6 +124,7 @@ func BenchmarkBisection(b *testing.B) {
benchmarkFullNode,
[]provider.Provider{benchmarkFullNode},
dbs.New(dbm.NewMemDB()),
5*time.Minute,
light.Logger(logger),
)
if err != nil {
@@ -159,6 +161,7 @@ func BenchmarkBackwards(b *testing.B) {
benchmarkFullNode,
[]provider.Provider{benchmarkFullNode},
dbs.New(dbm.NewMemDB()),
5*time.Minute,
light.Logger(logger),
)
if err != nil {