Skip to content

Commit

Permalink
Removing old unused read shards (#49)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rixhee authored Dec 15, 2024
1 parent a0d6578 commit 0007c6b
Showing 1 changed file with 25 additions and 0 deletions.
25 changes: 25 additions & 0 deletions src/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@ use messages::responses::write_response::WriteResponse;
use anyhow::Result;
use rand::seq::SliceRandom;
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
use tokio::time;
use utils::constants::MAIN_INSTANCE_IP_PORT;

#[derive(Clone)]
struct AnnounceInfo {
ip: u128,
port: u16,
announce_id: u128,
timestamp: SystemTime,
}

#[derive(Clone)]
Expand Down Expand Up @@ -105,6 +108,7 @@ impl RouterHandler for InfoRouter {
ip: req.ip,
port: req.port,
announce_id: req.shard_id,
timestamp: SystemTime::now(),
});

AnnounceShardResponse {
Expand All @@ -126,6 +130,7 @@ impl RouterHandler for InfoRouter {
ip: req.ip,
port: req.port,
announce_id: req.shard_id,
timestamp: SystemTime::now(),
});
AnnounceShardResponse {
writer_number: idx as u16,
Expand Down Expand Up @@ -251,6 +256,26 @@ async fn main() -> Result<()> {
let args = InfoArgs::parse();
let info_router = InfoRouter::new(args.write_shards);
let mut info_server = RouterBuilder::new(info_router, Some(MAIN_INSTANCE_IP_PORT));
let info_router = info_server.get_handler_arc();

tokio::spawn(async move {
let mut interval = time::interval(time::Duration::from_millis(100));
loop {
interval.tick().await;
let mut reader_writers = info_router.reader_writers.lock().unwrap();

reader_writers.iter_mut().for_each(|block| {
block.readers.retain(|reader| {
reader
.timestamp
.elapsed()
.map(|elapsed| elapsed.as_secs() < 2)
.unwrap_or(false)
});
});
}
});

tokio::spawn(async move {
info_server.bind().await?;
info_server.listen().await?;
Expand Down

0 comments on commit 0007c6b

Please sign in to comment.