Skip to content

Commit 44057f4

Browse files
Remove reference to the iostack from info fetcher (#1287)
remove reference to the iostack from info fetcher fix fmt Merge branch 'main' into vianney/fix-info-fetcher-holding-ref-to-runtime Add comment for drain Co-authored-by: vianney.ruhlmann <vianney.ruhlmann@datadoghq.com>
1 parent 3ad7d4a commit 44057f4

File tree

2 files changed

+23
-2
lines changed

2 files changed

+23
-2
lines changed

data-pipeline/src/agent_info/fetcher.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,14 @@ impl AgentInfoFetcher {
166166

167167
(fetcher, response_observer)
168168
}
169+
170+
/// Drain message from the trigger channel.
171+
pub fn drain(&mut self) {
172+
// We read only once as the channel has a capacity of 1
173+
if let Some(rx) = &mut self.trigger_rx {
174+
let _ = rx.try_recv();
175+
}
176+
}
169177
}
170178

171179
impl Worker for AgentInfoFetcher {
@@ -272,6 +280,11 @@ impl ResponseObserver {
272280
}
273281
}
274282
}
283+
284+
/// Manually send a message to the trigger channel.
285+
pub fn manual_trigger(&self) {
286+
let _ = self.trigger_tx.try_send(());
287+
}
275288
}
276289

277290
#[cfg(test)]

data-pipeline/src/trace_exporter/mod.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,15 @@ impl TraceExporter {
314314
};
315315
});
316316
}
317-
// Drop runtime to shutdown all threads
317+
// When the info fetcher is paused, the trigger channel keeps a reference to the runtime's
318+
// IoStack as a waker. This prevents the IoStack from being dropped when shutting
319+
// down runtime. By manually sending a message to the trigger channel we trigger the
320+
// waker releasing the reference to the IoStack. Finally we drain the channel to
321+
// avoid triggering a fetch when the info fetcher is restarted.
322+
if let PausableWorker::Paused { worker } = &mut self.workers.lock_or_panic().info {
323+
self.info_response_observer.manual_trigger();
324+
worker.drain();
325+
}
318326
drop(runtime);
319327
}
320328

@@ -462,7 +470,7 @@ impl TraceExporter {
462470
/// 2) It's not guaranteed to not block forever, since the /info endpoint might not be
463471
/// available.
464472
///
465-
/// The `send`` function will check agent_info when running, which will only be available if the
473+
/// The `send` function will check agent_info when running, which will only be available if the
466474
/// fetcher had time to reach to the agent.
467475
/// Since agent_info can enable CSS computation, waiting for this during testing can make
468476
/// snapshots non-deterministic.

0 commit comments

Comments
 (0)