Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make journal pluggable; record deals, sealing, wdpost, mempool events #2455

Merged
merged 44 commits into from
Sep 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
695f6cf
wip in-memory journal.
raulk Jul 17, 2020
bef7d64
refactor EventType construction.
raulk Jul 17, 2020
4bcf019
use new journal from chain/store pkg.
raulk Jul 17, 2020
becbff0
add NilJournal; fix call sites to ChainStore constructor.
raulk Jul 17, 2020
4d2d8b2
Merge branch 'next' into inmem-journal
raulk Jul 20, 2020
226786c
wip
raulk Jul 20, 2020
7459ec6
add mpool journal events; fix dependency injection.
raulk Jul 20, 2020
d6e6eed
pipe sealing events to the journal.
raulk Jul 21, 2020
3bd9d55
message pool: write message(s) in journal entries.
raulk Jul 21, 2020
cb8e209
journal: disable noisy message pool events.
raulk Jul 21, 2020
d547c25
record deals events in journal.
raulk Jul 21, 2020
4e82cf3
rename journal entries to journal events.
raulk Jul 21, 2020
443473f
fix tests.
raulk Jul 21, 2020
b847511
Merge branch 'next' into inmem-journal
raulk Jul 21, 2020
5074ce5
move in-mem journal to Project Oni.
raulk Aug 10, 2020
9259823
Merge branch 'next' into inmem-journal
raulk Aug 11, 2020
bca516b
Merge remote-tracking branch 'origin/next' into inmem-journal
raulk Aug 11, 2020
5e2c28d
add comment.
raulk Aug 11, 2020
d53c6f2
storage-fsm: emit sealing state transition notifications.
raulk Aug 11, 2020
2ea5abd
wire journal into miner.
raulk Aug 11, 2020
b534ab9
panic recovery in MaybeRecordEvent; handle nil current tipsets in wdp…
raulk Aug 11, 2020
8f19fff
fix tests.
raulk Aug 11, 2020
200b0f7
mpool/repub: only record in journal if actually repubbing.
raulk Aug 11, 2020
cc859d3
journal: wdpost: record proofs_processed even if no eligible partitions.
raulk Aug 11, 2020
4e1ef09
make journal a global var.
raulk Aug 26, 2020
efdfd3e
Merge branch 'master' into inmem-journal
raulk Aug 26, 2020
cb8e105
reduce diff noise.
raulk Aug 26, 2020
0b6a182
move InitJournalKey to the top; add godcs on invoke order.
raulk Sep 2, 2020
ac152ab
shuffle code to journal wdpost events.
raulk Sep 2, 2020
f046af3
split wdpost event into finer-grained ones.
raulk Sep 2, 2020
fd04a19
go mod tidy.
raulk Sep 2, 2020
3206f92
Merge branch 'master' into inmem-journal
raulk Sep 2, 2020
4554c4b
fix lotus-bench stale import.
raulk Sep 2, 2020
97ff90f
fix imports.
raulk Sep 3, 2020
d46f684
fix test following merge.
raulk Sep 3, 2020
905168e
fix lint errors.
raulk Sep 4, 2020
1ec534d
Merge branch 'master' into inmem-journal
raulk Sep 4, 2020
0a4c473
fix dangling import.
raulk Sep 4, 2020
35a8f7e
Merge branch 'master' into inmem-journal
raulk Sep 4, 2020
6d29d75
Merge branch 'master' into inmem-journal
raulk Sep 14, 2020
05aa5f2
allow customizing disabled journal events + tests.
raulk Sep 14, 2020
09e9d6d
deal journal events: wire into markets subscriptions.
raulk Sep 14, 2020
7c13fa9
Merge branch 'master' into inmem-journal
raulk Sep 14, 2020
954ff32
fix transitive api dependency on ffi.
raulk Sep 14, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions chain/events/events_height.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ type heightEvents struct {
}

func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {

ctx, span := trace.StartSpan(e.ctx, "events.HeightHeadChange")
defer span.End()
span.AddAttributes(trace.Int64Attribute("endHeight", int64(app[0].Height())))
Expand Down Expand Up @@ -145,12 +144,11 @@ func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
}

// ChainAt invokes the specified `HeightHandler` when the chain reaches the
// specified height+confidence threshold. If the chain is rolled-back under the
// specified height, `RevertHandler` will be called.
// specified height+confidence threshold. If the chain is rolled-back under the
// specified height, `RevertHandler` will be called.
//
// ts passed to handlers is the tipset at the specified, or above, if lower tipsets were null
func (e *heightEvents) ChainAt(hnd HeightHandler, rev RevertHandler, confidence int, h abi.ChainEpoch) error {

e.lk.Lock() // Tricky locking, check your locks if you modify this function!

best, err := e.tsc.best()
Expand Down
44 changes: 44 additions & 0 deletions chain/messagepool/messagepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/lib/sigs"
"github.com/filecoin-project/lotus/node/modules/dtypes"

Expand Down Expand Up @@ -83,6 +84,26 @@ const (
localUpdates = "update"
)

// Journal event types.
const (
evtTypeMpoolAdd = iota
evtTypeMpoolRemove
evtTypeMpoolRepub
)

// MessagePoolEvt is the journal entry for message pool events.
type MessagePoolEvt struct {
Action string
Messages []MessagePoolEvtMessage
Error error `json:",omitempty"`
}

type MessagePoolEvtMessage struct {
types.Message

CID cid.Cid
}

// this is *temporary* mutilation until we have implemented uncapped miner penalties -- it will go
// away in the next fork.
var strictBaseFeeValidation = false
Expand Down Expand Up @@ -140,6 +161,8 @@ type MessagePool struct {
netName dtypes.NetworkName

sigValCache *lru.TwoQueueCache

evtTypes [3]journal.EventType
}

type msgSet struct {
Expand Down Expand Up @@ -316,6 +339,11 @@ func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*Messa
api: api,
netName: netName,
cfg: cfg,
evtTypes: [...]journal.EventType{
evtTypeMpoolAdd: journal.J.RegisterEventType("mpool", "add"),
evtTypeMpoolRemove: journal.J.RegisterEventType("mpool", "remove"),
evtTypeMpoolRepub: journal.J.RegisterEventType("mpool", "repub"),
},
}

// enable initial prunes
Expand Down Expand Up @@ -367,10 +395,12 @@ func (mp *MessagePool) runLoop() {
if err := mp.republishPendingMessages(); err != nil {
log.Errorf("error while republishing messages: %s", err)
}

case <-mp.pruneTrigger:
if err := mp.pruneExcessMessages(); err != nil {
log.Errorf("failed to prune excess messages from mempool: %s", err)
}

case <-mp.closer:
mp.repubTk.Stop()
return
Expand Down Expand Up @@ -700,6 +730,14 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage, strict bool) error {
Type: api.MpoolAdd,
Message: m,
}, localUpdates)

journal.J.RecordEvent(mp.evtTypes[evtTypeMpoolAdd], func() interface{} {
return MessagePoolEvt{
Action: "add",
Messages: []MessagePoolEvtMessage{{Message: m.Message, CID: m.Cid()}},
}
})

return nil
}

Expand Down Expand Up @@ -862,6 +900,12 @@ func (mp *MessagePool) remove(from address.Address, nonce uint64, applied bool)
Message: m,
}, localUpdates)

journal.J.RecordEvent(mp.evtTypes[evtTypeMpoolRemove], func() interface{} {
return MessagePoolEvt{
Action: "remove",
Messages: []MessagePoolEvtMessage{{Message: m.Message, CID: m.Cid()}}}
})

mp.currentSize--
}

Expand Down
14 changes: 14 additions & 0 deletions chain/messagepool/repub.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/messagepool/gasguess"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/journal"
"github.com/ipfs/go-cid"
)

Expand Down Expand Up @@ -146,6 +147,19 @@ loop:
}
}

if len(msgs) > 0 {
journal.J.RecordEvent(mp.evtTypes[evtTypeMpoolRepub], func() interface{} {
msgs := make([]MessagePoolEvtMessage, 0, len(msgs))
for _, m := range msgs {
msgs = append(msgs, MessagePoolEvtMessage{Message: m.Message, CID: m.Cid()})
}
return MessagePoolEvt{
Action: "repub",
Messages: msgs,
}
})
}

// track most recently republished messages
republished := make(map[cid.Cid]struct{})
for _, m := range msgs[:count] {
Expand Down
35 changes: 29 additions & 6 deletions chain/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,20 @@ func init() {
// ReorgNotifee represents a callback that gets called upon reorgs.
type ReorgNotifee func(rev, app []*types.TipSet) error

// Journal event types.
const (
evtTypeHeadChange = iota
)

type HeadChangeEvt struct {
From types.TipSetKey
FromHeight abi.ChainEpoch
To types.TipSetKey
ToHeight abi.ChainEpoch
RevertCount int
ApplyCount int
}

// ChainStore is the main point of access to chain data.
//
// Raw chain data is stored in the Blockstore, with relevant markers (genesis,
Expand Down Expand Up @@ -103,6 +117,8 @@ type ChainStore struct {
tsCache *lru.ARCCache

vmcalls vm.SyscallBuilder

evtTypes [1]journal.EventType
}

func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallBuilder) *ChainStore {
Expand All @@ -118,6 +134,10 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls vm.SyscallB
vmcalls: vmcalls,
}

cs.evtTypes = [1]journal.EventType{
evtTypeHeadChange: journal.J.RegisterEventType("sync", "head_change"),
}

ci := NewChainIndex(cs.LoadTipSet)

cs.cindex = ci
Expand Down Expand Up @@ -344,12 +364,15 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo
continue
}

journal.Add("sync", map[string]interface{}{
"op": "headChange",
"from": r.old.Key(),
"to": r.new.Key(),
"rev": len(revert),
"apply": len(apply),
journal.J.RecordEvent(cs.evtTypes[evtTypeHeadChange], func() interface{} {
return HeadChangeEvt{
From: r.old.Key(),
FromHeight: r.old.Height(),
To: r.new.Key(),
ToHeight: r.new.Height(),
RevertCount: len(revert),
ApplyCount: len(apply),
}
})

// reverse the apply array
Expand Down
7 changes: 7 additions & 0 deletions cmd/lotus-storage-miner/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
lcli "github.com/filecoin-project/lotus/cli"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/genesis"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/modules/dtypes"
Expand Down Expand Up @@ -459,6 +460,12 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode,
return err
}

if jrnl, err := journal.OpenFSJournal(lr, journal.DefaultDisabledEvents); err == nil {
journal.J = jrnl
} else {
return fmt.Errorf("failed to open filesystem journal: %w", err)
}

m := miner.NewMiner(api, epp, a, slashfilter.New(mds))
{
if err := m.Start(ctx); err != nil {
Expand Down
6 changes: 6 additions & 0 deletions extern/storage-sealing/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
state.Log = append(state.Log, l)
}

if m.notifee != nil {
defer func(before SectorInfo) {
m.notifee(before, *state)
}(*state) // take safe-ish copy of the before state (except for nested pointers)
}

p := fsmPlanners[state.State]
if p == nil {
return nil, 0, xerrors.Errorf("planner for state %s not found", state.State)
Expand Down
14 changes: 14 additions & 0 deletions extern/storage-sealing/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@ type test struct {
}

func TestHappyPath(t *testing.T) {
var notif []struct{ before, after SectorInfo }
ma, _ := address.NewIDAddress(55151)
m := test{
s: &Sealing{
maddr: ma,
stats: SectorStats{
bySector: map[abi.SectorID]statSectorState{},
},
notifee: func(before, after SectorInfo) {
notif = append(notif, struct{ before, after SectorInfo }{before, after})
},
},
t: t,
state: &SectorInfo{State: Packing},
Expand Down Expand Up @@ -68,6 +72,16 @@ func TestHappyPath(t *testing.T) {

m.planSingle(SectorFinalized{})
require.Equal(m.t, m.state.State, Proving)

expected := []SectorState{Packing, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, WaitSeed, Committing, SubmitCommit, CommitWait, FinalizeSector, Proving}
for i, n := range notif {
if n.before.State != expected[i] {
t.Fatalf("expected before state: %s, got: %s", expected[i], n.before.State)
}
if n.after.State != expected[i+1] {
t.Fatalf("expected after state: %s, got: %s", expected[i+1], n.after.State)
}
}
}

func TestSeedRevert(t *testing.T) {
Expand Down
9 changes: 8 additions & 1 deletion extern/storage-sealing/sealing.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ type SealingAPI interface {
ChainReadObj(context.Context, cid.Cid) ([]byte, error)
}

type SectorStateNotifee func(before, after SectorInfo)

type Sealing struct {
api SealingAPI
feeCfg FeeConfig
Expand All @@ -79,6 +81,8 @@ type Sealing struct {
upgradeLk sync.Mutex
toUpgrade map[abi.SectorNumber]struct{}

notifee SectorStateNotifee

stats SectorStats

getConfig GetSealingConfigFunc
Expand All @@ -101,7 +105,7 @@ type UnsealedSectorInfo struct {
pieceSizes []abi.UnpaddedPieceSize
}

func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy, gc GetSealingConfigFunc) *Sealing {
func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy, gc GetSealingConfigFunc, notifee SectorStateNotifee) *Sealing {
s := &Sealing{
api: api,
feeCfg: fc,
Expand All @@ -118,6 +122,9 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds
},

toUpgrade: map[abi.SectorNumber]struct{}{},

notifee: notifee,

getConfig: gc,

stats: SectorStats{
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
github.com/briandowns/spinner v1.11.1 h1:OixPqDEcX3juo5AjQZAnFPbeUA0jvkp2qzB5gOZJ/L0=
github.com/briandowns/spinner v1.11.1/go.mod h1:QOuQk7x+EaDASo80FEXwlwiA+j/PPIcX3FScO+3/ZPQ=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI=
Expand Down Expand Up @@ -241,7 +240,6 @@ github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261 h
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc=
github.com/filecoin-project/go-state-types v0.0.0-20200903145444-247639ffa6ad/go.mod h1:IQ0MBPnonv35CJHtWSN3YY1Hz2gkPru1Q9qoaYLxx9I=
github.com/filecoin-project/go-state-types v0.0.0-20200904021452-1883f36ca2f4/go.mod h1:IQ0MBPnonv35CJHtWSN3YY1Hz2gkPru1Q9qoaYLxx9I=
github.com/filecoin-project/go-state-types v0.0.0-20200905071437-95828685f9df h1:m2esXSuGBkuXlRyCsl1a/7/FkFam63o1OzIgzaHtOfI=
github.com/filecoin-project/go-state-types v0.0.0-20200905071437-95828685f9df/go.mod h1:IQ0MBPnonv35CJHtWSN3YY1Hz2gkPru1Q9qoaYLxx9I=
github.com/filecoin-project/go-state-types v0.0.0-20200909080127-001afaca718c h1:HHRMFpU8OrODDUja5NmGWNBAVGoSy4MRjxgZa+a0qIw=
github.com/filecoin-project/go-state-types v0.0.0-20200909080127-001afaca718c/go.mod h1:IQ0MBPnonv35CJHtWSN3YY1Hz2gkPru1Q9qoaYLxx9I=
Expand All @@ -253,7 +251,6 @@ github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZO
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b h1:fkRZSPrYpk42PV3/lIXiL0LHetxde7vyYYvSsttQtfg=
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b/go.mod h1:Q0GQOBtKf1oE10eSXSlhN45kDBdGvEcVOqMiffqX+N8=
github.com/filecoin-project/specs-actors v0.9.4/go.mod h1:BStZQzx5x7TmCkLv0Bpa07U6cPKol6fd3w9KjMPZ6Z4=
github.com/filecoin-project/specs-actors v0.9.7 h1:7PAZ8kdqwBdmgf/23FCkQZLCXcVu02XJrkpkhBikiA8=
github.com/filecoin-project/specs-actors v0.9.7/go.mod h1:wM2z+kwqYgXn5Z7scV1YHLyd1Q1cy0R8HfTIWQ0BFGU=
github.com/filecoin-project/specs-actors v0.9.8 h1:45fnx/BsseFL3CtvSoR6CszFY26TFtsh9AHwCW2vkg8=
github.com/filecoin-project/specs-actors v0.9.8/go.mod h1:xFObDoWPySBNTNBrGXVVrutmgSZH/mMo46Q1bec/0hw=
Expand Down Expand Up @@ -506,7 +503,6 @@ github.com/ipfs/go-fs-lock v0.0.6/go.mod h1:OTR+Rj9sHiRubJh3dRhD15Juhd/+w6VPOY28
github.com/ipfs/go-graphsync v0.1.0/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE=
github.com/ipfs/go-graphsync v0.1.2 h1:25Ll9kIXCE+DY0dicvfS3KMw+U5sd01b/FJbA7KAbhg=
github.com/ipfs/go-graphsync v0.1.2/go.mod h1:sLXVXm1OxtE2XYPw62MuXCdAuNwkAdsbnfrmos5odbA=
github.com/ipfs/go-hamt-ipld v0.1.1 h1:0IQdvwnAAUKmDE+PMJa5y1QiwOPHpI9+eAbQEEEYthk=
github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
Expand Down
Loading