Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor fixes for retrieval on data transfer #56

Merged
merged 13 commits into from
Jul 15, 2020
Merged
14 changes: 12 additions & 2 deletions channels/channel_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type channelState struct {
totalSize uint64
// current status of this deal
status datatransfer.Status
// isPull indicates if this is a push or pull request
isPull bool
// total bytes sent from this node (0 if receiver)
sent uint64
// total bytes received by this node (0 if sender)
Expand Down Expand Up @@ -89,8 +91,16 @@ func (c channelState) Recipient() peer.ID { return c.recipient }
func (c channelState) TotalSize() uint64 { return c.totalSize }

// IsPull returns whether this is a pull request based on who initiated it
func (c channelState) IsPull(initiator peer.ID) bool {
return initiator == c.recipient
func (c channelState) IsPull() bool {
return c.isPull
}

func (c channelState) ChannelID() datatransfer.ChannelID {
if c.isPull {
return datatransfer.ChannelID{ID: c.transferID, Initiator: c.recipient, Responder: c.sender}
} else {
return datatransfer.ChannelID{ID: c.transferID, Initiator: c.sender, Responder: c.recipient}
}
}

func (c channelState) Message() string {
Expand Down
14 changes: 2 additions & 12 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ package channels
import (
"context"
"errors"
"math"
"time"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-data-transfer/encoding"
"github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/go-statemachine/fsm"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
Expand All @@ -17,8 +15,6 @@ import (
cbg "github.com/whyrusleeping/cbor-gen"
)

const noopSynchronize = datatransfer.EventCode(math.MaxInt32)

type DecoderByTypeFunc func(identifier datatransfer.TypeIdentifier) (encoding.Decoder, bool)

type Notifier func(datatransfer.Event, datatransfer.ChannelState)
Expand All @@ -42,6 +38,7 @@ type ChannelEnvironment interface {
Protect(id peer.ID, tag string)
Unprotect(id peer.ID, tag string) bool
ID() peer.ID
CleanupChannel(chid datatransfer.ChannelID)
}

// New returns a new thread safe list of channels
Expand Down Expand Up @@ -76,9 +73,6 @@ func (c *Channels) dispatch(eventName fsm.EventName, channel fsm.StateType) {
if !ok {
log.Errorf("dropped bad event %v", eventName)
}
if evtCode == noopSynchronize {
return
}
realChannel, ok := channel.(internalChannelState)
if !ok {
log.Errorf("not a ClientDeal %v", channel)
Expand Down Expand Up @@ -153,11 +147,7 @@ func (c *Channels) InProgress(ctx context.Context) (map[datatransfer.ChannelID]d
// Returns datatransfer.EmptyChannelState if there is no channel with that id
func (c *Channels) GetByID(ctx context.Context, chid datatransfer.ChannelID) (datatransfer.ChannelState, error) {
var internalChannel internalChannelState
err := c.sendSync(ctx, chid, noopSynchronize)
if err != nil && err != statemachine.ErrTerminated {
return nil, err
}
err = c.statemachines.Get(chid).Get(&internalChannel)
err := c.statemachines.GetSync(ctx, chid, &internalChannel)
if err != nil {
return nil, ErrNotFound
}
Expand Down
9 changes: 5 additions & 4 deletions channels/channels_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ var ChannelEvents = fsm.Events{
fsm.Event(datatransfer.Accept).From(datatransfer.Requested).To(datatransfer.Ongoing),
fsm.Event(datatransfer.Cancel).FromAny().To(datatransfer.Cancelling),
fsm.Event(datatransfer.Progress).FromMany(
datatransfer.Requested,
datatransfer.Ongoing,
datatransfer.InitiatorPaused,
datatransfer.ResponderPaused,
Expand Down Expand Up @@ -56,8 +57,6 @@ var ChannelEvents = fsm.Events{
From(datatransfer.ResponderPaused).To(datatransfer.Ongoing).
From(datatransfer.BothPaused).To(datatransfer.InitiatorPaused).
From(datatransfer.Finalizing).To(datatransfer.Completing).
From(datatransfer.ResponderFinalizing).To(datatransfer.ResponderCompleted).
From(datatransfer.ResponderFinalizingTransferFinished).To(datatransfer.Completing).
FromAny().ToNoChange(),
fsm.Event(datatransfer.FinishTransfer).
FromAny().To(datatransfer.TransferFinished).
Expand All @@ -69,14 +68,15 @@ var ChannelEvents = fsm.Events{
fsm.Event(datatransfer.ResponderCompletes).
FromAny().To(datatransfer.ResponderCompleted).
From(datatransfer.ResponderPaused).To(datatransfer.ResponderFinalizing).
From(datatransfer.TransferFinished).To(datatransfer.Completing),
From(datatransfer.TransferFinished).To(datatransfer.Completing).
From(datatransfer.ResponderFinalizing).To(datatransfer.ResponderCompleted).
From(datatransfer.ResponderFinalizingTransferFinished).To(datatransfer.Completing),
fsm.Event(datatransfer.BeginFinalizing).FromAny().To(datatransfer.Finalizing),
fsm.Event(datatransfer.Complete).FromAny().To(datatransfer.Completing),
fsm.Event(datatransfer.CleanupComplete).
From(datatransfer.Cancelling).To(datatransfer.Cancelled).
From(datatransfer.Failing).To(datatransfer.Failed).
From(datatransfer.Completing).To(datatransfer.Completed),
fsm.Event(noopSynchronize).FromAny().ToNoChange(),
}

// ChannelStateEntryFuncs are handlers called as we enter different states
Expand All @@ -92,6 +92,7 @@ func cleanupConnection(ctx fsm.Context, env ChannelEnvironment, channel internal
if otherParty == env.ID() {
otherParty = channel.Responder
}
env.CleanupChannel(datatransfer.ChannelID{ID: channel.TransferID, Initiator: channel.Initiator, Responder: channel.Responder})
env.Unprotect(otherParty, datatransfer.ChannelID{ID: channel.TransferID, Initiator: channel.Initiator, Responder: channel.Responder}.String())
return ctx.Trigger(datatransfer.CleanupComplete)
}
Expand Down
22 changes: 20 additions & 2 deletions channels/channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/ipfs/go-datastore"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/require"
)

Expand All @@ -35,8 +36,8 @@ func TestChannels(t *testing.T) {
notifier := func(evt datatransfer.Event, chst datatransfer.ChannelState) {
received <- event{evt, chst}
}
net := testutil.NewFakeNetwork(testutil.GeneratePeers(1)[0])
channelList, err := channels.New(ds, notifier, decoderByType, decoderByType, net)

channelList, err := channels.New(ds, notifier, decoderByType, decoderByType, &fakeEnv{})
require.NoError(t, err)

tid1 := datatransfer.TransferID(0)
Expand Down Expand Up @@ -256,3 +257,20 @@ func checkEvent(ctx context.Context, t *testing.T, received chan event, code dat
require.Equal(t, code, evt.event.Code)
return evt.state
}

type fakeEnv struct {
}

func (fe *fakeEnv) Protect(id peer.ID, tag string) {
}

func (fe *fakeEnv) Unprotect(id peer.ID, tag string) bool {
return false
}

func (fe *fakeEnv) ID() peer.ID {
return peer.ID("")
}

func (fe *fakeEnv) CleanupChannel(chid datatransfer.ChannelID) {
}
1 change: 1 addition & 0 deletions channels/internalchannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type internalChannelState struct {

func (c internalChannelState) ToChannelState(voucherDecoder DecoderByTypeFunc, voucherResultDecoder DecoderByTypeFunc) datatransfer.ChannelState {
return channelState{
isPull: c.Initiator == c.Recipient,
transferID: c.TransferID,
baseCid: c.BaseCid,
selector: c.Selector,
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ module github.com/filecoin-project/go-data-transfer
go 1.13

require (
github.com/filecoin-project/go-statemachine v0.0.0-20200703171610-a74a697973b9
github.com/filecoin-project/go-statemachine v0.0.0-20200714194326-a77c3ae20989
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b
github.com/hannahhoward/cbor-gen-for v0.0.0-20191218204337-9ab7b1bcc099
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.3
github.com/ipfs/go-cid v0.0.5
github.com/ipfs/go-datastore v0.4.4
github.com/ipfs/go-graphsync v0.0.6-0.20200708073926-caa872f68b2c
github.com/ipfs/go-graphsync v0.0.6-0.20200715204712-ef06b3d32e83
github.com/ipfs/go-ipfs-blockstore v0.1.4
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUn
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:av5fw6wmm58FYMgJeoB/lK9XXrgdugYiTqkdxjTy9k8=
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg=
github.com/filecoin-project/go-statemachine v0.0.0-20200703171610-a74a697973b9 h1:NagIOq5osclBprc95ILEnGCOpubuhalqwWvayYJmXLQ=
github.com/filecoin-project/go-statemachine v0.0.0-20200703171610-a74a697973b9/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig=
github.com/filecoin-project/go-statemachine v0.0.0-20200714194326-a77c3ae20989 h1:1GjCS3xy/CRIw7Tq0HfzX6Al8mklrszQZ3iIFnjPzHk=
github.com/filecoin-project/go-statemachine v0.0.0-20200714194326-a77c3ae20989/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig=
github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ=
github.com/filecoin-project/go-statestore v0.1.0/go.mod h1:LFc9hD+fRxPqiHiaqUEZOinUJB4WARkRfNl10O7kTnI=
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b h1:fkRZSPrYpk42PV3/lIXiL0LHetxde7vyYYvSsttQtfg=
Expand Down Expand Up @@ -156,8 +156,8 @@ github.com/ipfs/go-ds-badger v0.0.5/go.mod h1:g5AuuCGmr7efyzQhLL8MzwqcauPojGPUaH
github.com/ipfs/go-ds-badger v0.2.1/go.mod h1:Tx7l3aTph3FMFrRS838dcSJh+jjA7cX9DrGVwx/NOwE=
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-graphsync v0.0.6-0.20200708073926-caa872f68b2c h1:fCW8JzwvBMfODvdliK+s3ziYZPD/5FAzluahZYXVg3k=
github.com/ipfs/go-graphsync v0.0.6-0.20200708073926-caa872f68b2c/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE=
github.com/ipfs/go-graphsync v0.0.6-0.20200715204712-ef06b3d32e83 h1:tkGDAwcZfzDFeBNyBWYOM02Qw0rGpA2UuCvq49T3K5o=
github.com/ipfs/go-graphsync v0.0.6-0.20200715204712-ef06b3d32e83/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
github.com/ipfs/go-ipfs-blockstore v0.1.4 h1:2SGI6U1B44aODevza8Rde3+dY30Pb+lbcObe1LETxOQ=
Expand Down
26 changes: 26 additions & 0 deletions impl/environment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package impl

import (
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/libp2p/go-libp2p-core/peer"
)

type channelEnvironment struct {
m *manager
}

func (ce *channelEnvironment) Protect(id peer.ID, tag string) {
ce.m.dataTransferNetwork.Protect(id, tag)
}

func (ce *channelEnvironment) Unprotect(id peer.ID, tag string) bool {
return ce.m.dataTransferNetwork.Unprotect(id, tag)
}

func (ce *channelEnvironment) ID() peer.ID {
return ce.m.dataTransferNetwork.ID()
}

func (ce *channelEnvironment) CleanupChannel(chid datatransfer.ChannelID) {
ce.m.transport.CleanupChannel(chid)
}
39 changes: 23 additions & 16 deletions impl/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,6 @@ func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response messa
if response.IsCancel() {
return m.channels.Cancel(chid)
}
if response.IsComplete() && response.Accepted() {
if !response.IsPaused() {
return m.channels.ResponderCompletes(chid)
}
err := m.channels.ResponderBeginsFinalization(chid)
if err != nil {
return nil
}
}
if response.IsVoucherResult() {
if !response.EmptyVoucherResult() {
vresult, err := m.decodeVoucherResult(response)
Expand All @@ -134,6 +125,15 @@ func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response messa
}
}
}
if response.IsComplete() && response.Accepted() {
if !response.IsPaused() {
return m.channels.ResponderCompletes(chid)
}
err := m.channels.ResponderBeginsFinalization(chid)
if err != nil {
return nil
}
}
if response.IsPaused() {
return m.pauseOther(chid)
}
Expand Down Expand Up @@ -286,12 +286,23 @@ func (m *manager) processUpdateVoucher(chid datatransfer.ChannelID, request mess
return m.processRevalidationResult(chid, result, voucherErr)
}

func (m *manager) processRevalidationResult(chid datatransfer.ChannelID, result datatransfer.VoucherResult, resultErr error) (message.DataTransferResponse, error) {
vresMessage, err := m.response(false, resultErr, chid.ID, result)
func (m *manager) revalidationResponse(chid datatransfer.ChannelID, result datatransfer.VoucherResult, resultErr error) (message.DataTransferResponse, error) {
chst, err := m.channels.GetByID(context.TODO(), chid)
if err != nil {
return nil, err
}
if chst.Status() == datatransfer.Finalizing {
return m.completeResponse(resultErr, chid.ID, result)
}
return m.response(false, resultErr, chid.ID, result)
}

func (m *manager) processRevalidationResult(chid datatransfer.ChannelID, result datatransfer.VoucherResult, resultErr error) (message.DataTransferResponse, error) {
vresMessage, err := m.revalidationResponse(chid, result, resultErr)

if err != nil {
return nil, err
}
if result != nil {
err := m.channels.NewVoucherResult(chid, result)
if err != nil {
Expand Down Expand Up @@ -324,16 +335,12 @@ func (m *manager) completeMessage(chid datatransfer.ChannelID) (message.DataTran
result, resultErr = revalidator.OnComplete(chid)
return resultErr
})
vtype := datatransfer.EmptyTypeIdentifier
if result != nil {
vtype = result.Type()
err := m.channels.NewVoucherResult(chid, result)
if err != nil {
return nil, err
}
}

return message.CompleteResponse(chid.ID,
resultErr == nil || resultErr == datatransfer.ErrPause,
resultErr == datatransfer.ErrPause, vtype, result)
return m.completeResponse(resultErr, chid.ID, result)
}
11 changes: 10 additions & 1 deletion impl/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-data-transfer/channels"
"github.com/filecoin-project/go-data-transfer/encoding"
"github.com/filecoin-project/go-data-transfer/message"
"github.com/filecoin-project/go-data-transfer/network"
"github.com/filecoin-project/go-data-transfer/registry"
Expand Down Expand Up @@ -67,14 +68,22 @@ func NewDataTransfer(ds datastore.Datastore, dataTransferNetwork network.DataTra
transport: transport,
storedCounter: storedCounter,
}
channels, err := channels.New(ds, m.notifier, m.validatedTypes.Decoder, m.resultTypes.Decoder, dataTransferNetwork)
channels, err := channels.New(ds, m.notifier, m.voucherDecoder, m.resultTypes.Decoder, &channelEnvironment{m})
if err != nil {
return nil, err
}
m.channels = channels
return m, nil
}

func (m *manager) voucherDecoder(voucherType datatransfer.TypeIdentifier) (encoding.Decoder, bool) {
decoder, has := m.validatedTypes.Decoder(voucherType)
if !has {
return m.revalidators.Decoder(voucherType)
}
return decoder, true
}

func (m *manager) notifier(evt datatransfer.Event, chst datatransfer.ChannelState) {
err := m.pubSub.Publish(internalEvent{evt, chst})
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,11 @@ func TestSimulatedRetrievalFlow(t *testing.T) {
}
dt2.SubscribeToEvents(clientSubscriber)
providerFinished := make(chan struct{}, 1)
providerAccepted := false
var providerSubscriber datatransfer.Subscriber = func(event datatransfer.Event, channelState datatransfer.ChannelState) {
if event.Code == datatransfer.PauseResponder {
if !config.payForUnseal {
if !config.payForUnseal && !providerAccepted {
providerAccepted = true
timer := time.NewTimer(config.unpauseResponderDelay)
go func() {
<-timer.C
Expand Down Expand Up @@ -381,7 +383,7 @@ func TestPauseAndResume(t *testing.T) {
sentIncrements := make([]uint64, 0, 21)
receivedIncrements := make([]uint64, 0, 21)
for opens < 2 || completes < 2 || len(sentIncrements) < 21 || len(receivedIncrements) < 21 ||
pauseInitiators < 2 || pauseResponders < 1 || resumeInitiators < 2 || resumeResponders < 1 {
pauseInitiators < 1 || pauseResponders < 1 || resumeInitiators < 1 || resumeResponders < 1 {
select {
case <-ctx.Done():
t.Fatal("Did not complete succcessful data transfer")
Expand Down
10 changes: 10 additions & 0 deletions impl/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ func (m *manager) response(isNew bool, err error, tid datatransfer.TransferID, v
return message.VoucherResultResponse(tid, isAccepted, isPaused, resultType, voucherResult)
}

func (m *manager) completeResponse(err error, tid datatransfer.TransferID, voucherResult datatransfer.VoucherResult) (message.DataTransferResponse, error) {
isAccepted := err == nil || err == datatransfer.ErrPause
isPaused := err == datatransfer.ErrPause
resultType := datatransfer.EmptyTypeIdentifier
if voucherResult != nil {
resultType = voucherResult.Type()
}
return message.CompleteResponse(tid, isAccepted, isPaused, resultType, voucherResult)
}

func (m *manager) resume(chid datatransfer.ChannelID) error {
if chid.Initiator == m.peerID {
return m.channels.ResumeInitiator(chid)
Expand Down
Loading