From 09521491736ea9f1a4260afa377eced18dfacfae Mon Sep 17 00:00:00 2001 From: Conor Date: Thu, 31 Aug 2023 22:09:45 +1000 Subject: [PATCH] Opensearch support (#1268) --- Cargo.lock | 12 + shotover-proxy/tests/lib.rs | 1 + .../tests/opensearch_int_tests/mod.rs | 266 ++++++++++--- .../docker-compose.yaml | 1 + .../opensearch-passthrough/topology.yaml | 12 + shotover/Cargo.toml | 3 + shotover/src/codec/cassandra.rs | 3 +- shotover/src/codec/mod.rs | 2 + shotover/src/codec/opensearch.rs | 357 ++++++++++++++++++ shotover/src/frame/mod.rs | 20 + shotover/src/frame/opensearch.rs | 40 ++ shotover/src/message/mod.rs | 13 +- shotover/src/sources/cassandra.rs | 4 +- shotover/src/sources/mod.rs | 6 + shotover/src/sources/opensearch.rs | 96 +++++ shotover/src/transforms/mod.rs | 12 + shotover/src/transforms/opensearch/mod.rs | 119 ++++++ shotover/src/transforms/query_counter.rs | 3 + test-helpers/src/docker_compose.rs | 2 +- 19 files changed, 914 insertions(+), 58 deletions(-) create mode 100644 shotover/src/codec/opensearch.rs create mode 100644 shotover/src/frame/opensearch.rs create mode 100644 shotover/src/sources/opensearch.rs create mode 100644 shotover/src/transforms/opensearch/mod.rs diff --git a/Cargo.lock b/Cargo.lock index b0e773962..d8d47395f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -213,6 +213,15 @@ dependencies = [ "syn 2.0.29", ] +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", +] + [[package]] name = "atomic" version = "0.5.3" @@ -4409,6 +4418,7 @@ dependencies = [ "anyhow", "async-recursion", "async-trait", + "atoi", "atomic_enum", "aws-config", "aws-sdk-kms", @@ -4435,6 +4445,8 @@ dependencies = [ "halfbrown", "hex", "hex-literal", + "http", + "httparse", "hyper", "itertools 0.10.5", "kafka-protocol", diff --git a/shotover-proxy/tests/lib.rs b/shotover-proxy/tests/lib.rs index 375ea2d30..be984d876 100644 --- a/shotover-proxy/tests/lib.rs +++ b/shotover-proxy/tests/lib.rs @@ -3,6 +3,7 @@ use tokio_bin_process::bin_path; mod cassandra_int_tests; mod kafka_int_tests; +#[cfg(feature = "alpha-transforms")] mod opensearch_int_tests; mod redis_int_tests; mod runner; diff --git a/shotover-proxy/tests/opensearch_int_tests/mod.rs b/shotover-proxy/tests/opensearch_int_tests/mod.rs index 8f689fa36..be39351e8 100644 --- a/shotover-proxy/tests/opensearch_int_tests/mod.rs +++ b/shotover-proxy/tests/opensearch_int_tests/mod.rs @@ -1,52 +1,242 @@ +use crate::shotover_process; use opensearch::{ auth::Credentials, cert::CertificateValidation, - http::Url, + http::response::Response, http::{ - response::Response, transport::{SingleNodeConnectionPool, TransportBuilder}, - StatusCode, + Method, StatusCode, Url, }, - indices::IndicesExistsParts, + indices::{IndicesCreateParts, IndicesDeleteParts, IndicesExistsParts}, params::Refresh, - BulkOperation, BulkParts, OpenSearch, SearchParts, + BulkOperation, BulkParts, DeleteParts, Error, IndexParts, OpenSearch, SearchParts, }; use serde_json::{json, Value}; use test_helpers::docker_compose::docker_compose; -pub async fn index_documents(client: &OpenSearch) -> Response { - let index = "posts"; - let exists_response = client - .indices() - .exists(IndicesExistsParts::Index(&[index])) - .send() - .await - .unwrap(); +async fn assert_ok_and_get_json(response: Result) -> Value { + let response = response.unwrap(); + let status = response.status_code(); - assert_eq!(exists_response.status_code(), StatusCode::NOT_FOUND); + if response.method() == Method::Head { + if status != StatusCode::OK { + panic!("Opensearch HEAD query returned status code {status}"); + } + Value::Null + } else { + let json = response.json().await.unwrap(); + if status != StatusCode::OK && status != StatusCode::CREATED { + panic!("Opensearch query failed: {status:#?}\n{json:#?}"); + } + json + } +} + +pub async fn test_bulk(client: &OpenSearch) { + assert_ok_and_get_json( + client + .indices() + .create(IndicesCreateParts::Index("posts")) + .send() + .await, + ) + .await; let mut body: Vec> = vec![]; - for i in 1..=10 { - let op = BulkOperation::index(json!({"title":"OpenSearch"})) + for i in 0..10 { + let op = BulkOperation::index(json!({"title": "OpenSearch", "i": i})) .id(i.to_string()) .into(); body.push(op); } - client - .bulk(BulkParts::Index(index)) - .body(body) - .refresh(Refresh::WaitFor) + assert_ok_and_get_json( + client + .bulk(BulkParts::Index("posts")) + .body(body) + .refresh(Refresh::WaitFor) + .send() + .await, + ) + .await; + + let results = assert_ok_and_get_json( + client + .search(SearchParts::None) + .body(json!({ + "query": { + "match_all": {} + }, + "sort": [ + { + "i": { + "order": "asc" + } + } + ] + })) + .allow_no_indices(true) + .send() + .await, + ) + .await; + + assert!(results["took"].is_i64()); + let hits = results["hits"]["hits"].as_array().unwrap(); + assert_eq!( + hits.iter().map(|x| &x["_source"]).collect::>(), + vec!( + &json!({ "title": "OpenSearch", "i": 0 }), + &json!({ "title": "OpenSearch", "i": 1 }), + &json!({ "title": "OpenSearch", "i": 2 }), + &json!({ "title": "OpenSearch", "i": 3 }), + &json!({ "title": "OpenSearch", "i": 4 }), + &json!({ "title": "OpenSearch", "i": 5 }), + &json!({ "title": "OpenSearch", "i": 6 }), + &json!({ "title": "OpenSearch", "i": 7 }), + &json!({ "title": "OpenSearch", "i": 8 }), + &json!({ "title": "OpenSearch", "i": 9 }), + ) + ); +} + +async fn test_create_index(client: &OpenSearch) { + assert_ok_and_get_json( + client + .indices() + .create(IndicesCreateParts::Index("test-index")) + .send() + .await, + ) + .await; + + assert_ok_and_get_json( + client + .indices() + .exists(IndicesExistsParts::Index(&["test-index"])) + .send() + .await, + ) + .await; +} + +async fn test_index_and_search_document(client: &OpenSearch) -> String { + assert_ok_and_get_json( + client + .index(IndexParts::Index("test-index")) + .body(json!({ + "name": "John", + "age": 30 + })) + .refresh(Refresh::WaitFor) + .send() + .await, + ) + .await; + + let response = assert_ok_and_get_json( + client + .search(SearchParts::Index(&["test-index"])) + .from(0) + .size(10) + .body(json!({ + "query": { + "match": { + "name": "John", + } + } + })) + .send() + .await, + ) + .await; + + assert!(response["took"].is_i64()); + let hits = response["hits"]["hits"].as_array().unwrap(); + assert_eq!( + hits.iter().map(|x| &x["_source"]).collect::>(), + vec!(&json!({ + "name": "John", + "age": 30, + })) + ); + hits[0]["_id"].as_str().unwrap().to_owned() +} + +async fn test_delete_and_search_document(client: &OpenSearch, id: String) { + assert_ok_and_get_json( + client + .delete(DeleteParts::IndexId("test-index", &id)) + .refresh(Refresh::WaitFor) + .send() + .await, + ) + .await; + + let response = assert_ok_and_get_json( + client + .search(SearchParts::Index(&["test-index"])) + .from(0) + .size(10) + .body(json!({ + "query": { + "match": { + "name": "John", + } + } + })) + .allow_no_indices(true) + .send() + .await, + ) + .await; + + // let results = response.json::().await.unwrap(); + assert!(response["took"].is_i64()); + assert_eq!(response["hits"]["hits"].as_array().unwrap().len(), 0); +} + +async fn test_delete_index(client: &OpenSearch) { + assert_ok_and_get_json( + client + .indices() + .delete(IndicesDeleteParts::Index(&["test-index"])) + .send() + .await, + ) + .await; + + let exists_response = client + .indices() + .exists(IndicesExistsParts::Index(&["test-index"])) .send() .await - .unwrap() + .unwrap(); + + assert_eq!(exists_response.status_code(), StatusCode::NOT_FOUND); +} + +async fn opensearch_test_suite(client: &OpenSearch) { + test_create_index(client).await; + + let doc_id = test_index_and_search_document(client).await; + test_delete_and_search_document(client, doc_id).await; + + test_bulk(client).await; + test_delete_index(client).await; } #[tokio::test(flavor = "multi_thread")] async fn passthrough_standard() { let _compose = docker_compose("tests/test-configs/opensearch-passthrough/docker-compose.yaml"); - let url = Url::parse("https://localhost:9200").unwrap(); + let shotover = shotover_process("tests/test-configs/opensearch-passthrough/topology.yaml") + .start() + .await; + + let addr = "http://localhost:9201"; + + let url = Url::parse(addr).unwrap(); let credentials = Credentials::Basic("admin".into(), "admin".into()); let transport = TransportBuilder::new(SingleNodeConnectionPool::new(url)) .cert_validation(CertificateValidation::None) @@ -55,35 +245,7 @@ async fn passthrough_standard() { .unwrap(); let client = OpenSearch::new(transport); - index_documents(&client).await; - - let response = client - .search(SearchParts::None) - .body(json!({ - "query": { - "match_all": {} - } - })) - .allow_no_indices(true) - .send() - .await - .unwrap(); - - assert!(response.content_length().unwrap() > 0); - assert_eq!( - response.url(), - &Url::parse("https://localhost:9200/_search?allow_no_indices=true").unwrap() - ); - assert_eq!(response.status_code(), StatusCode::OK); - assert_eq!(response.method(), opensearch::http::Method::Post); + opensearch_test_suite(&client).await; - let response_body = response.json::().await.unwrap(); - assert!(response_body["took"].as_i64().is_some()); - assert_eq!( - response_body["hits"].as_object().unwrap()["hits"] - .as_array() - .unwrap() - .len(), - 10 - ); + shotover.shutdown_and_then_consume_events(&[]).await; } diff --git a/shotover-proxy/tests/test-configs/opensearch-passthrough/docker-compose.yaml b/shotover-proxy/tests/test-configs/opensearch-passthrough/docker-compose.yaml index 4ef5a0a24..f6261a928 100644 --- a/shotover-proxy/tests/test-configs/opensearch-passthrough/docker-compose.yaml +++ b/shotover-proxy/tests/test-configs/opensearch-passthrough/docker-compose.yaml @@ -9,6 +9,7 @@ services: - bootstrap.memory_lock=true - "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" - discovery.type=single-node + - plugins.security.disabled=true ulimits: memlock: soft: -1 diff --git a/shotover-proxy/tests/test-configs/opensearch-passthrough/topology.yaml b/shotover-proxy/tests/test-configs/opensearch-passthrough/topology.yaml index e69de29bb..bc8ae4de8 100644 --- a/shotover-proxy/tests/test-configs/opensearch-passthrough/topology.yaml +++ b/shotover-proxy/tests/test-configs/opensearch-passthrough/topology.yaml @@ -0,0 +1,12 @@ +--- +sources: + opensearch_prod: + OpenSearch: + listen_addr: "127.0.0.1:9201" +chain_config: + main_chain: + - OpenSearchSinkSingle: + remote_address: "127.0.0.1:9200" + connect_timeout_ms: 3000 +source_to_chain_mapping: + opensearch_prod: main_chain diff --git a/shotover/Cargo.toml b/shotover/Cargo.toml index 9207aabbc..89ad23f9a 100644 --- a/shotover/Cargo.toml +++ b/shotover/Cargo.toml @@ -56,6 +56,8 @@ num = { version = "0.4.0", features = ["serde"] } uuid.workspace = true bigdecimal = {version = "0.4.0", features = ["serde"] } base64 = "0.21.0" +httparse = "1.8.0" +http = "0.2.9" #Observability metrics = "0.21.0" @@ -90,6 +92,7 @@ rustls-pemfile = "1.0.2" string = "0.3.0" xxhash-rust = { version = "0.8.6", features = ["xxh3"] } dashmap = "5.4.0" +atoi = "2.0.0" [dev-dependencies] criterion = { version = "0.5.0", features = ["async_tokio"] } diff --git a/shotover/src/codec/cassandra.rs b/shotover/src/codec/cassandra.rs index 042f5655c..ec8ddd307 100644 --- a/shotover/src/codec/cassandra.rs +++ b/shotover/src/codec/cassandra.rs @@ -1,5 +1,4 @@ -use super::{CodecWriteError, Direction}; -use crate::codec::{CodecBuilder, CodecReadError}; +use super::{CodecBuilder, CodecReadError, CodecWriteError, Direction}; use crate::frame::cassandra::{CassandraMetadata, CassandraOperation, Tracing}; use crate::frame::{CassandraFrame, Frame, MessageType}; use crate::message::{Encodable, Message, Messages, Metadata}; diff --git a/shotover/src/codec/mod.rs b/shotover/src/codec/mod.rs index f5c9f2e54..c3f7ee42c 100644 --- a/shotover/src/codec/mod.rs +++ b/shotover/src/codec/mod.rs @@ -8,6 +8,7 @@ use tokio_util::codec::{Decoder, Encoder}; pub mod cassandra; pub mod kafka; +pub mod opensearch; pub mod redis; #[derive(Eq, PartialEq, Copy, Clone)] @@ -35,6 +36,7 @@ pub enum CodecState { request_header: Option, }, Dummy, + OpenSearch, } impl CodecState { diff --git a/shotover/src/codec/opensearch.rs b/shotover/src/codec/opensearch.rs new file mode 100644 index 000000000..57db1debb --- /dev/null +++ b/shotover/src/codec/opensearch.rs @@ -0,0 +1,357 @@ +use super::{CodecBuilder, CodecReadError, CodecWriteError, Direction}; +use crate::frame::{ + opensearch::{HttpHead, RequestParts, ResponseParts}, + Frame, MessageType, OpenSearchFrame, +}; +use crate::message::{Encodable, Message, Messages}; +use anyhow::{anyhow, Result}; +use bytes::{Buf, BytesMut}; +use http::{header, HeaderName, HeaderValue, Method, Request, Response}; +use std::sync::{Arc, Mutex}; +use tokio_util::codec::{Decoder, Encoder}; + +#[derive(Clone)] +pub struct OpenSearchCodecBuilder { + direction: Direction, +} + +impl CodecBuilder for OpenSearchCodecBuilder { + type Decoder = OpenSearchDecoder; + type Encoder = OpenSearchEncoder; + + fn new(direction: Direction) -> Self { + Self { direction } + } + + fn build(&self) -> (OpenSearchDecoder, OpenSearchEncoder) { + let last_outgoing_method = Arc::new(Mutex::new(None)); + + ( + OpenSearchDecoder::new(self.direction, last_outgoing_method.clone()), + OpenSearchEncoder::new(self.direction, last_outgoing_method), + ) + } + + fn websocket_subprotocol(&self) -> &'static str { + "opensearch" + } +} + +pub struct OpenSearchDecoder { + direction: Direction, + state: State, + last_outgoing_method: Arc>>, +} + +struct DecodeResult { + body_start: usize, + http_headers: HttpHead, + content_length: usize, +} + +impl OpenSearchDecoder { + pub fn new(direction: Direction, last_outgoing_method: Arc>>) -> Self { + Self { + direction, + state: State::ParsingResponse, + last_outgoing_method, + } + } + + fn decode_request(&self, src: &mut BytesMut) -> Result> { + let mut headers = [httparse::EMPTY_HEADER; 16]; + let mut request = httparse::Request::new(&mut headers); + + let body_start = match request.parse(src)? { + httparse::Status::Complete(body_start) => body_start, + httparse::Status::Partial => return Ok(None), + }; + match request.version.unwrap() { + 1 => (), + version => { + return Err(anyhow!( + "HTTP version: {} unsupported. Requires HTTP/1", + version + )) + } + } + + let mut builder = Request::builder() + .method(request.method.unwrap()) + .uri(request.path.unwrap()); + + let builder_headers = builder.headers_mut().unwrap(); + + for header in request.headers { + if header.name.is_empty() && header.value.is_empty() { + break; + } + builder_headers.insert( + HeaderName::from_bytes(header.name.as_bytes()).unwrap(), + HeaderValue::from_bytes(header.value).unwrap(), + ); + } + + let r = builder.body(()).unwrap(); + let content_length = match r.headers().get(header::CONTENT_LENGTH) { + Some(content_length) => match atoi::atoi(content_length.as_bytes()) { + Some(content_length) => content_length, + None => return Err(anyhow!("content-length header invalid")), + }, + None => 0, + }; + let (parts, _) = r.into_parts(); + Ok(Some(DecodeResult { + body_start, + http_headers: HttpHead::Request(RequestParts { + method: parts.method, + uri: parts.uri, + version: parts.version, + headers: parts.headers, + }), + content_length, + })) + } + + fn decode_response(&self, src: &mut BytesMut) -> Result> { + let mut headers = [httparse::EMPTY_HEADER; 16]; + let mut response = httparse::Response::new(&mut headers); + + let body_start = match response.parse(src).unwrap() { + httparse::Status::Complete(body_start) => body_start, + httparse::Status::Partial => return Ok(None), + }; + match response.version.unwrap() { + 1 => (), + version => { + return Err(anyhow!( + "HTTP version: {} unsupported. Requires HTTP/1", + version + )) + } + } + + let mut builder = Response::builder().status(response.code.unwrap()); + + let builder_headers = builder.headers_mut().unwrap(); + + for header in response.headers { + if header.name.is_empty() && header.value.is_empty() { + break; + } + builder_headers.insert( + HeaderName::from_bytes(header.name.as_bytes()).unwrap(), + HeaderValue::from_bytes(header.value).unwrap(), + ); + } + + let r = builder.body(()).unwrap(); + let content_length = match r.headers().get(header::CONTENT_LENGTH) { + Some(cl) => match atoi::atoi(cl.as_bytes()) { + Some(cl) => cl, + None => return Err(anyhow!("content-length header invalid")), + }, + None => 0, + }; + let (parts, _) = r.into_parts(); + Ok(Some(DecodeResult { + body_start, + http_headers: HttpHead::Response(ResponseParts { + version: parts.version, + status: parts.status, + headers: parts.headers, + }), + content_length, + })) + } +} + +#[derive(Debug)] +enum State { + ParsingResponse, + ReadingBody(HttpHead, usize), +} + +impl Decoder for OpenSearchDecoder { + type Item = Messages; + type Error = CodecReadError; + + fn decode(&mut self, src: &mut BytesMut) -> Result, CodecReadError> { + if src.is_empty() { + return Ok(None); + } + + loop { + match std::mem::replace(&mut self.state, State::ParsingResponse) { + State::ParsingResponse => { + let decode_result = if self.direction == Direction::Source { + self.decode_request(src).map_err(CodecReadError::Parser)? + } else { + self.decode_response(src).map_err(CodecReadError::Parser)? + }; + + if let Some(DecodeResult { + body_start, + http_headers, + content_length, + }) = decode_result + { + self.state = State::ReadingBody(http_headers, content_length); + src.advance(body_start); + } else { + return Ok(None); + }; + } + State::ReadingBody(http_headers, content_length) => { + if let Some(Method::HEAD) = *self.last_outgoing_method.lock().unwrap() { + return Ok(Some(vec![Message::from_frame(Frame::OpenSearch( + OpenSearchFrame::new(http_headers, bytes::Bytes::new()), + ))])); + } + + if src.len() < content_length { + return Ok(None); + } + + let body = src.split_to(content_length).freeze(); + return Ok(Some(vec![Message::from_frame(Frame::OpenSearch( + OpenSearchFrame::new(http_headers, body), + ))])); + } + } + } + } +} + +pub struct OpenSearchEncoder { + direction: Direction, + last_outgoing_method: Arc>>, +} + +impl OpenSearchEncoder { + pub fn new(direction: Direction, last_outgoing_method: Arc>>) -> Self { + Self { + direction, + last_outgoing_method, + } + } +} + +impl Encoder for OpenSearchEncoder { + type Error = CodecWriteError; + + fn encode( + &mut self, + item: Messages, + dst: &mut BytesMut, + ) -> std::result::Result<(), Self::Error> { + item.into_iter().try_for_each(|m| { + let start = dst.len(); + m.ensure_message_type(MessageType::OpenSearch) + .map_err(CodecWriteError::Encoder)?; + let result = match m.into_encodable() { + Encodable::Bytes(bytes) => { + dst.extend_from_slice(&bytes); + Ok(()) + } + Encodable::Frame(frame) => { + let opensearch_frame = frame.into_opensearch().unwrap(); + + match opensearch_frame.headers { + HttpHead::Request(request_parts) => { + self.last_outgoing_method + .lock() + .unwrap() + .replace(request_parts.method.clone()); + + dst.extend_from_slice( + format!("{} ", request_parts.method.as_str()).as_bytes(), + ); + dst.extend_from_slice(format!("{} ", request_parts.uri).as_bytes()); + dst.extend_from_slice(b"HTTP/1.1"); + dst.extend_from_slice(b"\r\n"); + + for (k, v) in &request_parts.headers { + dst.extend_from_slice(k.as_str().as_bytes()); + dst.extend_from_slice(b": "); + dst.extend_from_slice(v.as_bytes()); + dst.extend_from_slice(b"\r\n"); + } + } + HttpHead::Response(response_parts) => { + *self.last_outgoing_method.lock().unwrap() = None; + + dst.extend_from_slice(b"HTTP/1.1 "); + dst.extend_from_slice(format!("{}", response_parts.status).as_bytes()); + dst.extend_from_slice(b"\r\n"); + + for (k, v) in &response_parts.headers { + dst.extend_from_slice(k.as_str().as_bytes()); + dst.extend_from_slice(b": "); + dst.extend_from_slice(v.as_bytes()); + dst.extend_from_slice(b"\r\n"); + } + } + } + + dst.extend_from_slice(b"\r\n"); + dst.extend_from_slice(&opensearch_frame.body); + + Ok(()) + } + }; + + tracing::debug!( + "{}: outgoing OpenSearch message:\n{}", + self.direction, + pretty_hex::pretty_hex(&&dst[start..]) + ); + + result.map_err(CodecWriteError::Encoder) + }) + } +} + +#[cfg(test)] +mod opensearch_tests { + use crate::codec::{opensearch::OpenSearchCodecBuilder, CodecBuilder, Direction}; + use bytes::BytesMut; + use tokio_util::codec::{Decoder, Encoder}; + + fn test_frame(raw_frame: &[u8], direction: Direction) { + let (mut decoder, mut encoder) = OpenSearchCodecBuilder::new(direction).build(); + let message = decoder + .decode(&mut BytesMut::from(raw_frame)) + .unwrap() + .unwrap(); + + let mut dest = BytesMut::new(); + encoder.encode(message, &mut dest).unwrap(); + assert_eq!(raw_frame, &dest); + } + + const RESPONSE: [u8; 186] = *b"\ + HTTP/1.1 200 OK\r\n\ + date: Mon, 27 Jul 2009 12:28:53 GMT\r\n\ + server: Apache/2.2.14 (Win32)\r\n\ + last-modified: Wed, 22 Jul 2009 19:15:56 GMT\r\n\ + content-length: 9\r\n\ + content-type: text/html\r\n\r\n\ + something"; + + const REQUEST: [u8; 90] = *b"\ + POST /cgi-bin/process.cgi HTTP/1.1\r\n\ + connection: Keep-Alive\r\n\ + content-length: 9\r\n\r\n\ + something"; + + #[test] + fn test_request() { + test_frame(&REQUEST, Direction::Source); + } + + #[test] + fn test_response() { + test_frame(&RESPONSE, Direction::Sink); + } +} diff --git a/shotover/src/frame/mod.rs b/shotover/src/frame/mod.rs index 22fd09320..2cfe33440 100644 --- a/shotover/src/frame/mod.rs +++ b/shotover/src/frame/mod.rs @@ -6,11 +6,13 @@ use bytes::Bytes; pub use cassandra::{CassandraFrame, CassandraOperation, CassandraResult}; use cassandra_protocol::compression::Compression; use kafka::KafkaFrame; +pub use opensearch::OpenSearchFrame; pub use redis_protocol::resp2::types::Frame as RedisFrame; use std::fmt::{Display, Formatter, Result as FmtResult}; pub mod cassandra; pub mod kafka; +pub mod opensearch; pub mod redis; pub mod value; @@ -20,6 +22,7 @@ pub enum MessageType { Cassandra, Kafka, Dummy, + OpenSearch, } impl From<&ProtocolType> for MessageType { @@ -28,6 +31,7 @@ impl From<&ProtocolType> for MessageType { ProtocolType::Cassandra { .. } => Self::Cassandra, ProtocolType::Redis => Self::Redis, ProtocolType::Kafka { .. } => Self::Kafka, + ProtocolType::OpenSearch => Self::OpenSearch, } } } @@ -43,6 +47,7 @@ impl Frame { request_header: None, }, Frame::Dummy => CodecState::Dummy, + Frame::OpenSearch(_) => CodecState::OpenSearch, } } } @@ -55,6 +60,7 @@ pub enum Frame { /// Represents a message that has must exist due to shotovers requirement that every request has a corresponding response. /// It exists purely to keep transform invariants and codecs will completely ignore this frame when they receive it Dummy, + OpenSearch(OpenSearchFrame), } impl Frame { @@ -74,6 +80,7 @@ impl Frame { KafkaFrame::from_bytes(bytes, codec_state.as_kafka()).map(Frame::Kafka) } MessageType::Dummy => Ok(Frame::Dummy), + MessageType::OpenSearch => Ok(Frame::OpenSearch(OpenSearchFrame::from_bytes(&bytes)?)), } } @@ -83,6 +90,7 @@ impl Frame { Frame::Cassandra(_) => "Cassandra", Frame::Kafka(_) => "Kafka", Frame::Dummy => "Dummy", + Frame::OpenSearch(_) => "OpenSearch", } } @@ -92,6 +100,7 @@ impl Frame { Frame::Redis(_) => MessageType::Redis, Frame::Kafka(_) => MessageType::Kafka, Frame::Dummy => MessageType::Dummy, + Frame::OpenSearch(_) => MessageType::OpenSearch, } } @@ -134,6 +143,16 @@ impl Frame { )), } } + + pub fn into_opensearch(self) -> Result { + match self { + Frame::OpenSearch(frame) => Ok(frame), + frame => Err(anyhow!( + "Expected opensearch frame but received {} frame", + frame.name() + )), + } + } } impl Display for Frame { @@ -143,6 +162,7 @@ impl Display for Frame { Frame::Redis(frame) => write!(f, "Redis {:?})", frame), Frame::Kafka(frame) => write!(f, "Kafka {})", frame), Frame::Dummy => write!(f, "Shotover internal dummy message"), + Frame::OpenSearch(frame) => write!(f, "OpenSearch: {:?}", frame), } } } diff --git a/shotover/src/frame/opensearch.rs b/shotover/src/frame/opensearch.rs new file mode 100644 index 000000000..fe76f16c5 --- /dev/null +++ b/shotover/src/frame/opensearch.rs @@ -0,0 +1,40 @@ +use anyhow::Result; +use bytes::Bytes; +use http::{HeaderMap, Method, StatusCode, Uri, Version}; + +#[derive(Debug, Clone, PartialEq)] +pub struct ResponseParts { + pub status: StatusCode, + pub version: Version, + pub headers: HeaderMap, +} + +#[derive(Debug, Clone, PartialEq)] +pub struct RequestParts { + pub method: Method, + pub uri: Uri, + pub version: Version, + pub headers: HeaderMap, +} + +#[derive(Debug, Clone, PartialEq)] +pub enum HttpHead { + Response(ResponseParts), + Request(RequestParts), +} + +#[derive(Debug, Clone, PartialEq)] +pub struct OpenSearchFrame { + pub headers: HttpHead, + pub body: Bytes, +} + +impl OpenSearchFrame { + pub fn new(headers: HttpHead, body: Bytes) -> Self { + Self { headers, body } + } + + pub fn from_bytes(_bytes: &Bytes) -> Result { + todo!(); + } +} diff --git a/shotover/src/message/mod.rs b/shotover/src/message/mod.rs index 953aa2ce8..7bc37240f 100644 --- a/shotover/src/message/mod.rs +++ b/shotover/src/message/mod.rs @@ -21,6 +21,7 @@ pub enum Metadata { Cassandra(CassandraMetadata), Redis, Kafka, + OpenSearch, } #[derive(PartialEq)] @@ -32,6 +33,7 @@ pub enum ProtocolType { Kafka { request_header: Option, }, + OpenSearch, } impl From<&ProtocolType> for CodecState { @@ -44,6 +46,7 @@ impl From<&ProtocolType> for CodecState { ProtocolType::Kafka { request_header } => Self::Kafka { request_header: *request_header, }, + ProtocolType::OpenSearch => Self::OpenSearch, } } } @@ -210,12 +213,14 @@ impl Message { MessageType::Cassandra => cassandra::raw_frame::cell_count(bytes)?, MessageType::Kafka => todo!(), MessageType::Dummy => nonzero!(1u32), + MessageType::OpenSearch => todo!(), }, MessageInner::Modified { frame } | MessageInner::Parsed { frame, .. } => match frame { Frame::Cassandra(frame) => frame.cell_count()?, Frame::Redis(_) => nonzero!(1u32), Frame::Kafka(_) => todo!(), Frame::Dummy => nonzero!(1u32), + Frame::OpenSearch(_) => todo!(), }, }) } @@ -239,6 +244,7 @@ impl Message { Some(Frame::Redis(redis)) => redis_query_type(redis), // free-standing function as we cant define methods on RedisFrame Some(Frame::Kafka(_)) => todo!(), Some(Frame::Dummy) => todo!(), + Some(Frame::OpenSearch(_)) => todo!(), None => QueryType::ReadWrite, } } @@ -272,6 +278,7 @@ impl Message { Metadata::Kafka => return Err(anyhow!(error).context( "A generic error cannot be formed because the kafka protocol does not support it", )), + Metadata::OpenSearch => unimplemented!() })) } @@ -287,13 +294,15 @@ impl Message { } MessageType::Redis => Ok(Metadata::Redis), MessageType::Kafka => Ok(Metadata::Kafka), - MessageType::Dummy => Err(anyhow!("dummy has no metadata")), + MessageType::Dummy => Err(anyhow!("Dummy has no metadata")), + MessageType::OpenSearch => Err(anyhow!("OpenSearch has no metadata")), }, MessageInner::Parsed { frame, .. } | MessageInner::Modified { frame } => match frame { Frame::Cassandra(frame) => Ok(Metadata::Cassandra(frame.metadata())), Frame::Kafka(_) => Ok(Metadata::Kafka), Frame::Redis(_) => Ok(Metadata::Redis), Frame::Dummy => Err(anyhow!("dummy has no metadata")), + Frame::OpenSearch(_) => Err(anyhow!("OpenSearch has no metadata")), }, } } @@ -319,6 +328,7 @@ impl Message { } Metadata::Redis => unimplemented!(), Metadata::Kafka => unimplemented!(), + Metadata::OpenSearch => unimplemented!(), }); Ok(()) @@ -348,6 +358,7 @@ impl Message { Frame::Redis(_) => None, Frame::Kafka(_) => None, Frame::Dummy => None, + Frame::OpenSearch(_) => None, } } None => None, diff --git a/shotover/src/sources/cassandra.rs b/shotover/src/sources/cassandra.rs index a5f10a4ed..bd52330b2 100644 --- a/shotover/src/sources/cassandra.rs +++ b/shotover/src/sources/cassandra.rs @@ -62,7 +62,7 @@ impl CassandraSource { tls: Option, timeout: Option, transport: Option, - ) -> Result { + ) -> Result { let name = "CassandraSource"; info!("Starting Cassandra source on [{}]", listen_addr); @@ -97,7 +97,7 @@ impl CassandraSource { } }); - Ok(CassandraSource { + Ok(Self { name, join_handle, listen_addr, diff --git a/shotover/src/sources/mod.rs b/shotover/src/sources/mod.rs index a2f41b959..9b6a57309 100644 --- a/shotover/src/sources/mod.rs +++ b/shotover/src/sources/mod.rs @@ -1,5 +1,6 @@ use crate::sources::cassandra::{CassandraConfig, CassandraSource}; use crate::sources::kafka::{KafkaConfig, KafkaSource}; +use crate::sources::opensearch::{OpenSearchConfig, OpenSearchSource}; use crate::sources::redis::{RedisConfig, RedisSource}; use crate::transforms::chain::TransformChainBuilder; use anyhow::Result; @@ -9,6 +10,7 @@ use tokio::task::JoinHandle; pub mod cassandra; pub mod kafka; +pub mod opensearch; pub mod redis; #[derive(Deserialize, Debug, Clone, Copy)] @@ -23,6 +25,7 @@ pub enum Source { Cassandra(CassandraSource), Redis(RedisSource), Kafka(KafkaSource), + OpenSearch(OpenSearchSource), } impl Source { @@ -31,6 +34,7 @@ impl Source { Source::Cassandra(c) => c.join_handle, Source::Redis(r) => r.join_handle, Source::Kafka(r) => r.join_handle, + Source::OpenSearch(o) => o.join_handle, } } } @@ -41,6 +45,7 @@ pub enum SourceConfig { Cassandra(CassandraConfig), Redis(RedisConfig), Kafka(KafkaConfig), + OpenSearch(OpenSearchConfig), } impl SourceConfig { @@ -53,6 +58,7 @@ impl SourceConfig { SourceConfig::Cassandra(c) => c.get_source(chain_builder, trigger_shutdown_rx).await, SourceConfig::Redis(r) => r.get_source(chain_builder, trigger_shutdown_rx).await, SourceConfig::Kafka(r) => r.get_source(chain_builder, trigger_shutdown_rx).await, + SourceConfig::OpenSearch(r) => r.get_source(chain_builder, trigger_shutdown_rx).await, } } } diff --git a/shotover/src/sources/opensearch.rs b/shotover/src/sources/opensearch.rs new file mode 100644 index 000000000..c3ad34e20 --- /dev/null +++ b/shotover/src/sources/opensearch.rs @@ -0,0 +1,96 @@ +use crate::codec::{opensearch::OpenSearchCodecBuilder, CodecBuilder, Direction}; +use crate::server::TcpCodecListener; +use crate::sources::{Source, Transport}; +use crate::transforms::chain::TransformChainBuilder; +use anyhow::Result; +use serde::Deserialize; +use std::sync::Arc; +use tokio::sync::{watch, Semaphore}; +use tokio::task::JoinHandle; +use tracing::{error, info}; + +#[derive(Deserialize, Debug, Clone)] +pub struct OpenSearchConfig { + pub listen_addr: String, + pub connection_limit: Option, + pub hard_connection_limit: Option, + pub timeout: Option, +} + +impl OpenSearchConfig { + pub async fn get_source( + &self, + chain_builder: TransformChainBuilder, + trigger_shutdown_rx: watch::Receiver, + ) -> Result> { + Ok(vec![Source::OpenSearch( + OpenSearchSource::new( + chain_builder, + self.listen_addr.clone(), + trigger_shutdown_rx, + self.connection_limit, + self.hard_connection_limit, + self.timeout, + ) + .await?, + )]) + } +} + +#[derive(Debug)] +pub struct OpenSearchSource { + pub name: &'static str, + pub join_handle: JoinHandle<()>, + pub listen_addr: String, +} + +impl OpenSearchSource { + pub async fn new( + chain_builder: TransformChainBuilder, + listen_addr: String, + mut trigger_shutdown_rx: watch::Receiver, + connection_limit: Option, + hard_connection_limit: Option, + timeout: Option, + ) -> Result { + let name = "OpenSearchSource"; + + info!("Starting OpenSearch source on [{}]", listen_addr); + + let mut listener = TcpCodecListener::new( + chain_builder, + name.to_string(), + listen_addr.clone(), + hard_connection_limit.unwrap_or(false), + OpenSearchCodecBuilder::new(Direction::Source), + Arc::new(Semaphore::new(connection_limit.unwrap_or(512))), + trigger_shutdown_rx.clone(), + None, + timeout, + Transport::Tcp, + ) + .await?; + + let join_handle = tokio::spawn(async move { + // Check we didn't receive a shutdown signal before the receiver was created + if !*trigger_shutdown_rx.borrow() { + tokio::select! { + res = listener.run() => { + if let Err(err) = res { + error!(cause = %err, "failed to accept"); + } + } + _ = trigger_shutdown_rx.changed() => { + listener.shutdown().await; + } + } + } + }); + + Ok(Self { + name, + join_handle, + listen_addr, + }) + } +} diff --git a/shotover/src/transforms/mod.rs b/shotover/src/transforms/mod.rs index 92a7eeeb8..c043a30d8 100644 --- a/shotover/src/transforms/mod.rs +++ b/shotover/src/transforms/mod.rs @@ -17,6 +17,8 @@ use crate::transforms::kafka::sink_single::KafkaSinkSingle; use crate::transforms::load_balance::ConnectionBalanceAndPool; use crate::transforms::loopback::Loopback; use crate::transforms::null::NullSink; +#[cfg(feature = "alpha-transforms")] +use crate::transforms::opensearch::OpenSearchSinkSingle; use crate::transforms::parallel_map::ParallelMap; use crate::transforms::protect::Protect; use crate::transforms::query_counter::QueryCounter; @@ -52,6 +54,8 @@ pub mod load_balance; pub mod loopback; pub mod noop; pub mod null; +#[cfg(feature = "alpha-transforms")] +pub mod opensearch; pub mod parallel_map; pub mod protect; pub mod query_counter; @@ -114,6 +118,8 @@ pub enum Transforms { QueryCounter(QueryCounter), RequestThrottling(RequestThrottling), Custom(Box), + #[cfg(feature = "alpha-transforms")] + OpenSearchSinkSingle(OpenSearchSinkSingle), } impl Debug for Transforms { @@ -152,6 +158,8 @@ impl Transforms { Transforms::QueryCounter(s) => s.transform(requests_wrapper).await, Transforms::RequestThrottling(s) => s.transform(requests_wrapper).await, Transforms::Custom(s) => s.transform(requests_wrapper).await, + #[cfg(feature = "alpha-transforms")] + Transforms::OpenSearchSinkSingle(s) => s.transform(requests_wrapper).await, } } @@ -186,6 +194,8 @@ impl Transforms { Transforms::QueryCounter(s) => s.transform_pushed(requests_wrapper).await, Transforms::RequestThrottling(s) => s.transform_pushed(requests_wrapper).await, Transforms::Custom(s) => s.transform_pushed(requests_wrapper).await, + #[cfg(feature = "alpha-transforms")] + Transforms::OpenSearchSinkSingle(s) => s.transform_pushed(requests_wrapper).await, } } @@ -224,6 +234,8 @@ impl Transforms { Transforms::DebugRandomDelay(d) => d.set_pushed_messages_tx(pushed_messages_tx), Transforms::RequestThrottling(d) => d.set_pushed_messages_tx(pushed_messages_tx), Transforms::Custom(d) => d.set_pushed_messages_tx(pushed_messages_tx), + #[cfg(feature = "alpha-transforms")] + Transforms::OpenSearchSinkSingle(s) => s.set_pushed_messages_tx(pushed_messages_tx), } } } diff --git a/shotover/src/transforms/opensearch/mod.rs b/shotover/src/transforms/opensearch/mod.rs new file mode 100644 index 000000000..55f952efd --- /dev/null +++ b/shotover/src/transforms/opensearch/mod.rs @@ -0,0 +1,119 @@ +use crate::tcp; +use crate::transforms::{ + Messages, Transform, TransformBuilder, TransformConfig, Transforms, Wrapper, +}; +use crate::{ + codec::{opensearch::OpenSearchCodecBuilder, CodecBuilder, Direction}, + transforms::util::{ + cluster_connection_pool::{spawn_read_write_tasks, Connection}, + Request, + }, +}; +use anyhow::{anyhow, Result}; +use async_trait::async_trait; +use serde::Deserialize; +use std::time::Duration; +use tokio::sync::oneshot; +use tracing::trace; + +#[derive(Deserialize, Debug)] +pub struct OpenSearchSinkSingleConfig { + #[serde(rename = "remote_address")] + address: String, + connect_timeout_ms: u64, +} + +#[typetag::deserialize(name = "OpenSearchSinkSingle")] +#[async_trait(?Send)] +impl TransformConfig for OpenSearchSinkSingleConfig { + async fn get_builder(&self, chain_name: String) -> Result> { + Ok(Box::new(OpenSearchSinkSingleBuilder::new( + self.address.clone(), + chain_name, + self.connect_timeout_ms, + ))) + } +} + +#[derive(Clone)] +pub struct OpenSearchSinkSingleBuilder { + address: String, + connect_timeout: Duration, +} + +impl OpenSearchSinkSingleBuilder { + pub fn new(address: String, _chain_name: String, connect_timeout_ms: u64) -> Self { + let connect_timeout = Duration::from_millis(connect_timeout_ms); + + Self { + address, + connect_timeout, + } + } +} + +impl TransformBuilder for OpenSearchSinkSingleBuilder { + fn build(&self) -> Transforms { + Transforms::OpenSearchSinkSingle(OpenSearchSinkSingle { + address: self.address.clone(), + connect_timeout: self.connect_timeout, + codec_builder: OpenSearchCodecBuilder::new(Direction::Sink), + connection: None, + }) + } + + fn get_name(&self) -> &'static str { + "OpenSearchSinkSingle" + } + + fn is_terminating(&self) -> bool { + true + } +} + +pub struct OpenSearchSinkSingle { + address: String, + connection: Option, + connect_timeout: Duration, + codec_builder: OpenSearchCodecBuilder, +} + +#[async_trait] +impl Transform for OpenSearchSinkSingle { + async fn transform<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result { + // Return immediately if we have no messages. + // If we tried to send no messages we would block forever waiting for a reply that will never come. + if requests_wrapper.requests.is_empty() { + return Ok(requests_wrapper.requests); + } + + if self.connection.is_none() { + trace!("creating outbound connection {:?}", self.address); + + let tcp_stream = tcp::tcp_stream(self.connect_timeout, self.address.clone()).await?; + let (rx, tx) = tcp_stream.into_split(); + self.connection = Some(spawn_read_write_tasks(&self.codec_builder, rx, tx)); + } + + let connection = self.connection.as_mut().unwrap(); + + let messages_len = requests_wrapper.requests.len(); + + let mut result = Vec::with_capacity(messages_len); + for message in requests_wrapper.requests { + let (tx, rx) = oneshot::channel(); + + connection + .send(Request { + message, + return_chan: Some(tx), + }) + .map_err(|_| anyhow!("Failed to send"))?; + + let message = rx.await?.response?; + result.push(message); + } + + Ok(result) + } +} diff --git a/shotover/src/transforms/query_counter.rs b/shotover/src/transforms/query_counter.rs index 2e9028631..fc9076a3f 100644 --- a/shotover/src/transforms/query_counter.rs +++ b/shotover/src/transforms/query_counter.rs @@ -60,6 +60,9 @@ impl Transform for QueryCounter { Some(Frame::Dummy) => { // Dummy does not count as a message } + Some(Frame::OpenSearch(_)) => { + todo!(); + } None => { counter!("query_count", 1, "name" => self.counter_name.clone(), "query" => "unknown", "type" => "none") } diff --git a/test-helpers/src/docker_compose.rs b/test-helpers/src/docker_compose.rs index 7ca54a110..4f6af73d5 100644 --- a/test-helpers/src/docker_compose.rs +++ b/test-helpers/src/docker_compose.rs @@ -73,7 +73,7 @@ pub fn get_image_waiters() -> &'static [Image] { }, Image { name: "opensearchproject/opensearch:2.9.0", - log_regex_to_wait_for: r"Node '(?s)(.*)' initialized", + log_regex_to_wait_for: r"Node started", timeout: 120, }, ]