Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

State sync fixes: Remove Bad Witnesses and Don't Block on All Witnesses for ConsensusParams #156

Merged
merged 28 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/tendermint/commands/light.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func MakeLightCommand(conf *config.Config, logger log.Logger) *cobra.Command {
trustedHeight int64
trustedHash []byte
trustLevelStr string
blacklistTTL time.Duration

logLevel string
logFormat string
Expand Down Expand Up @@ -158,6 +159,7 @@ for applications built w/ Cosmos SDK).
primaryAddr,
witnessesAddrs,
dbs.New(db),
blacklistTTL,
options...,
)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,9 @@ type StateSyncConfig struct {

// Timeout before considering light block verification failed
VerifyLightBlockTimeout time.Duration `mapstructure:"verify-light-block-timeout"`

// Time before which a blacklisted witness can not be added back as a provider
BlacklistTTL time.Duration `mapstructure:"blacklist-ttl"`
}

func (cfg *StateSyncConfig) TrustHashBytes() []byte {
Expand All @@ -933,6 +936,7 @@ func DefaultStateSyncConfig() *StateSyncConfig {
BackfillBlocks: 0,
BackfillDuration: 0 * time.Second,
VerifyLightBlockTimeout: 60 * time.Second,
BlacklistTTL: 5 * time.Minute,
}
}

Expand Down Expand Up @@ -1292,6 +1296,7 @@ type DBSyncConfig struct {
TrustHash string `mapstructure:"trust-hash"`
TrustPeriod time.Duration `mapstructure:"trust-period"`
VerifyLightBlockTimeout time.Duration `mapstructure:"verify-light-block-timeout"`
BlacklistTTL time.Duration `mapstructure:"blacklist-ttl"`
}

func DefaultDBSyncConfig() *DBSyncConfig {
Expand All @@ -1308,6 +1313,7 @@ func DefaultDBSyncConfig() *DBSyncConfig {
TrustHash: "",
TrustPeriod: 86400 * time.Second,
VerifyLightBlockTimeout: 60 * time.Second,
BlacklistTTL: 5 * time.Minute,
}
}

Expand Down
3 changes: 3 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,8 @@ fetchers = "{{ .StateSync.Fetchers }}"

verify-light-block-timeout = "{{ .StateSync.VerifyLightBlockTimeout }}"

blacklist-ttl = "{{ .StateSync.BlacklistTTL }}"

#######################################################
### Consensus Configuration Options ###
#######################################################
Expand Down Expand Up @@ -608,6 +610,7 @@ trust-height = "{{ .DBSync.TrustHeight }}"
trust-hash = "{{ .DBSync.TrustHash }}"
trust-period = "{{ .DBSync.TrustPeriod }}"
verify-light-block-timeout = "{{ .DBSync.VerifyLightBlockTimeout }}"
blacklist-ttl = "{{ .DBSync.BlacklistTTL }}"
`

/****** these are for test settings ***********/
Expand Down
2 changes: 1 addition & 1 deletion internal/dbsync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (r *Reactor) OnStart(ctx context.Context) error {
providers[idx] = light.NewBlockProvider(p, r.chainID, r.dispatcher)
}

stateProvider, err := light.NewP2PStateProvider(ctx, r.chainID, r.initialHeight, r.config.VerifyLightBlockTimeout, providers, to, r.paramsChannel, r.logger.With("module", "stateprovider"), func(height uint64) proto.Message {
stateProvider, err := light.NewP2PStateProvider(ctx, r.chainID, r.initialHeight, r.config.VerifyLightBlockTimeout, providers, to, r.paramsChannel, r.logger.With("module", "stateprovider"), r.config.BlacklistTTL, func(height uint64) proto.Message {
return &dstypes.ParamsRequest{
Height: height,
}
Expand Down
9 changes: 7 additions & 2 deletions internal/statesync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func (r *Reactor) OnStart(ctx context.Context) error {
providers[idx] = light.NewBlockProvider(p, chainID, r.dispatcher)
}

stateProvider, err := light.NewP2PStateProvider(ctx, chainID, initialHeight, r.cfg.VerifyLightBlockTimeout, providers, to, r.paramsChannel, r.logger.With("module", "stateprovider"), func(height uint64) proto.Message {
stateProvider, err := light.NewP2PStateProvider(ctx, chainID, initialHeight, r.cfg.VerifyLightBlockTimeout, providers, to, r.paramsChannel, r.logger.With("module", "stateprovider"), r.cfg.BlacklistTTL, func(height uint64) proto.Message {
return &ssproto.ParamsRequest{
Height: height,
}
Expand All @@ -319,7 +319,7 @@ func (r *Reactor) OnStart(ctx context.Context) error {
return nil
}

stateProvider, err := light.NewRPCStateProvider(ctx, chainID, initialHeight, r.cfg.VerifyLightBlockTimeout, r.cfg.RPCServers, to, spLogger)
stateProvider, err := light.NewRPCStateProvider(ctx, chainID, initialHeight, r.cfg.VerifyLightBlockTimeout, r.cfg.RPCServers, to, spLogger, r.cfg.BlacklistTTL)
if err != nil {
return fmt.Errorf("failed to initialize RPC state provider: %w", err)
}
Expand Down Expand Up @@ -1032,6 +1032,11 @@ func (r *Reactor) processPeerUpdate(ctx context.Context, peerUpdate p2p.PeerUpda
case p2p.PeerStatusDown:
delete(r.providers, peerUpdate.NodeID)
r.syncer.RemovePeer(peerUpdate.NodeID)
if sp, ok := r.stateProvider.(*light.StateProviderP2P); ok {
if err := sp.RemoveProviderByID(peerUpdate.NodeID); err != nil {
r.logger.Error("failed to remove peer witness", "peer", peerUpdate.NodeID, "error", err)
}
}
}
r.logger.Debug("processed peer update", "peer", peerUpdate.NodeID, "status", peerUpdate.Status)
}
Expand Down
32 changes: 30 additions & 2 deletions internal/statesync/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,11 +540,13 @@ func TestReactor_StateProviderP2P(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

rts := setup(ctx, t, nil, nil, 2)
rts := setup(ctx, t, nil, nil, 3)
// make syncer non nil else test won't think we are state syncing
rts.reactor.syncer = rts.syncer
peerA := types.NodeID(strings.Repeat("a", 2*types.NodeIDByteLength))
peerB := types.NodeID(strings.Repeat("b", 2*types.NodeIDByteLength))
peerC := types.NodeID(strings.Repeat("c", 2*types.NodeIDByteLength))

rts.peerUpdateCh <- p2p.PeerUpdate{
NodeID: peerA,
Status: p2p.PeerStatusUp,
Expand All @@ -553,6 +555,10 @@ func TestReactor_StateProviderP2P(t *testing.T) {
NodeID: peerB,
Status: p2p.PeerStatusUp,
}
rts.peerUpdateCh <- p2p.PeerUpdate{
NodeID: peerC,
Status: p2p.PeerStatusUp,
}

closeCh := make(chan struct{})
defer close(closeCh)
Expand All @@ -565,7 +571,7 @@ func TestReactor_StateProviderP2P(t *testing.T) {
rts.reactor.cfg.TrustHeight = 1
rts.reactor.cfg.TrustHash = fmt.Sprintf("%X", chain[1].Hash())

for _, p := range []types.NodeID{peerA, peerB} {
for _, p := range []types.NodeID{peerA, peerB, peerC} {
if !rts.reactor.peers.Contains(p) {
rts.reactor.peers.Append(p)
}
Expand Down Expand Up @@ -602,6 +608,28 @@ func TestReactor_StateProviderP2P(t *testing.T) {
})
require.NoError(t, err)
require.True(t, added)

// verify that the state provider is a p2p provider
sp, ok := rts.reactor.stateProvider.(*light.StateProviderP2P)
require.True(t, ok)

// verify that a status-down peer starts in the list
require.Len(t, sp.Providers(), 2)

// notify that peer C is down
rts.peerUpdateCh <- p2p.PeerUpdate{
NodeID: peerC,
Status: p2p.PeerStatusDown,
}

// removal is async, so we need to wait for the reactor to update
require.Eventually(t, func() bool {
return len(sp.Providers()) == 1
}, 5*time.Second, 100*time.Millisecond)

// should now have 1 witness (peer B)
require.Len(t, sp.Providers(), 1)
require.Equal(t, string(peerB), sp.Providers()[0].ID())
}

func TestReactor_Backfill(t *testing.T) {
Expand Down
86 changes: 85 additions & 1 deletion light/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ type Client struct {
trustLevel tmmath.Fraction
maxClockDrift time.Duration
maxBlockLag time.Duration
blacklistTTL time.Duration

// Mutex for locking during changes of the light clients providers
providerMutex sync.Mutex
Expand All @@ -131,6 +132,11 @@ type Client struct {
// Providers used to "witness" new headers.
witnesses []provider.Provider

// Map of witnesses, who have been removed
// and not allowed to be added back as a provider,
// to the timadd they were added to the blacklist
blacklist map[string]time.Time

// Where trusted light blocks are stored.
trustedStore store.Store
// Highest trusted light block from the store (height=H).
Expand Down Expand Up @@ -160,6 +166,7 @@ func NewClient(
primary provider.Provider,
witnesses []provider.Provider,
trustedStore store.Store,
blacklistTTL time.Duration,
options ...Option,
) (*Client, error) {

Expand All @@ -171,7 +178,7 @@ func NewClient(
}
if lastHeight > 0 {
return NewClientFromTrustedStore(
chainID, trustOptions.Period, primary, witnesses, trustedStore, options...,
chainID, trustOptions.Period, primary, witnesses, trustedStore, blacklistTTL, options...,
)
}

Expand All @@ -191,10 +198,12 @@ func NewClient(
verificationMode: skipping,
primary: primary,
witnesses: witnesses,
blacklist: make(map[string]time.Time),
trustedStore: trustedStore,
trustLevel: DefaultTrustLevel,
maxClockDrift: defaultMaxClockDrift,
maxBlockLag: defaultMaxBlockLag,
blacklistTTL: blacklistTTL,
pruningSize: defaultPruningSize,
logger: log.NewNopLogger(),
}
Expand Down Expand Up @@ -225,6 +234,7 @@ func NewClientFromTrustedStore(
primary provider.Provider,
witnesses []provider.Provider,
trustedStore store.Store,
blacklistTTL time.Duration,
options ...Option) (*Client, error) {

c := &Client{
Expand All @@ -234,6 +244,7 @@ func NewClientFromTrustedStore(
trustLevel: DefaultTrustLevel,
maxClockDrift: defaultMaxClockDrift,
maxBlockLag: defaultMaxBlockLag,
blacklistTTL: blacklistTTL,
primary: primary,
witnesses: witnesses,
trustedStore: trustedStore,
Expand All @@ -258,6 +269,24 @@ func NewClientFromTrustedStore(
return c, nil
}

// isBlacklisted checks whether provider is black listed
// NOTE: requires a providerMutex lock
func (c *Client) isBlacklisted(p provider.Provider) bool {
timestamp, exists := c.blacklist[p.ID()]
if !exists {
return false
}

// If the provider is found, check the TTL
if time.Since(timestamp) > c.blacklistTTL {
// Remove from blacklist if TTL expired
delete(c.blacklist, p.ID())
return false
}

return true
}

// restoreTrustedLightBlock loads the latest trusted light block from the store
func (c *Client) restoreTrustedLightBlock() error {
lastHeight, err := c.trustedStore.LastLightBlockHeight()
Expand Down Expand Up @@ -821,12 +850,35 @@ func (c *Client) Witnesses() []provider.Provider {
return c.witnesses
}

// BlacklistedWitnessIDS returns the blacklisted witness IDs.
//
// NOTE: providers may be not safe for concurrent access.
func (c *Client) BlacklistedWitnessIDs() []string {
c.providerMutex.Lock()
defer c.providerMutex.Unlock()

witnessIds := make([]string, 0, len(c.blacklist))
for w := range c.blacklist {
witnessIds = append(witnessIds, w)
}

sort.Strings(witnessIds)

return witnessIds
}

// AddProvider adds a providers to the light clients set
//
// NOTE: The light client does not check for uniqueness
func (c *Client) AddProvider(p provider.Provider) {
c.providerMutex.Lock()
defer c.providerMutex.Unlock()

// If the provider is blacklisted, don't add it
if c.isBlacklisted(p) {
return
}

c.witnesses = append(c.witnesses, p)
}

Expand Down Expand Up @@ -956,6 +1008,38 @@ func (c *Client) getLightBlock(ctx context.Context, p provider.Provider, height
return l, err
}

// addWitnessToBlacklist adds a witness to the blacklist
// NOTE: requires a providerMutex lock
func (c *Client) addWitnessesToBlacklist(providers []provider.Provider) {
if len(providers) == 0 {
return
}

for _, provider := range providers {
c.blacklist[provider.ID()] = time.Now()
}
}

func (c *Client) findIndexForWitness(ID types.NodeID) (int, bool) {
for i, w := range c.witnesses {
if w.ID() == string(ID) {
return i, true
}
}
return 0, false
}

// RemoveProviderByID removes a witness from the light client.
func (c *Client) RemoveProviderByID(ID types.NodeID) error {
c.providerMutex.Lock()
defer c.providerMutex.Unlock()

if idx, ok := c.findIndexForWitness(ID); ok {
return c.removeWitnesses([]int{idx})
}
return nil
}

// NOTE: requires a providerMutex lock
func (c *Client) removeWitnesses(indexes []int) error {
if len(c.witnesses) <= len(indexes) {
Expand Down
3 changes: 3 additions & 0 deletions light/client_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func BenchmarkSequence(b *testing.B) {
benchmarkFullNode,
[]provider.Provider{benchmarkFullNode},
dbs.New(dbm.NewMemDB()),
5*time.Minute,
light.Logger(logger),
light.SequentialVerification(),
)
Expand Down Expand Up @@ -123,6 +124,7 @@ func BenchmarkBisection(b *testing.B) {
benchmarkFullNode,
[]provider.Provider{benchmarkFullNode},
dbs.New(dbm.NewMemDB()),
5*time.Minute,
light.Logger(logger),
)
if err != nil {
Expand Down Expand Up @@ -159,6 +161,7 @@ func BenchmarkBackwards(b *testing.B) {
benchmarkFullNode,
[]provider.Provider{benchmarkFullNode},
dbs.New(dbm.NewMemDB()),
5*time.Minute,
light.Logger(logger),
)
if err != nil {
Expand Down
Loading