@@ -21,6 +21,7 @@ use nix::fcntl::FcntlArg::F_SETFL;
2121#[ cfg( any( target_os = "linux" , target_os = "android" ) ) ]
2222use nix:: fcntl:: OFlag ;
2323use parseargs:: Parser ;
24+ use progress:: ProgUpdateType ;
2425use progress:: { gen_prog_updater, ProgUpdate , ReadStat , StatusLevel , WriteStat } ;
2526use uucore:: io:: OwnedFileDescriptorOrHandle ;
2627
@@ -39,10 +40,8 @@ use std::os::unix::{
3940#[ cfg( windows) ]
4041use std:: os:: windows:: { fs:: MetadataExt , io:: AsHandle } ;
4142use std:: path:: Path ;
42- use std:: sync:: {
43- atomic:: { AtomicBool , Ordering :: Relaxed } ,
44- mpsc, Arc ,
45- } ;
43+ use std:: sync:: atomic:: AtomicU8 ;
44+ use std:: sync:: { atomic:: Ordering :: Relaxed , mpsc, Arc } ;
4645use std:: thread;
4746use std:: time:: { Duration , Instant } ;
4847
@@ -87,38 +86,65 @@ struct Settings {
8786
8887/// A timer which triggers on a given interval
8988///
90- /// After being constructed with [`Alarm::with_interval`], [`Alarm::is_triggered`]
91- /// will return true once per the given [`Duration`].
89+ /// After being constructed with [`Alarm::with_interval`], [`Alarm::get_trigger`]
90+ /// will return [`ALARM_TRIGGER_TIMER`] once per the given [`Duration`].
91+ /// Alarm can be manually triggered with closure returned by [`Alarm::manual_trigger_fn`].
92+ /// [`Alarm::get_trigger`] will return [`ALARM_TRIGGER_SIGNAL`] in this case.
9293///
9394/// Can be cloned, but the trigger status is shared across all instances so only
9495/// the first caller each interval will yield true.
9596///
9697/// When all instances are dropped the background thread will exit on the next interval.
97- #[ derive( Debug , Clone ) ]
9898pub struct Alarm {
9999 interval : Duration ,
100- trigger : Arc < AtomicBool > ,
100+ trigger : Arc < AtomicU8 > ,
101101}
102102
103+ pub const ALARM_TRIGGER_NONE : u8 = 0 ;
104+ pub const ALARM_TRIGGER_TIMER : u8 = 1 ;
105+ pub const ALARM_TRIGGER_SIGNAL : u8 = 2 ;
106+
103107impl Alarm {
108+ /// use to construct alarm timer with duration
104109 pub fn with_interval ( interval : Duration ) -> Self {
105- let trigger = Arc :: new ( AtomicBool :: default ( ) ) ;
110+ let trigger = Arc :: new ( AtomicU8 :: default ( ) ) ;
106111
107112 let weak_trigger = Arc :: downgrade ( & trigger) ;
108113 thread:: spawn ( move || {
109114 while let Some ( trigger) = weak_trigger. upgrade ( ) {
110115 thread:: sleep ( interval) ;
111- trigger. store ( true , Relaxed ) ;
116+ trigger. store ( ALARM_TRIGGER_TIMER , Relaxed ) ;
112117 }
113118 } ) ;
114119
115120 Self { interval, trigger }
116121 }
117122
118- pub fn is_triggered ( & self ) -> bool {
119- self . trigger . swap ( false , Relaxed )
123+ /// Returns a closure that allows to manually trigger the alarm
124+ ///
125+ /// This is useful for cases where more than one alarm even source exists
126+ /// In case of `dd` there is the SIGUSR1/SIGINFO case where we want to
127+ /// trigger an manual progress report.
128+ pub fn manual_trigger_fn ( & self ) -> Box < dyn Send + Sync + Fn ( ) > {
129+ let weak_trigger = Arc :: downgrade ( & self . trigger ) ;
130+ Box :: new ( move || {
131+ if let Some ( trigger) = weak_trigger. upgrade ( ) {
132+ trigger. store ( ALARM_TRIGGER_SIGNAL , Relaxed ) ;
133+ }
134+ } )
135+ }
136+
137+ /// Use this function to poll for any pending alarm event
138+ ///
139+ /// Returns `ALARM_TRIGGER_NONE` for no pending event.
140+ /// Returns `ALARM_TRIGGER_TIMER` if the event was triggered by timer
141+ /// Returns `ALARM_TRIGGER_SIGNAL` if the event was triggered manually
142+ /// by the closure returned from `manual_trigger_fn`
143+ pub fn get_trigger ( & self ) -> u8 {
144+ self . trigger . swap ( ALARM_TRIGGER_NONE , Relaxed )
120145 }
121146
147+ // Getter function for the configured interval duration
122148 pub fn get_interval ( & self ) -> Duration {
123149 self . interval
124150 }
@@ -818,6 +844,30 @@ impl<'a> Output<'a> {
818844 }
819845 }
820846
847+ /// writes a block of data. optionally retries when first try didn't complete
848+ ///
849+ /// this is needed by gnu-test: tests/dd/stats.s
850+ /// the write can be interrupted by a system signal.
851+ /// e.g. SIGUSR1 which is send to report status
852+ /// without retry, the data might not be fully written to destination.
853+ fn write_block ( & mut self , chunk : & [ u8 ] ) -> io:: Result < usize > {
854+ let full_len = chunk. len ( ) ;
855+ let mut base_idx = 0 ;
856+ loop {
857+ match self . dst . write ( & chunk[ base_idx..] ) {
858+ Ok ( wlen) => {
859+ base_idx += wlen;
860+ // take iflags.fullblock as oflags shall not have this option
861+ if ( base_idx >= full_len) || !self . settings . iflags . fullblock {
862+ return Ok ( base_idx) ;
863+ }
864+ }
865+ Err ( e) if e. kind ( ) == io:: ErrorKind :: Interrupted => continue ,
866+ Err ( e) => return Err ( e) ,
867+ }
868+ }
869+ }
870+
821871 /// Write the given bytes one block at a time.
822872 ///
823873 /// This may write partial blocks (for example, if the underlying
@@ -831,7 +881,7 @@ impl<'a> Output<'a> {
831881 let mut bytes_total = 0 ;
832882
833883 for chunk in buf. chunks ( self . settings . obs ) {
834- let wlen = self . dst . write ( chunk) ?;
884+ let wlen = self . write_block ( chunk) ?;
835885 if wlen < self . settings . obs {
836886 writes_partial += 1 ;
837887 } else {
@@ -922,6 +972,29 @@ impl<'a> BlockWriter<'a> {
922972 }
923973}
924974
975+ /// depending on the command line arguments, this function
976+ /// informs the OS to flush/discard the caches for input and/or output file.
977+ fn flush_caches_full_length ( i : & Input , o : & Output ) -> std:: io:: Result < ( ) > {
978+ // TODO Better error handling for overflowing `len`.
979+ if i. settings . iflags . nocache {
980+ let offset = 0 ;
981+ #[ allow( clippy:: useless_conversion) ]
982+ let len = i. src . len ( ) ?. try_into ( ) . unwrap ( ) ;
983+ i. discard_cache ( offset, len) ;
984+ }
985+ // Similarly, discard the system cache for the output file.
986+ //
987+ // TODO Better error handling for overflowing `len`.
988+ if i. settings . oflags . nocache {
989+ let offset = 0 ;
990+ #[ allow( clippy:: useless_conversion) ]
991+ let len = o. dst . len ( ) ?. try_into ( ) . unwrap ( ) ;
992+ o. discard_cache ( offset, len) ;
993+ }
994+
995+ Ok ( ( ) )
996+ }
997+
925998/// Copy the given input data to this output, consuming both.
926999///
9271000/// This method contains the main loop for the `dd` program. Bytes
@@ -981,22 +1054,7 @@ fn dd_copy(mut i: Input, o: Output) -> std::io::Result<()> {
9811054 // requests that we inform the system that we no longer
9821055 // need the contents of the input file in a system cache.
9831056 //
984- // TODO Better error handling for overflowing `len`.
985- if i. settings . iflags . nocache {
986- let offset = 0 ;
987- #[ allow( clippy:: useless_conversion) ]
988- let len = i. src . len ( ) ?. try_into ( ) . unwrap ( ) ;
989- i. discard_cache ( offset, len) ;
990- }
991- // Similarly, discard the system cache for the output file.
992- //
993- // TODO Better error handling for overflowing `len`.
994- if i. settings . oflags . nocache {
995- let offset = 0 ;
996- #[ allow( clippy:: useless_conversion) ]
997- let len = o. dst . len ( ) ?. try_into ( ) . unwrap ( ) ;
998- o. discard_cache ( offset, len) ;
999- }
1057+ flush_caches_full_length ( & i, & o) ?;
10001058 return finalize (
10011059 BlockWriter :: Unbuffered ( o) ,
10021060 rstat,
@@ -1018,6 +1076,18 @@ fn dd_copy(mut i: Input, o: Output) -> std::io::Result<()> {
10181076 // This avoids the need to query the OS monotonic clock for every block.
10191077 let alarm = Alarm :: with_interval ( Duration :: from_secs ( 1 ) ) ;
10201078
1079+ // The signal handler spawns an own thread that waits for signals.
1080+ // When the signal is received, it calls a handler function.
1081+ // We inject a handler function that manually triggers the alarm.
1082+ #[ cfg( target_os = "linux" ) ]
1083+ let signal_handler = progress:: SignalHandler :: install_signal_handler ( alarm. manual_trigger_fn ( ) ) ;
1084+ #[ cfg( target_os = "linux" ) ]
1085+ if let Err ( e) = & signal_handler {
1086+ if Some ( StatusLevel :: None ) != i. settings . status {
1087+ eprintln ! ( "Internal dd Warning: Unable to register signal handler \n \t {e}" ) ;
1088+ }
1089+ }
1090+
10211091 // Index in the input file where we are reading bytes and in
10221092 // the output file where we are writing bytes.
10231093 //
@@ -1086,11 +1156,20 @@ fn dd_copy(mut i: Input, o: Output) -> std::io::Result<()> {
10861156 // error.
10871157 rstat += rstat_update;
10881158 wstat += wstat_update;
1089- if alarm. is_triggered ( ) {
1090- let prog_update = ProgUpdate :: new ( rstat, wstat, start. elapsed ( ) , false ) ;
1091- prog_tx. send ( prog_update) . unwrap_or ( ( ) ) ;
1159+ match alarm. get_trigger ( ) {
1160+ ALARM_TRIGGER_NONE => { }
1161+ t @ ALARM_TRIGGER_TIMER | t @ ALARM_TRIGGER_SIGNAL => {
1162+ let tp = match t {
1163+ ALARM_TRIGGER_TIMER => ProgUpdateType :: Periodic ,
1164+ _ => ProgUpdateType :: Signal ,
1165+ } ;
1166+ let prog_update = ProgUpdate :: new ( rstat, wstat, start. elapsed ( ) , tp) ;
1167+ prog_tx. send ( prog_update) . unwrap_or ( ( ) ) ;
1168+ }
1169+ _ => { }
10921170 }
10931171 }
1172+
10941173 finalize ( o, rstat, wstat, start, & prog_tx, output_thread, truncate)
10951174}
10961175
@@ -1118,12 +1197,13 @@ fn finalize<T>(
11181197
11191198 // Print the final read/write statistics.
11201199 let wstat = wstat + wstat_update;
1121- let prog_update = ProgUpdate :: new ( rstat, wstat, start. elapsed ( ) , true ) ;
1200+ let prog_update = ProgUpdate :: new ( rstat, wstat, start. elapsed ( ) , ProgUpdateType :: Final ) ;
11221201 prog_tx. send ( prog_update) . unwrap_or ( ( ) ) ;
11231202 // Wait for the output thread to finish
11241203 output_thread
11251204 . join ( )
11261205 . expect ( "Failed to join with the output thread." ) ;
1206+
11271207 Ok ( ( ) )
11281208}
11291209
0 commit comments