feat(restore_test): extend TestRestoreTablesPreparationIntegration with rate limit

This way the test also checks the rate limit before and after backup.
It also checks the transfers before the restore, in the middle of it,
when it is paused, when it is resumed, and after it finishes.

This commit also extends the test to change the transfers
and rate limit values while the restore is paused, so that
it validates that they are correctly reset during the
restore data stage.
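
For context, rclone reports its bandwidth limit as a raw string rather than a number, so the test has to translate the numeric rate limit into that format before comparing. A minimal sketch of the mapping used by the validation in the diff below (the helper name rawRateLimit is illustrative, not part of the commit):

	package main

	import "fmt"

	// rawRateLimit renders a numeric rate limit the way rclone reports it:
	// 0 means the limit is disabled ("off"), any other value N becomes "NM".
	func rawRateLimit(rateLimit int) string {
		if rateLimit == 0 {
			return "off"
		}
		return fmt.Sprintf("%dM", rateLimit)
	}

	func main() {
		fmt.Println(rawRateLimit(88)) // "88M"
		fmt.Println(rawRateLimit(0))  // "off"
	}

Under this mapping, the backup's rate_limit of 88 should read back as "88M", while the restore data stage, which runs with rate_limit set to 0, should read back as "off".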
Michal-Leszczynski committed Oct 15, 2024
1 parent 7076cfa commit b60a232
Showing 1 changed file with 57 additions and 25 deletions.
82 changes: 57 additions & 25 deletions pkg/service/restore/restore_integration_test.go
@@ -498,7 +498,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	Print("Fill setup")
 	fillTable(t, h.srcCluster.rootSession, 100, ks, tab)
 
-	validateState := func(ch clusterHelper, tombstone string, compaction bool, transfers int) {
+	validateState := func(ch clusterHelper, tombstone string, compaction bool, transfers int, rateLimit int) {
 		// Validate tombstone_gc mode
 		if got := tombstoneGCMode(t, ch.rootSession, ks, tab); tombstone != got {
 			t.Errorf("expected tombstone_gc=%s, got %s", tombstone, got)
@@ -523,6 +523,20 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 				t.Errorf("expected transfers=%d, got=%d on host %s", transfers, got, host)
 			}
 		}
+		// Validate rate limit
+		for _, host := range ch.Client.Config().Hosts {
+			got, err := ch.Client.RcloneGetBandwidthLimit(context.Background(), host)
+			if err != nil {
+				t.Fatal(errors.Wrapf(err, "check rate limit on host %s", host))
+			}
+			rawLimit := fmt.Sprintf("%dM", rateLimit)
+			if rateLimit == 0 {
+				rawLimit = "off"
+			}
+			if rawLimit != got {
+				t.Errorf("expected rate_limit=%s, got=%s on host %s", rawLimit, got, host)
+			}
+		}
 	}
 
 	shardCnt, err := h.dstCluster.Client.ShardCount(context.Background(), ManagedClusterHost())
@@ -531,35 +545,39 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	}
 	transfers0 := 2 * int(shardCnt)
 
-	// Set initial transfers
-	for _, host := range ManagedClusterHosts() {
-		err := h.dstCluster.Client.RcloneSetTransfers(context.Background(), host, 10)
-		if err != nil {
-			t.Fatal(errors.Wrapf(err, "set initial transfers on host %s", host))
-		}
-	}
-	for _, host := range ManagedSecondClusterHosts() {
-		err := h.srcCluster.Client.RcloneSetTransfers(context.Background(), host, 10)
-		if err != nil {
-			t.Fatal(errors.Wrapf(err, "set initial transfers on host %s", host))
+	setTransfersAndRateLimit := func(ch clusterHelper, transfers int, rateLimit int) {
+		for _, host := range ch.Client.Config().Hosts {
+			err := ch.Client.RcloneSetTransfers(context.Background(), host, transfers)
+			if err != nil {
+				t.Fatal(errors.Wrapf(err, "set transfers on host %s", host))
+			}
+			err = ch.Client.RcloneSetBandwidthLimit(context.Background(), host, rateLimit)
+			if err != nil {
+				t.Fatal(errors.Wrapf(err, "set rate limit on host %s", host))
+			}
 		}
 	}
 
+	Print("Set initial transfers and rate limit")
+	setTransfersAndRateLimit(h.srcCluster, 10, 99)
+	setTransfersAndRateLimit(h.dstCluster, 10, 99)
+
 	Print("Validate state before backup")
-	validateState(h.srcCluster, "repair", true, 10)
+	validateState(h.srcCluster, "repair", true, 10, 99)
 
 	Print("Run backup")
 	loc := []Location{testLocation("preparation", "")}
 	S3InitBucket(t, loc[0].Path)
 	ksFilter := []string{ks}
 	tag := h.runBackup(t, map[string]any{
-		"location":  loc,
-		"keyspace":  ksFilter,
-		"transfers": 3,
+		"location":   loc,
+		"keyspace":   ksFilter,
+		"transfers":  3,
+		"rate_limit": []string{"88"},
 	})
 
 	Print("Validate state after backup")
-	validateState(h.srcCluster, "repair", true, 3)
+	validateState(h.srcCluster, "repair", true, 3, 88)
 
 	runRestore := func(ctx context.Context, finishedRestore chan error) {
 		grantRestoreTablesPermissions(t, h.dstCluster.rootSession, ksFilter, h.dstUser)
@@ -569,10 +587,12 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 			"keyspace":       ksFilter,
 			"snapshot_tag":   tag,
 			"transfers":      0,
+			"rate_limit":     []string{"0"},
 			"restore_tables": true,
 		})
 		if err != nil {
-			t.Error(err)
+			finishedRestore <- err
+			return
 		}
 		finishedRestore <- h.dstRestoreSvc.Restore(ctx, h.dstCluster.ClusterID, h.dstCluster.TaskID, h.dstCluster.RunID, rawProps)
 	}
@@ -601,18 +621,22 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	makeLASHang(reachedDataStageChan, hangLAS)
 
 	Print("Validate state before restore")
-	validateState(h.dstCluster, "repair", true, 10)
+	validateState(h.dstCluster, "repair", true, 10, 99)
 
 	Print("Run restore")
 	finishedRestore := make(chan error)
 	restoreCtx, restoreCancel := context.WithCancel(context.Background())
 	go runRestore(restoreCtx, finishedRestore)
 
 	Print("Wait for data stage")
-	<-reachedDataStageChan
+	select {
+	case <-reachedDataStageChan:
+	case err := <-finishedRestore:
+		t.Fatalf("Restore finished before reaching data stage with: %s", err)
+	}
 
 	Print("Validate state during restore data")
-	validateState(h.dstCluster, "disabled", false, transfers0)
+	validateState(h.dstCluster, "disabled", false, transfers0, 0)
 
 	Print("Pause restore")
 	restoreCancel()
@@ -627,7 +651,11 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	}
 
 	Print("Validate state during pause")
-	validateState(h.dstCluster, "disabled", true, transfers0)
+	validateState(h.dstCluster, "disabled", true, transfers0, 0)
 
+	Print("Change transfers and rate limit during pause")
+	setTransfersAndRateLimit(h.srcCluster, 9, 55)
+	setTransfersAndRateLimit(h.dstCluster, 9, 55)
+
 	reachedDataStageChan = make(chan struct{})
 	hangLAS = make(chan struct{})
@@ -639,10 +667,14 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	go runRestore(context.Background(), finishedRestore)
 
 	Print("Wait for data stage")
-	<-reachedDataStageChan
+	select {
+	case <-reachedDataStageChan:
+	case err := <-finishedRestore:
+		t.Fatalf("Restore finished before reaching data stage with: %s", err)
+	}
 
 	Print("Validate state during restore data after pause")
-	validateState(h.dstCluster, "disabled", false, transfers0)
+	validateState(h.dstCluster, "disabled", false, transfers0, 0)
 
 	Print("Release LAS")
 	close(hangLAS)
@@ -654,7 +686,7 @@ func TestRestoreTablesPreparationIntegration(t *testing.T) {
 	}
 
 	Print("Validate state after restore success")
-	validateState(h.dstCluster, "repair", true, transfers0)
+	validateState(h.dstCluster, "repair", true, transfers0, 0)
 
 	Print("Validate table contents")
 	h.validateIdenticalTables(t, []table{{ks: ks, tab: tab}})