diff --git a/channels/channels.go b/channels/channels.go index 9aac1ef1..9aa3834c 100644 --- a/channels/channels.go +++ b/channels/channels.go @@ -332,10 +332,10 @@ func (c *Channels) Disconnected(chid datatransfer.ChannelID, err error) error { return c.send(chid, datatransfer.Disconnected, err) } -// RequestTimedOut indicates that the transport layer had a timeout trying to -// make a request -func (c *Channels) RequestTimedOut(chid datatransfer.ChannelID, err error) error { - return c.send(chid, datatransfer.RequestTimedOut, err) +// RequestCancelled indicates that a transport layer request was cancelled by the +// request opener +func (c *Channels) RequestCancelled(chid datatransfer.ChannelID, err error) error { + return c.send(chid, datatransfer.RequestCancelled, err) } // SendDataError indicates that the transport layer had an error trying diff --git a/channels/channels_fsm.go b/channels/channels_fsm.go index 02768eee..cf20f870 100644 --- a/channels/channels_fsm.go +++ b/channels/channels_fsm.go @@ -95,9 +95,9 @@ var ChannelEvents = fsm.Events{ chst.AddLog("data transfer receive error: %s", chst.Message) return nil }), - fsm.Event(datatransfer.RequestTimedOut).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error { + fsm.Event(datatransfer.RequestCancelled).FromAny().ToNoChange().Action(func(chst *internal.ChannelState, err error) error { chst.Message = err.Error() - chst.AddLog("data transfer request timed out: %s", chst.Message) + chst.AddLog("data transfer request cancelled: %s", chst.Message) return nil }), fsm.Event(datatransfer.Error).FromAny().To(datatransfer.Failing).Action(func(chst *internal.ChannelState, err error) error { diff --git a/events.go b/events.go index 12158565..2de9039e 100644 --- a/events.go +++ b/events.go @@ -91,8 +91,7 @@ const ( // data has been received. DataReceivedProgress - // RequestTimedOut indicates that the transport layer had a timeout trying to - // make a request + // Deprecated in favour of RequestCancelled RequestTimedOut // SendDataError indicates that the transport layer had an error trying @@ -105,6 +104,9 @@ const ( // TransferRequestQueued indicates that a new data transfer request has been queued in the transport layer TransferRequestQueued + + // RequestCancelled indicates that a transport layer request was cancelled by the request opener + RequestCancelled ) // Events are human readable names for data transfer events @@ -138,6 +140,7 @@ var Events = map[EventCode]string{ SendDataError: "SendDataError", ReceiveDataError: "ReceiveDataError", TransferRequestQueued: "TransferRequestQueued", + RequestCancelled: "RequestCancelled", } // Event is a struct containing information about a data transfer event diff --git a/go.mod b/go.mod index cf61f158..046cdcb5 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/ipfs/go-cid v0.0.7 github.com/ipfs/go-datastore v0.4.5 github.com/ipfs/go-ds-badger v0.2.3 - github.com/ipfs/go-graphsync v0.6.4 + github.com/ipfs/go-graphsync v0.6.8 github.com/ipfs/go-ipfs-blockstore v1.0.1 github.com/ipfs/go-ipfs-blocksutil v0.0.1 github.com/ipfs/go-ipfs-chunker v0.0.5 @@ -33,5 +33,6 @@ require ( github.com/stretchr/testify v1.6.1 github.com/whyrusleeping/cbor-gen v0.0.0-20200826160007-0b9f6c5fb163 go.uber.org/atomic v1.6.0 + golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 ) diff --git a/go.sum b/go.sum index d76c44e5..e27f8c98 100644 --- a/go.sum +++ b/go.sum @@ -215,8 +215,8 @@ github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBR github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc= github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s= github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s= -github.com/ipfs/go-graphsync v0.6.4 h1:g6wFRK2BkLPnx8nfoSdnokp5gtpuGyWZjbqI6q3NGb8= -github.com/ipfs/go-graphsync v0.6.4/go.mod h1:5WyaeigpNdpiYQuW2vwpuecOoEfB4h747ZGEOKmAGTg= +github.com/ipfs/go-graphsync v0.6.8 h1:mgyPdBDPcgL8ujO132grQjP3rfQv+KN/riQzbmTVgg4= +github.com/ipfs/go-graphsync v0.6.8/go.mod h1:GdHT8JeuIZ0R4lSjFR16Oe4zPi5dXwKi9zR9ADVlcdk= github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08= github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw= github.com/ipfs/go-ipfs-blockstore v0.1.4/go.mod h1:Jxm3XMVjh6R17WvxFEiyKBLUGr86HgIYJW/D/MwqeYQ= diff --git a/impl/events.go b/impl/events.go index dcea7f8f..6a8ee5b7 100644 --- a/impl/events.go +++ b/impl/events.go @@ -216,9 +216,9 @@ func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datat return m.resumeOther(chid) } -func (m *manager) OnRequestTimedOut(chid datatransfer.ChannelID, err error) error { - log.Warnf("channel %+v has timed out: %s", chid, err) - return m.channels.RequestTimedOut(chid, err) +func (m *manager) OnRequestCancelled(chid datatransfer.ChannelID, err error) error { + log.Warnf("channel %+v was cancelled: %s", chid, err) + return m.channels.RequestCancelled(chid, err) } func (m *manager) OnRequestDisconnected(chid datatransfer.ChannelID, err error) error { diff --git a/impl/restart_integration_test.go b/impl/restart_integration_test.go index ed2a7337..997bb394 100644 --- a/impl/restart_integration_test.go +++ b/impl/restart_integration_test.go @@ -201,7 +201,7 @@ func TestRestartPush(t *testing.T) { require.EqualError(t, err, "context timed-out without completing data transfer") require.True(t, len(receivedI) < totalIncrements) require.NotEmpty(t, sentI) - t.Logf("not request was completed after disconnect") + t.Logf("request was not completed after disconnect") // Connect the peers and restart require.NoError(t, rh.gsData.Mn.LinkAll()) diff --git a/testutil/fakegraphsync.go b/testutil/fakegraphsync.go index 0348b44e..76c2e422 100644 --- a/testutil/fakegraphsync.go +++ b/testutil/fakegraphsync.go @@ -100,6 +100,7 @@ type FakeGraphSync struct { pauseResponses chan PauseResponse resumeResponses chan ResumeResponse cancelResponses chan CancelResponse + cancelRequests chan graphsync.RequestID persistenceOptionsLk sync.RWMutex persistenceOptions map[string]PersistenceOption leaveRequestsOpen bool @@ -126,6 +127,7 @@ func NewFakeGraphSync() *FakeGraphSync { pauseResponses: make(chan PauseResponse, 1), resumeResponses: make(chan ResumeResponse, 1), cancelResponses: make(chan CancelResponse, 1), + cancelRequests: make(chan graphsync.RequestID, 1), persistenceOptions: make(map[string]PersistenceOption), } } @@ -230,6 +232,17 @@ func (fgs *FakeGraphSync) AssertCancelResponseReceived(ctx context.Context, t *t return cancelResponseReceived } +// AssertCancelRequestReceived asserts a request was cancelled +func (fgs *FakeGraphSync) AssertCancelRequestReceived(ctx context.Context, t *testing.T) graphsync.RequestID { + select { + case <-ctx.Done(): + t.Fatal("did not receive message sent") + return 0 + case requestID := <-fgs.cancelRequests: + return requestID + } +} + // AssertHasPersistenceOption verifies that a persistence option was registered func (fgs *FakeGraphSync) AssertHasPersistenceOption(t *testing.T, name string) PersistenceOption { fgs.persistenceOptionsLk.RLock() @@ -367,6 +380,11 @@ func (fgs *FakeGraphSync) PauseRequest(requestID graphsync.RequestID) error { return nil } +func (fgs *FakeGraphSync) CancelRequest(ctx context.Context, requestID graphsync.RequestID) error { + fgs.cancelRequests <- requestID + return nil +} + // CancelResponse cancels a response in progress func (fgs *FakeGraphSync) CancelResponse(p peer.ID, requestID graphsync.RequestID) error { fgs.cancelResponses <- CancelResponse{p, requestID} diff --git a/transport.go b/transport.go index 7fbee093..4f1655cb 100644 --- a/transport.go +++ b/transport.go @@ -56,9 +56,10 @@ type EventsHandler interface { // Error returns are logged but otherwise have no effect OnChannelCompleted(chid ChannelID, err error) error - // OnRequestTimedOut is called when a request we opened (with the given channel Id) to receive data times out. + // OnRequestCancelled is called when a request we opened (with the given channel Id) to + // receive data is cancelled by us. // Error returns are logged but otherwise have no effect - OnRequestTimedOut(chid ChannelID, err error) error + OnRequestCancelled(chid ChannelID, err error) error // OnRequestDisconnected is called when a network error occurs trying to send a request OnRequestDisconnected(chid ChannelID, err error) error diff --git a/transport/graphsync/graphsync.go b/transport/graphsync/graphsync.go index 7fbf44a7..4845033d 100644 --- a/transport/graphsync/graphsync.go +++ b/transport/graphsync/graphsync.go @@ -12,6 +12,7 @@ import ( logging "github.com/ipfs/go-log/v2" ipld "github.com/ipld/go-ipld-prime" peer "github.com/libp2p/go-libp2p-core/peer" + "golang.org/x/sync/errgroup" "golang.org/x/xerrors" datatransfer "github.com/filecoin-project/go-data-transfer" @@ -22,9 +23,8 @@ var log = logging.Logger("dt_graphsync") // When restarting a data transfer, we cancel the existing graphsync request // before opening a new one. -// These constants define the minimum and maximum time to wait for the request -// to be cancelled. -const minGSCancelWait = 100 * time.Millisecond +// This constant defines the maximum time to wait for the request to be +// cancelled. const maxGSCancelWait = time.Second type graphsyncKey struct { @@ -183,30 +183,24 @@ func (t *Transport) executeGsRequest(req *gsReq) { lastError := t.consumeResponses(req.responseChan, req.errChan) - if _, ok := lastError.(graphsync.RequestContextCancelledErr); ok { - terr := xerrors.Errorf("graphsync request context cancelled") - log.Warnf("channel id %v: %s", req.channelID, terr) - if err := t.events.OnRequestTimedOut(req.channelID, terr); err != nil { + // Request cancelled by client + if _, ok := lastError.(graphsync.RequestClientCancelledErr); ok { + terr := xerrors.Errorf("graphsync request cancelled") + log.Warnf("channel %s: %s", req.channelID, terr) + if err := t.events.OnRequestCancelled(req.channelID, terr); err != nil { log.Error(err) } return } + // Request cancelled by responder if _, ok := lastError.(graphsync.RequestCancelledErr); ok { // TODO Should we do anything for RequestCancelledErr ? return } - // TODO: There seems to be a bug in graphsync. I believe it should return - // graphsync.RequestCancelledErr on the errChan if the request's context is - // cancelled, but it doesn't seem to be doing that - if req.gsReqCtx.Err() != nil { - log.Warnf("graphsync request cancelled for channel %s", req.channelID) - return - } - if lastError != nil { - log.Warnf("graphsync error: %s", lastError.Error()) + log.Warnf("graphsync error on channel %s: %s", req.channelID, lastError) } log.Debugf("finished executing graphsync request for channel %s", req.channelID) @@ -223,7 +217,7 @@ func (t *Transport) executeGsRequest(req *gsReq) { err := t.events.OnChannelCompleted(req.channelID, completeErr) if err != nil { - log.Errorf("processing OnChannelCompleted: %s", err) + log.Errorf("processing OnChannelCompleted %s: %s", req.channelID, err) } } @@ -256,7 +250,12 @@ func (t *Transport) CloseChannel(ctx context.Context, chid datatransfer.ChannelI if err != nil { return err } - return ch.close() + + err = ch.close(ctx) + if err != nil { + return xerrors.Errorf("closing channel: %w", err) + } + return nil } // CleanupChannel is called on the otherside of a cancel - removes any associated @@ -309,10 +308,18 @@ func (t *Transport) Shutdown(ctx context.Context) error { t.dtChannelsLk.Lock() defer t.dtChannelsLk.Unlock() + var eg errgroup.Group for _, ch := range t.dtChannels { - ch.shutdown() + ch := ch + eg.Go(func() error { + return ch.shutdown(ctx) + }) } + err := eg.Wait() + if err != nil { + return xerrors.Errorf("shutting down graphsync transport: %w", err) + } return nil } @@ -823,13 +830,6 @@ func (t *Transport) getDTChannel(chid datatransfer.ChannelID) (*dtChannel, error return ch, nil } -// Info needed to cancel a graphsync request, and wait for the cancellation -// to complete -type cancelRequest struct { - cancel context.CancelFunc - completed chan struct{} -} - // Info needed to keep track of a data transfer channel type dtChannel struct { peerID peer.ID @@ -840,8 +840,8 @@ type dtChannel struct { lk sync.RWMutex isOpen bool - gsKey graphsyncKey - cancelReq *cancelRequest + gsKey *graphsyncKey + completed chan struct{} requesterCancelled bool xferStarted bool pendingExtensions []graphsync.ExtensionData @@ -855,7 +855,6 @@ type dtChannel struct { // Info needed to monitor an ongoing graphsync request type gsReq struct { channelID datatransfer.ChannelID - gsReqCtx context.Context responseChan <-chan graphsync.ResponseProgress errChan <-chan error onComplete func() @@ -867,51 +866,46 @@ func (c *dtChannel) open(ctx context.Context, chid datatransfer.ChannelID, dataS defer c.lk.Unlock() // If there is an existing graphsync request for this channelID - if c.isOpen { + if c.gsKey != nil { // Cancel the existing graphsync request - log.Warnf("Restarting %s - canceling existing graphsync request for channel", chid) - completed := c.cancelReq.completed - c.cancelReq.cancel() + completed := c.completed + err := c.cancelRequest(ctx) + if err != nil { + return nil, xerrors.Errorf("restarting graphsync request: %w", err) + } // Wait for the cancel to go through - err := waitForCancelComplete(ctx, completed) + err = waitForCancelComplete(ctx, completed) if err != nil { - return nil, err + return nil, xerrors.Errorf("waiting for cancelled graphsync request to complete: %w", err) } } - // Create a cancellable context for the channel so that the graphsync - // request can be cancelled - gsReqCtx, gsReqCancel := context.WithCancel(ctx) - c.cancelReq = &cancelRequest{ - cancel: gsReqCancel, - completed: make(chan struct{}), + // Set up a completed channel that will be closed when the request + // completes (or is cancelled) + completed := make(chan struct{}) + onComplete := func() { + close(completed) } + c.completed = completed - // Open graphsync request + // Open a new graphsync request log.Infof("Opening graphsync request to %s for root %s with %d CIDs already received", dataSender, root, len(doNotSendCids)) - responseChan, errChan := c.gs.Request(gsReqCtx, dataSender, root, stor, exts...) + responseChan, errChan := c.gs.Request(ctx, dataSender, root, stor, exts...) // Wait for graphsync "request opened" callback select { case <-ctx.Done(): return nil, ctx.Err() case gsKey := <-c.opened: + // Mark the channel as open and save the Graphsync request key c.isOpen = true - - // Save the Graphsync request key - c.gsKey = gsKey - } - - // When the transfer completes, close the completed channel - onComplete := func() { - close(c.cancelReq.completed) + c.gsKey = &gsKey } return &gsReq{ channelID: chid, - gsReqCtx: gsReqCtx, responseChan: responseChan, errChan: errChan, onComplete: onComplete, @@ -923,18 +917,7 @@ func waitForCancelComplete(ctx context.Context, completed chan struct{}) error { // the graphsync request to complete select { case <-completed: - // Graphsync request has completed. - // Now wait for a minimum backoff before initiating another - // graphsync request. - // We need to do this to make sure that graphsync has finished - // emitting all events for the current request before - // initiating a new one. - select { - case <-time.After(minGSCancelWait): - return nil - case <-ctx.Done(): - return ctx.Err() - } + return nil case <-time.After(maxGSCancelWait): // Fail-safe: give up waiting after a certain amount of time return nil @@ -958,7 +941,8 @@ func (c *dtChannel) gsReqOpened(gsKey graphsyncKey, hookActions graphsync.Outgoi } // gsDataRequestRcvd is called when the transport receives an incoming request -// for data +// for data. +// Note: Must be called under the lock. func (c *dtChannel) gsDataRequestRcvd(gsKey graphsyncKey, hookActions graphsync.IncomingRequestHookActions) { log.Debugf("%s: received request for data", c.channelID) @@ -981,7 +965,7 @@ func (c *dtChannel) gsDataRequestRcvd(gsKey graphsyncKey, hookActions graphsync. // Save a mapping from the graphsync key to the channel ID so that // subsequent graphsync callbacks are associated with this channel - c.gsKey = gsKey + c.gsKey = &gsKey log.Infow("incoming graphsync request", "peer", gsKey.p, "graphsync request id", gsKey.requestID, "data transfer channel id", c.channelID) c.gsKeyToChannelID.set(gsKey, c.channelID) @@ -1051,16 +1035,14 @@ func (c *dtChannel) resume(msg datatransfer.Message) error { return c.gs.UnpauseResponse(c.gsKey.p, c.gsKey.requestID, extensions...) } -func (c *dtChannel) close() error { +func (c *dtChannel) close(ctx context.Context) error { c.lk.Lock() defer c.lk.Unlock() // If it's a graphsync request if c.gsKey.p == c.peerID { // Cancel the request - log.Debugf("%s: cancelling request", c.channelID) - c.cancelReq.cancel() - return nil + return c.cancelRequest(ctx) } // It's a graphsync response @@ -1091,7 +1073,7 @@ func (c *dtChannel) hasStore() bool { } // Use the given loader and storer to get / put blocks for the data-transfer. -// Note that each data-transfer channel uses a separate multi-store. +// Note that each data-transfer channel uses a separate blockstore. func (c *dtChannel) useStore(loader ipld.Loader, storer ipld.Storer) error { c.storeLk.Lock() defer c.storeLk.Unlock() @@ -1126,14 +1108,35 @@ func (c *dtChannel) cleanup() { c.gsKeyToChannelID.deleteRefs(c.channelID) } -func (c *dtChannel) shutdown() { +func (c *dtChannel) shutdown(ctx context.Context) error { c.lk.Lock() defer c.lk.Unlock() // Cancel the graphsync request - if c.cancelReq != nil { - c.cancelReq.cancel() + return c.cancelRequest(ctx) +} + +// Cancel the graphsync request. +// Note: must be called under the lock. +func (c *dtChannel) cancelRequest(ctx context.Context) error { + // Check that the request has not already been cancelled + if c.gsKey == nil { + return nil } + + log.Debugf("%s: cancelling request", c.channelID) + err := c.gs.CancelRequest(ctx, c.gsKey.requestID) + if err != nil { + // Ignore "request not found" errors + if !xerrors.Is(graphsync.RequestNotFoundErr{}, err) { + return xerrors.Errorf("cancelling graphsync request for channel %s: %w", c.channelID, err) + } + } + + // Clear the graphsync key to indicate that the request has been cancelled + c.gsKey = nil + + return nil } // Used in graphsync callbacks to map from graphsync request to the diff --git a/transport/graphsync/graphsync_test.go b/transport/graphsync/graphsync_test.go index 3916c1d7..b077e694 100644 --- a/transport/graphsync/graphsync_test.go +++ b/transport/graphsync/graphsync_test.go @@ -751,11 +751,12 @@ func TestManager(t *testing.T) { gsData.outgoing) }, check: func(t *testing.T, events *fakeEvents, gsData *harness) { - requestReceived1 := gsData.fgs.AssertRequestReceived(gsData.ctx, t) - requestReceived2 := gsData.fgs.AssertRequestReceived(gsData.ctx, t) + gsData.fgs.AssertRequestReceived(gsData.ctx, t) + gsData.fgs.AssertRequestReceived(gsData.ctx, t) - require.Error(t, requestReceived1.Ctx.Err()) - require.NoError(t, requestReceived2.Ctx.Err()) + ctxt, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + gsData.fgs.AssertCancelRequestReceived(ctxt, t) }, }, "OnChannelCompleted called when outgoing request completes successfully": { @@ -835,9 +836,9 @@ func TestManager(t *testing.T) { request := testutil.NewFakeRequest(graphsync.RequestID(rand.Int31()), extensions) gsData.fgs.OutgoingRequestHook(gsData.other, request, gsData.outgoingRequestHookActions) _ = gsData.transport.CloseChannel(gsData.ctx, datatransfer.ChannelID{ID: gsData.transferID, Responder: gsData.other, Initiator: gsData.self}) - require.Eventually(t, func() bool { - return requestReceived.Ctx.Err() != nil - }, 2*time.Second, 100*time.Millisecond) + ctxt, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + gsData.fgs.AssertCancelRequestReceived(ctxt, t) }, }, "request times out if we get request context cancelled error": { @@ -858,13 +859,13 @@ func TestManager(t *testing.T) { check: func(t *testing.T, events *fakeEvents, gsData *harness) { requestReceived := gsData.fgs.AssertRequestReceived(gsData.ctx, t) close(requestReceived.ResponseChan) - requestReceived.ResponseErrChan <- graphsync.RequestContextCancelledErr{} + requestReceived.ResponseErrChan <- graphsync.RequestClientCancelledErr{} close(requestReceived.ResponseErrChan) require.Eventually(t, func() bool { - return events.OnRequestTimedOutCalled == true + return events.OnRequestCancelledCalled == true }, 2*time.Second, 100*time.Millisecond) - require.Equal(t, datatransfer.ChannelID{ID: gsData.transferID, Responder: gsData.other, Initiator: gsData.self}, events.OnRequestTimedOutChannelId) + require.Equal(t, datatransfer.ChannelID{ID: gsData.transferID, Responder: gsData.other, Initiator: gsData.self}, events.OnRequestCancelledChannelId) }, }, "request cancelled out if transport shuts down": { @@ -883,12 +884,14 @@ func TestManager(t *testing.T) { gsData.outgoing) }, check: func(t *testing.T, events *fakeEvents, gsData *harness) { - requestReceived := gsData.fgs.AssertRequestReceived(gsData.ctx, t) + gsData.fgs.AssertRequestReceived(gsData.ctx, t) gsData.transport.Shutdown(gsData.ctx) - require.Eventually(t, func() bool { - return requestReceived.Ctx.Err() != nil - }, 2*time.Second, 100*time.Millisecond) + + ctxt, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + gsData.fgs.AssertCancelRequestReceived(ctxt, t) + require.Nil(t, gsData.fgs.IncomingRequestHook) require.Nil(t, gsData.fgs.CompletedResponseListener) require.Nil(t, gsData.fgs.IncomingBlockHook) @@ -1043,8 +1046,8 @@ type fakeEvents struct { OnDataQueuedMessage datatransfer.Message OnDataQueuedError error - OnRequestTimedOutCalled bool - OnRequestTimedOutChannelId datatransfer.ChannelID + OnRequestCancelledCalled bool + OnRequestCancelledChannelId datatransfer.ChannelID OnSendDataErrorCalled bool OnSendDataErrorChannelID datatransfer.ChannelID OnReceiveDataErrorCalled bool @@ -1065,9 +1068,9 @@ func (fe *fakeEvents) OnDataQueued(chid datatransfer.ChannelID, link ipld.Link, return fe.OnDataQueuedMessage, fe.OnDataQueuedError } -func (fe *fakeEvents) OnRequestTimedOut(chid datatransfer.ChannelID, err error) error { - fe.OnRequestTimedOutCalled = true - fe.OnRequestTimedOutChannelId = chid +func (fe *fakeEvents) OnRequestCancelled(chid datatransfer.ChannelID, err error) error { + fe.OnRequestCancelledCalled = true + fe.OnRequestCancelledChannelId = chid return nil }