@@ -78,6 +78,7 @@ use derivative::Derivative;
7878use differential_dataflow:: lattice:: Lattice ;
7979use futures:: future:: { FutureExt , TryFutureExt } ;
8080use futures:: stream:: StreamExt ;
81+ use itertools:: Itertools ;
8182use rand:: Rng ;
8283use timely:: order:: PartialOrder ;
8384use timely:: progress:: frontier:: MutableAntichain ;
@@ -104,8 +105,8 @@ use mz_dataflow_types::{
104105 Update ,
105106} ;
106107use mz_expr:: {
107- permutation_for_arrangement, CollectionPlan , EvalError , ExprHumanizer , GlobalId ,
108- MirRelationExpr , MirScalarExpr , OptimizedMirRelationExpr , RowSetFinishing ,
108+ permutation_for_arrangement, CollectionPlan , ExprHumanizer , GlobalId , MirRelationExpr ,
109+ MirScalarExpr , OptimizedMirRelationExpr , RowSetFinishing ,
109110} ;
110111use mz_ore:: metrics:: MetricsRegistry ;
111112use mz_ore:: now:: { to_datetime, EpochMillis , NowFn } ;
@@ -116,6 +117,7 @@ use mz_ore::thread::JoinHandleExt;
116117use mz_repr:: adt:: interval:: Interval ;
117118use mz_repr:: adt:: numeric:: { Numeric , NumericMaxScale } ;
118119use mz_repr:: { Datum , Diff , RelationDesc , RelationType , Row , RowArena , ScalarType , Timestamp } ;
120+ use mz_secrets:: { SecretOp , SecretsController } ;
119121use mz_sql:: ast:: display:: AstDisplay ;
120122use mz_sql:: ast:: {
121123 CreateIndexStatement , CreateSinkStatement , CreateSourceStatement , ExplainStage , FetchStatement ,
@@ -257,6 +259,7 @@ pub struct Config {
257259 pub metrics_registry : MetricsRegistry ,
258260 pub persister : PersisterWithConfig ,
259261 pub now : NowFn ,
262+ pub secrets_controller : Box < dyn SecretsController > ,
260263}
261264
262265struct PendingPeek {
@@ -328,6 +331,10 @@ pub struct Coordinator {
328331 write_lock : Arc < tokio:: sync:: Mutex < ( ) > > ,
329332 /// Holds plans deferred due to write lock.
330333 write_lock_wait_group : VecDeque < DeferredPlan > ,
334+
335+ /// Handle to secret manager that can create and delete secrets from
336+ /// an arbitrary secret storage engine.
337+ secrets_controller : Box < dyn SecretsController > ,
331338}
332339
333340/// Metadata about an active connection.
@@ -2118,18 +2125,22 @@ impl Coordinator {
21182125 let evaled = secret. secret_as . eval ( & [ ] , & temp_storage) ?;
21192126
21202127 if evaled == Datum :: Null {
2121- return Err ( CoordError :: Eval ( EvalError :: NullCharacterNotPermitted ) ) ;
2128+ coord_bail ! ( "secret value can not be null" ) ;
21222129 }
21232130
2124- // TODO martin: hook the payload into a secrets backend
2125- let _payload = evaled. unwrap_bytes ( ) ;
2131+ let payload = evaled. unwrap_bytes ( ) ;
21262132
21272133 let id = self . catalog . allocate_user_id ( ) ?;
21282134 let oid = self . catalog . allocate_oid ( ) ?;
21292135 let secret = catalog:: Secret {
21302136 create_sql : format ! ( "CREATE SECRET {} AS '********'" , full_name) ,
21312137 } ;
21322138
2139+ self . secrets_controller . apply ( vec ! [ SecretOp :: Ensure {
2140+ id,
2141+ contents: Vec :: from( payload) ,
2142+ } ] ) ?;
2143+
21332144 let ops = vec ! [ catalog:: Op :: CreateItem {
21342145 id,
21352146 oid,
@@ -2143,7 +2154,18 @@ impl Coordinator {
21432154 kind : catalog:: ErrorKind :: ItemAlreadyExists ( _) ,
21442155 ..
21452156 } ) ) if if_not_exists => Ok ( ExecuteResponse :: CreatedSecret { existed : true } ) ,
2146- Err ( err) => Err ( err) ,
2157+ Err ( err) => {
2158+ match self . secrets_controller . apply ( vec ! [ SecretOp :: Delete { id } ] ) {
2159+ Ok ( _) => { }
2160+ Err ( e) => {
2161+ warn ! (
2162+ "Dropping newly created secrets has encountered an error: {}" ,
2163+ e
2164+ ) ;
2165+ }
2166+ }
2167+ Err ( err)
2168+ }
21472169 }
21482170 }
21492171
@@ -4353,6 +4375,7 @@ impl Coordinator {
43534375 let mut sinks_to_drop = vec ! [ ] ;
43544376 let mut indexes_to_drop = vec ! [ ] ;
43554377 let mut replication_slots_to_drop: HashMap < String , Vec < String > > = HashMap :: new ( ) ;
4378+ let mut secrets_to_drop = vec ! [ ] ;
43564379
43574380 for op in & ops {
43584381 if let catalog:: Op :: DropItem ( id) = op {
@@ -4390,6 +4413,9 @@ impl Coordinator {
43904413 } ) => {
43914414 indexes_to_drop. push ( ( * compute_instance, * id) ) ;
43924415 }
4416+ CatalogItem :: Secret ( _) => {
4417+ secrets_to_drop. push ( * id) ;
4418+ }
43934419 _ => ( ) ,
43944420 }
43954421 }
@@ -4438,6 +4464,9 @@ impl Coordinator {
44384464 if !indexes_to_drop. is_empty ( ) {
44394465 self . drop_indexes ( indexes_to_drop) . await ;
44404466 }
4467+ if !secrets_to_drop. is_empty ( ) {
4468+ self . drop_secrets ( secrets_to_drop) . await ;
4469+ }
44414470
44424471 // We don't want to block the coordinator on an external postgres server, so
44434472 // move the drop slots to a separate task. This does mean that a failed drop
@@ -4613,6 +4642,20 @@ impl Coordinator {
46134642 Ok ( ( ) )
46144643 }
46154644
4645+ async fn drop_secrets ( & mut self , secrets : Vec < GlobalId > ) {
4646+ let ops = secrets
4647+ . into_iter ( )
4648+ . map ( |id| SecretOp :: Delete { id } )
4649+ . collect_vec ( ) ;
4650+
4651+ match self . secrets_controller . apply ( ops) {
4652+ Ok ( _) => { }
4653+ Err ( e) => {
4654+ warn ! ( "Dropping secrets has encountered an error: {}" , e) ;
4655+ }
4656+ }
4657+ }
4658+
46164659 /// Finalizes a dataflow and then broadcasts it to all workers.
46174660 /// Utility method for the more general [Self::ship_dataflows]
46184661 async fn ship_dataflow ( & mut self , dataflow : DataflowDesc , instance : ComputeInstanceId ) {
@@ -4839,6 +4882,7 @@ pub async fn serve(
48394882 metrics_registry,
48404883 persister,
48414884 now,
4885+ secrets_controller,
48424886 } : Config ,
48434887) -> Result < ( Handle , Client ) , CoordError > {
48444888 let ( cmd_tx, cmd_rx) = mpsc:: unbounded_channel ( ) ;
@@ -4885,6 +4929,7 @@ pub async fn serve(
48854929 // for bootstrap completion before proceeding.
48864930 let ( bootstrap_tx, bootstrap_rx) = oneshot:: channel ( ) ;
48874931 let handle = TokioHandle :: current ( ) ;
4932+
48884933 let thread = thread:: Builder :: new ( )
48894934 . name ( "coordinator" . to_string ( ) )
48904935 . spawn ( move || {
@@ -4908,6 +4953,7 @@ pub async fn serve(
49084953 pending_tails : HashMap :: new ( ) ,
49094954 write_lock : Arc :: new ( tokio:: sync:: Mutex :: new ( ( ) ) ) ,
49104955 write_lock_wait_group : VecDeque :: new ( ) ,
4956+ secrets_controller,
49114957 } ;
49124958 let bootstrap = handle. block_on ( coord. bootstrap ( builtin_table_updates) ) ;
49134959 let ok = bootstrap. is_ok ( ) ;
0 commit comments