Skip to content

Commit

Permalink
Handle data-sent and data-queued events in the TransferFinished state (
Browse files Browse the repository at this point in the history
…#233)

* handle data-sent and data-queued events in the TransferFinished state

* go fmt
  • Loading branch information
aarshkshah1992 authored Aug 18, 2021
1 parent 0e3b2a1 commit 8931e01
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 8 deletions.
25 changes: 17 additions & 8 deletions channels/channels_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,29 @@ var ChannelEvents = fsm.Events{
chst.AddLog("received data")
return nil
}),
fsm.Event(datatransfer.DataSent).FromMany(transferringStates...).ToNoChange().Action(func(chst *internal.ChannelState) error {
chst.AddLog("")
return nil
}),

fsm.Event(datatransfer.DataSent).
FromMany(transferringStates...).ToNoChange().
From(datatransfer.TransferFinished).ToNoChange().
Action(func(chst *internal.ChannelState) error {
chst.AddLog("")
return nil
}),

fsm.Event(datatransfer.DataSentProgress).FromMany(transferringStates...).ToNoChange().
Action(func(chst *internal.ChannelState, delta uint64) error {
chst.Sent += delta
chst.AddLog("sending data")
return nil
}),
fsm.Event(datatransfer.DataQueued).FromMany(transferringStates...).ToNoChange().Action(func(chst *internal.ChannelState) error {
chst.AddLog("")
return nil
}),

fsm.Event(datatransfer.DataQueued).
FromMany(transferringStates...).ToNoChange().
From(datatransfer.TransferFinished).ToNoChange().
Action(func(chst *internal.ChannelState) error {
chst.AddLog("")
return nil
}),
fsm.Event(datatransfer.DataQueuedProgress).FromMany(transferringStates...).ToNoChange().
Action(func(chst *internal.ChannelState, delta uint64) error {
chst.Queued += delta
Expand Down
32 changes: 32 additions & 0 deletions channels/channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,38 @@ func TestChannels(t *testing.T) {
require.Equal(t, state.Status(), datatransfer.Ongoing)
})

t.Run("datasent/queued when transfer is already finished", func(t *testing.T) {
ds := dss.MutexWrap(datastore.NewMapDatastore())
dir := os.TempDir()
cidLists, err := cidlists.NewCIDLists(dir)
require.NoError(t, err)
channelList, err := channels.New(ds, cidLists, notifier, decoderByType, decoderByType, &fakeEnv{}, peers[0])
require.NoError(t, err)
err = channelList.Start(ctx)
require.NoError(t, err)

chid, err := channelList.CreateNew(peers[0], tid1, cids[0], selector, fv1, peers[0], peers[0], peers[1])
require.NoError(t, err)
checkEvent(ctx, t, received, datatransfer.Open)

// move the channel to `TransferFinished` state.
require.NoError(t, channelList.FinishTransfer(chid))
state := checkEvent(ctx, t, received, datatransfer.FinishTransfer)
require.Equal(t, datatransfer.TransferFinished, state.Status())

// send a data-sent event and ensure it's a no-op
_, err = channelList.DataSent(chid, cids[1], 1)
require.NoError(t, err)
state = checkEvent(ctx, t, received, datatransfer.DataSent)
require.Equal(t, datatransfer.TransferFinished, state.Status())

// send a data-queued event and ensure it's a no-op.
_, err = channelList.DataQueued(chid, cids[1], 1)
require.NoError(t, err)
state = checkEvent(ctx, t, received, datatransfer.DataQueued)
require.Equal(t, datatransfer.TransferFinished, state.Status())
})

t.Run("updating send/receive values", func(t *testing.T) {
ds := dss.MutexWrap(datastore.NewMapDatastore())
dir := os.TempDir()
Expand Down

0 comments on commit 8931e01

Please sign in to comment.