Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add digest validation utils and examples #20887

Merged
merged 10 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/containers/azcontainerregistry/assets.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
"AssetsRepo": "Azure/azure-sdk-assets",
"AssetsRepoPrefixPath": "go",
"TagPrefix": "go/containers/azcontainerregistry",
"Tag": "go/containers/azcontainerregistry_5bce238ccf"
"Tag": "go/containers/azcontainerregistry_9579d04096"
}
79 changes: 72 additions & 7 deletions sdk/containers/azcontainerregistry/blob_client_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ import (
"context"
"fmt"
"github.com/Azure/azure-sdk-for-go/sdk/containers/azcontainerregistry"
"io"
"log"
"os"
"strconv"
"strings"
)

var blobClient *azcontainerregistry.BlobClient
Expand Down Expand Up @@ -47,21 +51,82 @@ func ExampleBlobClient_DeleteBlob() {
}

func ExampleBlobClient_GetBlob() {
res, err := blobClient.GetBlob(context.TODO(), "prod/bash", "sha256:16463e0c481e161aabb735437d30b3c9c7391c2747cc564bb927e843b73dcb39", nil)
digest := "sha256:16463e0c481e161aabb735437d30b3c9c7391c2747cc564bb927e843b73dcb39"
tadelesh marked this conversation as resolved.
Show resolved Hide resolved
res, err := blobClient.GetBlob(context.TODO(), "prod/bash", digest, nil)
if err != nil {
log.Fatalf("failed to finish the request: %v", err)
}
// deal with the blob io
_ = res.BlobData
reader, err := azcontainerregistry.NewDigestValidationReader(digest, res.BlobData)
if err != nil {
log.Fatalf("failed to create validation reader: %v", err)
}
f, err := os.Create("blob_file")
if err != nil {
log.Fatalf("failed to create blob file: %v", err)
}
defer f.Close()
buf := make([]byte, 1024*1024)
for {
tadelesh marked this conversation as resolved.
Show resolved Hide resolved
_, err := reader.Read(buf)
if err == io.EOF {
tadelesh marked this conversation as resolved.
Show resolved Hide resolved
_, err = f.Write(buf)
if err != nil {
log.Fatalf("failed to write to the file: %v", err)
}
break
}
if err != nil {
log.Fatalf("failed to read blob: %v", err)
}
_, err = f.Write(buf)
if err != nil {
log.Fatalf("failed to write to the file: %v", err)
}
}
}

func ExampleBlobClient_GetChunk() {
res, err := blobClient.GetChunk(context.TODO(), "prod/bash", "sha256:16463e0c481e161aabb735437d30b3c9c7391c2747cc564bb927e843b73dcb39", "bytes=0-299", nil)
chunkSize := 1024 * 1024
digest := "sha256:16463e0c481e161aabb735437d30b3c9c7391c2747cc564bb927e843b73dcb39"
current := 0
f, err := os.Create("blob_file")
if err != nil {
log.Fatalf("failed to finish the request: %v", err)
log.Fatalf("failed to create blob file: %v", err)
}
defer f.Close()
for {
res, err := blobClient.GetChunk(context.TODO(), "prod/bash", digest, fmt.Sprintf("bytes=%d-%d", current, current+chunkSize-1), nil)
if err != nil {
log.Fatalf("failed to finish the request: %v", err)
}
chunk, err := io.ReadAll(res.ChunkData)
if err != nil {
log.Fatalf("failed to read the chunk: %v", err)
}
_, err = f.Write(chunk)
if err != nil {
log.Fatalf("failed to write to the file: %v", err)
}

totalSize, _ := strconv.Atoi(strings.Split(*res.ContentRange, "/")[1])
currentRangeEnd, _ := strconv.Atoi(strings.Split(strings.Split(*res.ContentRange, "/")[0], "-")[1])
if totalSize == currentRangeEnd+1 {
break
}
current += chunkSize
}
_, err = f.Seek(0, io.SeekStart)
if err != nil {
log.Fatalf("failed to set to the start of the file: %v", err)
}
reader, err := azcontainerregistry.NewDigestValidationReader(digest, f)
if err != nil {
log.Fatalf("failed to create digest validation reader: %v", err)
}
_, err = io.ReadAll(reader)
if err != nil {
log.Fatalf("failed to validate digest: %v", err)
}
// deal with the chunk io
_ = res.ChunkData
}

func ExampleBlobClient_GetUploadStatus() {
Expand Down
52 changes: 50 additions & 2 deletions sdk/containers/azcontainerregistry/blob_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,16 @@ package azcontainerregistry
import (
"bytes"
"context"
"fmt"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
"github.com/Azure/azure-sdk-for-go/sdk/internal/mock"
"github.com/stretchr/testify/require"
"io"
"net/http"
"strconv"
"strings"
"testing"
)

Expand Down Expand Up @@ -162,6 +168,29 @@ func TestBlobClient_GetBlob(t *testing.T) {
res, err := client.GetBlob(ctx, "alpine", digest, nil)
require.NoError(t, err)
require.NotEmpty(t, *res.ContentLength)
reader, err := NewDigestValidationReader(digest, res.BlobData)
require.NoError(t, err)
_, err = io.ReadAll(reader)
require.NoError(t, err)
}

func TestBlobClient_GetBlob_wrongDigest(t *testing.T) {
srv, closeServer := mock.NewServer()
defer closeServer()
srv.AppendResponse(mock.WithStatusCode(http.StatusOK), mock.WithBody([]byte("test")))

pl := runtime.NewPipeline(moduleName, moduleVersion, runtime.PipelineOptions{}, &policy.ClientOptions{Transport: srv})
client := &BlobClient{
srv.URL(),
pl,
}
ctx := context.Background()
resp, err := client.GetBlob(ctx, "name", "sha256:9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08", nil)
require.NoError(t, err)
reader, err := NewDigestValidationReader("sha256:wrong", resp.BlobData)
require.NoError(t, err)
_, err = io.ReadAll(reader)
require.Error(t, err, ErrMismatchedHash)
}

func TestBlobClient_GetBlob_fail(t *testing.T) {
Expand Down Expand Up @@ -190,10 +219,29 @@ func TestBlobClient_GetChunk(t *testing.T) {
ctx := context.Background()
client, err := NewBlobClient(endpoint, cred, &BlobClientOptions{ClientOptions: options})
require.NoError(t, err)
name := "alpine"
digest := "sha256:042a816809aac8d0f7d7cacac7965782ee2ecac3f21bcf9f24b1de1a7387b769"
res, err := client.GetChunk(ctx, "alpine", digest, "bytes=0-999", nil)
chunkSize := 1000
current := 0
blob := bytes.NewBuffer(nil)
for {
res, err := client.GetChunk(ctx, name, digest, fmt.Sprintf("bytes=%d-%d", current, current+chunkSize-1), nil)
require.NoError(t, err)
chunk, err := io.ReadAll(res.ChunkData)
require.NoError(t, err)
_, err = blob.Write(chunk)
require.NoError(t, err)
totalSize, _ := strconv.Atoi(strings.Split(*res.ContentRange, "/")[1])
currentRangeEnd, _ := strconv.Atoi(strings.Split(strings.Split(*res.ContentRange, "/")[0], "-")[1])
if totalSize == currentRangeEnd+1 {
break
}
current += chunkSize
}
reader, err := NewDigestValidationReader(digest, blob)
require.NoError(t, err)
_, err = io.ReadAll(reader)
require.NoError(t, err)
require.Equal(t, int64(1000), *res.ContentLength)
}

func TestBlobClient_GetChunk_fail(t *testing.T) {
Expand Down
66 changes: 3 additions & 63 deletions sdk/containers/azcontainerregistry/blob_custom_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,13 @@ package azcontainerregistry

import (
"context"
"crypto/sha256"
"encoding"
"errors"
"fmt"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"hash"
"io"
"reflect"
)
Expand Down Expand Up @@ -61,59 +58,6 @@ func NewBlobClient(endpoint string, credential azcore.TokenCredential, options *
}, nil
}

// BlobDigestCalculator help to calculate blob digest when uploading blob.
// Don't use this type directly, use NewBlobDigestCalculator() instead.
type BlobDigestCalculator struct {
h hash.Hash
hashState []byte
}

type wrappedReadSeeker struct {
io.Reader
io.Seeker
}

// NewBlobDigestCalculator creates a new calculator to help to calculate blob digest when uploading blob.
func NewBlobDigestCalculator() *BlobDigestCalculator {
return &BlobDigestCalculator{
h: sha256.New(),
}
}

func (b *BlobDigestCalculator) saveState() {
b.hashState, _ = b.h.(encoding.BinaryMarshaler).MarshalBinary()
}

func (b *BlobDigestCalculator) restoreState() {
if b.hashState == nil {
return
}
_ = b.h.(encoding.BinaryUnmarshaler).UnmarshalBinary(b.hashState)
}

// newLimitTeeReader returns a Reader that writes to w what it reads from r with n bytes limit.
func newLimitTeeReader(r io.Reader, w io.Writer, n int64) io.Reader {
return &limitTeeReader{r, w, n}
}

type limitTeeReader struct {
r io.Reader
w io.Writer
n int64
}

func (lt *limitTeeReader) Read(p []byte) (int, error) {
n, err := lt.r.Read(p)
if n > 0 && lt.n > 0 {
wn, werr := lt.w.Write(p[:n])
if werr != nil {
return wn, werr
}
lt.n -= int64(wn)
}
return n, err
}

// BlobClientUploadChunkOptions contains the optional parameters for the BlobClient.UploadChunk method.
type BlobClientUploadChunkOptions struct {
// Start of range for the blob to be uploaded.
Expand All @@ -130,15 +74,11 @@ type BlobClientUploadChunkOptions struct {
// - options - BlobClientUploadChunkOptions contains the optional parameters for the BlobClient.UploadChunk method.
func (client *BlobClient) UploadChunk(ctx context.Context, location string, chunkData io.ReadSeeker, blobDigestCalculator *BlobDigestCalculator, options *BlobClientUploadChunkOptions) (BlobClientUploadChunkResponse, error) {
blobDigestCalculator.saveState()
size, err := chunkData.Seek(0, io.SeekEnd) // Seek to the end to get the stream's size
if err != nil {
return BlobClientUploadChunkResponse{}, err
}
_, err = chunkData.Seek(0, io.SeekStart)
reader, err := blobDigestCalculator.wrapReader(chunkData)
if err != nil {
return BlobClientUploadChunkResponse{}, err
}
wrappedChunkData := &wrappedReadSeeker{Reader: newLimitTeeReader(chunkData, blobDigestCalculator.h, size), Seeker: chunkData}
wrappedChunkData := &wrappedReadSeeker{Reader: reader, Seeker: chunkData}
var requestOptions *blobClientUploadChunkOptions
if options != nil && options.RangeStart != nil && options.RangeEnd != nil {
requestOptions = &blobClientUploadChunkOptions{ContentRange: to.Ptr(fmt.Sprintf("%d-%d", *options.RangeStart, *options.RangeEnd))}
Expand All @@ -157,5 +97,5 @@ func (client *BlobClient) UploadChunk(ctx context.Context, location string, chun
// - blobDigestCalculator - Calculator that help to calculate blob digest
// - options - BlobClientCompleteUploadOptions contains the optional parameters for the BlobClient.CompleteUpload method.
func (client *BlobClient) CompleteUpload(ctx context.Context, location string, blobDigestCalculator *BlobDigestCalculator, options *BlobClientCompleteUploadOptions) (BlobClientCompleteUploadResponse, error) {
return client.completeUpload(ctx, fmt.Sprintf("sha256:%x", blobDigestCalculator.h.Sum(nil)), location, options)
return client.completeUpload(ctx, blobDigestCalculator.getDigest(), location, options)
}
tadelesh marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@
package azcontainerregistry_test

import (
"bytes"
"context"
"fmt"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/containers/azcontainerregistry"
"io"
"log"
"os"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
)
Expand Down Expand Up @@ -42,9 +43,32 @@ func ExampleBlobClient_CompleteUpload() {
func ExampleBlobClient_UploadChunk() {
// calculator should be created when starting upload blob and passing to UploadChunk and CompleteUpload method
calculator := azcontainerregistry.NewBlobDigestCalculator()
res, err := blobClient.UploadChunk(context.TODO(), "v2/blobland/blobs/uploads/2b28c60d-d296-44b7-b2b4-1f01c63195c6?_nouploadcache=false&_state=VYABvUSCNW2yY5e5VabLHppXqwU0K7cvT0YUdq57KBt7Ik5hbWUiOiJibG9ibGFuZCIsIlVVSUQiOiIyYjI4YzYwZC1kMjk2LTQ0YjctYjJiNC0xZjAxYzYzMTk1YzYiLCJPZmZzZXQiOjAsIlN0YXJ0ZWRBdCI6IjIwMTktMDgtMjdUMjM6NTI6NDcuMDUzNjU2Mjg1WiJ9", streaming.NopCloser(bytes.NewReader([]byte("U29tZXRoaW5nRWxzZQ=="))), calculator, nil)
location := "v2/blobland/blobs/uploads/2b28c60d-d296-44b7-b2b4-1f01c63195c6?_nouploadcache=false&_state=VYABvUSCNW2yY5e5VabLHppXqwU0K7cvT0YUdq57KBt7Ik5hbWUiOiJibG9ibGFuZCIsIlVVSUQiOiIyYjI4YzYwZC1kMjk2LTQ0YjctYjJiNC0xZjAxYzYzMTk1YzYiLCJPZmZzZXQiOjAsIlN0YXJ0ZWRBdCI6IjIwMTktMDgtMjdUMjM6NTI6NDcuMDUzNjU2Mjg1WiJ9"
f, err := os.Open("blob-file")
if err != nil {
log.Fatalf("failed to finish the request: %v", err)
log.Fatalf("failed to read blob file: %v", err)
}
size, err := f.Seek(0, io.SeekEnd)
if err != nil {
log.Fatalf("failed to calculate blob size: %v", err)
}
chunkSize := int64(5)
current := int64(0)
for {
end := current + chunkSize
if end > size {
end = size
}
chunkReader := io.NewSectionReader(f, current, end-current)
uploadResp, err := blobClient.UploadChunk(context.TODO(), location, chunkReader, calculator, &azcontainerregistry.BlobClientUploadChunkOptions{RangeStart: to.Ptr(int32(current)), RangeEnd: to.Ptr(int32(end - 1))})
if err != nil {
log.Fatalf("failed to upload chunk: %v", err)
}
location = *uploadResp.Location
current = end
if current >= size {
break
}
}
fmt.Printf("upload location: %s", *res.Location)
fmt.Printf("upload location: %s", location)
}
43 changes: 20 additions & 23 deletions sdk/containers/azcontainerregistry/blob_custom_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,26 @@ func TestBlobClient_CompleteUpload_uploadByChunk(t *testing.T) {
require.NoError(t, err)
calculator := NewBlobDigestCalculator()
oriReader := bytes.NewReader(blob)
firstPart := io.NewSectionReader(oriReader, int64(0), int64(len(blob)/2))
secondPart := io.NewSectionReader(oriReader, int64(len(blob)/2), int64(len(blob)-len(blob)/2))
uploadResp, err := client.UploadChunk(ctx, *startRes.Location, firstPart, calculator, &BlobClientUploadChunkOptions{RangeStart: to.Ptr(int32(0)), RangeEnd: to.Ptr(int32(len(blob)/2 - 1))})
require.NoError(t, err)
require.NotEmpty(t, *uploadResp.Location)
uploadResp, err = client.UploadChunk(ctx, *uploadResp.Location, secondPart, calculator, &BlobClientUploadChunkOptions{RangeStart: to.Ptr(int32(len(blob) / 2)), RangeEnd: to.Ptr(int32(len(blob) - 1))})
require.NoError(t, err)
require.NotEmpty(t, *uploadResp.Location)
completeResp, err := client.CompleteUpload(ctx, *uploadResp.Location, calculator, nil)
size := int64(len(blob))
chunkSize := int64(736)
current := int64(0)
location := *startRes.Location
for {
end := current + chunkSize
if end > size {
end = size
}
chunkReader := io.NewSectionReader(oriReader, current, end-current)
uploadResp, err := client.UploadChunk(ctx, location, chunkReader, calculator, &BlobClientUploadChunkOptions{RangeStart: to.Ptr(int32(current)), RangeEnd: to.Ptr(int32(end - 1))})
require.NoError(t, err)
require.NotEmpty(t, *uploadResp.Location)
location = *uploadResp.Location
current = end
if current >= size {
break
}
}
completeResp, err := client.CompleteUpload(ctx, location, calculator, nil)
require.NoError(t, err)
require.NotEmpty(t, *completeResp.DockerContentDigest)
}
Expand All @@ -103,20 +114,6 @@ func TestNewBlobClient(t *testing.T) {
require.Errorf(t, err, "provided Cloud field is missing Azure Container Registry configuration")
}

func TestBlobDigestCalculator_saveAndRestoreState(t *testing.T) {
calculator := NewBlobDigestCalculator()
calculator.restoreState()
calculator.saveState()
calculator.restoreState()
calculator.h.Write([]byte("test1"))
sum := calculator.h.Sum(nil)
calculator.saveState()
calculator.h.Write([]byte("test2"))
require.NotEqual(t, sum, calculator.h.Sum(nil))
calculator.restoreState()
require.Equal(t, sum, calculator.h.Sum(nil))
}

func TestBlobClient_CompleteUpload_uploadByChunkFailOver(t *testing.T) {
startRecording(t)
endpoint, cred, options := getEndpointCredAndClientOptions(t)
Expand Down
Loading