22// This will need changes in our and upstream error types.
33#![ allow( clippy:: result_large_err) ]
44
5- use std:: { os:: unix:: prelude:: FileTypeExt , path:: PathBuf , pin :: pin } ;
5+ use std:: { os:: unix:: prelude:: FileTypeExt , path:: PathBuf } ;
66
7- use anyhow:: Context ;
7+ use anyhow:: { Context , anyhow } ;
88use clap:: Parser ;
99use csi_server:: {
1010 controller:: SecretProvisionerController , identity:: SecretProvisionerIdentity ,
1111 node:: SecretProvisionerNode ,
1212} ;
13- use futures:: { FutureExt , TryStreamExt } ;
13+ use futures:: { FutureExt , TryFutureExt , TryStreamExt , try_join } ;
1414use grpc:: csi:: v1:: {
1515 controller_server:: ControllerServer , identity_server:: IdentityServer , node_server:: NodeServer ,
1616} ;
17- use stackable_operator:: { CustomResourceExt , cli:: ProductOperatorRun , telemetry:: Tracing } ;
18- use tokio:: signal:: unix:: { SignalKind , signal} ;
17+ use stackable_operator:: {
18+ YamlSchema ,
19+ cli:: { CommonOptions , RunArguments } ,
20+ client:: Client ,
21+ kvp:: { Label , LabelExt } ,
22+ shared:: yaml:: SerializeOptions ,
23+ telemetry:: Tracing ,
24+ webhook:: servers:: ConversionWebhookServer ,
25+ } ;
26+ use tokio:: {
27+ signal:: unix:: { SignalKind , signal} ,
28+ sync:: oneshot,
29+ } ;
1930use tokio_stream:: wrappers:: UnixListenerStream ;
2031use tonic:: transport:: Server ;
2132use utils:: { TonicUnixStream , uds_bind_private} ;
@@ -32,6 +43,7 @@ mod truststore_controller;
3243mod utils;
3344
3445pub const OPERATOR_NAME : & str = "secrets.stackable.tech" ;
46+ pub const FIELD_MANAGER : & str = "secret-operator" ;
3547
3648#[ derive( clap:: Parser ) ]
3749#[ clap( author, version) ]
@@ -55,7 +67,7 @@ struct SecretOperatorRun {
5567 privileged : bool ,
5668
5769 #[ clap( flatten) ]
58- common : ProductOperatorRun ,
70+ common : RunArguments ,
5971}
6072
6173mod built_info {
@@ -76,12 +88,16 @@ async fn main() -> anyhow::Result<()> {
7688 csi_endpoint,
7789 privileged,
7890 common :
79- ProductOperatorRun {
91+ RunArguments {
92+ common :
93+ CommonOptions {
94+ telemetry,
95+ cluster_info,
96+ } ,
8097 product_config : _,
8198 watch_namespace,
82- operator_environment : _,
83- telemetry,
84- cluster_info,
99+ operator_environment,
100+ maintenance,
85101 } ,
86102 } ) => {
87103 // NOTE (@NickLarsenNZ): Before stackable-telemetry was used:
@@ -105,46 +121,125 @@ async fn main() -> anyhow::Result<()> {
105121 & cluster_info,
106122 )
107123 . await ?;
124+
108125 if csi_endpoint
109126 . symlink_metadata ( )
110127 . is_ok_and ( |meta| meta. file_type ( ) . is_socket ( ) )
111128 {
112129 let _ = std:: fs:: remove_file ( & csi_endpoint) ;
113130 }
131+
132+ // NOTE (@Techassi): This could maybe be moved into a setup function again. For now,
133+ // it is here.
134+ let crds_and_handlers = [
135+ (
136+ SecretClass :: merged_crd ( SecretClassVersion :: V1Alpha2 ) ?,
137+ SecretClass :: try_convert as fn ( _) -> _ ,
138+ ) ,
139+ (
140+ TrustStore :: merged_crd ( TrustStoreVersion :: V1Alpha1 ) ?,
141+ TrustStore :: try_convert as fn ( _) -> _ ,
142+ ) ,
143+ ] ;
144+
145+ let ( conversion_webhook, crd_maintainer, initial_reconcile_rx) =
146+ ConversionWebhookServer :: with_maintainer (
147+ crds_and_handlers,
148+ & operator_environment. operator_service_name ,
149+ & operator_environment. operator_namespace ,
150+ FIELD_MANAGER ,
151+ maintenance. disable_crd_maintenance ,
152+ client. as_kube_client ( ) ,
153+ )
154+ . await
155+ . context ( "failed to create conversion webhook server and CRD maintainer" ) ?;
156+
114157 let mut sigterm = signal ( SignalKind :: terminate ( ) ) ?;
115- let csi_server = pin ! (
116- Server :: builder( )
117- . add_service(
118- tonic_reflection:: server:: Builder :: configure( )
119- . include_reflection_service( true )
120- . register_encoded_file_descriptor_set( grpc:: FILE_DESCRIPTOR_SET_BYTES )
121- . build_v1( ) ?,
158+ let csi_server = Server :: builder ( )
159+ . add_service (
160+ tonic_reflection:: server:: Builder :: configure ( )
161+ . include_reflection_service ( true )
162+ . register_encoded_file_descriptor_set ( grpc:: FILE_DESCRIPTOR_SET_BYTES )
163+ . build_v1 ( ) ?,
164+ )
165+ . add_service ( IdentityServer :: new ( SecretProvisionerIdentity ) )
166+ . add_service ( ControllerServer :: new ( SecretProvisionerController {
167+ client : client. clone ( ) ,
168+ } ) )
169+ . add_service ( NodeServer :: new ( SecretProvisionerNode {
170+ client : client. clone ( ) ,
171+ node_name : cluster_info. kubernetes_node_name . to_owned ( ) ,
172+ privileged,
173+ } ) )
174+ . serve_with_incoming_shutdown (
175+ UnixListenerStream :: new (
176+ uds_bind_private ( csi_endpoint) . context ( "failed to bind CSI listener" ) ?,
122177 )
123- . add_service( IdentityServer :: new( SecretProvisionerIdentity ) )
124- . add_service( ControllerServer :: new( SecretProvisionerController {
125- client: client. clone( ) ,
126- } ) )
127- . add_service( NodeServer :: new( SecretProvisionerNode {
128- client: client. clone( ) ,
129- node_name: cluster_info. kubernetes_node_name. to_owned( ) ,
130- privileged,
131- } ) )
132- . serve_with_incoming_shutdown(
133- UnixListenerStream :: new(
134- uds_bind_private( csi_endpoint)
135- . context( "failed to bind CSI listener" ) ?,
136- )
137- . map_ok( TonicUnixStream ) ,
138- sigterm. recv( ) . map( |_| ( ) ) ,
139- )
140- ) ;
178+ . map_ok ( TonicUnixStream ) ,
179+ sigterm. recv ( ) . map ( |_| ( ) ) ,
180+ )
181+ . map_err ( |err| anyhow ! ( err) . context ( "failed to run csi server" ) ) ;
182+
141183 let truststore_controller =
142- pin ! ( truststore_controller:: start( & client, & watch_namespace) . map( Ok ) ) ;
143- futures:: future:: select ( csi_server, truststore_controller)
144- . await
145- . factor_first ( )
146- . 0 ?;
184+ truststore_controller:: start ( & client, & watch_namespace) . map ( anyhow:: Ok ) ;
185+
186+ let conversion_webhook = conversion_webhook
187+ . run ( )
188+ . map_err ( |err| anyhow ! ( err) . context ( "failed to run conversion webhook" ) ) ;
189+
190+ let crd_maintainer = crd_maintainer
191+ . run ( )
192+ . map_err ( |err| anyhow ! ( err) . context ( "failed to run CRD maintainer" ) ) ;
193+
194+ let default_secretclass = create_default_secretclass (
195+ initial_reconcile_rx,
196+ operator_environment. operator_namespace . clone ( ) ,
197+ client. clone ( ) ,
198+ )
199+ . map_err ( |err| anyhow ! ( err) . context ( "failed to apply default custom resources" ) ) ;
200+
201+ try_join ! (
202+ csi_server,
203+ truststore_controller,
204+ conversion_webhook,
205+ crd_maintainer,
206+ default_secretclass,
207+ ) ?;
147208 }
148209 }
149210 Ok ( ( ) )
150211}
212+
213+ async fn create_default_secretclass (
214+ initial_reconcile_rx : oneshot:: Receiver < ( ) > ,
215+ operator_namespace : String ,
216+ client : Client ,
217+ ) -> anyhow:: Result < ( ) > {
218+ initial_reconcile_rx. await ?;
219+
220+ tracing:: info!( "applying default secretclass" ) ;
221+
222+ let deserializer = serde_yaml:: Deserializer :: from_slice ( include_bytes ! ( "secretclass.yaml" ) ) ;
223+ let mut tls_secret_class: v1alpha2:: SecretClass =
224+ serde_yaml:: with:: singleton_map_recursive:: deserialize ( deserializer)
225+ . expect ( "compile-time included secretclass must be valid YAML" ) ;
226+
227+ #[ rustfmt:: skip]
228+ let managed_by = Label :: managed_by ( OPERATOR_NAME , "secretclass" ) . expect ( "managed-by label must be valid" ) ;
229+ let version = Label :: version ( built_info:: PKG_VERSION ) . expect ( "version label must be valid" ) ;
230+ let instance = Label :: instance ( OPERATOR_NAME ) . expect ( "instance label must be valid" ) ;
231+ let name = Label :: name ( OPERATOR_NAME ) . expect ( "name label must be valid" ) ;
232+
233+ tls_secret_class
234+ . add_label ( managed_by)
235+ . add_label ( version)
236+ . add_label ( instance)
237+ . add_label ( name)
238+ . add_label ( Label :: stackable_vendor ( ) ) ;
239+
240+ tls_secret_class. metadata . namespace = Some ( operator_namespace) ;
241+
242+ client. create_if_missing ( & tls_secret_class) . await ?;
243+
244+ Ok ( ( ) )
245+ }
0 commit comments