Skip to content

Commit

Permalink
storage: store the diffID and use for validation
Browse files Browse the repository at this point in the history
when copying a partial image, store the expected diffID so that it can
be later used to validate the obtained layer stream.

Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
  • Loading branch information
giuseppe committed Dec 5, 2023
1 parent 265599f commit 0bd2e0c
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 38 deletions.
81 changes: 57 additions & 24 deletions storage/storage_dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ func (s *storageImageDestination) queueOrCommit(index int, info addedLayerInfo)
}
s.lock.Unlock()
// Note: commitLayer locks on-demand.
if err := s.commitLayer(index, info, -1); err != nil {
if stopQueue, err := s.commitLayer(index, info, -1); stopQueue || err != nil {
return err
}
s.lock.Lock()
Expand Down Expand Up @@ -570,15 +570,17 @@ func (s *storageImageDestination) getDiffIDOrTOCDigest(uncompressedDigest digest
// commitLayer commits the specified layer with the given index to the storage.
// size can usually be -1; it can be provided if the layer is not known to be already present in uncompressedOrTocDigest.
//
// If the layer cannot be committed yet, the function returns (true, nil).
//
// Note that the previous layer is expected to already be committed.
//
// Caution: this function must be called without holding `s.lock`. Callers
// must guarantee that, at any given time, at most one goroutine may execute
// `commitLayer()`.
func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, size int64) error {
func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, size int64) (bool, error) {
// Already committed? Return early.
if _, alreadyCommitted := s.indexToStorageID[index]; alreadyCommitted {
return nil
return false, nil
}

// Start with an empty string or the previous layer ID. Note that
Expand All @@ -592,7 +594,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
// Carry over the previous ID for empty non-base layers.
if info.emptyLayer {
s.indexToStorageID[index] = &lastLayer
return nil
return false, nil
}

// Check if there's already a layer with the ID that we'd give to the result of applying
Expand All @@ -613,14 +615,14 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
CanSubstitute: false,
})
if err != nil {
return fmt.Errorf("checking for a layer based on blob %q: %w", info.digest.String(), err)
return false, fmt.Errorf("checking for a layer based on blob %q: %w", info.digest.String(), err)
}
if !has {
return fmt.Errorf("error determining uncompressed digest or TOC digest for blob %q", info.digest.String())
return false, fmt.Errorf("error determining uncompressed digest or TOC digest for blob %q", info.digest.String())
}
diffIDOrTOCDigest, haveDiffIDOrTOCDigest = s.getDiffIDOrTOCDigest(info.digest)
if !haveDiffIDOrTOCDigest {
return fmt.Errorf("we have blob %q, but don't know its uncompressed or TOC digest", info.digest.String())
return false, fmt.Errorf("we have blob %q, but don't know its uncompressed or TOC digest", info.digest.String())
}
}
id := diffIDOrTOCDigest.Hex()
Expand All @@ -631,28 +633,57 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
// There's already a layer that should have the right contents, just reuse it.
lastLayer = layer.ID
s.indexToStorageID[index] = &lastLayer
return nil
return false, nil
}

s.lock.Lock()
diffOutput, ok := s.diffOutputs[info.digest]
s.lock.Unlock()
if ok {
if s.manifest == nil {
logrus.Debugf("Skipping commit for TOC=%q, manifest not yet available", id)
return true, nil
}

man, err := manifest.FromBlob(s.manifest, manifest.GuessMIMEType(s.manifest))
if err != nil {
return false, fmt.Errorf("parsing manifest: %w", err)
}

cb, err := s.getConfigBlob(man.ConfigInfo())
if err != nil {
return false, err
}

// retrieve the expected uncompressed digest from the config blob.
configOCI := &imgspecv1.Image{}
if err := json.Unmarshal(cb, configOCI); err != nil {
return false, err
}
if index >= len(configOCI.RootFS.DiffIDs) {
return false, fmt.Errorf("index %d out of range for configOCI.RootFS.DiffIDs", index)
}

layer, err := s.imageRef.transport.store.CreateLayer(id, lastLayer, nil, "", false, nil)
if err != nil {
return err
return false, err
}

// FIXME: what to do with the uncompressed digest?
diffOutput.UncompressedDigest = info.digest
// let the storage layer know what was the original uncompressed layer.
flags := make(map[string]interface{})
flags[expectedLayerDiffIDFlag] = configOCI.RootFS.DiffIDs[index]
logrus.Debugf("Setting uncompressed digest to %q for layer %q", configOCI.RootFS.DiffIDs[index], id)
options := &graphdriver.ApplyDiffWithDifferOpts{
Flags: flags,
}

if err := s.imageRef.transport.store.ApplyDiffFromStagingDirectory(layer.ID, diffOutput.Target, diffOutput, nil); err != nil {
if err := s.imageRef.transport.store.ApplyDiffFromStagingDirectory(layer.ID, diffOutput.Target, diffOutput, options); err != nil {
_ = s.imageRef.transport.store.Delete(layer.ID)
return err
return false, err
}

s.indexToStorageID[index] = &layer.ID
return nil
return false, nil
}

s.lock.Lock()
Expand All @@ -661,11 +692,11 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
if ok {
layer, err := al.PutAs(id, lastLayer, nil)
if err != nil && !errors.Is(err, storage.ErrDuplicateID) {
return fmt.Errorf("failed to put layer from digest and labels: %w", err)
return false, fmt.Errorf("failed to put layer from digest and labels: %w", err)
}
lastLayer = layer.ID
s.indexToStorageID[index] = &lastLayer
return nil
return false, nil
}

// Check if we previously cached a file with that blob's contents. If we didn't,
Expand All @@ -686,7 +717,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
}
}
if layer == "" {
return fmt.Errorf("locating layer for blob %q: %w", info.digest, err2)
return false, fmt.Errorf("locating layer for blob %q: %w", info.digest, err2)
}
// Read the layer's contents.
noCompression := archive.Uncompressed
Expand All @@ -695,7 +726,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
}
diff, err2 := s.imageRef.transport.store.Diff("", layer, diffOptions)
if err2 != nil {
return fmt.Errorf("reading layer %q for blob %q: %w", layer, info.digest, err2)
return false, fmt.Errorf("reading layer %q for blob %q: %w", layer, info.digest, err2)
}
// Copy the layer diff to a file. Diff() takes a lock that it holds
// until the ReadCloser that it returns is closed, and PutLayer() wants
Expand All @@ -705,7 +736,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_EXCL, 0o600)
if err != nil {
diff.Close()
return fmt.Errorf("creating temporary file %q: %w", filename, err)
return false, fmt.Errorf("creating temporary file %q: %w", filename, err)
}
// Copy the data to the file.
// TODO: This can take quite some time, and should ideally be cancellable using
Expand All @@ -714,7 +745,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
diff.Close()
file.Close()
if err != nil {
return fmt.Errorf("storing blob to file %q: %w", filename, err)
return false, fmt.Errorf("storing blob to file %q: %w", filename, err)
}
// Make sure that we can find this file later, should we need the layer's
// contents again.
Expand All @@ -725,7 +756,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
// Read the cached blob and use it as a diff.
file, err := os.Open(filename)
if err != nil {
return fmt.Errorf("opening file %q: %w", filename, err)
return false, fmt.Errorf("opening file %q: %w", filename, err)
}
defer file.Close()
// Build the new layer using the diff, regardless of where it came from.
Expand All @@ -735,11 +766,11 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
UncompressedDigest: diffIDOrTOCDigest,
}, file)
if err != nil && !errors.Is(err, storage.ErrDuplicateID) {
return fmt.Errorf("adding layer with blob %q: %w", info.digest, err)
return false, fmt.Errorf("adding layer with blob %q: %w", info.digest, err)
}

s.indexToStorageID[index] = &layer.ID
return nil
return false, nil
}

// Commit marks the process of storing the image as successful and asks for the image to be persisted.
Expand Down Expand Up @@ -786,11 +817,13 @@ func (s *storageImageDestination) Commit(ctx context.Context, unparsedToplevel t

// Extract, commit, or find the layers.
for i, blob := range layerBlobs {
if err := s.commitLayer(i, addedLayerInfo{
if stopQueue, err := s.commitLayer(i, addedLayerInfo{
digest: blob.Digest,
emptyLayer: blob.EmptyLayer,
}, blob.Size); err != nil {
return err
} else if stopQueue {
return fmt.Errorf("Internal error: storageImageDestination.Commit(): commitLayer() not ready to commit for layer %q", blob.Digest)
}
}
var lastLayer string
Expand Down
75 changes: 61 additions & 14 deletions storage/storage_src.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,24 @@ type storageImageSource struct {
impl.PropertyMethodsInitialize
stubs.NoGetBlobAtInitialize

imageRef storageReference
image *storage.Image
systemContext *types.SystemContext // SystemContext used in GetBlob() to create temporary files
layerPosition map[digest.Digest]int // Where we are in reading a blob's layers
cachedManifest []byte // A cached copy of the manifest, if already known, or nil
getBlobMutex sync.Mutex // Mutex to sync state for parallel GetBlob executions
imageRef storageReference
image *storage.Image
systemContext *types.SystemContext // SystemContext used in GetBlob() to create temporary files
cachedManifest []byte // A cached copy of the manifest, if already known, or nil
getBlobMutex sync.Mutex // Mutex to sync state for parallel GetBlob executions (it guards layerPosition and digestToLayerID)
getBlobMutexProtected struct {
// digestToLayerID is a lookup map from the layer digest (either the uncompressed digest or the TOC digest) to the
// layer ID in the store.
digestToLayerID map[digest.Digest]string
// layerPosition stores where we are in reading a blob's layers
layerPosition map[digest.Digest]int
}
SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice
SignaturesSizes map[digest.Digest][]int `json:"signatures-sizes,omitempty"` // List of sizes of each signature slice
}

const expectedLayerDiffIDFlag = "expected-layer-diffid"

// newImageSource sets up an image for reading.
func newImageSource(sys *types.SystemContext, imageRef storageReference) (*storageImageSource, error) {
// First, locate the image.
Expand All @@ -62,9 +70,15 @@ func newImageSource(sys *types.SystemContext, imageRef storageReference) (*stora
imageRef: imageRef,
systemContext: sys,
image: img,
layerPosition: make(map[digest.Digest]int),
SignatureSizes: []int{},
SignaturesSizes: make(map[digest.Digest][]int),
getBlobMutexProtected: struct {
digestToLayerID map[digest.Digest]string
layerPosition map[digest.Digest]int
}{
digestToLayerID: make(map[digest.Digest]string),
layerPosition: make(map[digest.Digest]int),
},
}
image.Compat = impl.AddCompat(image)
if img.Metadata != "" {
Expand All @@ -91,6 +105,7 @@ func (s *storageImageSource) Close() error {
func (s *storageImageSource) GetBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache) (rc io.ReadCloser, n int64, err error) {
// We need a valid digest value.
digest := info.Digest

err = digest.Validate()
if err != nil {
return nil, 0, err
Expand All @@ -100,10 +115,24 @@ func (s *storageImageSource) GetBlob(ctx context.Context, info types.BlobInfo, c
return io.NopCloser(bytes.NewReader(image.GzippedEmptyLayer)), int64(len(image.GzippedEmptyLayer)), nil
}

// Check if the blob corresponds to a diff that was used to initialize any layers. Our
// callers should try to retrieve layers using their uncompressed digests, so no need to
// check if they're using one of the compressed digests, which we can't reproduce anyway.
layers, _ := s.imageRef.transport.store.LayersByUncompressedDigest(digest)
var layers []storage.Layer

// If the digest was overriden by LayerInfosForCopy, then we need to use the TOC digest
// to retrieve it from the storage.
s.getBlobMutex.Lock()
layerID, found := s.getBlobMutexProtected.digestToLayerID[digest]
s.getBlobMutex.Unlock()

if found {
if layer, err := s.imageRef.transport.store.Layer(layerID); err == nil {
layers = []storage.Layer{*layer}
}
} else {
// Check if the blob corresponds to a diff that was used to initialize any layers. Our
// callers should try to retrieve layers using their uncompressed digests, so no need to
// check if they're using one of the compressed digests, which we can't reproduce anyway.
layers, _ = s.imageRef.transport.store.LayersByUncompressedDigest(digest)
}

// If it's not a layer, then it must be a data item.
if len(layers) == 0 {
Expand Down Expand Up @@ -174,8 +203,8 @@ func (s *storageImageSource) getBlobAndLayerID(digest digest.Digest, layers []st
// which claim to have the same contents, that we actually do have multiple layers, otherwise we could
// just go ahead and use the first one every time.
s.getBlobMutex.Lock()
i := s.layerPosition[digest]
s.layerPosition[digest] = i + 1
i := s.getBlobMutexProtected.layerPosition[digest]
s.getBlobMutexProtected.layerPosition[digest] = i + 1
s.getBlobMutex.Unlock()
if len(layers) > 0 {
layer = layers[i%len(layers)]
Expand Down Expand Up @@ -273,8 +302,26 @@ func (s *storageImageSource) LayerInfosForCopy(ctx context.Context, instanceDige
if layer.UncompressedSize < 0 {
return nil, fmt.Errorf("uncompressed size for layer %q is unknown", layerID)
}

blobDigest := layer.UncompressedDigest

if layer.Flags != nil {
if v, ok := layer.Flags[expectedLayerDiffIDFlag]; ok {
if expectedDigest, ok := v.(string); ok {
// if the layer is stored by its TOC, report the expected diffID as the layer Digest
// but store the TOC digest so we can later retrieve it from the storage.
blobDigest, err = digest.Parse(expectedDigest)
if err != nil {
return nil, fmt.Errorf("parsing expected diffID %q for layer %q: %w", expectedDigest, layerID, err)
}
}
}
}
s.getBlobMutex.Lock()
s.getBlobMutexProtected.digestToLayerID[blobDigest] = layer.ID
s.getBlobMutex.Unlock()
blobInfo := types.BlobInfo{
Digest: layer.UncompressedDigest,
Digest: blobDigest,
Size: layer.UncompressedSize,
MediaType: uncompressedLayerType,
}
Expand Down

0 comments on commit 0bd2e0c

Please sign in to comment.