-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathdatatransfer_handler.go
87 lines (75 loc) · 3.29 KB
/
datatransfer_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package retrievalprovider
import (
"context"
datatransfer "github.com/filecoin-project/go-data-transfer"
rm "github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/venus-market/v2/models/repo"
)
// EventReceiver is any thing that can receive FSM events
type IDatatransferHandler interface {
//have many receiver function
HandleCompleteFor(context.Context, rm.ProviderDealIdentifier) error
HandleAcceptFor(context.Context, rm.ProviderDealIdentifier, datatransfer.ChannelID) error
HandleDisconnectFor(context.Context, rm.ProviderDealIdentifier, error) error
HandleCancelForDeal(context.Context, rm.ProviderDealIdentifier) error
HandleErrorForDeal(context.Context, rm.ProviderDealIdentifier, error) error
}
var _ IDatatransferHandler = (*DataTransferHandler)(nil)
type DataTransferHandler struct {
retrievalDealHandler IRetrievalHandler
retrievalDealStore repo.IRetrievalDealRepo
}
func NewDataTransferHandler(retrievalDealHandler IRetrievalHandler, retrievalDealStore repo.IRetrievalDealRepo) *DataTransferHandler {
return &DataTransferHandler{retrievalDealHandler: retrievalDealHandler, retrievalDealStore: retrievalDealStore}
}
func (d *DataTransferHandler) HandleCompleteFor(ctx context.Context, identifier rm.ProviderDealIdentifier) error {
deal, err := d.retrievalDealStore.GetDeal(ctx, identifier.Receiver, identifier.DealID)
if err != nil {
deal.Status = rm.DealStatusErrored
return d.retrievalDealStore.SaveDeal(ctx, deal)
}
return d.retrievalDealHandler.CleanupDeal(context.TODO(), deal)
}
func (d *DataTransferHandler) HandleAcceptFor(ctx context.Context, identifier rm.ProviderDealIdentifier, channelId datatransfer.ChannelID) error {
deal, err := d.retrievalDealStore.GetDeal(ctx, identifier.Receiver, identifier.DealID)
if err != nil {
deal.Status = rm.DealStatusErrored
return d.retrievalDealStore.SaveDeal(ctx, deal)
}
deal.ChannelID = &channelId
return d.retrievalDealHandler.UnsealData(ctx, deal)
}
func (d *DataTransferHandler) HandleDisconnectFor(ctx context.Context, identifier rm.ProviderDealIdentifier, errIn error) error {
deal, err := d.retrievalDealStore.GetDeal(ctx, identifier.Receiver, identifier.DealID)
if err != nil {
deal.Status = rm.DealStatusErrored
deal.Message = err.Error()
return d.retrievalDealStore.SaveDeal(ctx, deal)
}
return d.retrievalDealHandler.Error(ctx, deal, errIn)
}
func (d *DataTransferHandler) HandleCancelForDeal(ctx context.Context, identifier rm.ProviderDealIdentifier) error {
deal, err := d.retrievalDealStore.GetDeal(ctx, identifier.Receiver, identifier.DealID)
if err != nil {
deal.Status = rm.DealStatusErrored
return d.retrievalDealStore.SaveDeal(ctx, deal)
}
switch deal.Status {
case rm.DealStatusFailing:
case rm.DealStatusCancelling:
default:
if deal.Status != rm.DealStatusFailing {
deal.Message = "Client cancelled retrieval"
}
}
return d.retrievalDealStore.SaveDeal(ctx, deal)
}
func (d *DataTransferHandler) HandleErrorForDeal(ctx context.Context, identifier rm.ProviderDealIdentifier, errIn error) error {
deal, err := d.retrievalDealStore.GetDeal(ctx, identifier.Receiver, identifier.DealID)
if err != nil {
deal.Status = rm.DealStatusErrored
deal.Message = err.Error()
return d.retrievalDealStore.SaveDeal(ctx, deal)
}
return d.retrievalDealHandler.Error(ctx, deal, errIn)
}