Skip to content

Commit

Permalink
Simplify graphsync cancel (#229)
Browse files Browse the repository at this point in the history
* refactor: simplify graphsync cancel

* docs: add lock required comments to graphsync transport

* feat: restore wait for complete in graphsync transport

* feat: upgrade to graphsync v0.6.8
  • Loading branch information
dirkmc authored Aug 6, 2021
1 parent 1d44c41 commit f3280ab
Show file tree
Hide file tree
Showing 11 changed files with 139 additions and 110 deletions.
8 changes: 4 additions & 4 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,10 @@ func (c *Channels) Disconnected(chid datatransfer.ChannelID, err error) error {
return c.send(chid, datatransfer.Disconnected, err)
}

// RequestTimedOut indicates that the transport layer had a timeout trying to
// make a request
func (c *Channels) RequestTimedOut(chid datatransfer.ChannelID, err error) error {
return c.send(chid, datatransfer.RequestTimedOut, err)
// RequestCancelled indicates that a transport layer request was cancelled by the
// request opener
func (c *Channels) RequestCancelled(chid datatransfer.ChannelID, err error) error {
return c.send(chid, datatransfer.RequestCancelled, err)
}

// SendDataError indicates that the transport layer had an error trying
Expand Down
4 changes: 2 additions & 2 deletions channels/channels_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ var ChannelEvents = fsm.Events{
chst.AddLog("data transfer receive error: %s", chst.Message)
return nil
}),
fsm.Event(datatransfer.RequestTimedOut).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
fsm.Event(datatransfer.RequestCancelled).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error {
chst.Message = err.Error()
chst.AddLog("data transfer request timed out: %s", chst.Message)
chst.AddLog("data transfer request cancelled: %s", chst.Message)
return nil
}),
fsm.Event(datatransfer.Error).FromAny().To(datatransfer.Failing).Action(func(chst *internal.ChannelState, err error) error {
Expand Down
7 changes: 5 additions & 2 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ const (
// data has been received.
DataReceivedProgress

// RequestTimedOut indicates that the transport layer had a timeout trying to
// make a request
// Deprecated in favour of RequestCancelled
RequestTimedOut

// SendDataError indicates that the transport layer had an error trying
Expand All @@ -105,6 +104,9 @@ const (

// TransferRequestQueued indicates that a new data transfer request has been queued in the transport layer
TransferRequestQueued

// RequestCancelled indicates that a transport layer request was cancelled by the request opener
RequestCancelled
)

// Events are human readable names for data transfer events
Expand Down Expand Up @@ -138,6 +140,7 @@ var Events = map[EventCode]string{
SendDataError: "SendDataError",
ReceiveDataError: "ReceiveDataError",
TransferRequestQueued: "TransferRequestQueued",
RequestCancelled: "RequestCancelled",
}

// Event is a struct containing information about a data transfer event
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-ds-badger v0.2.3
github.com/ipfs/go-graphsync v0.6.4
github.com/ipfs/go-graphsync v0.6.8
github.com/ipfs/go-ipfs-blockstore v1.0.1
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
Expand All @@ -33,5 +33,6 @@ require (
github.com/stretchr/testify v1.6.1
github.com/whyrusleeping/cbor-gen v0.0.0-20200826160007-0b9f6c5fb163
go.uber.org/atomic v1.6.0
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBR
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-graphsync v0.6.4 h1:g6wFRK2BkLPnx8nfoSdnokp5gtpuGyWZjbqI6q3NGb8=
github.com/ipfs/go-graphsync v0.6.4/go.mod h1:5WyaeigpNdpiYQuW2vwpuecOoEfB4h747ZGEOKmAGTg=
github.com/ipfs/go-graphsync v0.6.8 h1:mgyPdBDPcgL8ujO132grQjP3rfQv+KN/riQzbmTVgg4=
github.com/ipfs/go-graphsync v0.6.8/go.mod h1:GdHT8JeuIZ0R4lSjFR16Oe4zPi5dXwKi9zR9ADVlcdk=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
github.com/ipfs/go-ipfs-blockstore v0.1.4/go.mod h1:Jxm3XMVjh6R17WvxFEiyKBLUGr86HgIYJW/D/MwqeYQ=
Expand Down
6 changes: 3 additions & 3 deletions impl/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,9 @@ func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datat
return m.resumeOther(chid)
}

func (m *manager) OnRequestTimedOut(chid datatransfer.ChannelID, err error) error {
log.Warnf("channel %+v has timed out: %s", chid, err)
return m.channels.RequestTimedOut(chid, err)
func (m *manager) OnRequestCancelled(chid datatransfer.ChannelID, err error) error {
log.Warnf("channel %+v was cancelled: %s", chid, err)
return m.channels.RequestCancelled(chid, err)
}

func (m *manager) OnRequestDisconnected(chid datatransfer.ChannelID, err error) error {
Expand Down
2 changes: 1 addition & 1 deletion impl/restart_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func TestRestartPush(t *testing.T) {
require.EqualError(t, err, "context timed-out without completing data transfer")
require.True(t, len(receivedI) < totalIncrements)
require.NotEmpty(t, sentI)
t.Logf("not request was completed after disconnect")
t.Logf("request was not completed after disconnect")

// Connect the peers and restart
require.NoError(t, rh.gsData.Mn.LinkAll())
Expand Down
18 changes: 18 additions & 0 deletions testutil/fakegraphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ type FakeGraphSync struct {
pauseResponses chan PauseResponse
resumeResponses chan ResumeResponse
cancelResponses chan CancelResponse
cancelRequests chan graphsync.RequestID
persistenceOptionsLk sync.RWMutex
persistenceOptions map[string]PersistenceOption
leaveRequestsOpen bool
Expand All @@ -126,6 +127,7 @@ func NewFakeGraphSync() *FakeGraphSync {
pauseResponses: make(chan PauseResponse, 1),
resumeResponses: make(chan ResumeResponse, 1),
cancelResponses: make(chan CancelResponse, 1),
cancelRequests: make(chan graphsync.RequestID, 1),
persistenceOptions: make(map[string]PersistenceOption),
}
}
Expand Down Expand Up @@ -230,6 +232,17 @@ func (fgs *FakeGraphSync) AssertCancelResponseReceived(ctx context.Context, t *t
return cancelResponseReceived
}

// AssertCancelRequestReceived asserts a request was cancelled
func (fgs *FakeGraphSync) AssertCancelRequestReceived(ctx context.Context, t *testing.T) graphsync.RequestID {
select {
case <-ctx.Done():
t.Fatal("did not receive message sent")
return 0
case requestID := <-fgs.cancelRequests:
return requestID
}
}

// AssertHasPersistenceOption verifies that a persistence option was registered
func (fgs *FakeGraphSync) AssertHasPersistenceOption(t *testing.T, name string) PersistenceOption {
fgs.persistenceOptionsLk.RLock()
Expand Down Expand Up @@ -367,6 +380,11 @@ func (fgs *FakeGraphSync) PauseRequest(requestID graphsync.RequestID) error {
return nil
}

func (fgs *FakeGraphSync) CancelRequest(ctx context.Context, requestID graphsync.RequestID) error {
fgs.cancelRequests <- requestID
return nil
}

// CancelResponse cancels a response in progress
func (fgs *FakeGraphSync) CancelResponse(p peer.ID, requestID graphsync.RequestID) error {
fgs.cancelResponses <- CancelResponse{p, requestID}
Expand Down
5 changes: 3 additions & 2 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@ type EventsHandler interface {
// Error returns are logged but otherwise have no effect
OnChannelCompleted(chid ChannelID, err error) error

// OnRequestTimedOut is called when a request we opened (with the given channel Id) to receive data times out.
// OnRequestCancelled is called when a request we opened (with the given channel Id) to
// receive data is cancelled by us.
// Error returns are logged but otherwise have no effect
OnRequestTimedOut(chid ChannelID, err error) error
OnRequestCancelled(chid ChannelID, err error) error

// OnRequestDisconnected is called when a network error occurs trying to send a request
OnRequestDisconnected(chid ChannelID, err error) error
Expand Down
Loading

0 comments on commit f3280ab

Please sign in to comment.