Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TMP: test scylla backup api #4111

Draft
wants to merge 12 commits into
base: master
Choose a base branch
from
Draft
Prev Previous commit
Next Next commit
feat(restore): use Scylla API for restore
  • Loading branch information
Michal-Leszczynski committed Dec 11, 2024

Verified

This commit was signed with the committer’s verified signature.
wezm Wesley Moore
commit e1511e5be9dd64116f5f4e3b102ff1fca5bc8fee
16 changes: 16 additions & 0 deletions pkg/service/restore/batch.go
Original file line number Diff line number Diff line change
@@ -5,10 +5,12 @@ package restore
import (
"context"
"slices"
"strings"
"sync"

"github.com/pkg/errors"
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
"github.com/scylladb/scylla-manager/v3/pkg/sstable"
)

// batchDispatcher is a tool for batching SSTables from
@@ -193,6 +195,20 @@ func (b batch) IDs() []string {
return ids
}

// TOC returns every file of the batch's SSTables whose name
// ends with sstable.ComponentTOC.
// The result is never nil, even when no such file is found.
// TODO: should we handle situation where TOC does not exist?
func (b batch) TOC() []string {
	tocSuffix := string(sstable.ComponentTOC)
	tocs := make([]string, 0, len(b.SSTables))
	for _, table := range b.SSTables {
		for _, file := range table.Files {
			if !strings.HasSuffix(file, tocSuffix) {
				continue
			}
			tocs = append(tocs, file)
		}
	}
	return tocs
}

// ValidateAllDispatched returns error if not all SSTables were dispatched.
func (bd *batchDispatcher) ValidateAllDispatched() error {
bd.mu.Lock()
15 changes: 15 additions & 0 deletions pkg/service/restore/tables_worker.go
Original file line number Diff line number Diff line change
@@ -176,6 +176,9 @@ func (w *tablesWorker) restore(ctx context.Context) error {
return nil
}

// useScyllaAPI gates restoring batches with the Scylla restore API (scyllaRestore).
// It only takes effect for batches that contain no versioned SSTables.
// TODO: change it to something nicer - need to check Scylla version + check for config/flag.
const useScyllaAPI = true

func (w *tablesWorker) stageRestoreData(ctx context.Context) error {
w.AwaitSchemaAgreement(ctx, w.clusterSession)
w.logger.Info(ctx, "Started restoring tables")
@@ -235,6 +238,18 @@ func (w *tablesWorker) stageRestoreData(ctx context.Context) error {
}
w.onBatchDispatch(ctx, b, host)

if len(b.VersionedSSTables()) == 0 && useScyllaAPI {
if err := w.scyllaRestore(ctx, host, b); err != nil {
err = multierr.Append(errors.Wrap(err, "restore batch with Scylla API"), bd.ReportFailure(host, b))
w.logger.Error(ctx, "Failed to restore batch with Scylla API",
"host", host,
"error", err)
} else {
bd.ReportSuccess(b)
}
continue
}

pr, err := w.newRunProgress(ctx, hi, b)
if err != nil {
err = multierr.Append(errors.Wrap(err, "create new run progress"), bd.ReportFailure(hi.Host, b))
90 changes: 90 additions & 0 deletions pkg/service/restore/tablesdir_worker.go
Original file line number Diff line number Diff line change
@@ -5,13 +5,15 @@ package restore
import (
"context"
"path"
"strings"
"time"

"github.com/pkg/errors"
"github.com/scylladb/scylla-manager/v3/pkg/metrics"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
"github.com/scylladb/scylla-manager/v3/pkg/util/parallel"
"github.com/scylladb/scylla-manager/v3/swagger/gen/scylla/v1/models"
)

func (w *tablesWorker) restoreBatch(ctx context.Context, b batch, pr *RunProgress) (err error) {
@@ -274,3 +276,91 @@ func (w *tablesWorker) onLasEnd(ctx context.Context, b batch, pr *RunProgress) {
w.logger.Info(ctx, "Restored batch", "host", pr.Host)
w.insertRunProgress(ctx, pr)
}

// scyllaRestore restores batch b on host with the Scylla restore API.
//
// It validates free disk space on the host, derives the SSTable directory
// prefix relative to the backup location path, starts the Scylla restore task,
// records a RunProgress carrying the returned Scylla task ID, and then blocks
// in scyllaWaitTask until the task finishes.
//
// It returns an error when any of those steps fails, including the case where
// the batch's remote SSTable directory is not located under the location path.
func (w *tablesWorker) scyllaRestore(ctx context.Context, host string, b batch) error {
	if err := w.checkAvailableDiskSpace(ctx, host); err != nil {
		return errors.Wrap(err, "validate free disk space")
	}
	// TODO: we probably should attach to job, but we don't do it for a regular restore anyway

	// TODO: resolve endpoint by either:
	// - making agent return endpoint information to SM <- preferred
	// - allowing for specifying endpoint instead of baked in --location flag
	// NOTE(review): the endpoint below is a hard-coded test address and must be
	// replaced before this code can be used outside of the test environment.
	const endpoint = "192.168.200.99"

	prefix, ok := strings.CutPrefix(b.RemoteSSTableDir, b.Location.Path)
	if !ok {
		// Without the location path as a prefix there is no way to build
		// the relative prefix passed to the restore call below.
		return errors.Errorf("remote sstable dir %q is not prefixed with location path %q",
			b.RemoteSSTableDir, b.Location.Path)
	}
	id, err := w.client.ScyllaRestore(ctx, host, endpoint, b.Location.Path, prefix, b.Keyspace, b.Table, b.TOC())
	if err != nil {
		return errors.Wrap(err, "restore")
	}

	// Persist the progress entry before waiting so that the Scylla task ID
	// is recorded even if waiting fails.
	pr := &RunProgress{
		ClusterID:        w.run.ClusterID,
		TaskID:           w.run.TaskID,
		RunID:            w.run.ID,
		RemoteSSTableDir: b.RemoteSSTableDir,
		Keyspace:         b.Keyspace,
		Table:            b.Table,
		Host:             host,
		ShardCnt:         int64(w.hostShardCnt[host]),
		ScyllaTaskID:     id,
		SSTableID:        b.IDs(),
	}
	w.insertRunProgress(ctx, pr)

	return w.scyllaWaitTask(ctx, pr, b)
}

// scyllaWaitTask polls the Scylla task referenced by pr.ScyllaTaskID on pr.Host
// until it reports the done or failed state, updating the run progress after
// every successful poll.
//
// It returns nil when the task is done. It returns an error when the context
// is done, when waiting for the task fails, or when the task reports the
// failed state. On any returned error the Scylla task is aborted.
func (w *tablesWorker) scyllaWaitTask(ctx context.Context, pr *RunProgress, b batch) (err error) {
	defer func() {
		if err == nil {
			return
		}
		// Abort with a fresh context, as ctx might already be done at this point.
		abortErr := w.client.ScyllaAbortTask(context.Background(), pr.Host, pr.ScyllaTaskID)
		if abortErr == nil {
			return
		}
		w.logger.Error(ctx, "Failed to abort task",
			"host", pr.Host,
			"id", pr.ScyllaTaskID,
			"error", abortErr,
		)
	}()

	for {
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}

		status, waitErr := w.client.ScyllaWaitTask(ctx, pr.Host, pr.ScyllaTaskID, int64(w.config.LongPollingTimeoutSeconds))
		if waitErr != nil {
			return errors.Wrap(waitErr, "wait for scylla task")
		}
		w.scyllaUpdateProgress(ctx, pr, b, status)

		state := scyllaclient.ScyllaTaskState(status.State)
		if state == scyllaclient.ScyllaTaskStateFailed {
			return errors.Errorf("task error (%s): %s", pr.ScyllaTaskID, status.Error)
		}
		if state == scyllaclient.ScyllaTaskStateDone {
			return nil
		}
		// Any other state means the task has not finished yet: poll again.
	}
}

// scyllaUpdateProgress copies the state of the Scylla task into pr and
// persists it with insertRunProgress.
//
// The download and restore timestamps of pr are both set from the same task
// timestamps: the "started at" fields from task.StartTime and the
// "completed at" fields from task.EndTime. A zero task timestamp leaves the
// corresponding fields nil.
//
// pr.Downloaded is estimated as the fraction of b.Size given by
// task.ProgressCompleted / task.ProgressTotal, and is 0 while the task
// reports no positive progress total.
func (w *worker) scyllaUpdateProgress(ctx context.Context, pr *RunProgress, b batch, task *models.TaskStatus) {
	// Reset first, so that timestamps not (or no longer) reported
	// by the task are not carried over from a previous update.
	pr.DownloadStartedAt = nil
	pr.RestoreStartedAt = nil
	if t := time.Time(task.StartTime); !t.IsZero() {
		pr.DownloadStartedAt = &t
		pr.RestoreStartedAt = &t
	}
	pr.DownloadCompletedAt = nil
	pr.RestoreCompletedAt = nil
	// Completion must come from the task end time, not from its start time.
	if t := time.Time(task.EndTime); !t.IsZero() {
		pr.DownloadCompletedAt = &t
		pr.RestoreCompletedAt = &t
	}
	pr.Error = task.Error
	// TODO: another arg for another struct
	// Scale in floating point and convert to an integer only at the end:
	// converting the ratio itself would truncate it to 0 until the task
	// is fully completed. Guard against a zero total to avoid NaN/Inf.
	pr.Downloaded = 0
	if task.ProgressTotal > 0 {
		pr.Downloaded = int64(float64(b.Size) * float64(task.ProgressCompleted) / float64(task.ProgressTotal))
	}
	w.insertRunProgress(ctx, pr)
}