Skip to content

Commit 8ce5aeb

Browse files
authored
Add migration endpoints to propolis-server (#69)
This adds the endpoints described in [RFD 71](https://rfd.shared.oxide.computer/rfd/0071) along with a first pass for some of the setup. **New Endpoints:** 1. `/instances/{src-uuid}/migrate/start` — This is the endpoint called by the new (destination) propolis-server instance when its ready to begin the migration process. This request requires no body as we'll perform an HTTP upgrade to reuse the underlying TCP socket as a bidirectional channel to perform the migration over. 2. `/instances/{src-uuid}/migrate/status` — This endpoint allows querying either the source or destination on the current progress of a migration. **Modified Endpoints:** 1. `/instances/{dst-uuid}` — This is the existing "ensure" endpoint, which has been extended with an optional blob describing a source VM from which a new VM will be populated.
1 parent 42ef43f commit 8ce5aeb

File tree

7 files changed

+739
-52
lines changed

7 files changed

+739
-52
lines changed

cli/src/main.rs

Lines changed: 111 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,15 @@ use std::path::{Path, PathBuf};
44
use std::{
55
net::{IpAddr, SocketAddr, ToSocketAddrs},
66
os::unix::prelude::AsRawFd,
7+
time::Duration,
78
};
89

910
use anyhow::{anyhow, Context};
10-
use futures::{SinkExt, StreamExt};
11+
use futures::{future, SinkExt, StreamExt};
1112
use propolis_client::{
1213
api::{
13-
DiskRequest, InstanceEnsureRequest, InstanceProperties,
14-
InstanceStateRequested,
14+
DiskRequest, InstanceEnsureRequest, InstanceMigrateInitiateRequest,
15+
InstanceProperties, InstanceStateRequested, MigrationState,
1516
},
1617
Client,
1718
};
@@ -50,6 +51,10 @@ enum Command {
5051
/// Instance name
5152
name: String,
5253

54+
/// Instance uuid (if specified)
55+
#[structopt(short = "u")]
56+
uuid: Option<Uuid>,
57+
5358
/// Number of vCPUs allocated to instance
5459
#[structopt(short = "c", default_value = "4")]
5560
vcpus: u8,
@@ -84,6 +89,24 @@ enum Command {
8489
/// Instance name
8590
name: String,
8691
},
92+
93+
/// Migrate instance to new propolis-server
94+
Migrate {
95+
/// Instance name
96+
name: String,
97+
98+
/// Destination propolis-server address
99+
#[structopt(parse(try_from_str = resolve_host))]
100+
dst_server: IpAddr,
101+
102+
/// Destination propolis-server port
103+
#[structopt(short = "p", default_value = "12400")]
104+
dst_port: u16,
105+
106+
/// Uuid for the destination instance
107+
#[structopt(short = "u")]
108+
dst_uuid: Option<Uuid>,
109+
},
87110
}
88111

89112
fn parse_state(state: &str) -> anyhow::Result<InstanceStateRequested> {
@@ -128,13 +151,11 @@ fn create_logger(opt: &Opt) -> Logger {
128151
async fn new_instance(
129152
client: &Client,
130153
name: String,
154+
id: Uuid,
131155
vcpus: u8,
132156
memory: u64,
133157
disks: Vec<DiskRequest>,
134158
) -> anyhow::Result<()> {
135-
// Generate a UUID for the new instance
136-
let id = Uuid::new_v4();
137-
138159
let properties = InstanceProperties {
139160
id,
140161
name,
@@ -151,7 +172,8 @@ async fn new_instance(
151172
properties,
152173
// TODO: Allow specifying NICs
153174
nics: vec![],
154-
disks: disks.to_vec(),
175+
disks,
176+
migrate: None,
155177
};
156178

157179
// Try to create the instance
@@ -261,6 +283,72 @@ async fn serial(
261283
Ok(())
262284
}
263285

286+
async fn migrate_instance(
287+
src_client: Client,
288+
dst_client: Client,
289+
src_name: String,
290+
src_addr: SocketAddr,
291+
dst_uuid: Uuid,
292+
) -> anyhow::Result<()> {
293+
// Grab the src instance UUID
294+
let src_uuid = src_client
295+
.instance_get_uuid(&src_name)
296+
.await
297+
.with_context(|| anyhow!("failed to get src instance UUID"))?;
298+
299+
// Grab the instance details
300+
let src_instance = src_client
301+
.instance_get(src_uuid)
302+
.await
303+
.with_context(|| anyhow!("failed to get src instance properties"))?;
304+
305+
let request = InstanceEnsureRequest {
306+
properties: InstanceProperties {
307+
// Use a new ID for the destination instance we're creating
308+
id: dst_uuid,
309+
..src_instance.instance.properties
310+
},
311+
// TODO: Handle migrating NICs & disks
312+
nics: vec![],
313+
disks: vec![],
314+
migrate: Some(InstanceMigrateInitiateRequest { src_addr, src_uuid }),
315+
};
316+
317+
// Initiate the migration via the destination instance
318+
let migration_id = dst_client
319+
.instance_ensure(&request)
320+
.await?
321+
.migrate
322+
.ok_or_else(|| anyhow!("no migrate id on response"))?
323+
.migration_id;
324+
325+
// Wait for the migration to complete by polling both source and destination
326+
// TODO: replace with into_iter method call after edition upgrade
327+
let handles = IntoIterator::into_iter([
328+
("src", src_client, src_uuid),
329+
("dst", dst_client, dst_uuid),
330+
])
331+
.map(|(role, client, id)| {
332+
tokio::spawn(async move {
333+
loop {
334+
let state = client
335+
.instance_migrate_status(id, migration_id)
336+
.await?
337+
.state;
338+
println!("{}({}) migration state={:?}", role, id, state);
339+
if state == MigrationState::Finish {
340+
return Ok::<_, anyhow::Error>(());
341+
}
342+
tokio::time::sleep(Duration::from_secs(1)).await;
343+
}
344+
})
345+
});
346+
347+
future::join_all(handles).await;
348+
349+
Ok(())
350+
}
351+
264352
#[tokio::main]
265353
async fn main() -> anyhow::Result<()> {
266354
let opt = Opt::from_args();
@@ -270,20 +358,33 @@ async fn main() -> anyhow::Result<()> {
270358
let client = Client::new(addr, log.new(o!()));
271359

272360
match opt.cmd {
273-
Command::New { name, vcpus, memory, crucible_disks } => {
361+
Command::New { name, uuid, vcpus, memory, crucible_disks } => {
274362
let disks = if let Some(crucible_disks) = crucible_disks {
275363
parse_crucible_disks(&crucible_disks)?
276364
} else {
277365
vec![]
278366
};
279-
new_instance(&client, name.to_string(), vcpus, memory, disks)
280-
.await?
367+
new_instance(
368+
&client,
369+
name.to_string(),
370+
uuid.unwrap_or_else(Uuid::new_v4),
371+
vcpus,
372+
memory,
373+
disks,
374+
)
375+
.await?
281376
}
282377
Command::Get { name } => get_instance(&client, name).await?,
283378
Command::State { name, state } => {
284379
put_instance(&client, name, state).await?
285380
}
286381
Command::Serial { name } => serial(&client, addr, name).await?,
382+
Command::Migrate { name, dst_server, dst_port, dst_uuid } => {
383+
let dst_addr = SocketAddr::new(dst_server, dst_port);
384+
let dst_client = Client::new(dst_addr, log.clone());
385+
let dst_uuid = dst_uuid.unwrap_or_else(Uuid::new_v4);
386+
migrate_instance(client, dst_client, name, addr, dst_uuid).await?
387+
}
287388
}
288389

289390
Ok(())

client/src/api.rs

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,63 @@ pub struct InstanceEnsureRequest {
2828

2929
#[serde(default)]
3030
pub disks: Vec<DiskRequest>,
31+
32+
pub migrate: Option<InstanceMigrateInitiateRequest>,
3133
}
3234

3335
#[derive(Clone, Deserialize, Serialize, JsonSchema)]
34-
pub struct InstanceEnsureResponse {}
36+
pub struct InstanceEnsureResponse {
37+
pub migrate: Option<InstanceMigrateInitiateResponse>,
38+
}
39+
40+
#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)]
41+
pub struct InstanceMigrateInitiateRequest {
42+
pub src_addr: SocketAddr,
43+
pub src_uuid: Uuid,
44+
}
45+
46+
#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)]
47+
pub struct InstanceMigrateInitiateResponse {
48+
pub migration_id: Uuid,
49+
}
50+
51+
#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)]
52+
pub struct InstanceMigrateStartRequest {
53+
pub migration_id: Uuid,
54+
}
55+
56+
#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)]
57+
pub struct InstanceMigrateStatusRequest {
58+
pub migration_id: Uuid,
59+
}
60+
61+
#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)]
62+
pub struct InstanceMigrateStatusResponse {
63+
pub state: MigrationState,
64+
}
65+
66+
#[derive(
67+
Clone,
68+
Copy,
69+
Debug,
70+
Deserialize,
71+
PartialEq,
72+
Eq,
73+
PartialOrd,
74+
Ord,
75+
Serialize,
76+
JsonSchema,
77+
)]
78+
pub enum MigrationState {
79+
Sync,
80+
Ram,
81+
Pause,
82+
RamDirty,
83+
Device,
84+
Arch,
85+
Resume,
86+
Finish,
87+
}
3588

3689
#[derive(Clone, Deserialize, Serialize, JsonSchema)]
3790
pub struct InstanceGetResponse {

client/src/lib.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,4 +157,21 @@ impl Client {
157157
let body = Body::from(serde_json::to_string(&state).unwrap());
158158
self.put_no_response(path, Some(body)).await
159159
}
160+
161+
/// Get the status of an ongoing migration
162+
pub async fn instance_migrate_status(
163+
&self,
164+
id: Uuid,
165+
migration_id: Uuid,
166+
) -> Result<api::InstanceMigrateStatusResponse, Error> {
167+
let path =
168+
format!("http://{}/instances/{}/migrate/status", self.address, id);
169+
let body = Body::from(
170+
serde_json::to_string(&api::InstanceMigrateStatusRequest {
171+
migration_id,
172+
})
173+
.unwrap(),
174+
);
175+
self.get(path, Some(body)).await
176+
}
160177
}

server/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ doc = false
1818

1919
[dependencies]
2020
anyhow = "1.0"
21+
const_format = "0.2"
2122
# dropshot = "0.6"
2223
dropshot = { git = "https://github.com/oxidecomputer/dropshot", branch = "main" }
2324
futures = "0.3"
@@ -28,6 +29,7 @@ tokio-tungstenite = "0.14"
2829
toml = "0.5"
2930
serde = "1.0"
3031
serde_derive = "1.0"
32+
serde_json = "1.0"
3133
slog = "2.7"
3234
structopt = { version = "0.3", default-features = false }
3335
propolis = { path = "../propolis", features = ["crucible"], default-features = false }

server/src/lib/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@
22

33
pub mod config;
44
mod initializer;
5+
mod migrate;
56
mod serial;
67
pub mod server;

0 commit comments

Comments
 (0)