From 178624419191c2cd4a431b5137b6eb776e41d678 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 9 Jul 2020 13:11:05 -0700 Subject: [PATCH 01/13] fix(types): cbor-gen for channel ID --- types.go | 2 + types_cbor_gen.go | 108 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 110 insertions(+) create mode 100644 types_cbor_gen.go diff --git a/types.go b/types.go index bd290b12..242722da 100644 --- a/types.go +++ b/types.go @@ -18,6 +18,8 @@ func (es errorString) Error() string { return string(es) } +//go:generate cbor-gen-for ChannelID + // ErrChannelNotFound indicates the given channel does not exist const ErrChannelNotFound = errorString("channel not found") diff --git a/types_cbor_gen.go b/types_cbor_gen.go new file mode 100644 index 00000000..c30619c2 --- /dev/null +++ b/types_cbor_gen.go @@ -0,0 +1,108 @@ +// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT. + +package datatransfer + +import ( + "fmt" + "io" + + "github.com/libp2p/go-libp2p-core/peer" + cbg "github.com/whyrusleeping/cbor-gen" + xerrors "golang.org/x/xerrors" +) + +var _ = xerrors.Errorf + +func (t *ChannelID) MarshalCBOR(w io.Writer) error { + if t == nil { + _, err := w.Write(cbg.CborNull) + return err + } + if _, err := w.Write([]byte{131}); err != nil { + return err + } + + // t.Initiator (peer.ID) (string) + if len(t.Initiator) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.Initiator was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Initiator)))); err != nil { + return err + } + if _, err := w.Write([]byte(t.Initiator)); err != nil { + return err + } + + // t.Responder (peer.ID) (string) + if len(t.Responder) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.Responder was too long") + } + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajTextString, uint64(len(t.Responder)))); err != nil { + return err + } + if _, err := w.Write([]byte(t.Responder)); err != nil { + return err + } + + // t.ID (datatransfer.TransferID) (uint64) + + if _, err := w.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, uint64(t.ID))); err != nil { + return err + } + + return nil +} + +func (t *ChannelID) UnmarshalCBOR(r io.Reader) error { + br := cbg.GetPeeker(r) + + maj, extra, err := cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajArray { + return fmt.Errorf("cbor input should be of type array") + } + + if extra != 3 { + return fmt.Errorf("cbor input had wrong number of fields") + } + + // t.Initiator (peer.ID) (string) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + t.Initiator = peer.ID(sval) + } + // t.Responder (peer.ID) (string) + + { + sval, err := cbg.ReadString(br) + if err != nil { + return err + } + + t.Responder = peer.ID(sval) + } + // t.ID (datatransfer.TransferID) (uint64) + + { + + maj, extra, err = cbg.CborReadHeader(br) + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.ID = TransferID(extra) + + } + return nil +} From 22c1856d3fc339cd4985bacecf19066e8e6b9e9f Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 9 Jul 2020 13:11:54 -0700 Subject: [PATCH 02/13] fix(impl): better voucher decoder use both type registries to decode vouchers in channels --- impl/impl.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/impl/impl.go b/impl/impl.go index 7563968a..d8d8dba4 100644 --- a/impl/impl.go +++ b/impl/impl.go @@ -15,6 +15,7 @@ import ( datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-data-transfer/channels" + "github.com/filecoin-project/go-data-transfer/encoding" "github.com/filecoin-project/go-data-transfer/message" "github.com/filecoin-project/go-data-transfer/network" "github.com/filecoin-project/go-data-transfer/registry" @@ -67,7 +68,7 @@ func NewDataTransfer(ds datastore.Datastore, dataTransferNetwork network.DataTra transport: transport, storedCounter: storedCounter, } - channels, err := channels.New(ds, m.notifier, m.validatedTypes.Decoder, m.resultTypes.Decoder, dataTransferNetwork) + channels, err := channels.New(ds, m.notifier, m.voucherDecoder, m.resultTypes.Decoder, dataTransferNetwork) if err != nil { return nil, err } @@ -75,6 +76,14 @@ func NewDataTransfer(ds datastore.Datastore, dataTransferNetwork network.DataTra return m, nil } +func (m *manager) voucherDecoder(voucherType datatransfer.TypeIdentifier) (encoding.Decoder, bool) { + decoder, has := m.validatedTypes.Decoder(voucherType) + if !has { + return m.revalidators.Decoder(voucherType) + } + return decoder, true +} + func (m *manager) notifier(evt datatransfer.Event, chst datatransfer.ChannelState) { err := m.pubSub.Publish(internalEvent{evt, chst}) if err != nil { From 7fec7f990862c94cc35bf1b7b1978d68111d5ed4 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 9 Jul 2020 19:10:00 -0700 Subject: [PATCH 03/13] refactor(channels): use go-statemachine GetSync rather than clunky noopSynchronize event, use built in GetSync method in go-statemachine --- channels/channels.go | 12 +----------- channels/channels_fsm.go | 1 - go.mod | 2 +- go.sum | 4 ++-- 4 files changed, 4 insertions(+), 15 deletions(-) diff --git a/channels/channels.go b/channels/channels.go index c35d7494..e9e87a8f 100644 --- a/channels/channels.go +++ b/channels/channels.go @@ -3,7 +3,6 @@ package channels import ( "context" "errors" - "math" "time" datatransfer "github.com/filecoin-project/go-data-transfer" @@ -17,8 +16,6 @@ import ( cbg "github.com/whyrusleeping/cbor-gen" ) -const noopSynchronize = datatransfer.EventCode(math.MaxInt32) - type DecoderByTypeFunc func(identifier datatransfer.TypeIdentifier) (encoding.Decoder, bool) type Notifier func(datatransfer.Event, datatransfer.ChannelState) @@ -76,9 +73,6 @@ func (c *Channels) dispatch(eventName fsm.EventName, channel fsm.StateType) { if !ok { log.Errorf("dropped bad event %v", eventName) } - if evtCode == noopSynchronize { - return - } realChannel, ok := channel.(internalChannelState) if !ok { log.Errorf("not a ClientDeal %v", channel) @@ -153,12 +147,8 @@ func (c *Channels) InProgress(ctx context.Context) (map[datatransfer.ChannelID]d // Returns datatransfer.EmptyChannelState if there is no channel with that id func (c *Channels) GetByID(ctx context.Context, chid datatransfer.ChannelID) (datatransfer.ChannelState, error) { var internalChannel internalChannelState - err := c.sendSync(ctx, chid, noopSynchronize) + err := c.statemachines.GetSync(ctx, chid, &internalChannel) if err != nil && err != statemachine.ErrTerminated { - return nil, err - } - err = c.statemachines.Get(chid).Get(&internalChannel) - if err != nil { return nil, ErrNotFound } return internalChannel.ToChannelState(c.voucherDecoder, c.voucherResultDecoder), nil diff --git a/channels/channels_fsm.go b/channels/channels_fsm.go index 4dc8f29f..09a45100 100644 --- a/channels/channels_fsm.go +++ b/channels/channels_fsm.go @@ -76,7 +76,6 @@ var ChannelEvents = fsm.Events{ From(datatransfer.Cancelling).To(datatransfer.Cancelled). From(datatransfer.Failing).To(datatransfer.Failed). From(datatransfer.Completing).To(datatransfer.Completed), - fsm.Event(noopSynchronize).FromAny().ToNoChange(), } // ChannelStateEntryFuncs are handlers called as we enter different states diff --git a/go.mod b/go.mod index 690a6b6a..91cc2937 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/filecoin-project/go-data-transfer go 1.13 require ( - github.com/filecoin-project/go-statemachine v0.0.0-20200703171610-a74a697973b9 + github.com/filecoin-project/go-statemachine v0.0.0-20200710020535-0bb117d1f638 github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b github.com/hannahhoward/cbor-gen-for v0.0.0-20191218204337-9ab7b1bcc099 github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e diff --git a/go.sum b/go.sum index fc4bb480..75a7d1a5 100644 --- a/go.sum +++ b/go.sum @@ -59,8 +59,8 @@ github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUn github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:av5fw6wmm58FYMgJeoB/lK9XXrgdugYiTqkdxjTy9k8= github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg= -github.com/filecoin-project/go-statemachine v0.0.0-20200703171610-a74a697973b9 h1:NagIOq5osclBprc95ILEnGCOpubuhalqwWvayYJmXLQ= -github.com/filecoin-project/go-statemachine v0.0.0-20200703171610-a74a697973b9/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig= +github.com/filecoin-project/go-statemachine v0.0.0-20200710020535-0bb117d1f638 h1:7h+FS3wP/gi5Z9N4r3ZT9yfZVzXJ3l/gH9B63W3m3Lg= +github.com/filecoin-project/go-statemachine v0.0.0-20200710020535-0bb117d1f638/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig= github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ= github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI= github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b h1:fkRZSPrYpk42PV3/lIXiL0LHetxde7vyYYvSsttQtfg= From 17e42e1f1ebef11801d8f45bdee439949a0137bc Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 9 Jul 2020 20:48:18 -0700 Subject: [PATCH 04/13] feat(channels): add channel id to channel state --- channels/channel_state.go | 14 ++++++++++++-- channels/internalchannel.go | 1 + go.sum | 1 + types.go | 7 +++++-- 4 files changed, 19 insertions(+), 4 deletions(-) diff --git a/channels/channel_state.go b/channels/channel_state.go index 3be6bc8d..7033c840 100644 --- a/channels/channel_state.go +++ b/channels/channel_state.go @@ -28,6 +28,8 @@ type channelState struct { totalSize uint64 // current status of this deal status datatransfer.Status + // isPull indicates if this is a push or pull request + isPull bool // total bytes sent from this node (0 if receiver) sent uint64 // total bytes received by this node (0 if sender) @@ -89,8 +91,16 @@ func (c channelState) Recipient() peer.ID { return c.recipient } func (c channelState) TotalSize() uint64 { return c.totalSize } // IsPull returns whether this is a pull request based on who initiated it -func (c channelState) IsPull(initiator peer.ID) bool { - return initiator == c.recipient +func (c channelState) IsPull() bool { + return c.isPull +} + +func (c channelState) ChannelID() datatransfer.ChannelID { + if c.isPull { + return datatransfer.ChannelID{ID: c.transferID, Initiator: c.recipient, Responder: c.sender} + } else { + return datatransfer.ChannelID{ID: c.transferID, Initiator: c.sender, Responder: c.recipient} + } } func (c channelState) Message() string { diff --git a/channels/internalchannel.go b/channels/internalchannel.go index e19f81d2..55992ab0 100644 --- a/channels/internalchannel.go +++ b/channels/internalchannel.go @@ -54,6 +54,7 @@ type internalChannelState struct { func (c internalChannelState) ToChannelState(voucherDecoder DecoderByTypeFunc, voucherResultDecoder DecoderByTypeFunc) datatransfer.ChannelState { return channelState{ + isPull: c.Initiator == c.Recipient, transferID: c.TransferID, baseCid: c.BaseCid, selector: c.Selector, diff --git a/go.sum b/go.sum index 75a7d1a5..8546288b 100644 --- a/go.sum +++ b/go.sum @@ -65,6 +65,7 @@ github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIi github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI= github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b h1:fkRZSPrYpk42PV3/lIXiL0LHetxde7vyYYvSsttQtfg= github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b/go.mod h1:Q0GQOBtKf1oE10eSXSlhN45kDBdGvEcVOqMiffqX+N8= +github.com/filecoin-project/lotus v0.4.1 h1:rg9X3TY7ymT+m6ATIQ7xt8FW2CpCeznwOFfbONPMz84= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/go-check/check v0.0.0-20180628173108-788fd7840127 h1:0gkP6mzaMqkmpcJYCFOLkIBwI7xFExG03bbkOkCvUPI= diff --git a/types.go b/types.go index 242722da..5f34d3b8 100644 --- a/types.go +++ b/types.go @@ -182,8 +182,11 @@ type Channel interface { // TotalSize returns the total size for the data being transferred TotalSize() uint64 - // IsPull returns whether this is a pull request based on who initiated it - IsPull(initiator peer.ID) bool + // IsPull returns whether this is a pull request + IsPull() bool + + // ChannelID returns the ChannelID for this request + ChannelID() ChannelID // OtherParty returns the opposite party in the channel to the passed in party OtherParty(thisParty peer.ID) peer.ID From c8bccbbb685f8eac2b403cb870cf3ef46a9b12a5 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 9 Jul 2020 21:20:29 -0700 Subject: [PATCH 05/13] fix(impl): fix integration test Not sure how this was managing to work given how it's written! --- impl/integration_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/impl/integration_test.go b/impl/integration_test.go index 0b01bfc0..08748541 100644 --- a/impl/integration_test.go +++ b/impl/integration_test.go @@ -235,9 +235,11 @@ func TestSimulatedRetrievalFlow(t *testing.T) { } dt2.SubscribeToEvents(clientSubscriber) providerFinished := make(chan struct{}, 1) + providerAccepted := false var providerSubscriber datatransfer.Subscriber = func(event datatransfer.Event, channelState datatransfer.ChannelState) { if event.Code == datatransfer.PauseResponder { - if !config.payForUnseal { + if !config.payForUnseal && !providerAccepted { + providerAccepted = true timer := time.NewTimer(config.unpauseResponderDelay) go func() { <-timer.C From d92d071053756556618856928db14a15e44fe74c Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Fri, 10 Jul 2020 12:05:24 -0700 Subject: [PATCH 06/13] fix(deps): update go-statemachine --- go.mod | 2 +- go.sum | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 91cc2937..ed219483 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/filecoin-project/go-data-transfer go 1.13 require ( - github.com/filecoin-project/go-statemachine v0.0.0-20200710020535-0bb117d1f638 + github.com/filecoin-project/go-statemachine v0.0.0-20200710195503-2909a4eefeb7 github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b github.com/hannahhoward/cbor-gen-for v0.0.0-20191218204337-9ab7b1bcc099 github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e diff --git a/go.sum b/go.sum index 8546288b..8a0d8236 100644 --- a/go.sum +++ b/go.sum @@ -59,13 +59,12 @@ github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUn github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:av5fw6wmm58FYMgJeoB/lK9XXrgdugYiTqkdxjTy9k8= github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg= -github.com/filecoin-project/go-statemachine v0.0.0-20200710020535-0bb117d1f638 h1:7h+FS3wP/gi5Z9N4r3ZT9yfZVzXJ3l/gH9B63W3m3Lg= -github.com/filecoin-project/go-statemachine v0.0.0-20200710020535-0bb117d1f638/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig= +github.com/filecoin-project/go-statemachine v0.0.0-20200710195503-2909a4eefeb7 h1:FGOB/y35floo7pZWzfqfMxwHUOO597F0y2Qyu4bDGpQ= +github.com/filecoin-project/go-statemachine v0.0.0-20200710195503-2909a4eefeb7/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig= github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ= github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI= github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b h1:fkRZSPrYpk42PV3/lIXiL0LHetxde7vyYYvSsttQtfg= github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b/go.mod h1:Q0GQOBtKf1oE10eSXSlhN45kDBdGvEcVOqMiffqX+N8= -github.com/filecoin-project/lotus v0.4.1 h1:rg9X3TY7ymT+m6ATIQ7xt8FW2CpCeznwOFfbONPMz84= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/go-check/check v0.0.0-20180628173108-788fd7840127 h1:0gkP6mzaMqkmpcJYCFOLkIBwI7xFExG03bbkOkCvUPI= From db7276ece6f214c6bb2e6841fbee3499e59a03a1 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Mon, 13 Jul 2020 23:41:13 -0700 Subject: [PATCH 07/13] fix(impl): fix test stability was failng in situations with where request completed, due to timing issues. --- impl/integration_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/impl/integration_test.go b/impl/integration_test.go index 08748541..3b3e34d1 100644 --- a/impl/integration_test.go +++ b/impl/integration_test.go @@ -383,7 +383,7 @@ func TestPauseAndResume(t *testing.T) { sentIncrements := make([]uint64, 0, 21) receivedIncrements := make([]uint64, 0, 21) for opens < 2 || completes < 2 || len(sentIncrements) < 21 || len(receivedIncrements) < 21 || - pauseInitiators < 2 || pauseResponders < 1 || resumeInitiators < 2 || resumeResponders < 1 { + pauseInitiators < 1 || pauseResponders < 1 || resumeInitiators < 1 || resumeResponders < 1 { select { case <-ctx.Done(): t.Fatal("Did not complete succcessful data transfer") From 3e522cccb2bb30d229f913901eac4ed9748cc43a Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 14 Jul 2020 13:16:47 -0700 Subject: [PATCH 08/13] fix(deps): update go-statemachine --- channels/channels.go | 3 +-- go.mod | 2 +- go.sum | 4 ++-- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/channels/channels.go b/channels/channels.go index e9e87a8f..7301c502 100644 --- a/channels/channels.go +++ b/channels/channels.go @@ -7,7 +7,6 @@ import ( datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-data-transfer/encoding" - "github.com/filecoin-project/go-statemachine" "github.com/filecoin-project/go-statemachine/fsm" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" @@ -148,7 +147,7 @@ func (c *Channels) InProgress(ctx context.Context) (map[datatransfer.ChannelID]d func (c *Channels) GetByID(ctx context.Context, chid datatransfer.ChannelID) (datatransfer.ChannelState, error) { var internalChannel internalChannelState err := c.statemachines.GetSync(ctx, chid, &internalChannel) - if err != nil && err != statemachine.ErrTerminated { + if err != nil { return nil, ErrNotFound } return internalChannel.ToChannelState(c.voucherDecoder, c.voucherResultDecoder), nil diff --git a/go.mod b/go.mod index ed219483..ffdccb19 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/filecoin-project/go-data-transfer go 1.13 require ( - github.com/filecoin-project/go-statemachine v0.0.0-20200710195503-2909a4eefeb7 + github.com/filecoin-project/go-statemachine v0.0.0-20200714194326-a77c3ae20989 github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b github.com/hannahhoward/cbor-gen-for v0.0.0-20191218204337-9ab7b1bcc099 github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e diff --git a/go.sum b/go.sum index 8a0d8236..3c8062b7 100644 --- a/go.sum +++ b/go.sum @@ -59,8 +59,8 @@ github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUn github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:av5fw6wmm58FYMgJeoB/lK9XXrgdugYiTqkdxjTy9k8= github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg= -github.com/filecoin-project/go-statemachine v0.0.0-20200710195503-2909a4eefeb7 h1:FGOB/y35floo7pZWzfqfMxwHUOO597F0y2Qyu4bDGpQ= -github.com/filecoin-project/go-statemachine v0.0.0-20200710195503-2909a4eefeb7/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig= +github.com/filecoin-project/go-statemachine v0.0.0-20200714194326-a77c3ae20989 h1:1GjCS3xy/CRIw7Tq0HfzX6Al8mklrszQZ3iIFnjPzHk= +github.com/filecoin-project/go-statemachine v0.0.0-20200714194326-a77c3ae20989/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig= github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ= github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI= github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b h1:fkRZSPrYpk42PV3/lIXiL0LHetxde7vyYYvSsttQtfg= From cadf9e4057308899c8f66261f216b9c37d7583e0 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 15 Jul 2020 05:57:52 -0700 Subject: [PATCH 09/13] fix(impl): revalidation sends complete responses make sure the revalidation sends a different message for a response that is finalizing versus regular --- channels/channels_fsm.go | 6 +++--- impl/events.go | 39 +++++++++++++++++++++++---------------- impl/utils.go | 10 ++++++++++ 3 files changed, 36 insertions(+), 19 deletions(-) diff --git a/channels/channels_fsm.go b/channels/channels_fsm.go index 09a45100..55fb5875 100644 --- a/channels/channels_fsm.go +++ b/channels/channels_fsm.go @@ -56,8 +56,6 @@ var ChannelEvents = fsm.Events{ From(datatransfer.ResponderPaused).To(datatransfer.Ongoing). From(datatransfer.BothPaused).To(datatransfer.InitiatorPaused). From(datatransfer.Finalizing).To(datatransfer.Completing). - From(datatransfer.ResponderFinalizing).To(datatransfer.ResponderCompleted). - From(datatransfer.ResponderFinalizingTransferFinished).To(datatransfer.Completing). FromAny().ToNoChange(), fsm.Event(datatransfer.FinishTransfer). FromAny().To(datatransfer.TransferFinished). @@ -69,7 +67,9 @@ var ChannelEvents = fsm.Events{ fsm.Event(datatransfer.ResponderCompletes). FromAny().To(datatransfer.ResponderCompleted). From(datatransfer.ResponderPaused).To(datatransfer.ResponderFinalizing). - From(datatransfer.TransferFinished).To(datatransfer.Completing), + From(datatransfer.TransferFinished).To(datatransfer.Completing). + From(datatransfer.ResponderFinalizing).To(datatransfer.ResponderCompleted). + From(datatransfer.ResponderFinalizingTransferFinished).To(datatransfer.Completing), fsm.Event(datatransfer.BeginFinalizing).FromAny().To(datatransfer.Finalizing), fsm.Event(datatransfer.Complete).FromAny().To(datatransfer.Completing), fsm.Event(datatransfer.CleanupComplete). diff --git a/impl/events.go b/impl/events.go index 2577f1fa..e2d6c4e4 100644 --- a/impl/events.go +++ b/impl/events.go @@ -104,15 +104,6 @@ func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response messa if response.IsCancel() { return m.channels.Cancel(chid) } - if response.IsComplete() && response.Accepted() { - if !response.IsPaused() { - return m.channels.ResponderCompletes(chid) - } - err := m.channels.ResponderBeginsFinalization(chid) - if err != nil { - return nil - } - } if response.IsVoucherResult() { if !response.EmptyVoucherResult() { vresult, err := m.decodeVoucherResult(response) @@ -134,6 +125,15 @@ func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response messa } } } + if response.IsComplete() && response.Accepted() { + if !response.IsPaused() { + return m.channels.ResponderCompletes(chid) + } + err := m.channels.ResponderBeginsFinalization(chid) + if err != nil { + return nil + } + } if response.IsPaused() { return m.pauseOther(chid) } @@ -286,12 +286,23 @@ func (m *manager) processUpdateVoucher(chid datatransfer.ChannelID, request mess return m.processRevalidationResult(chid, result, voucherErr) } -func (m *manager) processRevalidationResult(chid datatransfer.ChannelID, result datatransfer.VoucherResult, resultErr error) (message.DataTransferResponse, error) { - vresMessage, err := m.response(false, resultErr, chid.ID, result) +func (m *manager) revalidationResponse(chid datatransfer.ChannelID, result datatransfer.VoucherResult, resultErr error) (message.DataTransferResponse, error) { + chst, err := m.channels.GetByID(context.TODO(), chid) if err != nil { return nil, err } + if chst.Status() == datatransfer.Finalizing { + return m.completeResponse(resultErr, chid.ID, result) + } + return m.response(false, resultErr, chid.ID, result) +} + +func (m *manager) processRevalidationResult(chid datatransfer.ChannelID, result datatransfer.VoucherResult, resultErr error) (message.DataTransferResponse, error) { + vresMessage, err := m.revalidationResponse(chid, result, resultErr) + if err != nil { + return nil, err + } if result != nil { err := m.channels.NewVoucherResult(chid, result) if err != nil { @@ -324,16 +335,12 @@ func (m *manager) completeMessage(chid datatransfer.ChannelID) (message.DataTran result, resultErr = revalidator.OnComplete(chid) return resultErr }) - vtype := datatransfer.EmptyTypeIdentifier if result != nil { - vtype = result.Type() err := m.channels.NewVoucherResult(chid, result) if err != nil { return nil, err } } - return message.CompleteResponse(chid.ID, - resultErr == nil || resultErr == datatransfer.ErrPause, - resultErr == datatransfer.ErrPause, vtype, result) + return m.completeResponse(resultErr, chid.ID, result) } diff --git a/impl/utils.go b/impl/utils.go index 6e687833..7a07ca2b 100644 --- a/impl/utils.go +++ b/impl/utils.go @@ -52,6 +52,16 @@ func (m *manager) response(isNew bool, err error, tid datatransfer.TransferID, v return message.VoucherResultResponse(tid, isAccepted, isPaused, resultType, voucherResult) } +func (m *manager) completeResponse(err error, tid datatransfer.TransferID, voucherResult datatransfer.VoucherResult) (message.DataTransferResponse, error) { + isAccepted := err == nil || err == datatransfer.ErrPause + isPaused := err == datatransfer.ErrPause + resultType := datatransfer.EmptyTypeIdentifier + if voucherResult != nil { + resultType = voucherResult.Type() + } + return message.CompleteResponse(tid, isAccepted, isPaused, resultType, voucherResult) +} + func (m *manager) resume(chid datatransfer.ChannelID) error { if chid.Initiator == m.peerID { return m.channels.ResumeInitiator(chid) From 62dc97e588a31fa19519ee3434afc6bd5c35423b Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 15 Jul 2020 06:51:50 -0700 Subject: [PATCH 10/13] fix(fsm): allow progress at start --- channels/channels_fsm.go | 1 + 1 file changed, 1 insertion(+) diff --git a/channels/channels_fsm.go b/channels/channels_fsm.go index 55fb5875..2b4d0d22 100644 --- a/channels/channels_fsm.go +++ b/channels/channels_fsm.go @@ -15,6 +15,7 @@ var ChannelEvents = fsm.Events{ fsm.Event(datatransfer.Accept).From(datatransfer.Requested).To(datatransfer.Ongoing), fsm.Event(datatransfer.Cancel).FromAny().To(datatransfer.Cancelling), fsm.Event(datatransfer.Progress).FromMany( + datatransfer.Requested, datatransfer.Ongoing, datatransfer.InitiatorPaused, datatransfer.ResponderPaused, From 60f199ac750931a296146e9072c62d52bf6b79b2 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 15 Jul 2020 07:29:05 -0700 Subject: [PATCH 11/13] fix(deps): update graphsync --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index ffdccb19..c941c4f2 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/ipfs/go-blockservice v0.1.3 github.com/ipfs/go-cid v0.0.5 github.com/ipfs/go-datastore v0.4.4 - github.com/ipfs/go-graphsync v0.0.6-0.20200708073926-caa872f68b2c + github.com/ipfs/go-graphsync v0.0.6-0.20200715142715-e2f27c4754e6 github.com/ipfs/go-ipfs-blockstore v0.1.4 github.com/ipfs/go-ipfs-blocksutil v0.0.1 github.com/ipfs/go-ipfs-chunker v0.0.5 diff --git a/go.sum b/go.sum index 3c8062b7..b44430ec 100644 --- a/go.sum +++ b/go.sum @@ -156,8 +156,8 @@ github.com/ipfs/go-ds-badger v0.0.5/go.mod h1:g5AuuCGmr7efyzQhLL8MzwqcauPojGPUaH github.com/ipfs/go-ds-badger v0.2.1/go.mod h1:Tx7l3aTph3FMFrRS838dcSJh+jjA7cX9DrGVwx/NOwE= github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc= github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s= -github.com/ipfs/go-graphsync v0.0.6-0.20200708073926-caa872f68b2c h1:fCW8JzwvBMfODvdliK+s3ziYZPD/5FAzluahZYXVg3k= -github.com/ipfs/go-graphsync v0.0.6-0.20200708073926-caa872f68b2c/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE= +github.com/ipfs/go-graphsync v0.0.6-0.20200715142715-e2f27c4754e6 h1:+dQnaRkLV4za46Gfw6b1KNVOCcGDrdnEGZrjz3kF80k= +github.com/ipfs/go-graphsync v0.0.6-0.20200715142715-e2f27c4754e6/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE= github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08= github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw= github.com/ipfs/go-ipfs-blockstore v0.1.4 h1:2SGI6U1B44aODevza8Rde3+dY30Pb+lbcObe1LETxOQ= From b3311844e1a55e5f32d0dbab0e329cc4073c5749 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 15 Jul 2020 07:47:13 -0700 Subject: [PATCH 12/13] feat(transport): cleanup channels after complete do not cleanup channels until transfer is definitely complete --- channels/channels.go | 1 + channels/channels_fsm.go | 1 + channels/channels_test.go | 22 ++++++++++++++++++++-- impl/environment.go | 26 ++++++++++++++++++++++++++ impl/impl.go | 2 +- transport/graphsync/graphsync.go | 13 ------------- 6 files changed, 49 insertions(+), 16 deletions(-) create mode 100644 impl/environment.go diff --git a/channels/channels.go b/channels/channels.go index 7301c502..639ae2de 100644 --- a/channels/channels.go +++ b/channels/channels.go @@ -38,6 +38,7 @@ type ChannelEnvironment interface { Protect(id peer.ID, tag string) Unprotect(id peer.ID, tag string) bool ID() peer.ID + CleanupChannel(chid datatransfer.ChannelID) } // New returns a new thread safe list of channels diff --git a/channels/channels_fsm.go b/channels/channels_fsm.go index 2b4d0d22..866a5603 100644 --- a/channels/channels_fsm.go +++ b/channels/channels_fsm.go @@ -92,6 +92,7 @@ func cleanupConnection(ctx fsm.Context, env ChannelEnvironment, channel internal if otherParty == env.ID() { otherParty = channel.Responder } + env.CleanupChannel(datatransfer.ChannelID{ID: channel.TransferID, Initiator: channel.Initiator, Responder: channel.Responder}) env.Unprotect(otherParty, datatransfer.ChannelID{ID: channel.TransferID, Initiator: channel.Initiator, Responder: channel.Responder}.String()) return ctx.Trigger(datatransfer.CleanupComplete) } diff --git a/channels/channels_test.go b/channels/channels_test.go index d261698e..04083d0f 100644 --- a/channels/channels_test.go +++ b/channels/channels_test.go @@ -13,6 +13,7 @@ import ( "github.com/ipfs/go-datastore" basicnode "github.com/ipld/go-ipld-prime/node/basic" "github.com/ipld/go-ipld-prime/traversal/selector/builder" + peer "github.com/libp2p/go-libp2p-core/peer" "github.com/stretchr/testify/require" ) @@ -35,8 +36,8 @@ func TestChannels(t *testing.T) { notifier := func(evt datatransfer.Event, chst datatransfer.ChannelState) { received <- event{evt, chst} } - net := testutil.NewFakeNetwork(testutil.GeneratePeers(1)[0]) - channelList, err := channels.New(ds, notifier, decoderByType, decoderByType, net) + + channelList, err := channels.New(ds, notifier, decoderByType, decoderByType, &fakeEnv{}) require.NoError(t, err) tid1 := datatransfer.TransferID(0) @@ -256,3 +257,20 @@ func checkEvent(ctx context.Context, t *testing.T, received chan event, code dat require.Equal(t, code, evt.event.Code) return evt.state } + +type fakeEnv struct { +} + +func (fe *fakeEnv) Protect(id peer.ID, tag string) { +} + +func (fe *fakeEnv) Unprotect(id peer.ID, tag string) bool { + return false +} + +func (fe *fakeEnv) ID() peer.ID { + return peer.ID("") +} + +func (fe *fakeEnv) CleanupChannel(chid datatransfer.ChannelID) { +} diff --git a/impl/environment.go b/impl/environment.go new file mode 100644 index 00000000..86b6e0bf --- /dev/null +++ b/impl/environment.go @@ -0,0 +1,26 @@ +package impl + +import ( + datatransfer "github.com/filecoin-project/go-data-transfer" + "github.com/libp2p/go-libp2p-core/peer" +) + +type channelEnvironment struct { + m *manager +} + +func (ce *channelEnvironment) Protect(id peer.ID, tag string) { + ce.m.dataTransferNetwork.Protect(id, tag) +} + +func (ce *channelEnvironment) Unprotect(id peer.ID, tag string) bool { + return ce.m.dataTransferNetwork.Unprotect(id, tag) +} + +func (ce *channelEnvironment) ID() peer.ID { + return ce.m.dataTransferNetwork.ID() +} + +func (ce *channelEnvironment) CleanupChannel(chid datatransfer.ChannelID) { + ce.m.transport.CleanupChannel(chid) +} diff --git a/impl/impl.go b/impl/impl.go index d8d8dba4..50354051 100644 --- a/impl/impl.go +++ b/impl/impl.go @@ -68,7 +68,7 @@ func NewDataTransfer(ds datastore.Datastore, dataTransferNetwork network.DataTra transport: transport, storedCounter: storedCounter, } - channels, err := channels.New(ds, m.notifier, m.voucherDecoder, m.resultTypes.Decoder, dataTransferNetwork) + channels, err := channels.New(ds, m.notifier, m.voucherDecoder, m.resultTypes.Decoder, &channelEnvironment{m}) if err != nil { return nil, err } diff --git a/transport/graphsync/graphsync.go b/transport/graphsync/graphsync.go index a084ae7e..4b4d0249 100644 --- a/transport/graphsync/graphsync.go +++ b/transport/graphsync/graphsync.go @@ -110,15 +110,6 @@ func (t *Transport) executeGsRequest(ctx context.Context, channelID datatransfer log.Error(err) } } - t.dataLock.Lock() - gsKey, ok := t.channelIDMap[channelID] - delete(t.channelIDMap, channelID) - delete(t.contextCancelMap, channelID) - delete(t.pending, channelID) - if ok { - delete(t.graphsyncRequestMap, gsKey) - } - t.dataLock.Unlock() } func (t *Transport) gsKeyFromChannelID(ctx context.Context, chid datatransfer.ChannelID) (graphsyncKey, error) { @@ -221,7 +212,6 @@ func (t *Transport) CloseChannel(ctx context.Context, chid datatransfer.ChannelI t.dataLock.Lock() defer t.dataLock.Unlock() if _, ok := t.requestorCancelledMap[chid]; ok { - t.cleanupChannel(chid, gsKey) return nil } return t.gs.CancelResponse(gsKey.p, gsKey.requestID) @@ -432,9 +422,6 @@ func (t *Transport) gsCompletedResponseListener(p peer.ID, request graphsync.Req log.Error(err) } } - t.dataLock.Lock() - t.cleanupChannel(chid, graphsyncKey{request.ID(), p}) - t.dataLock.Unlock() } func (t *Transport) cleanupChannel(chid datatransfer.ChannelID, gsKey graphsyncKey) { From d4a6207f641e797bcc7b32ccda923223844ce738 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 15 Jul 2020 13:49:41 -0700 Subject: [PATCH 13/13] fix(deps): update go-graphsync to master --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index c941c4f2..232f5918 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/ipfs/go-blockservice v0.1.3 github.com/ipfs/go-cid v0.0.5 github.com/ipfs/go-datastore v0.4.4 - github.com/ipfs/go-graphsync v0.0.6-0.20200715142715-e2f27c4754e6 + github.com/ipfs/go-graphsync v0.0.6-0.20200715204712-ef06b3d32e83 github.com/ipfs/go-ipfs-blockstore v0.1.4 github.com/ipfs/go-ipfs-blocksutil v0.0.1 github.com/ipfs/go-ipfs-chunker v0.0.5 diff --git a/go.sum b/go.sum index b44430ec..3dbde6ec 100644 --- a/go.sum +++ b/go.sum @@ -156,8 +156,8 @@ github.com/ipfs/go-ds-badger v0.0.5/go.mod h1:g5AuuCGmr7efyzQhLL8MzwqcauPojGPUaH github.com/ipfs/go-ds-badger v0.2.1/go.mod h1:Tx7l3aTph3FMFrRS838dcSJh+jjA7cX9DrGVwx/NOwE= github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc= github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s= -github.com/ipfs/go-graphsync v0.0.6-0.20200715142715-e2f27c4754e6 h1:+dQnaRkLV4za46Gfw6b1KNVOCcGDrdnEGZrjz3kF80k= -github.com/ipfs/go-graphsync v0.0.6-0.20200715142715-e2f27c4754e6/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE= +github.com/ipfs/go-graphsync v0.0.6-0.20200715204712-ef06b3d32e83 h1:tkGDAwcZfzDFeBNyBWYOM02Qw0rGpA2UuCvq49T3K5o= +github.com/ipfs/go-graphsync v0.0.6-0.20200715204712-ef06b3d32e83/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE= github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08= github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw= github.com/ipfs/go-ipfs-blockstore v0.1.4 h1:2SGI6U1B44aODevza8Rde3+dY30Pb+lbcObe1LETxOQ=