Skip to content

Commit

Permalink
log request / response events (#137)
Browse files Browse the repository at this point in the history
* fix: better channel not found error

* refactor: idiomatic NotFoundErr

* feat: log request / response events
  • Loading branch information
dirkmc authored Jan 19, 2021
1 parent 270fb6a commit e429103
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 30 deletions.
18 changes: 15 additions & 3 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ipld/go-ipld-prime"
peer "github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"

versioning "github.com/filecoin-project/go-ds-versioning/pkg"
versionedfsm "github.com/filecoin-project/go-ds-versioning/pkg/fsm"
Expand All @@ -29,7 +30,17 @@ type ChannelCIDsReader func(chid datatransfer.ChannelID) ([]cid.Cid, error)
type Notifier func(datatransfer.Event, datatransfer.ChannelState)

// ErrNotFound is returned when a channel cannot be found with a given channel ID
var ErrNotFound = errors.New("No channel for this channel ID")
type ErrNotFound struct {
ChannelID datatransfer.ChannelID
}

func (e *ErrNotFound) Error() string {
return "No channel for channel ID " + e.ChannelID.String()
}

func NewErrNotFound(chid datatransfer.ChannelID) error {
return &ErrNotFound{ChannelID: chid}
}

// ErrWrongType is returned when a caller attempts to change the type of implementation data after setting it
var ErrWrongType = errors.New("Cannot change type of implementation specific data after setting it")
Expand Down Expand Up @@ -176,7 +187,7 @@ func (c *Channels) GetByID(ctx context.Context, chid datatransfer.ChannelID) (da
var internalChannel internal.ChannelState
err := c.stateMachines.GetSync(ctx, chid, &internalChannel)
if err != nil {
return nil, ErrNotFound
return nil, NewErrNotFound(chid)
}
return fromInternalChannelState(internalChannel, c.voucherDecoder, c.voucherResultDecoder, c.cidLists.ReadList), nil
}
Expand Down Expand Up @@ -299,7 +310,8 @@ func (c *Channels) send(chid datatransfer.ChannelID, code datatransfer.EventCode
return err
}
if !has {
return ErrNotFound
return xerrors.Errorf("cannot send FSM event %s to data-transfer channel %s: %w",
datatransfer.Events[code], chid, NewErrNotFound(chid))
}
return c.stateMachines.Send(chid, code, args...)
}
9 changes: 5 additions & 4 deletions channels/channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/require"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"

versioning "github.com/filecoin-project/go-ds-versioning/pkg"
versionedds "github.com/filecoin-project/go-ds-versioning/pkg/datastore"
Expand Down Expand Up @@ -102,7 +103,7 @@ func TestChannels(t *testing.T) {
// empty if channel does not exist
state, err = channelList.GetByID(ctx, datatransfer.ChannelID{Initiator: peers[1], Responder: peers[1], ID: tid1})
require.Equal(t, nil, state)
require.EqualError(t, err, channels.ErrNotFound.Error())
require.True(t, xerrors.As(err, new(*channels.ErrNotFound)))

// works for other channel as well
state, err = channelList.GetByID(ctx, datatransfer.ChannelID{Initiator: peers[3], Responder: peers[2], ID: tid2})
Expand All @@ -122,7 +123,7 @@ func TestChannels(t *testing.T) {
require.Equal(t, state.Status(), datatransfer.Ongoing)

err = channelList.Accept(datatransfer.ChannelID{Initiator: peers[1], Responder: peers[0], ID: tid1})
require.EqualError(t, err, channels.ErrNotFound.Error())
require.True(t, xerrors.As(err, new(*channels.ErrNotFound)))
})

t.Run("updating send/receive values", func(t *testing.T) {
Expand Down Expand Up @@ -159,9 +160,9 @@ func TestChannels(t *testing.T) {

// errors if channel does not exist
err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[1], Responder: peers[0], ID: tid1}, cids[1], 200)
require.EqualError(t, err, channels.ErrNotFound.Error())
require.True(t, xerrors.As(err, new(*channels.ErrNotFound)))
err = channelList.DataSent(datatransfer.ChannelID{Initiator: peers[1], Responder: peers[0], ID: tid1}, cids[1], 200)
require.EqualError(t, err, channels.ErrNotFound.Error())
require.True(t, xerrors.As(err, new(*channels.ErrNotFound)))
require.Equal(t, []cid.Cid{cids[0]}, state.ReceivedCids())

err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[1], 50)
Expand Down
22 changes: 18 additions & 4 deletions impl/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
)

func (m *manager) OnChannelOpened(chid datatransfer.ChannelID) error {
log.Infof("channel %s: opened", chid)

has, err := m.channels.HasChannel(chid)
if err != nil {
return err
Expand Down Expand Up @@ -121,6 +123,8 @@ func (m *manager) OnRequestReceived(chid datatransfer.ChannelID, request datatra
return m.receiveNewRequest(chid.Initiator, request)
}
if request.IsCancel() {
log.Infof("channel %s: received cancel request, cleaning up channel", chid)

m.transport.CleanupChannel(chid)
return nil, m.channels.Cancel(chid)
}
Expand All @@ -147,6 +151,7 @@ func (m *manager) OnRequestReceived(chid datatransfer.ChannelID, request datatra

func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datatransfer.Response) error {
if response.IsCancel() {
log.Infof("channel %s: received cancel response, cancelling channel", chid)
return m.channels.Cancel(chid)
}
if response.IsVoucherResult() {
Expand All @@ -161,16 +166,19 @@ func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datat
}
}
if !response.Accepted() {
log.Infof("channel %s: received rejected response, erroring out channel", chid)
return m.channels.Error(chid, datatransfer.ErrRejected)
}
if response.IsNew() {
log.Infof("channel %s: received new response, accepting channel", chid)
err := m.channels.Accept(chid)
if err != nil {
return err
}
}

if response.IsRestart() {
log.Infof("channel %s: received restart response, restarting channel", chid)
err := m.channels.Restart(chid)
if err != nil {
return err
Expand All @@ -179,6 +187,7 @@ func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datat
}
if response.IsComplete() && response.Accepted() {
if !response.IsPaused() {
log.Infof("channel %s: received complete response, completing channel", chid)
return m.channels.ResponderCompletes(chid)
}
err := m.channels.ResponderBeginsFinalization(chid)
Expand Down Expand Up @@ -285,8 +294,9 @@ func (m *manager) OnChannelCompleted(chid datatransfer.ChannelID, success bool)
return nil
}
if msg != nil {
if err := m.dataTransferNetwork.SendMessage(context.TODO(), chid.Initiator, msg); err != nil {
log.Warnf("failed to send completion message, err : %v", err)
log.Infof("channel %s: sending completion message", chid)
if err := m.dataTransferNetwork.SendMessage(context.Background(), chid.Initiator, msg); err != nil {
log.Warnf("channel %s: failed to send completion message: %s", chid, err)
return m.OnRequestDisconnected(context.TODO(), chid)
}
}
Expand All @@ -312,6 +322,8 @@ func (m *manager) OnChannelCompleted(chid datatransfer.ChannelID, success bool)
}

func (m *manager) receiveRestartRequest(chid datatransfer.ChannelID, incoming datatransfer.Request) (datatransfer.Response, error) {
log.Infof("channel %s: received restart request", chid)

result, err := m.restartRequest(chid, incoming)
msg, msgErr := m.response(true, false, err, incoming.TransferID(), result)
if msgErr != nil {
Expand All @@ -323,6 +335,8 @@ func (m *manager) receiveRestartRequest(chid datatransfer.ChannelID, incoming da
func (m *manager) receiveNewRequest(
initiator peer.ID,
incoming datatransfer.Request) (datatransfer.Response, error) {
log.Infof("received new channel request from %s", initiator)

result, err := m.acceptRequest(initiator, incoming)
msg, msgErr := m.response(false, true, err, incoming.TransferID(), result)
if msgErr != nil {
Expand All @@ -340,7 +354,7 @@ func (m *manager) restartRequest(chid datatransfer.ChannelID,
}

if err := m.validateRestartRequest(context.Background(), initiator, chid, incoming); err != nil {
return nil, err
return nil, xerrors.Errorf("restart request for channel %s failed validation: %w", chid, err)
}

stor, err := incoming.Selector()
Expand All @@ -361,7 +375,7 @@ func (m *manager) restartRequest(chid datatransfer.ChannelID,
}
}
if err := m.channels.Restart(chid); err != nil {
return result, err
return result, xerrors.Errorf("failed to restart channel %s: %w", chid, err)
}
processor, has := m.transportConfigurers.Processor(voucher.Type())
if has {
Expand Down
18 changes: 17 additions & 1 deletion impl/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ func (m *manager) notifier(evt datatransfer.Event, chst datatransfer.ChannelStat

// Start initializes data transfer processing
func (m *manager) Start(ctx context.Context) error {
log.Info("start data-transfer module")

go func() {
err := m.channels.Start(ctx)
if err != nil {
Expand All @@ -189,6 +191,7 @@ func (m *manager) Start(ctx context.Context) error {
log.Warnf("Publish data transfer ready event: %s", err.Error())
}
}()

dtReceiver := &receiver{m}
m.dataTransferNetwork.SetDelegate(dtReceiver)
return m.transport.SetEventHandler(m)
Expand All @@ -201,6 +204,7 @@ func (m *manager) OnReady(ready datatransfer.ReadyFunc) {

// Stop terminates all data transfers and ends processing
func (m *manager) Stop(ctx context.Context) error {
log.Info("stop data-transfer module")
m.pushChannelMonitor.Shutdown()
return m.transport.Shutdown(ctx)
}
Expand All @@ -221,6 +225,8 @@ func (m *manager) RegisterVoucherType(voucherType datatransfer.Voucher, validato
// OpenPushDataChannel opens a data transfer that will send data to the recipient peer and
// transfer parts of the piece that match the selector
func (m *manager) OpenPushDataChannel(ctx context.Context, requestTo peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.ChannelID, error) {
log.Infof("open push channel to %s with base cid %s", requestTo, baseCid)

req, err := m.newRequest(ctx, selector, false, voucher, baseCid, requestTo)
if err != nil {
return datatransfer.ChannelID{}, err
Expand Down Expand Up @@ -257,6 +263,8 @@ func (m *manager) OpenPushDataChannel(ctx context.Context, requestTo peer.ID, vo
// OpenPullDataChannel opens a data transfer that will request data from the sending peer and
// transfer parts of the piece that match the selector
func (m *manager) OpenPullDataChannel(ctx context.Context, requestTo peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.ChannelID, error) {
log.Infof("open pull channel to %s with base cid %s", requestTo, baseCid)

req, err := m.newRequest(ctx, selector, true, voucher, baseCid, requestTo)
if err != nil {
return datatransfer.ChannelID{}, err
Expand Down Expand Up @@ -304,15 +312,18 @@ func (m *manager) SendVoucher(ctx context.Context, channelID datatransfer.Channe

// close an open channel (effectively a cancel)
func (m *manager) CloseDataTransferChannel(ctx context.Context, chid datatransfer.ChannelID) error {
log.Infof("close channel %s", chid)

chst, err := m.channels.GetByID(ctx, chid)
if err != nil {
return err
}
err = m.transport.CloseChannel(ctx, chid)
if err != nil {
log.Warnf("unable to close channel: %w", err)
log.Warnf("unable to close channel %s: %s", chid, err)
}

log.Infof("%s: sending close channel to %s for channel %s", m.peerID, chst.OtherPeer(), chid)
err = m.dataTransferNetwork.SendMessage(ctx, chst.OtherPeer(), m.cancelMessage(chid))
if err != nil {
err = fmt.Errorf("Unable to send cancel message: %w", err)
Expand All @@ -333,6 +344,7 @@ func (m *manager) CloseDataTransferChannel(ctx context.Context, chid datatransfe

// pause a running data transfer channel
func (m *manager) PauseDataTransferChannel(ctx context.Context, chid datatransfer.ChannelID) error {
log.Infof("pause channel %s", chid)

pausable, ok := m.transport.(datatransfer.PauseableTransport)
if !ok {
Expand All @@ -355,6 +367,8 @@ func (m *manager) PauseDataTransferChannel(ctx context.Context, chid datatransfe

// resume a running data transfer channel
func (m *manager) ResumeDataTransferChannel(ctx context.Context, chid datatransfer.ChannelID) error {
log.Infof("resume channel %s", chid)

pausable, ok := m.transport.(datatransfer.PauseableTransport)
if !ok {
return datatransfer.ErrUnsupported
Expand Down Expand Up @@ -427,6 +441,8 @@ func (m *manager) RegisterTransportConfigurer(voucherType datatransfer.Voucher,

// RestartDataTransferChannel restarts data transfer on the channel with the given channelId
func (m *manager) RestartDataTransferChannel(ctx context.Context, chid datatransfer.ChannelID) error {
log.Infof("restart channel %s", chid)

channel, err := m.channels.GetByID(ctx, chid)
if err != nil {
return xerrors.Errorf("failed to fetch channel: %w", err)
Expand Down
3 changes: 2 additions & 1 deletion impl/initiating_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/require"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-storedcounter"

Expand Down Expand Up @@ -156,7 +157,7 @@ func TestDataTransferInitiating(t *testing.T) {
"SendVoucher with no channel open": {
verify: func(t *testing.T, h *harness) {
err := h.dt.SendVoucher(h.ctx, datatransfer.ChannelID{Initiator: h.peers[1], Responder: h.peers[0], ID: 999999}, h.voucher)
require.EqualError(t, err, channels.ErrNotFound.Error())
require.True(t, xerrors.As(err, new(*channels.ErrNotFound)))
},
},
"SendVoucher with channel open, push succeeds": {
Expand Down
19 changes: 12 additions & 7 deletions impl/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (r *receiver) ReceiveRequest(
incoming datatransfer.Request) {
err := r.receiveRequest(ctx, initiator, incoming)
if err != nil {
log.Warn(err)
log.Warnf("error processing request from %s: %s", initiator, err)
}
}

Expand Down Expand Up @@ -97,6 +97,9 @@ func (r *receiver) receiveResponse(
return r.manager.transport.(datatransfer.PauseableTransport).PauseChannel(ctx, chid)
}
if err != nil {
log.Warnf("closing channel %s after getting error processing response from %s: %s",
chid, sender, err)

_ = r.manager.transport.CloseChannel(ctx, chid)
return err
}
Expand All @@ -113,10 +116,12 @@ func (r *receiver) ReceiveRestartExistingChannelRequest(ctx context.Context,

ch, err := incoming.RestartChannelId()
if err != nil {
log.Errorf("failed to fetch restart channel Id: %w", err)
log.Errorf("cannot restart channel: failed to fetch channel Id: %w", err)
return
}

log.Infof("channel %s: received restart existing channel request from %s", ch, sender)

// validate channel exists -> in non-terminal state and that the sender matches
channel, err := r.manager.channels.GetByID(ctx, ch)
if err != nil || channel == nil {
Expand All @@ -126,30 +131,30 @@ func (r *receiver) ReceiveRestartExistingChannelRequest(ctx context.Context,

// initiator should be me
if channel.ChannelID().Initiator != r.manager.peerID {
log.Error("channel initiator is not the manager peer")
log.Error("cannot restart channel %s: channel initiator is not the manager peer", ch)
return
}

// other peer should be the counter party on the channel
if channel.OtherPeer() != sender {
log.Error("channel counterparty is not the sender peer")
log.Error("cannot restart channel %s: channel counterparty is not the sender peer", ch)
return
}

// channel should NOT be terminated
if channels.IsChannelTerminated(channel.Status()) {
log.Error("channel is already terminated")
log.Error("cannot restart channel %s: channel already terminated", ch)
return
}

switch r.manager.channelDataTransferType(channel) {
case ManagerPeerCreatePush:
if err := r.manager.openPushRestartChannel(ctx, channel); err != nil {
log.Errorf("failed to open push restart channel: %w", err)
log.Errorf("failed to open push restart channel %s: %s", ch, err)
}
case ManagerPeerCreatePull:
if err := r.manager.openPullRestartChannel(ctx, channel); err != nil {
log.Errorf("failed to open pull restart channel: %w", err)
log.Errorf("failed to open pull restart channel %s: %s", ch, err)
}
default:
log.Error("peer is not the creator of the channel")
Expand Down
Loading

0 comments on commit e429103

Please sign in to comment.