-
Notifications
You must be signed in to change notification settings - Fork 329
/
map.rs
257 lines (223 loc) · 7.93 KB
/
map.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
use alloc::collections::btree_map::BTreeMap as HashMap;
use core::mem;
use ibc_relayer_types::core::ics02_client::events::NewBlock;
use ibc_relayer_types::core::ics24_host::identifier::ChainId;
use ibc_relayer_types::Height;
use tracing::{debug, trace};
use crate::{
chain::handle::{ChainHandle, ChainHandlePair},
config::Config,
object::Object,
telemetry,
};
use super::{spawn_worker_tasks, WorkerHandle, WorkerId};
/// Manage the lifecycle of [`WorkerHandle`]s associated with [`Object`]s.
///
/// Each [`Object`] maps to at most one [`WorkerHandle`]. The map also owns the
/// counter used to hand out a fresh [`WorkerId`] to every worker it spawns.
#[derive(Debug)]
pub struct WorkerMap {
/// Spawned workers, keyed by the object they are in charge of.
/// Note that `HashMap` is this file's import alias for `alloc`'s `BTreeMap`.
workers: HashMap<Object, WorkerHandle>,
/// The most recently issued worker id; starts at `WorkerId::new(0)` and is
/// advanced by `next_worker_id` before each spawn.
latest_worker_id: WorkerId,
}
impl Default for WorkerMap {
    /// Build an empty map whose worker id counter starts at `WorkerId::new(0)`.
    fn default() -> Self {
        let workers = HashMap::new();
        let latest_worker_id = WorkerId::new(0);

        Self {
            workers,
            latest_worker_id,
        }
    }
}
impl WorkerMap {
/// Create a new, empty worker map.
///
/// This takes no arguments and simply delegates to the [`Default`]
/// implementation: no worker is tracked yet and the worker id counter
/// starts at `WorkerId::new(0)`.
pub fn new() -> Self {
Self::default()
}
/// Returns `true` if there is a spawned [`WorkerHandle`] associated with the given [`Object`].
///
/// This is a plain key lookup in the map; it does not inspect whether the
/// worker behind the handle is still running.
pub fn contains(&self, object: &Object) -> bool {
self.workers.contains_key(object)
}
/// Remove the [`WorkerHandle`] registered for `object` and join it, provided
/// that the registered handle still carries the expected `id`.
///
/// Returns `true` when the handle was removed and joined. Returns `false`,
/// leaving the map as it was, when no worker is registered for `object` or
/// when the registered worker has a different id than the one requested.
pub fn remove_stopped(&mut self, id: WorkerId, object: Object) -> bool {
    // Take the handle out of the map first: joining it requires ownership.
    let handle = match self.workers.remove(&object) {
        Some(handle) => handle,
        None => {
            debug!(
                worker.object = %object.short_name(),
                "ignoring attempt to remove unknown worker",
            );
            return false;
        }
    };

    let current_id = handle.id();

    // The request targets another worker than the one currently registered
    // for this object: put the handle back untouched.
    if current_id != id {
        debug!(
            worker.object = %object.short_name(),
            "ignoring attempt to remove worker with outdated id {} (current: {})",
            id, current_id
        );
        self.workers.insert(object, handle);
        return false;
    }

    telemetry!(worker, metric_type(&object), -1);

    trace!(
        worker.id = %current_id, worker.object = %object.short_name(),
        "waiting for worker loop to end"
    );

    handle.join();

    trace!(
        worker.id = %current_id, worker.object = %object.short_name(),
        "worker loop has ended"
    );

    true
}
/// Iterate over the [`WorkerHandle`]s which should be told about new block events
/// originating from the chain with the given [`ChainId`].
///
/// A handle is yielded only when it is not stopped and its [`Object`] opts in
/// for that chain. See: [`Object::notify_new_block`]
pub fn to_notify<'a>(
    &'a self,
    src_chain_id: &'a ChainId,
) -> impl Iterator<Item = &'a WorkerHandle> {
    self.workers
        .iter()
        .filter(move |(object, handle)| {
            !handle.is_stopped() && object.notify_new_block(src_chain_id)
        })
        .map(|(_, handle)| handle)
}
/// Forward a new block event, observed at `height` on the chain identified by
/// `src_chain_id`, to every worker selected by [`WorkerMap::to_notify`].
pub fn notify_new_block(&self, src_chain_id: &ChainId, height: Height, new_block: NewBlock) {
    self.to_notify(src_chain_id).for_each(|handle| {
        // Ignore send error if the worker task handling
        // NewBlock cmd has been terminated.
        handle.send_new_block(height, new_block);
    });
}
/// Get a handle to the worker in charge of handling events associated
/// with the given [`Object`].
///
/// If no [`WorkerHandle`] exists for `object` yet, a new worker is spawned
/// between `src` and `dst`, registered in the map, and its handle returned.
pub fn get_or_spawn<Chain: ChainHandle>(
    &mut self,
    object: Object,
    src: Chain,
    dst: Chain,
    config: &Config,
) -> &WorkerHandle {
    // Spawn path: nothing is registered for this object yet.
    if !self.workers.contains_key(&object) {
        let spawned = self.spawn_worker(src, dst, &object, config);
        return self.workers.entry(object).or_insert(spawned);
    }

    // Lookup path: the key was just checked, so indexing cannot fail.
    &self.workers[&object]
}
/// Spawn a new [`WorkerHandle`] for `object`, unless one is already registered.
///
/// Returns `true` if a worker was actually spawned, and `false` if the map
/// already held a handle for `object` (in which case nothing is changed).
pub fn spawn<Chain: ChainHandle>(
    &mut self,
    src: Chain,
    dst: Chain,
    object: &Object,
    config: &Config,
) -> bool {
    if self.workers.contains_key(object) {
        return false;
    }

    let spawned = self.spawn_worker(src, dst, object, config);
    // The key is known to be absent, so this never replaces an existing handle.
    self.workers.insert(object.clone(), spawned);

    true
}
/// Unconditionally spawn a worker for the given [`Object`].
///
/// This does not look at, nor update, the map: callers are responsible for
/// registering the returned [`WorkerHandle`]. The worker telemetry is
/// incremented and a fresh [`WorkerId`] is allocated for the new worker.
fn spawn_worker<Chain: ChainHandle>(
    &mut self,
    src: Chain,
    dst: Chain,
    object: &Object,
    config: &Config,
) -> WorkerHandle {
    telemetry!(worker, metric_type(object), 1);

    let chains = ChainHandlePair { a: src, b: dst };
    let worker_id = self.next_worker_id();

    spawn_worker_tasks(chains, worker_id, object.clone(), config)
}
/// Advance the worker id counter and return the newly allocated id.
fn next_worker_id(&mut self) -> WorkerId {
    self.latest_worker_id = self.latest_worker_id.next();
    self.latest_worker_id
}
/// Collect a copy of every [`Object`] which has an associated worker and
/// for which [`Object::for_chain`] holds for the given chain.
pub fn objects_for_chain(&self, chain_id: &ChainId) -> Vec<Object> {
    self.workers
        .keys()
        .filter_map(|object| object.for_chain(chain_id).then(|| object.clone()))
        .collect()
}
/// Collect the [`WorkerHandle`]s whose [`Object`] is related to the given chain,
/// as decided by [`Object::for_chain`].
pub fn workers_for_chain(&self, chain_id: &ChainId) -> Vec<&WorkerHandle> {
    self.workers
        .iter()
        .filter(|(object, _)| object.for_chain(chain_id))
        .map(|(_, handle)| handle)
        .collect()
}
/// Return all the handles to the workers tracked in this map.
///
/// The handles are borrowed from the map and yielded in the map's own
/// iteration order; no filtering on the worker state is applied.
pub fn handles(&self) -> impl Iterator<Item = &WorkerHandle> {
self.workers.values()
}
/// Shut down the worker associated with the given [`Object`], synchronously.
///
/// The handle is removed from the map, the worker telemetry is decremented,
/// and the handle is asked to shut down and wait. Does nothing when no
/// worker is registered for `object`.
pub fn shutdown_worker(&mut self, object: &Object) {
    let handle = match self.workers.remove(object) {
        Some(handle) => handle,
        None => return,
    };

    telemetry!(worker, metric_type(object), -1);
    handle.shutdown_and_wait();
    // Drop handle automatically handles the waiting for tasks to terminate.
}
/// Shut down all the workers, asynchronously.
///
/// Every handle is taken out of the map, leaving it empty, and is sent the
/// shutdown signal. The handles themselves are only dropped once all of them
/// have been signalled.
pub fn shutdown(&mut self) {
    let detached = mem::take(&mut self.workers);

    detached.values().for_each(|handle| {
        // Send shutdown signal to all tasks in parallel.
        handle.shutdown();
    });
}
/// Sweep the map for workers having at least one stopped task.
///
/// For each such worker, as reported by `shutdown_stopped_tasks`, the remaining
/// tasks are shut down and the worker is removed from the map.
pub fn clean_stopped_workers(&mut self) {
    // Collect first: removing entries needs exclusive access to the map,
    // which cannot be taken while iterating over it.
    let stopped: Vec<(WorkerId, Object)> = self
        .workers
        .iter()
        .filter_map(|(object, handle)| {
            handle
                .shutdown_stopped_tasks()
                .then(|| (handle.id(), object.clone()))
        })
        .collect();

    for (id, object) in stopped {
        self.remove_stopped(id, object);
    }
}
}
// Drop handle to send shutdown signals to background tasks in parallel
// before waiting for all of them to terminate.
impl Drop for WorkerMap {
fn drop(&mut self) {
self.shutdown()
}
}
/// Map an [`Object`] to the telemetry worker type used to label worker metrics.
#[cfg(feature = "telemetry")]
fn metric_type(o: &Object) -> ibc_telemetry::state::WorkerType {
    use ibc_telemetry::state::WorkerType as Kind;

    // Kept exhaustive on purpose: a new `Object` variant must be given a
    // telemetry type before this compiles again.
    match o {
        Object::Client(..) => Kind::Client,
        Object::Connection(..) => Kind::Connection,
        Object::Channel(..) => Kind::Channel,
        Object::Packet(..) => Kind::Packet,
        Object::Wallet(..) => Kind::Wallet,
        Object::CrossChainQuery(..) => Kind::CrossChainQuery,
    }
}