Change File info poller to respond when capacity in channel opens up and decode stream into Vec of structs (#715)

* Update file info poller to use reserve when channel is at capacity (see the sketch after this list)

* Have file_info_poller convert stream into Vec of decoded structs

* Change back to turning the data into a stream

* Update remaining consumer and remove fake test

* Change length check to is_empty

* Change name of type

* make clippy happy again

* Removing println

* simplify lifetimes on FileInfoStream::into_stream
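
For context, the headline change swaps try_send (which gives up when the channel is full) for reserve (which waits until capacity opens up and returns a permit that makes the later send infallible). A minimal sketch of the two patterns against a plain tokio::sync::mpsc channel; the names and values here are illustrative, not taken from the diff:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (sender, mut receiver) = mpsc::channel::<u32>(1);

    // Old pattern: try_send fails immediately on a full channel, so the
    // poller had to drop the file and retry on the next poll tick.
    assert!(sender.try_send(1).is_ok());
    assert!(sender.try_send(2).is_err()); // channel full

    tokio::spawn(async move {
        while let Some(v) = receiver.recv().await {
            println!("got {v}");
        }
    });

    // New pattern: reserve suspends until the consumer frees a slot, then
    // yields a permit; permit.send cannot fail and uses the reserved slot.
    let permit = sender.reserve().await.expect("receiver dropped");
    permit.send(2);
}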
bbalser authored Jan 23, 2024
1 parent 2e42c85 commit 0e603e0
Showing 7 changed files with 109 additions and 72 deletions.
2 changes: 2 additions & 0 deletions file_store/src/error.rs
@@ -35,6 +35,8 @@ pub enum Error {
     #[cfg(feature = "sqlx-postgres")]
     #[error("db error")]
     DbError(#[from] sqlx::Error),
+    #[error("channel send error")]
+    SendError(#[from] tokio::sync::mpsc::error::SendError<()>),
 }

 #[derive(Error, Debug)]
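The new SendError variant exists so a failed reserve call can be converted with the ? operator. A minimal sketch, assuming the Error enum above is in scope; the helper function is hypothetical, not part of this commit:

// Hypothetical helper: reserve() fails with SendError<()> once the receiving
// half is dropped, and #[from] turns that into Error::SendError via `?`.
async fn wait_for_capacity<T>(sender: &tokio::sync::mpsc::Sender<T>) -> Result<(), Error> {
    let _permit = sender.reserve().await?;
    Ok(())
}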
146 changes: 85 additions & 61 deletions file_store/src/file_info_poller.rs
@@ -4,17 +4,17 @@ use derive_builder::Builder;
 use futures::{future::LocalBoxFuture, stream::BoxStream, StreamExt};
 use futures_util::TryFutureExt;
 use retainer::Cache;
-use std::marker::PhantomData;
+use std::{collections::VecDeque, marker::PhantomData, sync::Arc};
 use task_manager::ManagedTask;
-use tokio::sync::mpsc::{error::TrySendError, Receiver, Sender};
+use tokio::sync::mpsc::{Receiver, Sender};

 const DEFAULT_POLL_DURATION_SECS: i64 = 30;
 const DEFAULT_POLL_DURATION: std::time::Duration =
     std::time::Duration::from_secs(DEFAULT_POLL_DURATION_SECS as u64);
 const CLEAN_DURATION: std::time::Duration = std::time::Duration::from_secs(12 * 60 * 60);
 const CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(3 * 60 * 60);

-type MemoryFileCache = Cache<String, bool>;
+type MemoryFileCache = Arc<Cache<String, bool>>;

 #[async_trait::async_trait]
 pub trait FileInfoPollerState: Send + Sync + 'static {
@@ -37,27 +37,30 @@ pub trait FileInfoPollerStateRecorder {
 pub struct FileInfoStream<T> {
     pub file_info: FileInfo,
     process_name: String,
-    stream: BoxStream<'static, T>,
+    data: Vec<T>,
 }

 impl<T> FileInfoStream<T>
 where
     T: Send,
 {
-    pub fn new(process_name: String, file_info: FileInfo, stream: BoxStream<'static, T>) -> Self {
+    pub fn new(process_name: String, file_info: FileInfo, data: Vec<T>) -> Self {
         Self {
             file_info,
             process_name,
-            stream,
+            data,
         }
     }

     pub async fn into_stream(
         self,
         recorder: impl FileInfoPollerStateRecorder,
-    ) -> Result<BoxStream<'static, T>> {
+    ) -> Result<BoxStream<'static, T>>
+    where
+        T: 'static,
+    {
         recorder.record(&self.process_name, &self.file_info).await?;
-        Ok(self.stream)
+        Ok(futures::stream::iter(self.data.into_iter()).boxed())
     }
 }

@@ -86,21 +89,39 @@ pub struct FileInfoPollerConfig<T, S> {
     p: PhantomData<T>,
 }

-#[derive(Debug, Clone)]
+#[derive(Clone)]
 pub struct FileInfoPollerServer<T, S> {
     config: FileInfoPollerConfig<T, S>,
     sender: Sender<FileInfoStream<T>>,
+    file_queue: VecDeque<FileInfo>,
+    latest_file_timestamp: Option<DateTime<Utc>>,
+    cache: MemoryFileCache,
 }

 type FileInfoStreamReceiver<T> = Receiver<FileInfoStream<T>>;
 impl<T, S> FileInfoPollerConfigBuilder<T, S>
 where
     T: Clone,
     S: FileInfoPollerState,
 {
-    pub fn create(self) -> Result<(FileInfoStreamReceiver<T>, FileInfoPollerServer<T, S>)> {
+    pub async fn create(self) -> Result<(FileInfoStreamReceiver<T>, FileInfoPollerServer<T, S>)> {
         let config = self.build()?;
         let (sender, receiver) = tokio::sync::mpsc::channel(config.queue_size);
-        Ok((receiver, FileInfoPollerServer { config, sender }))
+        let latest_file_timestamp = config
+            .state
+            .latest_timestamp(&config.process_name, &config.prefix)
+            .await?;
+
+        Ok((
+            receiver,
+            FileInfoPollerServer {
+                config,
+                sender,
+                file_queue: VecDeque::new(),
+                latest_file_timestamp,
+                cache: create_cache(),
+            },
+        ))
     }
 }

@@ -142,47 +163,57 @@ where
         })
     }

-    async fn run(self, shutdown: triggered::Listener) -> Result {
-        let cache = create_cache();
-        let mut poll_trigger = tokio::time::interval(self.poll_duration());
+    async fn get_next_file(&mut self) -> Result<FileInfo> {
+        loop {
+            if let Some(file_info) = self.file_queue.pop_front() {
+                return Ok(file_info);
+            }
+
+            let after = self.after(self.latest_file_timestamp);
+            let before = Utc::now();
+            let files = self
+                .config
+                .store
+                .list_all(&self.config.prefix, after, before)
+                .await?;
+
+            for file in files {
+                if !self.is_already_processed(&file).await? {
+                    self.latest_file_timestamp = Some(file.timestamp);
+                    self.file_queue.push_back(file);
+                }
+            }
+
+            if self.file_queue.is_empty() {
+                tokio::time::sleep(self.poll_duration()).await;
+            }
+        }
+    }
+
+    async fn run(mut self, shutdown: triggered::Listener) -> Result {
         let mut cleanup_trigger = tokio::time::interval(CLEAN_DURATION);
         let process_name = self.config.process_name.clone();

-        let mut latest_ts = self
-            .config
-            .state
-            .latest_timestamp(&self.config.process_name, &self.config.prefix)
-            .await?;
         tracing::info!(
             r#type = self.config.prefix,
             %process_name,
             "starting FileInfoPoller",
         );

+        let sender = self.sender.clone();
         loop {
-            let after = self.after(latest_ts);
-            let before = Utc::now();
-
             tokio::select! {
                 biased;
                 _ = shutdown.clone() => {
                     tracing::info!(r#type = self.config.prefix, %process_name, "stopping FileInfoPoller");
                     break;
                 }
-                _ = cleanup_trigger.tick() => self.clean(&cache).await?,
-                _ = poll_trigger.tick() => {
-                    let files = self.config.store.list_all(&self.config.prefix, after, before).await?;
-                    for file in files {
-                        if !is_already_processed(&self.config.state, &cache, &process_name, &file).await? {
-                            if send_stream(&self.sender, &self.config.store, process_name.clone(), file.clone()).await? {
-                                latest_ts = Some(file.timestamp);
-                                cache_file(&cache, &file).await;
-                            } else {
-                                tracing::info!(r#type = self.config.prefix, %process_name, "FileInfoPoller: channel full");
-                                break;
-                            }
-                        }
-                    }
-                }
+                _ = cleanup_trigger.tick() => self.clean(&self.cache).await?,
+                result = futures::future::try_join(sender.reserve().map_err(Error::from), self.get_next_file()) => {
+                    let (permit, file) = result?;
+                    let data = parse_file(&self.config.store, process_name.clone(), file.clone()).await?;
+                    permit.send(data);
+                    cache_file(&self.cache, &file).await;
+                }
             }
         }
@@ -215,18 +246,28 @@ where
             .to_std()
             .unwrap_or(DEFAULT_POLL_DURATION)
     }
+
+    async fn is_already_processed(&self, file_info: &FileInfo) -> Result<bool> {
+        if self.cache.get(&file_info.key).await.is_some() {
+            Ok(true)
+        } else {
+            self.config
+                .state
+                .exists(&self.config.process_name, file_info)
+                .await
+        }
+    }
 }

-async fn send_stream<T>(
-    sender: &Sender<FileInfoStream<T>>,
+async fn parse_file<T>(
     store: &FileStore,
     process_name: String,
     file: FileInfo,
-) -> Result<bool>
+) -> Result<FileInfoStream<T>>
 where
     T: MsgDecode + TryFrom<T::Msg, Error = Error> + Send + Sync + 'static,
 {
-    let stream = store
+    let stream: Vec<T> = store
         .stream_file(file.clone())
         .await?
         .filter_map(|msg| async {
@@ -250,31 +291,14 @@ where
             })
             .ok()
         })
-        .boxed();
+        .collect()
+        .await;

-    let incoming_data_stream = FileInfoStream::new(process_name, file, stream);
-    match sender.try_send(incoming_data_stream) {
-        Ok(_) => Ok(true),
-        Err(TrySendError::Full(_)) => Ok(false),
-        Err(TrySendError::Closed(_)) => Err(Error::channel()),
-    }
+    Ok(FileInfoStream::new(process_name, file, stream))
 }

 fn create_cache() -> MemoryFileCache {
-    Cache::new()
-}
-
-async fn is_already_processed(
-    state: &impl FileInfoPollerState,
-    cache: &MemoryFileCache,
-    process_name: &str,
-    file_info: &FileInfo,
-) -> Result<bool> {
-    if cache.get(&file_info.key).await.is_some() {
-        Ok(true)
-    } else {
-        state.exists(process_name, file_info).await
-    }
+    Arc::new(Cache::new())
 }

 async fn cache_file(cache: &MemoryFileCache, file_info: &FileInfo) {
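
The new select! arm above couples backpressure and polling through futures::future::try_join: both futures run concurrently, and the arm yields only when a channel permit and a next file are both available. A self-contained sketch of that pairing, where next_file is a hypothetical stand-in for get_next_file:

use futures::TryFutureExt;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (sender, mut receiver) = mpsc::channel::<String>(1);

    tokio::spawn(async move {
        while let Some(file) = receiver.recv().await {
            println!("processing {file}");
        }
    });

    // Stand-in for get_next_file: any future yielding the next unit of work.
    let next_file = async { Ok::<_, Box<dyn std::error::Error>>("file-001".to_string()) };

    // try_join drives both sides concurrently and resolves only when both
    // succeed: a permit guaranteeing a slot, and the work to send into it.
    let (permit, file) =
        futures::future::try_join(sender.reserve().map_err(Into::into), next_file).await?;
    permit.send(file);
    Ok(())
}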
3 changes: 2 additions & 1 deletion iot_packet_verifier/src/daemon.rs
@@ -169,7 +169,8 @@ impl Cmd {
             .store(file_store)
             .lookback(LookbackBehavior::StartAfter(settings.start_after()))
             .prefix(FileType::IotPacketReport.to_string())
-            .create()?;
+            .create()
+            .await?;

         let balance_store = balances.balances();
         let verifier_daemon = Daemon {
6 changes: 4 additions & 2 deletions iot_verifier/src/main.rs
@@ -158,7 +158,8 @@ impl Server {
             .lookback(LookbackBehavior::Max(max_lookback_age))
             .poll_duration(entropy_interval)
             .offset(entropy_interval * 2)
-            .create()?;
+            .create()
+            .await?;

         let entropy_loader = EntropyLoader {
             pool: pool.clone(),
@@ -190,7 +191,8 @@ impl Server {
             .lookback(LookbackBehavior::Max(max_lookback_age))
             .poll_duration(packet_interval)
             .offset(packet_interval * 2)
-            .create()?;
+            .create()
+            .await?;

         let packet_loader = packet_loader::PacketLoader::from_settings(
             settings,
3 changes: 2 additions & 1 deletion mobile_packet_verifier/src/daemon.rs
@@ -161,7 +161,8 @@ impl Cmd {
             ))
             .prefix(FileType::DataTransferSessionIngestReport.to_string())
             .lookback(LookbackBehavior::StartAfter(settings.start_after()))
-            .create()?;
+            .create()
+            .await?;

         let gateway_client = GatewayClient::from_settings(&settings.config_client)?;
         let auth_client = AuthorizationClient::from_settings(&settings.config_client)?;
18 changes: 12 additions & 6 deletions mobile_verifier/src/cli/server.rs
@@ -57,7 +57,8 @@ impl Cmd {
             .lookback(LookbackBehavior::StartAfter(settings.start_after()))
             .prefix(FileType::CbrsHeartbeatIngestReport.to_string())
             .queue_size(1)
-            .create()?;
+            .create()
+            .await?;

         // Wifi Heartbeats
         let (wifi_heartbeats, wifi_heartbeats_server) =
@@ -66,7 +67,8 @@ impl Cmd {
             .store(report_ingest.clone())
             .lookback(LookbackBehavior::StartAfter(settings.start_after()))
             .prefix(FileType::WifiHeartbeatIngestReport.to_string())
-            .create()?;
+            .create()
+            .await?;

         let (valid_heartbeats, valid_heartbeats_server) = file_sink::FileSinkBuilder::new(
             FileType::ValidatedHeartbeat,
@@ -118,7 +120,8 @@ impl Cmd {
             .store(report_ingest.clone())
             .lookback(LookbackBehavior::StartAfter(settings.start_after()))
             .prefix(FileType::CellSpeedtestIngestReport.to_string())
-            .create()?;
+            .create()
+            .await?;

         let (speedtests_avg, speedtests_avg_server) = file_sink::FileSinkBuilder::new(
             FileType::SpeedtestAvg,
@@ -157,7 +160,8 @@ impl Cmd {
             .store(report_ingest.clone())
             .lookback(LookbackBehavior::StartAfter(settings.start_after()))
             .prefix(FileType::CoverageObjectIngestReport.to_string())
-            .create()?;
+            .create()
+            .await?;

         let (valid_coverage_objs, valid_coverage_objs_server) = file_sink::FileSinkBuilder::new(
             FileType::CoverageObject,
@@ -216,7 +220,8 @@ impl Cmd {
             .store(report_ingest.clone())
             .lookback(LookbackBehavior::StartAfter(settings.start_after()))
             .prefix(FileType::SubscriberLocationIngestReport.to_string())
-            .create()?;
+            .create()
+            .await?;

         let (verified_subscriber_location, verified_subscriber_location_server) =
             file_sink::FileSinkBuilder::new(
@@ -244,7 +249,8 @@ impl Cmd {
             .store(data_transfer_ingest.clone())
             .lookback(LookbackBehavior::StartAfter(settings.start_after()))
             .prefix(FileType::ValidDataTransferSession.to_string())
-            .create()?;
+            .create()
+            .await?;

         let data_session_ingestor = DataSessionIngestor::new(pool.clone(), data_session_ingest);

3 changes: 2 additions & 1 deletion reward_index/src/main.rs
@@ -88,7 +88,8 @@ impl Server {
             ))
             .poll_duration(settings.interval())
             .offset(settings.interval() * 2)
-            .create()?;
+            .create()
+            .await?;
         let source_join_handle = server.start(shutdown_listener.clone()).await?;

         // Reward server
