-
Notifications
You must be signed in to change notification settings - Fork 218
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: support modctl blob's nydus attribute
- Copy the snapshotter code into an internal package. - Add a modctl handler that produces the nydus attribute pattern. - Support pushing the manifest to a registry. Signed-off-by: Yang Kaiyong <yangkaiyong.yky@antgroup.com>
- Loading branch information
Yang Kaiyong
committed
Feb 28, 2025
1 parent
f914b91
commit 53e1e3f
Showing
33 changed files
with
4,593 additions
and
130 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
// Copyright 2025 Nydus Developers. All rights reserved. | ||
// | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package external |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,314 @@ | ||
package modctl | ||
|
||
import ( | ||
"archive/tar" | ||
"context" | ||
"encoding/json" | ||
"fmt" | ||
"io" | ||
"os" | ||
"path/filepath" | ||
"strings" | ||
|
||
"github.com/pkg/errors" | ||
|
||
"github.com/dragonflyoss/nydus/contrib/nydusify/pkg/snapshotter/external/backend" | ||
) | ||
|
||
// Layout of the modctl local content store, plus the media type that
// identifies model-weight layers.
const (
	BLOB_PATH         = "/content.v1/docker/registry/v2/blobs/%s/%s/%s/data"
	REPOS_PATH        = "/content.v1/docker/registry/v2/repositories"
	MANIFEST_PATH     = "/_manifests/tags/%s/current/link"
	WEIGHT_MEDIA_TYPE = "application/vnd.cnai.model.weight.v1.tar"
)
|
||
// Compile-time assertion that *Handler implements backend.Handler.
var _ backend.Handler = &Handler{}
|
||
// Handler implements backend.Handler for modctl's local content store:
// it resolves the image manifest from the store layout, tracks the
// manifest's layer blobs, and turns blob tar entries into chunk records.
type Handler struct {
	root         string // root directory of the modctl content store
	registryHost string
	namespace    string
	imageName    string
	tag          string
	blobs        []backend.Blob
	// key is the blob's sha256 digest; value holds the blob's mediaType and index.
	blobsMap map[string]blobInfo
	// config layer
	blobConfig BlobConfig
	objectID   uint32
}
|
||
// blobInfo is the per-blob lookup record kept in Handler.blobsMap,
// keyed by the blob's digest.
type blobInfo struct {
	mediaType string
	// Index in the blobs array
	blobIndex  uint32
	blobDigest string
}
|
||
// chunk is one fixed-size slice of a file stored inside a blob tar.
// Its accessor methods satisfy the backend.Chunk interface returned by
// Handler.Handle.
type chunk struct {
	blobDigest    string // digest of the blob tar that contains this chunk
	objectID      uint32 // index of the owning blob in the blobs array
	objectContent Object // in-tar file path and chunk size
	objectOffset  uint64 // absolute offset of the chunk within the blob tar
}
|
||
// ObjectID returns the index of the owning blob within the blobs array.
func (c *chunk) ObjectID() uint32 {
	return c.objectID
}
|
||
// ObjectContent returns the chunk payload descriptor (an Object holding
// the in-tar file path and the chunk size).
func (c *chunk) ObjectContent() interface{} {
	return c.objectContent
}
|
||
// ObjectOffset returns the chunk's absolute offset within the blob tar.
func (c *chunk) ObjectOffset() uint64 {
	return c.objectOffset
}
|
||
// FilePath returns the path of the file inside the blob tar that this
// chunk belongs to.
func (c *chunk) FilePath() string {
	return c.objectContent.Path
}
|
||
// LimitChunkSize returns the chunk size used to split the file.
func (c *chunk) LimitChunkSize() uint64 {
	return c.objectContent.ChunkSize
}
|
||
// BlobDigest returns the digest of the blob tar that contains this chunk.
func (c *chunk) BlobDigest() string {
	return c.blobDigest
}
|
||
// Object describes a chunk's payload: the path of the file inside the
// blob tar and the chunk size used to split that file.
type Object struct {
	Path      string
	ChunkSize uint64
}
|
||
// Manifest is the subset of the image manifest JSON that the handler
// needs: the config descriptor and the layer descriptors.
type Manifest struct {
	Config BlobConfig   `json:"config"`
	Layers []BlobConfig `json:"layers"`
}
|
||
// BlobConfig is a content descriptor (media type, digest, size) as it
// appears in the manifest's config and layers entries.
type BlobConfig struct {
	MediaType string `json:"mediaType"`
	Digest    string `json:"digest"`
	Size      uint64 `json:"size"`
}
|
||
// fileInfo records one tar entry: its name, size, and the absolute
// offset of its data within the tar file.
type fileInfo struct {
	name   string
	size   uint64
	offset uint64
}
|
||
// Option carries the configuration for NewHandler: where the modctl
// content store lives (Root) and which image to resolve inside it
// (RegistryHost/Namespace/ImageName/Tag). TargetRef is the reference of
// the target image; it is not read by the code visible in this file.
type Option struct {
	Root         string `json:"root"`
	RegistryHost string `json:"registry_host"`
	Namespace    string `json:"namespace"`
	ImageName    string `json:"image_name"`
	Tag          string `json:"tag"`
	TargetRef    string `json:"target_ref"`
}
|
||
func NewHandler(opt Option) (*Handler, error) { | ||
handler := &Handler{ | ||
root: opt.Root, | ||
registryHost: opt.RegistryHost, | ||
namespace: opt.Namespace, | ||
imageName: opt.ImageName, | ||
tag: opt.Tag, | ||
objectID: 0, | ||
blobsMap: make(map[string]blobInfo), | ||
} | ||
m, err := handler.extractManifest() | ||
if err != nil { | ||
return nil, errors.Wrap(err, "extract manifest failed") | ||
} | ||
handler.setBlobs(m) | ||
handler.setBlobConfig(m) | ||
handler.setBlobsMap() | ||
|
||
return handler, nil | ||
} | ||
|
||
// MEMO 对输入的tar文件进行读取获得所有的文件列表,对每个文件切分chunk | ||
// 需要返回原始文件名,原始文件所在blobIndex, 所在blob的Digest, chunk的大小, 0chunk的在blob这个tar文件的offset | ||
func (handler *Handler) Handle(ctx context.Context, file backend.File) ([]backend.Chunk, error) { | ||
chunks := []backend.Chunk{} | ||
needIgnore, blobInfo := handler.needIgnore(file.RelativePath) | ||
if needIgnore { | ||
return nil, nil | ||
} | ||
|
||
fmt.Printf("Handle %s\n", file.RelativePath) | ||
// MEMO 按照不同类型来处理chunk大小 | ||
chunkSize := backend.DefaultFileChunkSize | ||
if blobInfo.mediaType == WEIGHT_MEDIA_TYPE { | ||
chunkSize = backend.DefaultModctlFileChunkSize | ||
} | ||
|
||
// 读取tar文件,获得tar文件元信息。 | ||
files, err := handler.readBlob(filepath.Join(handler.root, file.RelativePath)) | ||
if err != nil { | ||
return nil, errors.Wrap(err, "read blob failed") | ||
} | ||
|
||
for _, f := range files { | ||
fmt.Printf("File %s, size: %d, offset: %d\n", f.name, f.size, f.offset) | ||
objectOffsets := backend.SplitObjectOffsets(int64(f.size), int64(chunkSize)) | ||
for _, objectOffset := range objectOffsets { | ||
chunks = append(chunks, &chunk{ | ||
blobDigest: blobInfo.blobDigest, | ||
objectID: blobInfo.blobIndex, | ||
objectContent: Object{ | ||
Path: f.name, | ||
ChunkSize: uint64(chunkSize), | ||
}, | ||
objectOffset: f.offset + objectOffset, | ||
}) | ||
} | ||
for _, c := range chunks { | ||
fmt.Printf("Chunk objId %d, blobIndex:%d objOffset: %d objContent: %v\n", c.ObjectID(), blobInfo.blobIndex, c.ObjectOffset(), c.ObjectContent()) | ||
} | ||
} | ||
|
||
handler.objectID++ | ||
|
||
return chunks, nil | ||
} | ||
|
||
// MEMO 需要输出backend.json文件 | ||
// 实际数据在blobs中读取。 | ||
func (handler *Handler) Backend(ctx context.Context) (*backend.Backend, error) { | ||
bkd := backend.Backend{ | ||
Version: "v1", | ||
} | ||
bkd.Backends = []backend.BackendConfig{ | ||
{ | ||
Type: "model_registry", | ||
}, | ||
} | ||
bkd.Blobs = handler.blobs | ||
return &bkd, nil | ||
} | ||
|
||
func (handler *Handler) GetConfig() ([]byte, error) { | ||
return json.Marshal(handler.blobConfig) | ||
} | ||
|
||
func (handler *Handler) setBlobs(m *Manifest) { | ||
handler.blobs = handler.convertToBlobs(m) | ||
} | ||
|
||
func (handler *Handler) setBlobConfig(m *Manifest) { | ||
handler.blobConfig = m.Config | ||
} | ||
|
||
func (handler *Handler) setBlobsMap() { | ||
for i, blob := range handler.blobs { | ||
handler.blobsMap[blob.Config.Digest] = blobInfo{ | ||
mediaType: blob.Config.MediaType, | ||
blobIndex: uint32(i), | ||
blobDigest: blob.Config.Digest, | ||
} | ||
} | ||
} | ||
func (handler *Handler) extractManifest() (*Manifest, error) { | ||
tagPath := fmt.Sprintf(MANIFEST_PATH, handler.tag) | ||
manifestPath := filepath.Join(handler.root, REPOS_PATH, handler.registryHost, handler.namespace, handler.imageName, tagPath) | ||
content, err := os.ReadFile(manifestPath) | ||
if err != nil { | ||
return nil, errors.Wrap(err, "read manifest digest file failed") | ||
} | ||
|
||
line := strings.TrimSpace(string(content)) | ||
manifestDigest := strings.Split(line, ":") | ||
if len(manifestDigest) != 2 { | ||
return nil, errors.New("invalid manifest digest file") | ||
} | ||
|
||
blobPath := fmt.Sprintf(BLOB_PATH, manifestDigest[0], manifestDigest[1][:2], manifestDigest[1]) | ||
blobPath = filepath.Join(handler.root, blobPath) | ||
content, err = os.ReadFile(blobPath) | ||
if err != nil { | ||
return nil, errors.Wrap(err, "read manifest blobs file failed") | ||
} | ||
|
||
var m Manifest | ||
if err := json.Unmarshal(content, &m); err != nil { | ||
return nil, errors.Wrap(err, "unmarshal manifest blobs file failed") | ||
} | ||
return &m, nil | ||
} | ||
|
||
func (handler *Handler) convertToBlobs(m *Manifest) []backend.Blob { | ||
createBlob := func(layer BlobConfig) backend.Blob { | ||
digest := strings.Split(layer.Digest, ":") | ||
if len(digest) == 2 { | ||
layer.Digest = digest[1] | ||
} | ||
|
||
return backend.Blob{ | ||
Backend: 0, | ||
Config: backend.BlobConfig{ | ||
MediaType: layer.MediaType, | ||
Digest: layer.Digest, | ||
Size: layer.Size, | ||
}, | ||
} | ||
} | ||
|
||
blobs := make([]backend.Blob, len(m.Layers)) | ||
|
||
for i, layer := range m.Layers { | ||
blobs[i] = createBlob(layer) | ||
} | ||
|
||
return blobs | ||
} | ||
|
||
func (handler *Handler) needIgnore(relPath string) (bool, *blobInfo) { | ||
// ignore manifest link file | ||
if strings.HasSuffix(relPath, "link") { | ||
return true, nil | ||
} | ||
|
||
// ignore blobs file belong to other image | ||
parts := strings.Split(relPath, "/") | ||
if len(parts) < 3 { | ||
return true, nil | ||
} | ||
|
||
digest := parts[len(parts)-2] | ||
blobInfo, ok := handler.blobsMap[digest] | ||
if !ok { | ||
return true, nil | ||
} | ||
|
||
return false, &blobInfo | ||
} | ||
|
||
func (handler *Handler) readBlob(path string) ([]fileInfo, error) { | ||
f, err := os.Open(path) | ||
if err != nil { | ||
return nil, errors.Wrap(err, "open tar file failed") | ||
} | ||
defer f.Close() | ||
var files []fileInfo | ||
tarReader := tar.NewReader(f) | ||
for { | ||
headerOffset, err := f.Seek(0, io.SeekCurrent) | ||
if err != nil { | ||
return nil, errors.Wrap(err, "seek tar file failed") | ||
} | ||
header, err := tarReader.Next() | ||
if err == io.EOF { | ||
break | ||
} | ||
if err != nil { | ||
return nil, errors.Wrap(err, "read tar file failed") | ||
} | ||
dataOffset := headerOffset + 512 | ||
files = append(files, fileInfo{ | ||
name: header.Name, | ||
size: uint64(header.Size), | ||
offset: uint64(dataOffset), | ||
}) | ||
} | ||
return files, nil | ||
} |
Oops, something went wrong.