Skip to content

Commit

Permalink
Enable instance create saga to create a new disk (#1873)
Browse files Browse the repository at this point in the history
* Start wiring up disk-create in instance-create

* Attach disk after its created

* Flatten attach

* Maybe fix create enum

* Implement undo

* Update subsaga_append to take a dag instead of builder

* Wire up disk create subsaga

* Fix lint error

* Update fn name to be singular to clarify it attaches 1 disk

* Improve attach/detaching to not create a saga as a node

* Clean up attach iteration logic

* Pass direct references down instead of disk array

* Fix disk create subsaga params; clean up comments

* Appease ye almighty clippy

* Update attach params shape to include serialized_athn; project_id

* Address nit

* Add TODO on correctness of disk lookup by name

* instance_id isn't in the subsaga context, pass it down instead

* Fix clippy failures

* Add integration test to create and attach disks via instance create

* Add test for undos of creates/attaches for failed instance create saga

* Reduce disk sizes to hopefully resolve zpool failures

* Address PR feedback on tests

* Comment what test is validating
  • Loading branch information
zephraph authored Nov 2, 2022
1 parent c6bb16d commit 2b3a643
Show file tree
Hide file tree
Showing 2 changed files with 296 additions and 123 deletions.
213 changes: 90 additions & 123 deletions nexus/src/app/sagas/instance_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use super::{NexusActionContext, NexusSaga, SagaInitError, ACTION_GENERATE_ID};
use crate::app::sagas::disk_create::{self, SagaDiskCreate};
use crate::app::sagas::NexusAction;
use crate::app::{
MAX_DISKS_PER_INSTANCE, MAX_EXTERNAL_IPS_PER_INSTANCE,
Expand All @@ -17,6 +18,7 @@ use crate::{authn, authz, db};
use chrono::Utc;
use lazy_static::lazy_static;
use nexus_defaults::DEFAULT_PRIMARY_NIC_NAME;
use nexus_types::external_api::params::InstanceDiskAttachment;
use omicron_common::api::external::Error;
use omicron_common::api::external::Generation;
use omicron_common::api::external::IdentityMetadataCreateParams;
Expand Down Expand Up @@ -61,12 +63,12 @@ struct NetParams {
new_id: Uuid,
}

// The disk-related nodes get a similar treatment, but the data they need are
// different.
#[derive(Debug, Deserialize, Serialize)]
struct DiskParams {
saga_params: Params,
which: usize,
struct DiskAttachParams {
serialized_authn: authn::saga::Serialized,
project_id: Uuid,
instance_id: Uuid,
attach_params: InstanceDiskAttachment,
}

// instance create saga: actions
Expand Down Expand Up @@ -103,15 +105,10 @@ lazy_static! {
sic_allocate_instance_external_ip,
sic_allocate_instance_external_ip_undo,
);
static ref CREATE_DISKS_FOR_INSTANCE: NexusAction = ActionFunc::new_action(
"instance-create.create-disks-for-instance",
sic_create_disks_for_instance,
sic_create_disks_for_instance_undo,
);
static ref ATTACH_DISKS_TO_INSTANCE: NexusAction = ActionFunc::new_action(
"instance-create.attach-disks-to-instance",
sic_attach_disks_to_instance,
sic_attach_disks_to_instance_undo,
sic_attach_disk_to_instance,
sic_attach_disk_to_instance_undo,
);
static ref INSTANCE_ENSURE: NexusAction = new_action_noop_undo(
"instance-create.instance-ensure",
Expand All @@ -134,7 +131,6 @@ impl NexusSaga for SagaInstanceCreate {
registry.register(Arc::clone(&*CREATE_NETWORK_INTERFACE));
registry.register(Arc::clone(&*CREATE_SNAT_IP));
registry.register(Arc::clone(&*CREATE_EXTERNAL_IP));
registry.register(Arc::clone(&*CREATE_DISKS_FOR_INSTANCE));
registry.register(Arc::clone(&*ATTACH_DISKS_TO_INSTANCE));
registry.register(Arc::clone(&*INSTANCE_ENSURE));
}
Expand Down Expand Up @@ -179,7 +175,7 @@ impl NexusSaga for SagaInstanceCreate {
// Helper function for appending subsagas to our parent saga.
fn subsaga_append<S: Serialize>(
node_basename: &'static str,
subsaga_builder: steno::DagBuilder,
subsaga_dag: steno::Dag,
parent_builder: &mut steno::DagBuilder,
params: S,
which: usize,
Expand All @@ -198,7 +194,7 @@ impl NexusSaga for SagaInstanceCreate {
let output_name = format!("{}{}", node_basename, which);
parent_builder.append(Node::subsaga(
output_name.as_str(),
subsaga_builder.build()?,
subsaga_dag,
params_node_name,
));
Ok(())
Expand Down Expand Up @@ -243,7 +239,7 @@ impl NexusSaga for SagaInstanceCreate {
));
subsaga_append(
"network_interface",
subsaga_builder,
subsaga_builder.build()?,
&mut builder,
repeat_params,
i,
Expand Down Expand Up @@ -281,36 +277,56 @@ impl NexusSaga for SagaInstanceCreate {
));
subsaga_append(
"external_ip",
subsaga_builder,
subsaga_builder.build()?,
&mut builder,
repeat_params,
i,
)?;
}

// See the comment above where we add nodes for creating NICs. We use
// the same pattern here.
for i in 0..(MAX_DISKS_PER_INSTANCE as usize) {
let disk_params =
DiskParams { saga_params: params.clone(), which: i };
// Appends the disk create saga as a subsaga directly to the instance create builder.
for (i, disk) in params.create_params.disks.iter().enumerate() {
if let InstanceDiskAttachment::Create(create_disk) = disk {
let subsaga_name =
SagaName::new(&format!("instance-create-disk-{i}"));
let subsaga_builder = DagBuilder::new(subsaga_name);
let params = disk_create::Params {
serialized_authn: params.serialized_authn.clone(),
project_id: params.project_id,
create_params: create_disk.clone(),
};
subsaga_append(
"create_disk",
SagaDiskCreate::make_saga_dag(&params, subsaga_builder)?,
&mut builder,
params,
i,
)?;
}
}

// Attaches all disks included in the instance create request, including those which were previously created
// by the disk create subsagas.
for (i, disk_attach) in params.create_params.disks.iter().enumerate() {
let subsaga_name =
SagaName::new(&format!("instance-create-disk{i}"));
SagaName::new(&format!("instance-attach-disk-{i}"));
let mut subsaga_builder = DagBuilder::new(subsaga_name);
subsaga_builder.append(Node::action(
"create_disk_output",
format!("CreateDisksForInstance-{i}").as_str(),
CREATE_DISKS_FOR_INSTANCE.as_ref(),
));
subsaga_builder.append(Node::action(
"attach_disk_output",
format!("AttachDisksToInstance-{i}").as_str(),
ATTACH_DISKS_TO_INSTANCE.as_ref(),
));
let params = DiskAttachParams {
serialized_authn: params.serialized_authn.clone(),
project_id: params.project_id,
instance_id,
attach_params: disk_attach.clone(),
};
subsaga_append(
"disk",
subsaga_builder,
"attach_disk",
subsaga_builder.build()?,
&mut builder,
disk_params,
params,
i,
)?;
}
Expand Down Expand Up @@ -704,50 +720,13 @@ async fn sic_allocate_instance_external_ip_undo(
Ok(())
}

/// Create disks during instance creation, and return a list of disk names
// TODO implement
async fn sic_create_disks_for_instance(
sagactx: NexusActionContext,
) -> Result<Option<String>, ActionError> {
let disk_params = sagactx.saga_params::<DiskParams>()?;
let saga_params = disk_params.saga_params;
let disk_index = disk_params.which;
let saga_disks = &saga_params.create_params.disks;

if disk_index >= saga_disks.len() {
return Ok(None);
}

let disk = &saga_disks[disk_index];

match disk {
params::InstanceDiskAttachment::Create(_create_params) => {
return Err(ActionError::action_failed(
"Creating disk during instance create unsupported!".to_string(),
));
}

_ => {}
}

Ok(None)
}

/// Undo disks created during instance creation
// TODO implement
async fn sic_create_disks_for_instance_undo(
_sagactx: NexusActionContext,
) -> Result<(), anyhow::Error> {
Ok(())
}

async fn sic_attach_disks_to_instance(
async fn sic_attach_disk_to_instance(
sagactx: NexusActionContext,
) -> Result<(), ActionError> {
ensure_instance_disk_attach_state(sagactx, true).await
}

async fn sic_attach_disks_to_instance_undo(
async fn sic_attach_disk_to_instance_undo(
sagactx: NexusActionContext,
) -> Result<(), anyhow::Error> {
Ok(ensure_instance_disk_attach_state(sagactx, false).await?)
Expand All @@ -758,64 +737,52 @@ async fn ensure_instance_disk_attach_state(
attached: bool,
) -> Result<(), ActionError> {
let osagactx = sagactx.user_data();
let disk_params = sagactx.saga_params::<DiskParams>()?;
let saga_params = disk_params.saga_params;
let disk_index = disk_params.which;
let opctx =
OpContext::for_saga_action(&sagactx, &saga_params.serialized_authn);

let saga_disks = &saga_params.create_params.disks;
let instance_name =
db::model::Name(saga_params.create_params.identity.name);
let params = sagactx.saga_params::<DiskAttachParams>()?;
let datastore = osagactx.datastore();
let opctx = OpContext::for_saga_action(&sagactx, &params.serialized_authn);
let instance_id = params.instance_id;
let project_id = params.project_id;

if disk_index >= saga_disks.len() {
return Ok(());
}
let disk_name = match params.attach_params {
InstanceDiskAttachment::Create(create_params) => {
db::model::Name(create_params.identity.name)
}
InstanceDiskAttachment::Attach(attach_params) => {
db::model::Name(attach_params.name)
}
};

let disk = &saga_disks[disk_index];
let (.., authz_instance, _db_instance) =
LookupPath::new(&opctx, &datastore)
.instance_id(instance_id)
.fetch()
.await
.map_err(ActionError::action_failed)?;

// TODO-correctness TODO-security It's not correct to re-resolve the
// organization and project names now. See oxidecomputer/omicron#1536.
let organization_name: db::model::Name =
saga_params.organization_name.clone().into();
let project_name: db::model::Name = saga_params.project_name.clone().into();

match disk {
params::InstanceDiskAttachment::Create(_) => {
// TODO grab disks created in sic_create_disks_for_instance
return Err(ActionError::action_failed(Error::invalid_request(
"creating disks while creating an instance not supported",
)));
}
params::InstanceDiskAttachment::Attach(instance_disk_attach) => {
let disk_name: db::model::Name =
instance_disk_attach.name.clone().into();

if attached {
osagactx
.nexus()
.instance_attach_disk(
&opctx,
&organization_name,
&project_name,
&instance_name,
&disk_name,
)
.await
} else {
osagactx
.nexus()
.instance_detach_disk(
&opctx,
&organization_name,
&project_name,
&instance_name,
&disk_name,
)
.await
}
// disk name now. See oxidecomputer/omicron#1536.
let (.., authz_disk, _db_disk) = LookupPath::new(&opctx, &datastore)
.project_id(project_id)
.disk_name(&disk_name)
.fetch()
.await
.map_err(ActionError::action_failed)?;

if attached {
datastore
.instance_attach_disk(
&opctx,
&authz_instance,
&authz_disk,
MAX_DISKS_PER_INSTANCE,
)
.await
.map_err(ActionError::action_failed)?;
} else {
datastore
.instance_detach_disk(&opctx, &authz_instance, &authz_disk)
.await
.map_err(ActionError::action_failed)?;
}
}

Ok(())
Expand Down
Loading

0 comments on commit 2b3a643

Please sign in to comment.