Skip to content

Commit

Permalink
Merge pull request #15 from TheCacophonyProject/offload-last
Browse files Browse the repository at this point in the history
Offload last files first
  • Loading branch information
gferraro authored Aug 19, 2024
2 parents 97f5ad6 + 970a311 commit d12bfc2
Show file tree
Hide file tree
Showing 7 changed files with 282 additions and 136 deletions.
6 changes: 5 additions & 1 deletion src/core0_audio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,6 @@ fn should_offload_audio_recordings(
if !has_files {
return false;
}

// flash getting full
if flash_storage.is_too_full_for_audio() {
info!("Offloading as flash is nearly full");
Expand All @@ -673,6 +672,11 @@ fn should_offload_audio_recordings(
info!("Offloading as logger is nearly full");
return true;
}
if flash_storage.file_start_block_index.is_none() {
//one off
info!("Offloading as previous file system version");
return true;
}

return false;
}
179 changes: 105 additions & 74 deletions src/core1_sub_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,106 +145,137 @@ pub fn offload_flash_storage_and_events(
None
},
);
let mut has_file = flash_storage.begin_offload_reverse();

// do some offloading.
let mut file_count = 0;
flash_storage.begin_offload();
let mut file_start = true;
let mut part_count = 0;
let mut success: bool = true;
let mut counter = timer.get_counter();

// TODO: Could speed this up slightly using cache_random_read interleaving on flash storage.
// Probably doesn't matter though.
while let Some(((part, crc, block_index, page_index), is_last, spi)) =
flash_storage.get_file_part()
{
if watchdog.is_some() {
watchdog.as_mut().unwrap().feed();
}
pi_spi.enable(spi, resets);
let transfer_type = if file_start && !is_last {
ExtTransferMessage::BeginFileTransfer
} else if !file_start && !is_last {
ExtTransferMessage::ResumeFileTransfer
} else if is_last {
ExtTransferMessage::EndFileTransfer
} else if file_start && is_last {
ExtTransferMessage::BeginAndEndFileTransfer
} else {
crate::unreachable!("Invalid file transfer state");
};

let crc_check = Crc::<u16>::new(&CRC_16_XMODEM);
let current_crc = crc_check.checksum(&part);
if current_crc != crc {
warn!(
"Data corrupted at part #{} ({}:{}) in transfer to or from flash memory",
part_count, block_index, page_index
);
}

let mut attempts = 0;
'transfer_part: loop {
while has_file {
let mut file_start = true;
let mut part_count = 0;
let mut file_ended = false;
while let Some(((part, crc, block_index, page_index), is_last, spi)) =
flash_storage.get_file_part()
{
if watchdog.is_some() {
watchdog.as_mut().unwrap().feed();
}
let did_transfer =
pi_spi.send_message(transfer_type, &part, current_crc, dma, timer, resets);
counter = timer.get_counter();
if !did_transfer {
attempts += 1;
if attempts > 100 {
success = false;
pi_spi.enable(spi, resets);
let transfer_type = if file_start && !is_last {
ExtTransferMessage::BeginFileTransfer
} else if !file_start && !is_last {
ExtTransferMessage::ResumeFileTransfer
} else if is_last {
ExtTransferMessage::EndFileTransfer
} else if file_start && is_last {
ExtTransferMessage::BeginAndEndFileTransfer
} else {
crate::unreachable!("Invalid file transfer state");
};

if file_start {
info!("Offload start is {}:{}", block_index, page_index)
}
if is_last {
info!("Got last file {}:{}", block_index, page_index);
}
let crc_check = Crc::<u16>::new(&CRC_16_XMODEM);
let current_crc = crc_check.checksum(&part);
if current_crc != crc {
warn!(
"Data corrupted at part #{} ({}:{}) in transfer to or from flash memory",
part_count, block_index, page_index
);
}

let mut attempts = 0;
'transfer_part: loop {
if watchdog.is_some() {
watchdog.as_mut().unwrap().feed();
}
let did_transfer =
pi_spi.send_message(transfer_type, &part, current_crc, dma, timer, resets);
counter = timer.get_counter();
if !did_transfer {
attempts += 1;
if attempts > 100 {
success = false;
break 'transfer_part;
}
//takes tc2-agent about this long to poll again will fail a lot otherwise
let time_since = (timer.get_counter() - counter).to_micros();
if time_since < TIME_BETWEEN_TRANSFER {
delay.delay_us((TIME_BETWEEN_TRANSFER - time_since) as u32);
}
} else {
break 'transfer_part;
}
//takes tc2-agent about this long to poll again will fail a lot otherwise
let time_since = (timer.get_counter() - counter).to_micros();
if time_since < TIME_BETWEEN_TRANSFER {
delay.delay_us((TIME_BETWEEN_TRANSFER - time_since) as u32);
}

// Give spi peripheral back to flash storage.
if let Some(spi) = pi_spi.disable() {
flash_storage.take_spi(spi, resets, clock_freq.Hz());
if is_last {
event_logger.log_event(
LoggerEvent::new(
LoggerEventKind::OffloadedRecording,
time.get_timestamp_micros(&timer),
),
flash_storage,
);
}
} else {
break 'transfer_part;
}
}
if !success {
break;
}

// Give spi peripheral back to flash storage.
if let Some(spi) = pi_spi.disable() {
flash_storage.take_spi(spi, resets, clock_freq.Hz());
part_count += 1;
if is_last {
file_count += 1;
info!("Offloaded {} file(s)", file_count);
if watchdog.is_some() {
watchdog.as_mut().unwrap().feed();
}
let _ = flash_storage.erase_last_file();
file_ended = true;
}
file_start = false;
}
if !success {
break;
}

if !file_ended {
info!(
"Incomplete file at block {} erasing",
flash_storage.file_start_block_index
);
if watchdog.is_some() {
watchdog.as_mut().unwrap().feed();
}
if let Err(e) = flash_storage.erase_last_file() {
event_logger.log_event(
LoggerEvent::new(
LoggerEventKind::OffloadedRecording,
LoggerEventKind::ErasePartialOrCorruptRecording,
time.get_timestamp_micros(&timer),
),
flash_storage,
);
}
}
if !success {
info!("NOT success so breaking");
break;
}

part_count += 1;
if is_last {
file_count += 1;
info!("Offloaded {} file(s)", file_count);
file_start = true;
} else {
file_start = false;
}
has_file = flash_storage.begin_offload_reverse();
}
if success {
info!("Completed file offload, transferred {} files", file_count);
// TODO: Some validation from the raspberry pi that the transfer completed
// without errors, in the form of a hash, and if we have errors, we'd re-transmit.

// Once we've successfully offloaded all files, we can erase the flash and we're
// ready to start recording new CPTV files there.

info!("Erasing after successful offload");
//flash_storage.erase_all_good_used_blocks();
flash_storage.erase_all_blocks();
info!(
"Completed file offload, transferred {} files start {} previous is {}",
file_count,
flash_storage.file_start_block_index,
flash_storage.previous_file_start_block_index
);
file_count != 0
} else {
flash_storage.scan();
Expand Down
54 changes: 29 additions & 25 deletions src/core1_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,9 @@ pub fn core_1_task(
let has_files_to_offload = flash_storage.has_files_to_offload();
let should_offload = (has_files_to_offload
&& !device_config.time_is_in_recording_window(&synced_date_time.date_time_utc, &None))
|| flash_storage.is_too_full_to_start_new_recordings();
|| flash_storage.is_too_full_to_start_new_recordings()
|| (has_files_to_offload && flash_storage.file_start_block_index.is_none());
//means old file system offload once
let should_offload = if !should_offload {
let previous_offload_time = event_logger
.latest_event_of_kind(LoggerEventKind::OffloadedRecording, &mut flash_storage)
Expand All @@ -419,7 +421,6 @@ pub fn core_1_task(
} else {
should_offload
};

let did_offload_files = if should_offload {
offload_flash_storage_and_events(
&mut flash_storage,
Expand All @@ -437,7 +438,6 @@ pub fn core_1_task(
} else {
false
};

// Unset the is_recording flag on attiny on startup
let _ = shared_i2c
.set_recording_flag(&mut delay, false)
Expand Down Expand Up @@ -485,7 +485,7 @@ pub fn core_1_task(
let mut logged_flash_storage_nearly_full = false;
// NOTE: If there are already recordings on the flash memory,
// assume we've already made the startup status recording during this recording window.
let mut made_startup_status_recording = !has_files_to_offload || did_offload_files;
let mut made_startup_status_recording = has_files_to_offload || did_offload_files;
let mut made_shutdown_status_recording = false;
let mut making_status_recording = false;
// Enable raw frame transfers to pi – if not already enabled.
Expand Down Expand Up @@ -759,10 +759,31 @@ pub fn core_1_task(
// Finalise on a different frame period to writing out the prev/last frame,
// to give more breathing room.
if let Some(cptv_stream) = &mut cptv_stream {
error!("Ending current recording");
let cptv_start_block_index = cptv_stream.starting_block_index as isize;
cptv_stream.finalise(&mut flash_storage);
let cptv_end_block_index = flash_storage.last_used_block_index.unwrap();

if !making_status_recording
&& motion_detection.as_ref().unwrap().was_false_positive()
// && cptv_stream.num_frames <= 100
{
info!(
"Discarding as a false-positive {}:{} ",
cptv_start_block_index, flash_storage.last_used_block_index
);
let _ = flash_storage.erase_last_file();
event_logger.log_event(
LoggerEvent::new(
LoggerEventKind::WouldDiscardAsFalsePositive,
synced_date_time.get_timestamp_micros(&timer),
),
&mut flash_storage,
);
} else {
cptv_stream.finalise(&mut flash_storage);
error!(
"Ending current recording start block {} end block{}",
cptv_start_block_index, flash_storage.last_used_block_index
);
}

ended_recording = true;
let _ = shared_i2c
Expand All @@ -776,24 +797,7 @@ pub fn core_1_task(
),
&mut flash_storage,
);
if !making_status_recording
&& motion_detection.as_ref().unwrap().was_false_positive()
// && cptv_stream.num_frames <= 100
{
info!("Discarding as a false-positive");
cptv_stream.discard(
&mut flash_storage,
cptv_start_block_index,
cptv_end_block_index,
);
// event_logger.log_event(
// LoggerEvent::new(
// LoggerEventKind::WouldDiscardAsFalsePositive,
// synced_date_time.get_timestamp_micros(&timer),
// ),
// &mut flash_storage,
// );
}

if making_status_recording {
making_status_recording = false;
}
Expand Down
16 changes: 3 additions & 13 deletions src/cptv_encoder/streaming_cptv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ impl<'a> CptvStream<'a> {
for b in buf {
self.cursor.write_byte(b);
if let Some((to_flush, num_bytes)) = self.cursor.should_flush() {
flash_storage.append_file_bytes(
let _ = flash_storage.append_file_bytes(
to_flush,
num_bytes,
false,
Expand All @@ -572,7 +572,7 @@ impl<'a> CptvStream<'a> {
for b in buf {
self.cursor.write_byte(b);
if let Some((to_flush, num_bytes)) = self.cursor.should_flush() {
flash_storage.append_file_bytes(
let _ = flash_storage.append_file_bytes(
to_flush,
num_bytes,
false,
Expand All @@ -586,7 +586,7 @@ impl<'a> CptvStream<'a> {
// and write out to storage.
let _ = self.cursor.end_aligned();
let (to_flush, num_bytes) = self.cursor.flush();
flash_storage.append_file_bytes(
let _ = flash_storage.append_file_bytes(
to_flush,
num_bytes,
!at_header_location,
Expand Down Expand Up @@ -621,16 +621,6 @@ impl<'a> CptvStream<'a> {
}
self.write_gzip_trailer(flash_storage, true);
}

pub fn discard(
&mut self,
flash_storage: &mut OnboardFlash,
start_block_index: isize,
end_block_index: isize,
) {
// NOTE: In the case that the block erase fails, that just means we won't reclaim the space at the moment.
let _ = flash_storage.erase_block_range(start_block_index, end_block_index);
}
}

struct HeaderIterator {
Expand Down
3 changes: 3 additions & 0 deletions src/event_logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub enum LoggerEventKind {
AttinyCommError,
Rp2040MissedAudioAlarm(u64),
AudioRecordingFailed,
ErasePartialOrCorruptRecording,
}

impl Into<u16> for LoggerEventKind {
Expand Down Expand Up @@ -57,6 +58,7 @@ impl Into<u16> for LoggerEventKind {
AttinyCommError => 19,
Rp2040MissedAudioAlarm(_) => 20,
AudioRecordingFailed => 21,
ErasePartialOrCorruptRecording => 22,
}
}
}
Expand Down Expand Up @@ -88,6 +90,7 @@ impl TryFrom<u16> for LoggerEventKind {
19 => Ok(AttinyCommError),
20 => Ok(Rp2040MissedAudioAlarm(0)),
21 => Ok(AudioRecordingFailed),
22 => Ok(ErasePartialOrCorruptRecording),
_ => Err(()),
}
}
Expand Down
Loading

0 comments on commit d12bfc2

Please sign in to comment.