Skip to content

Commit 5b7a4f6

Browse files
Techassisbernauer
andauthored
feat!: Add CRD maintainer (#1099)
* feat: Add CRD maintainer * docs: Add/improve various (doc) comments * fix: Use default crypto provider for TLS server * chore(stackable-operator): Gate maintainer behind webhook feature * fix(stackable-operator): Make options fields public * feat(stackable-operator): Add create_if_missing method to client Co-authored-by: Sebastian Bernauer <sebastian.bernauer@stackable.de> * chore(webhook): Add changelog entry * chore(operator): Add changelog entry * chore: Streamline feature gate * docs(operator): Add example in doc comment * refactor(operator): Adjust oneshot channel handling * chore: Apply suggestions Co-authored-by: Sebastian Bernauer <sebastian.bernauer@stackable.de> * chore(webhook): Update dev comment * fix(operator): Import ensure! macro * feat: Add ConversionWebhookServer::with_maintainer * chore: Apply suggestion Co-authored-by: Sebastian Bernauer <sebastian.bernauer@stackable.de> * chore: Adjust changelog entry Co-authored-by: Sebastian Bernauer <sebastian.bernauer@stackable.de> * docs(webhook): Add example for ConversionWebhookServer::with_maintainer * refactor(webhook): Make field manager a separate field * test(webhook): Adjust doc tests * refactor(webhook): Rename operator_name to operator_service_name --------- Co-authored-by: Sebastian Bernauer <sebastian.bernauer@stackable.de>
1 parent ebf9e20 commit 5b7a4f6

File tree

10 files changed

+541
-279
lines changed

10 files changed

+541
-279
lines changed

crates/stackable-operator/CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@ All notable changes to this project will be documented in this file.
1414

1515
### Added
1616

17+
- Add `CustomResourceDefinitionMaintainer` which applies and patches CRDs triggered by TLS
18+
certificate rotations of the `ConversionWebhookServer`. It additionally provides a `oneshot`
19+
channel which can for example be used to trigger creation/patching of any custom resources deployed by
20+
the operator ([#1099]).
21+
- Add a `Client::create_if_missing` associated function to create a resource if it doesn't
22+
exist ([#1099]).
1723
- Add CLI argument and env var to disable the end-of-support checker: `EOS_DISABLED` (`--eos-disabled`) ([#1101]).
1824
- Add end-of-support checker ([#1096], [#1103]).
1925
- The EoS checker can be constructed using `EndOfSupportChecker::new()`.
@@ -34,6 +40,7 @@ All notable changes to this project will be documented in this file.
3440

3541
[#1096]: https://github.com/stackabletech/operator-rs/pull/1096
3642
[#1098]: https://github.com/stackabletech/operator-rs/pull/1098
43+
[#1099]: https://github.com/stackabletech/operator-rs/pull/1099
3744
[#1101]: https://github.com/stackabletech/operator-rs/pull/1101
3845
[#1103]: https://github.com/stackabletech/operator-rs/pull/1103
3946

crates/stackable-operator/src/client.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,25 @@ impl Client {
253253
})
254254
}
255255

256+
/// Optionally creates a resource if it does not exist yet.
257+
///
258+
/// The name used for lookup is extracted from the resource via [`ResourceExt::name_any()`].
259+
/// This function either returns the existing resource or the newly created one.
260+
pub async fn create_if_missing<T>(&self, resource: &T) -> Result<T>
261+
where
262+
T: Clone + Debug + DeserializeOwned + Resource + Serialize + GetApi,
263+
<T as Resource>::DynamicType: Default,
264+
{
265+
if let Some(r) = self
266+
.get_opt(&resource.name_any(), resource.get_namespace())
267+
.await?
268+
{
269+
return Ok(r);
270+
}
271+
272+
self.create(resource).await
273+
}
274+
256275
/// Patches a resource using the `MERGE` patch strategy described
257276
/// in [JSON Merge Patch](https://tools.ietf.org/html/rfc7386)
258277
/// This will fail for objects that do not exist yet.

crates/stackable-webhook/CHANGELOG.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,21 @@ All notable changes to this project will be documented in this file.
44

55
## [Unreleased]
66

7+
### Changed
8+
9+
- BREAKING: `ConversionWebhookServer::new` now returns a pair of values ([#1099]):
10+
- The conversion webhook server itself
11+
- A `mpsc::Receiver<Certificate>` to provide consumers the newly generated TLS certificate
12+
- BREAKING: Constants for ports, IP addresses and socket addresses are now associated constants on
13+
`(Conversion)WebhookServer` instead of free-standing ones ([#1099]).
14+
15+
### Removed
16+
17+
- BREAKING: The `maintain_crds` and `field_manager` fields in `ConversionWebhookOptions`
18+
are removed ([#1099]).
19+
20+
[#1099]: https://github.com/stackabletech/operator-rs/pull/1099
21+
722
## [0.6.0] - 2025-09-09
823

924
### Added

crates/stackable-webhook/src/constants.rs

Lines changed: 0 additions & 21 deletions
This file was deleted.

crates/stackable-webhook/src/lib.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
//! enable complete control over these details if needed.
2727
//!
2828
//! [1]: crate::servers::ConversionWebhookServer
29+
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
30+
31+
use ::x509_cert::Certificate;
2932
use axum::{Router, routing::get};
3033
use futures_util::{FutureExt as _, pin_mut, select};
3134
use snafu::{ResultExt, Snafu};
@@ -35,19 +38,16 @@ use tokio::{
3538
sync::mpsc,
3639
};
3740
use tower::ServiceBuilder;
38-
use x509_cert::Certificate;
3941

40-
// use tower_http::trace::TraceLayer;
42+
// Selected re-exports
43+
pub use crate::options::WebhookOptions;
4144
use crate::tls::TlsServer;
4245

43-
pub mod constants;
46+
pub mod maintainer;
4447
pub mod options;
4548
pub mod servers;
4649
pub mod tls;
4750

48-
// Selected re-exports
49-
pub use crate::options::WebhookOptions;
50-
5151
/// A generic webhook handler receiving a request and sending back a response.
5252
///
5353
/// This trait is not intended to be implemented by external crates and this
@@ -86,6 +86,19 @@ pub struct WebhookServer {
8686
}
8787

8888
impl WebhookServer {
89+
/// The default HTTPS port `8443`
90+
pub const DEFAULT_HTTPS_PORT: u16 = 8443;
91+
/// The default IP address [`Ipv4Addr::UNSPECIFIED`] (`0.0.0.0`) the webhook server binds to,
92+
/// which represents binding on all network addresses.
93+
//
94+
// TODO: We might want to switch to `Ipv6Addr::UNSPECIFIED)` here, as this *normally* binds to IPv4
95+
// and IPv6. However, it's complicated and depends on the underlying system...
96+
// If we do so, we should set `set_only_v6(false)` on the socket to not rely on system defaults.
97+
pub const DEFAULT_LISTEN_ADDRESS: IpAddr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
98+
/// The default socket address `0.0.0.0:8443` the webhook server binds to.
99+
pub const DEFAULT_SOCKET_ADDRESS: SocketAddr =
100+
SocketAddr::new(Self::DEFAULT_LISTEN_ADDRESS, Self::DEFAULT_HTTPS_PORT);
101+
89102
/// Creates a new ready-to-use webhook server.
90103
///
91104
/// The server listens on `socket_addr` which is provided via the [`WebhookOptions`] and handles
Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
use k8s_openapi::{
2+
ByteString,
3+
apiextensions_apiserver::pkg::apis::apiextensions::v1::{
4+
CustomResourceConversion, CustomResourceDefinition, ServiceReference, WebhookClientConfig,
5+
WebhookConversion,
6+
},
7+
};
8+
use kube::{
9+
Api, Client, ResourceExt,
10+
api::{Patch, PatchParams},
11+
};
12+
use snafu::{ResultExt, Snafu, ensure};
13+
use tokio::sync::{mpsc, oneshot};
14+
use x509_cert::{
15+
Certificate,
16+
der::{EncodePem, pem::LineEnding},
17+
};
18+
19+
#[derive(Debug, Snafu)]
20+
pub enum Error {
21+
#[snafu(display("failed to encode CA certificate as PEM format"))]
22+
EncodeCertificateAuthorityAsPem { source: x509_cert::der::Error },
23+
24+
#[snafu(display("failed to send initial CRD reconcile heartbeat"))]
25+
SendInitialReconcileHeartbeat,
26+
27+
#[snafu(display("failed to patch CRD {crd_name:?}"))]
28+
PatchCrd {
29+
source: kube::Error,
30+
crd_name: String,
31+
},
32+
}
33+
34+
/// Maintains various custom resource definitions.
35+
///
36+
/// When running this, the following operations are done:
37+
///
38+
/// - Apply the CRDs when starting up
39+
/// - Reconcile the CRDs when the conversion webhook certificate is rotated
40+
pub struct CustomResourceDefinitionMaintainer<'a> {
41+
client: Client,
42+
certificate_rx: mpsc::Receiver<Certificate>,
43+
44+
definitions: Vec<CustomResourceDefinition>,
45+
options: CustomResourceDefinitionMaintainerOptions<'a>,
46+
47+
initial_reconcile_tx: oneshot::Sender<()>,
48+
}
49+
50+
impl<'a> CustomResourceDefinitionMaintainer<'a> {
51+
/// Creates and returns a new [`CustomResourceDefinitionMaintainer`] which manages one or more
52+
/// custom resource definitions.
53+
///
54+
/// ## Parameters
55+
///
56+
/// This function expects four parameters:
57+
///
58+
/// - `client`: A [`Client`] to interact with the Kubernetes API server. It continuously patches
59+
/// the CRDs when the TLS certificate is rotated.
60+
/// - `certificate_rx`: A [`mpsc::Receiver`] to receive newly generated TLS certificates. The
61+
/// certificate data sent through the channel is used to set the caBundle in the conversion
62+
/// section of the CRD.
63+
/// - `definitions`: An iterator of [`CustomResourceDefinition`]s which should be maintained
64+
/// by this maintainer. If the iterator is empty, the maintainer returns early without doing
65+
/// any work. As such, a polling mechanism which waits for all futures should be used to
66+
/// prevent premature termination of the operator.
67+
/// - `options`: Provides [`CustomResourceDefinitionMaintainerOptions`] to customize various
68+
/// parts of the maintainer. In the future, this will be converted to a builder, to enable a
69+
/// cleaner API interface.
70+
///
71+
/// ## Return Values
72+
///
73+
/// This function returns a 2-tuple (pair) of values:
74+
///
75+
/// - The [`CustomResourceDefinitionMaintainer`] itself. This is used to run the maintainer.
76+
/// See [`CustomResourceDefinitionMaintainer::run`] for more details.
77+
/// - The [`oneshot::Receiver`] which will be used to send out a message once the initial
78+
/// CRD reconciliation ran. This signal can be used to trigger the deployment of custom
79+
/// resources defined by the maintained CRDs.
80+
///
81+
/// ## Example
82+
///
83+
/// ```no_run
84+
/// # use stackable_operator::crd::s3::{S3Connection, S3ConnectionVersion, S3Bucket, S3BucketVersion};
85+
/// # use tokio::sync::mpsc::channel;
86+
/// # use x509_cert::Certificate;
87+
/// # use kube::Client;
88+
/// use stackable_webhook::maintainer::{
89+
/// CustomResourceDefinitionMaintainerOptions,
90+
/// CustomResourceDefinitionMaintainer,
91+
/// };
92+
///
93+
/// # #[tokio::main]
94+
/// # async fn main() {
95+
/// # let (certificate_tx, certificate_rx) = channel(1);
96+
/// let options = CustomResourceDefinitionMaintainerOptions {
97+
/// operator_service_name: "my-service-name",
98+
/// operator_namespace: "my-namespace",
99+
/// field_manager: "my-field-manager",
100+
/// webhook_https_port: 8443,
101+
/// disabled: true,
102+
/// };
103+
///
104+
/// let client = Client::try_default().await.unwrap();
105+
///
106+
/// let definitions = vec![
107+
/// S3Connection::merged_crd(S3ConnectionVersion::V1Alpha1).unwrap(),
108+
/// S3Bucket::merged_crd(S3BucketVersion::V1Alpha1).unwrap(),
109+
/// ];
110+
///
111+
/// let (maintainer, initial_reconcile_rx) = CustomResourceDefinitionMaintainer::new(
112+
/// client,
113+
/// certificate_rx,
114+
/// definitions,
115+
/// options,
116+
/// );
117+
/// # }
118+
/// ```
119+
pub fn new(
120+
client: Client,
121+
certificate_rx: mpsc::Receiver<Certificate>,
122+
definitions: impl IntoIterator<Item = CustomResourceDefinition>,
123+
options: CustomResourceDefinitionMaintainerOptions<'a>,
124+
) -> (Self, oneshot::Receiver<()>) {
125+
let (initial_reconcile_tx, initial_reconcile_rx) = oneshot::channel();
126+
127+
let maintainer = Self {
128+
definitions: definitions.into_iter().collect(),
129+
initial_reconcile_tx,
130+
certificate_rx,
131+
options,
132+
client,
133+
};
134+
135+
(maintainer, initial_reconcile_rx)
136+
}
137+
138+
/// Runs the [`CustomResourceDefinitionMaintainer`] asynchronously.
139+
///
140+
/// This needs to be polled in parallel with other parts of an operator, like controllers or
141+
/// webhook servers. If it is disabled, the returned future immediately resolves to
142+
/// [`std::task::Poll::Ready`] and thus doesn't consume any resources.
143+
pub async fn run(mut self) -> Result<(), Error> {
144+
let CustomResourceDefinitionMaintainerOptions {
145+
operator_service_name,
146+
operator_namespace,
147+
webhook_https_port,
148+
field_manager,
149+
disabled,
150+
} = self.options;
151+
152+
// If the maintainer is disabled or there are no custom resource definitions, immediately
153+
// return without doing any work.
154+
if disabled || self.definitions.is_empty() {
155+
return Ok(());
156+
}
157+
158+
// This channel can only be used exactly once. The sender's send method consumes self, and
159+
// as such, the sender is wrapped in an Option to be able to call take to consume the inner
160+
// value.
161+
let mut initial_reconcile_tx = Some(self.initial_reconcile_tx);
162+
163+
// This get's polled by the async runtime on a regular basis (or when woken up). Once we
164+
// receive a message containing the newly generated TLS certificate for the conversion
165+
// webhook, we need to update the caBundle in the CRD.
166+
while let Some(certificate) = self.certificate_rx.recv().await {
167+
tracing::info!(
168+
k8s.crd.names = ?self.definitions.iter().map(CustomResourceDefinition::name_any).collect::<Vec<_>>(),
169+
"reconciling custom resource definitions"
170+
);
171+
172+
// The caBundle needs to be provided as a base64-encoded PEM envelope.
173+
let ca_bundle = certificate
174+
.to_pem(LineEnding::LF)
175+
.context(EncodeCertificateAuthorityAsPemSnafu)?;
176+
177+
let crd_api: Api<CustomResourceDefinition> = Api::all(self.client.clone());
178+
179+
for crd in self.definitions.iter_mut() {
180+
let crd_kind = &crd.spec.names.kind;
181+
let crd_name = crd.name_any();
182+
183+
tracing::debug!(
184+
k8s.crd.kind = crd_kind,
185+
k8s.crd.name = crd_name,
186+
"reconciling custom resource definition"
187+
);
188+
189+
crd.spec.conversion = Some(CustomResourceConversion {
190+
strategy: "Webhook".to_owned(),
191+
webhook: Some(WebhookConversion {
192+
// conversionReviewVersions indicates what ConversionReview versions are
193+
// supported by the webhook. The first version in the list understood by the
194+
// API server is sent to the webhook. The webhook must respond with a
195+
// ConversionReview object in the same version it received. We only support
196+
// the stable v1 ConversionReview to keep the implementation as simple as
197+
// possible.
198+
conversion_review_versions: vec!["v1".to_owned()],
199+
client_config: Some(WebhookClientConfig {
200+
service: Some(ServiceReference {
201+
name: operator_service_name.to_owned(),
202+
namespace: operator_namespace.to_owned(),
203+
path: Some(format!("/convert/{crd_name}")),
204+
port: Some(webhook_https_port.into()),
205+
}),
206+
// Here, ByteString takes care of encoding the provided content as
207+
// base64.
208+
ca_bundle: Some(ByteString(ca_bundle.as_bytes().to_vec())),
209+
url: None,
210+
}),
211+
}),
212+
});
213+
214+
// Deploy the updated CRDs using a server-side apply.
215+
let patch = Patch::Apply(&crd);
216+
let patch_params = PatchParams::apply(field_manager);
217+
crd_api
218+
.patch(&crd_name, &patch_params, &patch)
219+
.await
220+
.with_context(|_| PatchCrdSnafu { crd_name })?;
221+
}
222+
223+
// After the reconciliation of the CRDs, the initial reconcile heartbeat is sent out
224+
// via the oneshot channel.
225+
if let Some(initial_reconcile_tx) = initial_reconcile_tx.take() {
226+
ensure!(
227+
initial_reconcile_tx.send(()).is_ok(),
228+
SendInitialReconcileHeartbeatSnafu
229+
);
230+
}
231+
}
232+
233+
Ok(())
234+
}
235+
}
236+
237+
// TODO (@Techassi): Make this a builder instead
238+
/// This contains required options to customize a [`CustomResourceDefinitionMaintainer`].
239+
pub struct CustomResourceDefinitionMaintainerOptions<'a> {
240+
/// The service name used by the operator/conversion webhook.
241+
pub operator_service_name: &'a str,
242+
243+
/// The namespace the operator/conversion webhook runs in.
244+
pub operator_namespace: &'a str,
245+
246+
/// The field manager used when maintaining the CRDs.
247+
pub field_manager: &'a str,
248+
249+
/// The HTTPS port the conversion webhook listens on.
250+
pub webhook_https_port: u16,
251+
252+
/// Indicates if the maintainer should be disabled.
253+
pub disabled: bool,
254+
}

0 commit comments

Comments
 (0)