Skip to content

Commit

Permalink
Use REST API for metrics
Browse files Browse the repository at this point in the history
Add an endpoint to the REST API to retrieve metrics. Use the new
endpoint in the console.
  • Loading branch information
jbeisen committed Aug 4, 2023
1 parent 96463ed commit ea66bc2
Show file tree
Hide file tree
Showing 12 changed files with 355 additions and 308 deletions.
61 changes: 1 addition & 60 deletions arroyo-api/src/jobs.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use crate::queries::api_queries::{DbCheckpoint, DbLogMessage, DbPipelineJob};
use arroyo_datastream::Program;
use arroyo_rpc::grpc;
use arroyo_rpc::grpc::api::{
CheckpointDetailsResp, CheckpointOverview, CreateJobReq, JobDetailsResp, JobStatus,
PipelineProgram, StopType,
CheckpointDetailsResp, CheckpointOverview, CreateJobReq, JobStatus, StopType,
};
use arroyo_rpc::grpc::controller_grpc_client::ControllerGrpcClient;
use arroyo_rpc::public_ids::{generate_id, IdTypes};
Expand All @@ -14,7 +12,6 @@ use cornucopia_async::GenericClient;
use deadpool_postgres::Transaction;
use futures_util::stream::Stream;
use http::StatusCode;
use prost::Message;
use std::convert::Infallible;
use std::{collections::HashMap, time::Duration};
use tokio_stream::wrappers::ReceiverStream;
Expand Down Expand Up @@ -193,62 +190,6 @@ pub(crate) fn get_action(state: &str, running_desired: &bool) -> (String, Option
(a.to_string(), s, in_progress)
}

/// Loads a single job and assembles the `JobDetailsResp` returned to API clients.
///
/// The job row is looked up by `job_id` within the caller's organization
/// (`auth.organization_id`). The stored program is decoded, the job's
/// parallelism overrides are applied to it, and the resulting job graph is
/// returned alongside the job status and the action derived by `get_action`.
///
/// # Errors
///
/// * `Status::not_found` if no job with `job_id` exists for the organization.
/// * `Status::internal` if the stored parallelism overrides are not a JSON
///   object whose values are all unsigned integers.
/// * Whatever `log_and_map` produces for a failed query, a program that cannot
///   be decoded or converted, or UDFs that cannot be deserialized.
pub(crate) async fn get_job_details(
    job_id: &str,
    auth: &AuthData,
    client: &impl GenericClient,
) -> Result<JobDetailsResp, Status> {
    let res = api_queries::get_job_details()
        .bind(client, &auth.organization_id, &job_id)
        .opt()
        .await
        .map_err(log_and_map)?
        .ok_or_else(|| Status::not_found(format!("There is no job with id '{}'", job_id)))?;

    let mut program: Program = PipelineProgram::decode(&res.program[..])
        .map_err(log_and_map)?
        .try_into()
        .map_err(log_and_map)?;

    // The overrides come from a stored JSON column; report malformed data as an
    // error to the caller instead of panicking inside the request handler.
    let overrides = res
        .parallelism_overrides
        .as_object()
        .ok_or_else(|| {
            Status::internal(format!(
                "Parallelism overrides for job '{}' are not a JSON object",
                job_id
            ))
        })?
        .iter()
        .map(|(operator, parallelism)| {
            parallelism
                .as_u64()
                .map(|p| (operator.clone(), p as usize))
                .ok_or_else(|| {
                    Status::internal(format!(
                        "Parallelism override for operator '{}' of job '{}' is not an unsigned integer",
                        operator, job_id
                    ))
                })
        })
        .collect::<Result<_, Status>>()?;

    program.update_parallelism(&overrides);

    // A job without a recorded state is reported as "Created".
    let state = res.state.unwrap_or_else(|| "Created".to_string());
    // The job is meant to be running as long as no stop has been requested.
    let running_desired = res.stop == public::StopMode::none;

    let (action_text, action, in_progress) = get_action(&state, &running_desired);

    let status = JobStatus {
        job_id: job_id.to_string(),
        pipeline_name: res.pipeline_name,
        running_desired,
        state,
        run_id: res.run_id.unwrap_or(0) as u64,
        start_time: res.start_time.map(to_micros),
        finish_time: res.finish_time.map(to_micros),
        tasks: res.tasks.map(|t| t as u64),
        definition: res.textual_repr,
        pipeline_id: res.pipeline_id.to_string(),
        udfs: serde_json::from_value(res.udfs).map_err(log_and_map)?,
        failure_message: res.failure_message,
    };

    Ok(JobDetailsResp {
        job_status: Some(status),
        job_graph: Some(program.as_job_graph()),

        // The response carries the action as its raw integer value.
        action: action.map(|action| action as i32),
        action_text,
        in_progress,
    })
}

pub(crate) async fn checkpoint_details(
job_pub_id: &str,
epoch: u32,
Expand Down
25 changes: 15 additions & 10 deletions arroyo-api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::jobs::{
__path_get_job_checkpoints, __path_get_job_errors, __path_get_job_output, __path_get_jobs,
};
use crate::metrics::__path_get_operator_metric_groups;
use crate::pipelines::__path_get_pipelines;
use crate::pipelines::__path_post_pipeline;
use crate::pipelines::{
Expand All @@ -10,9 +11,9 @@ use crate::pipelines::{
use crate::rest::__path_ping;
use crate::rest_types::{
Checkpoint, CheckpointCollection, Job, JobCollection, JobLogLevel, JobLogMessage,
JobLogMessageCollection, OutputData, Pipeline, PipelineCollection, PipelineEdge, PipelineGraph,
PipelineNode, PipelinePatch, PipelinePost, StopType as StopTypeRest, Udf, UdfLanguage,
ValidatePipelinePost,
JobLogMessageCollection, Metric, MetricGroup, MetricNames, OperatorMetricGroup, OutputData,
Pipeline, PipelineCollection, PipelineEdge, PipelineGraph, PipelineNode, PipelinePatch,
PipelinePost, StopType as StopTypeRest, SubtaskMetrics, Udf, UdfLanguage, ValidatePipelinePost,
};
use crate::rest_utils::ErrorResp;
use arroyo_connectors::connectors;
Expand Down Expand Up @@ -441,12 +442,10 @@ impl ApiGrpc for ApiServer {

async fn get_job_metrics(
&self,
request: Request<JobMetricsReq>,
_request: Request<JobMetricsReq>,
) -> Result<Response<JobMetricsResp>, Status> {
let (request, auth) = self.authenticate(request).await?;

Ok(Response::new(
metrics::get_metrics(request.into_inner().job_id, auth, &self.client().await?).await?,
Err(Status::unimplemented(
"This functionality has been moved to the REST API.",
))
}

Expand Down Expand Up @@ -544,7 +543,8 @@ impl ApiGrpc for ApiServer {
get_pipeline_jobs,
get_job_errors,
get_job_checkpoints,
get_job_output
get_job_output,
get_operator_metric_groups,
),
components(schemas(
ValidatePipelinePost,
Expand All @@ -565,7 +565,12 @@ impl ApiGrpc for ApiServer {
JobLogLevel,
Checkpoint,
CheckpointCollection,
OutputData
OutputData,
MetricNames,
Metric,
SubtaskMetrics,
MetricGroup,
OperatorMetricGroup,
)),
tags(
(name = "pipelines", description = "Pipeline management endpoints"),
Expand Down
Loading

0 comments on commit ea66bc2

Please sign in to comment.