Implement parallel ItemBlock processing via backup_controller goroutines
Signed-off-by: Scott Seago <sseago@redhat.com>
sseago committed Jan 29, 2025
1 parent 223e1fc commit fa680a9
Showing 10 changed files with 360 additions and 73 deletions.
16 changes: 8 additions & 8 deletions design/backup-performance-improvements.md
@@ -191,25 +191,25 @@ type ItemBlockWorkerPool struct {
}

type ItemBlockInput struct {
itemBlock ItemBlock
itemBlock *BackupItemBlock
returnChan chan ItemBlockReturn
}

type ItemBlockReturn struct {
itemBlock ItemBlock
itemBlock *BackupItemBlock
resources []schema.GroupResource
err error
}

func (p *ItemBlockWorkerPool) getInputChannel() chan ItemBlockInput
func RunItemBlockWorkers(context context.Context, workers int)
func processItemBlocksWorker(context context.Context, itemBlockChannel chan ItemBlockInput, logger logrus.FieldLogger, wg *sync.WaitGroup)
func StartItemBlockWorkerPool(context context.Context, workers int, logger logrus.FieldLogger) ItemBlockWorkerPool
func processItemBlockWorker(context context.Context, itemBlockChannel chan ItemBlockInput, logger logrus.FieldLogger, wg *sync.WaitGroup)
```

The worker pool will be started by calling `RunItemBlockWorkers` in `backupReconciler.SetupWithManager`, passing in the worker count and reconciler context.
`SetupWithManager` will also add the input channel to the `itemBackupper` so that it will be available during backup processing.
The func `RunItemBlockWorkers` will create the `ItemBlockWorkerPool` with a shared buffered input channel (fixed buffer size) and start `workers` goroutines, each of which will call `processItemBlocksWorker`.
The `processItemBlocksWorker` func (run by the worker goroutines) will read from `itemBlockChannel`, call `BackupItemBlock` on the retrieved `ItemBlock`, send the return value on the retrieved `returnChan`, and then process the next block.
The worker pool will be started by calling `StartItemBlockWorkerPool` in `NewBackupReconciler()`, passing in the worker count and reconciler context.
`backupReconciler.prepareBackupRequest` will also add the input channel to the `backupRequest` so that it will be available during backup processing.
The func `StartItemBlockWorkerPool` will create the `ItemBlockWorkerPool` with a shared buffered input channel (fixed buffer size) and start `workers` goroutines, each of which will call `processItemBlockWorker`.
The `processItemBlockWorker` func (run by the worker goroutines) will read from `itemBlockChannel`, call `BackupItemBlock` on the retrieved `ItemBlock`, send the return value on the retrieved `returnChan`, and then process the next block.
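
The sketch below is illustrative only and is not part of the design or of Velero's code: it shows the worker-pool pattern described above using simplified stand-in types and names (`itemBlock`, `itemBlockInput`, `itemBlockReturn`, `startItemBlockWorkerPool`, `processItemBlockWorker`) in place of Velero's `BackupItemBlock`, `ItemBlockInput`, `ItemBlockReturn`, and the funcs listed above.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Simplified stand-ins for Velero's BackupItemBlock, ItemBlockInput, and ItemBlockReturn.
type itemBlock struct{ name string }

type itemBlockInput struct {
	block      *itemBlock
	returnChan chan itemBlockReturn
}

type itemBlockReturn struct {
	block *itemBlock
	err   error
}

type itemBlockWorkerPool struct {
	inputChannel chan itemBlockInput
	wg           *sync.WaitGroup
}

// startItemBlockWorkerPool creates the shared buffered input channel and starts
// `workers` goroutines, each running processItemBlockWorker.
func startItemBlockWorkerPool(ctx context.Context, workers int) *itemBlockWorkerPool {
	pool := &itemBlockWorkerPool{
		inputChannel: make(chan itemBlockInput, workers), // fixed buffer size
		wg:           &sync.WaitGroup{},
	}
	for i := 0; i < workers; i++ {
		pool.wg.Add(1)
		go processItemBlockWorker(ctx, pool.inputChannel, pool.wg)
	}
	return pool
}

// processItemBlockWorker reads from the input channel, processes the block
// (in Velero this is where BackupItemBlock would be called), sends the result
// on the block's return channel, and then waits for the next block.
func processItemBlockWorker(ctx context.Context, in chan itemBlockInput, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case input := <-in:
			input.returnChan <- itemBlockReturn{block: input.block, err: nil}
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	pool := startItemBlockWorkerPool(ctx, 2)
	returns := make(chan itemBlockReturn)

	// Producer: the backup's ItemBlock processing loop would send blocks here.
	go func() {
		for i := 0; i < 3; i++ {
			pool.inputChannel <- itemBlockInput{
				block:      &itemBlock{name: fmt.Sprintf("block-%d", i)},
				returnChan: returns,
			}
		}
	}()

	// Consumer: collect per-block results as they complete.
	for i := 0; i < 3; i++ {
		fmt.Println("processed", (<-returns).block.name)
	}
}
```

In the actual changes to `backup.go` below, the dispatching loop pairs each send with `wg.Add(1)` and a shared return channel, so it can wait for all in-flight ItemBlocks to finish without tracking which worker handled each one.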

#### Modify ItemBlock processing loop to send ItemBlocks to the worker pool rather than backing them up directly

134 changes: 98 additions & 36 deletions pkg/backup/backup.go
@@ -26,6 +26,7 @@ import (
"io"
"os"
"path/filepath"
"sync"
"time"

"github.com/pkg/errors"
@@ -237,7 +238,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
gzippedData := gzip.NewWriter(backupFile)
defer gzippedData.Close()

tw := tar.NewWriter(gzippedData)
tw := NewTarWriter(tar.NewWriter(gzippedData))
defer tw.Close()

log.Info("Writing backup version file")
@@ -378,6 +379,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
boolptr.IsSetToTrue(backupRequest.Spec.DefaultVolumesToFsBackup),
!backupRequest.ResourceIncludesExcludes.ShouldInclude(kuberesource.PersistentVolumeClaims.String()),
),
kubernetesBackupper: kb,
}

// helper struct to send current progress between the main
@@ -444,6 +446,50 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
}

var itemBlock *BackupItemBlock
itemBlockReturn := make(chan ItemBlockReturn)
responseCtx, cancelFunc := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
// Handle returns from worker pool processing ItemBlocks
go func() {
for {
select {
case response := <-itemBlockReturn: // process each BackupItemBlock response
func() {
defer wg.Done()
if response.err != nil {
log.WithError(errors.WithStack((response.err))).Error("Got error in BackupItemBlock.")
}
for _, backedUpGR := range response.resources {
backedUpGroupResources[backedUpGR] = true
}
// We could eventually track which itemBlocks have finished
// using response.itemBlock

// updated total is computed as "how many items we've backed up so far,
// plus how many items we know of that are remaining"
backedUpItems := backupRequest.BackedUpItems.Len()
totalItems := backedUpItems + (len(items) - (response.itemIterCount + 1))

// send a progress update
update <- progressUpdate{
totalItems: totalItems,
itemsBackedUp: backedUpItems,
}

if len(response.itemBlock.Items) > 0 {
log.WithFields(map[string]interface{}{
"progress": "",
"kind": response.itemBlock.Items[0].Item.GroupVersionKind().GroupKind().String(),
"namespace": response.itemBlock.Items[0].Item.GetNamespace(),
"name": response.itemBlock.Items[0].Item.GetName(),
}).Infof("Backed up %d items out of an estimated total of %d (estimate will change throughout the backup)", backedUpItems, totalItems)
}
}()
case <-responseCtx.Done():
return
}
}
}()

for i := range items {
log.WithFields(map[string]interface{}{
@@ -458,7 +504,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
log.Debugf("Not creating new ItemBlock for %s %s/%s because it's already in an ItemBlock", items[i].groupResource.String(), items[i].namespace, items[i].name)
} else {
if itemBlock == nil {
itemBlock = NewBackupItemBlock(log, itemBackupper)
itemBlock = NewBackupItemBlock(log, itemBackupper, ctx)
}
var newBlockItem *unstructured.Unstructured

@@ -489,31 +535,30 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
addNextToBlock := i < len(items)-1 && items[i].orderedResource && items[i+1].orderedResource && items[i].groupResource == items[i+1].groupResource
if itemBlock != nil && len(itemBlock.Items) > 0 && !addNextToBlock {
log.Infof("Backing Up Item Block including %s %s/%s (%v items in block)", items[i].groupResource.String(), items[i].namespace, items[i].name, len(itemBlock.Items))
backedUpGRs := kb.backupItemBlock(ctx, *itemBlock)
for _, backedUpGR := range backedUpGRs {
backedUpGroupResources[backedUpGR] = true
wg.Add(1)
backupRequest.ItemBlockChannel <- ItemBlockInput{
itemBlock: itemBlock,
returnChan: itemBlockReturn,
}
itemBlock = nil
}
}

// updated total is computed as "how many items we've backed up so far, plus
// how many items we know of that are remaining"
backedUpItems := backupRequest.BackedUpItems.Len()
totalItems := backedUpItems + (len(items) - (i + 1))

// send a progress update
update <- progressUpdate{
totalItems: totalItems,
itemsBackedUp: backedUpItems,
}
done := make(chan struct{})
go func() {
defer close(done)
wg.Wait()
}()

log.WithFields(map[string]interface{}{
"progress": "",
"resource": items[i].groupResource.String(),
"namespace": items[i].namespace,
"name": items[i].name,
}).Infof("Backed up %d items out of an estimated total of %d (estimate will change throughout the backup)", backedUpItems, totalItems)
// Wait for all the ItemBlocks to be processed
select {
case <-done:
log.Info("done processing ItemBlocks")
case <-responseCtx.Done():
log.Info("ItemBlock pocessing canceled")
}
// cancel response-processing goroutine
cancelFunc()

// no more progress updates will be sent on the 'update' channel
quit <- struct{}{}
@@ -661,7 +706,7 @@ func (kb *kubernetesBackupper) executeItemBlockActions(
}
}

func (kb *kubernetesBackupper) backupItemBlock(ctx context.Context, itemBlock BackupItemBlock) []schema.GroupResource {
func (kb *kubernetesBackupper) backupItemBlock(itemBlock *BackupItemBlock) []schema.GroupResource {
// find pods in ItemBlock
// filter pods based on whether they still need to be backed up
// this list will be used to run pre/post hooks
@@ -695,15 +740,15 @@ func (kb *kubernetesBackupper) backupItemBlock(ctx context.Context, itemBlock Ba
itemBlock.Log.Debug("Backing up items in BackupItemBlock")
var grList []schema.GroupResource
for _, item := range itemBlock.Items {
if backedUp := kb.backupItem(itemBlock.Log, item.Gr, itemBlock.itemBackupper, item.Item, item.PreferredGVR, &itemBlock); backedUp {
if backedUp := kb.backupItem(itemBlock.Log, item.Gr, itemBlock.itemBackupper, item.Item, item.PreferredGVR, itemBlock); backedUp {
grList = append(grList, item.Gr)
}
}

if len(postHookPods) > 0 {
itemBlock.Log.Debug("Executing post hooks")
itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Add(1)
go kb.handleItemBlockPostHooks(ctx, itemBlock, postHookPods)
go kb.handleItemBlockPostHooks(itemBlock, postHookPods)
}

return grList
Expand All @@ -722,7 +767,7 @@ func (kb *kubernetesBackupper) getItemKey(item itemblock.ItemBlockItem) (itemKey
return key, nil
}

func (kb *kubernetesBackupper) handleItemBlockPreHooks(itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem) ([]itemblock.ItemBlockItem, []itemblock.ItemBlockItem, []error) {
func (kb *kubernetesBackupper) handleItemBlockPreHooks(itemBlock *BackupItemBlock, hookPods []itemblock.ItemBlockItem) ([]itemblock.ItemBlockItem, []itemblock.ItemBlockItem, []error) {
var successPods []itemblock.ItemBlockItem
var failedPods []itemblock.ItemBlockItem
var errs []error
@@ -739,12 +784,12 @@ func (kb *kubernetesBackupper) handleItemBlockPreHooks(itemBlock BackupItemBlock
}

// The hooks cannot execute until the PVBs are processed
func (kb *kubernetesBackupper) handleItemBlockPostHooks(ctx context.Context, itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem) {
func (kb *kubernetesBackupper) handleItemBlockPostHooks(itemBlock *BackupItemBlock, hookPods []itemblock.ItemBlockItem) {
log := itemBlock.Log
defer itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Done()

// the post hooks will not execute until all PVBs of the item block pods are processed
if err := kb.waitUntilPVBsProcessed(ctx, log, itemBlock, hookPods); err != nil {
if err := kb.waitUntilPVBsProcessed(log, itemBlock, hookPods); err != nil {
log.WithError(err).Error("failed to wait PVBs processed for the ItemBlock")
return
}
@@ -758,7 +803,7 @@ func (kb *kubernetesBackupper) handleItemBlockPostHooks(ctx context.Context, ite
}

// wait for all PVBs of the item block pods to be processed
func (kb *kubernetesBackupper) waitUntilPVBsProcessed(ctx context.Context, log logrus.FieldLogger, itemBlock BackupItemBlock, pods []itemblock.ItemBlockItem) error {
func (kb *kubernetesBackupper) waitUntilPVBsProcessed(log logrus.FieldLogger, itemBlock *BackupItemBlock, pods []itemblock.ItemBlockItem) error {
pvbMap := map[*velerov1api.PodVolumeBackup]bool{}
for _, pod := range pods {
namespace, name := pod.Item.GetNamespace(), pod.Item.GetName()
@@ -795,7 +840,7 @@ func (kb *kubernetesBackupper) waitUntilPVBsProcessed(ctx context.Context, log l
return allProcessed, nil
}

return wait.PollUntilContextCancel(ctx, 5*time.Second, true, checkFunc)
return wait.PollUntilContextCancel(itemBlock.pvbTimeoutCtx, 5*time.Second, true, checkFunc)
}

func (kb *kubernetesBackupper) backupItem(log logrus.FieldLogger, gr schema.GroupResource, itemBackupper *itemBackupper, unstructured *unstructured.Unstructured, preferredGVR schema.GroupVersionResource, itemBlock *BackupItemBlock) bool {
@@ -881,7 +926,7 @@ func (kb *kubernetesBackupper) backupCRD(log logrus.FieldLogger, gr schema.Group
kb.backupItem(log, gvr.GroupResource(), itemBackupper, unstructured, gvr, nil)
}

func (kb *kubernetesBackupper) writeBackupVersion(tw *tar.Writer) error {
func (kb *kubernetesBackupper) writeBackupVersion(tw tarWriter) error {
versionFile := filepath.Join(velerov1api.MetadataDir, "version")
versionString := fmt.Sprintf("%s\n", BackupFormatVersion)

@@ -912,7 +957,7 @@ func (kb *kubernetesBackupper) FinalizeBackup(
) error {
gzw := gzip.NewWriter(outBackupFile)
defer gzw.Close()
tw := tar.NewWriter(gzw)
tw := NewTarWriter(tar.NewWriter(gzw))
defer tw.Close()

gzr, err := gzip.NewReader(inBackupFile)
@@ -966,6 +1011,7 @@ func (kb *kubernetesBackupper) FinalizeBackup(
itemHookHandler: &hook.NoOpItemHookHandler{},
podVolumeSnapshotTracker: podvolume.NewTracker(),
hookTracker: hook.NewHookTracker(),
kubernetesBackupper: kb,
}
updateFiles := make(map[string]FileForArchive)
backedUpGroupResources := map[schema.GroupResource]bool{}
@@ -1051,7 +1097,9 @@ func (kb *kubernetesBackupper) FinalizeBackup(
return nil
}

func buildFinalTarball(tr *tar.Reader, tw *tar.Writer, updateFiles map[string]FileForArchive) error {
func buildFinalTarball(tr *tar.Reader, tw tarWriter, updateFiles map[string]FileForArchive) error {
tw.Lock()
defer tw.Unlock()
for {
header, err := tr.Next()
if err == io.EOF {
@@ -1102,10 +1150,24 @@ func buildFinalTarball(tr *tar.Reader, tw *tar.Writer, updateFiles map[string]Fi
return nil
}

type tarWriter interface {
io.Closer
Write([]byte) (int, error)
WriteHeader(*tar.Header) error
//type tarWriter interface {
// io.Closer
// Write([]byte) (int, error)
// WriteHeader(*tar.Header) error
// Lock()
// Unlock()
//}

type tarWriter struct {
*tar.Writer
*sync.Mutex
}

func NewTarWriter(writer *tar.Writer) tarWriter {
return tarWriter{
Writer: writer,
Mutex: &sync.Mutex{},
}
}

// updateVolumeInfos update the VolumeInfos according to the AsyncOperations