Skip to content

Commit e09a2dc

Browse files
committed
Worker heartbeating
1 parent 8c320f1 commit e09a2dc

File tree

6 files changed

+83
-10
lines changed

6 files changed

+83
-10
lines changed

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,5 +100,6 @@
100100
"node": ">= 18.0.0",
101101
"npm": ">= 7.0.0",
102102
"rustc": ">= 1.53.0"
103-
}
103+
},
104+
"packageManager": "pnpm@10.7.1+sha512.2d92c86b7928dc8284f53494fb4201f983da65f0fb4f0d40baafa5cf628fa31dae3e5968f12466f17df7e97310e30f343a648baea1b9b350685dafafffdf5808"
104105
}

packages/core-bridge/src/runtime.rs

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ macro_rules! enter_sync {
3636
pub fn init(cx: &mut neon::prelude::ModuleContext) -> neon::prelude::NeonResult<()> {
3737
cx.export_function("newRuntime", runtime_new)?;
3838
cx.export_function("runtimeShutdown", runtime_shutdown)?;
39+
// cx.export_function(
40+
// "runtimeGetWorkerHeartbeatIntervalMillis",
41+
// runtime_get_worker_heartbeat_interval_millis,
42+
// )?;
3943

4044
Ok(())
4145
}
@@ -51,6 +55,7 @@ pub struct Runtime {
5155
// For some unknown reason, the otel metrics exporter will go crazy on shutdown in some
5256
// scenarios if we don't hold on to the `CoreOtelMeter` till the `Runtime` finally gets dropped.
5357
_otel_metrics_exporter: Option<Arc<dyn CoreMeter + 'static>>,
58+
// worker_heartbeat_interval_millis: Option<u64>,
5459
}
5560

5661
/// Initialize Core global telemetry and create the tokio runtime required to run Core.
@@ -59,11 +64,13 @@ pub struct Runtime {
5964
pub fn runtime_new(
6065
bridge_options: config::RuntimeOptions,
6166
) -> BridgeResult<OpaqueOutboundHandle<Runtime>> {
62-
let (telemetry_options, metrics_options, logging_options) = bridge_options.try_into()?;
67+
let (telemetry_options, metrics_options, logging_options, worker_heartbeat_interval_millis) =
68+
bridge_options.try_into()?;
6369

6470
// Create core runtime which starts tokio multi-thread runtime
6571
let runtime_options = RuntimeOptionsBuilder::default()
6672
.telemetry_options(telemetry_options)
73+
.heartbeat_interval(worker_heartbeat_interval_millis.map(Duration::from_millis))
6774
.build()
6875
.context("Failed to build runtime options")?;
6976
let mut core_runtime = CoreRuntime::new(runtime_options, TokioRuntimeBuilder::default())
@@ -125,6 +132,7 @@ pub fn runtime_new(
125132
log_exporter_task,
126133
metrics_exporter_task: prom_metrics_exporter_task.map(Arc::new),
127134
_otel_metrics_exporter: otel_metrics_exporter,
135+
// worker_heartbeat_interval_millis: runtime_options.worker_heartbeat_interval.map(|d| d.as_millis() as u64),
128136
}))
129137
}
130138

@@ -138,6 +146,21 @@ pub fn runtime_shutdown(runtime: OpaqueInboundHandle<Runtime>) -> BridgeResult<(
138146
Ok(())
139147
}
140148

149+
// #[js_function]
150+
// pub fn runtime_get_worker_heartbeat_interval_millis(
151+
// runtime: OpaqueInboundHandle<Runtime>,
152+
// ) -> BridgeResult<Option<u32>> {
153+
// runtime
154+
// .borrow()?
155+
// .worker_heartbeat_interval_millis
156+
// .map(u32::try_from)
157+
// .transpose()
158+
// .map_err(|_| BridgeError::TypeError {
159+
// field: None,
160+
// message: "workerHeartbeatIntervalMillis is too large to represent in JavaScript".into(),
161+
// })
162+
// }
163+
141164
/// Drop will handle the cleanup
142165
impl MutableFinalize for Runtime {}
143166

@@ -265,6 +288,7 @@ mod config {
265288
log_exporter: LogExporterOptions,
266289
telemetry: TelemetryOptions,
267290
metrics_exporter: Option<MetricsExporterOptions>,
291+
worker_heartbeat_interval_millis: Option<u64>,
268292
}
269293

270294
#[derive(Debug, Clone, TryFromJs)]
@@ -321,6 +345,7 @@ mod config {
321345
CoreTelemetryOptions,
322346
Option<super::BridgeMetricsExporter>,
323347
super::BridgeLogExporter,
348+
Option<u64>,
324349
)> for RuntimeOptions
325350
{
326351
type Error = BridgeError;
@@ -330,8 +355,16 @@ mod config {
330355
CoreTelemetryOptions,
331356
Option<super::BridgeMetricsExporter>,
332357
super::BridgeLogExporter,
358+
Option<u64>,
333359
)> {
334-
let (telemetry_logger, log_exporter) = match self.log_exporter {
360+
let Self {
361+
log_exporter,
362+
telemetry,
363+
metrics_exporter,
364+
worker_heartbeat_interval_millis,
365+
} = self;
366+
367+
let (telemetry_logger, log_exporter) = match log_exporter {
335368
LogExporterOptions::Console { filter } => (
336369
CoreTelemetryLogger::Console { filter },
337370
BridgeLogExporter::Console,
@@ -351,17 +384,21 @@ mod config {
351384
let mut telemetry_options = TelemetryOptionsBuilder::default();
352385
let telemetry_options = telemetry_options
353386
.logging(telemetry_logger)
354-
.metric_prefix(self.telemetry.metric_prefix)
355-
.attach_service_name(self.telemetry.attach_service_name)
387+
.metric_prefix(telemetry.metric_prefix)
388+
.attach_service_name(telemetry.attach_service_name)
356389
.build()
357390
.context("Failed to build telemetry options")?;
358391

359-
let metrics_exporter = self
360-
.metrics_exporter
392+
let metrics_exporter = metrics_exporter
361393
.map(std::convert::TryInto::try_into)
362394
.transpose()?;
363395

364-
Ok((telemetry_options, metrics_exporter, log_exporter))
396+
Ok((
397+
telemetry_options,
398+
metrics_exporter,
399+
log_exporter,
400+
worker_heartbeat_interval_millis,
401+
))
365402
}
366403
}
367404

packages/core-bridge/src/worker.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ pub fn worker_complete_nexus_task(
295295
.complete_nexus_task(nexus_completion)
296296
.await
297297
.map_err(|err| match err {
298-
CompleteNexusError::NexusNotEnabled {} => {
298+
CompleteNexusError::NexusNotEnabled => {
299299
BridgeError::UnexpectedError(format!("{err}"))
300300
}
301301
CompleteNexusError::MalformedNexusCompletion { reason } => BridgeError::TypeError {

packages/core-bridge/ts/native.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,12 @@ export type JsonString<_T> = string;
4040
// Runtime
4141
////////////////////////////////////////////////////////////////////////////////////////////////////
4242

43-
export declare function newRuntime(telemOptions: RuntimeOptions): Runtime;
43+
export declare function newRuntime(runtimeOptions: RuntimeOptions): Runtime;
4444

4545
export declare function runtimeShutdown(runtime: Runtime): void;
4646

47+
export declare function runtimeGetWorkerHeartbeatIntervalMillis(runtime: Runtime): number | null;
48+
4749
export interface Runtime {
4850
type: 'runtime';
4951
}
@@ -52,6 +54,7 @@ export type RuntimeOptions = {
5254
logExporter: LogExporterOptions;
5355
telemetry: TelemetryOptions;
5456
metricsExporter: MetricExporterOptions;
57+
workerHeartbeatIntervalMillis: Option<number>;
5558
};
5659

5760
export type TelemetryOptions = {

packages/test/src/test-bridge.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,20 @@ test("Stopping Worker after creating another runtime doesn't fail", async (t) =>
227227
t.pass();
228228
});
229229

230+
test('Creating runtime with heartbeat enabled plumbs heartbeat duration', (t) => {
231+
const runtime = native.newRuntime({
232+
...GenericConfigs.runtime.basic,
233+
workerHeartbeatIntervalMillis: 100,
234+
});
235+
t.is(native.runtimeGetWorkerHeartbeatIntervalMillis(runtime), 100);
236+
237+
const runtime1 = native.newRuntime({
238+
...GenericConfigs.runtime.basic,
239+
workerHeartbeatIntervalMillis: null,
240+
});
241+
t.is(native.runtimeGetWorkerHeartbeatIntervalMillis(runtime1), null);
242+
});
243+
230244
// Sample configs ///////////////////////////////////////////////////////////////////////////////////
231245

232246
const GenericConfigs = {
@@ -241,6 +255,7 @@ const GenericConfigs = {
241255
attachServiceName: false,
242256
},
243257
metricsExporter: null,
258+
workerHeartbeatIntervalMillis: null,
244259
} satisfies native.RuntimeOptions,
245260
},
246261
client: {

packages/worker/src/runtime-options.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,14 @@ export interface RuntimeOptions {
3232
*/
3333
telemetryOptions?: TelemetryOptions;
3434

35+
/**
36+
* Interval for worker heartbeats. `null` disables heartbeating.
37+
*
38+
* @format number of milliseconds or {@link https://www.npmjs.com/package/ms | ms-formatted string}
39+
* @default 60000 (60 seconds)
40+
*/
41+
workerHeartbeatInterval?: Duration | null;
42+
3543
/**
3644
* Automatically shutdown workers on any of these signals.
3745
*
@@ -367,6 +375,14 @@ export function compileOptions(options: RuntimeOptions): CompiledRuntimeOptions
367375
const { metrics, noTemporalPrefixForMetrics } = options.telemetryOptions ?? {}; // eslint-disable-line deprecation/deprecation
368376
const [logger, logExporter] = compileLoggerOptions(options);
369377

378+
// Handle worker heartbeat interval - default to 60s, allow null to disable
379+
let workerHeartbeatIntervalMillis: number | null;
380+
if (options.workerHeartbeatInterval === null) {
381+
workerHeartbeatIntervalMillis = null;
382+
} else {
383+
workerHeartbeatIntervalMillis = msToNumber(options.workerHeartbeatInterval ?? '60s');
384+
}
385+
370386
return {
371387
logger,
372388
shutdownSignals: options.shutdownSignals ?? ['SIGINT', 'SIGTERM', 'SIGQUIT', 'SIGUSR2'],
@@ -376,6 +392,7 @@ export function compileOptions(options: RuntimeOptions): CompiledRuntimeOptions
376392
metricPrefix: metrics?.metricPrefix ?? (noTemporalPrefixForMetrics ? '' : 'temporal_'),
377393
attachServiceName: metrics?.attachServiceName ?? true,
378394
},
395+
workerHeartbeatIntervalMillis,
379396
metricsExporter:
380397
metrics && isPrometheusMetricsExporter(metrics)
381398
? ({

0 commit comments

Comments
 (0)