Skip to content

Commit

Permalink
fix(dtutils): minor refactor/cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
hannahhoward committed Sep 28, 2020
1 parent 19176f4 commit 878d645
Showing 1 changed file with 40 additions and 29 deletions.
69 changes: 40 additions & 29 deletions retrievalmarket/impl/dtutils/dtutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,10 @@ func providerEvent(event datatransfer.Event, channelState datatransfer.ChannelSt
// event or moving to error if a data transfer error occurs
func ProviderDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber {
return func(event datatransfer.Event, channelState datatransfer.ChannelState) {
dealProposal, ok := channelState.Voucher().(*rm.DealProposal)
dealProposal, ok := dealProposalFromVoucher(channelState.Voucher())
// if this event is for a transfer not related to storage, ignore
if !ok {
legacyProposal, ok := channelState.Voucher().(*migrations.DealProposal0)
if !ok {
return
}
newProposal := migrations.MigrateDealProposal0To1(*legacyProposal)
dealProposal = &newProposal
return
}

if channelState.Status() == datatransfer.Completed {
Expand Down Expand Up @@ -110,16 +105,12 @@ func clientEvent(event datatransfer.Event, channelState datatransfer.ChannelStat
case datatransfer.Cancel:
return rm.ClientEventProviderCancelled, nil
case datatransfer.NewVoucherResult:
response, ok := channelState.LastVoucherResult().(*rm.DealResponse)
response, ok := dealResponseFromVoucherResult(channelState.LastVoucherResult())
if !ok {
legacyResponse, ok := channelState.LastVoucherResult().(*migrations.DealResponse0)
if !ok {
log.Errorf("unexpected voucher result received: %s", channelState.LastVoucher().Type())
return noEvent, nil
}
newResponse := migrations.MigrateDealResponse0To1(*legacyResponse)
response = &newResponse
log.Errorf("unexpected voucher result received: %s", channelState.LastVoucher().Type())
return noEvent, nil
}

return clientEventForResponse(response)
case datatransfer.Error:
if channelState.Message() == datatransfer.ErrRejected.Error() {
Expand All @@ -138,16 +129,11 @@ func clientEvent(event datatransfer.Event, channelState datatransfer.ChannelStat
// an event to the appropriate state machine
func ClientDataTransferSubscriber(deals EventReceiver) datatransfer.Subscriber {
return func(event datatransfer.Event, channelState datatransfer.ChannelState) {
dealProposal, ok := channelState.Voucher().(*rm.DealProposal)
dealProposal, ok := dealProposalFromVoucher(channelState.Voucher())

// if this event is for a transfer not related to retrieval, ignore
if !ok {
legacyProposal, ok := channelState.Voucher().(*migrations.DealProposal0)
if !ok {
return
}
newProposal := migrations.MigrateDealProposal0To1(*legacyProposal)
dealProposal = &newProposal
return
}

retrievalEvent, params := clientEvent(event, channelState)
Expand Down Expand Up @@ -177,14 +163,9 @@ type StoreConfigurableTransport interface {
// TransportConfigurer configurers the graphsync transport to use a custom blockstore per deal
func TransportConfigurer(thisPeer peer.ID, storeGetter StoreGetter) datatransfer.TransportConfigurer {
return func(channelID datatransfer.ChannelID, voucher datatransfer.Voucher, transport datatransfer.Transport) {
dealProposal, ok := voucher.(*rm.DealProposal)
dealProposal, ok := dealProposalFromVoucher(voucher)
if !ok {
legacyProposal, ok := voucher.(*migrations.DealProposal0)
if !ok {
return
}
newProposal := migrations.MigrateDealProposal0To1(*legacyProposal)
dealProposal = &newProposal
return
}
gsTransport, ok := transport.(StoreConfigurableTransport)
if !ok {
Expand All @@ -205,3 +186,33 @@ func TransportConfigurer(thisPeer peer.ID, storeGetter StoreGetter) datatransfer
}
}
}

func dealProposalFromVoucher(voucher datatransfer.Voucher) (*rm.DealProposal, bool) {
dealProposal, ok := voucher.(*rm.DealProposal)
// if this event is for a transfer not related to storage, ignore
if ok {
return dealProposal, true
}

legacyProposal, ok := voucher.(*migrations.DealProposal0)
if !ok {
return nil, false
}
newProposal := migrations.MigrateDealProposal0To1(*legacyProposal)
return &newProposal, true
}

func dealResponseFromVoucherResult(vres datatransfer.VoucherResult) (*rm.DealResponse, bool) {
dealResponse, ok := vres.(*rm.DealResponse)
// if this event is for a transfer not related to storage, ignore
if ok {
return dealResponse, true
}

legacyResponse, ok := vres.(*migrations.DealResponse0)
if !ok {
return nil, false
}
newResponse := migrations.MigrateDealResponse0To1(*legacyResponse)
return &newResponse, true
}

0 comments on commit 878d645

Please sign in to comment.