Skip to content

Commit 28f8b2b

Browse files
authored
Only set trace subscriber when useful (#602)
1 parent 6f11bbf commit 28f8b2b

File tree

12 files changed

+42
-31
lines changed

12 files changed

+42
-31
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
[workspace]
22
members = ["core", "client", "core-api", "fsm", "test-utils", "sdk-core-protos", "sdk"]
3+
resolver = "2"
34

45
[workspace.dependencies]
56
tonic = "0.9"

core-api/src/telemetry/metrics.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ impl MetricAttributes {
136136
Arc::make_mut(kvs).extend(new_kvs.into_iter().map(Into::into));
137137
}
138138
MetricAttributes::Lang(ref mut attrs, ..) => {
139-
attrs.new_attributes.extend(new_kvs.into_iter());
139+
attrs.new_attributes.extend(new_kvs);
140140
}
141141
}
142142
}
@@ -149,7 +149,7 @@ pub struct MetricsAttributesOptions {
149149
}
150150
impl MetricsAttributesOptions {
151151
pub fn extend(&mut self, new_kvs: impl IntoIterator<Item = MetricKeyValue>) {
152-
self.attributes.extend(new_kvs.into_iter())
152+
self.attributes.extend(new_kvs)
153153
}
154154
}
155155

core/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ itertools = "0.11"
4343
lazy_static = "1.4"
4444
lru = "0.11"
4545
mockall = "0.11"
46-
nix = { version = "0.26", optional = true }
46+
nix = { version = "0.27", optional = true, features = ["process", "signal"] }
4747
once_cell = "1.5"
4848
opentelemetry = { workspace = true, features = ["rt-tokio", "metrics"] }
4949
opentelemetry_sdk = { version = "0.20", features = ["metrics"] }
@@ -60,7 +60,7 @@ ringbuf = "0.3"
6060
rmp-serde = { version = "1.1", optional = true }
6161
serde = "1.0"
6262
serde_json = "1.0"
63-
siphasher = "0.3"
63+
siphasher = "1.0"
6464
slotmap = "1.0"
6565
tar = { version = "0.4", optional = true }
6666
thiserror = "1.0"

core/src/internal_flags.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ impl InternalFlags {
7575
..
7676
} = self
7777
{
78-
lang_since_last_complete.extend(flags.into_iter());
78+
lang_since_last_complete.extend(flags);
7979
}
8080
}
8181

core/src/lib.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,9 @@ impl CoreRuntime {
223223
let runtime = tokio_builder
224224
.enable_all()
225225
.on_thread_start(move || {
226-
set_trace_subscriber_for_current_thread(subscriber.clone());
226+
if let Some(sub) = subscriber.as_ref() {
227+
set_trace_subscriber_for_current_thread(sub.clone());
228+
}
227229
})
228230
.build()?;
229231
let _rg = runtime.enter();
@@ -249,7 +251,9 @@ impl CoreRuntime {
249251
/// If there is no currently active Tokio runtime
250252
pub fn new_assume_tokio_initialized_telem(telemetry: TelemetryInstance) -> Self {
251253
let runtime_handle = tokio::runtime::Handle::current();
252-
set_trace_subscriber_for_current_thread(telemetry.trace_subscriber());
254+
if let Some(sub) = telemetry.trace_subscriber() {
255+
set_trace_subscriber_for_current_thread(sub);
256+
}
253257
Self {
254258
telemetry,
255259
runtime: None,

core/src/telemetry/log_export.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ mod tests {
166166
.build()
167167
.unwrap();
168168
let instance = telemetry_init(opts).unwrap();
169-
let _g = tracing::subscriber::set_default(instance.trace_subscriber.clone());
169+
let _g = tracing::subscriber::set_default(instance.trace_subscriber().unwrap().clone());
170170

171171
let top_span = span!(Level::INFO, "yayspan", huh = "wat");
172172
let _guard = top_span.enter();

core/src/telemetry/metrics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -813,7 +813,7 @@ mod tests {
813813
let no_op_subscriber = Arc::new(NoSubscriber::new());
814814
let call_buffer = Arc::new(MetricsCallBuffer::new(100));
815815
let telem_instance = TelemetryInstance::new(
816-
no_op_subscriber,
816+
Some(no_op_subscriber),
817817
None,
818818
METRIC_PREFIX.to_string(),
819819
Some(call_buffer.clone()),

core/src/telemetry/mod.rs

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,15 @@ pub struct TelemetryInstance {
5050
metric_prefix: String,
5151
logs_out: Option<Mutex<CoreLogsOut>>,
5252
metrics: Option<Arc<dyn CoreMeter + 'static>>,
53-
trace_subscriber: Arc<dyn Subscriber + Send + Sync>,
53+
/// The tracing subscriber which is associated with this telemetry instance. May be `None` if
54+
/// the user has not opted into any tracing configuration.
55+
trace_subscriber: Option<Arc<dyn Subscriber + Send + Sync>>,
5456
attach_service_name: bool,
5557
}
5658

5759
impl TelemetryInstance {
5860
fn new(
59-
trace_subscriber: Arc<dyn Subscriber + Send + Sync>,
61+
trace_subscriber: Option<Arc<dyn Subscriber + Send + Sync>>,
6062
logs_out: Option<Mutex<CoreLogsOut>>,
6163
metric_prefix: String,
6264
metrics: Option<Arc<dyn CoreMeter + 'static>>,
@@ -73,8 +75,8 @@ impl TelemetryInstance {
7375

7476
/// Return the trace subscriber associated with the telemetry options/instance. Can be used
7577
/// to manually set the default for a thread or globally using the `tracing` crate, or with
76-
/// [set_trace_subscriber_for_current_thread]
77-
pub fn trace_subscriber(&self) -> Arc<dyn Subscriber + Send + Sync> {
78+
/// [set_trace_subscriber_for_current_thread].
79+
pub fn trace_subscriber(&self) -> Option<Arc<dyn Subscriber + Send + Sync>> {
7880
self.trace_subscriber.clone()
7981
}
8082

@@ -172,7 +174,7 @@ pub fn telemetry_init(opts: TelemetryOptions) -> Result<TelemetryInstance, anyho
172174
let mut forward_layer = None;
173175
// ===================================
174176

175-
if let Some(ref logger) = opts.logging {
177+
let tracing_sub = opts.logging.map(|logger| {
176178
match logger {
177179
Logger::Console { filter } => {
178180
// This is silly dupe but can't be avoided without boxing.
@@ -206,18 +208,18 @@ pub fn telemetry_init(opts: TelemetryOptions) -> Result<TelemetryInstance, anyho
206208
forward_layer = Some(export_layer.with_filter(EnvFilter::new(filter)));
207209
}
208210
};
209-
};
211+
let reg = tracing_subscriber::registry()
212+
.with(console_pretty_layer)
213+
.with(console_compact_layer)
214+
.with(forward_layer);
210215

211-
let reg = tracing_subscriber::registry()
212-
.with(console_pretty_layer)
213-
.with(console_compact_layer)
214-
.with(forward_layer);
215-
216-
#[cfg(feature = "tokio-console")]
217-
let reg = reg.with(console_subscriber::spawn());
216+
#[cfg(feature = "tokio-console")]
217+
let reg = reg.with(console_subscriber::spawn());
218+
Arc::new(reg) as Arc<dyn Subscriber + Send + Sync>
219+
});
218220

219221
Ok(TelemetryInstance::new(
220-
Arc::new(reg),
222+
tracing_sub,
221223
logs_out,
222224
opts.metric_prefix,
223225
opts.metrics,
@@ -234,7 +236,9 @@ pub fn telemetry_init_global(opts: TelemetryOptions) -> Result<(), anyhow::Error
234236
.is_ok()
235237
{
236238
let ti = telemetry_init(opts)?;
237-
tracing::subscriber::set_global_default(ti.trace_subscriber())?;
239+
if let Some(ts) = ti.trace_subscriber() {
240+
tracing::subscriber::set_global_default(ts)?;
241+
}
238242
}
239243
Ok(())
240244
}

core/src/worker/workflow/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ impl Workflows {
185185
let (start_polling_tx, start_polling_rx) = oneshot::channel();
186186
// We must spawn a task to constantly poll the activation stream, because otherwise
187187
// activation completions would not cause anything to happen until the next poll.
188-
let tracing_sub = telem_instance.map(|ti| ti.trace_subscriber());
188+
let tracing_sub = telem_instance.and_then(|ti| ti.trace_subscriber());
189189
let processing_task = thread::spawn(move || {
190190
if let Some(ts) = tracing_sub {
191191
set_trace_subscriber_for_current_thread(ts);
@@ -620,7 +620,7 @@ impl Workflows {
620620
}
621621
let with_permits = reserved_act_permits
622622
.into_iter()
623-
.zip(eager_acts.into_iter())
623+
.zip(eager_acts)
624624
.map(|(permit, resp)| TrackedPermittedTqResp { permit, resp });
625625
if with_permits.len() > 0 {
626626
debug!(

core/src/worker/workflow/workflow_stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ impl WFStream {
187187
}
188188
};
189189

190-
activations.extend(maybe_act.into_iter());
190+
activations.extend(maybe_act);
191191
activations.extend(state.reconcile_buffered());
192192

193193
// Always flush *after* actually handling the input, as this allows LA sink

0 commit comments

Comments
 (0)