diff --git a/arroyo-api/src/jobs.rs b/arroyo-api/src/jobs.rs index 716072cee..55cf7e804 100644 --- a/arroyo-api/src/jobs.rs +++ b/arroyo-api/src/jobs.rs @@ -1,9 +1,7 @@ use crate::queries::api_queries::{DbCheckpoint, DbLogMessage, DbPipelineJob}; -use arroyo_datastream::Program; use arroyo_rpc::grpc; use arroyo_rpc::grpc::api::{ - CheckpointDetailsResp, CheckpointOverview, CreateJobReq, JobDetailsResp, JobStatus, - PipelineProgram, StopType, + CheckpointDetailsResp, CheckpointOverview, CreateJobReq, JobStatus, StopType, }; use arroyo_rpc::grpc::controller_grpc_client::ControllerGrpcClient; use arroyo_rpc::public_ids::{generate_id, IdTypes}; @@ -14,7 +12,6 @@ use cornucopia_async::GenericClient; use deadpool_postgres::Transaction; use futures_util::stream::Stream; use http::StatusCode; -use prost::Message; use std::convert::Infallible; use std::{collections::HashMap, time::Duration}; use tokio_stream::wrappers::ReceiverStream; @@ -193,62 +190,6 @@ pub(crate) fn get_action(state: &str, running_desired: &bool) -> (String, Option (a.to_string(), s, in_progress) } -pub(crate) async fn get_job_details( - job_id: &str, - auth: &AuthData, - client: &impl GenericClient, -) -> Result { - let res = api_queries::get_job_details() - .bind(client, &auth.organization_id, &job_id) - .opt() - .await - .map_err(log_and_map)? - .ok_or_else(|| Status::not_found(format!("There is no job with id '{}'", job_id)))?; - - let mut program: Program = PipelineProgram::decode(&res.program[..]) - .map_err(log_and_map)? - .try_into() - .map_err(log_and_map)?; - - program.update_parallelism( - &res.parallelism_overrides - .as_object() - .unwrap() - .into_iter() - .map(|(k, v)| (k.clone(), v.as_u64().unwrap() as usize)) - .collect(), - ); - - let state = res.state.unwrap_or_else(|| "Created".to_string()); - let running_desired = res.stop == public::StopMode::none; - - let (action_text, action, in_progress) = get_action(&state, &running_desired); - - let status = JobStatus { - job_id: job_id.to_string(), - pipeline_name: res.pipeline_name, - running_desired, - state, - run_id: res.run_id.unwrap_or(0) as u64, - start_time: res.start_time.map(to_micros), - finish_time: res.finish_time.map(to_micros), - tasks: res.tasks.map(|t| t as u64), - definition: res.textual_repr, - pipeline_id: format!("{}", res.pipeline_id), - udfs: serde_json::from_value(res.udfs).map_err(log_and_map)?, - failure_message: res.failure_message, - }; - - Ok(JobDetailsResp { - job_status: Some(status), - job_graph: Some(program.as_job_graph()), - - action: action.map(|action| action as i32), - action_text: action_text.to_string(), - in_progress, - }) -} - pub(crate) async fn checkpoint_details( job_pub_id: &str, epoch: u32, diff --git a/arroyo-api/src/lib.rs b/arroyo-api/src/lib.rs index 29a3989dc..bc765be99 100644 --- a/arroyo-api/src/lib.rs +++ b/arroyo-api/src/lib.rs @@ -1,6 +1,7 @@ use crate::jobs::{ __path_get_job_checkpoints, __path_get_job_errors, __path_get_job_output, __path_get_jobs, }; +use crate::metrics::__path_get_operator_metric_groups; use crate::pipelines::__path_get_pipelines; use crate::pipelines::__path_post_pipeline; use crate::pipelines::{ @@ -10,9 +11,9 @@ use crate::pipelines::{ use crate::rest::__path_ping; use crate::rest_types::{ Checkpoint, CheckpointCollection, Job, JobCollection, JobLogLevel, JobLogMessage, - JobLogMessageCollection, OutputData, Pipeline, PipelineCollection, PipelineEdge, PipelineGraph, - PipelineNode, PipelinePatch, PipelinePost, StopType as StopTypeRest, Udf, UdfLanguage, - ValidatePipelinePost, + JobLogMessageCollection, Metric, MetricGroup, MetricNames, OperatorMetricGroup, OutputData, + Pipeline, PipelineCollection, PipelineEdge, PipelineGraph, PipelineNode, PipelinePatch, + PipelinePost, StopType as StopTypeRest, SubtaskMetrics, Udf, UdfLanguage, ValidatePipelinePost, }; use crate::rest_utils::ErrorResp; use arroyo_connectors::connectors; @@ -441,12 +442,10 @@ impl ApiGrpc for ApiServer { async fn get_job_metrics( &self, - request: Request, + _request: Request, ) -> Result, Status> { - let (request, auth) = self.authenticate(request).await?; - - Ok(Response::new( - metrics::get_metrics(request.into_inner().job_id, auth, &self.client().await?).await?, + Err(Status::unimplemented( + "This functionality has been moved to the REST API.", )) } @@ -544,7 +543,8 @@ impl ApiGrpc for ApiServer { get_pipeline_jobs, get_job_errors, get_job_checkpoints, - get_job_output + get_job_output, + get_operator_metric_groups, ), components(schemas( ValidatePipelinePost, @@ -565,7 +565,12 @@ impl ApiGrpc for ApiServer { JobLogLevel, Checkpoint, CheckpointCollection, - OutputData + OutputData, + MetricNames, + Metric, + SubtaskMetrics, + MetricGroup, + OperatorMetricGroup, )), tags( (name = "pipelines", description = "Pipeline management endpoints"), diff --git a/arroyo-api/src/metrics.rs b/arroyo-api/src/metrics.rs index 86a2c62f7..f966a8ceb 100644 --- a/arroyo-api/src/metrics.rs +++ b/arroyo-api/src/metrics.rs @@ -1,22 +1,25 @@ +use axum::extract::{Path, State}; +use axum::Json; use base64::engine::general_purpose; use base64::Engine; -use cornucopia_async::GenericClient; use std::str::FromStr; use std::{collections::HashMap, env, time::SystemTime}; -use arroyo_rpc::grpc::api::{job_metrics_resp::OperatorMetrics, JobMetricsResp}; -use arroyo_rpc::grpc::api::{Metric, SubtaskMetrics}; +use crate::pipelines::query_job_by_pub_id; +use crate::rest::AppState; +use crate::rest_types::{ + Metric, MetricGroup, MetricNames, OperatorMetricGroup, OperatorMetricGroupCollection, + SubtaskMetrics, +}; +use crate::rest_utils::{authenticate, client, BearerAuth, ErrorResp}; use arroyo_types::{ to_millis, API_METRICS_RATE_ENV, BYTES_RECV, BYTES_SENT, MESSAGES_RECV, MESSAGES_SENT, TX_QUEUE_REM, TX_QUEUE_SIZE, }; +use http::StatusCode; use http::{header::AUTHORIZATION, HeaderMap, HeaderValue}; use once_cell::sync::Lazy; use prometheus_http_query::Client; -use tonic::Status; - -use crate::queries::api_queries; -use crate::{jobs, log_and_map, AuthData}; const METRICS_GRANULARITY_SECS: f64 = 5.0; @@ -41,41 +44,41 @@ static METRICS_CLIENT: Lazy = Lazy::new(|| { prometheus_http_query::Client::from(client, &prometheus_endpoint).unwrap() }); -pub(crate) async fn get_metrics( - job_pub_id: String, - auth: AuthData, - client: &impl GenericClient, -) -> Result { - let job_id = api_queries::get_pipeline_job() - .bind(client, &auth.organization_id, &job_pub_id) - .one() - .await - .map_err(log_and_map)? - .id; - - // validate that the job exists and user can access it - let job_details = jobs::get_job_details(&job_id, &auth, client).await?; - +/// Get a job's metrics +#[utoipa::path( + get, + path = "/v1/pipelines/{pipeline_id}/jobs/{job_id}/operator_metric_groups", + tag = "metrics", + params( + ("pipeline_id" = String, Path, description = "Pipeline id"), + ("job_id" = String, Path, description = "Job id"), + ), + responses( + (status = 200, description = "Got metric groups", body = OperatorMetricGroupCollection), + ), +)] +pub async fn get_operator_metric_groups( + State(state): State, + bearer_auth: BearerAuth, + Path((pipeline_pub_id, job_pub_id)): Path<(String, String)>, +) -> Result, ErrorResp> { + let client = client(&state.pool).await?; + let auth_data = authenticate(&state.pool, bearer_auth).await?; + + let job = query_job_by_pub_id(&pipeline_pub_id, &job_pub_id, &client, &auth_data).await?; let rate = env::var(API_METRICS_RATE_ENV).unwrap_or_else(|_| "15s".to_string()); + let end = (to_millis(SystemTime::now()) / 1000) as i64; + let start = end - 5 * 60; - #[derive(Copy, Clone)] - enum QueryMetrics { - BytesRecv, - BytesSent, - MessagesRecv, - MessagesSent, - Backpressure, - } - - impl QueryMetrics { - fn simple_query(&self, metric: &str, job_id: &str, run_id: u64, rate: &str) -> String { + impl MetricNames { + fn simple_query(&self, metric: &str, job_id: &str, run_id: &u64, rate: &str) -> String { format!( "rate({}{{job_id=\"{}\",run_id=\"{}\"}}[{}])", metric, job_id, run_id, rate ) } - fn backpressure_query(&self, job_id: &str, run_id: u64) -> String { + fn backpressure_query(&self, job_id: &str, run_id: &u64) -> String { let tx_queue_size: String = format!( "{}{{job_id=\"{}\",run_id=\"{}\"}}", TX_QUEUE_SIZE, job_id, run_id @@ -89,27 +92,21 @@ pub(crate) async fn get_metrics( format!("1 - (({} + 1) / ({} + 1))", tx_queue_rem, tx_queue_size) } - fn get_query(&self, job_id: &str, run_id: u64, rate: &str) -> String { + fn get_query(&self, job_id: &str, run_id: &u64, rate: &str) -> String { match self { - BytesRecv => self.simple_query(BYTES_RECV, job_id, run_id, rate), - BytesSent => self.simple_query(BYTES_SENT, job_id, run_id, rate), - MessagesRecv => self.simple_query(MESSAGES_RECV, job_id, run_id, rate), - MessagesSent => self.simple_query(MESSAGES_SENT, job_id, run_id, rate), - Backpressure => self.backpressure_query(job_id, run_id), + MetricNames::BytesRecv => self.simple_query(BYTES_RECV, job_id, run_id, rate), + MetricNames::BytesSent => self.simple_query(BYTES_SENT, job_id, run_id, rate), + MetricNames::MessagesRecv => self.simple_query(MESSAGES_RECV, job_id, run_id, rate), + MetricNames::MessagesSent => self.simple_query(MESSAGES_SENT, job_id, run_id, rate), + MetricNames::Backpressure => self.backpressure_query(job_id, run_id), } } } - use QueryMetrics::*; - - let end = (to_millis(SystemTime::now()) / 1000) as i64; - let start = end - 5 * 60; - let run_id = job_details.job_status.unwrap().run_id; - let result = tokio::try_join!( METRICS_CLIENT .query_range( - BytesRecv.get_query(&job_id, run_id, &rate), + MetricNames::BytesRecv.get_query(&job.id, &job.run_id, &rate), start, end, METRICS_GRANULARITY_SECS @@ -117,7 +114,7 @@ pub(crate) async fn get_metrics( .get(), METRICS_CLIENT .query_range( - BytesSent.get_query(&job_id, run_id, &rate), + MetricNames::BytesSent.get_query(&job.id, &job.run_id, &rate), start, end, METRICS_GRANULARITY_SECS @@ -125,7 +122,7 @@ pub(crate) async fn get_metrics( .get(), METRICS_CLIENT .query_range( - MessagesRecv.get_query(&job_id, run_id, &rate), + MetricNames::MessagesRecv.get_query(&job.id, &job.run_id, &rate), start, end, METRICS_GRANULARITY_SECS @@ -133,7 +130,7 @@ pub(crate) async fn get_metrics( .get(), METRICS_CLIENT .query_range( - MessagesSent.get_query(&job_id, run_id, &rate), + MetricNames::MessagesSent.get_query(&job.id, &job.run_id, &rate), start, end, METRICS_GRANULARITY_SECS @@ -141,7 +138,7 @@ pub(crate) async fn get_metrics( .get(), METRICS_CLIENT .query_range( - Backpressure.get_query(&job_id, run_id, &rate), + MetricNames::Backpressure.get_query(&job.id, &job.run_id, &rate), start, end, METRICS_GRANULARITY_SECS @@ -149,24 +146,30 @@ pub(crate) async fn get_metrics( .get(), ); + let mut collection = OperatorMetricGroupCollection { + data: vec![], + has_more: false, + }; + match result { Ok((r1, r2, r3, r4, r5)) => { let mut metrics = HashMap::new(); - for (q, r) in [ - (BytesRecv, r1), - (BytesSent, r2), - (MessagesRecv, r3), - (MessagesSent, r4), - (Backpressure, r5), + for (metric_name, query_result) in [ + (MetricNames::BytesRecv, r1), + (MetricNames::BytesSent, r2), + (MetricNames::MessagesRecv, r3), + (MetricNames::MessagesSent, r4), + (MetricNames::Backpressure, r5), ] { - for v in r.data().as_matrix().unwrap() { + // for each metric query + + for v in query_result.data().as_matrix().unwrap() { + // for each operator/subtask pair + let operator_id = v.metric().get("operator_id").unwrap().clone(); let subtask_idx = u32::from_str(v.metric().get("subtask_idx").unwrap()).unwrap(); - let op = metrics.entry(operator_id).or_insert(OperatorMetrics { - subtasks: HashMap::new(), - }); let data = v .samples() @@ -177,33 +180,45 @@ pub(crate) async fn get_metrics( }) .collect(); - let entry = op.subtasks.entry(subtask_idx).or_insert(SubtaskMetrics { - bytes_recv: vec![], - bytes_sent: vec![], - messages_recv: vec![], - messages_sent: vec![], - backpressure: vec![], - }); - - match q { - BytesRecv => entry.bytes_recv = data, - BytesSent => entry.bytes_sent = data, - MessagesRecv => entry.messages_recv = data, - MessagesSent => entry.messages_sent = data, - Backpressure => entry.backpressure = data, + metrics + .entry(operator_id) + .or_insert(HashMap::new()) + .entry(metric_name.clone()) + .or_insert(vec![]) + .push(SubtaskMetrics { + idx: subtask_idx, + metrics: data, + }); + } + } + + for (operator_id, metric_groups) in metrics.iter_mut() { + let mut o = OperatorMetricGroup { + operator_id: operator_id.clone(), + metric_groups: vec![], + }; + + for (metric_name, subtask_metrics) in metric_groups.iter_mut() { + let m = MetricGroup { + name: metric_name.clone(), + subtasks: subtask_metrics.clone(), }; + + if !m.subtasks.is_empty() { + o.metric_groups.push(m); + } + } + + if !o.metric_groups.is_empty() { + collection.data.push(o); } } - Ok(JobMetricsResp { - job_id, - start_time: start as u64 * 1000, - end_time: end as u64 * 1000, - metrics, - }) + + Ok(Json(collection)) } - Err(err) => Err(Status::internal(format!( - "Failed to query prometheus: {}", - err - ))), + Err(_) => Err(ErrorResp { + status_code: StatusCode::INTERNAL_SERVER_ERROR, + message: "Failed to query Prometheus".to_string(), + }), } } diff --git a/arroyo-api/src/rest.rs b/arroyo-api/src/rest.rs index e12bea553..b20b6cb74 100644 --- a/arroyo-api/src/rest.rs +++ b/arroyo-api/src/rest.rs @@ -18,6 +18,7 @@ use utoipa::OpenApi; use utoipa_swagger_ui::SwaggerUi; use crate::jobs::{get_job_checkpoints, get_job_errors, get_job_output, get_jobs}; +use crate::metrics::get_operator_metric_groups; use crate::pipelines::{ delete_pipeline, get_pipeline, get_pipeline_jobs, get_pipelines, patch_pipeline, post_pipeline, validate_pipeline, @@ -87,7 +88,11 @@ pub fn create_rest_app(server: ApiServer, pool: Pool) -> Router { .route("/", get(get_pipeline_jobs)) .route("/:job_id/errors", get(get_job_errors)) .route("/:job_id/checkpoints", get(get_job_checkpoints)) - .route("/:job_id/output", get(get_job_output)); + .route("/:job_id/output", get(get_job_output)) + .route( + "/:job_id/operator_metric_groups", + get(get_operator_metric_groups), + ); let api_routes = Router::new() .route("/ping", get(ping)) diff --git a/arroyo-api/src/rest_types.rs b/arroyo-api/src/rest_types.rs index 35297d96a..f6e7190ef 100644 --- a/arroyo-api/src/rest_types.rs +++ b/arroyo-api/src/rest_types.rs @@ -241,6 +241,7 @@ pub struct Checkpoint { PipelineCollection = Collection, JobLogMessageCollection = Collection, CheckpointCollection = Collection, + OperatorMetricGroupCollection = Collection )] pub struct Collection { pub data: Vec, @@ -266,3 +267,41 @@ impl From for OutputData { } } } + +#[derive(Serialize, Deserialize, Clone, Debug, ToSchema, Hash, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum MetricNames { + BytesRecv, + BytesSent, + MessagesRecv, + MessagesSent, + Backpressure, +} + +#[derive(Serialize, Deserialize, Clone, Debug, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct Metric { + pub time: u64, + pub value: f64, +} + +#[derive(Serialize, Deserialize, Clone, Debug, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct SubtaskMetrics { + pub idx: u32, + pub metrics: Vec, +} + +#[derive(Serialize, Deserialize, Clone, Debug, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct MetricGroup { + pub name: MetricNames, + pub subtasks: Vec, +} + +#[derive(Serialize, Deserialize, Clone, Debug, ToSchema)] +#[serde(rename_all = "camelCase")] +pub struct OperatorMetricGroup { + pub operator_id: String, + pub metric_groups: Vec, +} diff --git a/arroyo-console/src/components/OperatorDetail.tsx b/arroyo-console/src/components/OperatorDetail.tsx index 72c7db5a8..05ed411a9 100644 --- a/arroyo-console/src/components/OperatorDetail.tsx +++ b/arroyo-console/src/components/OperatorDetail.tsx @@ -1,102 +1,83 @@ -import { JobMetricsResp } from '../gen/api_pb'; import { Badge, Box, Code, HStack, Spacer } from '@chakra-ui/react'; -import { getOperatorBackpressure } from '../lib/util'; +import { getCurrentMaxMetric, transformMetricGroup } from '../lib/util'; import React from 'react'; import { TimeSeriesGraph } from './TimeSeriesGraph'; import Loading from './Loading'; -import { PipelineGraph } from '../lib/data_fetching'; +import { PipelineNode, useJobMetrics, usePipeline } from '../lib/data_fetching'; export interface OperatorDetailProps { - operator_id: string; - graph?: PipelineGraph; - metrics?: JobMetricsResp; + pipelineId: string; + jobId: string; + operatorId: string; } -const OperatorDetail: React.FC = ({ operator_id, graph, metrics }) => { - if (graph == undefined || metrics == undefined || metrics.metrics?.[operator_id] == undefined) { +const OperatorDetail: React.FC = ({ pipelineId, jobId, operatorId }) => { + const { pipeline } = usePipeline(pipelineId); + const { operatorMetricGroups } = useJobMetrics(pipelineId, jobId); + + if (!operatorMetricGroups) { return ; - } else { - const node = graph.nodes.find(n => n.nodeId == operator_id); - const node_metrics = metrics.metrics[operator_id]; + } - const backpressure = getOperatorBackpressure(metrics, operator_id); + const node = (pipeline.graph.nodes as PipelineNode[]).find(n => n.nodeId == operatorId); + const operatorMetricGroup = operatorMetricGroups.find(o => o.operatorId == operatorId); - let backpressureBadge; - if (backpressure < 0.33) { - backpressureBadge = LOW; - } else if (backpressure < 0.66) { - backpressureBadge = MEDIUM; - } else { - backpressureBadge = HIGH; - } + if (!operatorMetricGroup) { + return ; + } - let msgRecv = 0; - let msgSent = 0; - let msgSentData; - let msgRecvData; + const metricGroups = operatorMetricGroup.metricGroups; + const backpressureMetrics = metricGroups.find(m => m.name == 'backpressure'); + const backpressure = backpressureMetrics ? getCurrentMaxMetric(backpressureMetrics) : 0; - if ( - node_metrics != null && - node_metrics.subtasks != null && - node_metrics.subtasks[0].messagesRecv.length > 0 - ) { - msgRecv = Object.values(node_metrics.subtasks) - .map(n => n.messagesRecv) - .reduce((s, a) => s + a[a.length - 1].value, 0); - msgSent = Object.values(node_metrics.subtasks) - .map(n => n.messagesSent) - .reduce((s, a) => s + a[a.length - 1].value, 0); + let backpressureBadge; + if (backpressure < 0.33) { + backpressureBadge = LOW; + } else if (backpressure < 0.66) { + backpressureBadge = MEDIUM; + } else { + backpressureBadge = HIGH; + } - msgRecvData = Object.entries(node_metrics.subtasks).map(kv => { - return kv[1].messagesRecv - .filter(m => m != undefined) - .map(m => { - return { - label: kv[0], - x: new Date(Number(m.time) / 1000), - y: m.value + m.value * Math.random() * 0.01, - }; - }); - }); + const msgRecv = metricGroups + .find(m => m.name == 'messages_recv')! + .subtasks.map(s => s.metrics[s.metrics.length - 1].value) + .reduce((a, c) => a + c, 0); - msgSentData = Object.entries(node_metrics.subtasks).map(kv => { - return kv[1].messagesSent.map(m => { - return { - label: kv[0], - x: new Date(Number(m.time) / 1000), - y: m.value + m.value * Math.random() * 0.01, - }; - }); - }); - } + const msgSent = metricGroups + .find(m => m.name == 'messages_sent')! + .subtasks.map(s => s.metrics[s.metrics.length - 1].value) + .reduce((a, c) => a + c, 0); - return ( - - - operator - - - {node?.parallelism} - - - Backpressure: {backpressureBadge} - {node?.operator} - - {Math.round(msgRecv)} eps rx - {Math.round(msgSent)} eps tx - - - Events RX - - + const msgSentData = transformMetricGroup(metricGroups.find(m => m.name == 'messages_sent')!); + const msgRecvData = transformMetricGroup(metricGroups.find(m => m.name == 'messages_recv')!); - - Events TX - + return ( + + + operator + + + {node?.parallelism} + + Backpressure: {backpressureBadge} + {node?.operator} + + {Math.round(msgRecv)} eps rx + {Math.round(msgSent)} eps tx - ); - } + + Events RX + + + + + Events TX + + + + ); }; export default OperatorDetail; diff --git a/arroyo-console/src/gen/api-types.ts b/arroyo-console/src/gen/api-types.ts index f62a7cc9d..8aa5b4045 100644 --- a/arroyo-console/src/gen/api-types.ts +++ b/arroyo-console/src/gen/api-types.ts @@ -74,6 +74,13 @@ export interface paths { */ get: operations["get_job_errors"]; }; + "/v1/pipelines/{pipeline_id}/jobs/{job_id}/operator_metric_groups": { + /** + * Get a job's metrics + * @description Get a job's metrics + */ + get: operations["get_operator_metric_groups"]; + }; "/v1/pipelines/{pipeline_id}/jobs/{job_id}/output": { /** * Subscribe to a job's output @@ -136,6 +143,26 @@ export interface components { data: (components["schemas"]["JobLogMessage"])[]; hasMore: boolean; }; + Metric: { + /** Format: int64 */ + time: number; + /** Format: double */ + value: number; + }; + MetricGroup: { + name: components["schemas"]["MetricNames"]; + subtasks: (components["schemas"]["SubtaskMetrics"])[]; + }; + /** @enum {string} */ + MetricNames: "bytes_recv" | "bytes_sent" | "messages_recv" | "messages_sent" | "backpressure"; + OperatorMetricGroup: { + metricGroups: (components["schemas"]["MetricGroup"])[]; + operatorId: string; + }; + OperatorMetricGroupCollection: { + data: (components["schemas"]["OperatorMetricGroup"])[]; + hasMore: boolean; + }; OutputData: { key: string; operatorId: string; @@ -197,6 +224,11 @@ export interface components { }; /** @enum {string} */ StopType: "none" | "checkpoint" | "graceful" | "immediate" | "force"; + SubtaskMetrics: { + /** Format: int32 */ + idx: number; + metrics: (components["schemas"]["Metric"])[]; + }; Udf: { definition: string; language: components["schemas"]["UdfLanguage"]; @@ -418,6 +450,28 @@ export interface operations { }; }; }; + /** + * Get a job's metrics + * @description Get a job's metrics + */ + get_operator_metric_groups: { + parameters: { + path: { + /** @description Pipeline id */ + pipeline_id: string; + /** @description Job id */ + job_id: string; + }; + }; + responses: { + /** @description Got metric groups */ + 200: { + content: { + "application/json": components["schemas"]["OperatorMetricGroupCollection"]; + }; + }; + }; + }; /** * Subscribe to a job's output * @description Subscribe to a job's output diff --git a/arroyo-console/src/lib/data_fetching.ts b/arroyo-console/src/lib/data_fetching.ts index b5ce52415..3266653c3 100644 --- a/arroyo-console/src/lib/data_fetching.ts +++ b/arroyo-console/src/lib/data_fetching.ts @@ -3,7 +3,6 @@ import { GetConnectionsReq, GetConnectionTablesReq, GetConnectorsReq, - JobMetricsReq, } from '../gen/api_pb'; import { ApiClient } from '../main'; import useSWR, { mutate as globalMutate } from 'swr'; @@ -18,6 +17,9 @@ export type PipelineGraph = schemas['PipelineGraph']; export type JobLogMessage = schemas['JobLogMessage']; export type PipelineNode = schemas['PipelineNode']; export type OutputData = schemas['OutputData']; +export type MetricGroup = schemas['MetricGroup']; +export type Metric = schemas['Metric']; +export type OperatorMetricGroup = schemas['OperatorMetricGroup']; const BASE_URL = 'http://localhost:8000/api'; export const { get, post, patch, del } = createClient({ baseUrl: BASE_URL }); @@ -45,8 +47,8 @@ const connectionTablesKey = () => { return { key: 'ConnectionTables' }; }; -const jobMetricsKey = (jobId?: string) => { - return jobId ? { key: 'JobMetrics', jobId } : null; +const jobMetricsKey = (pipelineId?: string, jobId?: string) => { + return pipelineId && jobId ? { key: 'JobMetrics', pipelineId, jobId } : null; }; const jobCheckpointsKey = (pipelineId?: string, jobId?: string) => { @@ -170,29 +172,33 @@ export const useJobs = () => { }; }; -// JobMetricsReq +// Job Metrics -const jobMetricsFetcher = (client: ApiClient) => { - return async (params: { jobId: string }) => { - return await ( - await ( - await client - )() - ).getJobMetrics( - new JobMetricsReq({ - jobId: params.jobId, - }) +const jobMetricsFetcher = () => { + return async (params: { key: string; pipelineId: string; jobId: string }) => { + const { data, error } = await get( + '/v1/pipelines/{pipeline_id}/jobs/{job_id}/operator_metric_groups', + { + params: { + path: { + pipeline_id: params.pipelineId, + job_id: params.jobId, + }, + }, + } ); + + return processResponse(data, error); }; }; -export const useJobMetrics = (client: ApiClient, jobId?: string) => { - const { data } = useSWR(jobMetricsKey(jobId), jobMetricsFetcher(client), { - refreshInterval: 5000, +export const useJobMetrics = (pipelineId?: string, jobId?: string) => { + const { data } = useSWR(jobMetricsKey(pipelineId, jobId), jobMetricsFetcher(), { + refreshInterval: 1000, }); return { - metrics: data, + operatorMetricGroups: data?.data as OperatorMetricGroup[], }; }; diff --git a/arroyo-console/src/lib/util.ts b/arroyo-console/src/lib/util.ts index e19cb19b9..5fdfbc302 100644 --- a/arroyo-console/src/lib/util.ts +++ b/arroyo-console/src/lib/util.ts @@ -1,4 +1,4 @@ -import { JobMetricsResp, Metric } from '../gen/api_pb'; +import { Metric, MetricGroup } from './data_fetching'; export function durationFormat(micros: number): string { const units = [ @@ -100,43 +100,33 @@ export function getBackpressureColor(backpressure: number): string { return colors[colors.length - 1]; } -export function getOperatorBackpressure( - metrics: JobMetricsResp | undefined, - operator: string -): number { - if (!metrics) { - return 0; - } - - // reduce the subtasks backpressure to an operator-level backpressure - - const nodeMetrics = metrics.metrics[operator]; - - if (!nodeMetrics) { - return 0; - } +export function getCurrentMaxMetric(metricGroup: MetricGroup): number { + return metricGroup.subtasks + .map(s => s.metrics) + .filter(m => m.length) + .map(m => + m.reduce((r: Metric, c: Metric) => { + if (c.time > r.time) { + return c; + } else { + return r; + } + }, m[0]) + ) + .map(m => m.value) + .reduce((max, curr) => Math.max(max, curr)); +} - const backPressureMetrics = Object.entries(nodeMetrics.subtasks) - .map(kv => { - return kv[1].backpressure; +export function transformMetricGroup(metric_group: MetricGroup) { + return metric_group.subtasks.map(s => + s.metrics.map(m => { + return { + label: s.idx, + x: new Date(Number(m.time) / 1000), + y: m.value + m.value * Math.random() * 0.01, + }; }) - .filter(ml => ml.length); - - if (!backPressureMetrics.length) { - return 0; - } - - const recent = backPressureMetrics.map(m => - m.reduce((r: Metric, c: Metric) => { - if (c.time > r.time) { - return c; - } else { - return r; - } - }, m[0]) ); - - return recent.map(m => m.value).reduce((max, curr) => Math.max(max, curr)); } export function formatDate(timestamp: bigint) { diff --git a/arroyo-console/src/routes/pipelines/CreatePipeline.tsx b/arroyo-console/src/routes/pipelines/CreatePipeline.tsx index 9c5bb5bc6..ceebb0234 100644 --- a/arroyo-console/src/routes/pipelines/CreatePipeline.tsx +++ b/arroyo-console/src/routes/pipelines/CreatePipeline.tsx @@ -32,6 +32,7 @@ import { post, useConnectionTables, useJobOutput, + useJobMetrics, useOperatorErrors, usePipeline, usePipelineGraph, @@ -62,6 +63,8 @@ export function CreatePipeline({ client }: { client: ApiClient }) { queryInputToCheck, udfsInputToCheck ); + const { operatorMetricGroups } = useJobMetrics(pipelineId, job?.id); + const { isOpen, onOpen, onClose } = useDisclosure(); const [options, setOptions] = useState({ parallelism: 4, checkpointMS: 5000 }); const navigate = useNavigate(); @@ -340,7 +343,11 @@ export function CreatePipeline({ client }: { client: ApiClient }) { }} overflow="auto" > - {}} /> + {}} + /> ); diff --git a/arroyo-console/src/routes/pipelines/PipelineDetails.tsx b/arroyo-console/src/routes/pipelines/PipelineDetails.tsx index 06af5caa7..b098b9d02 100644 --- a/arroyo-console/src/routes/pipelines/PipelineDetails.tsx +++ b/arroyo-console/src/routes/pipelines/PipelineDetails.tsx @@ -30,13 +30,12 @@ import { ApiClient } from '../../main'; import { CodeEditor } from './SqlEditor'; import PipelineConfigModal from './PipelineConfigModal'; import { - Job, OutputData, PipelineNode, StopType, useJobCheckpoints, - useJobMetrics, useJobOutput, + useJobMetrics, useOperatorErrors, usePipeline, usePipelineJobs, @@ -65,14 +64,10 @@ export function PipelineDetails({ client }: { client: ApiClient }) { const { pipeline, pipelineError, updatePipeline } = usePipeline(id, true); const { jobs, jobsError } = usePipelineJobs(id, true); - let job: Job | undefined; - if (jobs?.length) { - job = jobs[0]; - } - - const { metrics } = useJobMetrics(client, job?.id); + const job = jobs?.length ? jobs[0] : undefined; const { checkpoints } = useJobCheckpoints(id, job?.id); const { operatorErrors } = useOperatorErrors(id, job?.id); + const { operatorMetricGroups } = useJobMetrics(id, job?.id); if (pipelineError || jobsError) { return ( @@ -118,7 +113,7 @@ export function PipelineDetails({ client }: { client: ApiClient }) { let operatorDetail = undefined; if (activeOperator) { operatorDetail = ( - + ); } @@ -127,7 +122,7 @@ export function PipelineDetails({ client }: { client: ApiClient }) { diff --git a/arroyo-console/src/routes/pipelines/PipelineGraph.tsx b/arroyo-console/src/routes/pipelines/PipelineGraph.tsx index fb23fb22c..b1bd9b461 100644 --- a/arroyo-console/src/routes/pipelines/PipelineGraph.tsx +++ b/arroyo-console/src/routes/pipelines/PipelineGraph.tsx @@ -1,16 +1,15 @@ import { Box, Text } from '@chakra-ui/react'; import dagre from 'dagre'; -import { useMemo } from 'react'; import ReactFlow, { Handle, Position, Background } from 'reactflow'; -import { JobNode, JobMetricsResp } from '../../gen/api_pb'; -import { getBackpressureColor, getOperatorBackpressure } from '../../lib/util'; -import { PipelineGraph } from '../../lib/data_fetching'; +import { getBackpressureColor, getCurrentMaxMetric } from '../../lib/util'; +import { OperatorMetricGroup, PipelineGraph, PipelineNode } from '../../lib/data_fetching'; +import { useMemo } from 'react'; function PipelineGraphNode({ data, }: { data: { - node: JobNode; + node: PipelineNode; setActiveOperator: (op: string) => void; isActive: boolean; operatorBackpressure: number; @@ -42,18 +41,28 @@ function PipelineGraphNode({ export function PipelineGraphViewer({ graph, - metrics, + operatorMetricGroups, setActiveOperator, activeOperator, }: { graph: PipelineGraph; - metrics?: JobMetricsResp; + operatorMetricGroups?: OperatorMetricGroup[]; setActiveOperator: (op: string) => void; activeOperator?: string; }) { const nodeTypes = useMemo(() => ({ pipelineNode: PipelineGraphNode }), []); const nodes = graph.nodes.map(node => { + let backpressure = 0; + if (operatorMetricGroups && operatorMetricGroups.length > 0) { + const operatorMetricGroup = operatorMetricGroups.find(o => o.operatorId == node.nodeId); + if (operatorMetricGroup) { + const metricGroups = operatorMetricGroup.metricGroups; + const backpressureMetrics = metricGroups.find(m => m.name == 'backpressure'); + backpressure = backpressureMetrics ? getCurrentMaxMetric(backpressureMetrics) : 0; + } + } + return { id: node.nodeId, type: 'pipelineNode', @@ -62,7 +71,7 @@ export function PipelineGraphViewer({ node: node, setActiveOperator: setActiveOperator, isActive: node.nodeId == activeOperator, - operatorBackpressure: getOperatorBackpressure(metrics, node.nodeId), + operatorBackpressure: backpressure, }, position: { x: 0,