Skip to content

Commit

Permalink
tee switch main chain (#1361)
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros authored Oct 25, 2023
1 parent 416c323 commit 9e58c6d
Show file tree
Hide file tree
Showing 8 changed files with 356 additions and 18 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,4 @@ typetag = "0.2.5"
aws-throwaway = "0.3.0"
tokio-bin-process = "0.4.0"
ordered-float = { version = "4.0.0", features = ["serde"] }
hyper = { version = "0.14.14", features = ["server"] }
10 changes: 10 additions & 0 deletions docs/src/transforms.md
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,12 @@ This is mainly used in conjunction with the `TuneableConsistencyScatter` transfo
This transform sends messages to both the defined sub chain and the remaining down-chain transforms.
The response from the down-chain transform is returned back up-chain but various behaviours can be defined by the `behaviour` field to handle the case when the responses from the sub chain and down-chain do not match.

Tee also exposes an optional HTTP API to switch which chain to use as the "result source", that is the chain to return responses from.

`GET` `/transform/tee/result-source` will return `regular-chain` or `tee-chain` indicating which chain is being used for the result source.

`PUT` `/transform/tee/result-source` with the body content as either `regular-chain` or `tee-chain` to set the result source.

```yaml
- Tee:
# Ignore responses returned by the sub chain
Expand All @@ -528,6 +534,10 @@ The response from the down-chain transform is returned back up-chain but various
# filter: Read
# - NullSink
# The port that the HTTP API will listen on.
# When this field is not provided the HTTP API will not be run.
# http_api_port: 1234
#
# Timeout for sending to the sub chain in microseconds
timeout_micros: 1000
# The number of message batches that the tee can hold onto in its buffer of messages to send.
Expand Down
1 change: 1 addition & 0 deletions shotover-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ serde_json = "1.0.103"
time = { version = "0.3.25" }
inferno = { version = "0.11.15", default-features = false, features = ["multithreaded", "nameattr"] }
shell-quote = "0.3.0"
hyper.workspace = true

[features]
# Include WIP alpha transforms in the public API
Expand Down
46 changes: 46 additions & 0 deletions shotover-proxy/tests/test-configs/tee/switch_chain.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
---
sources:
- Redis:
name: "redis-1"
listen_addr: "127.0.0.1:6371"
connection_limit:
chain:
- Tee:
behavior: Ignore
buffer_size: 10000
switch_port: 1231
chain:
- DebugReturner:
Redis: "b"
- DebugReturner:
Redis: "a"
- Redis:
name: "redis-3"
listen_addr: "127.0.0.1:6372"
connection_limit:
chain:
- Tee:
behavior:
SubchainOnMismatch:
- NullSink
buffer_size: 10000
switch_port: 1232
chain:
- DebugReturner:
Redis: "b"
- DebugReturner:
Redis: "a"
- Redis:
name: "redis-3"
listen_addr: "127.0.0.1:6373"
connection_limit:
chain:
- Tee:
behavior: LogWarningOnMismatch
buffer_size: 10000
switch_port: 1233
chain:
- DebugReturner:
Redis: "b"
- DebugReturner:
Redis: "a"
97 changes: 97 additions & 0 deletions shotover-proxy/tests/transforms/tee.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::shotover_process;
use hyper::{body, Body, Client, Method, Request, Response};
use test_helpers::connection::redis_connection;
use test_helpers::docker_compose::docker_compose;
use test_helpers::shotover_process::{EventMatcher, Level};
Expand Down Expand Up @@ -193,3 +194,99 @@ async fn test_subchain_with_mismatch() {
assert_eq!("myvalue", result);
shotover.shutdown_and_then_consume_events(&[]).await;
}

async fn read_response_body(res: Response<Body>) -> Result<String, hyper::Error> {
let bytes = body::to_bytes(res.into_body()).await?;
Ok(String::from_utf8(bytes.to_vec()).expect("response was not valid utf-8"))
}

async fn hyper_request(uri: String, method: Method, body: Body) -> Response<Body> {
let client = Client::new();

let req = Request::builder()
.method(method)
.uri(uri)
.body(body)
.expect("request builder");

client.request(req).await.unwrap()
}

#[tokio::test(flavor = "multi_thread")]
async fn test_switch_main_chain() {
let shotover = shotover_process("tests/test-configs/tee/switch_chain.yaml")
.start()
.await;

for i in 1..=3 {
let redis_port = 6370 + i;
let switch_port = 1230 + i;

let mut connection = redis_connection::new_async("127.0.0.1", redis_port).await;

let result = redis::cmd("SET")
.arg("key")
.arg("myvalue")
.query_async::<_, String>(&mut connection)
.await
.unwrap();

assert_eq!("a", result);

let _ = hyper_request(
format!(
"http://localhost:{}/transform/tee/result-source",
switch_port
),
Method::PUT,
Body::from("tee-chain"),
)
.await;

let res = hyper_request(
format!(
"http://localhost:{}/transform/tee/result-source",
switch_port
),
Method::GET,
Body::empty(),
)
.await;
let body = read_response_body(res).await.unwrap();
assert_eq!("tee-chain", body);

let result = redis::cmd("SET")
.arg("key")
.arg("myvalue")
.query_async::<_, String>(&mut connection)
.await
.unwrap();

assert_eq!("b", result);

let _ = hyper_request(
format!(
"http://localhost:{}/transform/tee/result-source",
switch_port
),
Method::PUT,
Body::from("regular-chain"),
)
.await;

let result = redis::cmd("SET")
.arg("key")
.arg("myvalue")
.query_async::<_, String>(&mut connection)
.await
.unwrap();

assert_eq!("a", result);
}

shotover
.shutdown_and_then_consume_events(&[EventMatcher::new()
.with_level(Level::Warn)
.with_count(tokio_bin_process::event_matcher::Count::Times(3))])
.await;
}
2 changes: 1 addition & 1 deletion shotover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ metrics-exporter-prometheus = "0.12.0"
tracing.workspace = true
tracing-subscriber.workspace = true
tracing-appender.workspace = true
hyper = { version = "0.14.14", features = ["server"] }
hyper.workspace = true
halfbrown = "0.2.1"

# Transform dependencies
Expand Down
Loading

0 comments on commit 9e58c6d

Please sign in to comment.