Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop data transfer correctly and some minor cleanp #69

Merged
merged 2 commits into from
Aug 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 1 addition & 12 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (c *Channels) CreateNew(tid datatransfer.TransferID, baseCid cid.Cid, selec
}

// InProgress returns a list of in progress channels
func (c *Channels) InProgress(ctx context.Context) (map[datatransfer.ChannelID]datatransfer.ChannelState, error) {
func (c *Channels) InProgress() (map[datatransfer.ChannelID]datatransfer.ChannelState, error) {
var internalChannels []internalChannelState
err := c.statemachines.List(&internalChannels)
if err != nil {
Expand Down Expand Up @@ -261,14 +261,3 @@ func (c *Channels) send(chid datatransfer.ChannelID, code datatransfer.EventCode
}
return c.statemachines.Send(chid, code, args...)
}

func (c *Channels) sendSync(ctx context.Context, chid datatransfer.ChannelID, code datatransfer.EventCode, args ...interface{}) error {
has, err := c.statemachines.Has(chid)
if err != nil {
return err
}
if !has {
return ErrNotFound
}
return c.statemachines.SendSync(ctx, chid, code, args...)
}
2 changes: 1 addition & 1 deletion channels/channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestChannels(t *testing.T) {
})

t.Run("in progress channels", func(t *testing.T) {
inProgress, err := channelList.InProgress(ctx)
inProgress, err := channelList.InProgress()
require.NoError(t, err)
require.Len(t, inProgress, 2)
require.Contains(t, inProgress, datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1})
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b
github.com/hannahhoward/cbor-gen-for v0.0.0-20191218204337-9ab7b1bcc099
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e
github.com/hashicorp/go-multierror v1.1.0
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.3
github.com/ipfs/go-cid v0.0.7
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ github.com/hannahhoward/cbor-gen-for v0.0.0-20191218204337-9ab7b1bcc099 h1:vQqOW
github.com/hannahhoward/cbor-gen-for v0.0.0-20191218204337-9ab7b1bcc099/go.mod h1:WVPCl0HO/0RAL5+vBH2GMxBomlxBF70MAS78+Lu1//k=
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e h1:3YKHER4nmd7b5qy5t0GWDTwSn4OyRgfAXSmo6VnryBY=
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e/go.mod h1:I8h3MITA53gN9OnWGCgaMa0JWVRdXthWw4M3CPM54OY=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI=
github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
Expand Down
19 changes: 16 additions & 3 deletions impl/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"

"github.com/hannahhoward/go-pubsub"
"github.com/hashicorp/go-multierror"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
Expand Down Expand Up @@ -101,8 +102,20 @@ func (m *manager) Start(ctx context.Context) error {
}

// Stop terminates all data transfers and ends processing
func (m *manager) Stop() error {
return nil
func (m *manager) Stop(ctx context.Context) error {
openChannels, err := m.channels.InProgress()
if err != nil {
return xerrors.Errorf("error getting channels in progress: %w", err)
}

var result error
for chid := range openChannels {
if err := m.CloseDataTransferChannel(ctx, chid); err != nil {
result = multierror.Append(result, xerrors.Errorf("error closing channel with ID %v, err: %w", chid, err))
}
}

return result
}

// RegisterVoucherType registers a validator for the given voucher type
Expand Down Expand Up @@ -266,7 +279,7 @@ func (m *manager) SubscribeToEvents(subscriber datatransfer.Subscriber) datatran

// get all in progress transfers
func (m *manager) InProgressChannels(ctx context.Context) (map[datatransfer.ChannelID]datatransfer.ChannelState, error) {
return m.channels.InProgress(ctx)
return m.channels.InProgress()
}

// RegisterRevalidator registers a revalidator for the given voucher type
Expand Down
2 changes: 1 addition & 1 deletion manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type Manager interface {
Start(ctx context.Context) error

// Stop terminates all data transfers and ends processing
Stop() error
Stop(ctx context.Context) error

// RegisterVoucherType registers a validator for the given voucher type
// will error if voucher type does not implement voucher
Expand Down
74 changes: 29 additions & 45 deletions network/libp2p_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,51 +37,6 @@ type libp2pDataTransferNetwork struct {
receiver Receiver
}

type streamMessageSender struct {
s network.Stream
}

func (s *streamMessageSender) Close() error {
return helpers.FullClose(s.s)
}

func (s *streamMessageSender) Reset() error {
return s.s.Reset()
}

func (s *streamMessageSender) SendMsg(ctx context.Context, msg datatransfer.Message) error {
return msgToStream(ctx, s.s, msg)
}

func msgToStream(ctx context.Context, s network.Stream, msg datatransfer.Message) error {
if msg.IsRequest() {
log.Debugf("Outgoing request message for transfer ID: %d", msg.TransferID())
}

deadline := time.Now().Add(sendMessageTimeout)
if dl, ok := ctx.Deadline(); ok {
deadline = dl
}
if err := s.SetWriteDeadline(deadline); err != nil {
log.Warnf("error setting deadline: %s", err)
}

switch s.Protocol() {
case ProtocolDataTransfer:
if err := msg.ToNet(s); err != nil {
log.Debugf("error: %s", err)
return err
}
default:
return fmt.Errorf("unrecognized protocol on remote: %s", s.Protocol())
}

if err := s.SetWriteDeadline(time.Time{}); err != nil {
log.Warnf("error resetting deadline: %s", err)
}
return nil
}

func (dtnet *libp2pDataTransferNetwork) newStreamToPeer(ctx context.Context, p peer.ID) (network.Stream, error) {
return dtnet.host.NewStream(ctx, p, ProtocolDataTransfer)
}
Expand Down Expand Up @@ -167,3 +122,32 @@ func (dtnet *libp2pDataTransferNetwork) Protect(id peer.ID, tag string) {
func (dtnet *libp2pDataTransferNetwork) Unprotect(id peer.ID, tag string) bool {
return dtnet.host.ConnManager().Unprotect(id, tag)
}

func msgToStream(ctx context.Context, s network.Stream, msg datatransfer.Message) error {
if msg.IsRequest() {
log.Debugf("Outgoing request message for transfer ID: %d", msg.TransferID())
}

deadline := time.Now().Add(sendMessageTimeout)
if dl, ok := ctx.Deadline(); ok {
deadline = dl
}
if err := s.SetWriteDeadline(deadline); err != nil {
log.Warnf("error setting deadline: %s", err)
}

switch s.Protocol() {
case ProtocolDataTransfer:
if err := msg.ToNet(s); err != nil {
log.Debugf("error: %s", err)
return err
}
default:
return fmt.Errorf("unrecognized protocol on remote: %s", s.Protocol())
}

if err := s.SetWriteDeadline(time.Time{}); err != nil {
log.Warnf("error resetting deadline: %s", err)
}
return nil
}
6 changes: 0 additions & 6 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,6 @@ import (
"github.com/filecoin-project/go-data-transfer/encoding"
)

type errorString string

func (es errorString) Error() string {
return string(es)
}

//go:generate cbor-gen-for ChannelID

// TypeIdentifier is a unique string identifier for a type of encodable object in a
Expand Down