Skip to content

Commit

Permalink
Opensearch support (#1268)
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros authored Aug 31, 2023
1 parent e6eb8b1 commit 0952149
Show file tree
Hide file tree
Showing 19 changed files with 914 additions and 58 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions shotover-proxy/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use tokio_bin_process::bin_path;

mod cassandra_int_tests;
mod kafka_int_tests;
#[cfg(feature = "alpha-transforms")]
mod opensearch_int_tests;
mod redis_int_tests;
mod runner;
Expand Down
266 changes: 214 additions & 52 deletions shotover-proxy/tests/opensearch_int_tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,52 +1,242 @@
use crate::shotover_process;
use opensearch::{
auth::Credentials,
cert::CertificateValidation,
http::Url,
http::response::Response,
http::{
response::Response,
transport::{SingleNodeConnectionPool, TransportBuilder},
StatusCode,
Method, StatusCode, Url,
},
indices::IndicesExistsParts,
indices::{IndicesCreateParts, IndicesDeleteParts, IndicesExistsParts},
params::Refresh,
BulkOperation, BulkParts, OpenSearch, SearchParts,
BulkOperation, BulkParts, DeleteParts, Error, IndexParts, OpenSearch, SearchParts,
};
use serde_json::{json, Value};
use test_helpers::docker_compose::docker_compose;

pub async fn index_documents(client: &OpenSearch) -> Response {
let index = "posts";
let exists_response = client
.indices()
.exists(IndicesExistsParts::Index(&[index]))
.send()
.await
.unwrap();
async fn assert_ok_and_get_json(response: Result<Response, Error>) -> Value {
let response = response.unwrap();
let status = response.status_code();

assert_eq!(exists_response.status_code(), StatusCode::NOT_FOUND);
if response.method() == Method::Head {
if status != StatusCode::OK {
panic!("Opensearch HEAD query returned status code {status}");
}
Value::Null
} else {
let json = response.json().await.unwrap();
if status != StatusCode::OK && status != StatusCode::CREATED {
panic!("Opensearch query failed: {status:#?}\n{json:#?}");
}
json
}
}

pub async fn test_bulk(client: &OpenSearch) {
assert_ok_and_get_json(
client
.indices()
.create(IndicesCreateParts::Index("posts"))
.send()
.await,
)
.await;

let mut body: Vec<BulkOperation<_>> = vec![];
for i in 1..=10 {
let op = BulkOperation::index(json!({"title":"OpenSearch"}))
for i in 0..10 {
let op = BulkOperation::index(json!({"title": "OpenSearch", "i": i}))
.id(i.to_string())
.into();
body.push(op);
}

client
.bulk(BulkParts::Index(index))
.body(body)
.refresh(Refresh::WaitFor)
assert_ok_and_get_json(
client
.bulk(BulkParts::Index("posts"))
.body(body)
.refresh(Refresh::WaitFor)
.send()
.await,
)
.await;

let results = assert_ok_and_get_json(
client
.search(SearchParts::None)
.body(json!({
"query": {
"match_all": {}
},
"sort": [
{
"i": {
"order": "asc"
}
}
]
}))
.allow_no_indices(true)
.send()
.await,
)
.await;

assert!(results["took"].is_i64());
let hits = results["hits"]["hits"].as_array().unwrap();
assert_eq!(
hits.iter().map(|x| &x["_source"]).collect::<Vec<_>>(),
vec!(
&json!({ "title": "OpenSearch", "i": 0 }),
&json!({ "title": "OpenSearch", "i": 1 }),
&json!({ "title": "OpenSearch", "i": 2 }),
&json!({ "title": "OpenSearch", "i": 3 }),
&json!({ "title": "OpenSearch", "i": 4 }),
&json!({ "title": "OpenSearch", "i": 5 }),
&json!({ "title": "OpenSearch", "i": 6 }),
&json!({ "title": "OpenSearch", "i": 7 }),
&json!({ "title": "OpenSearch", "i": 8 }),
&json!({ "title": "OpenSearch", "i": 9 }),
)
);
}

async fn test_create_index(client: &OpenSearch) {
assert_ok_and_get_json(
client
.indices()
.create(IndicesCreateParts::Index("test-index"))
.send()
.await,
)
.await;

assert_ok_and_get_json(
client
.indices()
.exists(IndicesExistsParts::Index(&["test-index"]))
.send()
.await,
)
.await;
}

async fn test_index_and_search_document(client: &OpenSearch) -> String {
assert_ok_and_get_json(
client
.index(IndexParts::Index("test-index"))
.body(json!({
"name": "John",
"age": 30
}))
.refresh(Refresh::WaitFor)
.send()
.await,
)
.await;

let response = assert_ok_and_get_json(
client
.search(SearchParts::Index(&["test-index"]))
.from(0)
.size(10)
.body(json!({
"query": {
"match": {
"name": "John",
}
}
}))
.send()
.await,
)
.await;

assert!(response["took"].is_i64());
let hits = response["hits"]["hits"].as_array().unwrap();
assert_eq!(
hits.iter().map(|x| &x["_source"]).collect::<Vec<_>>(),
vec!(&json!({
"name": "John",
"age": 30,
}))
);
hits[0]["_id"].as_str().unwrap().to_owned()
}

async fn test_delete_and_search_document(client: &OpenSearch, id: String) {
assert_ok_and_get_json(
client
.delete(DeleteParts::IndexId("test-index", &id))
.refresh(Refresh::WaitFor)
.send()
.await,
)
.await;

let response = assert_ok_and_get_json(
client
.search(SearchParts::Index(&["test-index"]))
.from(0)
.size(10)
.body(json!({
"query": {
"match": {
"name": "John",
}
}
}))
.allow_no_indices(true)
.send()
.await,
)
.await;

// let results = response.json::<Value>().await.unwrap();
assert!(response["took"].is_i64());
assert_eq!(response["hits"]["hits"].as_array().unwrap().len(), 0);
}

async fn test_delete_index(client: &OpenSearch) {
assert_ok_and_get_json(
client
.indices()
.delete(IndicesDeleteParts::Index(&["test-index"]))
.send()
.await,
)
.await;

let exists_response = client
.indices()
.exists(IndicesExistsParts::Index(&["test-index"]))
.send()
.await
.unwrap()
.unwrap();

assert_eq!(exists_response.status_code(), StatusCode::NOT_FOUND);
}

async fn opensearch_test_suite(client: &OpenSearch) {
test_create_index(client).await;

let doc_id = test_index_and_search_document(client).await;
test_delete_and_search_document(client, doc_id).await;

test_bulk(client).await;
test_delete_index(client).await;
}

#[tokio::test(flavor = "multi_thread")]
async fn passthrough_standard() {
let _compose = docker_compose("tests/test-configs/opensearch-passthrough/docker-compose.yaml");

let url = Url::parse("https://localhost:9200").unwrap();
let shotover = shotover_process("tests/test-configs/opensearch-passthrough/topology.yaml")
.start()
.await;

let addr = "http://localhost:9201";

let url = Url::parse(addr).unwrap();
let credentials = Credentials::Basic("admin".into(), "admin".into());
let transport = TransportBuilder::new(SingleNodeConnectionPool::new(url))
.cert_validation(CertificateValidation::None)
Expand All @@ -55,35 +245,7 @@ async fn passthrough_standard() {
.unwrap();
let client = OpenSearch::new(transport);

index_documents(&client).await;

let response = client
.search(SearchParts::None)
.body(json!({
"query": {
"match_all": {}
}
}))
.allow_no_indices(true)
.send()
.await
.unwrap();

assert!(response.content_length().unwrap() > 0);
assert_eq!(
response.url(),
&Url::parse("https://localhost:9200/_search?allow_no_indices=true").unwrap()
);
assert_eq!(response.status_code(), StatusCode::OK);
assert_eq!(response.method(), opensearch::http::Method::Post);
opensearch_test_suite(&client).await;

let response_body = response.json::<Value>().await.unwrap();
assert!(response_body["took"].as_i64().is_some());
assert_eq!(
response_body["hits"].as_object().unwrap()["hits"]
.as_array()
.unwrap()
.len(),
10
);
shotover.shutdown_and_then_consume_events(&[]).await;
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ services:
- bootstrap.memory_lock=true
- "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m"
- discovery.type=single-node
- plugins.security.disabled=true
ulimits:
memlock:
soft: -1
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
sources:
opensearch_prod:
OpenSearch:
listen_addr: "127.0.0.1:9201"
chain_config:
main_chain:
- OpenSearchSinkSingle:
remote_address: "127.0.0.1:9200"
connect_timeout_ms: 3000
source_to_chain_mapping:
opensearch_prod: main_chain
3 changes: 3 additions & 0 deletions shotover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ num = { version = "0.4.0", features = ["serde"] }
uuid.workspace = true
bigdecimal = {version = "0.4.0", features = ["serde"] }
base64 = "0.21.0"
httparse = "1.8.0"
http = "0.2.9"

#Observability
metrics = "0.21.0"
Expand Down Expand Up @@ -90,6 +92,7 @@ rustls-pemfile = "1.0.2"
string = "0.3.0"
xxhash-rust = { version = "0.8.6", features = ["xxh3"] }
dashmap = "5.4.0"
atoi = "2.0.0"

[dev-dependencies]
criterion = { version = "0.5.0", features = ["async_tokio"] }
Expand Down
3 changes: 1 addition & 2 deletions shotover/src/codec/cassandra.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use super::{CodecWriteError, Direction};
use crate::codec::{CodecBuilder, CodecReadError};
use super::{CodecBuilder, CodecReadError, CodecWriteError, Direction};
use crate::frame::cassandra::{CassandraMetadata, CassandraOperation, Tracing};
use crate::frame::{CassandraFrame, Frame, MessageType};
use crate::message::{Encodable, Message, Messages, Metadata};
Expand Down
Loading

0 comments on commit 0952149

Please sign in to comment.