Skip to content

Commit

Permalink
feat: add ready endpoints for workers to enterprise
Browse files Browse the repository at this point in the history
  • Loading branch information
rubenfiszel committed Feb 18, 2025
1 parent 52ad48a commit 0d0ada6
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 45 deletions.
19 changes: 13 additions & 6 deletions backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -901,14 +901,21 @@ Windmill Community Edition {GIT_VERSION}
};

let metrics_f = async {
if METRICS_ENABLED.load(std::sync::atomic::Ordering::Relaxed) {
#[cfg(not(feature = "enterprise"))]
let enabled = METRICS_ENABLED.load(std::sync::atomic::Ordering::Relaxed);
#[cfg(not(feature = "enterprise"))]
if enabled {
tracing::error!("Metrics are only available in the EE, ignoring...");

#[cfg(feature = "enterprise")]
windmill_common::serve_metrics(*METRICS_ADDR, _killpill_phase2_rx, num_workers > 0)
.await;
}

#[cfg(all(feature = "enterprise", feature = "prometheus"))]
windmill_common::serve_metrics(
*METRICS_ADDR,
_killpill_phase2_rx,
num_workers > 0,
enabled,
)
.await;

Ok(()) as anyhow::Result<()>
};

Expand Down
34 changes: 23 additions & 11 deletions backend/windmill-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,32 +155,38 @@ pub async fn shutdown_signal(
}

use tokio::sync::RwLock;
#[cfg(feature = "prometheus")]
use tokio::task::JoinHandle;
use utils::rd_string;

#[cfg(feature = "prometheus")]
pub async fn serve_metrics(
addr: SocketAddr,
mut rx: tokio::sync::broadcast::Receiver<()>,
ready_worker_endpoint: bool,
) -> JoinHandle<()> {
use std::sync::atomic::Ordering;

metrics_endpoint: bool,
) -> anyhow::Result<()> {
if !metrics_endpoint && !ready_worker_endpoint {
return Ok(());
}
use axum::{
routing::{get, post},
Router,
};
use hyper::StatusCode;
let router = Router::new()
.route("/metrics", get(metrics))
.route("/reset", post(reset));
let router = Router::new();

let router = if metrics_endpoint {
router
.route("/metrics", get(metrics))
.route("/reset", post(reset))
} else {
router
};

let router = if ready_worker_endpoint {
router.route(
"/ready",
get(|| async {
if IS_READY.load(Ordering::Relaxed) {
if IS_READY.load(std::sync::atomic::Ordering::Relaxed) {
(StatusCode::OK, "ready")
} else {
(StatusCode::INTERNAL_SERVER_ERROR, "not ready")
Expand All @@ -193,8 +199,12 @@ pub async fn serve_metrics(

tokio::spawn(async move {
tracing::info!("Serving metrics at: {addr}");
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
if let Err(e) = axum::serve(listener, router.into_make_service())
let listener = tokio::net::TcpListener::bind(addr).await;
if let Err(e) = listener {
tracing::error!("Error binding to metrics address: {}", e);
return;
}
if let Err(e) = axum::serve(listener.unwrap(), router.into_make_service())
.with_graceful_shutdown(async move {
rx.recv().await.ok();
tracing::info!("Graceful shutdown of metrics");
Expand All @@ -204,6 +214,8 @@ pub async fn serve_metrics(
tracing::error!("Error serving metrics: {}", e);
}
})
.await?;
Ok(())
}

#[cfg(feature = "prometheus")]
Expand Down
1 change: 1 addition & 0 deletions backend/windmill-worker/src/worker_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,7 @@ pub async fn update_flow_status_after_job_completion_internal(
"error while deleting parallel_monitor_lock: {e:#}"
))
})?;

if r.is_some() {
tracing::info!(
"parallel flow has removed lock on its parent, last ping was {:?}",
Expand Down
26 changes: 26 additions & 0 deletions frontend/src/lib/components/InstanceSetting.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
let renewing = false
let opening = false
let to: string = ''
async function reloadKeyrenewalAttemptInfo() {
latestKeyRenewalAttempt = await SettingService.getLatestKeyRenewalAttempt()
}
Expand Down Expand Up @@ -747,6 +749,30 @@
{:else if setting.fieldType == 'smtp_connect'}
<div class="flex flex-col gap-4 border rounded p-4">
{#if $values[setting.key]}
<div class="flex gap-4"
><input type="email" bind:value={to} placeholder="contact@windmill.dev" />
<Button
disabled={to == ''}
on:click={async () => {
let smtp = $values[setting.key]
await SettingService.testSmtp({
requestBody: {
to,
smtp: {
host: smtp['smtp_host'],
username: smtp['smtp_username'],
password: smtp['smtp_password'],
port: smtp['smtp_port'],
from: smtp['smtp_from'],
tls_implicit: smtp['smtp_tls_implicit'],
disable_tls: smtp['smtp_disable_tls']
}
}
})
sendUserToast('Test email sent')
}}>Test SMTP settings</Button
></div
>
<div>
<label for="smtp_host" class="block text-sm font-medium">Host</label>
<input
Expand Down
28 changes: 0 additions & 28 deletions frontend/src/lib/components/InstanceSettings.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,6 @@
oauths['snowflake_oauth'].connect_config = connect_config
}
let to: string = ''
async function sendStats() {
await SettingService.sendStats()
sendUserToast('Usage sent')
Expand Down Expand Up @@ -339,32 +337,6 @@
{/each}
</div>
</div>
{#if category == 'SMTP'}
{@const smtp = $values['smtp_settings']}
<div class="flex gap-4"
><input type="email" bind:value={to} placeholder="contact@windmill.dev" />
<Button
disabled={to == '' || !smtp}
on:click={async () => {
await SettingService.testSmtp({
requestBody: {
to,
smtp: {
host: smtp['smtp_host'],
username: smtp['smtp_username'],
password: smtp['smtp_password'],
port: smtp['smtp_port'],
from: smtp['smtp_from'],
tls_implicit: smtp['smtp_tls_implicit'],
disable_tls: smtp['smtp_disable_tls']
}
}
})
sendUserToast('Test email sent')
}}>Test SMTP settings</Button
></div
>
{/if}
</TabContent>
{/each}
</svelte:fragment>
Expand Down

0 comments on commit 0d0ada6

Please sign in to comment.