Skip to content

Commit

Permalink
Resume Data Transfer (#100)
Browse files Browse the repository at this point in the history
* Emit events with received cids (#71)

* persist received cids on channel state.

* Send, Receive and Validate Restart requests (#75)

* Send, Receive and Validate Requests

* Initiating and Responding Tests and bug fixes (#76)

* Testing for resuming data transfer work

* Cleanup Push Restarts PR  (#79)

* cleanup of restart PR

* link the peers

* Tests for pull restarts (#84)

* tests for pull restarts

* Merge Tests cleanup work (#92)

* cleanup of restart PR

* cleanup timedout channels (#93)

* backward compatibility of restart (#96)

* backward compatibility of restart

* changes and tests

* more tests

* better error handling for restarts

* feat(message): switch to cbor map encoding (#97)

switch to cbor map encoding for the 1_1 message protocol

* feat(channels): setup datastore migrations (#99)

setup datatransfer channels so they migrate over successfully

Co-authored-by: Hannah Howard <hannah@hannahhoward.net>
  • Loading branch information
aarshkshah1992 and hannahhoward authored Oct 12, 2020
1 parent 7958d09 commit ad43f2d
Show file tree
Hide file tree
Showing 58 changed files with 7,986 additions and 609 deletions.
15 changes: 15 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,18 @@ test:

type-gen: build
go generate ./...

imports:
scripts/fiximports

cbor-gen:
go generate ./...

tidy:
go mod tidy

lint:
git fetch
golangci-lint run -v --concurrency 2 --new-from-rev origin/master

prepare-pr: cbor-gen tidy imports lint
48 changes: 42 additions & 6 deletions channels/channel_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@ import (
cbg "github.com/whyrusleeping/cbor-gen"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-data-transfer/channels/internal"
)

// channelState is immutable channel data plus mutable state
type channelState struct {
// peerId of the manager peer
selfPeer peer.ID
// an identifier for this channel shared by request and responder, set by requester through protocol
transferID datatransfer.TransferID
// base CID for the piece being transferred
Expand All @@ -38,11 +41,13 @@ type channelState struct {
// more informative status on a channel
message string
// additional vouchers
vouchers []encodedVoucher
vouchers []internal.EncodedVoucher
// additional voucherResults
voucherResults []encodedVoucherResult
voucherResults []internal.EncodedVoucherResult
voucherResultDecoder DecoderByTypeFunc
voucherDecoder DecoderByTypeFunc

receivedCids []cid.Cid
}

// EmptyChannelState is the zero value for channel state, meaning not present
Expand Down Expand Up @@ -82,6 +87,11 @@ func (c channelState) Voucher() datatransfer.Voucher {
return encodable.(datatransfer.Voucher)
}

// ReceivedCids returns the cids received so far on this channel
func (c channelState) ReceivedCids() []cid.Cid {
return c.receivedCids
}

// Sender returns the peer id for the node that is sending data
func (c channelState) Sender() peer.ID { return c.sender }

Expand All @@ -99,9 +109,8 @@ func (c channelState) IsPull() bool {
func (c channelState) ChannelID() datatransfer.ChannelID {
if c.isPull {
return datatransfer.ChannelID{ID: c.transferID, Initiator: c.recipient, Responder: c.sender}
} else {
return datatransfer.ChannelID{ID: c.transferID, Initiator: c.sender, Responder: c.recipient}
}
return datatransfer.ChannelID{ID: c.transferID, Initiator: c.sender, Responder: c.recipient}
}

func (c channelState) Message() string {
Expand Down Expand Up @@ -140,11 +149,38 @@ func (c channelState) VoucherResults() []datatransfer.VoucherResult {
return voucherResults
}

func (c channelState) OtherParty(thisParty peer.ID) peer.ID {
if thisParty == c.sender {
func (c channelState) SelfPeer() peer.ID {
return c.selfPeer
}

func (c channelState) OtherPeer() peer.ID {
if c.sender == c.selfPeer {
return c.recipient
}
return c.sender
}

func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByTypeFunc, voucherResultDecoder DecoderByTypeFunc) datatransfer.ChannelState {
return channelState{
selfPeer: c.SelfPeer,
isPull: c.Initiator == c.Recipient,
transferID: c.TransferID,
baseCid: c.BaseCid,
selector: c.Selector,
sender: c.Sender,
recipient: c.Recipient,
totalSize: c.TotalSize,
status: c.Status,
sent: c.Sent,
received: c.Received,
message: c.Message,
vouchers: c.Vouchers,
voucherResults: c.VoucherResults,
voucherResultDecoder: voucherResultDecoder,
voucherDecoder: voucherDecoder,

receivedCids: c.ReceivedCids,
}
}

var _ datatransfer.ChannelState = channelState{}
87 changes: 56 additions & 31 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@ import (
peer "github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"

versioning "github.com/filecoin-project/go-ds-versioning/pkg"
versionedfsm "github.com/filecoin-project/go-ds-versioning/pkg/fsm"
"github.com/filecoin-project/go-statemachine/fsm"

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-data-transfer/channels/internal"
"github.com/filecoin-project/go-data-transfer/channels/internal/migrations"
"github.com/filecoin-project/go-data-transfer/encoding"
)

Expand All @@ -32,7 +36,8 @@ type Channels struct {
notifier Notifier
voucherDecoder DecoderByTypeFunc
voucherResultDecoder DecoderByTypeFunc
statemachines fsm.Group
stateMachines fsm.Group
migrateStateMachines func(context.Context) error
}

// ChannelEnvironment -- just a proxy for DTNetwork for now
Expand All @@ -44,38 +49,47 @@ type ChannelEnvironment interface {
}

// New returns a new thread safe list of channels
func New(ds datastore.Datastore,
func New(ds datastore.Batching,
notifier Notifier,
voucherDecoder DecoderByTypeFunc,
voucherResultDecoder DecoderByTypeFunc,
env ChannelEnvironment) (*Channels, error) {
env ChannelEnvironment,
selfPeer peer.ID) (*Channels, error) {
c := &Channels{
notifier: notifier,
voucherDecoder: voucherDecoder,
voucherResultDecoder: voucherResultDecoder,
}
statemachines, err := fsm.New(ds, fsm.Parameters{
channelMigrations, err := migrations.GetChannelStateMigrations(selfPeer)
if err != nil {
return nil, err
}
c.stateMachines, c.migrateStateMachines, err = versionedfsm.NewVersionedFSM(ds, fsm.Parameters{
Environment: env,
StateType: internalChannelState{},
StateType: internal.ChannelState{},
StateKeyField: "Status",
Events: ChannelEvents,
StateEntryFuncs: ChannelStateEntryFuncs,
Notifier: c.dispatch,
FinalityStates: ChannelFinalityStates,
})
}, channelMigrations, versioning.VersionKey("1"))
if err != nil {
return nil, err
}
c.statemachines = statemachines
return c, nil
}

// Start migrates the channel data store as needed
func (c *Channels) Start(ctx context.Context) error {
return c.migrateStateMachines(ctx)
}

func (c *Channels) dispatch(eventName fsm.EventName, channel fsm.StateType) {
evtCode, ok := eventName.(datatransfer.EventCode)
if !ok {
log.Errorf("dropped bad event %v", eventName)
}
realChannel, ok := channel.(internalChannelState)
realChannel, ok := channel.(internal.ChannelState)
if !ok {
log.Errorf("not a ClientDeal %v", channel)
}
Expand All @@ -85,12 +99,12 @@ func (c *Channels) dispatch(eventName fsm.EventName, channel fsm.StateType) {
Timestamp: time.Now(),
}

c.notifier(evt, realChannel.ToChannelState(c.voucherDecoder, c.voucherResultDecoder))
c.notifier(evt, fromInternalChannelState(realChannel, c.voucherDecoder, c.voucherResultDecoder))
}

// CreateNew creates a new channel id and channel state and saves to channels.
// returns error if the channel exists already.
func (c *Channels) CreateNew(tid datatransfer.TransferID, baseCid cid.Cid, selector ipld.Node, voucher datatransfer.Voucher, initiator, dataSender, dataReceiver peer.ID) (datatransfer.ChannelID, error) {
func (c *Channels) CreateNew(selfPeer peer.ID, tid datatransfer.TransferID, baseCid cid.Cid, selector ipld.Node, voucher datatransfer.Voucher, initiator, dataSender, dataReceiver peer.ID) (datatransfer.ChannelID, error) {
var responder peer.ID
if dataSender == initiator {
responder = dataReceiver
Expand All @@ -106,71 +120,78 @@ func (c *Channels) CreateNew(tid datatransfer.TransferID, baseCid cid.Cid, selec
if err != nil {
return datatransfer.ChannelID{}, err
}
err = c.statemachines.Begin(chid, &internalChannelState{
err = c.stateMachines.Begin(chid, &internal.ChannelState{
SelfPeer: selfPeer,
TransferID: tid,
Initiator: initiator,
Responder: responder,
BaseCid: baseCid,
Selector: &cbg.Deferred{Raw: selBytes},
Sender: dataSender,
Recipient: dataReceiver,
Vouchers: []encodedVoucher{
Vouchers: []internal.EncodedVoucher{
{
Type: voucher.Type(),
Voucher: &cbg.Deferred{
Raw: voucherBytes,
},
},
},
Status: datatransfer.Requested,
Status: datatransfer.Requested,
ReceivedCids: nil,
})
if err != nil {
return datatransfer.ChannelID{}, err
}
return chid, c.statemachines.Send(chid, datatransfer.Open)
return chid, c.stateMachines.Send(chid, datatransfer.Open)
}

// InProgress returns a list of in progress channels
func (c *Channels) InProgress() (map[datatransfer.ChannelID]datatransfer.ChannelState, error) {
var internalChannels []internalChannelState
err := c.statemachines.List(&internalChannels)
var internalChannels []internal.ChannelState
err := c.stateMachines.List(&internalChannels)
if err != nil {
return nil, err
}
channels := make(map[datatransfer.ChannelID]datatransfer.ChannelState, len(internalChannels))
for _, internalChannel := range internalChannels {
channels[datatransfer.ChannelID{ID: internalChannel.TransferID, Responder: internalChannel.Responder, Initiator: internalChannel.Initiator}] =
internalChannel.ToChannelState(c.voucherDecoder, c.voucherResultDecoder)
fromInternalChannelState(internalChannel, c.voucherDecoder, c.voucherResultDecoder)
}
return channels, nil
}

// GetByID searches for a channel in the slice of channels with id `chid`.
// Returns datatransfer.EmptyChannelState if there is no channel with that id
func (c *Channels) GetByID(ctx context.Context, chid datatransfer.ChannelID) (datatransfer.ChannelState, error) {
var internalChannel internalChannelState
err := c.statemachines.GetSync(ctx, chid, &internalChannel)
var internalChannel internal.ChannelState
err := c.stateMachines.GetSync(ctx, chid, &internalChannel)
if err != nil {
return nil, ErrNotFound
}
return internalChannel.ToChannelState(c.voucherDecoder, c.voucherResultDecoder), nil
return fromInternalChannelState(internalChannel, c.voucherDecoder, c.voucherResultDecoder), nil
}

// Accept marks a data transfer as accepted
func (c *Channels) Accept(chid datatransfer.ChannelID) error {
return c.send(chid, datatransfer.Accept)
}

// IncrementSent increments the total sent on the given channel by the given amount (returning
// the new total)
func (c *Channels) IncrementSent(chid datatransfer.ChannelID, delta uint64) error {
return c.send(chid, datatransfer.Progress, delta, uint64(0))
// Restart marks a data transfer as restarted
func (c *Channels) Restart(chid datatransfer.ChannelID) error {
return c.send(chid, datatransfer.Restart)
}

// IncrementReceived increments the total received on the given channel by the given amount (returning
// the new total)
func (c *Channels) IncrementReceived(chid datatransfer.ChannelID, delta uint64) error {
return c.send(chid, datatransfer.Progress, uint64(0), delta)
func (c *Channels) CompleteCleanupOnRestart(chid datatransfer.ChannelID) error {
return c.send(chid, datatransfer.CompleteCleanupOnRestart)
}

func (c *Channels) DataSent(chid datatransfer.ChannelID, cid cid.Cid, delta uint64) error {
return c.send(chid, datatransfer.DataSent, delta, cid)
}

func (c *Channels) DataReceived(chid datatransfer.ChannelID, cid cid.Cid, delta uint64) error {
return c.send(chid, datatransfer.DataReceived, delta, cid)
}

// PauseInitiator pauses the initator of this channel
Expand Down Expand Up @@ -246,18 +267,22 @@ func (c *Channels) Error(chid datatransfer.ChannelID, err error) error {
return c.send(chid, datatransfer.Error, err)
}

func (c *Channels) Disconnected(chid datatransfer.ChannelID) error {
return c.send(chid, datatransfer.Disconnected)
}

// HasChannel returns true if the given channel id is being tracked
func (c *Channels) HasChannel(chid datatransfer.ChannelID) (bool, error) {
return c.statemachines.Has(chid)
return c.stateMachines.Has(chid)
}

func (c *Channels) send(chid datatransfer.ChannelID, code datatransfer.EventCode, args ...interface{}) error {
has, err := c.statemachines.Has(chid)
has, err := c.stateMachines.Has(chid)
if err != nil {
return err
}
if !has {
return ErrNotFound
}
return c.statemachines.Send(chid, code, args...)
return c.stateMachines.Send(chid, code, args...)
}
Loading

0 comments on commit ad43f2d

Please sign in to comment.