Skip to content

Commit

Permalink
Stream job output using server-sent events
Browse files Browse the repository at this point in the history
Add an endpoint to the REST API to stream the job output using
Server-Sent Events. Consume the output in the console.
  • Loading branch information
jbeisen committed Aug 4, 2023
1 parent 31b058f commit c214ce2
Show file tree
Hide file tree
Showing 11 changed files with 229 additions and 157 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion arroyo-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ petgraph = {version = "0.6", features = ["serde-1"]}

http = "0.2"
tower-http = {version = "0.4", features = ["trace", "fs", "cors", "validate-request", "auth"]}
axum = {version = "0.6.12", features = ["headers"]}
axum = {version = "0.6.12", features = ["headers", "tokio"]}
axum-extra = "0.7.4"
thiserror = "1.0.40"
utoipa = "3"
Expand Down Expand Up @@ -71,6 +71,7 @@ postgres-types = { version = "*", features = ["derive"] }
tokio-postgres = { version = "*", features = ["with-serde_json-1", "with-time-0_3", "with-uuid-1"] }
deadpool-postgres = { version = "0.10" }
futures = "0.3"
futures-util = "0.3.28"
time = "0.3"
cornucopia_async = { version = "0.4", features = ["with-serde_json-1"] }
jwt-simple = "0.11.4"
Expand Down
96 changes: 94 additions & 2 deletions arroyo-api/src/jobs.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,34 @@
use crate::queries::api_queries::{DbCheckpoint, DbLogMessage, DbPipelineJob};
use arroyo_datastream::Program;
use arroyo_rpc::grpc;
use arroyo_rpc::grpc::api::{
CheckpointDetailsResp, CheckpointOverview, CreateJobReq, JobDetailsResp, JobStatus,
PipelineProgram, StopType,
};
use arroyo_rpc::grpc::controller_grpc_client::ControllerGrpcClient;
use arroyo_rpc::public_ids::{generate_id, IdTypes};
use axum::extract::{Path, State};
use axum::response::sse::{Event, Sse};
use axum::Json;
use cornucopia_async::GenericClient;
use deadpool_postgres::Transaction;
use futures_util::stream::Stream;
use http::StatusCode;
use prost::Message;
use std::convert::Infallible;
use std::{collections::HashMap, time::Duration};
use tonic::Status;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt as _;
use tonic::{Request, Status};
use tracing::info;

const PREVIEW_TTL: Duration = Duration::from_secs(60);

use crate::pipelines::query_job_by_pub_id;
use crate::pipelines::{query_job_by_pub_id, query_pipeline_by_pub_id};
use crate::rest::AppState;
use crate::rest_types::{
Checkpoint, CheckpointCollection, JobCollection, JobLogMessage, JobLogMessageCollection,
OutputData,
};
use crate::rest_utils::{authenticate, client, log_and_map_rest, BearerAuth, ErrorResp};
use crate::{log_and_map, queries::api_queries, to_micros, types::public, AuthData};
Expand Down Expand Up @@ -368,6 +377,89 @@ pub async fn get_job_checkpoints(
}))
}

/// Subscribe to a job's output
#[utoipa::path(
get,
path = "/v1/pipelines/{pipeline_id}/jobs/{job_id}/output",
tag = "jobs",
params(
("pipeline_id" = String, Path, description = "Pipeline id"),
("job_id" = String, Path, description = "Job id")
),
responses(
(status = 200, description = "Job output as 'text/event-stream'"),
),
)]
pub async fn get_job_output(
State(state): State<AppState>,
bearer_auth: BearerAuth,
Path((pipeline_pub_id, job_pub_id)): Path<(String, String)>,
) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, ErrorResp> {
let client = client(&state.pool).await?;
let auth_data = authenticate(&state.pool, bearer_auth).await?;

// validate that the job exists, the user has access, and the graph has a GrpcSink
query_job_by_pub_id(&pipeline_pub_id, &job_pub_id, &client, &auth_data).await?;
let pipeline = query_pipeline_by_pub_id(&pipeline_pub_id, &client, &auth_data).await?;
if !pipeline
.graph
.nodes
.iter()
.any(|n| n.operator.contains("WebSink"))
{
// TODO: make this check more robust
return Err(ErrorResp {
status_code: StatusCode::BAD_REQUEST,
message: "Job does not have a web sink".to_string(),
});
}
let (tx, rx) = tokio::sync::mpsc::channel(32);

let mut controller =
ControllerGrpcClient::connect(state.grpc_api_server.controller_addr.clone())
.await
.unwrap();

let mut stream = controller
.subscribe_to_output(Request::new(grpc::GrpcOutputSubscription {
job_id: job_pub_id.clone(),
}))
.await
.unwrap()
.into_inner();

info!("Subscribed to output");
tokio::spawn(async move {
let _controller = controller;
let mut message_count = 0;
while let Some(d) = stream.next().await {
if d.as_ref().map(|t| t.done).unwrap_or(false) {
info!("Stream done for {}", job_pub_id);
break;
}

let v = match d {
Ok(d) => d,
Err(_) => break,
};

let output_data: OutputData = v.into();
let json = serde_json::to_string(&output_data).unwrap();
let e = Ok(Event::default().data(json).id(message_count.to_string()));

if tx.send(e).await.is_err() {
break;
}

message_count += 1;
}

info!("Closing watch stream for {}", job_pub_id);
});

Ok(Sse::new(ReceiverStream::new(rx)))
}

/// Get all jobs
#[utoipa::path(
get,
Expand Down
118 changes: 24 additions & 94 deletions arroyo-api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,36 @@
use crate::jobs::{__path_get_job_checkpoints, __path_get_job_errors, __path_get_jobs};
use crate::jobs::{
__path_get_job_checkpoints, __path_get_job_errors, __path_get_job_output, __path_get_jobs,
};
use crate::pipelines::__path_get_pipelines;
use crate::pipelines::__path_post_pipeline;
use crate::pipelines::{
__path_delete_pipeline, __path_get_pipeline, __path_get_pipeline_jobs, __path_patch_pipeline,
__path_validate_pipeline,
};
use crate::queries::api_queries;
use crate::rest::__path_ping;
use crate::rest_types::{
Checkpoint, CheckpointCollection, Job, JobCollection, JobLogLevel, JobLogMessage,
JobLogMessageCollection, Pipeline, PipelineCollection, PipelineEdge, PipelineGraph,
JobLogMessageCollection, OutputData, Pipeline, PipelineCollection, PipelineEdge, PipelineGraph,
PipelineNode, PipelinePatch, PipelinePost, StopType as StopTypeRest, Udf, UdfLanguage,
ValidatePipelinePost,
};
use crate::rest_utils::ErrorResp;
use arroyo_connectors::connectors;
use arroyo_rpc::grpc::api::{
api_grpc_server::ApiGrpc, CheckpointDetailsReq, CheckpointDetailsResp, ConfluentSchemaReq,
ConfluentSchemaResp, CreateConnectionReq, CreateConnectionResp, CreateJobReq, CreateJobResp,
CreatePipelineReq, CreatePipelineResp, GetConnectionsReq, GetConnectionsResp, GetJobsReq,
GetJobsResp, GetPipelineReq, GrpcOutputSubscription, JobCheckpointsReq, JobCheckpointsResp,
JobDetailsReq, JobDetailsResp, JobMetricsReq, JobMetricsResp, OperatorErrorsReq,
OperatorErrorsRes, OutputData as OutputDataProto, PipelineDef, PipelineGraphReq,
PipelineGraphResp, StopType, TestSourceMessage, UpdateJobReq, UpdateJobResp,
};
use arroyo_rpc::grpc::api::{
CreateConnectionTableReq, CreateConnectionTableResp, DeleteConnectionReq, DeleteConnectionResp,
DeleteConnectionTableReq, DeleteConnectionTableResp, DeleteJobReq, DeleteJobResp,
GetConnectionTablesReq, GetConnectionTablesResp, GetConnectorsReq, GetConnectorsResp,
PipelineProgram, TestSchemaReq, TestSchemaResp,
};
use arroyo_rpc::grpc::{
self,
api::{
api_grpc_server::ApiGrpc, CheckpointDetailsReq, CheckpointDetailsResp, ConfluentSchemaReq,
ConfluentSchemaResp, CreateConnectionReq, CreateConnectionResp, CreateJobReq,
CreateJobResp, CreatePipelineReq, CreatePipelineResp, GetConnectionsReq,
GetConnectionsResp, GetJobsReq, GetJobsResp, GetPipelineReq, GrpcOutputSubscription,
JobCheckpointsReq, JobCheckpointsResp, JobDetailsReq, JobDetailsResp, JobMetricsReq,
JobMetricsResp, OperatorErrorsReq, OperatorErrorsRes, OutputData, PipelineDef,
PipelineGraphReq, PipelineGraphResp, StopType, TestSourceMessage, UpdateJobReq,
UpdateJobResp,
},
controller_grpc_client::ControllerGrpcClient,
};
use arroyo_server_common::log_event;
use cornucopia_async::GenericClient;
use deadpool_postgres::{Object, Pool};
Expand All @@ -45,9 +41,9 @@ use std::collections::HashMap;
use std::time::Duration;
use time::OffsetDateTime;
use tokio_postgres::error::SqlState;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};
use tracing::{error, info, warn};
use tracing::{error, warn};
use utoipa::OpenApi;

mod cloud;
Expand Down Expand Up @@ -520,83 +516,15 @@ impl ApiGrpc for ApiServer {
}
}

type SubscribeToOutputStream = ReceiverStream<Result<OutputData, Status>>;
type SubscribeToOutputStream = ReceiverStream<Result<OutputDataProto, Status>>;

async fn subscribe_to_output(
&self,
request: Request<GrpcOutputSubscription>,
_request: Request<GrpcOutputSubscription>,
) -> Result<Response<Self::SubscribeToOutputStream>, Status> {
let (request, auth) = self.authenticate(request).await?;

let job_id = api_queries::get_pipeline_job()
.bind(
&self.client().await?,
&auth.organization_id,
&request.into_inner().job_id,
)
.one()
.await
.map_err(log_and_map)?
.id;

// validate that the job exists, the user has access, and the graph has a GrpcSink
let details = jobs::get_job_details(&job_id, &auth, &self.client().await?).await?;

if !details
.job_graph
.unwrap()
.nodes
.iter()
.any(|n| n.operator.contains("WebSink"))
{
// TODO: make this check more robust
return Err(Status::invalid_argument(format!(
"Job {} does not have a web sink",
job_id
)));
}

let (tx, rx) = tokio::sync::mpsc::channel(32);

let mut controller = ControllerGrpcClient::connect(self.controller_addr.clone())
.await
.map_err(log_and_map)?;

info!("connected to controller");

let mut stream = controller
.subscribe_to_output(Request::new(grpc::GrpcOutputSubscription {
job_id: job_id.clone(),
}))
.await
.map_err(log_and_map)?
.into_inner();

info!("subscribed to output");
tokio::spawn(async move {
let _controller = controller;
while let Some(d) = stream.next().await {
if d.as_ref().map(|t| t.done).unwrap_or(false) {
info!("Stream done for {}", job_id);
break;
}

let v = d.map(|d| OutputData {
operator_id: d.operator_id,
timestamp: d.timestamp,
key: d.key,
value: d.value,
});

if tx.send(v).await.is_err() {
break;
}
}

info!("Closing watch stream for {}", job_id);
});

Ok(Response::new(ReceiverStream::new(rx)))
Err(Status::unimplemented(
"This functionality has been moved to the REST API.",
))
}
}

Expand All @@ -615,7 +543,8 @@ impl ApiGrpc for ApiServer {
get_jobs,
get_pipeline_jobs,
get_job_errors,
get_job_checkpoints
get_job_checkpoints,
get_job_output
),
components(schemas(
ValidatePipelinePost,
Expand All @@ -635,7 +564,8 @@ impl ApiGrpc for ApiServer {
JobLogMessageCollection,
JobLogLevel,
Checkpoint,
CheckpointCollection
CheckpointCollection,
OutputData
)),
tags(
(name = "pipelines", description = "Pipeline management endpoints"),
Expand Down
5 changes: 3 additions & 2 deletions arroyo-api/src/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use tower_http::services::ServeDir;
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;

use crate::jobs::{get_job_checkpoints, get_job_errors, get_jobs};
use crate::jobs::{get_job_checkpoints, get_job_errors, get_job_output, get_jobs};
use crate::pipelines::{
delete_pipeline, get_pipeline, get_pipeline_jobs, get_pipelines, patch_pipeline, post_pipeline,
validate_pipeline,
Expand Down Expand Up @@ -86,7 +86,8 @@ pub fn create_rest_app(server: ApiServer, pool: Pool) -> Router {
let jobs_routes = Router::new()
.route("/", get(get_pipeline_jobs))
.route("/:job_id/errors", get(get_job_errors))
.route("/:job_id/checkpoints", get(get_job_checkpoints));
.route("/:job_id/checkpoints", get(get_job_checkpoints))
.route("/:job_id/output", get(get_job_output));

let api_routes = Router::new()
.route("/ping", get(ping))
Expand Down
35 changes: 28 additions & 7 deletions arroyo-api/src/rest_types.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::types::public::LogLevel;
use crate::types::public::StopMode;
use arroyo_datastream::Program;
use arroyo_rpc::grpc;
use arroyo_rpc::grpc::api;
use petgraph::visit::EdgeRef;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -127,14 +128,14 @@ impl From<StopMode> for StopType {
}
}

impl Into<arroyo_rpc::grpc::api::StopType> for StopType {
fn into(self) -> arroyo_rpc::grpc::api::StopType {
impl Into<api::StopType> for StopType {
fn into(self) -> api::StopType {
match self {
StopType::None => arroyo_rpc::grpc::api::StopType::None,
StopType::Checkpoint => arroyo_rpc::grpc::api::StopType::Checkpoint,
StopType::Graceful => arroyo_rpc::grpc::api::StopType::Graceful,
StopType::Immediate => arroyo_rpc::grpc::api::StopType::Immediate,
StopType::Force => arroyo_rpc::grpc::api::StopType::Force,
StopType::None => api::StopType::None,
StopType::Checkpoint => api::StopType::Checkpoint,
StopType::Graceful => api::StopType::Graceful,
StopType::Immediate => api::StopType::Immediate,
StopType::Force => api::StopType::Force,
}
}
}
Expand Down Expand Up @@ -245,3 +246,23 @@ pub struct Collection<T> {
pub data: Vec<T>,
pub has_more: bool,
}

// REST representation of a single job output record. It is built from the gRPC
// `grpc::OutputData` message by the `From` impl in this file.
// Plain `//` comments are used on purpose: `///` doc comments on a `ToSchema`
// type would be emitted into the generated OpenAPI description.
#[derive(Serialize, Deserialize, Clone, Debug, ToSchema)]
// Fields are (de)serialized with camelCase names, e.g. `operator_id` -> `operatorId`.
#[serde(rename_all = "camelCase")]
pub struct OutputData {
    // Copied verbatim from the gRPC message; presumably identifies the operator
    // that emitted the record — TODO confirm.
    pub operator_id: String,
    // NOTE(review): unit and epoch of this timestamp are not visible here — confirm.
    pub timestamp: u64,
    // NOTE(review): the encoding/format of `key` and `value` is not visible here
    // — confirm against the producer.
    pub key: String,
    pub value: String,
}

/// Converts the gRPC output message into its REST representation.
///
/// Only `operator_id`, `timestamp`, `key` and `value` are carried over; any
/// other field of the gRPC message is dropped.
impl From<grpc::OutputData> for OutputData {
    fn from(value: grpc::OutputData) -> Self {
        // Move the needed fields out in one step; the rest are ignored.
        let grpc::OutputData {
            operator_id,
            timestamp,
            key,
            value,
            ..
        } = value;

        Self {
            operator_id,
            timestamp,
            key,
            value,
        }
    }
}
Loading

0 comments on commit c214ce2

Please sign in to comment.