Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add HTTP connections and add support for event source tables in SQL #119

Merged
merged 1 commit into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions arroyo-api/migrations/V5__add_http_connection.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TYPE connection_type
ADD VALUE 'http';
9 changes: 9 additions & 0 deletions arroyo-api/src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tonic::Status;
use crate::handle_delete;
use crate::queries::api_queries;
use crate::queries::api_queries::DbConnection;
use crate::testers::HttpTester;
use crate::types::public;
use crate::{handle_db_error, log_and_map, required_field, testers::KafkaTester, AuthData};

Expand All @@ -29,6 +30,10 @@ pub(crate) async fn create_connection(
ReqConnectionType::Kinesis(_) => {
return Err(Status::failed_precondition("Kinesis is not yet supported"));
}
ReqConnectionType::Http(c) => (
public::ConnectionType::http,
serde_json::to_value(c).map_err(log_and_map)?,
),
};

api_queries::create_connection()
Expand Down Expand Up @@ -57,6 +62,9 @@ impl From<DbConnection> for Connection {
public::ConnectionType::kinesis => {
ConnectionType::Kinesis(serde_json::from_value(val.config).unwrap())
}
public::ConnectionType::http => {
ConnectionType::Http(serde_json::from_value(val.config).unwrap())
}
}),
sources: val.source_count as i32,
sinks: val.sink_count as i32,
Expand Down Expand Up @@ -102,6 +110,7 @@ pub(crate) async fn test_connection(req: CreateConnectionReq) -> Result<TestSour
ReqConnectionType::Kafka(kafka) => Ok(KafkaTester::new(kafka, None, None, tx)
.test_connection()
.await),
ReqConnectionType::Http(http) => Ok((HttpTester { connection: &http }).test().await),
_ => Ok(TestSourceMessage {
error: false,
done: true,
Expand Down
51 changes: 41 additions & 10 deletions arroyo-api/src/sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use arroyo_rpc::grpc::api::{
source_def::SourceType,
source_schema::{self, Schema},
ConfluentSchemaReq, ConfluentSchemaResp, Connection, CreateSourceReq, DeleteSourceReq,
JsonSchemaDef, KafkaSourceConfig, KafkaSourceDef, RawJsonDef, SourceDef, SourceField,
SourceMetadataResp, TestSourceMessage,
EventSourceSourceConfig, EventSourceSourceDef, JsonSchemaDef, KafkaSourceConfig,
KafkaSourceDef, RawJsonDef, SourceDef, SourceField, SourceMetadataResp, TestSourceMessage,
};
use arroyo_sql::{
types::{StructDef, StructField, TypeDef},
Expand Down Expand Up @@ -601,11 +601,30 @@ pub(crate) async fn create_source(
Some(connection.id),
)
}
create_source_req::TypeOneof::EventSource(event) => (
public::SourceType::event_source,
serde_json::to_value(&event).unwrap(),
None,
),
create_source_req::TypeOneof::EventSource(event) => {
let connection = connections
.iter()
.find(|c| c.name == event.connection)
.ok_or_else(|| {
Status::failed_precondition(format!(
"Could not find connection with name '{}'",
event.connection
))
})?;

if connection.r#type != public::ConnectionType::http {
return Err(Status::invalid_argument(format!(
"Connection '{}' is not an HTTP endpoint",
event.connection
)));
}

(
public::SourceType::event_source,
serde_json::to_value(&event).unwrap(),
Some(connection.id),
)
}
create_source_req::TypeOneof::Impulse(impulse) => {
if impulse.events_per_second > auth.org_metadata.max_impulse_qps as f32 {
return rate_limit_error("impulse", auth.org_metadata.max_impulse_qps as usize);
Expand Down Expand Up @@ -707,9 +726,21 @@ pub(crate) async fn get_sources<E: GenericClient>(
topic: config.topic,
})
}
public::SourceType::event_source => SourceType::EventSource(
serde_json::from_value(rec.source_config.unwrap()).unwrap(),
),
public::SourceType::event_source => {
let config: EventSourceSourceConfig =
serde_json::from_value(rec.source_config.unwrap()).unwrap();
assert_eq!(rec.connection_type.unwrap(), public::ConnectionType::http);

SourceType::EventSource(EventSourceSourceDef {
connection_name: rec.connection_name.as_ref().unwrap().clone(),
connection: serde_json::from_value(
rec.connection_config.as_ref().unwrap().clone(),
)
.unwrap(),
path: config.path,
events: config.events,
})
}
};

SourceDef {
Expand Down
63 changes: 61 additions & 2 deletions arroyo-api/src/testers.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
use std::time::{Duration, Instant};
use std::{
str::FromStr,
time::{Duration, Instant},
};

use arroyo_datastream::auth_config_to_hashmap;
use arroyo_rpc::grpc::api::{source_schema::Schema, KafkaConnection, TestSourceMessage};
use arroyo_rpc::grpc::api::{
source_schema::Schema, HttpConnection, KafkaConnection, TestSourceMessage,
};
use arroyo_types::string_to_map;
use http::{HeaderMap, HeaderName, HeaderValue};
use rdkafka::{
consumer::{BaseConsumer, Consumer},
message::BorrowedMessage,
Expand Down Expand Up @@ -240,3 +247,55 @@ impl KafkaTester {
});
}
}

pub struct HttpTester<'a> {
pub connection: &'a HttpConnection,
}

impl<'a> HttpTester<'a> {
pub async fn test(&self) -> TestSourceMessage {
match self.test_internal().await {
Ok(_) => TestSourceMessage {
error: false,
done: true,
message: "HTTP endpoint is valid".to_string(),
},
Err(e) => TestSourceMessage {
error: true,
done: true,
message: e,
},
}
}

async fn test_internal(&self) -> Result<(), String> {
let headers = string_to_map(&self.connection.headers)
.ok_or_else(|| "Headers are invalid; should be comma-separated pairs".to_string())?;

let mut header_map = HeaderMap::new();

for (k, v) in headers {
header_map.append(
HeaderName::from_str(&k).map_err(|s| format!("Invalid header name: {:?}", s))?,
HeaderValue::from_str(&v).map_err(|s| format!("Invalid header value: {:?}", s))?,
);
}

let client = reqwest::Client::builder()
.default_headers(header_map)
.build()
.unwrap();

let req = client
.head(&self.connection.url)
.build()
.map_err(|e| format!("Invalid URL: {:?}", e))?;

client
.execute(req)
.await
.map_err(|e| format!("HEAD request failed with: {:?}", e.status()))?;

Ok(())
}
}
136 changes: 124 additions & 12 deletions arroyo-console/src/gen/api_pb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4106,6 +4106,49 @@ export class KinesisConnection extends Message<KinesisConnection> {
}
}

/**
* @generated from message arroyo_api.HttpConnection
*/
export class HttpConnection extends Message<HttpConnection> {
/**
* @generated from field: string url = 1;
*/
url = "";

/**
* @generated from field: string headers = 2;
*/
headers = "";

constructor(data?: PartialMessage<HttpConnection>) {
super();
proto3.util.initPartial(data, this);
}

static readonly runtime = proto3;
static readonly typeName = "arroyo_api.HttpConnection";
static readonly fields: FieldList = proto3.util.newFieldList(() => [
{ no: 1, name: "url", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 2, name: "headers", kind: "scalar", T: 9 /* ScalarType.STRING */ },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): HttpConnection {
return new HttpConnection().fromBinary(bytes, options);
}

static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): HttpConnection {
return new HttpConnection().fromJson(jsonValue, options);
}

static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): HttpConnection {
return new HttpConnection().fromJsonString(jsonString, options);
}

static equals(a: HttpConnection | PlainMessage<HttpConnection> | undefined, b: HttpConnection | PlainMessage<HttpConnection> | undefined): boolean {
return proto3.util.equals(HttpConnection, a, b);
}
}

/**
* @generated from message arroyo_api.Connection
*/
Expand All @@ -4130,6 +4173,12 @@ export class Connection extends Message<Connection> {
*/
value: KinesisConnection;
case: "kinesis";
} | {
/**
* @generated from field: arroyo_api.HttpConnection http = 6;
*/
value: HttpConnection;
case: "http";
} | { case: undefined; value?: undefined } = { case: undefined };

/**
Expand All @@ -4153,6 +4202,7 @@ export class Connection extends Message<Connection> {
{ no: 1, name: "name", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 2, name: "kafka", kind: "message", T: KafkaConnection, oneof: "connection_type" },
{ no: 3, name: "kinesis", kind: "message", T: KinesisConnection, oneof: "connection_type" },
{ no: 6, name: "http", kind: "message", T: HttpConnection, oneof: "connection_type" },
{ no: 4, name: "sources", kind: "scalar", T: 5 /* ScalarType.INT32 */ },
{ no: 5, name: "sinks", kind: "scalar", T: 5 /* ScalarType.INT32 */ },
]);
Expand Down Expand Up @@ -4198,6 +4248,12 @@ export class CreateConnectionReq extends Message<CreateConnectionReq> {
*/
value: KinesisConnection;
case: "kinesis";
} | {
/**
* @generated from field: arroyo_api.HttpConnection http = 4;
*/
value: HttpConnection;
case: "http";
} | { case: undefined; value?: undefined } = { case: undefined };

constructor(data?: PartialMessage<CreateConnectionReq>) {
Expand All @@ -4211,6 +4267,7 @@ export class CreateConnectionReq extends Message<CreateConnectionReq> {
{ no: 1, name: "name", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 2, name: "kafka", kind: "message", T: KafkaConnection, oneof: "connection_type" },
{ no: 3, name: "kinesis", kind: "message", T: KinesisConnection, oneof: "connection_type" },
{ no: 4, name: "http", kind: "message", T: HttpConnection, oneof: "connection_type" },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): CreateConnectionReq {
Expand Down Expand Up @@ -4943,19 +5000,19 @@ export class NexmarkSourceConfig extends Message<NexmarkSourceConfig> {
*/
export class EventSourceSourceConfig extends Message<EventSourceSourceConfig> {
/**
* @generated from field: string url = 1;
* @generated from field: string connection = 1;
*/
url = "";
connection = "";

/**
* @generated from field: string events = 2;
* @generated from field: string path = 2;
*/
events = "";
path = "";

/**
* @generated from field: string headers = 3;
* @generated from field: string events = 3;
*/
headers = "";
events = "";

constructor(data?: PartialMessage<EventSourceSourceConfig>) {
super();
Expand All @@ -4965,9 +5022,9 @@ export class EventSourceSourceConfig extends Message<EventSourceSourceConfig> {
static readonly runtime: typeof proto3 = proto3;
static readonly typeName = "arroyo_api.EventSourceSourceConfig";
static readonly fields: FieldList = proto3.util.newFieldList(() => [
{ no: 1, name: "url", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 2, name: "events", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 3, name: "headers", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 1, name: "connection", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 2, name: "path", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 3, name: "events", kind: "scalar", T: 9 /* ScalarType.STRING */ },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): EventSourceSourceConfig {
Expand All @@ -4987,6 +5044,61 @@ export class EventSourceSourceConfig extends Message<EventSourceSourceConfig> {
}
}

/**
* @generated from message arroyo_api.EventSourceSourceDef
*/
export class EventSourceSourceDef extends Message<EventSourceSourceDef> {
/**
* @generated from field: string connection_name = 1;
*/
connectionName = "";

/**
* @generated from field: arroyo_api.HttpConnection connection = 2;
*/
connection?: HttpConnection;

/**
* @generated from field: string path = 3;
*/
path = "";

/**
* @generated from field: string events = 4;
*/
events = "";

constructor(data?: PartialMessage<EventSourceSourceDef>) {
super();
proto3.util.initPartial(data, this);
}

static readonly runtime = proto3;
static readonly typeName = "arroyo_api.EventSourceSourceDef";
static readonly fields: FieldList = proto3.util.newFieldList(() => [
{ no: 1, name: "connection_name", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 2, name: "connection", kind: "message", T: HttpConnection },
{ no: 3, name: "path", kind: "scalar", T: 9 /* ScalarType.STRING */ },
{ no: 4, name: "events", kind: "scalar", T: 9 /* ScalarType.STRING */ },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): EventSourceSourceDef {
return new EventSourceSourceDef().fromBinary(bytes, options);
}

static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): EventSourceSourceDef {
return new EventSourceSourceDef().fromJson(jsonValue, options);
}

static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): EventSourceSourceDef {
return new EventSourceSourceDef().fromJsonString(jsonString, options);
}

static equals(a: EventSourceSourceDef | PlainMessage<EventSourceSourceDef> | undefined, b: EventSourceSourceDef | PlainMessage<EventSourceSourceDef> | undefined): boolean {
return proto3.util.equals(EventSourceSourceDef, a, b);
}
}

/**
* @generated from message arroyo_api.CreateSourceReq
*/
Expand Down Expand Up @@ -5208,9 +5320,9 @@ export class SourceDef extends Message<SourceDef> {
case: "nexmark";
} | {
/**
* @generated from field: arroyo_api.EventSourceSourceConfig event_source = 11;
* @generated from field: arroyo_api.EventSourceSourceDef event_source = 11;
*/
value: EventSourceSourceConfig;
value: EventSourceSourceDef;
case: "eventSource";
} | { case: undefined; value?: undefined } = { case: undefined };

Expand All @@ -5236,7 +5348,7 @@ export class SourceDef extends Message<SourceDef> {
{ no: 4, name: "impulse", kind: "message", T: ImpulseSourceConfig, oneof: "source_type" },
{ no: 5, name: "file", kind: "message", T: FileSourceConfig, oneof: "source_type" },
{ no: 6, name: "nexmark", kind: "message", T: NexmarkSourceConfig, oneof: "source_type" },
{ no: 11, name: "event_source", kind: "message", T: EventSourceSourceConfig, oneof: "source_type" },
{ no: 11, name: "event_source", kind: "message", T: EventSourceSourceDef, oneof: "source_type" },
{ no: 7, name: "sql_fields", kind: "message", T: SourceField, repeated: true },
]);

Expand Down
Loading