diff --git a/pkg/service/restore/restore_integration_test.go b/pkg/service/restore/restore_integration_test.go
index cf81acae7..318377cd5 100644
--- a/pkg/service/restore/restore_integration_test.go
+++ b/pkg/service/restore/restore_integration_test.go
@@ -498,7 +498,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	Print("Fill setup")
 	fillTable(t, h.srcCluster.rootSession, 100, ks, tab)
 
-	validateState := func(ch clusterHelper, tombstone string, compaction bool, transfers int) {
+	validateState := func(ch clusterHelper, tombstone string, compaction bool, transfers int, rateLimit int) {
 		// Validate tombstone_gc mode
 		if got := tombstoneGCMode(t, ch.rootSession, ks, tab); tombstone != got {
 			t.Errorf("expected tombstone_gc=%s, got %s", tombstone, got)
@@ -523,6 +523,20 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 				t.Errorf("expected transfers=%d, got=%d on host %s", transfers, got, host)
 			}
 		}
+		// Validate rate limit
+		for _, host := range ch.Client.Config().Hosts {
+			got, err := ch.Client.RcloneGetBandwidthLimit(context.Background(), host)
+			if err != nil {
+				t.Fatal(errors.Wrapf(err, "check rate limit on host %s", host))
+			}
+			rawLimit := fmt.Sprintf("%dM", rateLimit)
+			if rateLimit == 0 {
+				rawLimit = "off"
+			}
+			if rawLimit != got {
+				t.Errorf("expected rate_limit=%s, got=%s on host %s", rawLimit, got, host)
+			}
+		}
 	}
 
 	shardCnt, err := h.dstCluster.Client.ShardCount(context.Background(), ManagedClusterHost())
@@ -531,35 +545,39 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	}
 	transfers0 := 2 * int(shardCnt)
 
-	// Set initial transfers
-	for _, host := range ManagedClusterHosts() {
-		err := h.dstCluster.Client.RcloneSetTransfers(context.Background(), host, 10)
-		if err != nil {
-			t.Fatal(errors.Wrapf(err, "set initial transfers on host %s", host))
-		}
-	}
-	for _, host := range ManagedSecondClusterHosts() {
-		err := h.srcCluster.Client.RcloneSetTransfers(context.Background(), host, 10)
-		if err != nil {
-			t.Fatal(errors.Wrapf(err, "set initial transfers on host %s", host))
+	setTransfersAndRateLimit := func(ch clusterHelper, transfers int, rateLimit int) {
+		for _, host := range ch.Client.Config().Hosts {
+			err := ch.Client.RcloneSetTransfers(context.Background(), host, transfers)
+			if err != nil {
+				t.Fatal(errors.Wrapf(err, "set transfers on host %s", host))
+			}
+			err = ch.Client.RcloneSetBandwidthLimit(context.Background(), host, rateLimit)
+			if err != nil {
+				t.Fatal(errors.Wrapf(err, "set rate limit on host %s", host))
+			}
 		}
 	}
 
+	Print("Set initial transfers and rate limit")
+	setTransfersAndRateLimit(h.srcCluster, 10, 99)
+	setTransfersAndRateLimit(h.dstCluster, 10, 99)
+
 	Print("Validate state before backup")
-	validateState(h.srcCluster, "repair", true, 10)
+	validateState(h.srcCluster, "repair", true, 10, 99)
 
 	Print("Run backup")
 	loc := []Location{testLocation("preparation", "")}
 	S3InitBucket(t, loc[0].Path)
 	ksFilter := []string{ks}
 	tag := h.runBackup(t, map[string]any{
-		"location":  loc,
-		"keyspace":  ksFilter,
-		"transfers": 3,
+		"location":   loc,
+		"keyspace":   ksFilter,
+		"transfers":  3,
+		"rate_limit": []string{"88"},
 	})
 
 	Print("Validate state after backup")
-	validateState(h.srcCluster, "repair", true, 3)
+	validateState(h.srcCluster, "repair", true, 3, 88)
 
 	runRestore := func(ctx context.Context, finishedRestore chan error) {
 		grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser)
@@ -569,10 +587,12 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 			"keyspace":       ksFilter,
 			"snapshot_tag":   tag,
"transfers": 0, + "rate_limit": []string{"0"}, "restore_tables": true, }) if err != nil { - t.Error(err) + finishedRestore <- err + return } finishedRestore <- h.dstRestoreSvc.Restore(ctx, h.dstCluster.ClusterID, h.dstCluster.TaskID, h.dstCluster.RunID, rawProps) } @@ -601,7 +621,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { makeLASHang(reachedDataStageChan, hangLAS) Print("Validate state before restore") - validateState(h.dstCluster, "repair", true, 10) + validateState(h.dstCluster, "repair", true, 10, 99) Print("Run restore") finishedRestore := make(chan error) @@ -609,10 +629,14 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { go runRestore(restoreCtx, finishedRestore) Print("Wait for data stage") - <-reachedDataStageChan + select { + case <-reachedDataStageChan: + case err := <-finishedRestore: + t.Fatalf("Restore finished before reaching data stage with: %s", err) + } Print("Validate state during restore data") - validateState(h.dstCluster, "disabled", false, transfers0) + validateState(h.dstCluster, "disabled", false, transfers0, 0) Print("Pause restore") restoreCancel() @@ -627,7 +651,11 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { } Print("Validate state during pause") - validateState(h.dstCluster, "disabled", true, transfers0) + validateState(h.dstCluster, "disabled", true, transfers0, 0) + + Print("Change transfers and rate limit during pause") + setTransfersAndRateLimit(h.srcCluster, 9, 55) + setTransfersAndRateLimit(h.dstCluster, 9, 55) reachedDataStageChan = make(chan struct{}) hangLAS = make(chan struct{}) @@ -639,10 +667,14 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { go runRestore(context.Background(), finishedRestore) Print("Wait for data stage") - <-reachedDataStageChan + select { + case <-reachedDataStageChan: + case err := <-finishedRestore: + t.Fatalf("Restore finished before reaching data stage with: %s", err) + } Print("Validate state during restore data after pause") - validateState(h.dstCluster, "disabled", false, transfers0) + validateState(h.dstCluster, "disabled", false, transfers0, 0) Print("Release LAS") close(hangLAS) @@ -654,7 +686,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) { } Print("Validate state after restore success") - validateState(h.dstCluster, "repair", true, transfers0) + validateState(h.dstCluster, "repair", true, transfers0, 0) Print("Validate table contents") h.validateIdenticalTables(t, []table{{ks: ks, tab: tab}})