Skip to content

Commit

Permalink
fix: invalid objects don't stop the reconciliation (#551)
Browse files Browse the repository at this point in the history
* fix: invalid objects don't stop the reconciliation

* Update rust/operator-binary/src/main.rs

Co-authored-by: Natalie Klestrup Röijezon <teo@nullable.se>

* Update rust/operator-binary/src/main.rs

Co-authored-by: Natalie Klestrup Röijezon <teo@nullable.se>

* Update rust/operator-binary/src/main.rs

Co-authored-by: Natalie Klestrup Röijezon <teo@nullable.se>

* Update rust/operator-binary/src/main.rs

Co-authored-by: Natalie Klestrup Röijezon <teo@nullable.se>

---------

Co-authored-by: Natalie Klestrup Röijezon <teo@nullable.se>
  • Loading branch information
razvan and nightkr authored Oct 24, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent ed7c03b commit 04a3046
Showing 4 changed files with 126 additions and 52 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -14,9 +14,14 @@
- `podOverrides`
- `affinity`

### Fixed

- Invalid `SupersetCluster`, `DruidConnection` or `AuthenticationClass` objects don't stop the operator from reconciling ([#551]).

[#528]: https://github.com/stackabletech/superset-operator/pull/528
[#530]: https://github.com/stackabletech/superset-operator/pull/530
[#549]: https://github.com/stackabletech/superset-operator/pull/549
[#551]: https://github.com/stackabletech/superset-operator/pull/551

## [24.7.0] - 2024-07-24

46 changes: 30 additions & 16 deletions rust/operator-binary/src/druid_connection_controller.rs
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@ use stackable_operator::{
},
kube::{
core::DynamicObject,
core::{error_boundary, DeserializeGuard},
runtime::{controller::Action, reflector::ObjectRef},
ResourceExt,
},
@@ -84,6 +85,11 @@ pub enum Error {
SupersetClusterRetrieval {
source: stackable_operator::client::Error,
},

#[snafu(display("DruidConnection object is invalid"))]
InvalidDruidConnection {
source: error_boundary::InvalidObject,
},
}

type Result<T, E = Error> = std::result::Result<T, E>;
@@ -110,20 +116,25 @@ impl ReconcilerError for Error {
Error::ApplyServiceAccount { .. } => None,
Error::ApplyRoleBinding { .. } => None,
Error::SupersetClusterRetrieval { .. } => None,
Error::InvalidDruidConnection { .. } => None,
}
}
}

pub async fn reconcile_druid_connection(
druid_connection: Arc<DruidConnection>,
druid_connection: Arc<DeserializeGuard<DruidConnection>>,
ctx: Arc<Ctx>,
) -> Result<Action> {
tracing::info!("Starting reconciling DruidConnections");

let druid_connection = druid_connection
.0
.as_ref()
.map_err(error_boundary::InvalidObject::clone)
.context(InvalidDruidConnectionSnafu)?;
let client = &ctx.client;

let (rbac_sa, rbac_rolebinding) =
rbac::build_rbac_resources(druid_connection.as_ref(), APP_NAME);
let (rbac_sa, rbac_rolebinding) = rbac::build_rbac_resources(druid_connection, APP_NAME);
client
.apply_patch(DRUID_CONNECTION_CONTROLLER_NAME, &rbac_sa, &rbac_sa)
.await
@@ -146,7 +157,7 @@ pub async fn reconcile_druid_connection(
&druid_connection.druid_name(),
&druid_connection.druid_namespace().context(
DruidConnectionNoNamespaceSnafu {
druid_connection: ObjectRef::from_obj(&*druid_connection),
druid_connection: ObjectRef::from_obj(druid_connection),
},
)?,
)
@@ -159,7 +170,7 @@ pub async fn reconcile_druid_connection(
&druid_connection.superset_name(),
&druid_connection.superset_namespace().context(
DruidConnectionNoNamespaceSnafu {
druid_connection: ObjectRef::from_obj(&*druid_connection),
druid_connection: ObjectRef::from_obj(druid_connection),
},
)?,
)
@@ -183,7 +194,7 @@ pub async fn reconcile_druid_connection(
&druid_connection.druid_name(),
&druid_connection.druid_namespace().context(
DruidConnectionNoNamespaceSnafu {
druid_connection: ObjectRef::from_obj(&*druid_connection),
druid_connection: ObjectRef::from_obj(druid_connection),
},
)?,
client,
@@ -195,7 +206,7 @@ pub async fn reconcile_druid_connection(
.resolve(DOCKER_IMAGE_BASE_NAME, crate::built_info::PKG_VERSION);
let job = build_import_job(
&superset_cluster,
&druid_connection,
druid_connection,
&resolved_product_image,
&sqlalchemy_str,
&rbac_sa.name_any(),
@@ -209,7 +220,7 @@ pub async fn reconcile_druid_connection(
client
.apply_patch_status(
DRUID_CONNECTION_CONTROLLER_NAME,
&*druid_connection,
druid_connection,
&s.importing(),
)
.await
@@ -236,11 +247,7 @@ pub async fn reconcile_druid_connection(

if let Some(ns) = new_status {
client
.apply_patch_status(
DRUID_CONNECTION_CONTROLLER_NAME,
&*druid_connection,
&ns,
)
.apply_patch_status(DRUID_CONNECTION_CONTROLLER_NAME, druid_connection, &ns)
.await
.context(ApplyStatusSnafu)?;
}
@@ -253,7 +260,7 @@ pub async fn reconcile_druid_connection(
client
.apply_patch_status(
DRUID_CONNECTION_CONTROLLER_NAME,
&*druid_connection,
druid_connection,
&DruidConnectionStatus::new(),
)
.await
@@ -360,6 +367,13 @@ async fn build_import_job(
Ok(job)
}

pub fn error_policy(_obj: Arc<DruidConnection>, _error: &Error, _ctx: Arc<Ctx>) -> Action {
Action::requeue(*Duration::from_secs(5))
pub fn error_policy(
_obj: Arc<DeserializeGuard<DruidConnection>>,
error: &Error,
_ctx: Arc<Ctx>,
) -> Action {
match error {
Error::InvalidDruidConnection { .. } => Action::await_change(),
_ => Action::requeue(*Duration::from_secs(5)),
}
}
74 changes: 52 additions & 22 deletions rust/operator-binary/src/main.rs
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ use stackable_operator::{
core::v1::{ConfigMap, Service},
},
kube::{
core::DeserializeGuard,
runtime::{reflector::ObjectRef, watcher, Controller},
ResourceExt,
},
@@ -86,28 +87,28 @@ async fn main() -> anyhow::Result<()> {
.await?;

let superset_controller_builder = Controller::new(
watch_namespace.get_api::<SupersetCluster>(&client),
watch_namespace.get_api::<DeserializeGuard<SupersetCluster>>(&client),
watcher::Config::default(),
);
let superset_store_1 = superset_controller_builder.store();
let superset_controller = superset_controller_builder
.owns(
watch_namespace.get_api::<Service>(&client),
watch_namespace.get_api::<DeserializeGuard<Service>>(&client),
watcher::Config::default(),
)
.owns(
watch_namespace.get_api::<StatefulSet>(&client),
watch_namespace.get_api::<DeserializeGuard<StatefulSet>>(&client),
watcher::Config::default(),
)
.shutdown_on_signal()
.watches(
client.get_api::<AuthenticationClass>(&()),
client.get_api::<DeserializeGuard<AuthenticationClass>>(&()),
watcher::Config::default(),
move |authentication_class| {
superset_store_1
.state()
.into_iter()
.filter(move |superset: &Arc<SupersetCluster>| {
.filter(move |superset| {
references_authentication_class(superset, &authentication_class)
})
.map(|superset| ObjectRef::from_obj(&*superset))
@@ -130,7 +131,7 @@ async fn main() -> anyhow::Result<()> {
});

let druid_connection_controller_builder = Controller::new(
watch_namespace.get_api::<DruidConnection>(&client),
watch_namespace.get_api::<DeserializeGuard<DruidConnection>>(&client),
watcher::Config::default(),
);
let druid_connection_store_1 = druid_connection_controller_builder.store();
@@ -139,46 +140,38 @@ async fn main() -> anyhow::Result<()> {
let druid_connection_controller = druid_connection_controller_builder
.shutdown_on_signal()
.watches(
watch_namespace.get_api::<SupersetCluster>(&client),
watch_namespace.get_api::<DeserializeGuard<SupersetCluster>>(&client),
watcher::Config::default(),
move |superset_cluster| {
druid_connection_store_1
.state()
.into_iter()
.filter(move |druid_connection| {
druid_connection.superset_name() == superset_cluster.name_any()
&& druid_connection.superset_namespace().ok()
== superset_cluster.namespace()
valid_druid_connection(&superset_cluster, druid_connection)
})
.map(|druid_connection| ObjectRef::from_obj(&*druid_connection))
},
)
.watches(
watch_namespace.get_api::<Job>(&client),
watch_namespace.get_api::<DeserializeGuard<Job>>(&client),
watcher::Config::default(),
move |job| {
druid_connection_store_2
.state()
.into_iter()
.filter(move |druid_connection| {
druid_connection.metadata.namespace == job.metadata.namespace
&& Some(druid_connection.job_name()) == job.metadata.name
})
.filter(move |druid_connection| valid_druid_job(druid_connection, &job))
.map(|druid_connection| ObjectRef::from_obj(&*druid_connection))
},
)
.watches(
watch_namespace.get_api::<ConfigMap>(&client),
watch_namespace.get_api::<DeserializeGuard<ConfigMap>>(&client),
watcher::Config::default(),
move |config_map| {
druid_connection_store_3
.state()
.into_iter()
.filter(move |druid_connection| {
druid_connection.druid_namespace().ok()
== config_map.metadata.namespace
&& Some(druid_connection.druid_name())
== config_map.metadata.name
valid_druid_connection_namespace(druid_connection, &config_map)
})
.map(|druid_connection| ObjectRef::from_obj(&*druid_connection))
},
@@ -208,9 +201,13 @@ async fn main() -> anyhow::Result<()> {
}

fn references_authentication_class(
superset: &SupersetCluster,
authentication_class: &AuthenticationClass,
superset: &DeserializeGuard<SupersetCluster>,
authentication_class: &DeserializeGuard<AuthenticationClass>,
) -> bool {
let Ok(superset) = &superset.0 else {
return false;
};

let authentication_class_name = authentication_class.name_any();
superset
.spec
@@ -219,3 +216,36 @@ fn references_authentication_class(
.iter()
.any(|c| c.common.authentication_class_name() == &authentication_class_name)
}

fn valid_druid_connection(
superset_cluster: &DeserializeGuard<SupersetCluster>,
druid_connection: &DeserializeGuard<DruidConnection>,
) -> bool {
let Ok(druid_connection) = &druid_connection.0 else {
return false;
};
druid_connection.superset_name() == superset_cluster.name_any()
&& druid_connection.superset_namespace().ok() == superset_cluster.namespace()
}

fn valid_druid_connection_namespace(
druid_connection: &DeserializeGuard<DruidConnection>,
config_map: &DeserializeGuard<ConfigMap>,
) -> bool {
let Ok(druid_connection) = &druid_connection.0 else {
return false;
};
druid_connection.druid_namespace().ok() == config_map.meta().namespace
&& Some(druid_connection.druid_name()) == config_map.meta().name
}

fn valid_druid_job(
druid_connection: &DeserializeGuard<DruidConnection>,
job: &DeserializeGuard<Job>,
) -> bool {
let Ok(druid_connection) = &druid_connection.0 else {
return false;
};
druid_connection.metadata.namespace == job.meta().namespace
&& Some(druid_connection.job_name()) == job.meta().name
}
Loading

0 comments on commit 04a3046

Please sign in to comment.