Skip to content

Commit

Permalink
Simplify CertStore subscriber
Browse files Browse the repository at this point in the history
I have yet to write any code that cares about intermediate certificates
and they can easily be queried with `GetRange`. This change:

1. Makes the certificate subscription system always return the latest
certificate, dropping intermediates.
2. Drops the dependency on go-broadcast.
3. Simplifies the code that relies on the certificate subscription logic.
  • Loading branch information
Stebalien committed Sep 17, 2024
1 parent 92a5658 commit ad7ff7a
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 51 deletions.
65 changes: 44 additions & 21 deletions certstore/certstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/filecoin-project/go-f3/certs"
"github.com/filecoin-project/go-f3/gpbft"

"github.com/Kubuxu/go-broadcast"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
Expand All @@ -30,11 +29,12 @@ var (

// Store is responsible for storing and relaying information about new finality certificates
type Store struct {
writeLk sync.Mutex
mu sync.RWMutex
ds datastore.Datastore
busCerts broadcast.Channel[*certs.FinalityCertificate]
firstInstance uint64
powerTableFrequency uint64
subscribers map[chan *certs.FinalityCertificate]struct{}
latestCertificate *certs.FinalityCertificate

latestPowerTable gpbft.PowerEntries
}
Expand All @@ -44,6 +44,7 @@ func open(ctx context.Context, ds datastore.Datastore) (*Store, error) {
cs := &Store{
ds: namespace.Wrap(ds, datastore.NewKey("/certstore")),
powerTableFrequency: defaultPowerTableFrequency,
subscribers: make(map[chan *certs.FinalityCertificate]struct{}),
}
err := maybeContinueDelete(ctx, ds)
if err != nil {
Expand All @@ -57,14 +58,13 @@ func open(ctx context.Context, ds datastore.Datastore) (*Store, error) {
return nil, fmt.Errorf("determining latest cert: %w", err)
}

latestCert, err := cs.Get(ctx, latestInstance)
cs.latestCertificate, err = cs.Get(ctx, latestInstance)
if err != nil {
return nil, fmt.Errorf("loading latest cert: %w", err)
}

metrics.latestInstance.Record(ctx, int64(latestCert.GPBFTInstance))
metrics.latestFinalizedEpoch.Record(ctx, latestCert.ECChain.Head().Epoch)
cs.busCerts.Publish(latestCert)
metrics.latestInstance.Record(ctx, int64(cs.latestCertificate.GPBFTInstance))
metrics.latestFinalizedEpoch.Record(ctx, cs.latestCertificate.ECChain.Head().Epoch)

return cs, nil
}
Expand Down Expand Up @@ -109,7 +109,7 @@ func OpenOrCreateStore(ctx context.Context, ds datastore.Datastore, firstInstanc
return nil, fmt.Errorf("failed to read initial instance number: %w", err)
}
cs.firstInstance = firstInstance
if latest := cs.Latest(); latest != nil {
if latest := cs.latestCertificate; latest != nil {
cs.latestPowerTable, err = cs.GetPowerTable(ctx, latest.GPBFTInstance+1)
if err != nil {
return nil, fmt.Errorf("failed to load latest power table: %w", err)
Expand Down Expand Up @@ -162,7 +162,7 @@ func OpenStore(ctx context.Context, ds datastore.Datastore) (*Store, error) {
return nil, fmt.Errorf("getting first instance: %w", err)
}
latestPowerTable := cs.firstInstance
if latest := cs.Latest(); latest != nil {
if latest := cs.latestCertificate; latest != nil {
latestPowerTable = latest.GPBFTInstance + 1
}
cs.latestPowerTable, err = cs.GetPowerTable(ctx, latestPowerTable)
Expand Down Expand Up @@ -195,7 +195,9 @@ func (cs *Store) writeInstanceNumber(ctx context.Context, key datastore.Key, val

// Latest returns the newest available certificate
func (cs *Store) Latest() *certs.FinalityCertificate {
return cs.busCerts.Last()
cs.mu.RLock()
defer cs.mu.RUnlock()
return cs.latestCertificate
}

// Get returns the FinalityCertificate at the specified instance, or an error derived from
Expand Down Expand Up @@ -349,11 +351,11 @@ func (cs *Store) Put(ctx context.Context, cert *certs.FinalityCertificate) error
}

// Take a lock to ensure ordering.
cs.writeLk.Lock()
defer cs.writeLk.Unlock()
cs.mu.Lock()
defer cs.mu.Unlock()

nextCert := cs.firstInstance
if latestCert := cs.Latest(); latestCert != nil {
if latestCert := cs.latestCertificate; latestCert != nil {
nextCert = latestCert.GPBFTInstance + 1
}
if cert.GPBFTInstance > nextCert {
Expand Down Expand Up @@ -412,21 +414,42 @@ func (cs *Store) Put(ctx context.Context, cert *certs.FinalityCertificate) error
}

cs.latestPowerTable = newPowerTable
cs.latestCertificate = cert
for ch := range cs.subscribers {
// Always drain first.
for range ch {
}
// Then write the latest certificate.
ch <- cs.latestCertificate
}

metrics.latestInstance.Record(ctx, int64(cert.GPBFTInstance))
metrics.tipsetsPerInstance.Record(ctx, int64(len(cert.ECChain.Suffix())))
metrics.latestFinalizedEpoch.Record(ctx, cert.ECChain.Head().Epoch)
cs.busCerts.Publish(cert)

return nil
}

// SubscribeForNewCerts is used to subscribe to the broadcast channel.
// If the passed channel is full at any point, it will be dropped from subscription and closed.
// To stop subscribing, either the closer function can be used or the channel can be abandoned.
// Passing a channel multiple times to the Subscribe function will result in a panic.
// The channel will receive new certificates sequentially.
func (cs *Store) SubscribeForNewCerts(ch chan<- *certs.FinalityCertificate) (last *certs.FinalityCertificate, closer func()) {
return cs.busCerts.Subscribe(ch)
// Subscribe subscribes to new certificate notifications. When read, it will always return the
// latest not-yet-seen certificate (including the latest certificate when Subscribe is first
// called) but it will drop intermediate certificates. If you need all the certificates, you should
// keep track of the last certificate you received and call GetRange to get the ones between.
//
// The caller must call the closer to unsubscribe and release resources.
func (cs *Store) Subscribe() (out <-chan *certs.FinalityCertificate, closer func()) {
cs.mu.Lock()
defer cs.mu.Unlock()
ch := make(chan *certs.FinalityCertificate, 1)
ch <- cs.latestCertificate
cs.subscribers[ch] = struct{}{}
return ch, func() {
cs.mu.Lock()
defer cs.mu.Unlock()
if _, ok := cs.subscribers[ch]; ok {
delete(cs.subscribers, ch)
close(ch)
}
}
}

var tombstoneKey = datastore.NewKey("/tombstone")
Expand Down
5 changes: 2 additions & 3 deletions certstore/certstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func TestDeleteAll(t *testing.T) {
verifyEmpty()
}

func TestSubscribeForNewCerts(t *testing.T) {
func TestSubscribe(t *testing.T) {
t.Parallel()

ctx := context.Background()
Expand All @@ -233,8 +233,7 @@ func TestSubscribeForNewCerts(t *testing.T) {
cs, err := CreateStore(ctx, ds, 1, pt)
require.NoError(t, err)

ch := make(chan *certs.FinalityCertificate, 1)
_, closer := cs.SubscribeForNewCerts(ch)
ch, closer := cs.Subscribe()
defer closer()

cert := makeCert(1, supp)
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/filecoin-project/go-f3
go 1.22

require (
github.com/Kubuxu/go-broadcast v0.0.0-20240621161059-1a8c90734cd6
github.com/drand/kyber v1.3.1
github.com/drand/kyber-bls12381 v0.3.1
github.com/filecoin-project/go-bitfield v0.2.4
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ dmitri.shuralyov.com/service/change v0.0.0-20181023043359-a85b471d5412/go.mod h1
dmitri.shuralyov.com/state v0.0.0-20180228185332-28bcc343414c/go.mod h1:0PRwlb0D6DFvNNtx+9ybjezNCa8XF0xaYcETyp6rHWU=
git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Kubuxu/go-broadcast v0.0.0-20240621161059-1a8c90734cd6 h1:yh2/1fz3ajTaeKskSWxtSBNScdRZfQ/A5nyd9+64T6M=
github.com/Kubuxu/go-broadcast v0.0.0-20240621161059-1a8c90734cd6/go.mod h1:5LOj/fF3Oc/cvJqzDiyfx4XwtBPRWUYEz+V+b13sH5U=
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
Expand Down
28 changes: 4 additions & 24 deletions host.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,8 @@ func (h *gpbftRunner) Start(ctx context.Context) (_err error) {
return fmt.Errorf("starting a participant: %w", err)
}

// Subscribe to new certificates. We don't bother canceling the subscription as that'll
// happen automatically when the channel fills.
finalityCertificates := make(chan *certs.FinalityCertificate, 4)
_, _ = h.certStore.SubscribeForNewCerts(finalityCertificates)
finalityCertificates, unsubCerts := h.certStore.Subscribe()
defer unsubCerts()

h.errgrp.Go(func() (_err error) {
defer func() {
Expand All @@ -122,16 +120,7 @@ func (h *gpbftRunner) Start(ctx context.Context) (_err error) {
for h.runningCtx.Err() == nil {
// prioritise finality certificates and alarm delivery
select {
case c, ok := <-finalityCertificates:
// We only care about the latest certificate, skip passed old ones.
if len(finalityCertificates) > 0 {
continue
}

if !ok {
finalityCertificates = make(chan *certs.FinalityCertificate, 4)
c, _ = h.certStore.SubscribeForNewCerts(finalityCertificates)
}
case c := <-finalityCertificates:
if err := h.receiveCertificate(c); err != nil {
return err
}
Expand All @@ -146,16 +135,7 @@ func (h *gpbftRunner) Start(ctx context.Context) (_err error) {

// Handle messages, finality certificates, and alarms
select {
case c, ok := <-finalityCertificates:
// We only care about the latest certificate, skip passed old ones.
if len(finalityCertificates) > 0 {
continue
}

if !ok {
finalityCertificates = make(chan *certs.FinalityCertificate, 4)
c, _ = h.certStore.SubscribeForNewCerts(finalityCertificates)
}
case c := <-finalityCertificates:
if err := h.receiveCertificate(c); err != nil {
return err
}
Expand Down

0 comments on commit ad7ff7a

Please sign in to comment.