Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use no compression for model layers by default #298

Merged
merged 4 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/cmd/list/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func listImageTag(ctx context.Context, repo registry.Repository, ref *registry.R
if err := json.Unmarshal(manifestBytes, manifest); err != nil {
return nil, fmt.Errorf("failed to parse manifest: %w", err)
}
if manifest.Config.MediaType != constants.ModelConfigMediaType {
if manifest.Config.MediaType != constants.ModelConfigMediaType.String() {
return nil, nil
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/cmd/pack/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type packOptions struct {
configHome string
storageHome string
fullTagRef string
compression string
modelRef *registry.Reference
extraRefs []string
}
Expand All @@ -62,6 +63,7 @@ func PackCommand() *cobra.Command {
}
// -f/--file: explicit path to the Kitfile; "-" reads it from standard input.
cmd.Flags().StringVarP(&opts.modelFile, "file", "f", "", "Specifies the path to the Kitfile explicitly (use \"-\" to read from standard input)")
cmd.Flags().StringVarP(&opts.fullTagRef, "tag", "t", "", "Assigns one or more tags to the built modelkit. Example: -t registry/repository:tag1,tag2")
cmd.Flags().StringVar(&opts.compression, "compression", "none", "Compression format to use for layers. Valid options: 'none' (default), 'gzip', 'gzip-fastest'")
cmd.Args = cobra.ExactArgs(1)
return cmd
}
Expand Down Expand Up @@ -119,6 +121,11 @@ func (opts *packOptions) complete(ctx context.Context, args []string) error {
} else {
opts.modelRef = repo.DefaultReference()
}

if err := constants.IsValidCompression(opts.compression); err != nil {
return err
}

printConfig(opts)
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/pack/pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func pack(ctx context.Context, opts *packOptions, kitfile *artifact.KitFile, sto
return nil, err
}

manifestDesc, err := storage.SaveModel(ctx, store, kitfile, ignore)
manifestDesc, err := storage.SaveModel(ctx, store, kitfile, ignore, opts.compression)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/pull/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func referenceIsModel(ctx context.Context, ref *registry.Reference, repo registr
if err := json.Unmarshal(manifestBytes, manifest); err != nil {
return fmt.Errorf("failed to parse manifest: %w", err)
}
if manifest.Config.MediaType != constants.ModelConfigMediaType {
if manifest.Config.MediaType != constants.ModelConfigMediaType.String() {
return fmt.Errorf("reference %s does not refer to a model", ref.String())
}
return nil
Expand Down
32 changes: 20 additions & 12 deletions pkg/cmd/unpack/unpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ func runUnpackRecursive(ctx context.Context, opts *unpackOptions, visitedRefs []
var modelPartIdx, codeIdx, datasetIdx int
for _, layerDesc := range manifest.Layers {
var relPath string
switch layerDesc.MediaType {
case constants.ModelLayerMediaType:
mediaType := constants.ParseMediaType(layerDesc.MediaType)
switch mediaType.BaseType {
case constants.ModelType:
if !opts.unpackConf.unpackModels {
continue
}
Expand All @@ -92,7 +93,7 @@ func runUnpackRecursive(ctx context.Context, opts *unpackOptions, visitedRefs []
}
output.Infof("Unpacking model %s to %s", config.Model.Name, relPath)

case constants.ModelPartLayerMediaType:
case constants.ModelPartType:
if !opts.unpackConf.unpackModels {
continue
}
Expand All @@ -104,7 +105,7 @@ func runUnpackRecursive(ctx context.Context, opts *unpackOptions, visitedRefs []
output.Infof("Unpacking model part %s to %s", part.Name, relPath)
modelPartIdx += 1

case constants.CodeLayerMediaType:
case constants.CodeType:
if !opts.unpackConf.unpackCode {
continue
}
Expand All @@ -116,7 +117,7 @@ func runUnpackRecursive(ctx context.Context, opts *unpackOptions, visitedRefs []
output.Infof("Unpacking code to %s", relPath)
codeIdx += 1

case constants.DataSetLayerMediaType:
case constants.DatasetType:
if !opts.unpackConf.unpackDatasets {
continue
}
Expand All @@ -128,7 +129,7 @@ func runUnpackRecursive(ctx context.Context, opts *unpackOptions, visitedRefs []
output.Infof("Unpacking dataset %s to %s", datasetEntry.Name, relPath)
datasetIdx += 1
}
if err := unpackLayer(ctx, store, layerDesc, relPath, opts.overwrite); err != nil {
if err := unpackLayer(ctx, store, layerDesc, relPath, opts.overwrite, mediaType.Compression); err != nil {
return fmt.Errorf("Failed to unpack: %w", err)
}
}
Expand Down Expand Up @@ -181,7 +182,7 @@ func unpackConfig(config *artifact.KitFile, unpackDir string, overwrite bool) er
return nil
}

func unpackLayer(ctx context.Context, store content.Storage, desc ocispec.Descriptor, unpackPath string, overwrite bool) error {
func unpackLayer(ctx context.Context, store content.Storage, desc ocispec.Descriptor, unpackPath string, overwrite bool, compression string) error {
rc, err := store.Fetch(ctx, desc)
if err != nil {
return fmt.Errorf("failed get layer %s: %w", desc.Digest, err)
Expand All @@ -191,12 +192,19 @@ func unpackLayer(ctx context.Context, store content.Storage, desc ocispec.Descri
defer rc.Close()
defer logger.Wait()

gzr, err := gzip.NewReader(rc)
if err != nil {
return fmt.Errorf("error extracting gzipped file: %w", err)
// Pick a reader that matches the layer's compression so the tar reader always
// receives a plain tar stream.
var cr io.ReadCloser
var cErr error
switch compression {
case constants.GzipCompression, constants.GzipFastestCompression:
	// Both gzip variants produce an ordinary gzip stream; they differ only in
	// the compression level used when packing.
	cr, cErr = gzip.NewReader(rc)
case constants.NoneCompression:
	cr = rc
default:
	// Without this branch cr would stay nil for an unrecognized compression and
	// the Close/tar read that follow would panic.
	cErr = fmt.Errorf("unsupported compression %q", compression)
}
if cErr != nil {
	// Wrap cErr (the decompression setup failure), not the err left over from
	// the earlier fetch.
	return fmt.Errorf("error setting up decompress: %w", cErr)
}
defer gzr.Close()
tr := tar.NewReader(gzr)
defer cr.Close()
tr := tar.NewReader(cr)

unpackDir := filepath.Dir(unpackPath)
if err := os.MkdirAll(unpackDir, 0755); err != nil {
Expand Down
11 changes: 0 additions & 11 deletions pkg/lib/constants/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,6 @@ const (
HarnessProcessFile = "process.pid"
HarnessLogFile = "harness.log"

// Media type for the model layer
ModelLayerMediaType = "application/vnd.kitops.modelkit.model.v1.tar+gzip"
// Media type for model part layer
ModelPartLayerMediaType = "application/vnd.kitops.modelkit.modelpart.v1.tar+gzip"
// Media type for the dataset layer
DataSetLayerMediaType = "application/vnd.kitops.modelkit.dataset.v1.tar+gzip"
// Media type for the code layer
CodeLayerMediaType = "application/vnd.kitops.modelkit.code.v1.tar+gzip"
// Media type for the model config (Kitfile)
ModelConfigMediaType = "application/vnd.kitops.modelkit.config.v1+json"

// Kitops-specific annotations for modelkit artifacts
CliVersionAnnotation = "ml.kitops.modelkit.cli-version"

Expand Down
90 changes: 90 additions & 0 deletions pkg/lib/constants/mediaType.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2024 The KitOps Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

package constants

import (
"fmt"
"regexp"
)

// Base types name what a ModelKit media type describes. For layer types the
// value is the component substituted into
// "application/vnd.kitops.modelkit.<type>.v1.tar" by MediaType.String.
const (
// ConfigType marks the config media type, which MediaType.String renders as
// the fixed "application/vnd.kitops.modelkit.config.v1+json" string.
ConfigType = "config"
// ModelType is the base type for model layers.
ModelType = "model"
// ModelPartType is the base type for model part layers.
ModelPartType = "modelpart"
// DatasetType is the base type for dataset layers.
DatasetType = "dataset"
// CodeType is the base type for code layers.
CodeType = "code"
)

// Compression options accepted by IsValidCompression and carried in
// MediaType.Compression.
const (
// NoneCompression means no compression; MediaType.String emits no "+<compression>" suffix for it.
NoneCompression = "none"
// GzipCompression is rendered by MediaType.String as a "+gzip" suffix.
GzipCompression = "gzip"
// GzipFastestCompression is rendered by MediaType.String as "+gzip" as well,
// so it is not distinguishable from GzipCompression in a media type string.
GzipFastestCompression = "gzip-fastest"
)

// mediaTypeRegexp matches Kitops layer media types of the form
// "application/vnd.kitops.modelkit.<type>.v1.tar" with an optional
// "+<compression>" suffix. Capture group 1 is the base type and group 2 is the
// compression (empty when the suffix is absent).
//
// The literal dots are escaped and the pattern is anchored at both ends so that
// look-alike strings (arbitrary characters in place of the dots, or trailing
// text such as ".tarball" or "+gzip+extra") are not accepted as Kitops layers.
var mediaTypeRegexp = regexp.MustCompile(`^application/vnd\.kitops\.modelkit\.(\w+)\.v1\.tar(?:\+(\w+))?$`)

// MediaType is the structured form of a Kitops media type string. Use
// ParseMediaType to build one from a string and String to render it back.
// ParseMediaType returns the zero value for strings it does not recognize.
type MediaType struct {
// BaseType is the kind of content described, e.g. ModelType or ConfigType.
BaseType string
// Compression is the layer compression, e.g. NoneCompression or
// GzipCompression. ParseMediaType sets it to NoneCompression when the parsed
// string has no "+<compression>" suffix; it is left empty for the config type.
Compression string
}

// ModelConfigMediaType is the media type of the ModelKit config. Its String
// form is "application/vnd.kitops.modelkit.config.v1+json"; Compression is
// not used for the config type.
var ModelConfigMediaType = MediaType{
BaseType: ConfigType,
}

// String renders the media type in its wire format.
//
//   - The config type always yields "application/vnd.kitops.modelkit.config.v1+json".
//   - Uncompressed layers yield "application/vnd.kitops.modelkit.<type>.v1.tar".
//   - Compressed layers append "+<compression>" to the uncompressed form.
//
// An empty Compression is treated the same as NoneCompression so that a
// partially initialised MediaType never produces a malformed "...tar+" string
// with a dangling plus sign.
func (t MediaType) String() string {
	if t.BaseType == ConfigType {
		return "application/vnd.kitops.modelkit.config.v1+json"
	}
	switch t.Compression {
	case "", NoneCompression:
		return fmt.Sprintf("application/vnd.kitops.modelkit.%s.v1.tar", t.BaseType)
	case GzipFastestCompression:
		// gzip-fastest is advertised as plain gzip in the media type.
		return fmt.Sprintf("application/vnd.kitops.modelkit.%s.v1.tar+%s", t.BaseType, GzipCompression)
	default:
		return fmt.Sprintf("application/vnd.kitops.modelkit.%s.v1.tar+%s", t.BaseType, t.Compression)
	}
}

// ParseMediaType converts a media type string into its structured form.
//
// The config media type is recognized by exact comparison and yields a
// MediaType with only BaseType set. Any other string is matched against
// mediaTypeRegexp: on a match the captured base type and compression are
// returned, with a missing compression suffix reported as NoneCompression.
// Strings that do not match produce the zero MediaType.
func ParseMediaType(s string) MediaType {
	const configMediaType = "application/vnd.kitops.modelkit.config.v1+json"
	if s == configMediaType {
		return MediaType{BaseType: ConfigType}
	}

	groups := mediaTypeRegexp.FindStringSubmatch(s)
	if groups == nil {
		return MediaType{}
	}

	baseType, compression := groups[1], groups[2]
	if compression == "" {
		// No "+<compression>" suffix means the layer is a plain tar.
		compression = NoneCompression
	}
	return MediaType{BaseType: baseType, Compression: compression}
}

// IsValidCompression reports whether compression is one of the supported
// values (NoneCompression, GzipCompression or GzipFastestCompression). It
// returns nil for a supported value and a descriptive error naming the
// --compression flag otherwise. The comparison is exact and case-sensitive.
func IsValidCompression(compression string) error {
	supported := [...]string{NoneCompression, GzipCompression, GzipFastestCompression}
	for _, candidate := range supported {
		if compression == candidate {
			return nil
		}
	}
	return fmt.Errorf("Invalid option for --compression flag: must be one of 'none', 'gzip', or 'gzip-fastest'")
}
4 changes: 2 additions & 2 deletions pkg/lib/repo/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func GetManifest(ctx context.Context, store content.Storage, manifestDesc ocispe
if err := json.Unmarshal(manifestBytes, &manifest); err != nil {
return nil, fmt.Errorf("failed to parse manifest %s: %w", manifestDesc.Digest, err)
}
if manifest.Config.MediaType != constants.ModelConfigMediaType {
if manifest.Config.MediaType != constants.ModelConfigMediaType.String() {
return nil, fmt.Errorf("reference exists but is not a model")
}

Expand All @@ -202,7 +202,7 @@ func GetManifest(ctx context.Context, store content.Storage, manifestDesc ocispe
// GetConfig returns the config (Kitfile) described by a descriptor. Returns an error if the config blob cannot
// be resolved or if the descriptor does not describe a Kitfile.
func GetConfig(ctx context.Context, store content.Storage, configDesc ocispec.Descriptor) (*artifact.KitFile, error) {
if configDesc.MediaType != constants.ModelConfigMediaType {
if configDesc.MediaType != constants.ModelConfigMediaType.String() {
return nil, fmt.Errorf("configuration descriptor does not describe a Kitfile")
}
configBytes, err := content.FetchAll(ctx, store, configDesc)
Expand Down
39 changes: 29 additions & 10 deletions pkg/lib/storage/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ import (
"fmt"
"io"
"io/fs"
"kitops/pkg/lib/filesystem"
"kitops/pkg/output"
"os"
"path/filepath"
"time"

"kitops/pkg/lib/constants"
"kitops/pkg/lib/filesystem"
"kitops/pkg/output"

"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
Expand All @@ -37,7 +39,7 @@ import (
// a descriptor (including hash) for the compressed file, the layer is saved to a temporary file
// on disk and must be moved to an appropriate location. It is the responsibility of the caller
// to clean up the temporary file when it is no longer needed.
func compressLayer(path, mediaType string, ignore filesystem.IgnorePaths) (tempFilePath string, desc ocispec.Descriptor, err error) {
func compressLayer(path string, mediaType constants.MediaType, ignore filesystem.IgnorePaths) (tempFilePath string, desc ocispec.Descriptor, err error) {
// Clean path to ensure consistent format (./path vs path/ vs path)
path = filepath.Clean(path)

Expand All @@ -50,7 +52,7 @@ func compressLayer(path, mediaType string, ignore filesystem.IgnorePaths) (tempF
pathInfo, err := os.Stat(path)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return "", ocispec.DescriptorEmptyJSON, fmt.Errorf("%s path %s does not exist", layerTypeForMediaType(mediaType), path)
return "", ocispec.DescriptorEmptyJSON, fmt.Errorf("%s path %s does not exist", mediaType.BaseType, path)
}
return "", ocispec.DescriptorEmptyJSON, err
}
Expand All @@ -69,17 +71,32 @@ func compressLayer(path, mediaType string, ignore filesystem.IgnorePaths) (tempF
digester := digest.Canonical.Digester()
mw := io.MultiWriter(tempFile, digester.Hash())

// Note: we have to close gzip writer before reading digest from digester as closing is what writes the GZIP footer
gzw := gzip.NewWriter(mw)
tw := tar.NewWriter(gzw)
var cw io.WriteCloser
var tw *tar.Writer
switch mediaType.Compression {
case constants.GzipCompression:
cw = gzip.NewWriter(mw)
tw = tar.NewWriter(cw)
case constants.GzipFastestCompression:
cw, err = gzip.NewWriterLevel(mw, gzip.BestSpeed)
if err != nil {
return "", ocispec.DescriptorEmptyJSON, fmt.Errorf("failed to set up gzip compression: %w", err)
}
tw = tar.NewWriter(cw)
case constants.NoneCompression:
tw = tar.NewWriter(mw)
}
ptw, plog := output.TarProgress(totalSize, tw)

// Wrapper function for closing writers before returning an error
// Note: we have to close gzip writer before reading digest from digester as closing is what writes the GZIP footer
handleErr := func(err error) (string, ocispec.Descriptor, error) {
// Don't care about these errors since we'll be deleting the file anyways
_ = ptw.Close()
_ = tw.Close()
_ = gzw.Close()
if cw != nil {
_ = cw.Close()
}
_ = tempFile.Close()
removeTempFile(tempFileName)
return "", ocispec.DescriptorEmptyJSON, err
Expand All @@ -102,7 +119,9 @@ func compressLayer(path, mediaType string, ignore filesystem.IgnorePaths) (tempF

callAndPrintError(ptw.Close, "Failed to close writer: %s")
callAndPrintError(tw.Close, "Failed to close tar writer: %s")
callAndPrintError(gzw.Close, "Failed to close gzip writer: %s")
if cw != nil {
callAndPrintError(cw.Close, "Failed to close compression writer: %s")
}

tempFileInfo, err := tempFile.Stat()
if err != nil {
Expand All @@ -112,7 +131,7 @@ func compressLayer(path, mediaType string, ignore filesystem.IgnorePaths) (tempF
callAndPrintError(tempFile.Close, "Failed to close temporary file: %s")

desc = ocispec.Descriptor{
MediaType: mediaType,
MediaType: mediaType.String(),
Digest: digester.Digest(),
Size: tempFileInfo.Size(),
}
Expand Down
Loading