Skip to content

Commit

Permalink
fix(f3): poll the lease by repeatedly participating instead of checki…
Browse files Browse the repository at this point in the history
…ng progress (#12640)

Previously, the flow was:

1. Get ticket.
2. Participate.
3. Repeatedly poll progress.

The new flow is:

1. Get ticket.
2. Repeatedly participate, using the returned lease as an indicator of progress.

That way, if the lotus node reboots we'll eventually re-tell them about
the lease.

fixes filecoin-project/go-f3#719
  • Loading branch information
Stebalien authored and rjan90 committed Oct 28, 2024
1 parent 748f6e7 commit 248a624
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 78 deletions.
134 changes: 67 additions & 67 deletions chain/lf3/participation.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-f3/gpbft"
"github.com/filecoin-project/go-f3/manifest"

"github.com/filecoin-project/lotus/api"
Expand All @@ -37,7 +36,6 @@ const (
type F3ParticipationAPI interface {
F3GetOrRenewParticipationTicket(ctx context.Context, minerID address.Address, previous api.F3ParticipationTicket, instances uint64) (api.F3ParticipationTicket, error) //perm:sign
F3Participate(ctx context.Context, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, error)
F3GetProgress(ctx context.Context) (gpbft.Instant, error)
F3GetManifest(ctx context.Context) (*manifest.Manifest, error)
}

Expand All @@ -46,7 +44,6 @@ type Participant struct {
participant address.Address
backoff *backoff.Backoff
maxCheckProgressAttempts int
previousTicket api.F3ParticipationTicket
leaseTerm uint64

runningCtx context.Context
Expand Down Expand Up @@ -92,21 +89,18 @@ func (p *Participant) run(ctx context.Context) (_err error) {
}
}()

var ticket api.F3ParticipationTicket
for ctx.Err() == nil {
var err error
start := time.Now()
ticket, err := p.tryGetF3ParticipationTicket(ctx)
ticket, err = p.tryGetF3ParticipationTicket(ctx, ticket)
if err != nil {
return err
}
lease, participating, err := p.tryF3Participate(ctx, ticket)
err = p.tryParticipate(ctx, ticket)
if err != nil {
return err
}
if participating {
if err := p.awaitLeaseExpiry(ctx, lease); err != nil {
return err
}
}
const minPeriod = 500 * time.Millisecond
if sinceLastLoop := time.Since(start); sinceLastLoop < minPeriod {
select {
Expand All @@ -120,10 +114,10 @@ func (p *Participant) run(ctx context.Context) (_err error) {
return ctx.Err()
}

func (p *Participant) tryGetF3ParticipationTicket(ctx context.Context) (api.F3ParticipationTicket, error) {
func (p *Participant) tryGetF3ParticipationTicket(ctx context.Context, previousTicket api.F3ParticipationTicket) (api.F3ParticipationTicket, error) {
p.backoff.Reset()
for ctx.Err() == nil {
switch ticket, err := p.node.F3GetOrRenewParticipationTicket(ctx, p.participant, p.previousTicket, p.leaseTerm); {
switch ticket, err := p.node.F3GetOrRenewParticipationTicket(ctx, p.participant, previousTicket, p.leaseTerm); {
case ctx.Err() != nil:
return api.F3ParticipationTicket{}, ctx.Err()
case errors.Is(err, api.ErrF3Disabled):
Expand All @@ -142,25 +136,51 @@ func (p *Participant) tryGetF3ParticipationTicket(ctx context.Context) (api.F3Pa
return api.F3ParticipationTicket{}, ctx.Err()
}

func (p *Participant) tryF3Participate(ctx context.Context, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, bool, error) {
func (p *Participant) getManifest(ctx context.Context) (*manifest.Manifest, error) {
p.backoff.Reset()
for ctx.Err() == nil {
switch manifest, err := p.node.F3GetManifest(ctx); {
case errors.Is(err, api.ErrF3Disabled):
log.Errorw("Cannot await F3 participation lease expiry as F3 is disabled.", "err", err)
return nil, xerrors.Errorf("awaiting F3 participation lease expiry: %w", err)
case err != nil:
log.Errorw("Error when fetching F3 manifest. Retrying after backoff.", "attempts", p.backoff.Attempt(), "backoff", p.backoff.Duration(), "err", err)
case manifest == nil:
// Can happen if we reboot and have no manifest.
log.Warnw("Received no F3 manifest from lotus. Retrying after backoff.", "attempts", p.backoff.Attempt(), "backoff", p.backoff.Duration())
default:
return manifest, nil
}
p.backOff(ctx)
}
return nil, ctx.Err()
}

func (p *Participant) tryParticipate(ctx context.Context, ticket api.F3ParticipationTicket) error {
p.backoff.Reset()
renewLeaseWithin := p.leaseTerm / 2
var (
manifest *manifest.Manifest
haveLease bool
)
for ctx.Err() == nil {
switch lease, err := p.node.F3Participate(ctx, ticket); {
lease, err := p.node.F3Participate(ctx, ticket)
switch {
case ctx.Err() != nil:
return api.F3ParticipationLease{}, false, ctx.Err()
return ctx.Err()
case errors.Is(err, api.ErrF3Disabled):
log.Errorw("Cannot participate in F3 as it is disabled.", "err", err)
return api.F3ParticipationLease{}, false, xerrors.Errorf("attempting F3 participation with ticket: %w", err)
return xerrors.Errorf("attempting F3 participation with ticket: %w", err)
case errors.Is(err, api.ErrF3ParticipationTicketExpired):
log.Warnw("F3 participation ticket expired while attempting to participate. Acquiring a new ticket.", "attempts", p.backoff.Attempt(), "err", err)
return api.F3ParticipationLease{}, false, nil
return nil
case errors.Is(err, api.ErrF3ParticipationTicketStartBeforeExisting):
log.Warnw("F3 participation ticket starts before the existing lease. Acquiring a new ticket.", "attempts", p.backoff.Attempt(), "err", err)
return api.F3ParticipationLease{}, false, nil
return nil
case errors.Is(err, api.ErrF3ParticipationTicketInvalid):
log.Errorw("F3 participation ticket is not valid. Acquiring a new ticket after backoff.", "backoff", p.backoff.Duration(), "attempts", p.backoff.Attempt(), "err", err)
p.backOff(ctx)
return api.F3ParticipationLease{}, false, nil
return nil
case errors.Is(err, api.ErrF3ParticipationIssuerMismatch):
log.Warnw("Node is not the issuer of F3 participation ticket. Miner maybe load-balancing or node has changed. Retrying F3 participation after backoff.", "backoff", p.backoff.Duration(), "err", err)
p.backOff(ctx)
Expand All @@ -171,69 +191,49 @@ func (p *Participant) tryF3Participate(ctx context.Context, ticket api.F3Partici
p.backOff(ctx)
continue
case err != nil:
if p.backoff.Attempt() > float64(p.maxCheckProgressAttempts) {
log.Errorw("Too many failures while attempting to check F3 progress. Restarting participation.", "attempts", p.backoff.Attempt(), "err", err)
return nil
}
log.Errorw("Unexpected error while attempting F3 participation. Retrying after backoff", "backoff", p.backoff.Duration(), "attempts", p.backoff.Attempt(), "err", err)
p.backOff(ctx)
continue
case lease.ValidityTerm <= renewLeaseWithin:
return nil
default:
// we succeeded so reset the backoff.
p.backoff.Reset()
}

// Log the first time we give out the lease.
if !haveLease {
log.Infow("Successfully acquired F3 participation lease.",
"issuer", lease.Issuer,
"not-before", lease.FromInstance,
"not-after", lease.ToInstance(),
)
p.previousTicket = ticket
return lease, true, nil
haveLease = true
}
}
return api.F3ParticipationLease{}, false, ctx.Err()
}

func (p *Participant) awaitLeaseExpiry(ctx context.Context, lease api.F3ParticipationLease) error {
p.backoff.Reset()
renewLeaseWithin := p.leaseTerm / 2
for ctx.Err() == nil {
manifest, err := p.node.F3GetManifest(ctx)
switch {
case errors.Is(err, api.ErrF3Disabled):
log.Errorw("Cannot await F3 participation lease expiry as F3 is disabled.", "err", err)
return xerrors.Errorf("awaiting F3 participation lease expiry: %w", err)
case err != nil:
if p.backoff.Attempt() > float64(p.maxCheckProgressAttempts) {
log.Errorw("Too many failures while attempting to check F3 progress. Restarting participation.", "attempts", p.backoff.Attempt(), "err", err)
return nil
// Fetch the manifest if necessary.
if manifest == nil || lease.Network != manifest.NetworkName {
manifest, err = p.getManifest(ctx)
if err != nil {
return err
}
log.Errorw("Failed to check F3 progress while awaiting lease expiry. Retrying after backoff.", "attempts", p.backoff.Attempt(), "backoff", p.backoff.Duration(), "err", err)
p.backOff(ctx)
continue
case manifest == nil || manifest.NetworkName != lease.Network:
// If we got an unexpected manifest, or no manifest, go back to the
// beginning and try to get another ticket. Switching from having a manifest
// to having no manifest can theoretically happen if the lotus node reboots
// and has no static manifest.
return nil
}
switch progress, err := p.node.F3GetProgress(ctx); {
case errors.Is(err, api.ErrF3Disabled):
log.Errorw("Cannot await F3 participation lease expiry as F3 is disabled.", "err", err)
return xerrors.Errorf("awaiting F3 participation lease expiry: %w", err)
case err != nil:
if p.backoff.Attempt() > float64(p.maxCheckProgressAttempts) {
log.Errorw("Too many failures while attempting to check F3 progress. Restarting participation.", "attempts", p.backoff.Attempt(), "err", err)
if manifest.NetworkName != lease.Network {
log.Warnf("Got a manifest for network %q while waiting for a lease on network %q. Getting another ticket.", manifest.NetworkName, lease.Network)
return nil
}
log.Errorw("Failed to check F3 progress while awaiting lease expiry. Retrying after backoff.", "attempts", p.backoff.Attempt(), "backoff", p.backoff.Duration(), "err", err)
p.backOff(ctx)
case progress.ID+renewLeaseWithin >= lease.ToInstance():
log.Infof("F3 progressed (%d) to within %d instances of lease expiry (%d). Renewing participation.", progress.ID, renewLeaseWithin, lease.ToInstance())
return nil
default:
remainingInstanceLease := lease.ToInstance() - progress.ID
waitTime := time.Duration(remainingInstanceLease-renewLeaseWithin) * manifest.CatchUpAlignment
if waitTime == 0 {
waitTime = 100 * time.Millisecond
}
log.Debugf("F3 participation lease is valid for further %d instances. Re-checking after %s.", remainingInstanceLease, waitTime)
p.backOffFor(ctx, waitTime)
}

// Wait until we think we may need to renew the lease.
waitTime := time.Duration(lease.ValidityTerm-renewLeaseWithin) * manifest.CatchUpAlignment
if waitTime == 0 {
waitTime = 100 * time.Millisecond
}
log.Debugf("F3 participation lease is valid for further %d instances. Re-checking after %s.", lease.ValidityTerm, waitTime)
p.backOffFor(ctx, waitTime)
}
return ctx.Err()
}
Expand Down
17 changes: 12 additions & 5 deletions chain/lf3/participation_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,18 +101,25 @@ func (l *leaser) participate(ticket api.F3ParticipationTicket) (api.F3Participat
l.mutex.Lock()
defer l.mutex.Unlock()
currentLease, found := l.leases[newLease.MinerID]
if found && currentLease.Network == newLease.Network && currentLease.FromInstance > newLease.FromInstance {
// For safety, strictly require lease start instance to never decrease.
return api.F3ParticipationLease{}, api.ErrF3ParticipationTicketStartBeforeExisting
}
if !found {
if found {
// short-circuite for reparticipation.
if currentLease == newLease {
return newLease, nil
}
if currentLease.Network == newLease.Network && currentLease.FromInstance > newLease.FromInstance {
// For safety, strictly require lease start instance to never decrease.
return api.F3ParticipationLease{}, api.ErrF3ParticipationTicketStartBeforeExisting
}
} else {
log.Infof("started participating in F3 for miner %d", newLease.MinerID)
}
l.leases[newLease.MinerID] = newLease
select {
case l.notifyParticipation <- struct{}{}:
default:
}
newLease.ValidityTerm = newLease.ToInstance() - instant.ID
newLease.FromInstance = instant.ID
return newLease, nil
}

Expand Down
3 changes: 2 additions & 1 deletion chain/lf3/participation_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ func TestLeaser(t *testing.T) {
require.NoError(t, err)
require.Equal(t, uint64(123), lease.MinerID)
require.Equal(t, issuer.String(), lease.Issuer)
require.Equal(t, uint64(5), lease.ValidityTerm) // Current instance (10) + offset (5)
require.Equal(t, uint64(10), lease.FromInstance) // Current instance (10) + offset (5)
require.Equal(t, uint64(5), lease.ValidityTerm) // Current instance (10) + offset (5)
})
t.Run("get participants", func(t *testing.T) {
progress.currentInstance = 11
Expand Down
79 changes: 74 additions & 5 deletions chain/lf3/participation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
"time"

"github.com/jpillora/backoff"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-f3/gpbft"
"github.com/filecoin-project/go-f3/manifest"

"github.com/filecoin-project/lotus/api"
Expand Down Expand Up @@ -41,10 +41,6 @@ func (m *manifestFailAPI) F3GetOrRenewParticipationTicket(ctx context.Context, m
}
}

func (m *manifestFailAPI) F3GetProgress(ctx context.Context) (gpbft.Instant, error) {
return gpbft.Instant{}, nil
}

func (m *manifestFailAPI) F3Participate(ctx context.Context, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, error) {
return api.F3ParticipationLease{
Network: "test",
Expand Down Expand Up @@ -73,3 +69,76 @@ func TestParticipantManifestFailure(t *testing.T) {
<-api.manifestRequested
require.NoError(t, p.Stop(context.Background()))
}

type repeatedParticipateAPI struct {
secondTicket chan struct{}
instance uint64
t *testing.T
}

func (m *repeatedParticipateAPI) F3GetManifest(ctx context.Context) (*manifest.Manifest, error) {
return &manifest.Manifest{
NetworkName: "test",
CatchUpAlignment: time.Millisecond,
}, nil
}

func (m *repeatedParticipateAPI) F3GetOrRenewParticipationTicket(ctx context.Context, minerID address.Address, previous api.F3ParticipationTicket, instances uint64) (api.F3ParticipationTicket, error) {
switch string(previous) {
case "first ticket":
return api.F3ParticipationTicket("second ticket"), nil
case "":
return api.F3ParticipationTicket("first ticket"), nil
default:
panic("unexpected ticket")
}
}

func (m *repeatedParticipateAPI) F3Participate(ctx context.Context, ticket api.F3ParticipationTicket) (api.F3ParticipationLease, error) {
switch string(ticket) {
case "first ticket":
case "second ticket":
// This is 6, not 5, because we expect one final call to participate before getting
// a new ticket.
assert.Equal(m.t, uint64(6), m.instance)
close(m.secondTicket)
return api.F3ParticipationLease{}, api.ErrF3ParticipationIssuerMismatch
default:
m.t.Errorf("unexpected f3 ticket: %s", string(ticket))
return api.F3ParticipationLease{}, api.ErrF3Disabled
}

if m.instance >= 10 {
m.t.Error("did not expect the participant to continue past the half-way point")
return api.F3ParticipationLease{}, api.ErrF3Disabled
}

lease := api.F3ParticipationLease{
Network: "test",
Issuer: "foobar",
MinerID: 0,
FromInstance: m.instance,
ValidityTerm: 10 - m.instance,
}
m.instance++

return lease, nil
}

// Make sure we keep calling participate until our validity term drops to half (5) of the initial
// term (10). At that point, the participant should request a new ticket.
func TestParticipantRepeat(t *testing.T) {
api := &repeatedParticipateAPI{secondTicket: make(chan struct{}), t: t}
addr, err := address.NewIDAddress(1000)
require.NoError(t, err)

p := lf3.NewParticipant(context.Background(), api, dtypes.MinerAddress(addr),
&backoff.Backoff{
Min: 1 * time.Second,
Max: 1 * time.Minute,
Factor: 1.5,
}, 13, 10)
require.NoError(t, p.Start(context.Background()))
<-api.secondTicket
require.NoError(t, p.Stop(context.Background()))
}

0 comments on commit 248a624

Please sign in to comment.