Skip to content
This repository has been archived by the owner on Jul 25, 2022. It is now read-only.

Commit

Permalink
Fix file descriptor leak in the postoffice plugin (#1989)
Browse files Browse the repository at this point in the history
* Fix file descritor leak in the postoffice plugin

The loop of inotify events was never ending
keeping a reference to the opened file descriptor.

On each new session a new file descriptor was allocated.
New session is created every time the manager (iml-http-agent) restarts.

Signed-off-by: Igor Pashev <pashev.igor@gmail.com>

* Remove storage of inotify ref

it is moved into the spawned task and kept alive as long as the task is.

Signed-off-by: Joe Grund <jgrund@whamcloud.io>

Co-authored-by: Joe Grund <jgrund@whamcloud.io>
  • Loading branch information
ip1981 and jgrund authored Jun 23, 2020
1 parent 4cc0956 commit d245ebf
Showing 1 changed file with 18 additions and 24 deletions.
42 changes: 18 additions & 24 deletions iml-agent/src/daemon_plugins/postoffice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use futures::{
stream::{StreamExt as _, TryStreamExt},
Future, FutureExt,
};
use inotify::{Inotify, WatchDescriptor, WatchMask};
use inotify::{Inotify, WatchMask};
use std::{
collections::{HashMap, HashSet},
pin::Pin,
Expand All @@ -24,22 +24,16 @@ use stream_cancel::{Trigger, Tripwire};
use tokio::{fs, net::UnixListener, sync::Mutex};
use tokio_util::codec::{BytesCodec, FramedRead};

pub struct POWD(pub Option<WatchDescriptor>);

pub struct PostOffice {
// individual mailbox socket listeners
routes: Arc<Mutex<HashMap<String, Trigger>>>,
inotify: Arc<Mutex<Inotify>>,
wd: Arc<Mutex<POWD>>,
trigger: Option<Trigger>,
}

pub fn create() -> impl DaemonPlugin {
PostOffice {
wd: Arc::new(Mutex::new(POWD(None))),
inotify: Arc::new(Mutex::new(
Inotify::init().expect("Failed to initialize inotify"),
)),
routes: Arc::new(Mutex::new(HashMap::new())),
trigger: None,
}
}

Expand All @@ -59,15 +53,15 @@ impl std::fmt::Debug for PostOffice {
}
}

// Returned trigger should bed drop'd to cause route to stop
// Returned trigger should be dropped to cause route to stop
fn start_route(mailbox: String) -> Trigger {
let (trigger, tripwire) = Tripwire::new();
let addr = socket_name(&mailbox);

let rc = async move {
// remove old unix socket
let _ = fs::remove_file(&addr).await.map_err(|e| {
tracing::error!("Failed to remove file {}: {}", &addr, &e);
tracing::debug!("Failed to remove file {}: {}", &addr, &e);
});
let mut listener = UnixListener::bind(addr.clone()).map_err(|e| {
tracing::error!("Failed to open unix socket {}: {}", &addr, &e);
Expand Down Expand Up @@ -108,10 +102,13 @@ impl DaemonPlugin for PostOffice {
&mut self,
) -> Pin<Box<dyn Future<Output = Result<Output, ImlAgentError>> + Send>> {
let routes = Arc::clone(&self.routes);
let inotify = Arc::clone(&self.inotify);
let wd = Arc::clone(&self.wd);
let mut inotify = Inotify::init().expect("Failed to initialize inotify");
let conf_file = env::get_var("POSTMAN_CONF_PATH");

let (trigger, tripwire) = Tripwire::new();

self.trigger = Some(trigger);

async move {
if let Ok(file) = fs::read_to_string(&conf_file).await {
let itr = file.lines().map(|mb| {
Expand All @@ -127,19 +124,18 @@ impl DaemonPlugin for PostOffice {
.await?;
}

wd.lock().await.0 = inotify
.lock()
.await
inotify
.add_watch(&conf_file, WatchMask::MODIFY)
.map_err(|e| tracing::error!("Failed to watch configuration: {}", e))
.ok();

let watcher = async move {
let mut buffer = [0; 32];
let mut stream = inotify.lock().await.event_stream(&mut buffer)?;
let stream = inotify.event_stream(&mut buffer)?;
let mut events = stream.take_until(tripwire);

while let Some(event_or_error) = stream.next().await {
tracing::debug!("event: {:?}", event_or_error);
while let Some(ev) = events.next().await {
tracing::debug!("inotify event: {:?}", ev);
match fs::read_to_string(&conf_file).await {
Ok(file) => {
let newset: HashSet<String> =
Expand All @@ -161,24 +157,22 @@ impl DaemonPlugin for PostOffice {
Err(e) => {
tracing::error!("Failed to open configuration {}: {}", &conf_file, e)
}
}
};
}
tracing::debug!("Ending Inotify Listen for {}", &conf_file);
Ok::<_, ImlAgentError>(())
};

tokio::spawn(watcher);
Ok(None)
}
.boxed()
}

async fn teardown(&mut self) -> Result<(), ImlAgentError> {
if let Some(wd) = self.wd.lock().await.0.clone() {
let _ = self.inotify.lock().await.rm_watch(wd);
}
// drop all triggers
self.routes.lock().await.clear();

self.trigger.take();
Ok(())
}
}

0 comments on commit d245ebf

Please sign in to comment.