Skip to content

Commit

Permalink
vetu pull and vetu clone sparsely (#28)
Browse files Browse the repository at this point in the history
* Introduce sparseio.Copy() and write the disk layers sparsely

* vetu create: no need to use temporary.AtomicallyCopyThrough()

...since we're dealing with a temporary VM directory anyways.

* Put temporary.AtomicallyCopyThrough() on the sparse rails

* sparseio_test.go: add TestCopySmall test

* Fix sparseio.Copy() argument order and add TestAtomicallyCopyThrough
  • Loading branch information
edigaryev authored Nov 27, 2023
1 parent 82a729f commit df0b3ca
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 12 deletions.
7 changes: 4 additions & 3 deletions internal/command/create/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/cirruslabs/vetu/internal/storage/local"
"github.com/cirruslabs/vetu/internal/storage/temporary"
"github.com/cirruslabs/vetu/internal/vmconfig"
cp "github.com/otiai10/copy"
"github.com/spf13/cobra"
"path/filepath"
)
Expand Down Expand Up @@ -63,7 +64,7 @@ func runCreate(cmd *cobra.Command, args []string) error {

// Kernel
if kernel != "" {
if err := temporary.AtomicallyCopyThrough(kernel, vmDir.KernelPath()); err != nil {
if err := cp.Copy(kernel, vmDir.KernelPath()); err != nil {
return fmt.Errorf("failed to copy kernel to the VM's directory: %v", err)
}
} else {
Expand All @@ -72,7 +73,7 @@ func runCreate(cmd *cobra.Command, args []string) error {

// Initramfs
if initramfs != "" {
if err := temporary.AtomicallyCopyThrough(initramfs, vmDir.InitramfsPath()); err != nil {
if err := cp.Copy(initramfs, vmDir.InitramfsPath()); err != nil {
return fmt.Errorf("failed to copy initramfs to the VM's directory: %v", err)
}
}
Expand All @@ -86,7 +87,7 @@ func runCreate(cmd *cobra.Command, args []string) error {
for _, disk := range disks {
diskName := filepath.Base(disk)

if err := temporary.AtomicallyCopyThrough(disk, filepath.Join(vmDir.Path(), diskName)); err != nil {
if err := cp.Copy(disk, filepath.Join(vmDir.Path(), diskName)); err != nil {
return fmt.Errorf("failed to copy disk %q to the VM's directory: %v", diskName, err)
}

Expand Down
8 changes: 4 additions & 4 deletions internal/oci/diskpuller/diskpuller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package diskpuller
import (
"context"
"fmt"
"github.com/cirruslabs/vetu/internal/sparseio"
"github.com/cirruslabs/vetu/internal/vmdirectory"
"github.com/dustin/go-humanize"
"github.com/regclient/regclient"
Expand Down Expand Up @@ -160,9 +161,8 @@ func (diskTask *diskTask) process(
if err != nil {
return err
}
if _, err := diskFile.Seek(diskTask.Offset, io.SeekStart); err != nil {
return err
}

diskFileAtOffset := io.NewOffsetWriter(diskFile, diskTask.Offset)

// Pull disk layer from the OCI registry
blobReader, err := client.BlobGet(ctx, reference, diskTask.Desc)
Expand All @@ -175,7 +175,7 @@ func (diskTask *diskTask) process(
progressBarReader := progressbar.NewReader(blobReader, progressBar)
decompressor := initializeDecompressor(&progressBarReader)

if _, err := io.Copy(diskFile, decompressor); err != nil {
if err := sparseio.Copy(diskFileAtOffset, decompressor); err != nil {
return err
}

Expand Down
36 changes: 36 additions & 0 deletions internal/sparseio/sparseio.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package sparseio

import (
"bytes"
"errors"
"io"
)

// blockSize is the size of the buffer used to read from the source, and
// therefore the granularity at which all-zero data is detected and skipped.
const blockSize = 64 * 1024

// Copy reads src until EOF and writes what it read to dst at the matching
// offset, starting at offset 0. Chunks that consist solely of zero bytes are
// not written at all, which lets dst stay sparse when it is a file.
//
// Because zero chunks are skipped, Copy never extends dst to cover trailing
// zeroes: callers that need the destination to have the full length must size
// it themselves (for example by truncating a file to the source size) before
// calling Copy.
//
// It returns nil once src reports io.EOF, or the first error returned by
// either src.Read or dst.WriteAt.
func Copy(dst io.WriterAt, src io.Reader) error {
	chunk := make([]byte, blockSize)
	zeroedChunk := make([]byte, blockSize)

	var offset int64

	for {
		n, err := src.Read(chunk)

		// A Reader is allowed to return n > 0 together with a non-nil
		// error (including io.EOF), so the data has to be consumed
		// before the error is examined, otherwise the tail is lost
		if n > 0 {
			// Only write non-zero chunks
			if !bytes.Equal(chunk[:n], zeroedChunk[:n]) {
				if _, writeErr := dst.WriteAt(chunk[:n], offset); writeErr != nil {
					return writeErr
				}
			}

			offset += int64(n)
		}

		if err != nil {
			if errors.Is(err, io.EOF) {
				return nil
			}

			return err
		}
	}
}
96 changes: 96 additions & 0 deletions internal/sparseio/sparseio_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package sparseio_test

import (
"github.com/cirruslabs/vetu/internal/sparseio"
"github.com/dustin/go-humanize"
"github.com/opencontainers/go-digest"
"github.com/stretchr/testify/require"
"math/rand"
"os"
"path/filepath"
"testing"
)

// TestCopySmall checks that a short text file comes out of a sparse copy
// with exactly the same contents it went in with.
func TestCopySmall(t *testing.T) {
	// Source: a tiny text file
	srcPath := filepath.Join(t.TempDir(), "original.txt")
	require.NoError(t, os.WriteFile(srcPath, []byte("Hello, World!\n"), 0600))

	// Destination: produced by the sparse copy helper
	dstPath := filepath.Join(t.TempDir(), "sparse.txt")
	copySparse(t, srcPath, dstPath)

	// The digests of the two files must match
	srcDigest := fileDigest(t, srcPath)
	dstDigest := fileDigest(t, dstPath)
	require.Equal(t, srcDigest, dstDigest)
}

// TestCopyRandomized builds a large file out of randomly sized runs, some
// filled with random bytes and some left zeroed, copies it sparsely and
// checks that the copy has the same digest as the original.
//
//nolint:gosec // we don't need cryptographically secure randomness here
func TestCopyRandomized(t *testing.T) {
	srcPath := filepath.Join(t.TempDir(), "original.bin")
	src, err := os.Create(srcPath)
	require.NoError(t, err)

	// Keep appending runs until at least a gigabyte has been written
	for written := int64(0); written < 1*humanize.GByte; {
		run := randomlySizedChunk(1*humanize.KByte, 4*humanize.MByte)

		// Roughly half of the runs get random contents,
		// the rest stay zero-filled
		if rand.Intn(2) == 1 {
			//nolint:staticcheck // what's the alternative to the deprecated rand.Read() anyways?
			_, err = rand.Read(run)
			require.NoError(t, err)
		}

		count, err := src.Write(run)
		require.NoError(t, err)

		written += int64(count)
	}

	require.NoError(t, src.Close())

	// Produce the sparse copy
	dstPath := filepath.Join(t.TempDir(), "sparse.bin")
	copySparse(t, srcPath, dstPath)

	// Original and copy must be indistinguishable by digest
	srcDigest := fileDigest(t, srcPath)
	dstDigest := fileDigest(t, dstPath)
	require.Equal(t, srcDigest, dstDigest)
}

// copySparse copies the file at originalFilePath to sparseFilePath using
// sparseio.Copy, failing the test immediately on any error.
//
// The destination is truncated to the size of the original before the copy,
// so that zero-filled regions which sparseio.Copy does not write still count
// towards the length of the resulting file.
func copySparse(t *testing.T, originalFilePath string, sparseFilePath string) {
	// Attribute failures to the calling test rather than to this helper
	t.Helper()

	originalFile, err := os.Open(originalFilePath)
	require.NoError(t, err)

	originalFileInfo, err := originalFile.Stat()
	require.NoError(t, err)

	sparseFile, err := os.Create(sparseFilePath)
	require.NoError(t, err)

	require.NoError(t, sparseFile.Truncate(originalFileInfo.Size()))
	require.NoError(t, sparseio.Copy(sparseFile, originalFile))

	require.NoError(t, originalFile.Close())
	require.NoError(t, sparseFile.Close())
}

// randomlySizedChunk returns a zero-filled byte slice whose length is picked
// at random from the inclusive range [minBytes, maxBytes].
//
//nolint:gosec // we don't need cryptographically secure randomness here
func randomlySizedChunk(minBytes int, maxBytes int) []byte {
	// Number of distinct lengths that can be produced
	span := maxBytes - minBytes + 1

	size := minBytes + rand.Intn(span)

	return make([]byte, size)
}

// fileDigest returns the digest of the contents of the file at path, as
// computed by digest.FromReader, failing the test immediately if the file
// cannot be opened, read or closed.
func fileDigest(t *testing.T, path string) digest.Digest {
	// Attribute failures to the calling test rather than to this helper
	t.Helper()

	file, err := os.Open(path)
	require.NoError(t, err)

	// Named so that it does not shadow the imported digest package
	contentsDigest, err := digest.FromReader(file)
	require.NoError(t, err)

	require.NoError(t, file.Close())

	return contentsDigest
}
52 changes: 47 additions & 5 deletions internal/storage/temporary/temporary.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,68 @@ package temporary

import (
"github.com/cirruslabs/vetu/internal/homedir"
"github.com/cirruslabs/vetu/internal/sparseio"
"github.com/cirruslabs/vetu/internal/vmdirectory"
"github.com/google/uuid"
cp "github.com/otiai10/copy"
"os"
"path/filepath"
)

func AtomicallyCopyThrough(src string, dest string) error {
func AtomicallyCopyThrough(srcDir string, dstDir string) error {
baseDir, err := initialize()
if err != nil {
return err
}

copyThroughPath := filepath.Join(baseDir, uuid.NewString())
// Create an intermediate directory that we'll later
// os.Rename() into dstDir to achieve the atomicity
intermediateDir := filepath.Join(baseDir, uuid.NewString())

if err := cp.Copy(src, copyThroughPath); err != nil {
if err := os.Mkdir(intermediateDir, 0755); err != nil {
return err
}

return os.Rename(copyThroughPath, dest)
// Copy the files from the source directory
// to the intermediate directory
dirEntries, err := os.ReadDir(srcDir)
if err != nil {
return err
}

for _, dirEntry := range dirEntries {
srcFile, err := os.Open(filepath.Join(srcDir, dirEntry.Name()))
if err != nil {
return err
}

srcFileInfo, err := srcFile.Stat()
if err != nil {
return err
}

dstFile, err := os.Create(filepath.Join(intermediateDir, dirEntry.Name()))
if err != nil {
return err
}

if err := dstFile.Truncate(srcFileInfo.Size()); err != nil {
return err
}

if err := sparseio.Copy(dstFile, srcFile); err != nil {
return err
}

if err := srcFile.Close(); err != nil {
return err
}

if err := dstFile.Close(); err != nil {
return err
}
}

return os.Rename(intermediateDir, dstDir)
}

func Create() (*vmdirectory.VMDirectory, error) {
Expand Down
55 changes: 55 additions & 0 deletions internal/storage/temporary/temporary_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package temporary_test

import (
cryptorand "crypto/rand"
"github.com/cirruslabs/vetu/internal/storage/temporary"
"github.com/dustin/go-humanize"
"github.com/opencontainers/go-digest"
"github.com/stretchr/testify/require"
"os"
"path/filepath"
"testing"
)

// TestAtomicallyCopyThrough copies a directory holding a small text file and
// a larger random binary file via temporary.AtomicallyCopyThrough and checks
// that both files arrive in the destination directory with intact contents.
func TestAtomicallyCopyThrough(t *testing.T) {
	// Point VETU_HOME at a per-test directory for the duration of the test
	t.Setenv("VETU_HOME", filepath.Join(t.TempDir(), ".vetu"))

	tmpDir := t.TempDir()

	// Create a source directory
	srcDir := filepath.Join(tmpDir, "src")
	require.NoError(t, os.Mkdir(srcDir, 0700))

	// Add a small-sized text file to it
	err := os.WriteFile(filepath.Join(srcDir, "text.txt"), []byte("Hello, World!\n"), 0600)
	require.NoError(t, err)

	// Add a medium-sized binary file to it
	buf := make([]byte, 64*humanize.MByte)
	_, err = cryptorand.Read(buf)
	require.NoError(t, err)

	err = os.WriteFile(filepath.Join(srcDir, "binary.bin"), buf, 0600)
	require.NoError(t, err)

	// Copy source directory contents to destination directory
	dstDir := filepath.Join(tmpDir, "dst")
	require.NoError(t, temporary.AtomicallyCopyThrough(srcDir, dstDir))

	// Ensure that the files copied are identical
	// to the ones in the source directory
	//
	// require.Equal takes the expected value first and the actual value
	// second; keeping that order makes failure output label them correctly
	require.Equal(t, digest.FromString("Hello, World!\n"), fileDigest(t, filepath.Join(dstDir, "text.txt")))
	require.Equal(t, digest.FromBytes(buf), fileDigest(t, filepath.Join(dstDir, "binary.bin")))
}

// fileDigest returns the digest of the contents of the file at path, as
// computed by digest.FromReader, failing the test immediately if the file
// cannot be opened, read or closed.
func fileDigest(t *testing.T, path string) digest.Digest {
	// Attribute failures to the calling test rather than to this helper
	t.Helper()

	file, err := os.Open(path)
	require.NoError(t, err)

	// Named so that it does not shadow the imported digest package
	contentsDigest, err := digest.FromReader(file)
	require.NoError(t, err)

	require.NoError(t, file.Close())

	return contentsDigest
}

0 comments on commit df0b3ca

Please sign in to comment.