Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

66 changes: 66 additions & 0 deletions Cargo.nix

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ snafu = "0.8"
strum = { version = "0.27", features = ["derive"] }
tokio = { version = "1.45", features = ["full"] }
tracing = "0.1"
uuid = "1.18"

#[patch."https://github.com/stackabletech/operator-rs"]
# stackable-operator = { git = "https://github.com/stackabletech//operator-rs.git", branch = "main" }
1 change: 1 addition & 0 deletions deploy/helm/opensearch-operator/crds/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ spec:
type: object
roleGroups:
additionalProperties:
description: Variant of [`stackable_operator::role_utils::GenericProductSpecificCommonConfig`] that implements [`Merge`]
properties:
cliOverrides:
additionalProperties:
Expand Down
1 change: 1 addition & 0 deletions rust/operator-binary/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ snafu.workspace = true
strum.workspace = true
tokio.workspace = true
tracing.workspace = true
uuid.workspace = true

[build-dependencies]
built.workspace = true
Expand Down
93 changes: 61 additions & 32 deletions rust/operator-binary/src/controller.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
//! Controller for [`v1alpha1::OpenSearchCluster`]
//!
//! The cluster specification is validated, Kubernetes resource specifications are created and
//! applied and the cluster status is updated.

use std::{collections::BTreeMap, marker::PhantomData, str::FromStr, sync::Arc};

use apply::Applier;
Expand Down Expand Up @@ -28,8 +33,8 @@ use crate::{
v1alpha1::{self},
},
framework::{
ClusterName, ControllerName, HasNamespace, HasObjectName, HasUid, IsLabelValue,
OperatorName, ProductName, ProductVersion, RoleGroupName, RoleName,
ClusterName, ControllerName, HasName, HasUid, NameIsValidLabelValue, NamespaceName,
OperatorName, ProductName, ProductVersion, RoleGroupName, RoleName, Uid,
role_utils::{GenericProductSpecificCommonConfig, RoleGroupConfig},
},
};
Expand All @@ -39,12 +44,17 @@ mod build;
mod update_status;
mod validate;

/// Names in the controller context which are passed to the submodules of the controller
///
/// The names are not directly defined in [`Context`] because not every submodule requires a
/// Kubernetes client and unit testing is easier without an unnecessary client.
pub struct ContextNames {
pub product_name: ProductName,
pub operator_name: OperatorName,
pub controller_name: ControllerName,
}

/// The controller context
pub struct Context {
client: stackable_operator::client::Client,
names: ContextNames,
Expand Down Expand Up @@ -113,6 +123,7 @@ type OpenSearchRoleGroupConfig =
type OpenSearchNodeResources =
stackable_operator::commons::resources::Resources<v1alpha1::StorageConfig>;

/// The validated [`v1alpha1::OpenSearchConfig`]
#[derive(Clone, Debug, PartialEq)]
pub struct ValidatedOpenSearchConfig {
pub affinity: StackableAffinity,
Expand All @@ -122,19 +133,23 @@ pub struct ValidatedOpenSearchConfig {
pub listener_class: String,
}

// validated and converted to validated and safe types
// no user errors
// not restricted by CRD compliance
/// The validated [`v1alpha1::OpenSearchCluster`]
///
/// Validated means that there should be no reason for Kubernetes to reject resources generated
/// from these values. This is usually achieved by using fail-safe types. For instance, the cluster
/// name is wrapped in the type [`ClusterName`]. This type implements e.g. the function
/// [`ClusterName::to_label_value`] which returns a valid label value as string. If this function
/// is used as intended, i.e. to set a label value, and if it is used as late as possible in the
/// call chain, then chances are high that the resulting Kubernetes resource is valid.
#[derive(Clone, Debug, PartialEq)]
pub struct ValidatedCluster {
metadata: ObjectMeta,
pub image: ProductImage,
pub product_version: ProductVersion,
pub name: ClusterName,
pub namespace: String,
pub uid: String,
pub namespace: NamespaceName,
pub uid: Uid,
pub role_config: GenericRoleConfig,
// "validated" means that labels are valid and no ugly rolegroup name broke them
pub role_group_configs: BTreeMap<RoleGroupName, OpenSearchRoleGroupConfig>,
}

Expand All @@ -143,16 +158,17 @@ impl ValidatedCluster {
image: ProductImage,
product_version: ProductVersion,
name: ClusterName,
namespace: String,
uid: String,
namespace: NamespaceName,
uid: impl Into<Uid>,
role_config: GenericRoleConfig,
role_group_configs: BTreeMap<RoleGroupName, OpenSearchRoleGroupConfig>,
) -> Self {
let uid = uid.into();
ValidatedCluster {
metadata: ObjectMeta {
name: Some(name.to_object_name()),
namespace: Some(namespace.clone()),
uid: Some(uid.clone()),
name: Some(name.to_string()),
namespace: Some(namespace.to_string()),
uid: Some(uid.to_string()),
..ObjectMeta::default()
},
image,
Expand All @@ -165,21 +181,25 @@ impl ValidatedCluster {
}
}

/// Returns the one role name
pub fn role_name() -> RoleName {
RoleName::from_str("nodes").expect("should be a valid role name")
}

/// Returns true if only a single OpenSearch node is defined in the cluster
pub fn is_single_node(&self) -> bool {
self.node_count() == 1
}

/// Returns the sum of the replicas in all role-groups
pub fn node_count(&self) -> u32 {
self.role_group_configs
.values()
.map(|rg| rg.replicas as u32)
.sum()
}

/// Returns all role-group configurations which contain the given node role
pub fn role_group_configs_filtered_by_node_role(
&self,
node_role: &v1alpha1::NodeRole,
Expand All @@ -192,27 +212,20 @@ impl ValidatedCluster {
}
}

impl HasObjectName for ValidatedCluster {
fn to_object_name(&self) -> String {
self.name.to_object_name()
}
}

impl HasNamespace for ValidatedCluster {
fn to_namespace(&self) -> String {
self.namespace.clone()
impl HasName for ValidatedCluster {
fn to_name(&self) -> String {
self.name.to_string()
}
}

impl HasUid for ValidatedCluster {
fn to_uid(&self) -> String {
fn to_uid(&self) -> Uid {
self.uid.clone()
}
}

impl IsLabelValue for ValidatedCluster {
impl NameIsValidLabelValue for ValidatedCluster {
fn to_label_value(&self) -> String {
// opinionated!
self.name.to_label_value()
}
}
Expand Down Expand Up @@ -259,6 +272,13 @@ pub fn error_policy(
}
}

/// Reconcile function of the OpenSearchCluster controller
///
/// The reconcile function performs the following steps:
/// 1. Validate the given cluster specification and return a [`ValidatedCluster`] if successful.
/// 2. Build Kubernetes resource specifications from the validated cluster.
/// 3. Apply the Kubernetes resource specifications
/// 4. Update the cluster status
pub async fn reconcile(
object: Arc<DeserializeGuard<v1alpha1::OpenSearchCluster>>,
context: Arc<Context>,
Expand All @@ -271,7 +291,7 @@ pub async fn reconcile(
.map_err(stackable_operator::kube::core::error_boundary::InvalidObject::clone)
.context(DeserializeClusterDefinitionSnafu)?;

// dereference (client required)
// not necessary in this controller: dereference (client required)

// validate (no client required)
let validated_cluster = validate(&context.names, cluster).context(ValidateClusterSnafu)?;
Expand All @@ -284,14 +304,16 @@ pub async fn reconcile(
let applied_resources = Applier::new(
&context.client,
&context.names,
&validated_cluster,
&validated_cluster.name,
&validated_cluster.namespace,
&validated_cluster.uid,
apply_strategy,
)
.apply(prepared_resources)
.await
.context(ApplyResourcesSnafu)?;

// create discovery ConfigMap based on the applied resources (client required)
// not necessary in this controller: create discovery ConfigMap based on the applied resources (client required)

// update status (client required)
update_status(&context.client, &context.names, cluster, applied_resources)
Expand All @@ -301,10 +323,16 @@ pub async fn reconcile(
Ok(Action::await_change())
}

// Marker
/// Marker for prepared Kubernetes resources which are not applied yet
struct Prepared;
/// Marker for applied Kubernetes resources
struct Applied;

/// List of all Kubernetes resources produced by this controller
///
/// `T` is a marker that indicates if these resources are only [`Prepared`] or already [`Applied`].
/// The marker is useful e.g. to ensure that the cluster status is updated based on the applied
/// resources.
struct KubernetesResources<T> {
stateful_sets: Vec<StatefulSet>,
services: Vec<Service>,
Expand All @@ -324,13 +352,14 @@ mod tests {
commons::affinity::StackableAffinity, k8s_openapi::api::core::v1::PodTemplateSpec,
role_utils::GenericRoleConfig,
};
use uuid::uuid;

use super::{Context, OpenSearchRoleGroupConfig, ValidatedCluster};
use crate::{
controller::{OpenSearchNodeResources, ValidatedOpenSearchConfig},
crd::{NodeRoles, v1alpha1},
framework::{
ClusterName, OperatorName, ProductVersion, RoleGroupName,
ClusterName, NamespaceName, OperatorName, ProductVersion, RoleGroupName,
builder::pod::container::EnvVarSet, role_utils::GenericProductSpecificCommonConfig,
},
};
Expand Down Expand Up @@ -400,8 +429,8 @@ mod tests {
.expect("should be a valid ProductImage structure"),
ProductVersion::from_str_unsafe("3.1.0"),
ClusterName::from_str_unsafe("my-opensearch"),
"default".to_owned(),
"e6ac237d-a6d4-43a1-8135-f36506110912".to_owned(),
NamespaceName::from_str_unsafe("default"),
uuid!("e6ac237d-a6d4-43a1-8135-f36506110912"),
GenericRoleConfig::default(),
[
(
Expand Down
Loading