Skip to content

Commit

Permalink
tests: cover pause/resume on backup's deduplication stage
Browse files Browse the repository at this point in the history
  • Loading branch information
karol-kokoszka committed Aug 9, 2024
1 parent 9cfeeb9 commit e0fa1f6
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 1 deletion.
3 changes: 3 additions & 0 deletions pkg/service/backup/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type Service struct {
scyllaClient scyllaclient.ProviderFunc
clusterSession cluster.SessionFunc
logger log.Logger

dth deduplicateTestHooks
}

func NewService(session gocqlx.Session, config Config, metrics metrics.BackupMetrics,
Expand Down Expand Up @@ -687,6 +689,7 @@ func (s *Service) Backup(ctx context.Context, clusterID, taskID, runID uuid.UUID
return &bytes.Buffer{}
},
},
dth: s.dth,
}

// Map stages to worker functions
Expand Down
163 changes: 163 additions & 0 deletions pkg/service/backup/service_deduplicate_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Copyright (C) 2024 ScyllaDB

//go:build all || integration
// +build all integration

package backup_test

import (
"context"
"errors"
"sync"
"testing"
"time"

"github.com/scylladb/scylla-manager/v3/pkg/service/backup"
"github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
. "github.com/scylladb/scylla-manager/v3/pkg/testutils"
"github.com/scylladb/scylla-manager/v3/pkg/testutils/db"
"github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
)

func TestBackupPauseResumeOnDeduplicationStage(t *testing.T) {
const (
testBucket = "backuptest-deduplication"
testKeyspace = "backuptest_deduplication"
)

location := s3Location(testBucket)
config := defaultConfig()

var (
session = db.CreateScyllaManagerDBSession(t)
h = newBackupTestHelper(t, session, config, location, nil)
clusterSession = db.CreateSessionAndDropAllKeyspaces(t, h.Client)
ctx, cancel = context.WithCancel(context.Background())
)

db.WriteData(t, clusterSession, testKeyspace, 3)

target := backup.Target{
Units: []backup.Unit{
{
Keyspace: testKeyspace,
},
},
DC: []string{"dc1"},
Location: []backupspec.Location{location},
Retention: 2,
RateLimit: []backup.DCLimit{
{"dc1", 1},
},
Continue: true,
}
if err := h.service.InitTarget(ctx, h.ClusterID, &target); err != nil {
t.Fatal(err)
}

Print("Given: backup is created out of the current cluster state")
var originalTotalSize int64
func() {
var deduplicatedOnFreshBackup int64
after := func(skipped, uploaded, size int64) {
deduplicatedOnFreshBackup += skipped
}
defer h.service.RemoveDeduplicateTestHooks()
h.service.SetDeduplicateTestHooks(func() {}, after)

if err := h.service.Backup(ctx, h.ClusterID, h.TaskID, h.RunID, target); err != nil {
t.Fatal(err)
}
Print("Then: nothing is deduplicated")
if deduplicatedOnFreshBackup != 0 {
t.Fatalf("Expected deduplicated 0 bytes on fresh backup, but was %v", deduplicatedOnFreshBackup)
}
}()

Print("Then: another backup to deduplicate everything (no delta)")
time.Sleep(time.Second)
func() {
var totalDeduplicated, totalUploaded, totalSize int64
after := func(skipped, uploaded, size int64) {
totalDeduplicated += skipped
totalUploaded += uploaded
totalSize += size
}
defer h.service.RemoveDeduplicateTestHooks()
h.service.SetDeduplicateTestHooks(func() {}, after)

if err := h.service.Backup(ctx, h.ClusterID, h.TaskID, h.RunID, target); err != nil {
t.Fatal(err)
}

Print("Then: everything is deduplicated")
if totalDeduplicated != totalSize {
t.Fatalf("Expected deduplicated %v bytes on delta 0 backup, but was %v", totalSize, totalDeduplicated)
}
Print("Then: nothing is uploaded")
if totalUploaded != 0 {
t.Fatalf("Expected uploaded 0 bytes on delta 0 backup, but was %v", totalUploaded)
}
Printf("Then: total size is equal to first backup")
if totalSize != originalTotalSize {
t.Fatalf("Expected total size to be %v, but was %v", originalTotalSize, totalSize)
}
}()

Print("Given: yet another backup is started on empty delta (no data changes) and paused od DEDUPLICATE stage")
time.Sleep(time.Second)
func() {
onlyOneHostToProcessMutex := sync.Mutex{}
var totalDeduplicated, totalUploaded, totalSize int64
before := func() {
onlyOneHostToProcessMutex.Lock()
}
after := func(skipped, uploaded, size int64) {
totalDeduplicated += skipped
totalUploaded += uploaded
totalSize += size
cancel()
onlyOneHostToProcessMutex.Unlock()
}
defer h.service.RemoveDeduplicateTestHooks()
h.service.SetDeduplicateTestHooks(before, after)

if err := h.service.Backup(ctx, h.ClusterID, h.TaskID, h.RunID, target); err != nil && !errors.Is(err, context.Canceled) {
t.Fatal(err)
}

Print("Then: not everything is either deduplicated and 0 is uploaded")
if totalDeduplicated == totalSize || totalUploaded > 0 {
t.Fatalf("Expected backup to be paused in the middle")
}
}()

Print("When: backup is resumed with the new RunID")
func() {
var totalDeduplicated, totalUploaded, totalSize int64
after := func(skipped, uploaded, size int64) {
totalDeduplicated += skipped
totalUploaded += uploaded
totalSize += size
}
defer h.service.RemoveDeduplicateTestHooks()
h.service.SetDeduplicateTestHooks(func() {}, after)

if err := h.service.Backup(context.Background(), h.ClusterID, h.TaskID, uuid.NewTime(), target); err != nil {
t.Fatal(err)
}
Printf("Then: total size is equal to first backup")
if totalSize != originalTotalSize {
t.Fatalf("Expected total size to be %v, but was %v", originalTotalSize, totalSize)
}
Print("Then: everything is deduplicated")
if totalDeduplicated != totalSize {
t.Fatalf("Expected deduplicated %v bytes on delta 0 backup, but was %v", totalSize, totalDeduplicated)
}
Print("Then: nothing is uploaded")
if totalUploaded != 0 {
t.Fatalf("Expected uploaded 0 bytes on delta 0 backup, but was %v", totalUploaded)
}
}()

}
30 changes: 30 additions & 0 deletions pkg/service/backup/service_testutils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (C) 2024 ScyllaDB

//go:build all || integration
// +build all integration

package backup

type dthHelper struct {
before func()
after func(skipped, uploaded, size int64)
}

func (d dthHelper) beforeDeduplicateHost() {
d.before()
}

func (d dthHelper) afterDeduplicateHost(skipped, uploaded, size int64) {
d.after(skipped, uploaded, size)
}

func (s *Service) SetDeduplicateTestHooks(before func(), after func(skipped, uploaded, size int64)) {
s.dth = dthHelper{
before: before,
after: after,
}
}

func (s *Service) RemoveDeduplicateTestHooks() {
s.dth = nil
}
2 changes: 2 additions & 0 deletions pkg/service/backup/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ type worker struct {
mu sync.Mutex

memoryPool *sync.Pool

dth deduplicateTestHooks
}

func (w *worker) WithLogger(logger log.Logger) *worker {
Expand Down
20 changes: 19 additions & 1 deletion pkg/service/backup/worker_deduplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ import (
"github.com/scylladb/scylla-manager/v3/pkg/util/parallel"
)

type deduplicateTestHooks interface {
beforeDeduplicateHost()
afterDeduplicateHost(skipped, uploaded, size int64)
}

// Deduplicate handles the deduplicate stage of the backup process.
// The implementation is expected to follow RFC document
// https://docs.google.com/document/d/1EtGlF6UGNy34D_7QsnCheaukp3UwVObZU56PBdd0CQ8/edit#heading=h.jl2qbpcarwp9
Expand All @@ -38,6 +43,19 @@ func (w *worker) Deduplicate(ctx context.Context, hosts []hostInfo, limits []DCL
}

func (w *worker) deduplicateHost(ctx context.Context, h hostInfo) error {
if w.dth != nil {
w.dth.beforeDeduplicateHost()
defer func(sd []snapshotDir) {
var skipped, uploaded, size int64
for _, v := range sd {
skipped += v.Progress.Skipped
uploaded += v.Progress.Uploaded
size += v.Progress.Size
}
w.dth.afterDeduplicateHost(skipped, uploaded, size)
}(w.hostSnapshotDirs(h))
}

if err := w.setRateLimit(ctx, h); err != nil {
return errors.Wrap(err, "set rate limit")
}
Expand Down Expand Up @@ -108,7 +126,7 @@ func (w *worker) basedOnUUIDGenerationAvailability(ctx context.Context, d snapsh
remoteSSTables map[string]struct{}, ssTablesGroupByID map[string][]string,
) (deduplicated int64, err error) {
// Per every SSTable files group, check if the name uses UUID to identify the SSTable generation.
// If the above is true, then check if files exists in the remote storage and remove it locally if exists in rmeote.
// If the above is true, then c4heck if files exists in the remote storage and remove it locally if exists in rmeote.
for id, ssTableContent := range ssTablesGroupByID {
// Check if id is an integer (no UUID)
if _, err := strconv.Atoi(id); err == nil {
Expand Down

0 comments on commit e0fa1f6

Please sign in to comment.