Skip to content

Commit

Permalink
fix deletion for object_disk_path and embedded backups, after `up…
Browse files Browse the repository at this point in the history
…load` to properly respect `backups_to_keep_remote`, refactoring `watch` command, after #804
  • Loading branch information
Slach committed Feb 6, 2024
1 parent a3b4ae6 commit 517c3c8
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 119 deletions.
6 changes: 6 additions & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# v2.4.28
IMPROVEMENT
- refactoring `watch` command, after https://github.com/Altinity/clickhouse-backup/pull/804
BUG FIXES
- fix deletion for `object_disk_path` and `embedded` backups, after `upload` to properly respect `backups_to_keep_remote`

# v2.4.27
BUG FIXES
- fix deletion for `object_disk_path` (all backups with S3, GCS over S3, AZBLOB disks from 2.4.0-2.4.25 didn't delete properly their data from backup bucket)
Expand Down
79 changes: 47 additions & 32 deletions pkg/backup/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,10 @@ func (b *Backuper) RemoveOldBackupsLocal(ctx context.Context, keepLastBackup boo
if err != nil {
return err
}
backupsToDelete := GetBackupsToDelete(backupList, keep)
backupsToDelete := GetBackupsToDeleteLocal(backupList, keep)
for _, backup := range backupsToDelete {
if err := b.RemoveBackupLocal(ctx, backup.BackupName, disks); err != nil {
return err
if deleteErr := b.RemoveBackupLocal(ctx, backup.BackupName, disks); deleteErr != nil {
return deleteErr
}
}
return nil
Expand Down Expand Up @@ -146,22 +146,10 @@ func (b *Backuper) RemoveBackupLocal(ctx context.Context, backupName string, dis

for _, backup := range backupList {
if backup.BackupName == backupName {
var skip bool
skip, err = b.skipIfTheSameRemoteBackupPresent(ctx, backup.BackupName, backup.Tags)
err = b.cleanEmbeddedAndObjectDiskLocalIfSameRemoteNotPresent(ctx, backupName, disks, backup, hasObjectDisks, log)
if err != nil {
return err
}
if !skip && strings.Contains(backup.Tags, "embedded") {
if err = b.cleanLocalEmbedded(ctx, backup, disks); err != nil {
log.Warnf("b.cleanLocalEmbedded return error: %v", err)
return err
}
}
if !skip && hasObjectDisks {
if err = b.cleanBackupObjectDisks(ctx, backupName); err != nil {
return err
}
}
for _, disk := range disks {
backupPath := path.Join(disk.Path, "backup", backupName)
if disk.IsBackup {
Expand All @@ -183,6 +171,25 @@ func (b *Backuper) RemoveBackupLocal(ctx context.Context, backupName string, dis
return fmt.Errorf("'%s' is not found on local storage", backupName)
}

func (b *Backuper) cleanEmbeddedAndObjectDiskLocalIfSameRemoteNotPresent(ctx context.Context, backupName string, disks []clickhouse.Disk, backup LocalBackup, hasObjectDisks bool, log *apexLog.Entry) error {
skip, err := b.skipIfTheSameRemoteBackupPresent(ctx, backup.BackupName, backup.Tags)
if err != nil {
return err
}
if !skip && strings.Contains(backup.Tags, "embedded") {
if err = b.cleanLocalEmbedded(ctx, backup, disks); err != nil {
log.Warnf("b.cleanLocalEmbedded return error: %v", err)
return err
}
}
if !skip && hasObjectDisks {
if err = b.cleanBackupObjectDisks(ctx, backupName); err != nil {
return err
}
}
return nil
}

func (b *Backuper) hasObjectDisksLocal(backupList []LocalBackup, backupName string, disks []clickhouse.Disk) bool {
for _, backup := range backupList {
if backup.BackupName == backupName && !strings.Contains(backup.Tags, "embedded") {
Expand Down Expand Up @@ -289,22 +296,12 @@ func (b *Backuper) RemoveBackupRemote(ctx context.Context, backupName string) er
}
for _, backup := range backupList {
if backup.BackupName == backupName {
if skip, err := b.skipIfSameLocalBackupPresent(ctx, backup.BackupName, backup.Tags); err != nil {
err = b.cleanEmbeddedAndObjectDiskRemoteIfSameLocalNotPresent(ctx, backup, log)
if err != nil {
return err
} else if !skip {
if strings.Contains(backup.Tags, "embedded") {
if err = b.cleanRemoteEmbedded(ctx, backup, bd); err != nil {
log.Warnf("b.cleanRemoteEmbedded return error: %v", err)
return err
}
} else if b.hasObjectDisksRemote(backup) {
if err = b.cleanBackupObjectDisks(ctx, backup.BackupName); err != nil {
log.Warnf("b.cleanBackupObjectDisks return error: %v", err)
}
}
}

if err = bd.RemoveBackup(ctx, backup); err != nil {
if err = bd.RemoveBackupRemote(ctx, backup); err != nil {
log.Warnf("bd.RemoveBackup return error: %v", err)
return err
}
Expand All @@ -320,6 +317,24 @@ func (b *Backuper) RemoveBackupRemote(ctx context.Context, backupName string) er
return fmt.Errorf("'%s' is not found on remote storage", backupName)
}

func (b *Backuper) cleanEmbeddedAndObjectDiskRemoteIfSameLocalNotPresent(ctx context.Context, backup storage.Backup, log *apexLog.Entry) error {
if skip, err := b.skipIfSameLocalBackupPresent(ctx, backup.BackupName, backup.Tags); err != nil {
return err
} else if !skip {
if strings.Contains(backup.Tags, "embedded") {
if err = b.cleanRemoteEmbedded(ctx, backup); err != nil {
log.Warnf("b.cleanRemoteEmbedded return error: %v", err)
return err
}
} else if b.hasObjectDisksRemote(backup) {
if err = b.cleanBackupObjectDisks(ctx, backup.BackupName); err != nil {
log.Warnf("b.cleanBackupObjectDisks return error: %v", err)
}
}
}
return nil
}

func (b *Backuper) hasObjectDisksRemote(backup storage.Backup) bool {
for _, diskType := range backup.DiskTypes {
if b.isDiskTypeObject(diskType) {
Expand All @@ -329,13 +344,13 @@ func (b *Backuper) hasObjectDisksRemote(backup storage.Backup) bool {
return false
}

func (b *Backuper) cleanRemoteEmbedded(ctx context.Context, backup storage.Backup, bd *storage.BackupDestination) error {
func (b *Backuper) cleanRemoteEmbedded(ctx context.Context, backup storage.Backup) error {
if err := object_disk.InitCredentialsAndConnections(ctx, b.ch, b.cfg, b.cfg.ClickHouse.EmbeddedBackupDisk); err != nil {
return err
}
return bd.Walk(ctx, backup.BackupName+"/", true, func(ctx context.Context, f storage.RemoteFile) error {
return b.dst.Walk(ctx, backup.BackupName+"/", true, func(ctx context.Context, f storage.RemoteFile) error {
if !strings.HasSuffix(f.Name(), ".json") {
r, err := bd.GetFileReader(ctx, path.Join(backup.BackupName, f.Name()))
r, err := b.dst.GetFileReader(ctx, path.Join(backup.BackupName, f.Name()))
if err != nil {
return err
}
Expand Down
38 changes: 37 additions & 1 deletion pkg/backup/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,48 @@ func (b *Backuper) Upload(backupName, diffFrom, diffFromRemote, tablePattern str
Info("done")

// Clean
if err = b.dst.RemoveOldBackups(ctx, b.cfg.General.BackupsToKeepRemote); err != nil {
if err = b.RemoveOldBackupsRemote(ctx); err != nil {
return fmt.Errorf("can't remove old backups on remote storage: %v", err)
}
return nil
}

func (b *Backuper) RemoveOldBackupsRemote(ctx context.Context) error {

if b.cfg.General.BackupsToKeepRemote < 1 {
return nil
}
start := time.Now()
backupList, err := b.dst.BackupList(ctx, true, "")
if err != nil {
return err
}
backupsToDelete := storage.GetBackupsToDeleteRemote(backupList, b.cfg.General.BackupsToKeepRemote)
b.dst.Log.WithFields(apexLog.Fields{
"operation": "RemoveOldBackupsRemote",
"duration": utils.HumanizeDuration(time.Since(start)),
}).Info("calculate backup list for delete remote")
for _, backupToDelete := range backupsToDelete {
startDelete := time.Now()
err = b.cleanEmbeddedAndObjectDiskRemoteIfSameLocalNotPresent(ctx, backupToDelete, b.dst.Log)
if err != nil {
return err
}

if err := b.dst.RemoveBackupRemote(ctx, backupToDelete); err != nil {
b.dst.Log.Warnf("can't deleteKey %s return error : %v", backupToDelete.BackupName, err)
}
b.dst.Log.WithFields(apexLog.Fields{
"operation": "RemoveOldBackupsRemote",
"location": "remote",
"backup": backupToDelete.BackupName,
"duration": utils.HumanizeDuration(time.Since(startDelete)),
}).Info("done")
}
b.dst.Log.WithFields(apexLog.Fields{"operation": "RemoveOldBackupsRemote", "duration": utils.HumanizeDuration(time.Since(start))}).Info("done")
return nil
}

func (b *Backuper) uploadSingleBackupFile(ctx context.Context, localFile, remoteFile string) error {
if b.resume && b.resumableState.IsAlreadyProcessedBool(remoteFile) {
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/backup/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"sort"
)

func GetBackupsToDelete(backups []LocalBackup, keep int) []LocalBackup {
func GetBackupsToDeleteLocal(backups []LocalBackup, keep int) []LocalBackup {
if len(backups) > keep {
sort.SliceStable(backups, func(i, j int) bool {
return backups[i].CreationDate.After(backups[j].CreationDate)
Expand Down
97 changes: 53 additions & 44 deletions pkg/backup/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,53 +83,10 @@ func (b *Backuper) Watch(watchInterval, fullInterval, watchBackupNameTemplate, t
lastBackup := time.Now()
lastFullBackup := time.Now()

remoteBackups, err := b.GetRemoteBackups(ctx, true)
if err != nil {
return err
}
backupTemplateName, err := b.ch.ApplyMacros(ctx, b.cfg.General.WatchBackupNameTemplate)
prevBackupName, prevBackupType, lastBackup, lastFullBackup, backupType, err = b.calculatePrevBackupNameAndType(ctx, prevBackupName, prevBackupType, lastBackup, lastFullBackup, backupType)
if err != nil {
return err
}
backupTemplateNamePrepR := regexp.MustCompile(`{type}|{time:([^}]+)}`)
backupTemplateNameR := regexp.MustCompile(backupTemplateNamePrepR.ReplaceAllString(backupTemplateName, `\S+`))

for _, remoteBackup := range remoteBackups {
if remoteBackup.Broken == "" && backupTemplateNameR.MatchString(remoteBackup.BackupName) {
prevBackupName = remoteBackup.BackupName
if strings.Contains(remoteBackup.BackupName, "increment") {
prevBackupType = "increment"
lastBackup = remoteBackup.CreationDate
} else {
prevBackupType = "full"
lastBackup = remoteBackup.CreationDate
lastFullBackup = remoteBackup.CreationDate
}
}
}
if prevBackupName != "" {
now := time.Now()
timeBeforeDoBackup := int(b.cfg.General.WatchDuration.Seconds() - now.Sub(lastBackup).Seconds())
timeBeforeDoFullBackup := int(b.cfg.General.FullDuration.Seconds() - now.Sub(lastFullBackup).Seconds())
b.log.Infof("Time before do backup %v", timeBeforeDoBackup)
b.log.Infof("Time before do full backup %v", timeBeforeDoFullBackup)
if timeBeforeDoBackup > 0 && timeBeforeDoFullBackup > 0 {
b.log.Infof("Waiting %d seconds until continue doing backups due watch interval", timeBeforeDoBackup)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(b.cfg.General.WatchDuration - now.Sub(lastBackup)):
}
}
now = time.Now()
lastBackup = now
if b.cfg.General.FullDuration.Seconds()-time.Now().Sub(lastFullBackup).Seconds() <= 0 {
backupType = "full"
lastFullBackup = now
} else {
backupType = "increment"
}
}

createRemoteErrCount := 0
deleteLocalErrCount := 0
Expand Down Expand Up @@ -226,3 +183,55 @@ func (b *Backuper) Watch(watchInterval, fullInterval, watchBackupNameTemplate, t
}
}
}

// calculatePrevBackupNameAndType - https://github.com/Altinity/clickhouse-backup/pull/804
func (b *Backuper) calculatePrevBackupNameAndType(ctx context.Context, prevBackupName string, prevBackupType string, lastBackup time.Time, lastFullBackup time.Time, backupType string) (string, string, time.Time, time.Time, string, error) {
remoteBackups, err := b.GetRemoteBackups(ctx, true)
if err != nil {
return "", "", time.Time{}, time.Time{}, "", err
}
backupTemplateName, err := b.ch.ApplyMacros(ctx, b.cfg.General.WatchBackupNameTemplate)
if err != nil {
return "", "", time.Time{}, time.Time{}, "", err
}
backupTemplateNamePrepareRE := regexp.MustCompile(`{type}|{time:([^}]+)}`)
backupTemplateNameRE := regexp.MustCompile(backupTemplateNamePrepareRE.ReplaceAllString(backupTemplateName, `\S+`))

for _, remoteBackup := range remoteBackups {
if remoteBackup.Broken == "" && backupTemplateNameRE.MatchString(remoteBackup.BackupName) {
prevBackupName = remoteBackup.BackupName
if strings.Contains(remoteBackup.BackupName, "increment") {
prevBackupType = "increment"
lastBackup = remoteBackup.CreationDate
} else {
prevBackupType = "full"
lastBackup = remoteBackup.CreationDate
lastFullBackup = remoteBackup.CreationDate
}
}
}
if prevBackupName != "" {
now := time.Now()
timeBeforeDoBackup := int(b.cfg.General.WatchDuration.Seconds() - now.Sub(lastBackup).Seconds())
timeBeforeDoFullBackup := int(b.cfg.General.FullDuration.Seconds() - now.Sub(lastFullBackup).Seconds())
b.log.Infof("Time before do backup %v", timeBeforeDoBackup)
b.log.Infof("Time before do full backup %v", timeBeforeDoFullBackup)
if timeBeforeDoBackup > 0 && timeBeforeDoFullBackup > 0 {
b.log.Infof("Waiting %d seconds until continue doing backups due watch interval", timeBeforeDoBackup)
select {
case <-ctx.Done():
return "", "", time.Time{}, time.Time{}, "", ctx.Err()
case <-time.After(b.cfg.General.WatchDuration - now.Sub(lastBackup)):
}
}
now = time.Now()
lastBackup = now
if b.cfg.General.FullDuration.Seconds()-time.Now().Sub(lastFullBackup).Seconds() <= 0 {
backupType = "full"
lastFullBackup = now
} else {
backupType = "increment"
}
}
return prevBackupName, prevBackupType, lastBackup, lastFullBackup, backupType, nil
}
33 changes: 1 addition & 32 deletions pkg/storage/general.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/Altinity/clickhouse-backup/v2/pkg/clickhouse"
"github.com/Altinity/clickhouse-backup/v2/pkg/config"
"github.com/Altinity/clickhouse-backup/v2/pkg/progressbar"
"github.com/Altinity/clickhouse-backup/v2/pkg/utils"
"github.com/eapache/go-resiliency/retrier"
"io"
"os"
Expand Down Expand Up @@ -58,37 +57,7 @@ type BackupDestination struct {

var metadataCacheLock sync.RWMutex

func (bd *BackupDestination) RemoveOldBackups(ctx context.Context, keep int) error {
if keep < 1 {
return nil
}
start := time.Now()
backupList, err := bd.BackupList(ctx, true, "")
if err != nil {
return err
}
backupsToDelete := GetBackupsToDelete(backupList, keep)
bd.Log.WithFields(apexLog.Fields{
"operation": "RemoveOldBackups",
"duration": utils.HumanizeDuration(time.Since(start)),
}).Info("calculate backup list for deleteKey")
for _, backupToDelete := range backupsToDelete {
startDelete := time.Now()
if err := bd.RemoveBackup(ctx, backupToDelete); err != nil {
bd.Log.Warnf("can't deleteKey %s return error : %v", backupToDelete.BackupName, err)
}
bd.Log.WithFields(apexLog.Fields{
"operation": "RemoveOldBackups",
"location": "remote",
"backup": backupToDelete.BackupName,
"duration": utils.HumanizeDuration(time.Since(startDelete)),
}).Info("done")
}
bd.Log.WithFields(apexLog.Fields{"operation": "RemoveOldBackups", "duration": utils.HumanizeDuration(time.Since(start))}).Info("done")
return nil
}

func (bd *BackupDestination) RemoveBackup(ctx context.Context, backup Backup) error {
func (bd *BackupDestination) RemoveBackupRemote(ctx context.Context, backup Backup) error {
if bd.Kind() == "SFTP" || bd.Kind() == "FTP" {
return bd.DeleteFile(ctx, backup.BackupName)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"time"
)

func GetBackupsToDelete(backups []Backup, keep int) []Backup {
func GetBackupsToDeleteRemote(backups []Backup, keep int) []Backup {
if len(backups) > keep {
// sort backup ascending
sort.SliceStable(backups, func(i, j int) bool {
Expand Down
Loading

0 comments on commit 517c3c8

Please sign in to comment.