Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Change File info poller to respond when capacity in channel opens up and decode stream into Vec of structs #715

Merged
merged 9 commits into the base branch from the source branch
Jan 23, 2024
2 changes: 2 additions & 0 deletions file_store/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub enum Error {
#[cfg(feature = "sqlx-postgres")]
#[error("db error")]
DbError(#[from] sqlx::Error),
#[error("channel send error")]
SendError(#[from] tokio::sync::mpsc::error::SendError<()>),
}

#[derive(Error, Debug)]
Expand Down
146 changes: 85 additions & 61 deletions file_store/src/file_info_poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ use derive_builder::Builder;
use futures::{future::LocalBoxFuture, stream::BoxStream, StreamExt};
use futures_util::TryFutureExt;
use retainer::Cache;
use std::marker::PhantomData;
use std::{collections::VecDeque, marker::PhantomData, sync::Arc};
use task_manager::ManagedTask;
use tokio::sync::mpsc::{error::TrySendError, Receiver, Sender};
use tokio::sync::mpsc::{Receiver, Sender};

const DEFAULT_POLL_DURATION_SECS: i64 = 30;
const DEFAULT_POLL_DURATION: std::time::Duration =
std::time::Duration::from_secs(DEFAULT_POLL_DURATION_SECS as u64);
const CLEAN_DURATION: std::time::Duration = std::time::Duration::from_secs(12 * 60 * 60);
const CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(3 * 60 * 60);

type MemoryFileCache = Cache<String, bool>;
type MemoryFileCache = Arc<Cache<String, bool>>;

#[async_trait::async_trait]
pub trait FileInfoPollerState: Send + Sync + 'static {
Expand All @@ -37,27 +37,30 @@ pub trait FileInfoPollerStateRecorder {
pub struct FileInfoStream<T> {
pub file_info: FileInfo,
process_name: String,
stream: BoxStream<'static, T>,
data: Vec<T>,
}

impl<T> FileInfoStream<T>
where
T: Send,
{
pub fn new(process_name: String, file_info: FileInfo, stream: BoxStream<'static, T>) -> Self {
pub fn new(process_name: String, file_info: FileInfo, data: Vec<T>) -> Self {
Self {
file_info,
process_name,
stream,
data,
}
}

pub async fn into_stream(
self,
recorder: impl FileInfoPollerStateRecorder,
) -> Result<BoxStream<'static, T>> {
) -> Result<BoxStream<'static, T>>
where
T: 'static,
{
recorder.record(&self.process_name, &self.file_info).await?;
Ok(self.stream)
Ok(futures::stream::iter(self.data.into_iter()).boxed())
}
}

Expand Down Expand Up @@ -86,21 +89,39 @@ pub struct FileInfoPollerConfig<T, S> {
p: PhantomData<T>,
}

#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct FileInfoPollerServer<T, S> {
config: FileInfoPollerConfig<T, S>,
sender: Sender<FileInfoStream<T>>,
file_queue: VecDeque<FileInfo>,
latest_file_timestamp: Option<DateTime<Utc>>,
cache: MemoryFileCache,
}

type FileInfoStreamReceiver<T> = Receiver<FileInfoStream<T>>;
impl<T, S> FileInfoPollerConfigBuilder<T, S>
where
T: Clone,
S: FileInfoPollerState,
{
pub fn create(self) -> Result<(FileInfoStreamReceiver<T>, FileInfoPollerServer<T, S>)> {
pub async fn create(self) -> Result<(FileInfoStreamReceiver<T>, FileInfoPollerServer<T, S>)> {
let config = self.build()?;
let (sender, receiver) = tokio::sync::mpsc::channel(config.queue_size);
Ok((receiver, FileInfoPollerServer { config, sender }))
let latest_file_timestamp = config
.state
.latest_timestamp(&config.process_name, &config.prefix)
.await?;

Ok((
receiver,
FileInfoPollerServer {
config,
sender,
file_queue: VecDeque::new(),
latest_file_timestamp,
cache: create_cache(),
},
))
}
}

Expand Down Expand Up @@ -142,47 +163,57 @@ where
})
}

async fn run(self, shutdown: triggered::Listener) -> Result {
let cache = create_cache();
let mut poll_trigger = tokio::time::interval(self.poll_duration());
async fn get_next_file(&mut self) -> Result<FileInfo> {
loop {
if let Some(file_info) = self.file_queue.pop_front() {
return Ok(file_info);
}

let after = self.after(self.latest_file_timestamp);
let before = Utc::now();
let files = self
.config
.store
.list_all(&self.config.prefix, after, before)
.await?;

for file in files {
if !self.is_already_processed(&file).await? {
self.latest_file_timestamp = Some(file.timestamp);
self.file_queue.push_back(file);
}
}

if self.file_queue.is_empty() {
tokio::time::sleep(self.poll_duration()).await;
}
}
}

async fn run(mut self, shutdown: triggered::Listener) -> Result {
let mut cleanup_trigger = tokio::time::interval(CLEAN_DURATION);
let process_name = self.config.process_name.clone();

let mut latest_ts = self
.config
.state
.latest_timestamp(&self.config.process_name, &self.config.prefix)
.await?;
tracing::info!(
r#type = self.config.prefix,
%process_name,
"starting FileInfoPoller",
);

let sender = self.sender.clone();
loop {
let after = self.after(latest_ts);
let before = Utc::now();

tokio::select! {
biased;
_ = shutdown.clone() => {
tracing::info!(r#type = self.config.prefix, %process_name, "stopping FileInfoPoller");
break;
}
_ = cleanup_trigger.tick() => self.clean(&cache).await?,
_ = poll_trigger.tick() => {
let files = self.config.store.list_all(&self.config.prefix, after, before).await?;
for file in files {
if !is_already_processed(&self.config.state, &cache, &process_name, &file).await? {
if send_stream(&self.sender, &self.config.store, process_name.clone(), file.clone()).await? {
latest_ts = Some(file.timestamp);
cache_file(&cache, &file).await;
} else {
tracing::info!(r#type = self.config.prefix, %process_name, "FileInfoPoller: channel full");
break;
}
}
}
_ = cleanup_trigger.tick() => self.clean(&self.cache).await?,
result = futures::future::try_join(sender.reserve().map_err(Error::from), self.get_next_file()) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like it's safe to assume that `try_join` behaves like collecting an iterator of results into a single result wrapping a collection — i.e. assuming both joined operations are Ok(_), the joined result will be Ok((result_1_inner, result_2_inner))?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let (permit, file) = result?;
let data = parse_file(&self.config.store, process_name.clone(), file.clone()).await?;
permit.send(data);
cache_file(&self.cache, &file).await;
}
}
}
Expand Down Expand Up @@ -215,18 +246,28 @@ where
.to_std()
.unwrap_or(DEFAULT_POLL_DURATION)
}

async fn is_already_processed(&self, file_info: &FileInfo) -> Result<bool> {
if self.cache.get(&file_info.key).await.is_some() {
Ok(true)
} else {
self.config
.state
.exists(&self.config.process_name, file_info)
.await
}
}
}

async fn send_stream<T>(
sender: &Sender<FileInfoStream<T>>,
async fn parse_file<T>(
store: &FileStore,
process_name: String,
file: FileInfo,
) -> Result<bool>
) -> Result<FileInfoStream<T>>
where
T: MsgDecode + TryFrom<T::Msg, Error = Error> + Send + Sync + 'static,
{
let stream = store
let stream: Vec<T> = store
.stream_file(file.clone())
.await?
.filter_map(|msg| async {
Expand All @@ -250,31 +291,14 @@ where
})
.ok()
})
.boxed();
.collect()
.await;

let incoming_data_stream = FileInfoStream::new(process_name, file, stream);
match sender.try_send(incoming_data_stream) {
Ok(_) => Ok(true),
Err(TrySendError::Full(_)) => Ok(false),
Err(TrySendError::Closed(_)) => Err(Error::channel()),
}
Ok(FileInfoStream::new(process_name, file, stream))
}

fn create_cache() -> MemoryFileCache {
Cache::new()
}

async fn is_already_processed(
state: &impl FileInfoPollerState,
cache: &MemoryFileCache,
process_name: &str,
file_info: &FileInfo,
) -> Result<bool> {
if cache.get(&file_info.key).await.is_some() {
Ok(true)
} else {
state.exists(process_name, file_info).await
}
Arc::new(Cache::new())
}

async fn cache_file(cache: &MemoryFileCache, file_info: &FileInfo) {
Expand Down
3 changes: 2 additions & 1 deletion iot_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ impl Cmd {
.store(file_store)
.lookback(LookbackBehavior::StartAfter(settings.start_after()))
.prefix(FileType::IotPacketReport.to_string())
.create()?;
.create()
.await?;

let balance_store = balances.balances();
let verifier_daemon = Daemon {
Expand Down
6 changes: 4 additions & 2 deletions iot_verifier/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ impl Server {
.lookback(LookbackBehavior::Max(max_lookback_age))
.poll_duration(entropy_interval)
.offset(entropy_interval * 2)
.create()?;
.create()
.await?;

let entropy_loader = EntropyLoader {
pool: pool.clone(),
Expand Down Expand Up @@ -190,7 +191,8 @@ impl Server {
.lookback(LookbackBehavior::Max(max_lookback_age))
.poll_duration(packet_interval)
.offset(packet_interval * 2)
.create()?;
.create()
.await?;

let packet_loader = packet_loader::PacketLoader::from_settings(
settings,
Expand Down
3 changes: 2 additions & 1 deletion mobile_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ impl Cmd {
))
.prefix(FileType::DataTransferSessionIngestReport.to_string())
.lookback(LookbackBehavior::StartAfter(settings.start_after()))
.create()?;
.create()
.await?;

let gateway_client = GatewayClient::from_settings(&settings.config_client)?;
let auth_client = AuthorizationClient::from_settings(&settings.config_client)?;
Expand Down
18 changes: 12 additions & 6 deletions mobile_verifier/src/cli/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ impl Cmd {
.lookback(LookbackBehavior::StartAfter(settings.start_after()))
.prefix(FileType::CbrsHeartbeatIngestReport.to_string())
.queue_size(1)
.create()?;
.create()
.await?;

// Wifi Heartbeats
let (wifi_heartbeats, wifi_heartbeats_server) =
Expand All @@ -66,7 +67,8 @@ impl Cmd {
.store(report_ingest.clone())
.lookback(LookbackBehavior::StartAfter(settings.start_after()))
.prefix(FileType::WifiHeartbeatIngestReport.to_string())
.create()?;
.create()
.await?;

let (valid_heartbeats, valid_heartbeats_server) = file_sink::FileSinkBuilder::new(
FileType::ValidatedHeartbeat,
Expand Down Expand Up @@ -118,7 +120,8 @@ impl Cmd {
.store(report_ingest.clone())
.lookback(LookbackBehavior::StartAfter(settings.start_after()))
.prefix(FileType::CellSpeedtestIngestReport.to_string())
.create()?;
.create()
.await?;

let (speedtests_avg, speedtests_avg_server) = file_sink::FileSinkBuilder::new(
FileType::SpeedtestAvg,
Expand Down Expand Up @@ -157,7 +160,8 @@ impl Cmd {
.store(report_ingest.clone())
.lookback(LookbackBehavior::StartAfter(settings.start_after()))
.prefix(FileType::CoverageObjectIngestReport.to_string())
.create()?;
.create()
.await?;

let (valid_coverage_objs, valid_coverage_objs_server) = file_sink::FileSinkBuilder::new(
FileType::CoverageObject,
Expand Down Expand Up @@ -216,7 +220,8 @@ impl Cmd {
.store(report_ingest.clone())
.lookback(LookbackBehavior::StartAfter(settings.start_after()))
.prefix(FileType::SubscriberLocationIngestReport.to_string())
.create()?;
.create()
.await?;

let (verified_subscriber_location, verified_subscriber_location_server) =
file_sink::FileSinkBuilder::new(
Expand Down Expand Up @@ -244,7 +249,8 @@ impl Cmd {
.store(data_transfer_ingest.clone())
.lookback(LookbackBehavior::StartAfter(settings.start_after()))
.prefix(FileType::ValidDataTransferSession.to_string())
.create()?;
.create()
.await?;

let data_session_ingestor = DataSessionIngestor::new(pool.clone(), data_session_ingest);

Expand Down
3 changes: 2 additions & 1 deletion reward_index/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ impl Server {
))
.poll_duration(settings.interval())
.offset(settings.interval() * 2)
.create()?;
.create()
.await?;
let source_join_handle = server.start(shutdown_listener.clone()).await?;

// Reward server
Expand Down