Skip to content

Commit

Permalink
Refactor VM snapshot management
Browse files Browse the repository at this point in the history
Currently, VM snapshots are managed by the orchestrator via a table of idle
function instances. With #794 snapshot management will become more
complicated, and thus it requires refactoring into a separate component.

Closes #802
Part of #794

Signed-off-by: Georgiy Lebedev <lebedev.gk@phystech.edu>
  • Loading branch information
CuriousGeorgiy authored and leokondrashov committed Sep 11, 2023
1 parent c346bdb commit e7caff8
Show file tree
Hide file tree
Showing 12 changed files with 505 additions and 74 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/unit_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
strategy:
fail-fast: false
matrix:
module: [taps, misc, profile, networking]
module: [taps, misc, profile, networking, snapshotting]
steps:

- name: Set up Go 1.19
Expand Down
94 changes: 44 additions & 50 deletions cri/firecracker/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package firecracker
import (
"context"
"errors"
"github.com/vhive-serverless/vhive/snapshotting"
"strconv"
"sync"
"sync/atomic"
Expand All @@ -40,7 +41,7 @@ type coordinator struct {
nextID uint64

activeInstances map[string]*funcInstance
idleInstances map[string][]*funcInstance
snapshotManager *snapshotting.SnapshotManager
withoutOrchestrator bool
}

Expand All @@ -56,56 +57,32 @@ func withoutOrchestrator() coordinatorOption {
func newFirecrackerCoordinator(orch *ctriface.Orchestrator, opts ...coordinatorOption) *coordinator {
c := &coordinator{
activeInstances: make(map[string]*funcInstance),
idleInstances: make(map[string][]*funcInstance),
orch: orch,
}

for _, opt := range opts {
opt(c)
}

return c
}

func (c *coordinator) getIdleInstance(image string) *funcInstance {
c.Lock()
defer c.Unlock()

idles, ok := c.idleInstances[image]
if !ok {
c.idleInstances[image] = []*funcInstance{}
return nil
}

if len(idles) != 0 {
fi := idles[0]
c.idleInstances[image] = idles[1:]
return fi
}

return nil
}

func (c *coordinator) setIdleInstance(fi *funcInstance) {
c.Lock()
defer c.Unlock()

_, ok := c.idleInstances[fi.Image]
if !ok {
c.idleInstances[fi.Image] = []*funcInstance{}
snapshotsDir := "/fccd/test/snapshots"
if !c.withoutOrchestrator {
snapshotsDir = orch.GetSnapshotsDir()
}
c.snapshotManager = snapshotting.NewSnapshotManager(snapshotsDir)

c.idleInstances[fi.Image] = append(c.idleInstances[fi.Image], fi)
return c
}

func (c *coordinator) startVM(ctx context.Context, image string) (*funcInstance, error) {
return c.startVMWithEnvironment(ctx, image, []string{})
}

func (c *coordinator) startVMWithEnvironment(ctx context.Context, image string, environment []string) (*funcInstance, error) {
if fi := c.getIdleInstance(image); c.orch != nil && c.orch.GetSnapshotsEnabled() && fi != nil {
err := c.orchLoadInstance(ctx, fi)
return fi, err
if c.orch != nil && c.orch.GetSnapshotsEnabled() {
// Check if snapshot is available
if snap, err := c.snapshotManager.AcquireSnapshot(image); err == nil {
return c.orchLoadInstance(ctx, snap)
}
}

return c.orchStartVM(ctx, image, environment)
Expand Down Expand Up @@ -134,7 +111,6 @@ func (c *coordinator) stopVM(ctx context.Context, containerID string) error {
func (c *coordinator) isActive(containerID string) bool {
c.Lock()
defer c.Unlock()

_, ok := c.activeInstances[containerID]
return ok
}
Expand Down Expand Up @@ -185,29 +161,44 @@ func (c *coordinator) orchStartVM(ctx context.Context, image string, envVariable
return fi, err
}

func (c *coordinator) orchLoadInstance(ctx context.Context, fi *funcInstance) error {
fi.Logger.Debug("found idle instance to load")
func (c *coordinator) orchLoadInstance(ctx context.Context, snap *snapshotting.Snapshot) (*funcInstance, error) {
logger := log.WithFields(
log.Fields{
"vmID": snap.GetId(),
"image": snap.GetImage(),
},
)

logger.Debug("found idle instance to load")

ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()

if _, err := c.orch.LoadSnapshot(ctxTimeout, fi.VmID); err != nil {
fi.Logger.WithError(err).Error("failed to load VM")
return err
resp, _, err := c.orch.LoadSnapshot(ctxTimeout, snap.GetId(), snap)
if err != nil {
logger.WithError(err).Error("failed to load VM")
return nil, err
}

if _, err := c.orch.ResumeVM(ctxTimeout, fi.VmID); err != nil {
fi.Logger.WithError(err).Error("failed to load VM")
return err
if _, err := c.orch.ResumeVM(ctxTimeout, snap.GetId()); err != nil {
logger.WithError(err).Error("failed to load VM")
return nil, err
}

fi.Logger.Debug("successfully loaded idle instance")
return nil
fi := newFuncInstance(snap.GetId(), snap.GetImage(), resp)
logger.Debug("successfully loaded idle instance")
return fi, nil
}

func (c *coordinator) orchCreateSnapshot(ctx context.Context, fi *funcInstance) error {
var err error

snap, err := c.snapshotManager.InitSnapshot(fi.VmID, fi.Image)
if err != nil {
fi.Logger.WithError(err).Error("failed to initialize snapshot")
return nil
}

fi.OnceCreateSnapInstance.Do(
func() {
ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*60)
Expand All @@ -221,15 +212,20 @@ func (c *coordinator) orchCreateSnapshot(ctx context.Context, fi *funcInstance)
return
}

err = c.orch.CreateSnapshot(ctxTimeout, fi.VmID)
err = c.orch.CreateSnapshot(ctxTimeout, fi.VmID, snap)
if err != nil {
fi.Logger.WithError(err).Error("failed to create snapshot")
return
}
},
)

return err
if err := c.snapshotManager.CommitSnapshot(fi.VmID); err != nil {
fi.Logger.WithError(err).Error("failed to commit snapshot")
return err
}

return nil
}

func (c *coordinator) orchOffloadInstance(ctx context.Context, fi *funcInstance) error {
Expand All @@ -246,8 +242,6 @@ func (c *coordinator) orchOffloadInstance(ctx context.Context, fi *funcInstance)
fi.Logger.WithError(err).Error("failed to offload instance")
}

c.setIdleInstance(fi)

return nil
}

Expand Down
6 changes: 4 additions & 2 deletions ctriface/failing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"testing"
"time"

"github.com/vhive-serverless/vhive/snapshotting"
ctrdlog "github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -61,13 +62,14 @@ func TestStartSnapStop(t *testing.T) {
err = orch.PauseVM(ctx, vmID)
require.NoError(t, err, "Failed to pause VM")

err = orch.CreateSnapshot(ctx, vmID)
snap := snapshotting.NewSnapshot(vmID, "/fccd/snapshots", testImageName)
err = orch.CreateSnapshot(ctx, vmID, snap)
require.NoError(t, err, "Failed to create snapshot of VM")

err = orch.Offload(ctx, vmID)
require.NoError(t, err, "Failed to offload VM")

_, err = orch.LoadSnapshot(ctx, vmID)
_, _, err = orch.LoadSnapshot(ctx, vmID, snap)
require.NoError(t, err, "Failed to load snapshot of VM")

_, err = orch.ResumeVM(ctx, vmID)
Expand Down
27 changes: 18 additions & 9 deletions ctriface/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package ctriface

import (
"context"
"github.com/vhive-serverless/vhive/snapshotting"
"os"
"os/exec"
"strings"
Expand Down Expand Up @@ -392,16 +393,16 @@ func (o *Orchestrator) ResumeVM(ctx context.Context, vmID string) (*metrics.Metr
}

// CreateSnapshot Creates a snapshot of a VM
func (o *Orchestrator) CreateSnapshot(ctx context.Context, vmID string) error {
func (o *Orchestrator) CreateSnapshot(ctx context.Context, vmID string, snap *snapshotting.Snapshot) error {
logger := log.WithFields(log.Fields{"vmID": vmID})
logger.Debug("Orchestrator received CreateSnapshot")

ctx = namespaces.WithNamespace(ctx, namespaceName)

req := &proto.CreateSnapshotRequest{
VMID: vmID,
SnapshotFilePath: o.getSnapshotFile(vmID),
MemFilePath: o.getMemoryFile(vmID),
SnapshotFilePath: snap.GetSnapshotFilePath(),
MemFilePath: snap.GetMemFilePath(),
}

if _, err := o.fcClient.CreateSnapshot(ctx, req); err != nil {
Expand All @@ -413,7 +414,7 @@ func (o *Orchestrator) CreateSnapshot(ctx context.Context, vmID string) error {
}

// LoadSnapshot Loads a snapshot of a VM
func (o *Orchestrator) LoadSnapshot(ctx context.Context, vmID string) (*metrics.Metric, error) {
func (o *Orchestrator) LoadSnapshot(ctx context.Context, vmID string, snap *snapshotting.Snapshot) (*StartVMResponse, *metrics.Metric, error) {
var (
loadSnapshotMetric *metrics.Metric = metrics.NewMetric()
tStart time.Time
Expand All @@ -424,18 +425,26 @@ func (o *Orchestrator) LoadSnapshot(ctx context.Context, vmID string) (*metrics.
logger := log.WithFields(log.Fields{"vmID": vmID})
logger.Debug("Orchestrator received LoadSnapshot")

vm, err := o.vmPool.GetVM(vmID)
if err != nil {
if _, ok := err.(*misc.NonExistErr); ok {
logger.Panic("LoadSnapshot: VM does not exist")
}
logger.Panic("LoadSnapshot: GetVM() failed for an unknown reason")
}

ctx = namespaces.WithNamespace(ctx, namespaceName)

req := &proto.LoadSnapshotRequest{
VMID: vmID,
SnapshotFilePath: o.getSnapshotFile(vmID),
MemFilePath: o.getMemoryFile(vmID),
SnapshotFilePath: snap.GetSnapshotFilePath(),
MemFilePath: snap.GetMemFilePath(),
EnableUserPF: o.GetUPFEnabled(),
}

if o.GetUPFEnabled() {
if err := o.memoryManager.FetchState(vmID); err != nil {
return nil, err
return nil, nil, err
}
}

Expand All @@ -461,10 +470,10 @@ func (o *Orchestrator) LoadSnapshot(ctx context.Context, vmID string) (*metrics.

if loadErr != nil || activateErr != nil {
multierr := multierror.Of(loadErr, activateErr)
return nil, multierr
return nil, nil, multierr
}

return loadSnapshotMetric, nil
return &StartVMResponse{GuestIP: vm.Ni.PrimaryAddress}, loadSnapshotMetric, nil
}

// Offload Shuts down the VM but leaves shim and other resources running.
Expand Down
4 changes: 3 additions & 1 deletion ctriface/iface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"testing"
"time"

"github.com/vhive-serverless/vhive/snapshotting"
ctrdlog "github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -75,7 +76,8 @@ func TestPauseSnapResume(t *testing.T) {
err = orch.PauseVM(ctx, vmID)
require.NoError(t, err, "Failed to pause VM")

err = orch.CreateSnapshot(ctx, vmID)
snap := snapshotting.NewSnapshot(vmID, "/fccd/snapshots", testImageName)
err = orch.CreateSnapshot(ctx, vmID, snap)
require.NoError(t, err, "Failed to create snapshot of VM")

_, err = orch.ResumeVM(ctx, vmID)
Expand Down
Loading

0 comments on commit e7caff8

Please sign in to comment.