Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow local live-migration between storage pools #1410

Merged
merged 5 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions cmd/incusd/instance_post.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
dbCluster "github.com/lxc/incus/v6/internal/server/db/cluster"
"github.com/lxc/incus/v6/internal/server/db/operationtype"
"github.com/lxc/incus/v6/internal/server/instance"
"github.com/lxc/incus/v6/internal/server/instance/instancetype"
"github.com/lxc/incus/v6/internal/server/operations"
"github.com/lxc/incus/v6/internal/server/project"
"github.com/lxc/incus/v6/internal/server/request"
Expand Down Expand Up @@ -234,9 +235,15 @@ func instancePost(d *Daemon, r *http.Request) response.Response {
return response.BadRequest(fmt.Errorf("Instance must be stopped to be moved statelessly"))
}

// Storage pool changes require a stopped instance.
// Storage pool changes require a target flag.
if req.Pool != "" {
return response.BadRequest(fmt.Errorf("Instance must be stopped to be moved across storage pools"))
if inst.Type() != instancetype.VM {
return response.BadRequest(fmt.Errorf("Storage pool change supported only by virtual-machines"))
}

if target == "" {
return response.BadRequest(fmt.Errorf("Storage pool can be specified only together with target flag"))
}
}

// Project changes require a stopped instance.
Expand Down Expand Up @@ -433,7 +440,7 @@ func instancePost(d *Daemon, r *http.Request) response.Response {
}

// Cross-server instance migration.
ws, err := newMigrationSource(inst, req.Live, req.InstanceOnly, req.AllowInconsistent, "", req.Target)
ws, err := newMigrationSource(inst, req.Live, req.InstanceOnly, req.AllowInconsistent, "", "", req.Target)
if err != nil {
return response.InternalError(err)
}
Expand Down Expand Up @@ -476,6 +483,11 @@ func migrateInstance(ctx context.Context, s *state.State, inst instance.Instance
return fmt.Errorf("Failed loading instance storage pool: %w", err)
}

// Check that we're not requested to move to the same storage pool we're currently use.
if req.Pool != "" && req.Pool == sourcePool.Name() {
return fmt.Errorf("Requested storage pool is the same as current pool")
}

// Get the DB volume type for the instance.
volType, err := storagePools.InstanceTypeToVolumeType(inst.Type())
if err != nil {
Expand Down Expand Up @@ -593,8 +605,8 @@ func migrateInstance(ctx context.Context, s *state.State, inst instance.Instance
req.Name = ""
}

// Handle pool and project moves.
if req.Project != "" || req.Pool != "" {
// Handle pool and project moves for stopped instances.
if (req.Project != "" || req.Pool != "") && !req.Live {
// Get a local client.
args := &incus.ConnectionArgs{
SkipGetServer: true,
Expand Down Expand Up @@ -756,7 +768,7 @@ func migrateInstance(ctx context.Context, s *state.State, inst instance.Instance
req.Project = ""
}

// Handle remote migrations (location changes).
// Handle remote migrations (location and storage pool changes).
if targetMemberInfo != nil && inst.Location() != targetMemberInfo.Name {
// Get the client.
networkCert := s.Endpoints.NetworkCert()
Expand Down Expand Up @@ -794,7 +806,7 @@ func migrateInstance(ctx context.Context, s *state.State, inst instance.Instance
}

// Setup a new migration source.
sourceMigration, err := newMigrationSource(inst, req.Live, false, req.AllowInconsistent, inst.Name(), nil)
sourceMigration, err := newMigrationSource(inst, req.Live, false, req.AllowInconsistent, inst.Name(), req.Pool, nil)
if err != nil {
return fmt.Errorf("Failed setting up instance migration on source: %w", err)
}
Expand Down Expand Up @@ -918,8 +930,9 @@ func migrateInstance(ctx context.Context, s *state.State, inst instance.Instance
return err
}

// Cleanup instance paths on source member if using remote shared storage.
if sourcePool.Driver().Info().Remote {
// Cleanup instance paths on source member if using remote shared storage
// and there was no storage pool change.
if sourcePool.Driver().Info().Remote && req.Pool == "" {
err = sourcePool.CleanupInstancePaths(inst, nil)
if err != nil {
return fmt.Errorf("Failed cleaning up instance paths on source member: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/incusd/instance_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ func snapshotPost(s *state.State, r *http.Request, snapInst instance.Instance) r
}
}

ws, err := newMigrationSource(snapInst, reqNew.Live, true, false, "", req.Target)
ws, err := newMigrationSource(snapInst, reqNew.Live, true, false, "", "", req.Target)
if err != nil {
return response.SmartError(err)
}
Expand Down
44 changes: 44 additions & 0 deletions cmd/incusd/instances_post.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -363,6 +364,7 @@ func createFromMigration(ctx context.Context, s *state.State, r *http.Request, p
ClusterMoveSourceName: clusterMoveSourceName,
Refresh: req.Source.Refresh,
RefreshExcludeOlder: req.Source.RefreshExcludeOlder,
StoragePool: storagePool,
}

sink, err := newMigrationSink(&migrationArgs)
Expand All @@ -388,6 +390,48 @@ func createFromMigration(ctx context.Context, s *state.State, r *http.Request, p
}

instOp.Done(nil) // Complete operation that was created earlier, to release lock.

// Update root device for instance.
err = s.DB.Cluster.Transaction(context.Background(), func(ctx context.Context, tx *db.ClusterTx) error {
devs := inst.LocalDevices().CloneNative()
rootDevKey, _, err := internalInstance.GetRootDiskDevice(devs)
if err != nil {
if !errors.Is(err, internalInstance.ErrNoRootDisk) {
return err
}

rootDev := map[string]string{}
rootDev["type"] = "disk"
rootDev["path"] = "/"
rootDev["pool"] = storagePool

devs["root"] = rootDev
} else {
// Apply the override.
devs[rootDevKey]["pool"] = storagePool
}

devices, err := dbCluster.APIToDevices(devs)
if err != nil {
return err
}

id, err := dbCluster.GetInstanceID(ctx, tx.Tx(), inst.Project().Name, inst.Name())
if err != nil {
return fmt.Errorf("Failed to get ID of moved instance: %w", err)
}

err = dbCluster.UpdateInstanceDevices(ctx, tx.Tx(), int64(id), devices)
if err != nil {
return err
}

return nil
})
if err != nil {
return err
}

runRevert.Success()

return instanceCreateFinish(s, req, args, op)
Expand Down
6 changes: 4 additions & 2 deletions cmd/incusd/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type migrationFields struct {
// storage specific fields
volumeOnly bool
allowInconsistent bool
storagePool string
}

func (c *migrationFields) send(m proto.Message) error {
Expand Down Expand Up @@ -207,8 +208,9 @@ type migrationSinkArgs struct {
Snapshots []*migration.Snapshot

// Storage specific fields
VolumeOnly bool
VolumeSize int64
StoragePool string
VolumeOnly bool
VolumeSize int64

// Transport specific fields
RsyncFeatures []string
Expand Down
6 changes: 5 additions & 1 deletion cmd/incusd/migrate_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ import (
"github.com/lxc/incus/v6/shared/logger"
)

func newMigrationSource(inst instance.Instance, stateful bool, instanceOnly bool, allowInconsistent bool, clusterMoveSourceName string, pushTarget *api.InstancePostTarget) (*migrationSourceWs, error) {
func newMigrationSource(inst instance.Instance, stateful bool, instanceOnly bool, allowInconsistent bool, clusterMoveSourceName string, storagePool string, pushTarget *api.InstancePostTarget) (*migrationSourceWs, error) {
ret := migrationSourceWs{
migrationFields: migrationFields{
instance: inst,
allowInconsistent: allowInconsistent,
storagePool: storagePool,
},
clusterMoveSourceName: clusterMoveSourceName,
}
Expand Down Expand Up @@ -144,6 +145,7 @@ func (s *migrationSourceWs) Do(state *state.State, migrateOp *operations.Operati
}
},
ClusterMoveSourceName: s.clusterMoveSourceName,
StoragePool: s.storagePool,
},
AllowInconsistent: s.allowInconsistent,
})
Expand All @@ -164,6 +166,7 @@ func newMigrationSink(args *migrationSinkArgs) (*migrationSink, error) {
instance: args.Instance,
instanceOnly: args.InstanceOnly,
live: args.Live,
storagePool: args.StoragePool,
},
url: args.URL,
clusterMoveSourceName: args.ClusterMoveSourceName,
Expand Down Expand Up @@ -275,6 +278,7 @@ func (c *migrationSink) Do(state *state.State, instOp *operationlock.InstanceOpe
}
},
ClusterMoveSourceName: c.clusterMoveSourceName,
StoragePool: c.storagePool,
},
InstanceOperation: instOp,
Refresh: c.refresh,
Expand Down
4 changes: 4 additions & 0 deletions doc/api-extensions.md
Original file line number Diff line number Diff line change
Expand Up @@ -2656,3 +2656,7 @@ The following configuration options have been added:
* `initial.gid`
* `initial.mode`
* `initial.uid`

## `storage_live_migration`

This adds support for virtual-machines live-migration between storage pools.
52 changes: 31 additions & 21 deletions internal/server/instance/drivers/driver_qemu.go
Original file line number Diff line number Diff line change
Expand Up @@ -6693,6 +6693,7 @@ func (d *qemu) MigrateSend(args instance.MigrateSendArgs) error {
VolumeOnly: !args.Snapshots,
Info: &localMigration.Info{Config: srcConfig},
ClusterMove: args.ClusterMoveSourceName != "",
StorageMove: args.StoragePool != "",
}

// Only send the snapshots that the target requests when refreshing.
Expand Down Expand Up @@ -6784,7 +6785,7 @@ func (d *qemu) MigrateSend(args instance.MigrateSendArgs) error {
defer instanceRefClear(d)
}

err = d.migrateSendLive(pool, args.ClusterMoveSourceName, blockSize, filesystemConn, stateConn, volSourceArgs)
err = d.migrateSendLive(pool, args.ClusterMoveSourceName, args.StoragePool, blockSize, filesystemConn, stateConn, volSourceArgs)
if err != nil {
return err
}
Expand Down Expand Up @@ -6823,7 +6824,7 @@ func (d *qemu) MigrateSend(args instance.MigrateSendArgs) error {
}

// migrateSendLive performs live migration send process.
func (d *qemu) migrateSendLive(pool storagePools.Pool, clusterMoveSourceName string, rootDiskSize int64, filesystemConn io.ReadWriteCloser, stateConn io.ReadWriteCloser, volSourceArgs *localMigration.VolumeSourceArgs) error {
func (d *qemu) migrateSendLive(pool storagePools.Pool, clusterMoveSourceName string, storagePool string, rootDiskSize int64, filesystemConn io.ReadWriteCloser, stateConn io.ReadWriteCloser, volSourceArgs *localMigration.VolumeSourceArgs) error {
monitor, err := qmp.Connect(d.monitorPath(), qemuSerialChardevName, d.getMonitorEventHandler(), d.QMPLogFilePath())
if err != nil {
return err
Expand All @@ -6833,14 +6834,14 @@ func (d *qemu) migrateSendLive(pool storagePools.Pool, clusterMoveSourceName str
nbdTargetDiskName := "incus_root_nbd" // Name of NBD disk device added to local VM to sync to.
rootSnapshotDiskName := "incus_root_snapshot" // Name of snapshot disk device to use.

// If we are performing an intra-cluster member move on a Ceph storage pool then we can treat this as
// shared storage and avoid needing to sync the root disk.
sharedStorage := clusterMoveSourceName != "" && pool.Driver().Info().Remote
// If we are performing an intra-cluster member move on a Ceph storage pool without storage change
// then we can treat this as shared storage and avoid needing to sync the root disk.
sameSharedStorage := clusterMoveSourceName != "" && pool.Driver().Info().Remote && storagePool == ""

revert := revert.New()

// Non-shared storage snapshot setup.
if !sharedStorage {
if !sameSharedStorage {
// Setup migration capabilities.
capabilities := map[string]bool{
// Automatically throttle down the guest to speed up convergence of RAM migration.
Expand Down Expand Up @@ -7003,7 +7004,7 @@ func (d *qemu) migrateSendLive(pool storagePools.Pool, clusterMoveSourceName str
}

// Non-shared storage snapshot transfer.
if !sharedStorage {
if !sameSharedStorage {
listener, err := net.Listen("unix", "")
if err != nil {
return fmt.Errorf("Failed creating NBD unix listener: %w", err)
Expand Down Expand Up @@ -7097,7 +7098,7 @@ func (d *qemu) migrateSendLive(pool storagePools.Pool, clusterMoveSourceName str
}

// Non-shared storage snapshot transfer finalization.
if !sharedStorage {
if !sameSharedStorage {
// Wait until state transfer has reached pre-switchover state (the guest OS will remain paused).
err = monitor.MigrateWait("pre-switchover")
if err != nil {
Expand Down Expand Up @@ -7195,19 +7196,26 @@ func (d *qemu) MigrateReceive(args instance.MigrateReceiveArgs) error {
// record because it may be associated to the wrong cluster member. Instead we ascertain the pool to load
// using the instance's root disk device.
if args.ClusterMoveSourceName == d.name {
_, rootDiskDevice, err := d.getRootDiskDevice()
if err != nil {
return fmt.Errorf("Failed getting root disk: %w", err)
}
if args.StoragePool != "" {
d.storagePool, err = storagePools.LoadByName(d.state, args.StoragePool)
if err != nil {
return fmt.Errorf("Failed loading storage pool: %w", err)
}
} else {
_, rootDiskDevice, err := d.getRootDiskDevice()
if err != nil {
return fmt.Errorf("Failed getting root disk: %w", err)
}

if rootDiskDevice["pool"] == "" {
return fmt.Errorf("The instance's root device is missing the pool property")
}
if rootDiskDevice["pool"] == "" {
return fmt.Errorf("The instance's root device is missing the pool property")
}

// Initialize the storage pool cache.
d.storagePool, err = storagePools.LoadByName(d.state, rootDiskDevice["pool"])
if err != nil {
return fmt.Errorf("Failed loading storage pool: %w", err)
// Initialize the storage pool cache.
d.storagePool, err = storagePools.LoadByName(d.state, rootDiskDevice["pool"])
if err != nil {
return fmt.Errorf("Failed loading storage pool: %w", err)
}
}
}

Expand Down Expand Up @@ -7424,6 +7432,7 @@ func (d *qemu) MigrateReceive(args instance.MigrateReceiveArgs) error {
VolumeSize: offerHeader.GetVolumeSize(), // Block size setting override.
VolumeOnly: !args.Snapshots,
ClusterMoveSourceName: args.ClusterMoveSourceName,
StoragePool: args.StoragePool,
}

// At this point we have already figured out the parent instances's root
Expand Down Expand Up @@ -7503,6 +7512,7 @@ func (d *qemu) MigrateReceive(args instance.MigrateReceiveArgs) error {
// Setup the volume entry.
extraTargetArgs := localMigration.VolumeTargetArgs{
ClusterMoveSourceName: args.ClusterMoveSourceName,
StoragePool: args.StoragePool,
}

vol := diskPool.GetVolume(storageDrivers.VolumeTypeCustom, storageDrivers.ContentTypeBlock, project.StorageVolume(storageProjectName, dev.Config["source"]), nil)
Expand Down Expand Up @@ -7546,8 +7556,8 @@ func (d *qemu) MigrateReceive(args instance.MigrateReceiveArgs) error {
}

// Populate the filesystem connection handle if doing non-shared storage migration.
sharedStorage := args.ClusterMoveSourceName != "" && poolInfo.Remote
if !sharedStorage {
sameSharedStorage := args.ClusterMoveSourceName != "" && poolInfo.Remote && args.StoragePool == ""
if !sameSharedStorage {
d.migrationReceiveStateful[api.SecretNameFilesystem] = filesystemConn
}
}
Expand Down
1 change: 1 addition & 0 deletions internal/server/instance/instance_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ type MigrateArgs struct {
Live bool
Disconnect func()
ClusterMoveSourceName string // Will be empty if not a cluster move, othwise indicates the source instance.
StoragePool string
}

// MigrateSendArgs represent arguments for instance migration send.
Expand Down
2 changes: 2 additions & 0 deletions internal/server/migration/migration_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type VolumeSourceArgs struct {
Info *Info
VolumeOnly bool
ClusterMove bool
StorageMove bool
}

// VolumeTargetArgs represents the arguments needed to setup a volume migration sink.
Expand All @@ -79,6 +80,7 @@ type VolumeTargetArgs struct {
ContentType string
VolumeOnly bool
ClusterMoveSourceName string
StoragePool string
}

// TypesToHeader converts one or more Types to a MigrationHeader. It uses the first type argument
Expand Down
4 changes: 2 additions & 2 deletions internal/server/storage/drivers/driver_ceph_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func (d *ceph) CreateVolumeFromCopy(vol Volume, srcVol Volume, copySnapshots boo

// CreateVolumeFromMigration creates a volume being sent via a migration.
func (d *ceph) CreateVolumeFromMigration(vol Volume, conn io.ReadWriteCloser, volTargetArgs localMigration.VolumeTargetArgs, preFiller *VolumeFiller, op *operations.Operation) error {
if volTargetArgs.ClusterMoveSourceName != "" {
if volTargetArgs.ClusterMoveSourceName != "" && volTargetArgs.StoragePool == "" {
err := vol.EnsureMountPath()
if err != nil {
return err
Expand Down Expand Up @@ -1353,7 +1353,7 @@ func (d *ceph) RenameVolume(vol Volume, newVolName string, op *operations.Operat

// MigrateVolume sends a volume for migration.
func (d *ceph) MigrateVolume(vol Volume, conn io.ReadWriteCloser, volSrcArgs *localMigration.VolumeSourceArgs, op *operations.Operation) error {
if volSrcArgs.ClusterMove {
if volSrcArgs.ClusterMove && !volSrcArgs.StorageMove {
return nil // When performing a cluster member move don't do anything on the source member.
}

Expand Down
Loading