Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove job on cancel transfer #3882

Merged
merged 2 commits into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog/unreleased/remove-job-on-cancel-transfer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Bugfix: Remove transfer on cancel should also remove transfer job

https://github.com/cs3org/reva/pull/3882
https://github.com/cs3org/reva/issues/3881
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,12 @@ transfer-retry -txId fe671ae3-0fbf-4b06-b7df-32418c2ebfcb
```

## 6 Cleanup transfers
Transfers will be removed from the db using the `transfer-cancel` command when the configuration property `remove_on_cancel` of the datatx service has been set to `true` as follows:
Transfers will be removed from the db using the `transfer-cancel` command when the configuration property `remove_transfer_on_cancel` and `remove_transfer_job_on_cancel` of the datatx service and rclone driver respectively have been set to `true` as follows:
```
[grpc.services.datatx]
remove_on_cancel = true
remove_transfer_on_cancel = true

[grpc.services.datatx.txdrivers.rclone]
remove_transfer_job_on_cancel = true
```
Currently this setting is recommended.
6 changes: 6 additions & 0 deletions examples/datatx/datatx.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ txdriver = "rclone"
tx_shares_file = ""
# base folder of the data transfers (eg. /home/DataTransfers)
data_transfers_folder = ""
# if set to 'true' the transfer will always be removed from the db upon cancel request
# recommended value is true
remove_transfer_on_cancel = true

# rclone driver
[grpc.services.datatx.txdrivers.rclone]
Expand All @@ -33,6 +36,9 @@ file = ""
job_status_check_interval = 2000
# the job timeout in milliseconds (must be long enough for big transfers!)
job_timeout = 120000
# if set to 'true' the transfer job will always be removed from the db upon transfer cancel request
# recommended value is true
remove_transfer_job_on_cancel = true

[http.services.ocdav]
# reva supports http third party copy
Expand Down
10 changes: 5 additions & 5 deletions internal/grpc/services/datatx/datatx.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ type config struct {
TxDriver string `mapstructure:"txdriver"`
TxDrivers map[string]map[string]interface{} `mapstructure:"txdrivers"`
// storage driver to persist share/transfer relation
StorageDriver string `mapstructure:"storage_driver"`
StorageDrivers map[string]map[string]interface{} `mapstructure:"storage_drivers"`
TxSharesFile string `mapstructure:"tx_shares_file"`
RemoveOnCancel bool `mapstructure:"remove_on_cancel"`
StorageDriver string `mapstructure:"storage_driver"`
StorageDrivers map[string]map[string]interface{} `mapstructure:"storage_drivers"`
TxSharesFile string `mapstructure:"tx_shares_file"`
RemoveTransferOnCancel bool `mapstructure:"remove_transfer_on_cancel"`
}

type service struct {
Expand Down Expand Up @@ -209,7 +209,7 @@ func (s *service) CancelTransfer(ctx context.Context, req *datatx.CancelTransfer
}

transferRemovedMessage := ""
if s.conf.RemoveOnCancel {
if s.conf.RemoveTransferOnCancel {
delete(s.txShareDriver.model.TxShares, req.TxId.GetOpaqueId())
if err := s.txShareDriver.model.saveTxShare(); err != nil {
err = errors.Wrap(err, "datatx service: error deleting transfer: "+datatx.Status_STATUS_INVALID.String())
Expand Down
13 changes: 11 additions & 2 deletions internal/grpc/services/gateway/ocmshareprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive
}, nil
}

// retrieve the persisted received share
// retrieve the current received share
getShareReq := &ocm.GetReceivedOCMShareRequest{
Ref: &ocm.ShareReference{
Spec: &ocm.ShareReference_Id{
Expand Down Expand Up @@ -234,7 +234,16 @@ func (s *svc) UpdateReceivedOCMShare(ctx context.Context, req *ocm.UpdateReceive
}
}
// handle transfer in case it has not already been accepted
if s.isTransferShare(share) && req.GetShare().State == ocm.ShareState_SHARE_STATE_ACCEPTED && share.State != ocm.ShareState_SHARE_STATE_ACCEPTED {
if s.isTransferShare(share) && req.GetShare().State == ocm.ShareState_SHARE_STATE_ACCEPTED {
if share.State == ocm.ShareState_SHARE_STATE_ACCEPTED {
log.Err(err).Msg("gateway: error calling UpdateReceivedShare, share already accepted.")
return &ocm.UpdateReceivedOCMShareResponse{
Status: &rpc.Status{
Code: rpc.Code_CODE_FAILED_PRECONDITION,
Message: "Share already accepted.",
},
}, err
}
// get provided destination path
transferDestinationPath, err := s.getTransferDestinationPath(ctx, req)
if err != nil {
Expand Down
47 changes: 31 additions & 16 deletions pkg/datatx/manager/rclone/rclone.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,15 @@ func (c *config) init(m map[string]interface{}) {
}

type config struct {
Endpoint string `mapstructure:"endpoint"`
AuthUser string `mapstructure:"auth_user"` // rclone basicauth user
AuthPass string `mapstructure:"auth_pass"` // rclone basicauth pass
AuthHeader string `mapstructure:"auth_header"`
File string `mapstructure:"file"`
JobStatusCheckInterval int `mapstructure:"job_status_check_interval"`
JobTimeout int `mapstructure:"job_timeout"`
Insecure bool `mapstructure:"insecure"`
Endpoint string `mapstructure:"endpoint"`
AuthUser string `mapstructure:"auth_user"` // rclone basicauth user
AuthPass string `mapstructure:"auth_pass"` // rclone basicauth pass
AuthHeader string `mapstructure:"auth_header"`
File string `mapstructure:"file"`
JobStatusCheckInterval int `mapstructure:"job_status_check_interval"`
JobTimeout int `mapstructure:"job_timeout"`
Insecure bool `mapstructure:"insecure"`
RemoveTransferJobOnCancel bool `mapstructure:"remove_transfer_job_on_cancel"`
}

type rclone struct {
Expand Down Expand Up @@ -644,10 +645,24 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferID string) (*d
Ctime: nil,
}, err
}

cTime, _ := strconv.ParseInt(transfer.Ctime, 10, 64)
transferRemovedMessage := ""
if driver.config.RemoveTransferJobOnCancel {
delete(driver.pDriver.model.Transfers, transfer.TransferID)
if err := driver.pDriver.model.saveTransfer(nil); err != nil {
return &datatx.TxInfo{
Id: &datatx.TxId{OpaqueId: transferID},
Status: datatx.Status_STATUS_INVALID,
Ctime: &typespb.Timestamp{Seconds: uint64(cTime)},
}, err
}
transferRemovedMessage = "(transfer job successfully removed)"
}

_, endStatusFound := txEndStatuses[transfer.TransferStatus.String()]
if endStatusFound {
err := errors.New("rclone driver: transfer already in end state")
err := errors.Wrapf(errors.New("rclone driver: transfer already in end state"), transferRemovedMessage)
return &datatx.TxInfo{
Id: &datatx.TxId{OpaqueId: transferID},
Status: datatx.Status_STATUS_INVALID,
Expand All @@ -665,7 +680,7 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferID string) (*d

data, err := json.Marshal(rcloneCancelTransferReq)
if err != nil {
err = errors.Wrap(err, "rclone driver: error marshalling rclone req data")
err := errors.Wrapf(errors.New("rclone driver: error marshalling rclone req data"), transferRemovedMessage)
return &datatx.TxInfo{
Id: &datatx.TxId{OpaqueId: transferID},
Status: datatx.Status_STATUS_INVALID,
Expand All @@ -677,7 +692,7 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferID string) (*d

u, err := url.Parse(driver.config.Endpoint)
if err != nil {
err = errors.Wrap(err, "rclone driver: error parsing driver endpoint")
err := errors.Wrapf(errors.New("rclone driver: error parsing driver endpoint"), transferRemovedMessage)
return &datatx.TxInfo{
Id: &datatx.TxId{OpaqueId: transferID},
Status: datatx.Status_STATUS_INVALID,
Expand All @@ -689,7 +704,7 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferID string) (*d

req, err := http.NewRequest(http.MethodPost, requestURL, bytes.NewReader(data))
if err != nil {
err = errors.Wrap(err, "rclone driver: error framing post request")
err := errors.Wrapf(errors.New("rclone driver: error framing post request"), transferRemovedMessage)
return &datatx.TxInfo{
Id: &datatx.TxId{OpaqueId: transferID},
Status: datatx.Status_STATUS_INVALID,
Expand All @@ -702,7 +717,7 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferID string) (*d

res, err := driver.client.Do(req)
if err != nil {
err = errors.Wrap(err, "rclone driver: error sending post request")
err := errors.Wrapf(errors.New("rclone driver: error sending post request"), transferRemovedMessage)
return &datatx.TxInfo{
Id: &datatx.TxId{OpaqueId: transferID},
Status: datatx.Status_STATUS_INVALID,
Expand All @@ -715,14 +730,14 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferID string) (*d
if res.StatusCode != http.StatusOK {
var errorResData rcloneHTTPErrorRes
if err = json.NewDecoder(res.Body).Decode(&errorResData); err != nil {
err = errors.Wrap(err, "rclone driver: error decoding response data")
err := errors.Wrapf(errors.New("rclone driver: error decoding response data"), transferRemovedMessage)
return &datatx.TxInfo{
Id: &datatx.TxId{OpaqueId: transferID},
Status: datatx.Status_STATUS_INVALID,
Ctime: &typespb.Timestamp{Seconds: uint64(cTime)},
}, err
}
err = errors.Wrap(errors.Errorf("status: %v, error: %v", errorResData.Status, errorResData.Error), "rclone driver: rclone request responded with error")
err = errors.Wrap(errors.Errorf("%v, status: %v, error: %v", transferRemovedMessage, errorResData.Status, errorResData.Error), "rclone driver: rclone request responded with error")
return &datatx.TxInfo{
Id: &datatx.TxId{OpaqueId: transferID},
Status: datatx.Status_STATUS_INVALID,
Expand All @@ -744,7 +759,7 @@ func (driver *rclone) CancelTransfer(ctx context.Context, transferID string) (*d
}
var resData rcloneCancelTransferResJSON
if err = json.NewDecoder(res.Body).Decode(&resData); err != nil {
err = errors.Wrap(err, "rclone driver: error decoding response data")
err := errors.Wrapf(errors.New("rclone driver: error decoding response data"), transferRemovedMessage)
return &datatx.TxInfo{
Id: &datatx.TxId{OpaqueId: transferID},
Status: datatx.Status_STATUS_INVALID,
Expand Down