Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat:] Parallelize Layer Packing and Unpacking #525

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
35 changes: 33 additions & 2 deletions pkg/cmd/unpack/unpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ import (
"archive/tar"
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync"

"kitops/pkg/artifact"
"kitops/pkg/lib/constants"
Expand Down Expand Up @@ -78,6 +80,21 @@ func runUnpackRecursive(ctx context.Context, opts *unpackOptions, visitedRefs []
// Since there might be multiple datasets, etc. we need to synchronously iterate
// through the config's relevant field to get the correct path for unpacking
var modelPartIdx, codeIdx, datasetIdx, docsIdx int

// Error channel and WaitGroup for parallel unpacking
var wg sync.WaitGroup
errChan := make(chan error, len(manifest.Layers))

// Helper function to handle layer unpacking concurrently
unpackLayerAsync := func(layerDesc ocispec.Descriptor, relPath string, mediaType constants.MediaType) {
defer wg.Done()

if err := unpackLayer(ctx, store, layerDesc, relPath, opts.overwrite, mediaType.Compression); err != nil {
errChan <- err
return
}
}

for _, layerDesc := range manifest.Layers {
var relPath string
mediaType := constants.ParseMediaType(layerDesc.MediaType)
Expand Down Expand Up @@ -145,10 +162,24 @@ func runUnpackRecursive(ctx context.Context, opts *unpackOptions, visitedRefs []
docsIdx += 1
}

if err := unpackLayer(ctx, store, layerDesc, relPath, opts.overwrite, mediaType.Compression); err != nil {
return fmt.Errorf("failed to unpack: %w", err)
// Run the unpack operation in a goroutine
wg.Add(1)
go unpackLayerAsync(layerDesc, relPath, mediaType)
}
wg.Wait()
close(errChan)

var allErrors []error
for err := range errChan {
if err != nil {
amisevsk marked this conversation as resolved.
Show resolved Hide resolved
allErrors = append(allErrors, err)
}
}

if len(allErrors) > 0 {
return fmt.Errorf("failed to unpack layers: %w", errors.Join(allErrors...))
}

output.Debugf("Unpacked %d model part layers", modelPartIdx)
output.Debugf("Unpacked %d code layers", codeIdx)
output.Debugf("Unpacked %d dataset layers", datasetIdx)
Expand Down
105 changes: 71 additions & 34 deletions pkg/lib/kitfile/local-storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"os"
"sync"

"kitops/pkg/lib/repo/local"
"kitops/pkg/lib/repo/util"
Expand Down Expand Up @@ -90,63 +91,99 @@ func saveConfig(ctx context.Context, localRepo local.LocalRepo, kitfile *artifac
}

func saveKitfileLayers(ctx context.Context, localRepo local.LocalRepo, kitfile *artifact.KitFile, ignore filesystem.IgnorePaths, compression string) ([]ocispec.Descriptor, error) {
var layers []ocispec.Descriptor

modelPartsLen := 0
if kitfile.Model != nil {
modelPartsLen = len(kitfile.Model.Parts)
if kitfile.Model.Path != "" && !util.IsModelKitReference(kitfile.Model.Path) {
modelPartsLen++ // Account for the model path
}
}
var layers = make([]ocispec.Descriptor, modelPartsLen+len(kitfile.Code)+len(kitfile.DataSets)+len(kitfile.Docs))
var wg sync.WaitGroup
errChan := make(chan error, len(layers))

processLayer := func(index int, path string, mediaType constants.MediaType) {
defer wg.Done()

if ctx.Err() != nil {
errChan <- ctx.Err()
return
}

layer, err := saveContentLayer(ctx, localRepo, path, mediaType, ignore)
if err != nil {
errChan <- err
return
}
// Place the layer in the correct position in the layers slice
layers[index] = layer
}

// Counter to track index of each layer
layerIndex := 0

// Process model layers
if kitfile.Model != nil {
if kitfile.Model.Path != "" && !util.IsModelKitReference(kitfile.Model.Path) {
mediaType := constants.MediaType{
wg.Add(1)
go processLayer(layerIndex, kitfile.Model.Path, constants.MediaType{
BaseType: constants.ModelType,
Compression: compression,
}
layer, err := saveContentLayer(ctx, localRepo, kitfile.Model.Path, mediaType, ignore)
if err != nil {
return nil, err
}
layers = append(layers, layer)
})
layerIndex++
}
for _, part := range kitfile.Model.Parts {
mediaType := constants.MediaType{
wg.Add(1)
go processLayer(layerIndex, part.Path, constants.MediaType{
BaseType: constants.ModelPartType,
Compression: compression,
}
layer, err := saveContentLayer(ctx, localRepo, part.Path, mediaType, ignore)
if err != nil {
return nil, err
}
layers = append(layers, layer)
})
layerIndex++
}
}

// Process code layers
for _, code := range kitfile.Code {
mediaType := constants.MediaType{
wg.Add(1)
go processLayer(layerIndex, code.Path, constants.MediaType{
BaseType: constants.CodeType,
Compression: compression,
}
layer, err := saveContentLayer(ctx, localRepo, code.Path, mediaType, ignore)
if err != nil {
return nil, err
}
layers = append(layers, layer)
})
layerIndex++
}

// Process dataset layers
for _, dataset := range kitfile.DataSets {
mediaType := constants.MediaType{
wg.Add(1)
go processLayer(layerIndex, dataset.Path, constants.MediaType{
BaseType: constants.DatasetType,
Compression: compression,
}
layer, err := saveContentLayer(ctx, localRepo, dataset.Path, mediaType, ignore)
if err != nil {
return nil, err
}
layers = append(layers, layer)
})
layerIndex++
}

// Process documentation layers
for _, docs := range kitfile.Docs {
mediaType := constants.MediaType{
wg.Add(1)
go processLayer(layerIndex, docs.Path, constants.MediaType{
BaseType: constants.DocsType,
Compression: compression,
}
layer, err := saveContentLayer(ctx, localRepo, docs.Path, mediaType, ignore)
})
layerIndex++
}
wg.Wait()
close(errChan)

var allErrors []error
for err := range errChan {
amisevsk marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
allErrors = append(allErrors, err)
}
layers = append(layers, layer)
}

if len(allErrors) > 0 {
return nil, errors.Join(allErrors...)
}

return layers, nil
Expand Down
Loading