Skip to content

Commit

Permalink
feat(backup): delete files in batches (#3928)
Browse files Browse the repository at this point in the history
Fixes #3928
  • Loading branch information
Michal-Leszczynski authored and karol-kokoszka committed Aug 9, 2024
1 parent 8af10ed commit 8bdc4f8
Showing 1 changed file with 17 additions and 32 deletions.
49 changes: 17 additions & 32 deletions pkg/service/backup/worker_deduplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,21 @@ func (w *worker) deduplicateHost(ctx context.Context, h hostInfo) error {
}
}

deduplicatedByUUID, err := w.deduplicateUUIDSStables(ctx, d.Path, h.IP, remoteSSTableBundles, localSSTableBundles)
deduplicatedUUIDSSTables := w.deduplicateUUIDSStables(remoteSSTableBundles, localSSTableBundles)
deduplicatedIntSSTables, err := w.deduplicateIntSSTables(ctx, h.IP, dataDst, d.Path, remoteSSTableBundles, localSSTableBundles)
if err != nil {
return errors.Wrap(err, "deduplicate based on UUID as generation id content")
return errors.Wrap(err, "deduplication based on .crc32 content")
}
d.Progress.Skipped += deduplicatedByUUID
deduplicated := make([]string, 0, len(deduplicatedUUIDSSTables)+len(deduplicatedIntSSTables))

deduplicatedByCrc32, err := w.deduplicateIntSSTables(ctx, h.IP, dataDst, d.Path, remoteSSTableBundles, localSSTableBundles)
for _, fi := range deduplicatedUUIDSSTables {
d.Progress.Skipped += fi.Size
deduplicated = append(deduplicated, fi.Name)
}
_, err = w.Client.RcloneDeletePathsInBatches(ctx, h.IP, d.Path, deduplicated, 1000)
if err != nil {
return errors.Wrap(err, "deduplication based on .crc32 content")
return errors.Wrap(err, "delete deduplicated files")
}
d.Progress.Skipped += deduplicatedByCrc32

return nil
}
Expand All @@ -112,11 +116,10 @@ func (w *worker) deduplicateHost(ctx context.Context, h hostInfo) error {
return parallel.Run(len(dirs), 1, f, notify)
}

func (w *worker) deduplicateUUIDSStables(ctx context.Context, host string, snapshotDir string,
remoteSSTables, localSSTables *sstableBundlesByID,
) (deduplicated int64, err error) {
func (w *worker) deduplicateUUIDSStables(remoteSSTables, localSSTables *sstableBundlesByID) []fileInfo {
// SSTable bundle with UUID generation ID can be manually deduplicated
// when SSTable bundle with the same UUID is already present on the remote.
deduplicated := make([]fileInfo, 0)
for id, localBundle := range localSSTables.uuidID {
remoteBundle, ok := remoteSSTables.uuidID[id]
if !ok {
Expand All @@ -125,24 +128,14 @@ func (w *worker) deduplicateUUIDSStables(ctx context.Context, host string, snaps
if !isSSTableBundleSizeEqual(localBundle, remoteBundle) {
continue
}
// Remove duplicated SSTable from local snapshot
for _, fi := range localBundle {
localPath := path.Join(snapshotDir, fi.Name)
w.Logger.Debug(ctx, "Removing local snapshot file (deduplication based on generation UUID)",
"host", host, "file", localPath)
if err := w.Client.RcloneDeleteFile(ctx, host, localPath); err != nil {
return deduplicated, errors.Wrapf(err, "delete local snapshot's SSTable file %s", localPath)
}
deduplicated += fi.Size
}
deduplicated = append(deduplicated, localBundle...)
}

return deduplicated, nil
return deduplicated
}

func (w *worker) deduplicateIntSSTables(ctx context.Context, host string, remoteDir, localDir string,
remoteSSTables, localSSTables *sstableBundlesByID,
) (deduplicated int64, err error) {
) (deduplicated []fileInfo, err error) {
// Reference to SSTables 3.0 Data File Format
// https://opensource.docs.scylladb.com/stable/architecture/sstable/sstable3/sstables-3-data-file-format.html

Expand Down Expand Up @@ -177,16 +170,8 @@ func (w *worker) deduplicateIntSSTables(ctx context.Context, host string, remote
return deduplicated, errors.Wrapf(err, "get content of local CRC32 %s", localCRC32Path)
}

if !bytes.Equal(localCRC32, remoteCRC32) {
continue
}
for _, fi := range localBundle {
localPath := path.Join(localDir, fi.Name)
w.Logger.Debug(ctx, "Removing local snapshot file (deduplication based on .crc32)", "host", host, "file", localPath)
if err := w.Client.RcloneDeleteFile(ctx, host, localPath); err != nil {
return deduplicated, errors.Wrapf(err, "delete local snapshot's SSTable file %s", localPath)
}
deduplicated += fi.Size
if bytes.Equal(localCRC32, remoteCRC32) {
deduplicated = append(deduplicated, localBundle...)
}
}
return deduplicated, nil
Expand Down

0 comments on commit 8bdc4f8

Please sign in to comment.