From 7578d623f89f3746d11ed8103e51084f03ccc24c Mon Sep 17 00:00:00 2001 From: Will Date: Mon, 14 Jun 2021 12:10:19 -0700 Subject: [PATCH] include payloadCID in finalized task (#214) --- controller/graphql/schema.go | 22 ++++++ tasks/gen.go | 1 + tasks/ipldsch_satisfaction.go | 138 ++++++++++++++++++++++++++++++++-- tasks/ipldsch_types.go | 1 + tasks/model.go | 8 ++ tasks/storage_deal.go | 3 +- 6 files changed, 166 insertions(+), 7 deletions(-) diff --git a/controller/graphql/schema.go b/controller/graphql/schema.go index 5b2e71ae..51df9702 100644 --- a/controller/graphql/schema.go +++ b/controller/graphql/schema.go @@ -563,6 +563,22 @@ func FinishedTask__Size__resolve(p graphql.ResolveParams) (interface{}, error) { return nil, nil } +} +func FinishedTask__PayloadCID__resolve(p graphql.ResolveParams) (interface{}, error) { + ts, ok := p.Source.(tasks.FinishedTask) + if !ok { + return nil, fmt.Errorf(errUnexpectedType, p.Source, "tasks.FinishedTask") + } + + f := ts.FieldPayloadCID() + if f.Exists() { + + return f.Must().AsString() + + } else { + return nil, nil + } + } var FinishedTask__type = graphql.NewObject(graphql.ObjectConfig{ Name: "FinishedTask", @@ -657,6 +673,12 @@ var FinishedTask__type = graphql.NewObject(graphql.ObjectConfig{ Resolve: FinishedTask__Size__resolve, }, + "PayloadCID": &graphql.Field{ + + Type: graphql.String, + + Resolve: FinishedTask__PayloadCID__resolve, + }, }, }) var FinishedTasks__type = graphql.NewObject(graphql.ObjectConfig{ diff --git a/tasks/gen.go b/tasks/gen.go index ff13e8f0..7dda7790 100644 --- a/tasks/gen.go +++ b/tasks/gen.go @@ -139,6 +139,7 @@ func main() { schema.SpawnStructField("MinerVersion", "String", true, true), schema.SpawnStructField("ClientVersion", "String", true, true), schema.SpawnStructField("Size", "Int", true, true), + schema.SpawnStructField("PayloadCID", "String", true, true), }, schema.SpawnStructRepresentationMap(map[string]string{}))) ts.Accumulate(schema.SpawnList("FinishedTasks", "FinishedTask", false)) ts.Accumulate(schema.SpawnLinkReference("Link_FinishedTask", "FinishedTask")) diff --git a/tasks/ipldsch_satisfaction.go b/tasks/ipldsch_satisfaction.go index f22f6324..edd0d536 100644 --- a/tasks/ipldsch_satisfaction.go +++ b/tasks/ipldsch_satisfaction.go @@ -2546,6 +2546,9 @@ func (n _FinishedTask) FieldClientVersion() MaybeString { func (n _FinishedTask) FieldSize() MaybeInt { return &n.Size } +func (n _FinishedTask) FieldPayloadCID() MaybeString { + return &n.PayloadCID +} type _FinishedTask__Maybe struct { m schema.Maybe v FinishedTask @@ -2595,6 +2598,7 @@ var ( fieldName__FinishedTask_MinerVersion = _String{"MinerVersion"} fieldName__FinishedTask_ClientVersion = _String{"ClientVersion"} fieldName__FinishedTask_Size = _String{"Size"} + fieldName__FinishedTask_PayloadCID = _String{"PayloadCID"} ) var _ ipld.Node = (FinishedTask)(&_FinishedTask{}) var _ schema.TypedNode = (FinishedTask)(&_FinishedTask{}) @@ -2684,6 +2688,14 @@ func (n FinishedTask) LookupByString(key string) (ipld.Node, error) { return ipld.Null, nil } return n.Size.v, nil + case "PayloadCID": + if n.PayloadCID.m == schema.Maybe_Absent { + return ipld.Absent, nil + } + if n.PayloadCID.m == schema.Maybe_Null { + return ipld.Null, nil + } + return n.PayloadCID.v, nil default: return nil, schema.ErrNoSuchField{Type: nil /*TODO*/, Field: ipld.PathSegmentOfString(key)} } @@ -2711,7 +2723,7 @@ type _FinishedTask__MapItr struct { } func (itr *_FinishedTask__MapItr) Next() (k ipld.Node, v ipld.Node, _ error) { - if itr.idx >= 15 { + if itr.idx >= 16 { return nil, nil, ipld.ErrIteratorOverread{} } switch itr.idx { @@ -2828,6 +2840,17 @@ func (itr *_FinishedTask__MapItr) Next() (k ipld.Node, v ipld.Node, _ error) { break } v = itr.n.Size.v + case 15: + k = &fieldName__FinishedTask_PayloadCID + if itr.n.PayloadCID.m == schema.Maybe_Absent { + v = ipld.Absent + break + } + if itr.n.PayloadCID.m == schema.Maybe_Null { + v = ipld.Null + break + } + v = itr.n.PayloadCID.v default: panic("unreachable") } @@ -2835,14 +2858,14 @@ func (itr *_FinishedTask__MapItr) Next() (k ipld.Node, v ipld.Node, _ error) { return } func (itr *_FinishedTask__MapItr) Done() bool { - return itr.idx >= 15 + return itr.idx >= 16 } func (FinishedTask) ListIterator() ipld.ListIterator { return nil } func (FinishedTask) Length() int64 { - return 15 + return 16 } func (FinishedTask) IsAbsent() bool { return false @@ -2915,6 +2938,7 @@ type _FinishedTask__Assembler struct { ca_MinerVersion _String__Assembler ca_ClientVersion _String__Assembler ca_Size _Int__Assembler + ca_PayloadCID _String__Assembler } func (na *_FinishedTask__Assembler) reset() { @@ -2935,6 +2959,7 @@ func (na *_FinishedTask__Assembler) reset() { na.ca_MinerVersion.reset() na.ca_ClientVersion.reset() na.ca_Size.reset() + na.ca_PayloadCID.reset() } var ( @@ -2953,6 +2978,7 @@ var ( fieldBit__FinishedTask_MinerVersion = 1 << 12 fieldBit__FinishedTask_ClientVersion = 1 << 13 fieldBit__FinishedTask_Size = 1 << 14 + fieldBit__FinishedTask_PayloadCID = 1 << 15 fieldBits__FinishedTask_sufficient = 0 + 1 << 0 + 1 << 1 + 1 << 5 + 1 << 6 + 1 << 7 + 1 << 11 ) func (na *_FinishedTask__Assembler) BeginMap(int64) (ipld.MapAssembler, error) { @@ -3211,6 +3237,18 @@ func (ma *_FinishedTask__Assembler) valueFinishTidy() bool { default: return false } + case 15: + switch ma.w.PayloadCID.m { + case schema.Maybe_Null: + ma.state = maState_initial + return true + case schema.Maybe_Value: + ma.w.PayloadCID.v = ma.ca_PayloadCID.w + ma.state = maState_initial + return true + default: + return false + } default: panic("unreachable") } @@ -3389,6 +3427,17 @@ func (ma *_FinishedTask__Assembler) AssembleEntry(k string) (ipld.NodeAssembler, ma.ca_Size.m = &ma.w.Size.m ma.w.Size.m = allowNull return &ma.ca_Size, nil + case "PayloadCID": + if ma.s & fieldBit__FinishedTask_PayloadCID != 0 { + return nil, ipld.ErrRepeatedMapKey{&fieldName__FinishedTask_PayloadCID} + } + ma.s += fieldBit__FinishedTask_PayloadCID + ma.state = maState_midValue + ma.f = 15 + ma.ca_PayloadCID.w = ma.w.PayloadCID.v + ma.ca_PayloadCID.m = &ma.w.PayloadCID.m + ma.w.PayloadCID.m = allowNull + return &ma.ca_PayloadCID, nil default: return nil, ipld.ErrInvalidKey{TypeName:"tasks.FinishedTask", Key:&_String{k}} } @@ -3494,6 +3543,11 @@ func (ma *_FinishedTask__Assembler) AssembleValue() ipld.NodeAssembler { ma.ca_Size.m = &ma.w.Size.m ma.w.Size.m = allowNull return &ma.ca_Size + case 15: + ma.ca_PayloadCID.w = ma.w.PayloadCID.v + ma.ca_PayloadCID.m = &ma.w.PayloadCID.m + ma.w.PayloadCID.m = allowNull + return &ma.ca_PayloadCID default: panic("unreachable") } @@ -3674,6 +3728,13 @@ func (ka *_FinishedTask__KeyAssembler) AssignString(k string) error { ka.s += fieldBit__FinishedTask_Size ka.state = maState_expectValue ka.f = 14 + case "PayloadCID": + if ka.s & fieldBit__FinishedTask_PayloadCID != 0 { + return ipld.ErrRepeatedMapKey{&fieldName__FinishedTask_PayloadCID} + } + ka.s += fieldBit__FinishedTask_PayloadCID + ka.state = maState_expectValue + ka.f = 15 default: return ipld.ErrInvalidKey{TypeName:"tasks.FinishedTask", Key:&_String{k}} } @@ -3718,6 +3779,7 @@ var ( fieldName__FinishedTask_MinerVersion_serial = _String{"MinerVersion"} fieldName__FinishedTask_ClientVersion_serial = _String{"ClientVersion"} fieldName__FinishedTask_Size_serial = _String{"Size"} + fieldName__FinishedTask_PayloadCID_serial = _String{"PayloadCID"} ) var _ ipld.Node = &_FinishedTask__Repr{} func (_FinishedTask__Repr) Kind() ipld.Kind { @@ -3806,6 +3868,14 @@ func (n *_FinishedTask__Repr) LookupByString(key string) (ipld.Node, error) { return ipld.Null, nil } return n.Size.v.Representation(), nil + case "PayloadCID": + if n.PayloadCID.m == schema.Maybe_Absent { + return ipld.Absent, ipld.ErrNotExists{ipld.PathSegmentOfString(key)} + } + if n.PayloadCID.m == schema.Maybe_Null { + return ipld.Null, nil + } + return n.PayloadCID.v.Representation(), nil default: return nil, schema.ErrNoSuchField{Type: nil /*TODO*/, Field: ipld.PathSegmentOfString(key)} } @@ -3824,7 +3894,12 @@ func (n _FinishedTask__Repr) LookupBySegment(seg ipld.PathSegment) (ipld.Node, e return n.LookupByString(seg.String()) } func (n *_FinishedTask__Repr) MapIterator() ipld.MapIterator { - end := 15 + end := 16 + if n.PayloadCID.m == schema.Maybe_Absent { + end = 15 + } else { + goto done + } if n.Size.m == schema.Maybe_Absent { end = 14 } else { @@ -3851,7 +3926,7 @@ type _FinishedTask__ReprMapItr struct { } func (itr *_FinishedTask__ReprMapItr) Next() (k ipld.Node, v ipld.Node, _ error) { -advance:if itr.idx >= 15 { +advance:if itr.idx >= 16 { return nil, nil, ipld.ErrIteratorOverread{} } switch itr.idx { @@ -3968,6 +4043,17 @@ advance:if itr.idx >= 15 { break } v = itr.n.Size.v.Representation() + case 15: + k = &fieldName__FinishedTask_PayloadCID_serial + if itr.n.PayloadCID.m == schema.Maybe_Absent { + itr.idx++ + goto advance + } + if itr.n.PayloadCID.m == schema.Maybe_Null { + v = ipld.Null + break + } + v = itr.n.PayloadCID.v.Representation() default: panic("unreachable") } @@ -3981,7 +4067,7 @@ func (_FinishedTask__Repr) ListIterator() ipld.ListIterator { return nil } func (rn *_FinishedTask__Repr) Length() int64 { - l := 15 + l := 16 if rn.ErrorMessage.m == schema.Maybe_Absent { l-- } @@ -4009,6 +4095,9 @@ func (rn *_FinishedTask__Repr) Length() int64 { if rn.Size.m == schema.Maybe_Absent { l-- } + if rn.PayloadCID.m == schema.Maybe_Absent { + l-- + } return int64(l) } func (_FinishedTask__Repr) IsAbsent() bool { @@ -4082,6 +4171,7 @@ type _FinishedTask__ReprAssembler struct { ca_MinerVersion _String__ReprAssembler ca_ClientVersion _String__ReprAssembler ca_Size _Int__ReprAssembler + ca_PayloadCID _String__ReprAssembler } func (na *_FinishedTask__ReprAssembler) reset() { @@ -4102,6 +4192,7 @@ func (na *_FinishedTask__ReprAssembler) reset() { na.ca_MinerVersion.reset() na.ca_ClientVersion.reset() na.ca_Size.reset() + na.ca_PayloadCID.reset() } func (na *_FinishedTask__ReprAssembler) BeginMap(int64) (ipld.MapAssembler, error) { switch *na.m { @@ -4347,6 +4438,18 @@ func (ma *_FinishedTask__ReprAssembler) valueFinishTidy() bool { default: return false } + case 15: + switch ma.w.PayloadCID.m { + case schema.Maybe_Null: + ma.state = maState_initial + return true + case schema.Maybe_Value: + ma.w.PayloadCID.v = ma.ca_PayloadCID.w + ma.state = maState_initial + return true + default: + return false + } default: panic("unreachable") } @@ -4526,6 +4629,17 @@ func (ma *_FinishedTask__ReprAssembler) AssembleEntry(k string) (ipld.NodeAssemb ma.ca_Size.m = &ma.w.Size.m ma.w.Size.m = allowNull return &ma.ca_Size, nil + case "PayloadCID": + if ma.s & fieldBit__FinishedTask_PayloadCID != 0 { + return nil, ipld.ErrRepeatedMapKey{&fieldName__FinishedTask_PayloadCID_serial} + } + ma.s += fieldBit__FinishedTask_PayloadCID + ma.state = maState_midValue + ma.f = 15 + ma.ca_PayloadCID.w = ma.w.PayloadCID.v + ma.ca_PayloadCID.m = &ma.w.PayloadCID.m + ma.w.PayloadCID.m = allowNull + return &ma.ca_PayloadCID, nil default: return nil, ipld.ErrInvalidKey{TypeName:"tasks.FinishedTask.Repr", Key:&_String{k}} } @@ -4632,6 +4746,11 @@ func (ma *_FinishedTask__ReprAssembler) AssembleValue() ipld.NodeAssembler { ma.ca_Size.m = &ma.w.Size.m ma.w.Size.m = allowNull return &ma.ca_Size + case 15: + ma.ca_PayloadCID.w = ma.w.PayloadCID.v + ma.ca_PayloadCID.m = &ma.w.PayloadCID.m + ma.w.PayloadCID.m = allowNull + return &ma.ca_PayloadCID default: panic("unreachable") } @@ -4812,6 +4931,13 @@ func (ka *_FinishedTask__ReprKeyAssembler) AssignString(k string) error { ka.s += fieldBit__FinishedTask_Size ka.state = maState_expectValue ka.f = 14 + case "PayloadCID": + if ka.s & fieldBit__FinishedTask_PayloadCID != 0 { + return ipld.ErrRepeatedMapKey{&fieldName__FinishedTask_PayloadCID_serial} + } + ka.s += fieldBit__FinishedTask_PayloadCID + ka.state = maState_expectValue + ka.f = 15 default: return ipld.ErrInvalidKey{TypeName:"tasks.FinishedTask.Repr", Key:&_String{k}} } diff --git a/tasks/ipldsch_types.go b/tasks/ipldsch_types.go index e6fc0847..a9462244 100644 --- a/tasks/ipldsch_types.go +++ b/tasks/ipldsch_types.go @@ -131,6 +131,7 @@ type _FinishedTask struct { MinerVersion _String__Maybe ClientVersion _String__Maybe Size _Int__Maybe + PayloadCID _String__Maybe } // FinishedTasks matches the IPLD Schema type "FinishedTasks". It has list kind. diff --git a/tasks/model.go b/tasks/model.go index d9a06d17..917052de 100644 --- a/tasks/model.go +++ b/tasks/model.go @@ -276,6 +276,7 @@ func (t *_Task) Finalize(ctx context.Context, s ipld.Storer) (FinishedTask, erro MinerVersion: logs.minerVersion, ClientVersion: logs.clientVersion, Size: logs.size, + PayloadCID: logs.payloadCID, } // events to dag item logList := &_List_StageDetails{} @@ -301,6 +302,7 @@ type logExtraction struct { timeFirstByte _Int__Maybe timeLastByte _Int__Maybe size _Int__Maybe + payloadCID _String__Maybe } func parseFinalLogs(t Task) *logExtraction { @@ -313,6 +315,8 @@ func parseFinalLogs(t Task) *logExtraction { if t.StorageTask.Exists() { le.size = _Int__Maybe{m: schema.Maybe_Value, v: &t.StorageTask.Must().Size} + } else if t.RetrievalTask.Exists() { + le.payloadCID = _String__Maybe{m: schema.Maybe_Value, v: &t.RetrievalTask.Must().PayloadCID} } // If the task failed early, we might not have some of the info. @@ -387,6 +391,10 @@ func parseFinalLogs(t Task) *logExtraction { le.size.v = &_Int{b} } } + if le.payloadCID.IsAbsent() && strings.Contains(entry, "PayloadCID:") { + le.payloadCID.m = schema.Maybe_Value + le.payloadCID.v = &_String{strings.TrimPrefix(entry, "PayloadCID: ")} + } } } diff --git a/tasks/storage_deal.go b/tasks/storage_deal.go index ae27e53a..fe98c6bd 100644 --- a/tasks/storage_deal.go +++ b/tasks/storage_deal.go @@ -235,7 +235,7 @@ func (de *storageDealExecutor) generateFile(_ logFunc) error { return nil } -func (de *storageDealExecutor) importFile(_ logFunc) (err error) { +func (de *storageDealExecutor) importFile(l logFunc) (err error) { // import the file into the lotus node ref := api.FileRef{ Path: filepath.Join(de.config.NodeDataDir, de.fileName), @@ -246,6 +246,7 @@ func (de *storageDealExecutor) importFile(_ logFunc) (err error) { if err != nil { return fmt.Errorf("error importing file: %w", err) } + l(fmt.Sprintf("PayloadCID: %s", de.importRes.Root)) return nil }