diff --git a/infra/tf/grafana/grafana_dashboards/chirp-workflow.json b/infra/tf/grafana/grafana_dashboards/chirp-workflow.json new file mode 100644 index 0000000000..0450eaf2e7 --- /dev/null +++ b/infra/tf/grafana/grafana_dashboards/chirp-workflow.json @@ -0,0 +1,1121 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 1, + "links": [], + "liveNow": false, + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "axisSoftMax": 5, + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 0 + }, + "id": 5, + "options": { + "legend": { + "calcs": ["lastNotNull"], + "displayMode": "table", + "placement": "bottom", + "showLegend": true, + "sortBy": "Last *", + "sortDesc": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "editorMode": "code", + "expr": "rivet_chirp_workflow_active{workflow_name=~\"[[workflow_name]]\"}", + "instant": false, + "legendFormat": "{{workflow_name}}", + "range": true, + "refId": "A" + } + ], + "title": "Running Workflows", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "axisSoftMax": 5, + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 0 + }, + "id": 3, + "options": { + "legend": { + "calcs": ["lastNotNull"], + "displayMode": "table", + "placement": "bottom", + "showLegend": true, + "sortBy": "Last *", + "sortDesc": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "editorMode": "code", + "expr": "rivet_chirp_workflow_sleeping{workflow_name=~\"[[workflow_name]]\"}", + "instant": false, + "legendFormat": "{{workflow_name}}", + "range": true, + "refId": "A" + } + ], + "title": "Sleeping Workflows", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "axisSoftMax": 5, + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 8 + }, + "id": 1, + "options": { + "legend": { + "calcs": ["lastNotNull"], + "displayMode": "table", + "placement": "bottom", + "showLegend": true, + "sortBy": "Last *", + "sortDesc": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "editorMode": "code", + "expr": "sum by (workflow_name) (rivet_chirp_workflow_dead{workflow_name=~\"[[workflow_name]]\"})", + "instant": false, + "legendFormat": "{{workflow_name}}", + "range": true, + "refId": "A" + } + ], + "title": "Dead Workflows", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "description": "", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "axisSoftMax": 5, + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 8 + }, + "id": 6, + "options": { + "legend": { + "calcs": ["lastNotNull"], + "displayMode": "table", + "placement": "bottom", + "showLegend": true, + "sortBy": "Last *", + "sortDesc": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "editorMode": "code", + "expr": "sum by (error_code) (rivet_chirp_workflow_dead{workflow_name=~\"[[workflow_name]]\"})", + "instant": false, + "legendFormat": "{{error_code}}", + "range": true, + "refId": "A" + } + ], + "title": "Dead Workflow Errors", + "type": "timeseries" + }, + { + "cards": {}, + "color": { + "cardColor": "#b4ff00", + "colorScale": "sqrt", + "colorScheme": "interpolateOranges", + "exponent": 0.5, + "mode": "spectrum" + }, + "dataFormat": "tsbuckets", + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "custom": { + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "scaleDistribution": { + "type": "linear" + } + } + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 12, + "x": 0, + "y": 16 + }, + "heatmap": {}, + "hideZeroBuckets": true, + "highlightCards": true, + "id": 12, + "interval": "15s", + "legend": { + "show": false + }, + "options": { + "calculate": false, + "calculation": { + "xBuckets": { + "mode": "size" + } + }, + "cellGap": 0, + "color": { + "exponent": 0.5, + "fill": "dark-orange", + "mode": "scheme", + "reverse": false, + "scale": "exponential", + "scheme": "RdBu", + "steps": 64 + }, + "exemplars": { + "color": "rgba(255,0,255,0.7)" + }, + "filterValues": { + "le": 1e-9 + }, + "legend": { + "show": true + }, + "rowsFrame": { + "layout": "auto" + }, + "tooltip": { + "mode": "single", + "showColorScale": false, + "yHistogram": true + }, + "yAxis": { + "axisPlacement": "left", + "max": "60", + "min": 0, + "reverse": false, + "unit": "s" + } + }, + "pluginVersion": "10.4.1", + "reverseYBuckets": false, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "editorMode": "code", + "expr": "sum(increase(rivet_chirp_workflow_activity_duration_bucket{workflow_name=~\"[[workflow_name]]\"} [$__rate_interval])) by (le)", + "format": "heatmap", + "legendFormat": "{{le}}", + "range": true, + "refId": "A" + } + ], + "title": "Activity Duration", + "tooltip": { + "show": true, + "showHistogram": true + }, + "type": "heatmap", + "xAxis": { + "show": true + }, + "yAxis": { + "format": "s", + "logBase": 2, + "max": "60", + "min": "0", + "show": true + }, + "yBucketBound": "auto" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "reqps" + }, + "overrides": [] + }, + "gridPos": { + "h": 9, + "w": 12, + "x": 12, + "y": 16 + }, + "id": 9, + "interval": "15s", + "options": { + "legend": { + "calcs": ["mean"], + "displayMode": "table", + "placement": "bottom", + "showLegend": true, + "sortBy": "Mean", + "sortDesc": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "editorMode": "code", + "expr": "sum(rate(rivet_chirp_workflow_activity_errors{workflow_name=~\"[[workflow_name]]\"}[$__rate_interval])) by (activity_name, error_code)", + "legendFormat": "{{activity_name}}: {{error_code}}", + "range": true, + "refId": "A" + } + ], + "title": "Activity Error Rate", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "axisSoftMax": 5, + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 25 + }, + "id": 4, + "options": { + "legend": { + "calcs": ["lastNotNull"], + "displayMode": "table", + "placement": "bottom", + "showLegend": true, + "sortBy": "Last *", + "sortDesc": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "editorMode": "code", + "expr": "rivet_chirp_workflow_signal_pending", + "instant": false, + "legendFormat": "{{signal_name}}", + "range": true, + "refId": "A" + } + ], + "title": "Pending Signals", + "type": "timeseries" + }, + { + "cards": {}, + "color": { + "cardColor": "#b4ff00", + "colorScale": "sqrt", + "colorScheme": "interpolateOranges", + "exponent": 0.5, + "mode": "spectrum" + }, + "dataFormat": "tsbuckets", + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "custom": { + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "scaleDistribution": { + "type": "linear" + } + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 25 + }, + "heatmap": {}, + "hideZeroBuckets": true, + "highlightCards": true, + "id": 10, + "interval": "15s", + "legend": { + "show": false + }, + "options": { + "calculate": false, + "calculation": { + "xBuckets": { + "mode": "size" + } + }, + "cellGap": 0, + "color": { + "exponent": 0.5, + "fill": "dark-orange", + "mode": "scheme", + "reverse": false, + "scale": "exponential", + "scheme": "RdBu", + "steps": 64 + }, + "exemplars": { + "color": "rgba(255,0,255,0.7)" + }, + "filterValues": { + "le": 1e-9 + }, + "legend": { + "show": true + }, + "rowsFrame": { + "layout": "auto" + }, + "tooltip": { + "mode": "single", + "showColorScale": false, + "yHistogram": true + }, + "yAxis": { + "axisPlacement": "left", + "max": "60", + "min": 0, + "reverse": false, + "unit": "s" + } + }, + "pluginVersion": "10.4.1", + "reverseYBuckets": false, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "editorMode": "code", + "expr": "sum(increase(rivet_chirp_workflow_signal_recv_lag_bucket{workflow_name=~\"[[workflow_name]]\"} [$__rate_interval])) by (le)", + "format": "heatmap", + "legendFormat": "{{le}}", + "range": true, + "refId": "A" + } + ], + "title": "Signal Receive Lag", + "tooltip": { + "show": true, + "showHistogram": true + }, + "type": "heatmap", + "xAxis": { + "show": true + }, + "yAxis": { + "format": "s", + "logBase": 2, + "max": "60", + "min": "0", + "show": true + }, + "yBucketBound": "auto" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "axisSoftMax": 5, + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 33 + }, + "id": 2, + "options": { + "legend": { + "calcs": ["lastNotNull"], + "displayMode": "table", + "placement": "bottom", + "showLegend": false, + "sortBy": "Last *", + "sortDesc": true + }, + "tooltip": { + "mode": "multi", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "editorMode": "code", + "expr": "rivet_chirp_workflow_worker_active", + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Active Workers", + "type": "timeseries" + }, + { + "cards": {}, + "color": { + "cardColor": "#b4ff00", + "colorScale": "sqrt", + "colorScheme": "interpolateOranges", + "exponent": 0.5, + "mode": "spectrum" + }, + "dataFormat": "tsbuckets", + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "custom": { + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "scaleDistribution": { + "type": "linear" + } + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 33 + }, + "heatmap": {}, + "hideZeroBuckets": true, + "highlightCards": true, + "id": 11, + "interval": "15s", + "legend": { + "show": false + }, + "options": { + "calculate": false, + "calculation": { + "xBuckets": { + "mode": "size" + } + }, + "cellGap": 0, + "color": { + "exponent": 0.5, + "fill": "dark-orange", + "mode": "scheme", + "reverse": false, + "scale": "exponential", + "scheme": "RdBu", + "steps": 64 + }, + "exemplars": { + "color": "rgba(255,0,255,0.7)" + }, + "filterValues": { + "le": 1e-9 + }, + "legend": { + "show": true + }, + "rowsFrame": { + "layout": "auto" + }, + "tooltip": { + "mode": "single", + "showColorScale": false, + "yHistogram": true + }, + "yAxis": { + "axisPlacement": "left", + "max": "60", + "min": 0, + "reverse": false, + "unit": "s" + } + }, + "pluginVersion": "10.4.1", + "reverseYBuckets": false, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "editorMode": "code", + "expr": "sum(increase(rivet_chirp_workflow_message_recv_lag_bucket [$__rate_interval])) by (le)", + "format": "heatmap", + "legendFormat": "{{le}}", + "range": true, + "refId": "A" + } + ], + "title": "Message Receive Lag", + "tooltip": { + "show": true, + "showHistogram": true + }, + "type": "heatmap", + "xAxis": { + "show": true + }, + "yAxis": { + "format": "s", + "logBase": 2, + "max": "60", + "min": "0", + "show": true + }, + "yBucketBound": "auto" + } + ], + "refresh": "5s", + "schemaVersion": 39, + "tags": ["chirp"], + "templating": { + "list": [ + { + "current": { + "selected": true, + "text": ["All"], + "value": ["$__all"] + }, + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "definition": "label_values(rivet_chirp_workflow_total,workflow_name)", + "hide": 0, + "includeAll": true, + "label": "Workflow Name", + "multi": true, + "name": "workflow_name", + "options": [], + "query": { + "qryType": 1, + "query": "label_values(rivet_chirp_workflow_total,workflow_name)", + "refId": "PrometheusVariableQueryEditor-VariableQuery" + }, + "refresh": 1, + "regex": "", + "skipUrlSync": false, + "sort": 0, + "type": "query" + } + ] + }, + "time": { + "from": "now-5m", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "Chirp / Workflow", + "uid": "636d22f9-d18f-4086-8b45-7c50886a105c", + "version": 7, + "weekStart": "" +} diff --git a/lib/chirp-workflow/core/Cargo.toml b/lib/chirp-workflow/core/Cargo.toml index 8f5bc697d5..d2458a7a3d 100644 --- a/lib/chirp-workflow/core/Cargo.toml +++ b/lib/chirp-workflow/core/Cargo.toml @@ -14,6 +14,7 @@ formatted-error = { path = "../../formatted-error" } futures-util = "0.3" global-error = { path = "../../global-error" } indoc = "2.0.5" +lazy_static = "1.4" prost = "0.12.4" prost-types = "0.12.4" rand = "0.8.5" diff --git a/lib/chirp-workflow/core/src/ctx/listen.rs b/lib/chirp-workflow/core/src/ctx/listen.rs index d8a2b0fbea..d38f319333 100644 --- a/lib/chirp-workflow/core/src/ctx/listen.rs +++ b/lib/chirp-workflow/core/src/ctx/listen.rs @@ -32,6 +32,11 @@ impl<'a> ListenCtx<'a> { return Err(WorkflowError::NoSignalFound(Box::from(signal_names))); }; + let recv_lag = (rivet_util::timestamp::now() as f64 - signal.create_ts as f64) / 1000.; + crate::metrics::SIGNAL_RECV_LAG + .with_label_values(&[&self.ctx.name(), &signal.signal_name]) + .observe(recv_lag); + tracing::info!( workflow_name=%self.ctx.name(), workflow_id=%self.ctx.workflow_id(), diff --git a/lib/chirp-workflow/core/src/ctx/message.rs b/lib/chirp-workflow/core/src/ctx/message.rs index 3201adf3d0..37da81121c 100644 --- a/lib/chirp-workflow/core/src/ctx/message.rs +++ b/lib/chirp-workflow/core/src/ctx/message.rs @@ -322,6 +322,12 @@ impl MessageCtx { let message = if let Some(message_buf) = message_buf { let message = ReceivedMessage::::deserialize(message_buf.as_slice())?; tracing::info!(?message, "immediate read tail message"); + + let recv_lag = (rivet_util::timestamp::now() as f64 - message.ts as f64) / 1000.; + crate::metrics::MESSAGE_RECV_LAG + .with_label_values(&[M::NAME]) + .observe(recv_lag); + Some(message) } else { tracing::info!("no tail message to read"); @@ -520,6 +526,11 @@ where let message = ReceivedMessage::::deserialize(&nats_message.payload[..])?; tracing::info!(?message, "received message"); + let recv_lag = (rivet_util::timestamp::now() as f64 - message.ts as f64) / 1000.; + crate::metrics::MESSAGE_RECV_LAG + .with_label_values(&[M::NAME]) + .observe(recv_lag); + return Ok(message); } diff --git a/lib/chirp-workflow/core/src/ctx/workflow.rs b/lib/chirp-workflow/core/src/ctx/workflow.rs index 14e7b321b5..3bcb416e3e 100644 --- a/lib/chirp-workflow/core/src/ctx/workflow.rs +++ b/lib/chirp-workflow/core/src/ctx/workflow.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, sync::Arc, time::Instant}; use global_error::{GlobalError, GlobalResult}; use serde::{de::DeserializeOwned, Serialize}; @@ -15,9 +15,10 @@ use crate::{ executable::{closure, AsyncResult, Executable}, listen::{CustomListener, Listen}, message::Message, + metrics, registry::RegistryHandle, signal::Signal, - util::Location, + util::{GlobalErrorExt, Location}, workflow::{Workflow, WorkflowInput}, }; @@ -211,7 +212,7 @@ impl WorkflowCtx { // Retry the workflow if its recoverable let deadline_ts = if let Some(deadline_ts) = err.backoff() { Some(deadline_ts) - } else if err.is_recoverable() { + } else if err.is_retryable() { Some(rivet_util::timestamp::now() + RETRY_TIMEOUT_MS as i64) } else { None @@ -225,11 +226,14 @@ impl WorkflowCtx { // finish. This workflow will be retried when the sub workflow completes let wake_sub_workflow = err.sub_workflow(); - if deadline_ts.is_some() || !wake_signals.is_empty() || wake_sub_workflow.is_some() - { + if err.is_recoverable() && !err.is_retryable() { tracing::info!(name=%self.name, id=%self.workflow_id, ?err, "workflow sleeping"); } else { tracing::error!(name=%self.name, id=%self.workflow_id, ?err, "workflow error"); + + metrics::WORKFLOW_ERRORS + .with_label_values(&[&self.name, err.to_string().as_str()]) + .inc(); } let err_str = err.to_string(); @@ -288,10 +292,14 @@ impl WorkflowCtx { A::NAME, ); + let start_instant = Instant::now(); + let res = tokio::time::timeout(A::TIMEOUT, A::run(&ctx, input)) .await .map_err(|_| WorkflowError::ActivityTimeout); + let dt = start_instant.elapsed().as_secs_f64(); + match res { Ok(Ok(output)) => { tracing::debug!("activity success"); @@ -313,14 +321,20 @@ impl WorkflowCtx { ) .await?; + metrics::ACTIVITY_DURATION + .with_label_values(&[&self.name, A::NAME, ""]) + .observe(dt); + Ok(output) } Ok(Err(err)) => { tracing::debug!(?err, "activity error"); - // Write error (failed state) + let err_str = err.to_string(); let input_val = serde_json::to_value(input).map_err(WorkflowError::SerializeActivityInput)?; + + // Write error (failed state) self.db .commit_workflow_activity_event( self.workflow_id, @@ -328,18 +342,29 @@ impl WorkflowCtx { activity_id, create_ts, input_val, - Err(&err.to_string()), + Err(&err_str), self.loop_location(), ) .await?; + if !err.is_workflow_recoverable() { + metrics::ACTIVITY_ERRORS + .with_label_values(&[&self.name, A::NAME, &err_str]) + .inc(); + } + metrics::ACTIVITY_DURATION + .with_label_values(&[&self.name, A::NAME, &err_str]) + .observe(dt); + Err(WorkflowError::ActivityFailure(err, 0)) } Err(err) => { tracing::debug!("activity timeout"); + let err_str = err.to_string(); let input_val = serde_json::to_value(input).map_err(WorkflowError::SerializeActivityInput)?; + self.db .commit_workflow_activity_event( self.workflow_id, @@ -347,11 +372,18 @@ impl WorkflowCtx { activity_id, create_ts, input_val, - Err(&err.to_string()), + Err(&err_str), self.loop_location(), ) .await?; + metrics::ACTIVITY_ERRORS + .with_label_values(&[&self.name, A::NAME, &err_str]) + .inc(); + metrics::ACTIVITY_DURATION + .with_label_values(&[&self.name, A::NAME, &err_str]) + .observe(dt); + Err(err) } } diff --git a/lib/chirp-workflow/core/src/db/mod.rs b/lib/chirp-workflow/core/src/db/mod.rs index 2caa1a20eb..7e9307b56d 100644 --- a/lib/chirp-workflow/core/src/db/mod.rs +++ b/lib/chirp-workflow/core/src/db/mod.rs @@ -236,6 +236,7 @@ pub struct SignalRow { pub signal_id: Uuid, pub signal_name: String, pub body: serde_json::Value, + pub create_ts: i64, } #[derive(sqlx::FromRow)] diff --git a/lib/chirp-workflow/core/src/db/postgres.rs b/lib/chirp-workflow/core/src/db/postgres.rs index b03eefef35..16f60e37d8 100644 --- a/lib/chirp-workflow/core/src/db/postgres.rs +++ b/lib/chirp-workflow/core/src/db/postgres.rs @@ -103,11 +103,11 @@ impl Database for DatabasePostgres { self.query(|| async { sqlx::query(indoc!( " - INSERT INTO db_workflow.workflows ( - workflow_id, workflow_name, create_ts, ray_id, tags, input, wake_immediate - ) - VALUES ($1, $2, $3, $4, $5, $6, true) - ", + INSERT INTO db_workflow.workflows ( + workflow_id, workflow_name, create_ts, ray_id, tags, input, wake_immediate + ) + VALUES ($1, $2, $3, $4, $5, $6, true) + ", )) .bind(workflow_id) .bind(workflow_name) @@ -361,10 +361,10 @@ impl Database for DatabasePostgres { self.query(|| async { sqlx::query(indoc!( " - UPDATE db_workflow.workflows - SET output = $2 - WHERE workflow_id = $1 - ", + UPDATE db_workflow.workflows + SET output = $2 + WHERE workflow_id = $1 + ", )) .bind(workflow_id) .bind(output) @@ -390,16 +390,16 @@ impl Database for DatabasePostgres { self.query(|| async { sqlx::query(indoc!( " - UPDATE db_workflow.workflows - SET - worker_instance_id = NULL, - wake_immediate = $2, - wake_deadline_ts = $3, - wake_signals = $4, - wake_sub_workflow_id = $5, - error = $6 - WHERE workflow_id = $1 - ", + UPDATE db_workflow.workflows + SET + worker_instance_id = NULL, + wake_immediate = $2, + wake_deadline_ts = $3, + wake_signals = $4, + wake_sub_workflow_id = $5, + error = $6 + WHERE workflow_id = $1 + ", )) .bind(workflow_id) .bind(immediate) @@ -426,10 +426,10 @@ impl Database for DatabasePostgres { self.query(|| async { sqlx::query(indoc!( " - UPDATE db_workflow.workflows - SET tags = $2 - WHERE workflow_id = $1 - ", + UPDATE db_workflow.workflows + SET tags = $2 + WHERE workflow_id = $1 + ", )) .bind(workflow_id) .bind(tags) @@ -478,7 +478,6 @@ impl Database for DatabasePostgres { .bind(activity_id.input_hash.to_le_bytes()) .bind(&input) .bind(&output) - .bind(rivet_util::timestamp::now()) .bind(create_ts) .bind(loop_location.map(|l| l.iter().map(|x| *x as i64).collect::>())) .execute(&mut *self.conn().await?) @@ -491,30 +490,30 @@ impl Database for DatabasePostgres { self.query(|| async { sqlx::query(indoc!( " - WITH - event AS ( - INSERT INTO db_workflow.workflow_activity_events ( - workflow_id, - location, - activity_name, - input_hash, - input, - create_ts, - loop_location - ) - VALUES ($1, $2, $3, $4, $5, $7, $8) - ON CONFLICT (workflow_id, location) DO NOTHING - RETURNING 1 - ), - err AS ( - INSERT INTO db_workflow.workflow_activity_errors ( - workflow_id, location, activity_name, error, ts + WITH + event AS ( + INSERT INTO db_workflow.workflow_activity_events ( + workflow_id, + location, + activity_name, + input_hash, + input, + create_ts, + loop_location + ) + VALUES ($1, $2, $3, $4, $5, $7, $8) + ON CONFLICT (workflow_id, location) DO NOTHING + RETURNING 1 + ), + err AS ( + INSERT INTO db_workflow.workflow_activity_errors ( + workflow_id, location, activity_name, error, ts + ) + VALUES ($1, $2, $3, $6, $9) + RETURNING 1 ) - VALUES ($1, $2, $3, $6, $9) - RETURNING 1 - ) - SELECT 1 - ", + SELECT 1 + ", )) .bind(workflow_id) .bind(location.iter().map(|x| *x as i64).collect::>()) @@ -547,62 +546,62 @@ impl Database for DatabasePostgres { .query(|| async { sqlx::query_as::<_, SignalRow>(indoc!( " - WITH - -- Finds the oldest signal matching the signal name filter in either the normal signals table - -- or tagged signals table - next_signal AS ( - SELECT false AS tagged, signal_id, create_ts, signal_name, body - FROM db_workflow.signals - WHERE - workflow_id = $1 AND - signal_name = ANY($2) AND - ack_ts IS NULL - UNION ALL - SELECT true AS tagged, signal_id, create_ts, signal_name, body - FROM db_workflow.tagged_signals - WHERE - signal_name = ANY($2) AND - tags <@ (SELECT tags FROM db_workflow.workflows WHERE workflow_id = $1) AND - ack_ts IS NULL - ORDER BY create_ts ASC - LIMIT 1 - ), - -- If the next signal is not tagged, acknowledge it with this statement - ack_signal AS ( - UPDATE db_workflow.signals - SET ack_ts = $4 - WHERE signal_id = ( - SELECT signal_id FROM next_signal WHERE tagged = false - ) - RETURNING 1 - ), - -- If the next signal is tagged, acknowledge it with this statement - ack_tagged_signal AS ( - UPDATE db_workflow.tagged_signals - SET ack_ts = $4 - WHERE signal_id = ( - SELECT signal_id FROM next_signal WHERE tagged = true - ) - RETURNING 1 - ), - -- After acking the signal, add it to the events table - insert_event AS ( - INSERT INTO db_workflow.workflow_signal_events ( - workflow_id, location, signal_id, signal_name, body, ack_ts, loop_location - ) - SELECT - $1 AS workflow_id, - $3 AS location, - signal_id, - signal_name, - body, - $4 AS ack_ts, - $5 AS loop_location - FROM next_signal - RETURNING 1 - ) - SELECT * FROM next_signal - ", + WITH + -- Finds the oldest signal matching the signal name filter in either the normal signals table + -- or tagged signals table + next_signal AS ( + SELECT false AS tagged, signal_id, create_ts, signal_name, body + FROM db_workflow.signals + WHERE + workflow_id = $1 AND + signal_name = ANY($2) AND + ack_ts IS NULL + UNION ALL + SELECT true AS tagged, signal_id, create_ts, signal_name, body + FROM db_workflow.tagged_signals + WHERE + signal_name = ANY($2) AND + tags <@ (SELECT tags FROM db_workflow.workflows WHERE workflow_id = $1) AND + ack_ts IS NULL + ORDER BY create_ts ASC + LIMIT 1 + ), + -- If the next signal is not tagged, acknowledge it with this statement + ack_signal AS ( + UPDATE db_workflow.signals + SET ack_ts = $4 + WHERE signal_id = ( + SELECT signal_id FROM next_signal WHERE tagged = false + ) + RETURNING 1 + ), + -- If the next signal is tagged, acknowledge it with this statement + ack_tagged_signal AS ( + UPDATE db_workflow.tagged_signals + SET ack_ts = $4 + WHERE signal_id = ( + SELECT signal_id FROM next_signal WHERE tagged = true + ) + RETURNING 1 + ), + -- After acking the signal, add it to the events table + insert_event AS ( + INSERT INTO db_workflow.workflow_signal_events ( + workflow_id, location, signal_id, signal_name, body, ack_ts, loop_location + ) + SELECT + $1 AS workflow_id, + $3 AS location, + signal_id, + signal_name, + body, + $4 AS ack_ts, + $5 AS loop_location + FROM next_signal + RETURNING 1 + ) + SELECT * FROM next_signal + ", )) .bind(workflow_id) .bind(filter) @@ -629,9 +628,9 @@ impl Database for DatabasePostgres { self.query(|| async { sqlx::query(indoc!( " - INSERT INTO db_workflow.signals (signal_id, workflow_id, signal_name, body, ray_id, create_ts) - VALUES ($1, $2, $3, $4, $5, $6) - ", + INSERT INTO db_workflow.signals (signal_id, workflow_id, signal_name, body, ray_id, create_ts) + VALUES ($1, $2, $3, $4, $5, $6) + ", )) .bind(signal_id) .bind(workflow_id) @@ -659,9 +658,9 @@ impl Database for DatabasePostgres { self.query(|| async { sqlx::query(indoc!( " - INSERT INTO db_workflow.tagged_signals (signal_id, tags, signal_name, body, ray_id, create_ts) - VALUES ($1, $2, $3, $4, $5, $6) - ", + INSERT INTO db_workflow.tagged_signals (signal_id, tags, signal_name, body, ray_id, create_ts) + VALUES ($1, $2, $3, $4, $5, $6) + ", )) .bind(signal_id) .bind(tags) diff --git a/lib/chirp-workflow/core/src/error.rs b/lib/chirp-workflow/core/src/error.rs index dd5fa51486..0cb33a10a8 100644 --- a/lib/chirp-workflow/core/src/error.rs +++ b/lib/chirp-workflow/core/src/error.rs @@ -131,7 +131,7 @@ pub enum WorkflowError { } impl WorkflowError { - pub fn backoff(&self) -> Option { + pub(crate) fn backoff(&self) -> Option { if let WorkflowError::ActivityFailure(_, error_count) = self { // NOTE: Max retry is handled in `WorkflowCtx::activity` let mut backoff = @@ -154,6 +154,17 @@ impl WorkflowError { } pub fn is_recoverable(&self) -> bool { + match self { + WorkflowError::ActivityFailure(_, _) + | WorkflowError::ActivityTimeout + | WorkflowError::OperationTimeout + | WorkflowError::NoSignalFound(_) + | WorkflowError::SubWorkflowIncomplete(_) => true, + _ => false, + } + } + + pub(crate) fn is_retryable(&self) -> bool { match self { WorkflowError::ActivityFailure(_, _) | WorkflowError::ActivityTimeout diff --git a/lib/chirp-workflow/core/src/lib.rs b/lib/chirp-workflow/core/src/lib.rs index 9cc87259d5..36871b71f1 100644 --- a/lib/chirp-workflow/core/src/lib.rs +++ b/lib/chirp-workflow/core/src/lib.rs @@ -7,6 +7,7 @@ mod event; mod executable; mod listen; pub mod message; +pub mod metrics; pub mod operation; pub mod prelude; pub mod registry; diff --git a/lib/chirp-workflow/core/src/metrics.rs b/lib/chirp-workflow/core/src/metrics.rs new file mode 100644 index 0000000000..7e836e8bfc --- /dev/null +++ b/lib/chirp-workflow/core/src/metrics.rs @@ -0,0 +1,76 @@ +use rivet_metrics::{prometheus::*, BUCKETS, REGISTRY}; + +lazy_static::lazy_static! { + pub static ref WORKER_ACTIVE: IntGaugeVec = register_int_gauge_vec_with_registry!( + "chirp_workflow_worker_active", + "Total active workers.", + &[], + *REGISTRY, + ).unwrap(); + + pub static ref WORKFLOW_TOTAL: IntGaugeVec = register_int_gauge_vec_with_registry!( + "chirp_workflow_total", + "Total workflows.", + &["workflow_name"], + *REGISTRY, + ).unwrap(); + pub static ref WORKFLOW_ACTIVE: IntGaugeVec = register_int_gauge_vec_with_registry!( + "chirp_workflow_active", + "Total active workflows.", + &["workflow_name"], + *REGISTRY, + ).unwrap(); + pub static ref WORKFLOW_DEAD: IntGaugeVec = register_int_gauge_vec_with_registry!( + "chirp_workflow_dead", + "Total dead workflows.", + &["workflow_name", "error_code"], + *REGISTRY, + ).unwrap(); + pub static ref WORKFLOW_SLEEPING: IntGaugeVec = register_int_gauge_vec_with_registry!( + "chirp_workflow_sleeping", + "Total sleeping workflows.", + &["workflow_name"], + *REGISTRY, + ).unwrap(); + pub static ref WORKFLOW_ERRORS: IntCounterVec = register_int_counter_vec_with_registry!( + "chirp_workflow_errors", + "All errors made in a workflow.", + &["workflow_name", "error_code"], + *REGISTRY, + ).unwrap(); + + pub static ref ACTIVITY_DURATION: HistogramVec = register_histogram_vec_with_registry!( + "chirp_workflow_activity_duration", + "Total duration of an activity.", + &["workflow_name", "activity_name", "error_code"], + BUCKETS.to_vec(), + *REGISTRY, + ).unwrap(); + pub static ref ACTIVITY_ERRORS: IntCounterVec = register_int_counter_vec_with_registry!( + "chirp_workflow_activity_errors", + "All errors made in an activity.", + &["workflow_name", "activity_name", "error_code"], + *REGISTRY, + ).unwrap(); + + pub static ref SIGNAL_PENDING: IntGaugeVec = register_int_gauge_vec_with_registry!( + "chirp_workflow_signal_pending", + "Total pending signals.", + &["signal_name"], + *REGISTRY, + ).unwrap(); + pub static ref SIGNAL_RECV_LAG: HistogramVec = register_histogram_vec_with_registry!( + "chirp_workflow_signal_recv_lag", + "Time between the publish timestamp and the timestamp the signal was received.", + &["workflow_name", "signal_name"], + BUCKETS.to_vec(), + *REGISTRY, + ).unwrap(); + pub static ref MESSAGE_RECV_LAG: HistogramVec = register_histogram_vec_with_registry!( + "chirp_workflow_message_recv_lag", + "Time between the publish timestamp and the timestamp the message was received.", + &["message_name"], + BUCKETS.to_vec(), + *REGISTRY, + ).unwrap(); +} diff --git a/lib/chirp/metrics/src/lib.rs b/lib/chirp/metrics/src/lib.rs index cea53adba3..0faa86ef5a 100644 --- a/lib/chirp/metrics/src/lib.rs +++ b/lib/chirp/metrics/src/lib.rs @@ -16,7 +16,7 @@ lazy_static::lazy_static! { ).unwrap(); pub static ref CHIRP_REQUEST_DURATION: HistogramVec = register_histogram_vec_with_registry!( "chirp_request_duration", - "Total number of requests.", + "Total duration of a request.", &["context_name", "error_code"], BUCKETS.to_vec(), *REGISTRY, diff --git a/svc/Cargo.lock b/svc/Cargo.lock index a359372a2f..ca06393be2 100644 --- a/svc/Cargo.lock +++ b/svc/Cargo.lock @@ -2156,6 +2156,7 @@ dependencies = [ "futures-util", "global-error", "indoc 2.0.5", + "lazy_static", "prost 0.12.6", "prost-types 0.12.6", "rand", @@ -10753,6 +10754,22 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "workflow-metrics-publish" +version = "0.0.1" +dependencies = [ + "chirp-client", + "chirp-workflow", + "rivet-connection", + "rivet-health-checks", + "rivet-metrics", + "rivet-runtime", + "sqlx 0.7.4 (git+https://github.com/rivet-gg/sqlx?rev=08d6e61aa0572e7ec557abbedb72cebb96e1ac5b)", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "xmlparser" version = "0.13.6" diff --git a/svc/Cargo.toml b/svc/Cargo.toml index 84a7e79d3e..24a45f058f 100644 --- a/svc/Cargo.toml +++ b/svc/Cargo.toml @@ -251,7 +251,8 @@ members = [ "pkg/user/standalone/delete-pending", "pkg/user/standalone/search-user-gc", "pkg/user/worker", - "pkg/workflow/standalone/gc" + "pkg/workflow/standalone/gc", + "pkg/workflow/standalone/metrics-publish" ] # Speed up compilation diff --git a/svc/pkg/cluster/standalone/metrics-publish/src/lib.rs b/svc/pkg/cluster/standalone/metrics-publish/src/lib.rs index b34fb6e95d..bf46ea590f 100644 --- a/svc/pkg/cluster/standalone/metrics-publish/src/lib.rs +++ b/svc/pkg/cluster/standalone/metrics-publish/src/lib.rs @@ -89,7 +89,7 @@ async fn select_servers(ctx: &StandaloneCtx) -> GlobalResult> { (drain_ts IS NOT NULL) AS is_draining, (drain_complete_ts IS NOT NULL) AS is_drained, (taint_ts IS NOT NULL) AS is_tainted - FROM db_cluster.servers AS OF SYSTEM TIME '-5s' + FROM db_cluster.servers AS OF SYSTEM TIME '-1s' WHERE -- Filters out servers that are being destroyed/already destroyed cloud_destroy_ts IS NULL diff --git a/svc/pkg/workflow/standalone/metrics-publish/Cargo.toml b/svc/pkg/workflow/standalone/metrics-publish/Cargo.toml new file mode 100644 index 0000000000..48360feee1 --- /dev/null +++ b/svc/pkg/workflow/standalone/metrics-publish/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "workflow-metrics-publish" +version = "0.0.1" +edition = "2018" +authors = ["Rivet Gaming, LLC "] +license = "Apache-2.0" + +[dependencies] +chirp-client = { path = "../../../../../lib/chirp/client" } +chirp-workflow = { path = "../../../../../lib/chirp-workflow/core" } +rivet-connection = { path = "../../../../../lib/connection" } +rivet-health-checks = { path = "../../../../../lib/health-checks" } +rivet-metrics = { path = "../../../../../lib/metrics" } +rivet-runtime = { path = "../../../../../lib/runtime" } +tokio = { version = "1.29", features = ["full"] } +tracing = "0.1" +tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "json", "ansi"] } + +[dependencies.sqlx] +git = "https://github.com/rivet-gg/sqlx" +rev = "08d6e61aa0572e7ec557abbedb72cebb96e1ac5b" +default-features = false + +[dev-dependencies] diff --git a/svc/pkg/workflow/standalone/metrics-publish/Service.toml b/svc/pkg/workflow/standalone/metrics-publish/Service.toml new file mode 100644 index 0000000000..52568723c4 --- /dev/null +++ b/svc/pkg/workflow/standalone/metrics-publish/Service.toml @@ -0,0 +1,8 @@ +[service] +name = "workflow-metrics-publish" + +[runtime] +kind = "rust" + +[headless] +singleton = true diff --git a/svc/pkg/workflow/standalone/metrics-publish/src/lib.rs b/svc/pkg/workflow/standalone/metrics-publish/src/lib.rs new file mode 100644 index 0000000000..67de568a2f --- /dev/null +++ b/svc/pkg/workflow/standalone/metrics-publish/src/lib.rs @@ -0,0 +1,143 @@ +use chirp_workflow::prelude::*; + +#[tracing::instrument(skip_all)] +pub async fn run_from_env(pools: rivet_pools::Pools) -> GlobalResult<()> { + let client = + chirp_client::SharedClient::from_env(pools.clone())?.wrap_new("workflow-metrics-publish"); + let cache = rivet_cache::CacheInner::from_env(pools.clone())?; + let ctx = StandaloneCtx::new( + chirp_workflow::compat::db_from_pools(&pools).await?, + rivet_connection::Connection::new(client, pools, cache), + "workflow-metrics-publish", + ) + .await?; + + let ( + (active_worker_count,), + total_workflow_count, + active_workflow_count, + dead_workflow_count, + sleeping_workflow_count, + pending_signal_count, + ) = tokio::try_join!( + sql_fetch_one!( + [ctx, (i64,)] + " + SELECT COUNT(*) + FROM db_workflow.worker_instances AS OF SYSTEM TIME '-1s' + WHERE last_ping_ts > $1 + ", + util::timestamp::now() - util::duration::seconds(30), + ), + sql_fetch_all!( + [ctx, (String, i64)] + " + SELECT workflow_name, COUNT(*) + FROM db_workflow.workflows AS OF SYSTEM TIME '-1s' + GROUP BY workflow_name + ", + ), + sql_fetch_all!( + [ctx, (String, i64)] + " + SELECT workflow_name, COUNT(*) + FROM db_workflow.workflows AS OF SYSTEM TIME '-1s' + WHERE + output IS NULL AND + worker_instance_id IS NOT NULL + GROUP BY workflow_name + ", + ), + sql_fetch_all!( + [ctx, (String, String, i64)] + " + SELECT workflow_name, error, COUNT(*) + FROM db_workflow.workflows AS OF SYSTEM TIME '-1s' + WHERE + error IS NOT NULL AND + output IS NULL AND + wake_immediate = FALSE AND + wake_deadline_ts IS NULL AND + cardinality(wake_signals) = 0 AND + wake_sub_workflow_id IS NULL + GROUP BY workflow_name, error + ", + ), + sql_fetch_all!( + [ctx, (String, i64)] + " + SELECT workflow_name, COUNT(*) + FROM db_workflow.workflows AS OF SYSTEM TIME '-1s' + WHERE + worker_instance_id IS NULL AND + output IS NULL AND + ( + wake_immediate OR + wake_deadline_ts IS NOT NULL OR + cardinality(wake_signals) > 0 OR + wake_sub_workflow_id IS NOT NULL + ) + GROUP BY workflow_name + ", + ), + sql_fetch_all!( + [ctx, (String, i64)] + " + SELECT signal_name, COUNT(*) + FROM ( + SELECT signal_name + FROM db_workflow.signals + WHERE ack_ts IS NULL + UNION ALL + SELECT signal_name + FROM db_workflow.tagged_signals + WHERE ack_ts IS NULL + ) AS OF SYSTEM TIME '-1s' + GROUP BY signal_name + ", + ), + )?; + + // Get rid of metrics that don't exist in the db anymore (stateful) + chirp_workflow::metrics::WORKFLOW_TOTAL.reset(); + chirp_workflow::metrics::WORKFLOW_ACTIVE.reset(); + chirp_workflow::metrics::WORKFLOW_DEAD.reset(); + chirp_workflow::metrics::WORKFLOW_SLEEPING.reset(); + chirp_workflow::metrics::SIGNAL_PENDING.reset(); + + chirp_workflow::metrics::WORKER_ACTIVE + .with_label_values(&[]) + .set(active_worker_count); + + for (workflow_name, count) in total_workflow_count { + chirp_workflow::metrics::WORKFLOW_TOTAL + .with_label_values(&[&workflow_name]) + .set(count); + } + + for (workflow_name, count) in active_workflow_count { + chirp_workflow::metrics::WORKFLOW_ACTIVE + .with_label_values(&[&workflow_name]) + .set(count); + } + + for (workflow_name, error, count) in dead_workflow_count { + chirp_workflow::metrics::WORKFLOW_DEAD + .with_label_values(&[&workflow_name, &error]) + .set(count); + } + + for (workflow_name, count) in sleeping_workflow_count { + chirp_workflow::metrics::WORKFLOW_SLEEPING + .with_label_values(&[&workflow_name]) + .set(count); + } + + for (signal_name, count) in pending_signal_count { + chirp_workflow::metrics::SIGNAL_PENDING + .with_label_values(&[&signal_name]) + .set(count); + } + + Ok(()) +} diff --git a/svc/pkg/workflow/standalone/metrics-publish/src/main.rs b/svc/pkg/workflow/standalone/metrics-publish/src/main.rs new file mode 100644 index 0000000000..8dad989eb6 --- /dev/null +++ b/svc/pkg/workflow/standalone/metrics-publish/src/main.rs @@ -0,0 +1,30 @@ +use std::time::Duration; + +use chirp_workflow::prelude::*; + +fn main() -> GlobalResult<()> { + rivet_runtime::run(start()).unwrap() +} + +async fn start() -> GlobalResult<()> { + let pools = rivet_pools::from_env("workflow-metrics-publish").await?; + + tokio::task::Builder::new() + .name("workflow_metrics_publish::health_checks") + .spawn(rivet_health_checks::run_standalone( + rivet_health_checks::Config { + pools: Some(pools.clone()), + }, + ))?; + + tokio::task::Builder::new() + .name("workflow_metrics_publish::metrics") + .spawn(rivet_metrics::run_standalone())?; + + let mut interval = tokio::time::interval(Duration::from_secs(5)); + loop { + interval.tick().await; + + workflow_metrics_publish::run_from_env(pools.clone()).await?; + } +} diff --git a/svc/pkg/workflow/standalone/metrics-publish/tests/integration.rs b/svc/pkg/workflow/standalone/metrics-publish/tests/integration.rs new file mode 100644 index 0000000000..6c8ea4d0f2 --- /dev/null +++ b/svc/pkg/workflow/standalone/metrics-publish/tests/integration.rs @@ -0,0 +1 @@ +// TODO: