Skip to content

Commit

Permalink
added --diff-from-remote parameter for create command, will copy …
Browse files Browse the repository at this point in the history
…only new data parts object disk data, also allows to download properly object disk data from required backup during `restore`, fix #865
  • Loading branch information
Slach committed Mar 25, 2024
1 parent 14f1001 commit dd87132
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 37 deletions.
22 changes: 15 additions & 7 deletions pkg/backup/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,11 @@ func (b *Backuper) cleanEmbeddedAndObjectDiskLocalIfSameRemoteNotPresent(ctx con
return err
}
if !skip && (hasObjectDisks || (b.isEmbedded && b.cfg.ClickHouse.EmbeddedBackupDisk == "")) {
if err = b.cleanBackupObjectDisks(ctx, backupName); err != nil {
log.Warnf("b.cleanBackupObjectDisks return error: %v", err)
if deletedKeys, deleteErr := b.cleanBackupObjectDisks(ctx, backupName); deleteErr != nil {
log.Warnf("b.cleanBackupObjectDisks return error: %v", deleteErr)
return err
} else {
log.Infof("cleanBackupObjectDisks deleted %d keys", deletedKeys)
}
}
if !skip && (b.isEmbedded && b.cfg.ClickHouse.EmbeddedBackupDisk != "") {
Expand Down Expand Up @@ -335,8 +337,10 @@ func (b *Backuper) cleanEmbeddedAndObjectDiskRemoteIfSameLocalNotPresent(ctx con
return nil
}
if b.hasObjectDisksRemote(backup) || (b.isEmbedded && b.cfg.ClickHouse.EmbeddedBackupDisk == "") {
if err = b.cleanBackupObjectDisks(ctx, backup.BackupName); err != nil {
log.Warnf("b.cleanBackupObjectDisks return error: %v", err)
if deletedKeys, deleteErr := b.cleanBackupObjectDisks(ctx, backup.BackupName); deleteErr != nil {
log.Warnf("b.cleanBackupObjectDisks return error: %v", deleteErr)
} else {
log.Infof("cleanBackupObjectDisks deleted %d keys", deletedKeys)
}
return nil
}
Expand Down Expand Up @@ -379,22 +383,26 @@ func (b *Backuper) cleanRemoteEmbedded(ctx context.Context, backup storage.Backu
}

// cleanBackupObjectDisks - recursive delete <object_disks_path>/<backupName>
func (b *Backuper) cleanBackupObjectDisks(ctx context.Context, backupName string) error {
func (b *Backuper) cleanBackupObjectDisks(ctx context.Context, backupName string) (uint, error) {
objectDiskPath, err := b.getObjectDiskPath()
if err != nil {
return err
return 0, err
}
//walk absolute path, delete relative
return b.dst.WalkAbsolute(ctx, path.Join(objectDiskPath, backupName), true, func(ctx context.Context, f storage.RemoteFile) error {
deletedKeys := uint(0)
walkErr := b.dst.WalkAbsolute(ctx, path.Join(objectDiskPath, backupName), true, func(ctx context.Context, f storage.RemoteFile) error {
if b.dst.Kind() == "azblob" {
if f.Size() > 0 || !f.LastModified().IsZero() {
deletedKeys += 1
return b.dst.DeleteFileFromObjectDiskBackup(ctx, path.Join(backupName, f.Name()))
} else {
return nil
}
}
deletedKeys += 1
return b.dst.DeleteFileFromObjectDiskBackup(ctx, path.Join(backupName, f.Name()))
})
return deletedKeys, walkErr
}

func (b *Backuper) skipIfSameLocalBackupPresent(ctx context.Context, backupName, tags string) (bool, error) {
Expand Down
25 changes: 20 additions & 5 deletions pkg/backup/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (b *Backuper) Download(backupName string, tablePattern string, partitions [
idx := i
dataGroup.Go(func() error {
start := time.Now()
if err := b.downloadTableData(dataCtx, remoteBackup.BackupMetadata, *tableMetadataAfterDownload[idx]); err != nil {
if err := b.downloadTableData(dataCtx, remoteBackup.BackupMetadata, *tableMetadataAfterDownload[idx], disks); err != nil {
return err
}
log.
Expand Down Expand Up @@ -287,6 +287,21 @@ func (b *Backuper) Download(backupName string, tablePattern string, partitions [
b.resumableState.Close()
}

//clean partially downloaded requiredBackup
if remoteBackup.RequiredBackup != "" {
if localBackups, _, err = b.GetLocalBackups(ctx, disks); err == nil {
for _, localBackup := range localBackups {
if localBackup.BackupName != remoteBackup.BackupName && localBackup.DataSize+localBackup.CompressedSize+localBackup.MetadataSize == 0 {
if err = b.RemoveBackupLocal(ctx, localBackup.BackupName, disks); err != nil {
return fmt.Errorf("downloadWithDiff -> RemoveBackupLocal cleaning error: %v", err)
}
}
}
} else {
return fmt.Errorf("downloadWithDiff -> GetLocalBackups cleaning error: %v", err)
}
}

log.
WithField("duration", utils.HumanizeDuration(time.Since(startDownload))).
WithField("size", utils.FormatBytes(dataSize+metadataSize+rbacSize+configSize)).
Expand Down Expand Up @@ -567,7 +582,7 @@ func (b *Backuper) downloadBackupRelatedDir(ctx context.Context, remoteBackup st
return uint64(remoteFileInfo.Size()), nil
}

func (b *Backuper) downloadTableData(ctx context.Context, remoteBackup metadata.BackupMetadata, table metadata.TableMetadata) error {
func (b *Backuper) downloadTableData(ctx context.Context, remoteBackup metadata.BackupMetadata, table metadata.TableMetadata, disks []clickhouse.Disk) error {
log := b.log.WithField("logger", "downloadTableData")
dbAndTableDir := path.Join(common.TablePathEncode(table.Database), common.TablePathEncode(table.Table))
ctx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -664,8 +679,8 @@ func (b *Backuper) downloadTableData(ctx context.Context, remoteBackup metadata.
return fmt.Errorf("one of downloadTableData go-routine return error: %v", err)
}

if !b.isEmbedded {
err := b.downloadDiffParts(ctx, remoteBackup, table, dbAndTableDir)
if !b.isEmbedded && remoteBackup.RequiredBackup != "" {
err := b.downloadDiffParts(ctx, remoteBackup, table, dbAndTableDir, disks)
if err != nil {
return err
}
Expand All @@ -674,7 +689,7 @@ func (b *Backuper) downloadTableData(ctx context.Context, remoteBackup metadata.
return nil
}

func (b *Backuper) downloadDiffParts(ctx context.Context, remoteBackup metadata.BackupMetadata, table metadata.TableMetadata, dbAndTableDir string) error {
func (b *Backuper) downloadDiffParts(ctx context.Context, remoteBackup metadata.BackupMetadata, table metadata.TableMetadata, dbAndTableDir string, disks []clickhouse.Disk) error {
log := b.log.WithField("operation", "downloadDiffParts")
log.WithField("table", fmt.Sprintf("%s.%s", table.Database, table.Table)).Debug("start")
start := time.Now()
Expand Down
45 changes: 37 additions & 8 deletions pkg/backup/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,13 @@ func (b *Backuper) Restore(backupName, tablePattern string, databaseMapping, par
return nil
}
}

if b.cfg.ClickHouse.UseEmbeddedBackupRestore && b.cfg.ClickHouse.EmbeddedBackupDisk == "" {
isObjectDiskPresents := false
for _, d := range disks {
if isObjectDiskPresents = b.isDiskTypeObject(d.Type); isObjectDiskPresents {
break
}
}
if (b.cfg.ClickHouse.UseEmbeddedBackupRestore && b.cfg.ClickHouse.EmbeddedBackupDisk == "") || isObjectDiskPresents {
if b.dst, err = storage.NewBackupDestination(ctx, b.cfg, b.ch, false, backupName); err != nil {
return err
}
Expand Down Expand Up @@ -1028,10 +1033,10 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
srcDiskName := diskName
// copy from required backup for required data parts, https://github.com/Altinity/clickhouse-backup/issues/865
if part.Required && backupMetadata.RequiredBackup != "" {
var findRecusiveErr error
srcBackupName, srcDiskName, findRecusiveErr = b.findObjectDiskPartRecursive(backupMetadata, backupTable, part, diskName)
if findRecusiveErr != nil {
return findRecusiveErr
var findRecursiveErr error
srcBackupName, srcDiskName, findRecursiveErr = b.findObjectDiskPartRecursive(ctx, backupMetadata, backupTable, part, diskName, log)
if findRecursiveErr != nil {
return findRecursiveErr
}
}
walkErr := filepath.Walk(partPath, func(fPath string, fInfo fs.FileInfo, err error) error {
Expand Down Expand Up @@ -1103,8 +1108,32 @@ func (b *Backuper) downloadObjectDiskParts(ctx context.Context, backupName strin
return nil
}

func (b *Backuper) findObjectDiskPartRecursive(backupMetadata metadata.BackupMetadata, table metadata.TableMetadata, part metadata.Part, name string) (string, string, error) {
return "", "", fmt.Errorf("not implemented")
func (b *Backuper) findObjectDiskPartRecursive(ctx context.Context, backup metadata.BackupMetadata, table metadata.TableMetadata, part metadata.Part, diskName string, log *apexLog.Entry) (string, string, error) {
if !part.Required {
return backup.BackupName, diskName, nil
}
if part.Required && backup.RequiredBackup == "" {
return "", "", fmt.Errorf("part %s have required flag, in %s but backup.RequiredBackup is empty", part.Name, backup.BackupName)
}
requiredBackup, err := b.ReadBackupMetadataRemote(ctx, backup.RequiredBackup)
if err != nil {
return "", "", err
}
var requiredTable *metadata.TableMetadata
requiredTable, err = b.downloadTableMetadataIfNotExists(ctx, requiredBackup.BackupName, log, metadata.TableTitle{Database: table.Database, Table: table.Table})
// @todo think about add check what if disk type could changed (should already restricted, cause upload seek part in the same disk name)
for requiredDiskName, parts := range requiredTable.Parts {
for _, requiredPart := range parts {
if requiredPart.Name == part.Name {
if requiredPart.Required {
return b.findObjectDiskPartRecursive(ctx, *requiredBackup, *requiredTable, requiredPart, requiredDiskName, log)
}
return requiredBackup.BackupName, requiredDiskName, nil
}
}

}
return "", "", fmt.Errorf("part %s have required flag in %s, but not found in %s", part.Name, backup.BackupName, backup.RequiredBackup)
}

func (b *Backuper) checkMissingTables(tablesForRestore ListOfTables, chTables []clickhouse.Table) []string {
Expand Down
32 changes: 16 additions & 16 deletions pkg/filesystemhelper/filesystemhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,16 +252,16 @@ func MoveShadowToBackup(shadowPath, backupPartsPath string, partitionsBackupMap
if len(partitionsBackupMap) != 0 && !IsPartInPartition(pathParts[3], partitionsBackupMap) {
return nil
}
var isRequiredPartFound, partExists bool
if tableDiffFromRemote.Database != "" && tableDiffFromRemote.Table != "" && len(tableDiffFromRemote.Parts) > 0 && len(tableDiffFromRemote.Parts[disk.Name]) > 0 {
var isRequiredPartAdded, partExists bool
parts, isRequiredPartAdded, partExists = addRequiredPartIfNotExists(parts, pathParts[3], tableDiffFromRemote, disk)
if isRequiredPartAdded || partExists {
parts, isRequiredPartFound, partExists = addRequiredPartIfNotExists(parts, pathParts[3], tableDiffFromRemote, disk)
if isRequiredPartFound {
return nil
}
}
dstFilePath := filepath.Join(backupPartsPath, pathParts[3])
if info.IsDir() {
if !strings.HasSuffix(pathParts[3], ".proj") {
if !strings.HasSuffix(pathParts[3], ".proj") && !isRequiredPartFound && !partExists {
parts = append(parts, metadata.Part{
Name: pathParts[3],
})
Expand All @@ -283,26 +283,26 @@ func MoveShadowToBackup(shadowPath, backupPartsPath string, partitionsBackupMap
}

func addRequiredPartIfNotExists(parts []metadata.Part, relativePath string, tableDiffFromRemote metadata.TableMetadata, disk clickhouse.Disk) ([]metadata.Part, bool, bool) {
isRequiredPartAdded := false
isRequiredPartFound := false
exists := false
for _, p := range parts {
if p.Name == relativePath || strings.HasPrefix(relativePath, p.Name+"/") {
exists = true
break
}
}
if !exists {
for _, diffPart := range tableDiffFromRemote.Parts[disk.Name] {
if diffPart.Name == relativePath || strings.HasPrefix(relativePath, diffPart.Name+"/") {
for _, diffPart := range tableDiffFromRemote.Parts[disk.Name] {
if diffPart.Name == relativePath || strings.HasPrefix(relativePath, diffPart.Name+"/") {
for _, p := range parts {
if p.Name == relativePath || strings.HasPrefix(relativePath, p.Name+"/") {
exists = true
break
}
}
if !exists {
parts = append(parts, metadata.Part{
Name: relativePath,
Required: true,
})
isRequiredPartAdded = true
}
isRequiredPartFound = true
}
}
return parts, isRequiredPartAdded, exists
return parts, isRequiredPartFound, exists
}

func IsDuplicatedParts(part1, part2 string) error {
Expand Down
2 changes: 1 addition & 1 deletion test/integration/docker-compose_advanced.yml
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ services:
- "8123:8123"
- "9000:9000"
# for delve debugger
# - "40001:40001"
- "40002:40002"
networks:
- clickhouse-backup
links:
Expand Down

0 comments on commit dd87132

Please sign in to comment.