@@ -298,8 +298,67 @@ impl CdnBackend {
298
298
}
299
299
}
300
300
301
+ /// fully invalidate the CDN distribution, also emptying the queue.
302
+ #[ instrument( skip( conn) ) ]
303
+ pub ( crate ) async fn full_invalidation (
304
+ config : & Config ,
305
+ cdn : & CdnBackend ,
306
+ metrics : & InstanceMetrics ,
307
+ conn : & mut sqlx:: PgConnection ,
308
+ distribution_id : & str ,
309
+ ) -> Result < ( ) > {
310
+ let mut transaction = conn. begin ( ) . await ?;
311
+
312
+ let now = Utc :: now ( ) ;
313
+ for row in sqlx:: query!(
314
+ "SELECT queued
315
+ FROM cdn_invalidation_queue
316
+ WHERE cdn_distribution_id = $1 AND created_in_cdn IS NULL
317
+ FOR UPDATE" ,
318
+ distribution_id,
319
+ )
320
+ . fetch_all ( & mut * transaction)
321
+ . await ?
322
+ {
323
+ if let Ok ( duration) = ( now - row. queued ) . to_std ( ) {
324
+ // This can only fail when the duration is negative, which can't happen anyways
325
+ metrics
326
+ . cdn_queue_time
327
+ . with_label_values ( & [ distribution_id] )
328
+ . observe ( duration_to_seconds ( duration) ) ;
329
+ }
330
+ }
331
+
332
+ match cdn
333
+ . create_invalidation ( distribution_id, & [ "/*" ] )
334
+ . await
335
+ . context ( "error creating new invalidation" )
336
+ {
337
+ Ok ( invalidation) => {
338
+ sqlx:: query!(
339
+ "UPDATE cdn_invalidation_queue
340
+ SET
341
+ created_in_cdn = CURRENT_TIMESTAMP,
342
+ cdn_reference = $1
343
+ WHERE
344
+ cdn_distribution_id = $2 AND created_in_cdn IS NULL" ,
345
+ invalidation. invalidation_id,
346
+ distribution_id,
347
+ )
348
+ . execute ( & mut * transaction)
349
+ . await ?;
350
+
351
+ transaction. commit ( ) . await ?;
352
+ }
353
+ Err ( err) => return Err ( err) ,
354
+ } ;
355
+
356
+ Ok ( ( ) )
357
+ }
358
+
301
359
#[ instrument( skip( conn) ) ]
302
360
pub ( crate ) async fn handle_queued_invalidation_requests (
361
+ config : & Config ,
303
362
cdn : & CdnBackend ,
304
363
metrics : & InstanceMetrics ,
305
364
conn : & mut sqlx:: PgConnection ,
@@ -385,6 +444,24 @@ pub(crate) async fn handle_queued_invalidation_requests(
385
444
return Ok ( ( ) ) ;
386
445
}
387
446
447
+ if let Some ( min_queued) = sqlx:: query_scalar!(
448
+ "SELECT
449
+ min(queued)
450
+ FROM cdn_invalidation_queue
451
+ WHERE
452
+ cdn_distribution_id = $1 AND
453
+ created_in_cdn IS NULL" ,
454
+ distribution_id
455
+ )
456
+ . fetch_one ( & mut * conn)
457
+ . await ?
458
+ {
459
+ if ( now - min_queued) . to_std ( ) . unwrap_or_default ( ) >= config. cdn_max_queued_age {
460
+ full_invalidation ( config, cdn, metrics, conn, distribution_id) . await ?;
461
+ return Ok ( ( ) ) ;
462
+ }
463
+ }
464
+
388
465
// create new an invalidation for the queued path patterns
389
466
let mut transaction = conn. begin ( ) . await ?;
390
467
let mut path_patterns: Vec < String > = Vec :: new ( ) ;
@@ -566,6 +643,8 @@ pub(crate) async fn queued_or_active_crate_invalidation_count_by_distribution(
566
643
567
644
#[ cfg( test) ]
568
645
mod tests {
646
+ use std:: time:: Duration ;
647
+
569
648
use super :: * ;
570
649
use crate :: test:: async_wrapper;
571
650
@@ -671,6 +750,111 @@ mod tests {
671
750
} )
672
751
}
673
752
753
+ #[ test]
754
+ fn escalate_to_full_invalidation ( ) {
755
+ crate :: test:: async_wrapper ( |env| async move {
756
+ env. override_config ( |config| {
757
+ config. cloudfront_distribution_id_web = Some ( "distribution_id_web" . into ( ) ) ;
758
+ config. cloudfront_distribution_id_static = Some ( "distribution_id_static" . into ( ) ) ;
759
+ config. cdn_max_queued_age = Duration :: from_secs ( 0 ) ;
760
+ } ) ;
761
+
762
+ let cdn = env. cdn ( ) . await ;
763
+ let config = env. config ( ) ;
764
+ let mut conn = env. async_db ( ) . await . async_conn ( ) . await ;
765
+ assert ! ( queued_or_active_crate_invalidations( & mut conn)
766
+ . await ?
767
+ . is_empty( ) ) ;
768
+
769
+ queue_crate_invalidation ( & mut conn, & env. config ( ) , "krate" ) . await ?;
770
+
771
+ // invalidation paths are queued.
772
+ assert_eq ! (
773
+ queued_or_active_crate_invalidations( & mut conn)
774
+ . await ?
775
+ . into_iter( )
776
+ . map( |i| (
777
+ i. cdn_distribution_id,
778
+ i. krate,
779
+ i. path_pattern,
780
+ i. cdn_reference
781
+ ) )
782
+ . collect:: <Vec <_>>( ) ,
783
+ vec![
784
+ (
785
+ "distribution_id_web" . into( ) ,
786
+ "krate" . into( ) ,
787
+ "/krate*" . into( ) ,
788
+ None
789
+ ) ,
790
+ (
791
+ "distribution_id_web" . into( ) ,
792
+ "krate" . into( ) ,
793
+ "/crate/krate*" . into( ) ,
794
+ None
795
+ ) ,
796
+ (
797
+ "distribution_id_static" . into( ) ,
798
+ "krate" . into( ) ,
799
+ "/rustdoc/krate*" . into( ) ,
800
+ None
801
+ ) ,
802
+ ]
803
+ ) ;
804
+
805
+ let counts =
806
+ queued_or_active_crate_invalidation_count_by_distribution ( & mut conn, & config)
807
+ . await ?;
808
+ assert_eq ! ( counts. len( ) , 2 ) ;
809
+ assert_eq ! ( * counts. get( "distribution_id_web" ) . unwrap( ) , 2 ) ;
810
+ assert_eq ! ( * counts. get( "distribution_id_static" ) . unwrap( ) , 1 ) ;
811
+
812
+ // queueing the invalidation doesn't create it in the CDN
813
+ assert ! ( active_invalidations( & cdn, "distribution_id_web" ) . is_empty( ) ) ;
814
+ assert ! ( active_invalidations( & cdn, "distribution_id_static" ) . is_empty( ) ) ;
815
+
816
+ let cdn = env. cdn ( ) . await ;
817
+ let config = env. config ( ) ;
818
+
819
+ // now handle the queued invalidations
820
+ handle_queued_invalidation_requests (
821
+ & config,
822
+ & cdn,
823
+ & env. instance_metrics ( ) ,
824
+ & mut conn,
825
+ "distribution_id_web" ,
826
+ )
827
+ . await ?;
828
+ handle_queued_invalidation_requests (
829
+ & config,
830
+ & cdn,
831
+ & env. instance_metrics ( ) ,
832
+ & mut conn,
833
+ "distribution_id_static" ,
834
+ )
835
+ . await ?;
836
+
837
+ // which creates them in the CDN
838
+ {
839
+ let ir_web = active_invalidations ( & cdn, "distribution_id_web" ) ;
840
+ assert_eq ! ( ir_web. len( ) , 1 ) ;
841
+ assert_eq ! ( ir_web[ 0 ] . path_patterns, vec![ "/*" ] ) ;
842
+
843
+ let ir_static = active_invalidations ( & cdn, "distribution_id_static" ) ;
844
+ assert_eq ! ( ir_web. len( ) , 1 ) ;
845
+ assert_eq ! ( ir_static[ 0 ] . path_patterns, vec![ "/*" ] ) ;
846
+ }
847
+
848
+ // the queued entries got a CDN reference attached
849
+ assert ! ( queued_or_active_crate_invalidations( & mut conn)
850
+ . await ?
851
+ . iter( )
852
+ . all( |i| i. cdn_reference. is_some( ) && i. created_in_cdn. is_some( ) ) ) ;
853
+
854
+ Ok ( ( ) )
855
+ } ) ;
856
+ }
857
+
674
858
#[ test]
675
859
fn invalidate_a_crate ( ) {
676
860
crate :: test:: async_wrapper ( |env| async move {
@@ -734,16 +918,19 @@ mod tests {
734
918
assert ! ( active_invalidations( & cdn, "distribution_id_static" ) . is_empty( ) ) ;
735
919
736
920
let cdn = env. cdn ( ) . await ;
921
+ let config = env. config ( ) ;
737
922
738
923
// now handle the queued invalidations
739
924
handle_queued_invalidation_requests (
925
+ & config,
740
926
& cdn,
741
927
& env. instance_metrics ( ) ,
742
928
& mut conn,
743
929
"distribution_id_web" ,
744
930
)
745
931
. await ?;
746
932
handle_queued_invalidation_requests (
933
+ & config,
747
934
& cdn,
748
935
& env. instance_metrics ( ) ,
749
936
& mut conn,
@@ -774,13 +961,15 @@ mod tests {
774
961
775
962
// now handle again
776
963
handle_queued_invalidation_requests (
964
+ & config,
777
965
& cdn,
778
966
& env. instance_metrics ( ) ,
779
967
& mut conn,
780
968
"distribution_id_web" ,
781
969
)
782
970
. await ?;
783
971
handle_queued_invalidation_requests (
972
+ & config,
784
973
& cdn,
785
974
& env. instance_metrics ( ) ,
786
975
& mut conn,
@@ -849,6 +1038,7 @@ mod tests {
849
1038
850
1039
// handle the queued invalidations
851
1040
handle_queued_invalidation_requests (
1041
+ & env. config ( ) ,
852
1042
& * env. cdn ( ) . await ,
853
1043
& env. instance_metrics ( ) ,
854
1044
& mut conn,
@@ -909,6 +1099,7 @@ mod tests {
909
1099
910
1100
// handle the queued invalidations
911
1101
handle_queued_invalidation_requests (
1102
+ & env. config ( ) ,
912
1103
& * env. cdn ( ) . await ,
913
1104
& env. instance_metrics ( ) ,
914
1105
& mut conn,
@@ -937,6 +1128,7 @@ mod tests {
937
1128
938
1129
// now handle again
939
1130
handle_queued_invalidation_requests (
1131
+ & env. config ( ) ,
940
1132
& * env. cdn ( ) . await ,
941
1133
& env. instance_metrics ( ) ,
942
1134
& mut conn,
@@ -976,6 +1168,7 @@ mod tests {
976
1168
977
1169
// run the handler
978
1170
handle_queued_invalidation_requests (
1171
+ & env. config ( ) ,
979
1172
& * env. cdn ( ) . await ,
980
1173
& env. instance_metrics ( ) ,
981
1174
& mut conn,
0 commit comments