Skip to content

Commit

Permalink
Poll /migrate/status endpoint in propolis-cli during migration.
Browse files Browse the repository at this point in the history
  • Loading branch information
luqmana committed Nov 18, 2021
1 parent fb2c9b7 commit ae37936
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 4 deletions.
34 changes: 31 additions & 3 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use std::{
net::{IpAddr, SocketAddr, ToSocketAddrs},
os::unix::prelude::AsRawFd,
time::Duration,
};

use anyhow::{anyhow, bail, Context};
use futures::{SinkExt, StreamExt};
use futures::{future, SinkExt, StreamExt};
use propolis_client::{
api::{
DiskRequest, InstanceEnsureRequest, InstanceProperties,
InstanceStateRequested, Slot,
InstanceStateRequested, MigrationState, Slot,
},
Client,
};
Expand Down Expand Up @@ -295,7 +296,34 @@ async fn migrate_instance(
.with_context(|| anyhow!("failed to get src instance UUID"))?;

// Initiate the migration via the destination instance
dst_client.instance_migrate_initiate(dst_uuid, src_uuid, src_addr).await?;
let migration_id = dst_client
.instance_migrate_initiate(dst_uuid, src_uuid, src_addr)
.await?
.migration_id;

// Wait for the migration to complete by polling both source and destination
// TODO: replace with into_iter method call after edition upgrade
let handles = IntoIterator::into_iter([
("src", src_client, src_uuid),
("dst", dst_client, dst_uuid),
])
.map(|(role, client, id)| {
tokio::spawn(async move {
loop {
let state = client
.instance_migrate_status(id, migration_id)
.await?
.state;
println!("{}({}) migration state={:?}", role, id, state);
if state == MigrationState::Finish {
return Ok::<_, anyhow::Error>(());
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
})
});

future::join_all(handles).await;

Ok(())
}
Expand Down
17 changes: 17 additions & 0 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,21 @@ impl Client {
);
self.put(path, Some(body)).await
}

/// Get the status of an ongoing migration
pub async fn instance_migrate_status(
&self,
id: Uuid,
migration_id: Uuid,
) -> Result<api::InstanceMigrateStatusResponse, Error> {
let path =
format!("http://{}/instances/{}/migrate/status", self.address, id);
let body = Body::from(
serde_json::to_string(&api::InstanceMigrateStatusRequest {
migration_id,
})
.unwrap(),
);
self.get(path, Some(body)).await
}
}
12 changes: 11 additions & 1 deletion server/src/lib/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,17 @@ async fn source_migrate_task(
}

// Random state demonstration
migrate_context.set_state(MigrationState::Resume).await;
migrate_context.set_state(MigrationState::Arch).await;
tokio::time::sleep(std::time::Duration::from_secs(2)).await;

for x in 10..20 {
conn.write_u32(x).await.map_err(|_| MigrateError::Protocol)?;
}

// More random state demonstration
migrate_context.set_state(MigrationState::Resume).await;
tokio::time::sleep(std::time::Duration::from_secs(2)).await;

migrate_context.set_state(MigrationState::Finish).await;

info!(log, "Migrate Successful");
Expand Down Expand Up @@ -277,13 +282,18 @@ async fn dest_migrate_task(

// Random state demonstration
migrate_context.set_state(MigrationState::Device).await;
tokio::time::sleep(std::time::Duration::from_secs(2)).await;

for x in 10..20 {
let read = conn.read_u32().await.map_err(|_| MigrateError::Protocol)?;
info!(log, "Dest Read: {:?}", read);
assert_eq!(read, x);
}

// More random state demonstration
migrate_context.set_state(MigrationState::Resume).await;
tokio::time::sleep(std::time::Duration::from_secs(2)).await;

migrate_context.set_state(MigrationState::Finish).await;

info!(log, "Migrate Successful");
Expand Down

0 comments on commit ae37936

Please sign in to comment.