Restore improvement: transfers (#4054)
* chore(go.mod): bump SM sub packages versions

* feat(rcserver): implement rate limit/transfers change on upload/download

This commit allows for greater control over transfers
and rate limit as a part of the upload/download
rclone API used by backup/restore tasks.
It also makes the changes more persistent in case
of agent restarts.

It also adds the core/transfers endpoint, which
can be used outside the upload/download rclone API
to set the number of transfers for subsequent rclone API calls.

* feat(scyllaclient): add RcloneSet/GetTransfers to client

This commit exposes the /rclone/core/transfers agent endpoint via agent client.

* feat(scyllaclient): set transfers in RcloneCopyPaths

This commit requires changes in the usage of RcloneCopyPaths in restore pkg.
It also adds the Transfers field to restore Target with the default value -1,
meaning that transfers will be set to the value specified in the agent
rclone server config.

* feat(restore): support max transfers (0) and make it default

Our experiments showed that the fastest file download
was achieved for transfers=2*node_shard_cnt. To make
this easy to use, a new special value
of transfers=0 takes care of that.
It is set as the default, because we aim to make
an unconfigured restore as fast as possible.

* feat(command/restore): add --transfers flag

This commit allows the user to control the number of rclone transfers used during restore.
Moreover, during the development of the restore task, the default value of the --parallel flag
was changed from 1 to 0, but the flag documentation was not updated to match. This commit fixes that.

* feat(backup): add Transfers to Target

This is the first step to control transfers in the context of backup.

* feat(scyllaclient): set transfers in rcloneMoveOrCopyDir

This commit requires changes in the usage of RcloneMoveDir in backup pkg.

* feat(command/backup): add --transfers flag

This commit allows the user to control the number of rclone transfers used during backup.

* feat(restore_test): extend TestRestoreTablesPreparationIntegration with transfers

This way the test also checks transfers before and after backup.
It also checks transfers before restore, in the middle of it, when paused,
when resumed, and after restore.
This required some changes to the test, such as swapping
the src and dst clusters (to increase the batch count) and hanging on LAS instead of
copy paths (the transfers change is applied as a part of copy paths).
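
The special transfers values described in this commit can be sketched as a single resolution step. This is only an illustration: `resolveTransfers` and its parameters are hypothetical names, and in the commit itself the two cases are handled in different places (the diff shows -1 being resolved by the agent in `SetTransfers`, while 0 is described in the restore flag documentation as resulting in 2*node_shard_count).

```go
package main

import "fmt"

const (
	// agentDefaultTransfers (-1) asks for the value from the agent's config file.
	agentDefaultTransfers = -1
	// maxTransfers (0) asks for the fastest setting, 2 * node shard count.
	maxTransfers = 0
)

// resolveTransfers is a hypothetical helper that maps the value passed via
// --transfers to the number of parallel file transfers actually used.
// shardCount and agentDefault are assumed to be known to the caller.
func resolveTransfers(requested, shardCount, agentDefault int) (int, error) {
	switch {
	case requested == maxTransfers:
		if shardCount <= 0 {
			return 0, fmt.Errorf("shard count must be positive, got %d", shardCount)
		}
		return 2 * shardCount, nil
	case requested == agentDefaultTransfers:
		return agentDefault, nil
	case requested > 0:
		return requested, nil
	default:
		return 0, fmt.Errorf("invalid transfers value %d", requested)
	}
}
```

For a node with 8 shards and an agent default of 4, `resolveTransfers(0, 8, 4)` yields 16, `resolveTransfers(-1, 8, 4)` yields 4, and an explicit positive value is passed through unchanged.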
Michal-Leszczynski authored Oct 15, 2024
1 parent ee47f3d commit 2099e2e
Showing 48 changed files with 840 additions and 131 deletions.
5 changes: 5 additions & 0 deletions docs/source/sctool/partials/sctool_backup.yaml
@@ -159,6 +159,11 @@ options:
usage: |
Timezone of --cron and --window flag values.
The default value is taken from this system, namely 'TZ' envvar or '/etc/localtime' file.
- name: transfers
default_value: "-1"
usage: |
Sets the amount of file transfers to run in parallel when uploading files from a Scylla node to its backup location.
Set to -1 for using the transfers value defined in node's 'scylla-manager-agent.yaml' config file.
- name: upload-parallel
default_value: '[]'
usage: |
5 changes: 5 additions & 0 deletions docs/source/sctool/partials/sctool_backup_update.yaml
@@ -160,6 +160,11 @@ options:
usage: |
Timezone of --cron and --window flag values.
The default value is taken from this system, namely 'TZ' envvar or '/etc/localtime' file.
- name: transfers
default_value: "-1"
usage: |
Sets the amount of file transfers to run in parallel when uploading files from a Scylla node to its backup location.
Set to -1 for using the transfers value defined in node's 'scylla-manager-agent.yaml' config file.
- name: upload-parallel
default_value: '[]'
usage: |
8 changes: 7 additions & 1 deletion docs/source/sctool/partials/sctool_restore.yaml
@@ -103,7 +103,7 @@ options:
usage: |
Number of times a task reruns following a failure.
- name: parallel
- default_value: "1"
+ default_value: "0"
usage: |
The maximum number of Scylla restore jobs that can be run at the same time (on different SSTables).
Each node can take part in at most one restore at any given moment.
@@ -156,6 +156,12 @@ options:
usage: |
Timezone of --cron and --window flag values.
The default value is taken from this system, namely 'TZ' envvar or '/etc/localtime' file.
- name: transfers
default_value: "0"
usage: |
Sets the amount of file transfers to run in parallel when downloading files from backup location to Scylla node.
Set to 0 for the fastest download (results in setting transfers to 2*node_shard_count).
Set to -1 for using the transfers value defined in node's 'scylla-manager-agent.yaml' config file.
- name: window
default_value: '[]'
usage: |
8 changes: 7 additions & 1 deletion docs/source/sctool/partials/sctool_restore_update.yaml
@@ -101,7 +101,7 @@ options:
usage: |
Number of times a task reruns following a failure.
- name: parallel
- default_value: "1"
+ default_value: "0"
usage: |
The maximum number of Scylla restore jobs that can be run at the same time (on different SSTables).
Each node can take part in at most one restore at any given moment.
@@ -154,6 +154,12 @@ options:
usage: |
Timezone of --cron and --window flag values.
The default value is taken from this system, namely 'TZ' envvar or '/etc/localtime' file.
- name: transfers
default_value: "0"
usage: |
Sets the amount of file transfers to run in parallel when downloading files from backup location to Scylla node.
Set to 0 for the fastest download (results in setting transfers to 2*node_shard_count).
Set to -1 for using the transfers value defined in node's 'scylla-manager-agent.yaml' config file.
- name: window
default_value: '[]'
usage: |
6 changes: 3 additions & 3 deletions go.mod
@@ -29,9 +29,9 @@ require (
github.com/scylladb/go-reflectx v1.0.1
github.com/scylladb/go-set v1.0.2
github.com/scylladb/gocqlx/v2 v2.8.0
- github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20240926142436-6d27036d615d
- github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20240926142436-6d27036d615d
- github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20240926142436-6d27036d615d
+ github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20241015081800-ee47f3d10478
+ github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241015081800-ee47f3d10478
+ github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241015081800-ee47f3d10478
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
github.com/stoewer/go-strcase v1.3.0
12 changes: 6 additions & 6 deletions go.sum
@@ -1053,12 +1053,12 @@ github.com/scylladb/google-api-go-client v0.34.1-patched h1:DW+T0HA+74o6FDr3TFzV
github.com/scylladb/google-api-go-client v0.34.1-patched/go.mod h1:RriRmS2wJXH+2yd9PRTEcR380U9AXmurWwznqVhzsSc=
github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e h1:lJRphCtu+nKd+mfo8whOTeFkgjMWvk8iCSlqgibKSa8=
github.com/scylladb/rclone v1.54.1-0.20240312172628-afe1fd2aa65e/go.mod h1:JGZp4EvCUK+6AM1Fe1dye5xvihTc/Bk0WnHHSCJOePM=
- github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20240926142436-6d27036d615d h1:c2mlLf5LlsR3KvQnZODjjmebV0/Fe8/i1NQlr4TVOoU=
- github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20240926142436-6d27036d615d/go.mod h1:AQyWEkxdYc+zAEKofGOKOTPyvW2HhoL1+iMQrESFqdY=
- github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20240926142436-6d27036d615d h1:9hQ+509JUYSlNyR/1C2Iy1gc36Y4Z4V+On3pn2pzcqg=
- github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20240926142436-6d27036d615d/go.mod h1:+sPCx2oaOXmMpy/ODNNEDGJ7vCghBeKP4S7xEfMI+eA=
- github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20240926142436-6d27036d615d h1:eR9FrNQLeaaBfHYlQxDJCQ3LS6Zo92ut0G+aYbzy7UQ=
- github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20240926142436-6d27036d615d/go.mod h1:Oxfuz1XcXi9iV4ggSGfQdn+p6gPz6djPOegRMMe/6/s=
+ github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20241015081800-ee47f3d10478 h1:nI+3aLInCdLRYsR+iEuZkzFblgvAdGhWJT/Wqt7cKnU=
+ github.com/scylladb/scylla-manager/v3/pkg/managerclient v0.0.0-20241015081800-ee47f3d10478/go.mod h1:6cKRPGgPPFrPSh+M90k+bp0oIFsB5SZsdI6hWeXnXOY=
+ github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241015081800-ee47f3d10478 h1:QEhE1TL21CZglbImBdDkjo6ZzPnc3OkOMcDYmo6QwIk=
+ github.com/scylladb/scylla-manager/v3/pkg/util v0.0.0-20241015081800-ee47f3d10478/go.mod h1:+sPCx2oaOXmMpy/ODNNEDGJ7vCghBeKP4S7xEfMI+eA=
+ github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241015081800-ee47f3d10478 h1:cL8e+sW/7MZcq9guSBleR0bU+lEGxw1ZUjAf6vajHdE=
+ github.com/scylladb/scylla-manager/v3/swagger v0.0.0-20241015081800-ee47f3d10478/go.mod h1:Oxfuz1XcXi9iV4ggSGfQdn+p6gPz6djPOegRMMe/6/s=
github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4 h1:8qmTC5ByIXO3GP/IzBkxcZ/99VITvnIETDhdFz/om7A=
github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4/go.mod h1:C1a7PQSMz9NShzorzCiG2fk9+xuCgLkPeCvMHYR2OWg=
github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM=
6 changes: 6 additions & 0 deletions pkg/command/backup/cmd.go
@@ -32,6 +32,7 @@ type command struct {
retention int
retentionDays int
rateLimit []string
transfers int
snapshotParallel []string
uploadParallel []string
dryRun bool
@@ -84,6 +85,7 @@ func (cmd *command) init() {
w.Unwrap().IntVar(&cmd.retention, "retention", 7, "")
w.Unwrap().IntVar(&cmd.retentionDays, "retention-days", 0, "")
w.Unwrap().StringSliceVar(&cmd.rateLimit, "rate-limit", nil, "")
w.Unwrap().IntVar(&cmd.transfers, "transfers", -1, "")
w.Unwrap().StringSliceVar(&cmd.snapshotParallel, "snapshot-parallel", nil, "")
w.Unwrap().StringSliceVar(&cmd.uploadParallel, "upload-parallel", nil, "")
w.Unwrap().BoolVar(&cmd.dryRun, "dry-run", false, "")
@@ -144,6 +146,10 @@ func (cmd *command) run(args []string) error {
props["rate_limit"] = cmd.rateLimit
ok = true
}
if cmd.Flag("transfers").Changed {
props["transfers"] = cmd.transfers
ok = true
}
if cmd.Flag("snapshot-parallel").Changed {
props["snapshot_parallel"] = cmd.snapshotParallel
ok = true
4 changes: 4 additions & 0 deletions pkg/command/backup/res.yaml
@@ -19,6 +19,10 @@ rate-limit: |
The <dc>: part is optional and is only needed when different datacenters require different upload limits.
Set to 0 for no limit (default 100).
transfers: |
Sets the amount of file transfers to run in parallel when uploading files from a Scylla node to its backup location.
Set to -1 for using the transfers value defined in node's 'scylla-manager-agent.yaml' config file.
snapshot-parallel: |
A comma-separated list of snapshot parallelism limits in the format `[<dc>:]<limit>`.
The ``dc`` part is optional and allows for specifying different limits in selected datacenters.
8 changes: 7 additions & 1 deletion pkg/command/restore/cmd.go
@@ -29,6 +29,7 @@ type command struct {
snapshotTag string
batchSize int
parallel int
transfers int
allowCompaction bool
restoreSchema bool
restoreTables bool
@@ -78,7 +79,8 @@ func (cmd *command) init() {
w.Keyspace(&cmd.keyspace)
w.Unwrap().StringVarP(&cmd.snapshotTag, "snapshot-tag", "T", "", "")
w.Unwrap().IntVar(&cmd.batchSize, "batch-size", 2, "")
- w.Unwrap().IntVar(&cmd.parallel, "parallel", 1, "")
+ w.Unwrap().IntVar(&cmd.parallel, "parallel", 0, "")
w.Unwrap().IntVar(&cmd.transfers, "transfers", 0, "")
w.Unwrap().BoolVar(&cmd.allowCompaction, "allow-compaction", false, "")
w.Unwrap().BoolVar(&cmd.restoreSchema, "restore-schema", false, "")
w.Unwrap().BoolVar(&cmd.restoreTables, "restore-tables", false, "")
@@ -146,6 +148,10 @@ func (cmd *command) run(args []string) error {
props["parallel"] = cmd.parallel
ok = true
}
if cmd.Flag("transfers").Changed {
props["transfers"] = cmd.transfers
ok = true
}
if cmd.Flag("allow-compaction").Changed {
props["allow_compaction"] = cmd.allowCompaction
ok = true
5 changes: 5 additions & 0 deletions pkg/command/restore/res.yaml
@@ -33,6 +33,11 @@ parallel: |
The maximum number of Scylla restore jobs that can be run at the same time (on different SSTables).
Each node can take part in at most one restore at any given moment.
transfers: |
Sets the amount of file transfers to run in parallel when downloading files from backup location to Scylla node.
Set to 0 for the fastest download (results in setting transfers to 2*node_shard_count).
Set to -1 for using the transfers value defined in node's 'scylla-manager-agent.yaml' config file.
allow-compaction: |
Defines if auto compactions should be running on Scylla nodes during restore.
Disabling auto compactions decreases restore time duration, but increases compaction workload after the restore is done.
1 change: 1 addition & 0 deletions pkg/rclone/rcserver/internal/rclone_supported_calls.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 65 additions & 0 deletions pkg/rclone/rcserver/rc.go
@@ -533,6 +533,9 @@ func rcMoveOrCopyDir(doMove bool) func(ctx context.Context, in rc.Params) (rc.Pa
if err != nil && !rc.IsErrParamNotFound(err) {
return nil, err
}
if err := setGuardedConfig(in); err != nil {
return nil, err
}

return nil, sync.CopyDir2(ctx, dstFs, dstRemote, srcFs, srcRemote, doMove)
}
@@ -557,10 +560,40 @@ func rcCopyPaths() func(ctx context.Context, in rc.Params) (rc.Params, error) {
if len(paths) == 0 {
return nil, nil
}
if err := setGuardedConfig(in); err != nil {
return nil, err
}
return nil, sync.CopyPaths(ctx, dstFs, dstRemote, srcFs, srcRemote, paths, false)
}
}

// setGuardedConfig sets transfers and bandwidth limit if present.
func setGuardedConfig(in rc.Params) error {
	// Set transfers
	if in["transfers"] != nil {
		transfers, err := in.GetInt64("transfers")
		if err != nil {
			return err
		}
		if err := SetTransfers(int(transfers)); err != nil {
			return err
		}
	}

	// Set bandwidth rate
	if in["bandwidth_rate"] != nil {
		limit, err := in.GetString("bandwidth_rate")
		if err != nil {
			return err
		}
		if err := SetBandwidthLimit(limit); err != nil {
			return err
		}
	}

	return nil
}

// rcDeletePaths returns rc function that deletes paths from remote.
func rcDeletePaths(ctx context.Context, in rc.Params) (out rc.Params, err error) {
f, remote, err := rc.GetFsAndRemote(ctx, in)
@@ -600,6 +633,38 @@ func rcDeletePaths(ctx context.Context, in rc.Params) (out rc.Params, err error)
return out, multierr.Combine(err, statsDeleteErr)
}

// rcTransfers sets the default amount of transfers.
// This change is not persisted after server restart.
// Transfers correspond to the number of file transfers to run in parallel.
// If the transfers parameter is not supplied then the transfers are queried.
func rcTransfers(_ context.Context, in rc.Params) (out rc.Params, err error) {
	if in["transfers"] != nil {
		transfers, err := in.GetInt64("transfers")
		if err != nil {
			return out, err
		}
		if err := SetTransfers(int(transfers)); err != nil {
			return nil, err
		}
	}
	out = rc.Params{
		"transfers": fs.GetConfig(context.Background()).Transfers,
	}
	return out, nil
}

func init() {
	rc.Add(rc.Call{
		Path:         "core/transfers",
		AuthRequired: true,
		Fn:           rcTransfers,
		Title:        "Set the default amount of transfers",
		Help: `This takes the following parameters:
- transfers - the number of file transfers to run in parallel`,
	})
}

// getFsAndRemoteNamed gets fs and remote path from the params, but it doesn't
// fail if remote path is not provided.
// In that case it is assumed that path is empty and root of the fs is used.
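
The set-or-query behaviour of the `core/transfers` call registered above could be exercised roughly as follows. This is a sketch under stated assumptions, not the project's client code: `callTransfers` and `newFakeAgent` are hypothetical names, the fake server only imitates the "set if supplied, always report the current value" contract of `rcTransfers`, and the authentication that the real call requires (`AuthRequired: true`) is left out. The `/rclone/core/transfers` path is taken from the commit message.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
)

// transfersPayload mirrors the single "transfers" field returned by the call.
type transfersPayload struct {
	Transfers int `json:"transfers"`
}

// newFakeAgent starts an in-memory stand-in for the agent endpoint.
// It is NOT the real handler: it stores the last supplied value and
// always replies with the current one.
func newFakeAgent(initial int) *httptest.Server {
	var mu sync.Mutex
	current := initial
	mux := http.NewServeMux()
	mux.HandleFunc("/rclone/core/transfers", func(w http.ResponseWriter, r *http.Request) {
		var in map[string]int
		if err := json.NewDecoder(r.Body).Decode(&in); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		mu.Lock()
		if v, ok := in["transfers"]; ok {
			current = v
		}
		out := transfersPayload{Transfers: current}
		mu.Unlock()
		_ = json.NewEncoder(w).Encode(out)
	})
	return httptest.NewServer(mux)
}

// callTransfers posts JSON parameters to the endpoint and returns the value
// reported back. Passing nil sends no "transfers" parameter, i.e. a query.
func callTransfers(baseURL string, transfers *int) (int, error) {
	params := map[string]int{}
	if transfers != nil {
		params["transfers"] = *transfers
	}
	body, err := json.Marshal(params)
	if err != nil {
		return 0, err
	}
	resp, err := http.Post(baseURL+"/rclone/core/transfers", "application/json", bytes.NewReader(body))
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return 0, fmt.Errorf("unexpected status %s", resp.Status)
	}
	var out transfersPayload
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return 0, err
	}
	return out.Transfers, nil
}
```

Calling `callTransfers(srv.URL, nil)` against the fake server returns the current value without changing it, while passing a pointer to a new value changes it and returns the updated number.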
92 changes: 92 additions & 0 deletions pkg/rclone/rcserver/rcconfigguard.go
@@ -0,0 +1,92 @@
// Copyright (C) 2024 ScyllaDB

package rcserver

import (
	"context"
	"sync"

	"github.com/pkg/errors"
	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/fs/cache"
	"github.com/rclone/rclone/fs/rc"
	"go.uber.org/atomic"
)

// globalConfigGuard is global configGuard that should be used
// by rc calls when performing global config changes.
var globalConfigGuard = &configGuard{}

// configGuard is a tool for performing global config changes.
// It supports setting transfers and bandwidth limit.
// It does not re-set config values if they are already
// set to the desired value.
type configGuard struct {
	mu          sync.Mutex
	initialized atomic.Bool

	defaultTransfers int
	transfers        int
	bandwidthLimit   string
}

func (cg *configGuard) init() {
	if cg.initialized.CompareAndSwap(false, true) {
		defaultTransfers := fs.GetConfig(context.Background()).Transfers
		cg.defaultTransfers = defaultTransfers
		cg.transfers = defaultTransfers
		cg.bandwidthLimit = ""
	}
}

// SetTransfers sets global transfers value in rclone config.
// It also clears fs.Fs cache, so that they can be re-initialized
// with the new transfers value.
func SetTransfers(transfers int) error {
	globalConfigGuard.mu.Lock()
	defer globalConfigGuard.mu.Unlock()
	globalConfigGuard.init()

	if transfers == -1 {
		transfers = globalConfigGuard.defaultTransfers
	}
	if transfers <= 0 {
		return errors.Errorf("transfers count must be greater than 0, got %d", transfers)
	}
	// Returns global config
	ci := fs.GetConfig(context.Background())
	if transfers == globalConfigGuard.transfers {
		// Safety check in case configguard is not in sync with global config
		if transfers == ci.Transfers {
			// Transfers are already set to specified value
			return nil
		}
	}

	globalConfigGuard.transfers = transfers
	ci.Transfers = transfers
	// The amount of transfers impacts fs.Fs initialization (e.g. pool.Pool and fs.Pacer),
	// so fs.Fs cache should be cleared on transfers count change.
	cache.Clear()
	return nil
}

// SetBandwidthLimit sets global bandwidth limit in token bucket.
func SetBandwidthLimit(limit string) error {
	globalConfigGuard.mu.Lock()
	defer globalConfigGuard.mu.Unlock()
	globalConfigGuard.init()

	if limit == globalConfigGuard.bandwidthLimit {
		return nil
	}

	in := rc.Params{
		"rate": limit,
	}
	_, err := rcCalls.Get("core/bwlimit").Fn(context.Background(), in)
	if err != nil {
		return errors.Wrapf(err, "set bandwidth to %s", limit)
	}
	// Remember the applied limit so that repeated calls with the same value
	// are skipped and a later reset to "" is not mistaken for a no-op.
	globalConfigGuard.bandwidthLimit = limit
	return nil
}
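
The guard pattern used in this file (serialize changes behind a mutex, resolve -1 to the default, skip the update when the value is already applied) can be shown in isolation with the standard library only. This is a minimal sketch, not the code from the commit: `transfersGuard`, `newTransfersGuard`, and the `onChange` callback are hypothetical, with the callback standing in for side effects such as clearing the fs.Fs cache.

```go
package main

import (
	"fmt"
	"sync"
)

// transfersGuard is a hypothetical, stdlib-only analogue of configGuard.
type transfersGuard struct {
	mu           sync.Mutex
	defaultValue int
	current      int
	onChange     func(int) // invoked only when the value really changes
}

// newTransfersGuard creates a guard whose current value starts at the default.
func newTransfersGuard(defaultValue int, onChange func(int)) *transfersGuard {
	return &transfersGuard{
		defaultValue: defaultValue,
		current:      defaultValue,
		onChange:     onChange,
	}
}

// Set applies v and reports whether anything changed.
// -1 means "use the default"; other non-positive values are rejected.
func (g *transfersGuard) Set(v int) (bool, error) {
	g.mu.Lock()
	defer g.mu.Unlock()

	if v == -1 {
		v = g.defaultValue
	}
	if v <= 0 {
		return false, fmt.Errorf("transfers count must be greater than 0, got %d", v)
	}
	if v == g.current {
		// Already set to the desired value, so skip the side effect.
		return false, nil
	}
	g.current = v
	if g.onChange != nil {
		g.onChange(v)
	}
	return true, nil
}
```

With a default of 4, calling `Set(8)` twice triggers `onChange` only once, and a following `Set(-1)` switches back to 4 and triggers it again.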