Skip to content

Commit

Permalink
fix: add Transform implementation for PreCommitInfoExtractorV8 (#1242)
Browse files Browse the repository at this point in the history
* fix: add Transform implementation for PreCommitInfoExtractorV8

* fix: skip diff if same root

* fix: check earlier

* chore: skip when cids are also same in miner

* check root of maps
  • Loading branch information
kasteph authored Jul 17, 2023
1 parent 03c0d1f commit 244215b
Show file tree
Hide file tree
Showing 13 changed files with 112 additions and 34 deletions.
1 change: 1 addition & 0 deletions chain/actors/adt/diff/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type ArrayDiffer interface {
func CompareArray(preArr, curArr adt.Array, out ArrayDiffer) error {
notNew := make(map[int64]struct{}, curArr.Length())
prevVal := new(typegen.Deferred)

if err := preArr.ForEach(prevVal, func(i int64) error {
curVal := new(typegen.Deferred)
found, err := curArr.Get(uint64(i), curVal)
Expand Down
17 changes: 16 additions & 1 deletion chain/actors/builtin/init/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,21 @@ func DiffAddressMap(ctx context.Context, store adt.Store, pre, cur State) (*Addr
}

mapDiffer := NewAddressMapDiffer(pre, cur)

premR, err := prem.Root()
if err != nil {
return nil, err
}

curmR, err := curm.Root()
if err != nil {
return nil, err
}

if premR.Equals(curmR) {
return mapDiffer.Results, nil
}

if requiresLegacyDiffing(pre, cur,
&adt.MapOpts{
Bitwidth: pre.AddressMapBitWidth(),
Expand All @@ -45,7 +60,7 @@ func DiffAddressMap(ctx context.Context, store adt.Store, pre, cur State) (*Addr
Bitwidth: cur.AddressMapBitWidth(),
HashFunc: cur.AddressMapHashFunction(),
}) {
log.Warnw("actor HAMT opts differ, running slower generic map diff", "preCID", pre.Code(), "curCID", cur.Code())
log.Warnw("actor HAMT opts differ, running slower generic map diff ", "preCID ", pre.Code(), "curCID ", cur.Code())
if err := diff.CompareMap(prem, curm, mapDiffer); err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions chain/actors/builtin/market/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func DiffDealProposals(ctx context.Context, store adt.Store, pre, cur State) (*D
preOpts := pre.DealProposalsAmtBitwidth()
curOpts := cur.DealProposalsAmtBitwidth()
preP, err := pre.Proposals()

if err != nil {
return nil, err
}
Expand All @@ -33,7 +34,7 @@ func DiffDealProposals(ctx context.Context, store adt.Store, pre, cur State) (*D

diffContainer := NewMarketProposalsDiffContainer(preP, curP)
if requiresLegacyDiffing(pre, cur, preOpts, curOpts) {
log.Warn("actor AMT opts differ, running slower generic array diff", "preCID", pre.Code(), "curCID", cur.Code())
log.Warn("actor AMT opts differ, running slower generic array diff ", "preCID ", pre.Code(), "curCID ", cur.Code())
if err := diff.CompareArray(preP.array(), curP.array(), diffContainer); err != nil {
return nil, fmt.Errorf("diffing deal states: %w", err)
}
Expand Down Expand Up @@ -115,7 +116,7 @@ func DiffDealStates(ctx context.Context, store adt.Store, pre, cur State) (*Deal

diffContainer := NewMarketStatesDiffContainer(preS, curS)
if requiresLegacyDiffing(pre, cur, preOpts, curOpts) {
log.Warn("actor AMT opts differ, running slower generic array diff", "preCID", pre.Code(), "curCID", cur.Code())
log.Warn("actor AMT opts differ, running slower generic array diff ", "preCID ", pre.Code(), "curCID ", cur.Code())
if err := diff.CompareArray(preS.array(), curS.array(), diffContainer); err != nil {
return nil, fmt.Errorf("diffing deal states: %w", err)
}
Expand Down
14 changes: 14 additions & 0 deletions chain/actors/builtin/miner/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,20 @@ func DiffSectors(ctx context.Context, store adt.Store, pre, cur State) (*SectorC
preBw := pre.SectorsAmtBitwidth()
curBw := cur.SectorsAmtBitwidth()
diffContainer := NewSectorDiffContainer(pre, cur)

presR, err := pres.Root()
if err != nil {
return nil, err
}
cursR, err := curs.Root()
if err != nil {
return nil, err
}

if presR.Equals(cursR) {
return diffContainer.Results, nil
}

if ArrayRequiresLegacyDiffing(pre, cur, preBw, curBw) {
if span.IsRecording() {
span.SetAttributes(attribute.String("diff", "slow"))
Expand Down
14 changes: 14 additions & 0 deletions chain/actors/builtin/miner/diff_v8.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,21 @@ func DiffPreCommitsV8(ctx context.Context, store adt.Store, pre, cur State) (*Pr
return nil, err
}

prepR, err := prep.Root()
if err != nil {
return nil, err
}

curpR, err := curp.Root()
if err != nil {
return nil, err
}

diffContainer := NewPreCommitDiffContainerV8(pre, cur)
if prepR.Equals(curpR) {
return diffContainer.Results, nil
}

if MapRequiresLegacyDiffing(pre, cur,
&adt.MapOpts{
Bitwidth: pre.SectorsAmtBitwidth(),
Expand Down
25 changes: 17 additions & 8 deletions chain/indexer/integrated/processor/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,19 +449,28 @@ func MakeProcessors(api tasks.DataSource, indexerTasks []string) (*IndexerProces
),
minertask.LockedFundsExtractor{},
)
case tasktype.MinerPreCommitInfoV1_8:
out.ActorProcessors[t] = actorstate.NewTaskWithTransformer(
api,
actorstate.NewCustomTypedActorExtractorMap(
map[cid.Cid][]actorstate.ActorStateExtractor{
mineractors.VersionCodes()[actorstypes.Version0]: {minertask.PreCommitInfoExtractorV8{}},
mineractors.VersionCodes()[actorstypes.Version2]: {minertask.PreCommitInfoExtractorV8{}},
mineractors.VersionCodes()[actorstypes.Version3]: {minertask.PreCommitInfoExtractorV8{}},
mineractors.VersionCodes()[actorstypes.Version4]: {minertask.PreCommitInfoExtractorV8{}},
mineractors.VersionCodes()[actorstypes.Version5]: {minertask.PreCommitInfoExtractorV8{}},
mineractors.VersionCodes()[actorstypes.Version6]: {minertask.PreCommitInfoExtractorV8{}},
mineractors.VersionCodes()[actorstypes.Version7]: {minertask.PreCommitInfoExtractorV8{}},
mineractors.VersionCodes()[actorstypes.Version8]: {minertask.PreCommitInfoExtractorV8{}},
},
),
minertask.PreCommitInfoExtractorV8{},
)
case tasktype.MinerPreCommitInfo:
out.ActorProcessors[t] = actorstate.NewTaskWithTransformer(
api,
actorstate.NewCustomTypedActorExtractorMap(
map[cid.Cid][]actorstate.ActorStateExtractor{
mineractors.VersionCodes()[actorstypes.Version0]: {minertask.PreCommitInfoExtractorV8{}},
mineractors.VersionCodes()[actorstypes.Version2]: {minertask.PreCommitInfoExtractorV8{}},
mineractors.VersionCodes()[actorstypes.Version3]: {minertask.PreCommitInfoExtractorV8{}},
mineractors.VersionCodes()[actorstypes.Version4]: {minertask.PreCommitInfoExtractorV8{}},
mineractors.VersionCodes()[actorstypes.Version5]: {minertask.PreCommitInfoExtractorV8{}},
mineractors.VersionCodes()[actorstypes.Version6]: {minertask.PreCommitInfoExtractorV8{}},
mineractors.VersionCodes()[actorstypes.Version7]: {minertask.PreCommitInfoExtractorV8{}},
mineractors.VersionCodes()[actorstypes.Version8]: {minertask.PreCommitInfoExtractorV8{}},
mineractors.VersionCodes()[actorstypes.Version9]: {minertask.PreCommitInfoExtractorV9{}},
mineractors.VersionCodes()[actorstypes.Version10]: {minertask.PreCommitInfoExtractorV9{}},
mineractors.VersionCodes()[actorstypes.Version11]: {minertask.PreCommitInfoExtractorV9{}},
Expand Down
10 changes: 1 addition & 9 deletions chain/indexer/integrated/processor/state_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestNewProcessor(t *testing.T) {
proc, err := New(nil, t.Name(), tasktype.AllTableTasks)
require.NoError(t, err)
require.Equal(t, t.Name(), proc.name)
require.Len(t, proc.actorProcessors, 24)
require.Len(t, proc.actorProcessors, 25)
require.Len(t, proc.tipsetProcessors, 10)
require.Len(t, proc.tipsetsProcessors, 14)
require.Len(t, proc.builtinProcessors, 1)
Expand Down Expand Up @@ -122,14 +122,6 @@ func TestNewProcessor(t *testing.T) {
nil,
actorstate.NewCustomTypedActorExtractorMap(
map[cid.Cid][]actorstate.ActorStateExtractor{
miner.VersionCodes()[actorstypes.Version0]: {minertask.PreCommitInfoExtractorV8{}},
miner.VersionCodes()[actorstypes.Version2]: {minertask.PreCommitInfoExtractorV8{}},
miner.VersionCodes()[actorstypes.Version3]: {minertask.PreCommitInfoExtractorV8{}},
miner.VersionCodes()[actorstypes.Version4]: {minertask.PreCommitInfoExtractorV8{}},
miner.VersionCodes()[actorstypes.Version5]: {minertask.PreCommitInfoExtractorV8{}},
miner.VersionCodes()[actorstypes.Version6]: {minertask.PreCommitInfoExtractorV8{}},
miner.VersionCodes()[actorstypes.Version7]: {minertask.PreCommitInfoExtractorV8{}},
miner.VersionCodes()[actorstypes.Version8]: {minertask.PreCommitInfoExtractorV8{}},
miner.VersionCodes()[actorstypes.Version9]: {minertask.PreCommitInfoExtractorV9{}},
miner.VersionCodes()[actorstypes.Version10]: {minertask.PreCommitInfoExtractorV9{}},
miner.VersionCodes()[actorstypes.Version11]: {minertask.PreCommitInfoExtractorV9{}},
Expand Down
26 changes: 17 additions & 9 deletions chain/indexer/integrated/processor/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,26 @@ func TestMakeProcessorsActors(t *testing.T) {
),
transformer: minertask.V7SectorInfoExtractor{},
},
{
taskName: tasktype.MinerPreCommitInfoV1_8,
extractor: actorstate.NewCustomTypedActorExtractorMap(
map[cid.Cid][]actorstate.ActorStateExtractor{
miner.VersionCodes()[actorstypes.Version0]: {minertask.PreCommitInfoExtractorV8{}},
miner.VersionCodes()[actorstypes.Version2]: {minertask.PreCommitInfoExtractorV8{}},
miner.VersionCodes()[actorstypes.Version3]: {minertask.PreCommitInfoExtractorV8{}},
miner.VersionCodes()[actorstypes.Version4]: {minertask.PreCommitInfoExtractorV8{}},
miner.VersionCodes()[actorstypes.Version5]: {minertask.PreCommitInfoExtractorV8{}},
miner.VersionCodes()[actorstypes.Version6]: {minertask.PreCommitInfoExtractorV8{}},
miner.VersionCodes()[actorstypes.Version7]: {minertask.PreCommitInfoExtractorV8{}},
miner.VersionCodes()[actorstypes.Version8]: {minertask.PreCommitInfoExtractorV8{}},
},
),
transformer: minertask.PreCommitInfoExtractorV8{},
},
{
taskName: tasktype.MinerPreCommitInfo,
extractor: actorstate.NewCustomTypedActorExtractorMap(
map[cid.Cid][]actorstate.ActorStateExtractor{
miner.VersionCodes()[actorstypes.Version0]: {minertask.PreCommitInfoExtractorV8{}},
miner.VersionCodes()[actorstypes.Version2]: {minertask.PreCommitInfoExtractorV8{}},
miner.VersionCodes()[actorstypes.Version3]: {minertask.PreCommitInfoExtractorV8{}},
miner.VersionCodes()[actorstypes.Version4]: {minertask.PreCommitInfoExtractorV8{}},
miner.VersionCodes()[actorstypes.Version5]: {minertask.PreCommitInfoExtractorV8{}},
miner.VersionCodes()[actorstypes.Version6]: {minertask.PreCommitInfoExtractorV8{}},
miner.VersionCodes()[actorstypes.Version7]: {minertask.PreCommitInfoExtractorV8{}},
miner.VersionCodes()[actorstypes.Version8]: {minertask.PreCommitInfoExtractorV8{}},
miner.VersionCodes()[actorstypes.Version9]: {minertask.PreCommitInfoExtractorV9{}},
miner.VersionCodes()[actorstypes.Version10]: {minertask.PreCommitInfoExtractorV9{}},
miner.VersionCodes()[actorstypes.Version11]: {minertask.PreCommitInfoExtractorV9{}},
Expand Down Expand Up @@ -425,7 +433,7 @@ func TestMakeProcessorsAllTasks(t *testing.T) {
// If this test fails it indicates a new processor and/or task name was added and test should be created for it in one of the above test cases.
proc, err := processor.MakeProcessors(nil, append(tasktype.AllTableTasks, processor.BuiltinTaskName))
require.NoError(t, err)
require.Len(t, proc.ActorProcessors, 24)
require.Len(t, proc.ActorProcessors, 25)
require.Len(t, proc.TipsetProcessors, 10)
require.Len(t, proc.TipsetsProcessors, 14)
require.Len(t, proc.ReportProcessors, 1)
Expand Down
5 changes: 5 additions & 0 deletions chain/indexer/tasktype/table_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const (
MinerSectorInfoV1_6 = "miner_sector_infos"
MinerSectorPost = "miner_sector_post"
MinerPreCommitInfo = "miner_pre_commit_info"
MinerPreCommitInfoV1_8 = "miner_pre_commit_info_v8"
MinerSectorEvent = "miner_sector_event"
MinerCurrentDeadlineInfo = "miner_current_deadline_info"
MinerFeeDebt = "miner_fee_debt"
Expand Down Expand Up @@ -63,6 +64,7 @@ var AllTableTasks = []string{
MinerSectorInfoV1_6,
MinerSectorPost,
MinerPreCommitInfo,
MinerPreCommitInfoV1_8,
MinerSectorEvent,
MinerCurrentDeadlineInfo,
MinerFeeDebt,
Expand Down Expand Up @@ -114,6 +116,7 @@ var TableLookup = map[string]struct{}{
MinerSectorInfoV1_6: {},
MinerSectorPost: {},
MinerPreCommitInfo: {},
MinerPreCommitInfoV1_8: {},
MinerSectorEvent: {},
MinerCurrentDeadlineInfo: {},
MinerFeeDebt: {},
Expand Down Expand Up @@ -165,6 +168,7 @@ var TableComment = map[string]string{
MinerSectorInfoV1_6: `MinerSectorInfoV1_6 is exported from the miner actor iff the actor code is less than v7. The table keeps its original name since that's a requirement to support lily backfills`,
MinerSectorPost: ``,
MinerPreCommitInfo: ``,
MinerPreCommitInfoV1_8: `MinerPreCommitInfo using actors v1 to v8.`,
MinerSectorEvent: ``,
MinerCurrentDeadlineInfo: ``,
MinerFeeDebt: ``,
Expand Down Expand Up @@ -221,6 +225,7 @@ var TableFieldComments = map[string]map[string]string{
MinerSectorInfoV1_6: {},
MinerSectorPost: {},
MinerPreCommitInfo: {},
MinerPreCommitInfoV1_8: {},
MinerSectorEvent: {},
MinerCurrentDeadlineInfo: {},
MinerFeeDebt: {},
Expand Down
1 change: 1 addition & 0 deletions chain/indexer/tasktype/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var TaskLookup = map[string][]string{
MinerSectorInfoV1_6,
MinerSectorPost,
MinerPreCommitInfo,
MinerPreCommitInfoV1_8,
MinerSectorEvent,
MinerCurrentDeadlineInfo,
MinerFeeDebt,
Expand Down
4 changes: 2 additions & 2 deletions chain/indexer/tasktype/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestMakeTaskNamesAlias(t *testing.T) {
{
taskAlias: tasktype.ActorStatesMinerTask,
tasks: []string{tasktype.MinerSectorDeal, tasktype.MinerSectorInfoV7, tasktype.MinerSectorInfoV1_6,
tasktype.MinerSectorPost, tasktype.MinerPreCommitInfo, tasktype.MinerSectorEvent,
tasktype.MinerSectorPost, tasktype.MinerPreCommitInfo, tasktype.MinerPreCommitInfoV1_8, tasktype.MinerSectorEvent,
tasktype.MinerCurrentDeadlineInfo, tasktype.MinerFeeDebt, tasktype.MinerLockedFund, tasktype.MinerInfo,
tasktype.MinerBeneficiary},
},
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestMakeAllTaskAliasNames(t *testing.T) {
}

func TestMakeAllTaskNames(t *testing.T) {
const TotalTableTasks = 48
const TotalTableTasks = 49
actual, err := tasktype.MakeTaskNames(tasktype.AllTableTasks)
require.NoError(t, err)
// if this test fails it means a new task name was added, update the above test
Expand Down
10 changes: 7 additions & 3 deletions storage/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ var Models = []interface{}{
(*miner.MinerSectorInfoV1_6)(nil),
(*miner.MinerSectorPost)(nil),
(*miner.MinerPreCommitInfo)(nil),
(*miner.MinerPreCommitInfoV9)(nil),
(*miner.MinerSectorEvent)(nil),
(*miner.MinerCurrentDeadlineInfo)(nil),
(*miner.MinerFeeDebt)(nil),
Expand Down Expand Up @@ -352,10 +353,13 @@ func verifyModel(ctx context.Context, db *pg.DB, schemaName string, m *orm.Table
}

// Some common aliases
if datatype == "timestamp with time zone" {
switch datatype {
case "timestamp with time zone":
fallthrough
case "timestamp without time zone":
datatype = "timestamptz"
} else if datatype == "timestamp without time zone" {
datatype = "timestamp"
case "ARRAY":
datatype = "bigint[]"
}

if datatype != fld.SQLType {
Expand Down
14 changes: 14 additions & 0 deletions tasks/actorstate/miner/precommit.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,17 @@ func (PreCommitInfoExtractorV8) Extract(ctx context.Context, a actorstate.ActorI

return preCommitModel, nil
}

func (PreCommitInfoExtractorV8) Transform(ctx context.Context, data model.PersistableList) (model.PersistableList, error) {
persistableList := make(minermodel.MinerPreCommitInfoList, 0, len(data))
for _, d := range data {
ml, ok := d.(minermodel.MinerPreCommitInfoList)
if !ok {
return nil, fmt.Errorf("expected MinerPreCommitInfoList type but got: %T", d)
}
for _, m := range ml {
persistableList = append(persistableList, m)
}
}
return model.PersistableList{persistableList}, nil
}

0 comments on commit 244215b

Please sign in to comment.