From 2099e2ead4e960c9a820405997c4ad762f16d3ba Mon Sep 17 00:00:00 2001 From: Michal-Leszczynski <74614433+Michal-Leszczynski@users.noreply.github.com> Date: Tue, 15 Oct 2024 11:14:35 +0200 Subject: [PATCH] Restore improvement: transfers (#4054) * chore(go.mod): bump SM sub packages versions * feat(rcserver): implement rate limit/transfers change on upload/download This commit allows for greater control over transfers and rate limit as a part of the upload/download rclone API used by backup/restore tasks. It also makes the changes more persistent in case of agent restarts. It also adds the core/transfers endpoint that can be used outside the upload/download rclone API in order to set the amount of transfers for the following rclone API calls. * feat(scyllaclient): add RcloneSet/GetTransfers to client This commit exposes the /rclone/core/transfers agent endpoint via agent client. * feat(scyllaclient): set transfers in RcloneCopyPaths This commit requires changes in the usage of RcloneCopyPaths in restore pkg. It also adds Transfers field to restore Target with the default value -1 meaning that transfers will be set to the amount specified in agent rclone server config. * feat(restore): support max transfers (0) and make it default Our experiments showed that the fastest file download was achieved for transfers=2*2*node_shard_cnt. In order to make it easier for the user to use, a new, special value of transfers=0 will take care of that. It is set as the default, because we aim to make not configured restore as fast as possible. * feat(command/restore): add --transfers flag This commit allows user to control the amount of rclone transfers used during restore. Moreover, during development days of restore task, the default value of --parallel flag was changed from 1 to 0, but we forgot to change it in the flag documentation. * feat(backup): add Transfers to Target This is the first step to control transfers in the context of backup. * feat(scyllaclient): set transfers in rcloneMoveOrCopyDir This commit requires changes in the usage of RcloneMoveDir in backup pkg. * feat(command/backup): add --transfers flag This commit allows user to control the amount of rclone transfers used during backup. * feat(restore_test): extend TestRestoreTablesPreparationIntegration with transfers This way this test also checks transfers before and after backup. It also checks transfers before, in the middle, when paused, when resumed, and after restore. This required some changes to the test like swapping src and dst cluster (increase batch count) and hanging on LAS instead of copy paths (transfer change is applied as a part of copy paths). --- .../source/sctool/partials/sctool_backup.yaml | 5 + .../sctool/partials/sctool_backup_update.yaml | 5 + .../sctool/partials/sctool_restore.yaml | 8 +- .../partials/sctool_restore_update.yaml | 8 +- go.mod | 6 +- go.sum | 12 +- pkg/command/backup/cmd.go | 6 + pkg/command/backup/res.yaml | 4 + pkg/command/restore/cmd.go | 8 +- pkg/command/restore/res.yaml | 5 + .../internal/rclone_supported_calls.go | 1 + pkg/rclone/rcserver/rc.go | 65 ++++++++ pkg/rclone/rcserver/rcconfigguard.go | 92 ++++++++++++ pkg/scyllaclient/client_rclone.go | 43 +++++- .../client_rclone_agent_integration_test.go | 20 +-- .../client_rclone_integration_test.go | 4 +- pkg/service/backup/backup.go | 3 +- pkg/service/backup/export_test.go | 4 + pkg/service/backup/list.go | 8 +- pkg/service/backup/model.go | 10 +- pkg/service/backup/parallel.go | 8 +- pkg/service/backup/service.go | 6 +- .../testdata/get_target/continue.golden.json | 3 +- .../get_target/dc_locations.golden.json | 3 +- .../get_target/dc_no_rate_limit.golden.json | 3 +- .../get_target/dc_rate_limit.golden.json | 3 +- .../dc_snapshot_parallel.golden.json | 3 +- .../get_target/dc_upload_parallel.golden.json | 3 +- .../get_target/everything.golden.json | 3 +- .../testdata/get_target/filter_dc.golden.json | 3 +- .../get_target/filter_keyspaces.golden.json | 3 +- pkg/service/backup/validation.go | 8 +- pkg/service/backup/worker.go | 1 + pkg/service/backup/worker_schema.go | 12 +- pkg/service/backup/worker_upload.go | 6 +- pkg/service/restore/model.go | 18 ++- .../restore/restore_integration_test.go | 125 ++++++++++------ .../service_restore_integration_test.go | 4 +- pkg/service/restore/tables_worker.go | 20 ++- pkg/service/restore/tablesdir_worker.go | 22 +-- .../v3/pkg/util/inexlist/dcfilter/dcfilter.go | 23 +++ .../operations/core_transfers_parameters.go | 139 ++++++++++++++++++ .../operations/core_transfers_responses.go | 135 +++++++++++++++++ .../client/operations/operations_client.go | 39 ++++- .../gen/agent/models/copy_paths_options.go | 6 + .../agent/models/move_or_copy_file_options.go | 6 + .../v3/swagger/gen/agent/models/transfers.go | 43 ++++++ vendor/modules.txt | 6 +- 48 files changed, 840 insertions(+), 131 deletions(-) create mode 100644 pkg/rclone/rcserver/rcconfigguard.go create mode 100644 vendor/github.com/scylladb/scylla-manager/v3/swagger/gen/agent/client/operations/core_transfers_parameters.go create mode 100644 vendor/github.com/scylladb/scylla-manager/v3/swagger/gen/agent/client/operations/core_transfers_responses.go create mode 100644 vendor/github.com/scylladb/scylla-manager/v3/swagger/gen/agent/models/transfers.go diff --git a/docs/source/sctool/partials/sctool_backup.yaml b/docs/source/sctool/partials/sctool_backup.yaml index 1da58e4c49..95eb30762b 100644 --- a/docs/source/sctool/partials/sctool_backup.yaml +++ b/docs/source/sctool/partials/sctool_backup.yaml @@ -159,6 +159,11 @@ options: usage: | Timezone of --cron and --window flag values. The default value is taken from this system, namely 'TZ' envvar or '/etc/localtime' file. + - name: transfers + default_value: "-1" + usage: | + Sets the amount of file transfers to run in parallel when uploading files from a Scylla node to its backup location. + Set to -1 for using the transfers value defined in node's 'scylla-manager-agent.yaml' config file. - name: upload-parallel default_value: '[]' usage: | diff --git a/docs/source/sctool/partials/sctool_backup_update.yaml b/docs/source/sctool/partials/sctool_backup_update.yaml index 611b60b4aa..d9ad5174f6 100644 --- a/docs/source/sctool/partials/sctool_backup_update.yaml +++ b/docs/source/sctool/partials/sctool_backup_update.yaml @@ -160,6 +160,11 @@ options: usage: | Timezone of --cron and --window flag values. The default value is taken from this system, namely 'TZ' envvar or '/etc/localtime' file. + - name: transfers + default_value: "-1" + usage: | + Sets the amount of file transfers to run in parallel when uploading files from a Scylla node to its backup location. + Set to -1 for using the transfers value defined in node's 'scylla-manager-agent.yaml' config file. - name: upload-parallel default_value: '[]' usage: | diff --git a/docs/source/sctool/partials/sctool_restore.yaml b/docs/source/sctool/partials/sctool_restore.yaml index facaa61cbf..6c709b1bb9 100644 --- a/docs/source/sctool/partials/sctool_restore.yaml +++ b/docs/source/sctool/partials/sctool_restore.yaml @@ -103,7 +103,7 @@ options: usage: | Number of times a task reruns following a failure. - name: parallel - default_value: "1" + default_value: "0" usage: | The maximum number of Scylla restore jobs that can be run at the same time (on different SSTables). Each node can take part in at most one restore at any given moment. @@ -156,6 +156,12 @@ options: usage: | Timezone of --cron and --window flag values. The default value is taken from this system, namely 'TZ' envvar or '/etc/localtime' file. + - name: transfers + default_value: "0" + usage: | + Sets the amount of file transfers to run in parallel when downloading files from backup location to Scylla node. + Set to 0 for the fastest download (results in setting transfers to 2*node_shard_count). + Set to -1 for using the transfers value defined in node's 'scylla-manager-agent.yaml' config file. - name: window default_value: '[]' usage: | diff --git a/docs/source/sctool/partials/sctool_restore_update.yaml b/docs/source/sctool/partials/sctool_restore_update.yaml index 165e882854..3e9e3b8fd7 100644 --- a/docs/source/sctool/partials/sctool_restore_update.yaml +++ b/docs/source/sctool/partials/sctool_restore_update.yaml @@ -101,7 +101,7 @@ options: usage: | Number of times a task reruns following a failure. - name: parallel - default_value: "1" + default_value: "0" usage: | The maximum number of Scylla restore jobs that can be run at the same time (on different SSTables). Each node can take part in at most one restore at any given moment. @@ -154,6 +154,12 @@ options: usage: | Timezone of --cron and --window flag values. The default value is taken from this system, namely 'TZ' envvar or '/etc/localtime' file. + - name: transfers + default_value: "0" + usage: | + Sets the amount of file transfers to run in parallel when downloading files from backup location to Scylla node. + Set to 0 for the fastest download (results in setting transfers to 2*node_shard_count). + Set to -1 for using the transfers value defined in node's 'scylla-manager-agent.yaml' config file. - name: window default_value: '[]' usage: | diff --git a/go.mod b/go.mod index c4b1517eda..acd9d2ab87 100644 --- a/go.mod +++ b/go.mod @@ -29,9 +29,9 @@ require ( github.com/scylladb/go-reflectx v1.0.1 github.com/scylladb/go-set v1.0.2 github.com/scylladb/gocqlx/v2 v2.8.0 - github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20240926142436-6d27036d615d - github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20240926142436-6d27036d615d - github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20240926142436-6d27036d615d + github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20241015081800-ee47f3d10478 + github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241015081800-ee47f3d10478 + github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241015081800-ee47f3d10478 github.com/spf13/cobra v1.8.0 github.com/spf13/pflag v1.0.5 github.com/stoewer/go-strcase v1.3.0 diff --git a/go.sum b/go.sum index 029df61dc0..8335697adc 100644 --- a/go.sum +++ b/go.sum @@ -1053,12 +1053,12 @@ github.com/scylladb/google-api-go-client v0.34.1-patched h1:DW+T0HA+74o6FDr3TFzV github.com/scylladb/google-api-go-client v0.34.1-patched/go.mod h1:RriRmS2wJXH+2yd9PRTEcR380U9AXmurWwznqVhzsSc= github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e h1:lJRphCtu+nKd+mfo8whOTeFkgjMWvk8iCSlqgibKSa8= github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e/go.mod h1:JGZp4EvCUK+6AM1Fe1dye5xvihTc/Bk0WnHHSCJOePM= -github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20240926142436-6d27036d615d h1:c2mlLf5LlsR3KvQnZODjjmebV0/Fe8/i1NQlr4TVOoU= -github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20240926142436-6d27036d615d/go.mod h1:AQyWEkxdYc+zAEKofGOKOTPyvW2HhoL1+iMQrESFqdY= -github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20240926142436-6d27036d615d h1:9hQ+509JUYSlNyR/1C2Iy1gc36Y4Z4V+On3pn2pzcqg= -github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20240926142436-6d27036d615d/go.mod h1:+sPCx2oaOXmMpy/ODNNEDGJ7vCghBeKP4S7xEfMI+eA= -github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20240926142436-6d27036d615d h1:eR9FrNQLeaaBfHYlQxDJCQ3LS6Zo92ut0G+aYbzy7UQ= -github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20240926142436-6d27036d615d/go.mod h1:Oxfuz1XcXi9iV4ggSGfQdn+p6gPz6djPOegRMMe/6/s= +github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20241015081800-ee47f3d10478 h1:nI+3aLInCdLRYsR+iEuZkzFblgvAdGhWJT/Wqt7cKnU= +github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20241015081800-ee47f3d10478/go.mod h1:6cKRPGgPPFrPSh+M90k+bp0oIFsB5SZsdI6hWeXnXOY= +github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241015081800-ee47f3d10478 h1:QEhE1TL21CZglbImBdDkjo6ZzPnc3OkOMcDYmo6QwIk= +github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241015081800-ee47f3d10478/go.mod h1:+sPCx2oaOXmMpy/ODNNEDGJ7vCghBeKP4S7xEfMI+eA= +github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241015081800-ee47f3d10478 h1:cL8e+sW/7MZcq9guSBleR0bU+lEGxw1ZUjAf6vajHdE= +github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241015081800-ee47f3d10478/go.mod h1:Oxfuz1XcXi9iV4ggSGfQdn+p6gPz6djPOegRMMe/6/s= github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4 h1:8qmTC5ByIXO3GP/IzBkxcZ/99VITvnIETDhdFz/om7A= github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4/go.mod h1:C1a7PQSMz9NShzorzCiG2fk9+xuCgLkPeCvMHYR2OWg= github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM= diff --git a/pkg/command/backup/cmd.go b/pkg/command/backup/cmd.go index f091e005e8..18362fb870 100644 --- a/pkg/command/backup/cmd.go +++ b/pkg/command/backup/cmd.go @@ -32,6 +32,7 @@ type command struct { retention int retentionDays int rateLimit []string + transfers int snapshotParallel []string uploadParallel []string dryRun bool @@ -84,6 +85,7 @@ func (cmd *command) init() { w.Unwrap().IntVar(&cmd.retention, "retention", 7, "") w.Unwrap().IntVar(&cmd.retentionDays, "retention-days", 0, "") w.Unwrap().StringSliceVar(&cmd.rateLimit, "rate-limit", nil, "") + w.Unwrap().IntVar(&cmd.transfers, "transfers", -1, "") w.Unwrap().StringSliceVar(&cmd.snapshotParallel, "snapshot-parallel", nil, "") w.Unwrap().StringSliceVar(&cmd.uploadParallel, "upload-parallel", nil, "") w.Unwrap().BoolVar(&cmd.dryRun, "dry-run", false, "") @@ -144,6 +146,10 @@ func (cmd *command) run(args []string) error { props["rate_limit"] = cmd.rateLimit ok = true } + if cmd.Flag("transfers").Changed { + props["transfers"] = cmd.transfers + ok = true + } if cmd.Flag("snapshot-parallel").Changed { props["snapshot_parallel"] = cmd.snapshotParallel ok = true diff --git a/pkg/command/backup/res.yaml b/pkg/command/backup/res.yaml index 92021520b8..f774163b0c 100644 --- a/pkg/command/backup/res.yaml +++ b/pkg/command/backup/res.yaml @@ -19,6 +19,10 @@ rate-limit: | The : part is optional and is only needed when different datacenters require different upload limits. Set to 0 for no limit (default 100). +transfers: | + Sets the amount of file transfers to run in parallel when uploading files from a Scylla node to its backup location. + Set to -1 for using the transfers value defined in node's 'scylla-manager-agent.yaml' config file. + snapshot-parallel: | A comma-separated list of snapshot parallelism limits in the format `[:]`. The ``dc`` part is optional and allows for specifying different limits in selected datacenters. diff --git a/pkg/command/restore/cmd.go b/pkg/command/restore/cmd.go index 02ec01fa7c..c830dce5f0 100644 --- a/pkg/command/restore/cmd.go +++ b/pkg/command/restore/cmd.go @@ -29,6 +29,7 @@ type command struct { snapshotTag string batchSize int parallel int + transfers int allowCompaction bool restoreSchema bool restoreTables bool @@ -78,7 +79,8 @@ func (cmd *command) init() { w.Keyspace(&cmd.keyspace) w.Unwrap().StringVarP(&cmd.snapshotTag, "snapshot-tag", "T", "", "") w.Unwrap().IntVar(&cmd.batchSize, "batch-size", 2, "") - w.Unwrap().IntVar(&cmd.parallel, "parallel", 1, "") + w.Unwrap().IntVar(&cmd.parallel, "parallel", 0, "") + w.Unwrap().IntVar(&cmd.transfers, "transfers", 0, "") w.Unwrap().BoolVar(&cmd.allowCompaction, "allow-compaction", false, "") w.Unwrap().BoolVar(&cmd.restoreSchema, "restore-schema", false, "") w.Unwrap().BoolVar(&cmd.restoreTables, "restore-tables", false, "") @@ -146,6 +148,10 @@ func (cmd *command) run(args []string) error { props["parallel"] = cmd.parallel ok = true } + if cmd.Flag("transfers").Changed { + props["transfers"] = cmd.transfers + ok = true + } if cmd.Flag("allow-compaction").Changed { props["allow_compaction"] = cmd.allowCompaction ok = true diff --git a/pkg/command/restore/res.yaml b/pkg/command/restore/res.yaml index fcd19fc907..3b4a67cd05 100644 --- a/pkg/command/restore/res.yaml +++ b/pkg/command/restore/res.yaml @@ -33,6 +33,11 @@ parallel: | The maximum number of Scylla restore jobs that can be run at the same time (on different SSTables). Each node can take part in at most one restore at any given moment. +transfers: | + Sets the amount of file transfers to run in parallel when downloading files from backup location to Scylla node. + Set to 0 for the fastest download (results in setting transfers to 2*node_shard_count). + Set to -1 for using the transfers value defined in node's 'scylla-manager-agent.yaml' config file. + allow-compaction: | Defines if auto compactions should be running on Scylla nodes during restore. Disabling auto compactions decreases restore time duration, but increases compaction workload after the restore is done. diff --git a/pkg/rclone/rcserver/internal/rclone_supported_calls.go b/pkg/rclone/rcserver/internal/rclone_supported_calls.go index bc84c724fe..ef46d45724 100644 --- a/pkg/rclone/rcserver/internal/rclone_supported_calls.go +++ b/pkg/rclone/rcserver/internal/rclone_supported_calls.go @@ -11,6 +11,7 @@ var RcloneSupportedCalls = strset.New( "core/bwlimit", "core/stats-delete", "core/stats-reset", + "core/transfers", "job/info", "job/progress", "job/stop", diff --git a/pkg/rclone/rcserver/rc.go b/pkg/rclone/rcserver/rc.go index 0e0537d07d..e4ec3bc2fb 100644 --- a/pkg/rclone/rcserver/rc.go +++ b/pkg/rclone/rcserver/rc.go @@ -533,6 +533,9 @@ func rcMoveOrCopyDir(doMove bool) func(ctx context.Context, in rc.Params) (rc.Pa if err != nil && !rc.IsErrParamNotFound(err) { return nil, err } + if err := setGuardedConfig(in); err != nil { + return nil, err + } return nil, sync.CopyDir2(ctx, dstFs, dstRemote, srcFs, srcRemote, doMove) } @@ -557,10 +560,40 @@ func rcCopyPaths() func(ctx context.Context, in rc.Params) (rc.Params, error) { if len(paths) == 0 { return nil, nil } + if err := setGuardedConfig(in); err != nil { + return nil, err + } return nil, sync.CopyPaths(ctx, dstFs, dstRemote, srcFs, srcRemote, paths, false) } } +// setGuardedConfig sets transfers and bandwidth limit if present. +func setGuardedConfig(in rc.Params) error { + // Set transfers + if in["transfers"] != nil { + transfers, err := in.GetInt64("transfers") + if err != nil { + return err + } + if err := SetTransfers(int(transfers)); err != nil { + return err + } + } + + // Set bandwidth rate + if in["bandwidth_rate"] != nil { + limit, err := in.GetString("bandwidth_rate") + if err != nil { + return err + } + if err := SetBandwidthLimit(limit); err != nil { + return err + } + } + + return nil +} + // rcDeletePaths returns rc function that deletes paths from remote. func rcDeletePaths(ctx context.Context, in rc.Params) (out rc.Params, err error) { f, remote, err := rc.GetFsAndRemote(ctx, in) @@ -600,6 +633,38 @@ func rcDeletePaths(ctx context.Context, in rc.Params) (out rc.Params, err error) return out, multierr.Combine(err, statsDeleteErr) } +// rcTransfers sets the default amount of transfers. +// This change is not persisted after server restart. +// Transfers correspond to the number of file transfers to run in parallel. +// If the transfers parameter is not supplied then the transfers are queried. +func rcTransfers(_ context.Context, in rc.Params) (out rc.Params, err error) { + if in["transfers"] != nil { + transfers, err := in.GetInt64("transfers") + if err != nil { + return out, err + } + if err := SetTransfers(int(transfers)); err != nil { + return nil, err + } + } + out = rc.Params{ + "transfers": fs.GetConfig(context.Background()).Transfers, + } + return out, nil +} + +func init() { + rc.Add(rc.Call{ + Path: "core/transfers", + AuthRequired: true, + Fn: rcTransfers, + Title: "Set the default amount of transfers", + Help: `This takes the following parameters: + +- transfers - the number of file transfers to run in parallel`, + }) +} + // getFsAndRemoteNamed gets fs and remote path from the params, but it doesn't // fail if remote path is not provided. // In that case it is assumed that path is empty and root of the fs is used. diff --git a/pkg/rclone/rcserver/rcconfigguard.go b/pkg/rclone/rcserver/rcconfigguard.go new file mode 100644 index 0000000000..7cb7c7b124 --- /dev/null +++ b/pkg/rclone/rcserver/rcconfigguard.go @@ -0,0 +1,92 @@ +// Copyright (C) 2024 ScyllaDB + +package rcserver + +import ( + "context" + "sync" + + "github.com/pkg/errors" + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/cache" + "github.com/rclone/rclone/fs/rc" + "go.uber.org/atomic" +) + +// globalConfigGuard is global configGuard that should be used +// by rc calls when performing global config changes. +var globalConfigGuard = &configGuard{} + +// configGuard is a tool for performing global config changes. +// It supports setting transfers and bandwidth limit. +// It does not re-set config values if they are already +// set to the desired value. +type configGuard struct { + mu sync.Mutex + initialized atomic.Bool + + defaultTransfers int + transfers int + bandwidthLimit string +} + +func (cg *configGuard) init() { + if cg.initialized.CompareAndSwap(false, true) { + defaultTransfers := fs.GetConfig(context.Background()).Transfers + cg.defaultTransfers = defaultTransfers + cg.transfers = defaultTransfers + cg.bandwidthLimit = "" + } +} + +// SetTransfers sets global transfers value in rclone config. +// It also clears fs.Fs cache, so that they can be re-initialized +// with the new transfers value. +func SetTransfers(transfers int) error { + globalConfigGuard.mu.Lock() + defer globalConfigGuard.mu.Unlock() + globalConfigGuard.init() + + if transfers == -1 { + transfers = globalConfigGuard.defaultTransfers + } + if transfers <= 0 { + return errors.Errorf("transfers count must be greater than 0, got %d", transfers) + } + // Returns global config + ci := fs.GetConfig(context.Background()) + if transfers == globalConfigGuard.transfers { + // Safety check in case configguard is not in sync with global config + if transfers == ci.Transfers { + // Transfers are already set to specified value + return nil + } + } + + globalConfigGuard.transfers = transfers + ci.Transfers = transfers + // The amount of transfers impacts fs.Fs initialization (e.g. pool.Pool and fs.Pacer), + // so fs.Fs cache should be cleared on transfers count change. + cache.Clear() + return nil +} + +// SetBandwidthLimit sets global bandwidth limit in token bucket. +func SetBandwidthLimit(limit string) error { + globalConfigGuard.mu.Lock() + defer globalConfigGuard.mu.Unlock() + globalConfigGuard.init() + + if limit == globalConfigGuard.bandwidthLimit { + return nil + } + + in := rc.Params{ + "rate": limit, + } + _, err := rcCalls.Get("core/bwlimit").Fn(context.Background(), in) + if err != nil { + return errors.Wrapf(err, "set bandwidth to %s", limit) + } + return nil +} diff --git a/pkg/scyllaclient/client_rclone.go b/pkg/scyllaclient/client_rclone.go index b4f415353e..ae22e03fd7 100644 --- a/pkg/scyllaclient/client_rclone.go +++ b/pkg/scyllaclient/client_rclone.go @@ -33,6 +33,31 @@ func (c *Client) RcloneSetBandwidthLimit(ctx context.Context, host string, limit return err } +// RcloneSetTransfers sets the default amount of transfers on rclone server. +// This change is not persisted after server restart. +// Transfers correspond to the number of file transfers to run in parallel. +func (c *Client) RcloneSetTransfers(ctx context.Context, host string, transfers int) error { + p := operations.CoreTransfersParams{ + Context: forceHost(ctx, host), + Transfers: &models.Transfers{Transfers: int64(transfers)}, + } + _, err := c.agentOps.CoreTransfers(&p) // nolint: errcheck + return err +} + +// RcloneGetTransfers gets the default amount of transfers on rclone server. +// Transfers correspond to the number of file transfers to run in parallel. +func (c *Client) RcloneGetTransfers(ctx context.Context, host string) (int, error) { + p := operations.CoreTransfersParams{ + Context: forceHost(ctx, host), + } + resp, err := c.agentOps.CoreTransfers(&p) + if err != nil { + return 0, err + } + return int(resp.Payload.Transfers), nil +} + // RcloneJobStop stops running job. func (c *Client) RcloneJobStop(ctx context.Context, host string, jobID int64) error { p := operations.JobStopParams{ @@ -208,8 +233,8 @@ func (c *Client) rcloneMoveOrCopyFile(ctx context.Context, host, dstRemotePath, // Returns ID of the asynchronous job. // Remote path format is "name:bucket/path". // If specified, a suffix will be added to otherwise overwritten or deleted files. -func (c *Client) RcloneMoveDir(ctx context.Context, host, dstRemotePath, srcRemotePath, suffix string) (int64, error) { - return c.rcloneMoveOrCopyDir(ctx, host, dstRemotePath, srcRemotePath, true, suffix) +func (c *Client) RcloneMoveDir(ctx context.Context, host string, transfers int, dstRemotePath, srcRemotePath, suffix string) (int64, error) { + return c.rcloneMoveOrCopyDir(ctx, host, transfers, dstRemotePath, srcRemotePath, true, suffix) } // RcloneCopyDir copies contents of the directory pointed by srcRemotePath to @@ -218,11 +243,11 @@ func (c *Client) RcloneMoveDir(ctx context.Context, host, dstRemotePath, srcRemo // Returns ID of the asynchronous job. // Remote path format is "name:bucket/path". // If specified, a suffix will be added to otherwise overwritten or deleted files. -func (c *Client) RcloneCopyDir(ctx context.Context, host, dstRemotePath, srcRemotePath, suffix string) (int64, error) { - return c.rcloneMoveOrCopyDir(ctx, host, dstRemotePath, srcRemotePath, false, suffix) +func (c *Client) RcloneCopyDir(ctx context.Context, host string, transfers int, dstRemotePath, srcRemotePath, suffix string) (int64, error) { + return c.rcloneMoveOrCopyDir(ctx, host, transfers, dstRemotePath, srcRemotePath, false, suffix) } -func (c *Client) rcloneMoveOrCopyDir(ctx context.Context, host, dstRemotePath, srcRemotePath string, doMove bool, suffix string) (int64, error) { +func (c *Client) rcloneMoveOrCopyDir(ctx context.Context, host string, transfers int, dstRemotePath, srcRemotePath string, doMove bool, suffix string) (int64, error) { dstFs, dstRemote, err := rcloneSplitRemotePath(dstRemotePath) if err != nil { return 0, err @@ -237,6 +262,7 @@ func (c *Client) rcloneMoveOrCopyDir(ctx context.Context, host, dstRemotePath, s SrcFs: srcFs, SrcRemote: srcRemote, Suffix: suffix, + Transfers: int64(transfers), } var jobID int64 @@ -267,11 +293,15 @@ func (c *Client) rcloneMoveOrCopyDir(ctx context.Context, host, dstRemotePath, s return jobID, nil } +// TransfersFromConfig describes transfers value which results in setting transfers +// to the value from host's scylla-manager-agent.yaml config. +const TransfersFromConfig = -1 + // RcloneCopyPaths copies paths from srcRemoteDir/path to dstRemoteDir/path. // Remotes need to be registered with the server first. // Remote path format is "name:bucket/path". // Both dstRemoteDir and srRemoteDir must point to a directory. -func (c *Client) RcloneCopyPaths(ctx context.Context, host, dstRemoteDir, srcRemoteDir string, paths []string) (int64, error) { +func (c *Client) RcloneCopyPaths(ctx context.Context, host string, transfers int, dstRemoteDir, srcRemoteDir string, paths []string) (int64, error) { dstFs, dstRemote, err := rcloneSplitRemotePath(dstRemoteDir) if err != nil { return 0, err @@ -292,6 +322,7 @@ func (c *Client) RcloneCopyPaths(ctx context.Context, host, dstRemoteDir, srcRem SrcFs: srcFs, SrcRemote: srcRemote, Paths: paths, + Transfers: int64(transfers), }, Async: true, } diff --git a/pkg/scyllaclient/client_rclone_agent_integration_test.go b/pkg/scyllaclient/client_rclone_agent_integration_test.go index f47e7592bc..87dc8d174d 100644 --- a/pkg/scyllaclient/client_rclone_agent_integration_test.go +++ b/pkg/scyllaclient/client_rclone_agent_integration_test.go @@ -123,7 +123,7 @@ func TestRcloneDeletePathsInBatchesAgentIntegration(t *testing.T) { t.Fatalf("Create files on Scylla node, err = {%s}, stdOut={%s}, stdErr={%s}", err, stdOut, stdErr) } } - id, err := client.RcloneCopyDir(ctx, testHost, remotePath(dirName), "data:"+dirName, "") + id, err := client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, remotePath(dirName), "data:"+dirName, "") if err != nil { t.Fatal(errors.Wrap(err, "copy created files to backup location")) } @@ -200,7 +200,7 @@ func TestRcloneSkippingFilesAgentIntegration(t *testing.T) { if err != nil { t.Fatal(err) } - id, err := client.RcloneCopyDir(ctx, testHost, remotePath(""), "data:tmp/copy", "") + id, err := client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, remotePath(""), "data:tmp/copy", "") if err != nil { t.Fatal(err) } @@ -223,7 +223,7 @@ func TestRcloneSkippingFilesAgentIntegration(t *testing.T) { } } - id, err = client.RcloneCopyDir(ctx, testHost, remotePath(""), "data:tmp/copy", "") + id, err = client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, remotePath(""), "data:tmp/copy", "") if err != nil { t.Fatal(err) } @@ -283,7 +283,7 @@ func TestRcloneStoppingTransferIntegration(t *testing.T) { } }() - id, err := client.RcloneCopyDir(ctx, testHost, remotePath(""), "data:tmp", "") + id, err := client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, remotePath(""), "data:tmp", "") if err != nil { t.Fatal(err) } @@ -351,7 +351,7 @@ func TestRcloneJobProgressIntegration(t *testing.T) { }() Print("When: first batch upload") - id, err := client.RcloneCopyDir(ctx, testHost, remotePath(""), "data:tmp", "") + id, err := client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, remotePath(""), "data:tmp", "") if err != nil { t.Fatal(err) } @@ -383,7 +383,7 @@ func TestRcloneJobProgressIntegration(t *testing.T) { } Print("When: second batch upload") - id, err = client.RcloneCopyDir(ctx, testHost, remotePath(""), "data:tmp", "") + id, err = client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, remotePath(""), "data:tmp", "") if err != nil { t.Fatal(err) } @@ -408,7 +408,7 @@ func TestRcloneJobProgressIntegration(t *testing.T) { } Print("When: third batch upload") - id, err = client.RcloneCopyDir(ctx, testHost, remotePath(""), "data:tmp", "") + id, err = client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, remotePath(""), "data:tmp", "") if err != nil { t.Fatal(err) } @@ -487,7 +487,7 @@ func TestRcloneSuffixOptionIntegration(t *testing.T) { Print("Copy src into dst") - id, err := client.RcloneCopyDir(ctx, testHost, dstPath, srcPath, "") + id, err := client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, dstPath, srcPath, "") if err != nil { t.Fatal(err) } @@ -534,7 +534,7 @@ func TestRcloneSuffixOptionIntegration(t *testing.T) { Print("Copy src into dst after file modification") - id, err = client.RcloneCopyDir(ctx, testHost, dstPath, srcPath, firstSuffix) + id, err = client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, dstPath, srcPath, firstSuffix) if err != nil { t.Fatal(err) } @@ -583,7 +583,7 @@ func TestRcloneSuffixOptionIntegration(t *testing.T) { Print("Copy src into dst after another file modification") - id, err = client.RcloneCopyDir(ctx, testHost, dstPath, srcPath, secondSuffix) + id, err = client.RcloneCopyDir(ctx, testHost, scyllaclient.TransfersFromConfig, dstPath, srcPath, secondSuffix) if err != nil { t.Fatal(err) } diff --git a/pkg/scyllaclient/client_rclone_integration_test.go b/pkg/scyllaclient/client_rclone_integration_test.go index 61b4453322..5485a72759 100644 --- a/pkg/scyllaclient/client_rclone_integration_test.go +++ b/pkg/scyllaclient/client_rclone_integration_test.go @@ -36,7 +36,7 @@ func TestRcloneLocalToS3CopyDirIntegration(t *testing.T) { ctx := context.Background() copyDir := func(dir string) (*scyllaclient.RcloneJobInfo, error) { - id, err := client.RcloneCopyDir(ctx, scyllaclienttest.TestHost, remotePath("/copy"), "rclonetest:"+dir, "") + id, err := client.RcloneCopyDir(ctx, scyllaclienttest.TestHost, scyllaclient.TransfersFromConfig, remotePath("/copy"), "rclonetest:"+dir, "") if err != nil { t.Fatal(err) } @@ -106,7 +106,7 @@ func TestRcloneS3ToLocalCopyDirIntegration(t *testing.T) { defer closeServer() ctx := context.Background() - id, err := client.RcloneCopyDir(ctx, scyllaclienttest.TestHost, "rclonetest:foo", remotePath("/copy"), "") + id, err := client.RcloneCopyDir(ctx, scyllaclienttest.TestHost, scyllaclient.TransfersFromConfig, "rclonetest:foo", remotePath("/copy"), "") if err != nil { t.Fatal(err) } diff --git a/pkg/service/backup/backup.go b/pkg/service/backup/backup.go index e4bc540979..074b045756 100644 --- a/pkg/service/backup/backup.go +++ b/pkg/service/backup/backup.go @@ -56,7 +56,7 @@ func checkAllDCsCovered(locations []Location, dcs []string) error { return nil } -func makeHostInfo(nodes []scyllaclient.NodeStatusInfo, locations []Location, rateLimits []DCLimit) ([]hostInfo, error) { +func makeHostInfo(nodes []scyllaclient.NodeStatusInfo, locations []Location, rateLimits []DCLimit, transfers int) ([]hostInfo, error) { // DC location index dcl := map[string]Location{} for _, l := range locations { @@ -90,6 +90,7 @@ func makeHostInfo(nodes []scyllaclient.NodeStatusInfo, locations []Location, rat if !ok { hi[i].RateLimit = dcr[""] // no rate limit is ok, fallback to 0 - no limit } + hi[i].Transfers = transfers } return hi, errs diff --git a/pkg/service/backup/export_test.go b/pkg/service/backup/export_test.go index 6992940902..fddc2b3cc1 100644 --- a/pkg/service/backup/export_test.go +++ b/pkg/service/backup/export_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/pkg/errors" + "github.com/scylladb/scylla-manager/v3/pkg/scyllaclient" . "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec" "github.com/scylladb/scylla-manager/v3/pkg/util/uuid" ) @@ -35,6 +36,9 @@ func (s *Service) InitTarget(ctx context.Context, clusterID uuid.UUID, target *T // Get live nodes target.liveNodes, err = s.getLiveNodes(ctx, client, target.DC) + if target.Transfers == 0 { + target.Transfers = scyllaclient.TransfersFromConfig + } return err } diff --git a/pkg/service/backup/list.go b/pkg/service/backup/list.go index b06885fbba..a059dd1684 100644 --- a/pkg/service/backup/list.go +++ b/pkg/service/backup/list.go @@ -22,13 +22,13 @@ func listManifestsInAllLocations(ctx context.Context, client *scyllaclient.Clien manifests []*ManifestInfo ) - for _, hi := range hosts { - if _, ok := locations[hi.Location]; ok { + for i := range hosts { + if _, ok := locations[hosts[i].Location]; ok { continue } - locations[hi.Location] = struct{}{} + locations[hosts[i].Location] = struct{}{} - lm, err := listManifests(ctx, client, hi.IP, hi.Location, clusterID) + lm, err := listManifests(ctx, client, hosts[i].IP, hosts[i].Location, clusterID) if err != nil { return nil, err } diff --git a/pkg/service/backup/model.go b/pkg/service/backup/model.go index a8702123c0..eed2f6d6d1 100644 --- a/pkg/service/backup/model.go +++ b/pkg/service/backup/model.go @@ -51,6 +51,7 @@ type Target struct { RetentionDays int `json:"retention_days"` RetentionMap RetentionMap `json:"-"` // policy for all tasks, injected in runtime RateLimit []DCLimit `json:"rate_limit,omitempty"` + Transfers int `json:"transfers"` SnapshotParallel []DCLimit `json:"snapshot_parallel,omitempty"` UploadParallel []DCLimit `json:"upload_parallel,omitempty"` Continue bool `json:"continue,omitempty"` @@ -250,6 +251,7 @@ type taskProperties struct { RetentionDays *int `json:"retention_days"` RetentionMap RetentionMap `json:"retention_map"` RateLimit []DCLimit `json:"rate_limit"` + Transfers int `json:"transfers"` SnapshotParallel []DCLimit `json:"snapshot_parallel"` UploadParallel []DCLimit `json:"upload_parallel"` Continue bool `json:"continue"` @@ -264,6 +266,10 @@ func (p taskProperties) validate(dcs []string, dcMap map[string][]string) error if policy := p.extractRetention(); policy.Retention < 0 || policy.RetentionDays < 0 { return errors.New("negative retention") } + if p.Transfers != scyllaclient.TransfersFromConfig && p.Transfers < 1 { + return errors.New("transfers param has to be equal to -1 (set transfers to the value from scylla-manager-agent.yaml config) " + + "or greater than zero") + } // Validate location DCs if err := checkDCs(func(i int) (string, string) { return p.Location[i].DC, p.Location[i].String() }, len(p.Location), dcMap); err != nil { @@ -310,6 +316,7 @@ func (p taskProperties) toTarget(ctx context.Context, client *scyllaclient.Clien RetentionDays: policy.RetentionDays, RetentionMap: p.RetentionMap, RateLimit: rateLimit, + Transfers: p.Transfers, SnapshotParallel: filterDCLimits(p.SnapshotParallel, dcs), UploadParallel: filterDCLimits(p.UploadParallel, dcs), Continue: p.Continue, @@ -417,7 +424,8 @@ func (p taskProperties) extractRetention() RetentionPolicy { func defaultTaskProperties() taskProperties { return taskProperties{ - Continue: true, + Transfers: scyllaclient.TransfersFromConfig, + Continue: true, } } diff --git a/pkg/service/backup/parallel.go b/pkg/service/backup/parallel.go index 39b1114446..c0296c3dca 100644 --- a/pkg/service/backup/parallel.go +++ b/pkg/service/backup/parallel.go @@ -32,8 +32,8 @@ func makeHostsLimit(hosts []hostInfo, limits []DCLimit) map[string]hostsLimit { } m := make(map[string]hostsLimit, len(dcLimit)+1) - for _, h := range hosts { - dc := h.DC + for i := range hosts { + dc := hosts[i].DC // If DC has no limit put host under an empty DC if _, ok := dcLimit[dc]; !ok { dc = "" @@ -42,14 +42,14 @@ func makeHostsLimit(hosts []hostInfo, limits []DCLimit) map[string]hostsLimit { v, ok := m[dc] if !ok { v = hostsLimit{} - v.hosts = []hostInfo{h} + v.hosts = []hostInfo{hosts[i]} if dc == "" { v.limit = globalLimit } else { v.limit = dcLimit[dc] } } else { - v.hosts = append(v.hosts, h) + v.hosts = append(v.hosts, hosts[i]) } m[dc] = v } diff --git a/pkg/service/backup/service.go b/pkg/service/backup/service.go index ac3116d424..cbb4d222c7 100644 --- a/pkg/service/backup/service.go +++ b/pkg/service/backup/service.go @@ -484,8 +484,8 @@ func (s *Service) forEachManifest(ctx context.Context, clusterID uuid.UUID, loca return errors.Wrap(err, "resolve hosts") } locationHost := map[Location]string{} - for _, h := range hosts { - locationHost[h.Location] = h.IP + for i := range hosts { + locationHost[hosts[i].Location] = hosts[i].IP } manifests, err := listManifestsInAllLocations(ctx, client, hosts, filter.ClusterID) @@ -657,7 +657,7 @@ func (s *Service) Backup(ctx context.Context, clusterID, taskID, runID uuid.UUID } // Create hostInfo for run hosts - hi, err := makeHostInfo(liveNodes, target.Location, target.RateLimit) + hi, err := makeHostInfo(liveNodes, target.Location, target.RateLimit, target.Transfers) if err != nil { return err } diff --git a/pkg/service/backup/testdata/get_target/continue.golden.json b/pkg/service/backup/testdata/get_target/continue.golden.json index cdf7504ace..7b62696d98 100644 --- a/pkg/service/backup/testdata/get_target/continue.golden.json +++ b/pkg/service/backup/testdata/get_target/continue.golden.json @@ -68,5 +68,6 @@ "retention": 3, "rate_limit": [ "100" - ] + ], + "transfers": -1 } \ No newline at end of file diff --git a/pkg/service/backup/testdata/get_target/dc_locations.golden.json b/pkg/service/backup/testdata/get_target/dc_locations.golden.json index 7f0e8c52dd..b3a69700c7 100644 --- a/pkg/service/backup/testdata/get_target/dc_locations.golden.json +++ b/pkg/service/backup/testdata/get_target/dc_locations.golden.json @@ -70,5 +70,6 @@ "rate_limit": [ "100" ], - "continue": true + "continue": true, + "transfers": -1 } \ No newline at end of file diff --git a/pkg/service/backup/testdata/get_target/dc_no_rate_limit.golden.json b/pkg/service/backup/testdata/get_target/dc_no_rate_limit.golden.json index ac7605cf78..ab5a2b7095 100644 --- a/pkg/service/backup/testdata/get_target/dc_no_rate_limit.golden.json +++ b/pkg/service/backup/testdata/get_target/dc_no_rate_limit.golden.json @@ -69,5 +69,6 @@ "rate_limit": [ "0" ], - "continue": true + "continue": true, + "transfers": -1 } \ No newline at end of file diff --git a/pkg/service/backup/testdata/get_target/dc_rate_limit.golden.json b/pkg/service/backup/testdata/get_target/dc_rate_limit.golden.json index bc526693f5..0ba949c595 100644 --- a/pkg/service/backup/testdata/get_target/dc_rate_limit.golden.json +++ b/pkg/service/backup/testdata/get_target/dc_rate_limit.golden.json @@ -70,5 +70,6 @@ "1000", "dc1:100" ], - "continue": true + "continue": true, + "transfers": -1 } \ No newline at end of file diff --git a/pkg/service/backup/testdata/get_target/dc_snapshot_parallel.golden.json b/pkg/service/backup/testdata/get_target/dc_snapshot_parallel.golden.json index ad8780b919..8cec0c3bc9 100644 --- a/pkg/service/backup/testdata/get_target/dc_snapshot_parallel.golden.json +++ b/pkg/service/backup/testdata/get_target/dc_snapshot_parallel.golden.json @@ -73,5 +73,6 @@ "10", "dc1:20" ], - "continue": true + "continue": true, + "transfers": -1 } \ No newline at end of file diff --git a/pkg/service/backup/testdata/get_target/dc_upload_parallel.golden.json b/pkg/service/backup/testdata/get_target/dc_upload_parallel.golden.json index d0ed89e620..98b03c4af2 100644 --- a/pkg/service/backup/testdata/get_target/dc_upload_parallel.golden.json +++ b/pkg/service/backup/testdata/get_target/dc_upload_parallel.golden.json @@ -73,5 +73,6 @@ "10", "dc1:20" ], - "continue": true + "continue": true, + "transfers": -1 } \ No newline at end of file diff --git a/pkg/service/backup/testdata/get_target/everything.golden.json b/pkg/service/backup/testdata/get_target/everything.golden.json index 80479521a3..3801ce371c 100644 --- a/pkg/service/backup/testdata/get_target/everything.golden.json +++ b/pkg/service/backup/testdata/get_target/everything.golden.json @@ -69,5 +69,6 @@ "rate_limit": [ "100" ], - "continue": true + "continue": true, + "transfers": -1 } \ No newline at end of file diff --git a/pkg/service/backup/testdata/get_target/filter_dc.golden.json b/pkg/service/backup/testdata/get_target/filter_dc.golden.json index f9cd403714..7ed63fdb31 100644 --- a/pkg/service/backup/testdata/get_target/filter_dc.golden.json +++ b/pkg/service/backup/testdata/get_target/filter_dc.golden.json @@ -47,5 +47,6 @@ "rate_limit": [ "100" ], - "continue": true + "continue": true, + "transfers": -1 } \ No newline at end of file diff --git a/pkg/service/backup/testdata/get_target/filter_keyspaces.golden.json b/pkg/service/backup/testdata/get_target/filter_keyspaces.golden.json index d193e9f41f..36f14a7edc 100644 --- a/pkg/service/backup/testdata/get_target/filter_keyspaces.golden.json +++ b/pkg/service/backup/testdata/get_target/filter_keyspaces.golden.json @@ -38,5 +38,6 @@ "rate_limit": [ "100" ], - "continue": true + "continue": true, + "transfers": -1 } \ No newline at end of file diff --git a/pkg/service/backup/validation.go b/pkg/service/backup/validation.go index a547653065..7274ec49c0 100644 --- a/pkg/service/backup/validation.go +++ b/pkg/service/backup/validation.go @@ -147,7 +147,7 @@ func (s *Service) Validate(ctx context.Context, clusterID, taskID, runID uuid.UU } } - hosts, err := makeHostInfo(target.liveNodes, target.Location, nil) + hosts, err := makeHostInfo(target.liveNodes, target.Location, nil, 0) if err != nil { return err } @@ -176,9 +176,9 @@ func (s *Service) Validate(ctx context.Context, clusterID, taskID, runID uuid.UU p := newPurger(client, h.IP, log.NopLogger) hostForNodeID := func() string { - for _, h := range hosts { - if h.ID == nodeID { - return h.IP + for i := range hosts { + if hosts[i].ID == nodeID { + return hosts[i].IP } } if host := p.Host(nodeID); host != "" { diff --git a/pkg/service/backup/worker.go b/pkg/service/backup/worker.go index e597864c25..3555f80412 100644 --- a/pkg/service/backup/worker.go +++ b/pkg/service/backup/worker.go @@ -23,6 +23,7 @@ type hostInfo struct { ID string Location Location RateLimit DCLimit + Transfers int } func (h hostInfo) String() string { diff --git a/pkg/service/backup/worker_schema.go b/pkg/service/backup/worker_schema.go index 418da112c4..c683bc140f 100644 --- a/pkg/service/backup/worker_schema.go +++ b/pkg/service/backup/worker_schema.go @@ -26,8 +26,8 @@ import ( func (w *worker) DumpSchema(ctx context.Context, hi []hostInfo, sessionFunc cluster.SessionFunc) error { var hosts []string - for _, h := range hi { - hosts = append(hosts, h.IP) + for i := range hi { + hosts = append(hosts, hi[i].IP) } descSchemaHosts, err := backupAndRestoreFromDescSchemaHosts(ctx, w.Client, hosts) @@ -105,12 +105,12 @@ func (w *worker) UploadSchema(ctx context.Context, hosts []hostInfo) (stepError // Select single host per location locations := map[string]hostInfo{} - for _, hi := range hosts { - locations[hi.Location.String()] = hi + for i := range hosts { + locations[hosts[i].Location.String()] = hosts[i] } hostPerLocation := make([]hostInfo, 0, len(locations)) - for _, hi := range locations { - hostPerLocation = append(hostPerLocation, hi) + for l := range locations { + hostPerLocation = append(hostPerLocation, locations[l]) } f := func(h hostInfo) error { diff --git a/pkg/service/backup/worker_upload.go b/pkg/service/backup/worker_upload.go index a382fa1863..36c745fba9 100644 --- a/pkg/service/backup/worker_upload.go +++ b/pkg/service/backup/worker_upload.go @@ -172,7 +172,7 @@ func (w *worker) uploadSnapshotDir(ctx context.Context, h hostInfo, d snapshotDi retries = 10 ) for i := 0; i < retries; i++ { - if err := w.uploadDataDir(ctx, dataDst, dataSrc, d); err != nil { + if err := w.uploadDataDir(ctx, h, dataDst, dataSrc, d); err != nil { if errors.Is(err, errJobNotFound) { continue } @@ -184,9 +184,9 @@ func (w *worker) uploadSnapshotDir(ctx context.Context, h hostInfo, d snapshotDi return nil } -func (w *worker) uploadDataDir(ctx context.Context, dst, src string, d snapshotDir) error { +func (w *worker) uploadDataDir(ctx context.Context, hi hostInfo, dst, src string, d snapshotDir) error { // Ensure file versioning during upload - id, err := w.Client.RcloneMoveDir(ctx, d.Host, dst, src, VersionedFileExt(w.SnapshotTag)) + id, err := w.Client.RcloneMoveDir(ctx, d.Host, hi.Transfers, dst, src, VersionedFileExt(w.SnapshotTag)) if err != nil { return err } diff --git a/pkg/service/restore/model.go b/pkg/service/restore/model.go index 8a483210a2..431975fd89 100644 --- a/pkg/service/restore/model.go +++ b/pkg/service/restore/model.go @@ -28,6 +28,7 @@ type Target struct { SnapshotTag string `json:"snapshot_tag"` BatchSize int `json:"batch_size,omitempty"` Parallel int `json:"parallel,omitempty"` + Transfers int `json:"transfers"` AllowCompaction bool `json:"allow_compaction,omitempty"` RestoreSchema bool `json:"restore_schema,omitempty"` RestoreTables bool `json:"restore_tables,omitempty"` @@ -37,12 +38,17 @@ type Target struct { locationHosts map[Location][]string `json:"-"` } -const maxBatchSize = 0 +const ( + maxBatchSize = 0 + // maxTransfers are experimentally defined to 2*node_shard_cnt. + maxTransfers = 0 +) func defaultTarget() Target { return Target{ BatchSize: 2, Parallel: 0, + Transfers: maxTransfers, Continue: true, } } @@ -62,6 +68,10 @@ func (t Target) validateProperties() error { if t.Parallel < 0 { return errors.New("parallel param has to be greater or equal to zero") } + if t.Transfers != scyllaclient.TransfersFromConfig && t.Transfers != maxTransfers && t.Transfers < 1 { + return errors.New("transfers param has to be equal to -1 (set transfers to the value from scylla-manager-agent.yaml config) " + + "or 0 (set transfers for fastest download) or greater than zero") + } if t.RestoreSchema == t.RestoreTables { return errors.New("choose EXACTLY ONE restore type ('--restore-schema' or '--restore-tables' flag)") } @@ -280,3 +290,9 @@ type TableName struct { Keyspace string Table string } + +// HostInfo represents host with rclone download config. +type HostInfo struct { + Host string + Transfers int +} diff --git a/pkg/service/restore/restore_integration_test.go b/pkg/service/restore/restore_integration_test.go index d233b668f6..cf81acae75 100644 --- a/pkg/service/restore/restore_integration_test.go +++ b/pkg/service/restore/restore_integration_test.go @@ -481,7 +481,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { // Validate setup // Validate restore success - h := newTestHelper(t, ManagedSecondClusterHosts(), ManagedClusterHosts()) + h := newTestHelper(t, ManagedClusterHosts(), ManagedSecondClusterHosts()) Print("Keyspace setup") ksStmt := "CREATE KEYSPACE %q WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': %d}" @@ -498,15 +498,69 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { Print("Fill setup") fillTable(t, h.srcCluster.rootSession, 100, ks, tab) + validateState := func(ch clusterHelper, tombstone string, compaction bool, transfers int) { + // Validate tombstone_gc mode + if got := tombstoneGCMode(t, ch.rootSession, ks, tab); tombstone != got { + t.Errorf("expected tombstone_gc=%s, got %s", tombstone, got) + } + // Validate compaction + for _, host := range ch.Client.Config().Hosts { + enabled, err := ch.Client.IsAutoCompactionEnabled(context.Background(), host, ks, tab) + if err != nil { + t.Fatal(errors.Wrapf(err, "check compaction on host %s", host)) + } + if compaction != enabled { + t.Errorf("expected compaction enabled=%v, got=%v on host %s", compaction, enabled, host) + } + } + // Validate transfers + for _, host := range ch.Client.Config().Hosts { + got, err := ch.Client.RcloneGetTransfers(context.Background(), host) + if err != nil { + t.Fatal(errors.Wrapf(err, "check transfers on host %s", host)) + } + if transfers != got { + t.Errorf("expected transfers=%d, got=%d on host %s", transfers, got, host) + } + } + } + + shardCnt, err := h.dstCluster.Client.ShardCount(context.Background(), ManagedClusterHost()) + if err != nil { + t.Fatal(err) + } + transfers0 := 2 * int(shardCnt) + + // Set initial transfers + for _, host := range ManagedClusterHosts() { + err := h.dstCluster.Client.RcloneSetTransfers(context.Background(), host, 10) + if err != nil { + t.Fatal(errors.Wrapf(err, "set initial transfers on host %s", host)) + } + } + for _, host := range ManagedSecondClusterHosts() { + err := h.srcCluster.Client.RcloneSetTransfers(context.Background(), host, 10) + if err != nil { + t.Fatal(errors.Wrapf(err, "set initial transfers on host %s", host)) + } + } + + Print("Validate state before backup") + validateState(h.srcCluster, "repair", true, 10) + Print("Run backup") loc := []Location{testLocation("preparation", "")} S3InitBucket(t, loc[0].Path) ksFilter := []string{ks} tag := h.runBackup(t, map[string]any{ - "location": loc, - "keyspace": ksFilter, + "location": loc, + "keyspace": ksFilter, + "transfers": 3, }) + Print("Validate state after backup") + validateState(h.srcCluster, "repair", true, 3) + runRestore := func(ctx context.Context, finishedRestore chan error) { grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser) h.dstCluster.RunID = uuid.NewTime() @@ -514,6 +568,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { "location": loc, "keyspace": ksFilter, "snapshot_tag": tag, + "transfers": 0, "restore_tables": true, }) if err != nil { @@ -522,44 +577,31 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { finishedRestore <- h.dstRestoreSvc.Restore(ctx, h.dstCluster.ClusterID, h.dstCluster.TaskID, h.dstCluster.RunID, rawProps) } - validateState := func(tombstone string, compaction bool) { - // Validate tombstone_gc mode - if got := tombstoneGCMode(t, h.dstCluster.rootSession, ks, tab); tombstone != got { - t.Errorf("expected tombstone_gc=%s, got %s", tombstone, got) - } - // Validate compaction - for _, host := range ManagedClusterHosts() { - enabled, err := h.dstCluster.Client.IsAutoCompactionEnabled(context.Background(), host, ks, tab) - if err != nil { - t.Fatal(errors.Wrapf(err, "check compaction on host %s", host)) - } - if compaction != enabled { - t.Errorf("expected compaction enabled=%v, got=%v on host %s", compaction, enabled, host) - } - } - } - - makeCopyPathsHang := func(reachedDataStage *atomic.Bool, reachedDataStageChan, hangCopyPaths chan struct{}) { + makeLASHang := func(reachedDataStageChan, hangLAS chan struct{}) { + cnt := atomic.Int64{} + cnt.Add(int64(len(h.dstCluster.Client.Config().Hosts))) h.dstCluster.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { - if strings.HasPrefix(req.URL.Path, "/agent/rclone/sync/copypaths") { - if reachedDataStage.CompareAndSwap(false, true) { + if strings.HasPrefix(req.URL.Path, "/storage_service/sstables") { + if curr := cnt.Add(-1); curr == 0 { Print("Reached data stage") close(reachedDataStageChan) } - Print("Wait for copy paths to stop hanging") - <-hangCopyPaths + Print("Wait for LAS to stop hanging") + <-hangLAS } return nil, nil })) } var ( - reachedDataStage = &atomic.Bool{} reachedDataStageChan = make(chan struct{}) - hangCopyPathsChan = make(chan struct{}) + hangLAS = make(chan struct{}) ) - Print("Make copy paths hang") - makeCopyPathsHang(reachedDataStage, reachedDataStageChan, hangCopyPathsChan) + Print("Make LAS hang") + makeLASHang(reachedDataStageChan, hangLAS) + + Print("Validate state before restore") + validateState(h.dstCluster, "repair", true, 10) Print("Run restore") finishedRestore := make(chan error) @@ -570,28 +612,27 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { <-reachedDataStageChan Print("Validate state during restore data") - validateState("disabled", false) + validateState(h.dstCluster, "disabled", false, transfers0) Print("Pause restore") restoreCancel() - Print("Release copy paths") - close(hangCopyPathsChan) + Print("Release LAS") + close(hangLAS) Print("Wait for restore") - err := <-finishedRestore + err = <-finishedRestore if !errors.Is(err, context.Canceled) { t.Fatalf("Expected restore to be paused, got: %s", err) } Print("Validate state during pause") - validateState("disabled", true) + validateState(h.dstCluster, "disabled", true, transfers0) - reachedDataStage = &atomic.Bool{} reachedDataStageChan = make(chan struct{}) - hangCopyPathsChan = make(chan struct{}) - Print("Make copy paths hang after pause") - makeCopyPathsHang(reachedDataStage, reachedDataStageChan, hangCopyPathsChan) + hangLAS = make(chan struct{}) + Print("Make LAS hang after pause") + makeLASHang(reachedDataStageChan, hangLAS) Print("Run restore after pause") finishedRestore = make(chan error) @@ -601,10 +642,10 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { <-reachedDataStageChan Print("Validate state during restore data after pause") - validateState("disabled", false) + validateState(h.dstCluster, "disabled", false, transfers0) - Print("Release copy paths") - close(hangCopyPathsChan) + Print("Release LAS") + close(hangLAS) Print("Wait for restore") err = <-finishedRestore @@ -613,7 +654,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { } Print("Validate state after restore success") - validateState("repair", true) + validateState(h.dstCluster, "repair", true, transfers0) Print("Validate table contents") h.validateIdenticalTables(t, []table{{ks: ks, tab: tab}}) diff --git a/pkg/service/restore/service_restore_integration_test.go b/pkg/service/restore/service_restore_integration_test.go index 0ae8648cc9..d3a79ff39f 100644 --- a/pkg/service/restore/service_restore_integration_test.go +++ b/pkg/service/restore/service_restore_integration_test.go @@ -1680,8 +1680,8 @@ func (h *restoreTestHelper) simpleBackup(location Location) string { time.Sleep(time.Second) ctx := context.Background() - props, err := json.Marshal(backup.Target{ - Location: []Location{location}, + props, err := json.Marshal(map[string]any{ + "location": []Location{location}, }) if err != nil { h.T.Fatal(err) diff --git a/pkg/service/restore/tables_worker.go b/pkg/service/restore/tables_worker.go index bf08b2d8a2..ecb5c4b648 100644 --- a/pkg/service/restore/tables_worker.go +++ b/pkg/service/restore/tables_worker.go @@ -202,24 +202,32 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error { bd := newBatchDispatcher(workload, w.target.BatchSize, hostToShard, w.target.locationHosts) f := func(n int) (err error) { - h := hosts[n] + transfers := w.target.Transfers + if transfers == maxTransfers { + transfers = 2 * int(hostToShard[hosts[n]]) + } + hi := HostInfo{ + Host: hosts[n], + Transfers: transfers, + } + w.logger.Info(ctx, "Host info", "host", hi.Host, "transfers", hi.Transfers) for { // Download and stream in parallel - b, ok := bd.DispatchBatch(h) + b, ok := bd.DispatchBatch(hi.Host) if !ok { - w.logger.Info(ctx, "No more batches to restore", "host", h) + w.logger.Info(ctx, "No more batches to restore", "host", hi.Host) return nil } - w.metrics.IncreaseBatchSize(w.run.ClusterID, h, b.Size) + w.metrics.IncreaseBatchSize(w.run.ClusterID, hi.Host, b.Size) w.logger.Info(ctx, "Got batch to restore", - "host", h, + "host", hi.Host, "keyspace", b.Keyspace, "table", b.Table, "size", b.Size, "sstable count", len(b.SSTables), ) - pr, err := w.newRunProgress(ctx, h, b) + pr, err := w.newRunProgress(ctx, hi, b) if err != nil { return errors.Wrap(err, "create new run progress") } diff --git a/pkg/service/restore/tablesdir_worker.go b/pkg/service/restore/tablesdir_worker.go index 990efc9ad2..7c9efb1987 100644 --- a/pkg/service/restore/tablesdir_worker.go +++ b/pkg/service/restore/tablesdir_worker.go @@ -114,17 +114,17 @@ func (w *tablesWorker) restoreSSTables(ctx context.Context, b batch, pr *RunProg } // newRunProgress creates RunProgress by starting download to host's upload dir. -func (w *tablesWorker) newRunProgress(ctx context.Context, host string, b batch) (*RunProgress, error) { - if err := w.checkAvailableDiskSpace(ctx, host); err != nil { +func (w *tablesWorker) newRunProgress(ctx context.Context, hi HostInfo, b batch) (*RunProgress, error) { + if err := w.checkAvailableDiskSpace(ctx, hi.Host); err != nil { return nil, errors.Wrap(err, "validate free disk space") } uploadDir := UploadTableDir(b.Keyspace, b.Table, w.tableVersion[b.TableName]) - if err := w.cleanUploadDir(ctx, host, uploadDir, nil); err != nil { - return nil, errors.Wrapf(err, "clean upload dir of host %s", host) + if err := w.cleanUploadDir(ctx, hi.Host, uploadDir, nil); err != nil { + return nil, errors.Wrapf(err, "clean upload dir of host %s", hi.Host) } - jobID, versionedPr, err := w.startDownload(ctx, host, b) + jobID, versionedPr, err := w.startDownload(ctx, hi, b) if err != nil { return nil, err } @@ -136,7 +136,7 @@ func (w *tablesWorker) newRunProgress(ctx context.Context, host string, b batch) RemoteSSTableDir: b.RemoteSSTableDir, Keyspace: b.Keyspace, Table: b.Table, - Host: host, + Host: hi.Host, AgentJobID: jobID, SSTableID: b.IDs(), VersionedProgress: versionedPr, @@ -150,14 +150,14 @@ func (w *tablesWorker) newRunProgress(ctx context.Context, host string, b batch) // Downloading of versioned files happens first in a synchronous way. // It returns jobID for asynchronous download of the newest versions of files // alongside with the size of the already downloaded versioned files. -func (w *tablesWorker) startDownload(ctx context.Context, host string, b batch) (jobID, versionedPr int64, err error) { +func (w *tablesWorker) startDownload(ctx context.Context, hi HostInfo, b batch) (jobID, versionedPr int64, err error) { uploadDir := UploadTableDir(b.Keyspace, b.Table, w.tableVersion[b.TableName]) sstables := b.NotVersionedSSTables() versioned := b.VersionedSSTables() versionedSize := b.VersionedSize() if len(versioned) > 0 { - if err := w.downloadVersioned(ctx, host, b.RemoteSSTableDir, uploadDir, versioned); err != nil { - return 0, 0, errors.Wrapf(err, "download versioned sstabled on host %s", host) + if err := w.downloadVersioned(ctx, hi.Host, b.RemoteSSTableDir, uploadDir, versioned); err != nil { + return 0, 0, errors.Wrapf(err, "download versioned sstabled on host %s", hi.Host) } } @@ -166,12 +166,12 @@ func (w *tablesWorker) startDownload(ctx context.Context, host string, b batch) for _, sst := range sstables { files = append(files, sst.Files...) } - jobID, err = w.client.RcloneCopyPaths(ctx, host, uploadDir, b.RemoteSSTableDir, files) + jobID, err = w.client.RcloneCopyPaths(ctx, hi.Host, hi.Transfers, uploadDir, b.RemoteSSTableDir, files) if err != nil { return 0, 0, errors.Wrap(err, "download batch to upload dir") } w.logger.Info(ctx, "Started downloading files", - "host", host, + "host", hi.Host, "job_id", jobID, ) return jobID, versionedSize, nil diff --git a/vendor/github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/dcfilter/dcfilter.go b/vendor/github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/dcfilter/dcfilter.go index 24a931b18d..22f7843ae0 100644 --- a/vendor/github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/dcfilter/dcfilter.go +++ b/vendor/github.com/scylladb/scylla-manager/v3/pkg/util/inexlist/dcfilter/dcfilter.go @@ -37,6 +37,29 @@ func Apply(dcMap map[string][]string, filters []string) ([]string, error) { return filtered, nil } +// Filter that lets you filter datacenters. +type Filter struct { + filters []string + inex inexlist.InExList +} + +func NewFilter(filters []string) (*Filter, error) { + // Decorate filters and create inexlist + inex, err := inexlist.ParseInExList(decorate(filters)) + if err != nil { + return nil, errors.Wrapf(err, "parse dc filter %v", filters) + } + return &Filter{ + filters: filters, + inex: inex, + }, nil +} + +// Check returns true iff dc matches filter. +func (f *Filter) Check(dc string) bool { + return len(f.inex.Filter([]string{dc})) > 0 +} + func decorate(filters []string) []string { if len(filters) == 0 { filters = append(filters, "*") diff --git a/vendor/github.com/scylladb/scylla-manager/v3/swagger/gen/agent/client/operations/core_transfers_parameters.go b/vendor/github.com/scylladb/scylla-manager/v3/swagger/gen/agent/client/operations/core_transfers_parameters.go new file mode 100644 index 0000000000..9f95b7c669 --- /dev/null +++ b/vendor/github.com/scylladb/scylla-manager/v3/swagger/gen/agent/client/operations/core_transfers_parameters.go @@ -0,0 +1,139 @@ +// Code generated by go-swagger; DO NOT EDIT. + +package operations + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + "context" + "net/http" + "time" + + "github.com/go-openapi/errors" + "github.com/go-openapi/runtime" + cr "github.com/go-openapi/runtime/client" + "github.com/go-openapi/strfmt" + + "github.com/scylladb/scylla-manager/v3/swagger/gen/agent/models" +) + +// NewCoreTransfersParams creates a new CoreTransfersParams object +// with the default values initialized. +func NewCoreTransfersParams() *CoreTransfersParams { + var () + return &CoreTransfersParams{ + + timeout: cr.DefaultTimeout, + } +} + +// NewCoreTransfersParamsWithTimeout creates a new CoreTransfersParams object +// with the default values initialized, and the ability to set a timeout on a request +func NewCoreTransfersParamsWithTimeout(timeout time.Duration) *CoreTransfersParams { + var () + return &CoreTransfersParams{ + + timeout: timeout, + } +} + +// NewCoreTransfersParamsWithContext creates a new CoreTransfersParams object +// with the default values initialized, and the ability to set a context for a request +func NewCoreTransfersParamsWithContext(ctx context.Context) *CoreTransfersParams { + var () + return &CoreTransfersParams{ + + Context: ctx, + } +} + +// NewCoreTransfersParamsWithHTTPClient creates a new CoreTransfersParams object +// with the default values initialized, and the ability to set a custom HTTPClient for a request +func NewCoreTransfersParamsWithHTTPClient(client *http.Client) *CoreTransfersParams { + var () + return &CoreTransfersParams{ + HTTPClient: client, + } +} + +/* +CoreTransfersParams contains all the parameters to send to the API endpoint +for the core transfers operation typically these are written to a http.Request +*/ +type CoreTransfersParams struct { + + /*Transfers + The number of file transfers to run in parallel. Accepts a special value '-1' describing that transfers from rclone config file should be used + + */ + Transfers *models.Transfers + + timeout time.Duration + Context context.Context + HTTPClient *http.Client +} + +// WithTimeout adds the timeout to the core transfers params +func (o *CoreTransfersParams) WithTimeout(timeout time.Duration) *CoreTransfersParams { + o.SetTimeout(timeout) + return o +} + +// SetTimeout adds the timeout to the core transfers params +func (o *CoreTransfersParams) SetTimeout(timeout time.Duration) { + o.timeout = timeout +} + +// WithContext adds the context to the core transfers params +func (o *CoreTransfersParams) WithContext(ctx context.Context) *CoreTransfersParams { + o.SetContext(ctx) + return o +} + +// SetContext adds the context to the core transfers params +func (o *CoreTransfersParams) SetContext(ctx context.Context) { + o.Context = ctx +} + +// WithHTTPClient adds the HTTPClient to the core transfers params +func (o *CoreTransfersParams) WithHTTPClient(client *http.Client) *CoreTransfersParams { + o.SetHTTPClient(client) + return o +} + +// SetHTTPClient adds the HTTPClient to the core transfers params +func (o *CoreTransfersParams) SetHTTPClient(client *http.Client) { + o.HTTPClient = client +} + +// WithTransfers adds the transfers to the core transfers params +func (o *CoreTransfersParams) WithTransfers(transfers *models.Transfers) *CoreTransfersParams { + o.SetTransfers(transfers) + return o +} + +// SetTransfers adds the transfers to the core transfers params +func (o *CoreTransfersParams) SetTransfers(transfers *models.Transfers) { + o.Transfers = transfers +} + +// WriteToRequest writes these params to a swagger request +func (o *CoreTransfersParams) WriteToRequest(r runtime.ClientRequest, reg strfmt.Registry) error { + + if err := r.SetTimeout(o.timeout); err != nil { + return err + } + var res []error + + if o.Transfers != nil { + if err := r.SetBodyParam(o.Transfers); err != nil { + return err + } + } + + if len(res) > 0 { + return errors.CompositeValidationError(res...) + } + return nil +} diff --git a/vendor/github.com/scylladb/scylla-manager/v3/swagger/gen/agent/client/operations/core_transfers_responses.go b/vendor/github.com/scylladb/scylla-manager/v3/swagger/gen/agent/client/operations/core_transfers_responses.go new file mode 100644 index 0000000000..af096646f1 --- /dev/null +++ b/vendor/github.com/scylladb/scylla-manager/v3/swagger/gen/agent/client/operations/core_transfers_responses.go @@ -0,0 +1,135 @@ +// Code generated by go-swagger; DO NOT EDIT. + +package operations + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + "fmt" + "io" + "strconv" + "strings" + + "github.com/go-openapi/runtime" + "github.com/go-openapi/strfmt" + + "github.com/scylladb/scylla-manager/v3/swagger/gen/agent/models" +) + +// CoreTransfersReader is a Reader for the CoreTransfers structure. +type CoreTransfersReader struct { + formats strfmt.Registry +} + +// ReadResponse reads a server response into the received o. +func (o *CoreTransfersReader) ReadResponse(response runtime.ClientResponse, consumer runtime.Consumer) (interface{}, error) { + switch response.Code() { + case 200: + result := NewCoreTransfersOK() + if err := result.readResponse(response, consumer, o.formats); err != nil { + return nil, err + } + return result, nil + default: + result := NewCoreTransfersDefault(response.Code()) + if err := result.readResponse(response, consumer, o.formats); err != nil { + return nil, err + } + if response.Code()/100 == 2 { + return result, nil + } + return nil, result + } +} + +// NewCoreTransfersOK creates a CoreTransfersOK with default headers values +func NewCoreTransfersOK() *CoreTransfersOK { + return &CoreTransfersOK{} +} + +/* +CoreTransfersOK handles this case with default header values. + +transfers +*/ +type CoreTransfersOK struct { + Payload *models.Transfers + JobID int64 +} + +func (o *CoreTransfersOK) GetPayload() *models.Transfers { + return o.Payload +} + +func (o *CoreTransfersOK) readResponse(response runtime.ClientResponse, consumer runtime.Consumer, formats strfmt.Registry) error { + + o.Payload = new(models.Transfers) + + // response payload + if err := consumer.Consume(response.Body(), o.Payload); err != nil && err != io.EOF { + return err + } + + if jobIDHeader := response.GetHeader("x-rclone-jobid"); jobIDHeader != "" { + jobID, err := strconv.ParseInt(jobIDHeader, 10, 64) + if err != nil { + return err + } + + o.JobID = jobID + } + return nil +} + +// NewCoreTransfersDefault creates a CoreTransfersDefault with default headers values +func NewCoreTransfersDefault(code int) *CoreTransfersDefault { + return &CoreTransfersDefault{ + _statusCode: code, + } +} + +/* +CoreTransfersDefault handles this case with default header values. + +Server error +*/ +type CoreTransfersDefault struct { + _statusCode int + + Payload *models.ErrorResponse + JobID int64 +} + +// Code gets the status code for the core transfers default response +func (o *CoreTransfersDefault) Code() int { + return o._statusCode +} + +func (o *CoreTransfersDefault) GetPayload() *models.ErrorResponse { + return o.Payload +} + +func (o *CoreTransfersDefault) readResponse(response runtime.ClientResponse, consumer runtime.Consumer, formats strfmt.Registry) error { + + o.Payload = new(models.ErrorResponse) + + // response payload + if err := consumer.Consume(response.Body(), o.Payload); err != nil && err != io.EOF { + return err + } + + if jobIDHeader := response.GetHeader("x-rclone-jobid"); jobIDHeader != "" { + jobID, err := strconv.ParseInt(jobIDHeader, 10, 64) + if err != nil { + return err + } + + o.JobID = jobID + } + return nil +} + +func (o *CoreTransfersDefault) Error() string { + return fmt.Sprintf("agent [HTTP %d] %s", o._statusCode, strings.TrimRight(o.Payload.Message, ".")) +} diff --git a/vendor/github.com/scylladb/scylla-manager/v3/swagger/gen/agent/client/operations/operations_client.go b/vendor/github.com/scylladb/scylla-manager/v3/swagger/gen/agent/client/operations/operations_client.go index 18bd52efca..4c3f1546bb 100644 --- a/vendor/github.com/scylladb/scylla-manager/v3/swagger/gen/agent/client/operations/operations_client.go +++ b/vendor/github.com/scylladb/scylla-manager/v3/swagger/gen/agent/client/operations/operations_client.go @@ -31,6 +31,8 @@ type ClientService interface { CoreStatsReset(params *CoreStatsResetParams) (*CoreStatsResetOK, error) + CoreTransfers(params *CoreTransfersParams) (*CoreTransfersOK, error) + FreeOSMemory(params *FreeOSMemoryParams) (*FreeOSMemoryOK, error) JobInfo(params *JobInfoParams) (*JobInfoOK, error) @@ -73,7 +75,7 @@ type ClientService interface { /* CoreBwlimit sets the bandwidth limit -This sets the bandwidth limit to that passed in +This sets the bandwidth limit to that passed in. If the rate parameter is not supplied then the bandwidth is queried */ func (a *Client) CoreBwlimit(params *CoreBwlimitParams) (*CoreBwlimitOK, error) { // TODO: Validate the params before sending @@ -175,6 +177,41 @@ func (a *Client) CoreStatsReset(params *CoreStatsResetParams) (*CoreStatsResetOK return nil, runtime.NewAPIError("unexpected success response: content available as default response in error", unexpectedSuccess, unexpectedSuccess.Code()) } +/* +CoreTransfers sets transfers + +This sets the default amount of transfers to that passed in. If the transfers parameter is not supplied then the transfers are queried +*/ +func (a *Client) CoreTransfers(params *CoreTransfersParams) (*CoreTransfersOK, error) { + // TODO: Validate the params before sending + if params == nil { + params = NewCoreTransfersParams() + } + + result, err := a.transport.Submit(&runtime.ClientOperation{ + ID: "CoreTransfers", + Method: "POST", + PathPattern: "/rclone/core/transfers", + ProducesMediaTypes: []string{"application/json"}, + ConsumesMediaTypes: []string{"application/json"}, + Schemes: []string{"http"}, + Params: params, + Reader: &CoreTransfersReader{formats: a.formats}, + Context: params.Context, + Client: params.HTTPClient, + }) + if err != nil { + return nil, err + } + success, ok := result.(*CoreTransfersOK) + if ok { + return success, nil + } + // unexpected success response + unexpectedSuccess := result.(*CoreTransfersDefault) + return nil, runtime.NewAPIError("unexpected success response: content available as default response in error", unexpectedSuccess, unexpectedSuccess.Code()) +} + /* FreeOSMemory returns memory to o s diff --git a/vendor/github.com/scylladb/scylla-manager/v3/swagger/gen/agent/models/copy_paths_options.go b/vendor/github.com/scylladb/scylla-manager/v3/swagger/gen/agent/models/copy_paths_options.go index 514f6cb269..ce80477888 100644 --- a/vendor/github.com/scylladb/scylla-manager/v3/swagger/gen/agent/models/copy_paths_options.go +++ b/vendor/github.com/scylladb/scylla-manager/v3/swagger/gen/agent/models/copy_paths_options.go @@ -15,6 +15,9 @@ import ( // swagger:model CopyPathsOptions type CopyPathsOptions struct { + // String representation of the bandwidth rate limit (eg. 100k, 1M, ...) + BandwidthRate string `json:"bandwidth_rate,omitempty"` + // Destination file system e.g. s3: or gcs: DstFs string `json:"dstFs,omitempty"` @@ -29,6 +32,9 @@ type CopyPathsOptions struct { // A directory within that remote eg. files/ for the source SrcRemote string `json:"srcRemote,omitempty"` + + // The number of file transfers to run in parallel. Accepts a special value '-1' describing that transfers from rclone config file should be used + Transfers int64 `json:"transfers,omitempty"` } // Validate validates this copy paths options diff --git a/vendor/github.com/scylladb/scylla-manager/v3/swagger/gen/agent/models/move_or_copy_file_options.go b/vendor/github.com/scylladb/scylla-manager/v3/swagger/gen/agent/models/move_or_copy_file_options.go index 4a5870b7e9..66b121bfe3 100644 --- a/vendor/github.com/scylladb/scylla-manager/v3/swagger/gen/agent/models/move_or_copy_file_options.go +++ b/vendor/github.com/scylladb/scylla-manager/v3/swagger/gen/agent/models/move_or_copy_file_options.go @@ -15,6 +15,9 @@ import ( // swagger:model MoveOrCopyFileOptions type MoveOrCopyFileOptions struct { + // String representation of the bandwidth rate limit (eg. 100k, 1M, ...) + BandwidthRate string `json:"bandwidth_rate,omitempty"` + // Destination file system e.g. s3: or gcs: DstFs string `json:"dstFs,omitempty"` @@ -29,6 +32,9 @@ type MoveOrCopyFileOptions struct { // A suffix which will be added to otherwise overwritten or deleted files Suffix string `json:"suffix,omitempty"` + + // The number of file transfers to run in parallel. Accepts a special value '-1' describing that transfers from rclone config file should be used + Transfers int64 `json:"transfers,omitempty"` } // Validate validates this move or copy file options diff --git a/vendor/github.com/scylladb/scylla-manager/v3/swagger/gen/agent/models/transfers.go b/vendor/github.com/scylladb/scylla-manager/v3/swagger/gen/agent/models/transfers.go new file mode 100644 index 0000000000..45422b1b06 --- /dev/null +++ b/vendor/github.com/scylladb/scylla-manager/v3/swagger/gen/agent/models/transfers.go @@ -0,0 +1,43 @@ +// Code generated by go-swagger; DO NOT EDIT. + +package models + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + "github.com/go-openapi/strfmt" + "github.com/go-openapi/swag" +) + +// Transfers transfers +// +// swagger:model Transfers +type Transfers struct { + + // The number of file transfers to run in parallel + Transfers int64 `json:"transfers,omitempty"` +} + +// Validate validates this transfers +func (m *Transfers) Validate(formats strfmt.Registry) error { + return nil +} + +// MarshalBinary interface implementation +func (m *Transfers) MarshalBinary() ([]byte, error) { + if m == nil { + return nil, nil + } + return swag.WriteJSON(m) +} + +// UnmarshalBinary interface implementation +func (m *Transfers) UnmarshalBinary(b []byte) error { + var res Transfers + if err := swag.ReadJSON(b, &res); err != nil { + return err + } + *m = res + return nil +} diff --git a/vendor/modules.txt b/vendor/modules.txt index dd6fdccc5e..b01ba617d1 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -395,11 +395,11 @@ github.com/scylladb/gocqlx/v2/dbutil github.com/scylladb/gocqlx/v2/migrate github.com/scylladb/gocqlx/v2/qb github.com/scylladb/gocqlx/v2/table -# github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20240926142436-6d27036d615d +# github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20241015081800-ee47f3d10478 ## explicit; go 1.21.1 github.com/scylladb/scylla-manager/v3/pkg/managerclient github.com/scylladb/scylla-manager/v3/pkg/managerclient/table -# github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20240926142436-6d27036d615d +# github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241015081800-ee47f3d10478 ## explicit; go 1.21.1 github.com/scylladb/scylla-manager/v3/pkg/util github.com/scylladb/scylla-manager/v3/pkg/util/certutil @@ -431,7 +431,7 @@ github.com/scylladb/scylla-manager/v3/pkg/util/timeutc github.com/scylladb/scylla-manager/v3/pkg/util/uuid github.com/scylladb/scylla-manager/v3/pkg/util/version github.com/scylladb/scylla-manager/v3/pkg/util/workerpool -# github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20240926142436-6d27036d615d +# github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241015081800-ee47f3d10478 ## explicit; go 1.21.1 github.com/scylladb/scylla-manager/v3/swagger/gen/agent/client github.com/scylladb/scylla-manager/v3/swagger/gen/agent/client/operations