Skip to content

Commit

Permalink
Use REST API for metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
jbeisen committed Aug 4, 2023
1 parent c214ce2 commit 546198d
Show file tree
Hide file tree
Showing 12 changed files with 368 additions and 292 deletions.
61 changes: 1 addition & 60 deletions arroyo-api/src/jobs.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use crate::queries::api_queries::{DbCheckpoint, DbLogMessage, DbPipelineJob};
use arroyo_datastream::Program;
use arroyo_rpc::grpc;
use arroyo_rpc::grpc::api::{
CheckpointDetailsResp, CheckpointOverview, CreateJobReq, JobDetailsResp, JobStatus,
PipelineProgram, StopType,
CheckpointDetailsResp, CheckpointOverview, CreateJobReq, JobStatus, StopType,
};
use arroyo_rpc::grpc::controller_grpc_client::ControllerGrpcClient;
use arroyo_rpc::public_ids::{generate_id, IdTypes};
Expand All @@ -14,7 +12,6 @@ use cornucopia_async::GenericClient;
use deadpool_postgres::Transaction;
use futures_util::stream::Stream;
use http::StatusCode;
use prost::Message;
use std::convert::Infallible;
use std::{collections::HashMap, time::Duration};
use tokio_stream::wrappers::ReceiverStream;
Expand Down Expand Up @@ -193,62 +190,6 @@ pub(crate) fn get_action(state: &str, running_desired: &bool) -> (String, Option
(a.to_string(), s, in_progress)
}

pub(crate) async fn get_job_details(
job_id: &str,
auth: &AuthData,
client: &impl GenericClient,
) -> Result<JobDetailsResp, Status> {
let res = api_queries::get_job_details()
.bind(client, &auth.organization_id, &job_id)
.opt()
.await
.map_err(log_and_map)?
.ok_or_else(|| Status::not_found(format!("There is no job with id '{}'", job_id)))?;

let mut program: Program = PipelineProgram::decode(&res.program[..])
.map_err(log_and_map)?
.try_into()
.map_err(log_and_map)?;

program.update_parallelism(
&res.parallelism_overrides
.as_object()
.unwrap()
.into_iter()
.map(|(k, v)| (k.clone(), v.as_u64().unwrap() as usize))
.collect(),
);

let state = res.state.unwrap_or_else(|| "Created".to_string());
let running_desired = res.stop == public::StopMode::none;

let (action_text, action, in_progress) = get_action(&state, &running_desired);

let status = JobStatus {
job_id: job_id.to_string(),
pipeline_name: res.pipeline_name,
running_desired,
state,
run_id: res.run_id.unwrap_or(0) as u64,
start_time: res.start_time.map(to_micros),
finish_time: res.finish_time.map(to_micros),
tasks: res.tasks.map(|t| t as u64),
definition: res.textual_repr,
pipeline_id: format!("{}", res.pipeline_id),
udfs: serde_json::from_value(res.udfs).map_err(log_and_map)?,
failure_message: res.failure_message,
};

Ok(JobDetailsResp {
job_status: Some(status),
job_graph: Some(program.as_job_graph()),

action: action.map(|action| action as i32),
action_text: action_text.to_string(),
in_progress,
})
}

pub(crate) async fn checkpoint_details(
job_pub_id: &str,
epoch: u32,
Expand Down
25 changes: 15 additions & 10 deletions arroyo-api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::jobs::{
__path_get_job_checkpoints, __path_get_job_errors, __path_get_job_output, __path_get_jobs,
};
use crate::metrics::__path_get_operator_metric_groups;
use crate::pipelines::__path_get_pipelines;
use crate::pipelines::__path_post_pipeline;
use crate::pipelines::{
Expand All @@ -10,9 +11,9 @@ use crate::pipelines::{
use crate::rest::__path_ping;
use crate::rest_types::{
Checkpoint, CheckpointCollection, Job, JobCollection, JobLogLevel, JobLogMessage,
JobLogMessageCollection, OutputData, Pipeline, PipelineCollection, PipelineEdge, PipelineGraph,
PipelineNode, PipelinePatch, PipelinePost, StopType as StopTypeRest, Udf, UdfLanguage,
ValidatePipelinePost,
JobLogMessageCollection, Metric, MetricGroup, MetricNames, OperatorMetricGroup, OutputData,
Pipeline, PipelineCollection, PipelineEdge, PipelineGraph, PipelineNode, PipelinePatch,
PipelinePost, StopType as StopTypeRest, SubtaskMetrics, Udf, UdfLanguage, ValidatePipelinePost,
};
use crate::rest_utils::ErrorResp;
use arroyo_connectors::connectors;
Expand Down Expand Up @@ -441,12 +442,10 @@ impl ApiGrpc for ApiServer {

async fn get_job_metrics(
&self,
request: Request<JobMetricsReq>,
_request: Request<JobMetricsReq>,
) -> Result<Response<JobMetricsResp>, Status> {
let (request, auth) = self.authenticate(request).await?;

Ok(Response::new(
metrics::get_metrics(request.into_inner().job_id, auth, &self.client().await?).await?,
Err(Status::unimplemented(
"This functionality has been moved to the REST API.",
))
}

Expand Down Expand Up @@ -544,7 +543,8 @@ impl ApiGrpc for ApiServer {
get_pipeline_jobs,
get_job_errors,
get_job_checkpoints,
get_job_output
get_job_output,
get_operator_metric_groups,
),
components(schemas(
ValidatePipelinePost,
Expand All @@ -565,7 +565,12 @@ impl ApiGrpc for ApiServer {
JobLogLevel,
Checkpoint,
CheckpointCollection,
OutputData
OutputData,
MetricNames,
Metric,
SubtaskMetrics,
MetricGroup,
OperatorMetricGroup,
)),
tags(
(name = "pipelines", description = "Pipeline management endpoints"),
Expand Down
Loading

0 comments on commit 546198d

Please sign in to comment.