Merge branch 'main' into windsock_kafka_fix_routing
rukai authored Jan 21, 2024
2 parents 08af0f4 + 4da469a commit 43fa2f3
Showing 36 changed files with 961 additions and 520 deletions.
629 changes: 284 additions & 345 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ tokio-util = { version = "0.7.7" }
tokio-openssl = "0.6.2"
itertools = "0.12.0"
openssl = { version = "0.10.36", features = ["vendored"] }
anyhow = "1.0.42"
anyhow = "1.0.76"
serde = { version = "1.0.111", features = ["derive"] }
serde_yaml = "0.9.17"
uuid = { version = "1.0.0", features = ["serde", "v4"] }
26 changes: 26 additions & 0 deletions docs/src/sources.md
@@ -87,3 +87,29 @@ Redis:
Transform2
...
```
## Kafka
```yaml
Kafka:
  # The address to listen on
  listen_addr: "127.0.0.1:9092"

  # The number of concurrent connections the source will accept.
  # If not provided, defaults to 512
  connection_limit: 512

  # Defines the behaviour when the configured connection limit is reached:
  # * when true: the new connection is dropped.
  # * when false: the new connection will wait until a connection can be made within the limit.
  # If not provided, defaults to false
  hard_connection_limit: false

  # Timeout in seconds after which to terminate an idle connection.
  # This field is optional; if not provided, idle connections will never be terminated.
  # timeout: 60

  chain:
    Transform1
    Transform2
    ...
```
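Putting these options together, a complete source entry might look like the following sketch. This is an assumption-laden illustration, not a tested config: the overall `sources:`/`chain:` layout is inferred from the Redis example earlier on this page, and `KafkaSinkSingle` is one of the Kafka sinks documented in transforms.md.

```yaml
# Hypothetical sketch of a full topology entry using the Kafka source.
# Field layout is assumed from the examples on this page, not verified.
sources:
  - Kafka:
      listen_addr: "127.0.0.1:9092"
      connection_limit: 512
      hard_connection_limit: false
      chain:
        - KafkaSinkSingle:
            destination_port: 9092
            connect_timeout_ms: 3000
```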
85 changes: 77 additions & 8 deletions docs/src/transforms.md
@@ -33,6 +33,8 @@ Future transforms won't be added to the public API while in alpha. But in these
| [TuneableConsistencyScatter](#tuneableconsistencyscatter)|| Alpha |
| [DebugPrinter](#debugprinter) || Alpha |
| [DebugReturner](#debugreturner) || Alpha |
| [KafkaSinkCluster](#kafkasinkcluster) || Beta |
| [KafkaSinkSingle](#kafkasinksingle) || Beta |
| [NullSink](#nullsink) || Beta |
| [ParallelMap](#parallelmap) || Alpha |
| [Protect](#protect) || Alpha |
@@ -123,10 +125,10 @@ While `system.peers`/`system.peers_v2` will be rewritten to list the configured
# # Enable/disable verifying the hostname of the certificate provided by the destination.
# #verify_hostname: true

# Timeout in seconds after which to give up waiting for a response from the destination.
# This field is optional, if not provided, timeout will never occur.
# When a timeout occurs the connection to the client is immediately closed.
# read_timeout: 60
```

#### Error handling
@@ -170,10 +172,10 @@ No cluster discovery or routing occurs with this transform.
# # Enable/disable verifying the hostname of the certificate provided by the destination.
# #verify_hostname: true

# Timeout in seconds after which to give up waiting for a response from the destination.
# This field is optional, if not provided, timeout will never occur.
# When a timeout occurs the connection to the client is immediately closed.
# read_timeout: 60
```

This transform emits a metrics [counter](user-guide/observability.md#counter) named `failed_requests` with the label `transform` set to `CassandraSinkSingle` and the label `chain` set to the name of the chain that this transform is in.
@@ -267,6 +269,73 @@ Delay the transform chain at the position that this transform sits at.
```
-->

### KafkaSinkCluster

This transform routes Kafka messages to an appropriate broker within a Kafka cluster:

* produce messages are routed to the partition leader
* fetch messages are routed to a random partition replica
* heartbeat, syncgroup, offsetfetch and joingroup messages are all routed to the group coordinator
* all other messages go to a random node.

The fact that Shotover is routing to multiple destination nodes will be hidden from the client.
Instead Shotover will pretend to be either a single Kafka node or part of a cluster of Kafka nodes consisting entirely of Shotover instances.

This is achieved by rewriting the FindCoordinator, Metadata and DescribeCluster messages to contain the nodes of the Shotover cluster instead of the nodes of the Kafka cluster.
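As a rough illustration (not Shotover's actual code), the routing rules above amount to a match over the request's API kind. The names and types here are hypothetical:

```rust
// Hypothetical sketch of KafkaSinkCluster's routing rules.
// Destination and the string-keyed API kinds are illustrative only.
#[derive(Debug, PartialEq)]
enum Destination {
    PartitionLeader,
    RandomReplica,
    GroupCoordinator,
    RandomNode,
}

fn route(api_kind: &str) -> Destination {
    match api_kind {
        // Produce requests must go to the leader of the target partition.
        "Produce" => Destination::PartitionLeader,
        // Fetch requests can be served by any replica of the partition.
        "Fetch" => Destination::RandomReplica,
        // Consumer-group traffic must reach the group coordinator.
        "Heartbeat" | "SyncGroup" | "OffsetFetch" | "JoinGroup" => {
            Destination::GroupCoordinator
        }
        // Everything else (Metadata, DescribeCluster, ...) can go anywhere.
        _ => Destination::RandomNode,
    }
}
```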

```yaml
- KafkaSinkCluster:
    # Addresses of the initial Kafka brokers to connect to.
    first_contact_points: ["172.16.1.2:9092", "172.16.1.3:9092"]

    # A list of every Shotover node that will be proxying to the same Kafka cluster.
    # This field should be identical for all Shotover nodes proxying to the same Kafka cluster.
    shotover_nodes:
      # Address of the Shotover node.
      # This is usually the same address as the Shotover source that is connected to this sink.
      # But it may be different if you want Shotover to report a different address.
      - "127.0.0.1:9092"
      # If you only have a single Shotover instance then you only want a single node.
      # Otherwise if you have multiple Shotover instances then add more nodes e.g.
      #- "127.0.0.2:9092"

    # Number of milliseconds to wait for a connection to be created to a destination Kafka broker.
    # If the timeout is exceeded then a connection to another node is attempted.
    # If all known nodes have resulted in connection timeouts an error will be returned to the client.
    connect_timeout_ms: 3000

    # Timeout in seconds after which to give up waiting for a response from the destination.
    # This field is optional; if not provided, a timeout will never occur.
    # When a timeout occurs the connection to the client is immediately closed.
    # read_timeout: 60
```

### KafkaSinkSingle

This transform will send and receive Kafka messages to a single Kafka node running on the same machine as Shotover.
All Kafka brokers in the cluster must be configured with a Shotover instance in front of them.
All Shotover instances must listen on the same port X and all Kafka instances must use another port Y.
The client will then connect via Shotover's port X.

In order to force clients to connect through Shotover, the FindCoordinator, Metadata and DescribeCluster messages are rewritten to use the Shotover port.
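As a rough sketch of the idea (hypothetical types, not Shotover's real implementation), the rewrite simply replaces each advertised broker port with the Shotover port, so clients keep connecting through Shotover:

```rust
// Hypothetical sketch: rewrite broker addresses advertised in a
// Metadata-style response so they point at the Shotover port instead
// of the Kafka port. The Broker type is illustrative only.
#[derive(Debug, PartialEq)]
struct Broker {
    host: String,
    port: u16,
}

fn rewrite_advertised_ports(brokers: &mut [Broker], shotover_port: u16) {
    for broker in brokers.iter_mut() {
        // Hosts are untouched because Shotover runs on the same machine;
        // only the port needs to change.
        broker.port = shotover_port;
    }
}
```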

```yaml
- KafkaSinkSingle:
    # The port of the upstream Kafka node/service.
    destination_port: 9092
    # Number of milliseconds to wait for a connection to be created to the destination Kafka instance.
    # If the timeout is exceeded then an error is returned to the client.
    connect_timeout_ms: 3000
    # Timeout in seconds after which to give up waiting for a response from the destination.
    # This field is optional; if not provided, a timeout will never occur.
    # When a timeout occurs the connection to the client is immediately closed.
    # read_timeout: 60
```

This transform emits a metrics [counter](user-guide/observability.md#counter) named `failed_requests` with the label `transform` set to `KafkaSinkSingle` and the label `chain` set to the name of the chain that this transform is in.

### NullSink

This transform will drop any messages it receives and return an empty response.
10 changes: 9 additions & 1 deletion docs/src/user-guide/observability.md
@@ -38,5 +38,13 @@ A single value that can increment or decrement over time. Starts out with an ini
You can configure log levels and filters at `/filter`. This can be done by a POST HTTP request to the `/filter` endpoint with the `env_filter` string set as the POST data. For example:

```shell
curl -X PUT -d 'info,shotover_proxy=info' http://127.0.0.1:9001/filter
curl -X PUT -d 'info, shotover=info, shotover::connection_span=info' http://127.0.0.1:9001/filter
```
Some examples of how you can tweak this filter:

* configure the first `info` to set the log level for dependencies
* configure `shotover=info` to set the log level for shotover itself
* change `shotover::connection_span=info` to `shotover::connection_span=debug` to attach connection info to most log events; this is disabled by default due to a minor performance hit.

For more control over filtering, see [the tracing filter format](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#directives).
3 changes: 1 addition & 2 deletions ec2-cargo/Cargo.toml
@@ -13,6 +13,5 @@ clap.workspace = true
tracing-subscriber.workspace = true
aws-throwaway.workspace = true
tracing-appender.workspace = true
shellfish = { version = "0.8.0", features = ["async"] }
rustyline = "11.0.0"
shellfish = { version = "0.9.0", features = ["async"] }
cargo_metadata = "0.18.0"
3 changes: 1 addition & 2 deletions ec2-cargo/src/main.rs
@@ -1,8 +1,7 @@
use aws_throwaway::{Aws, CleanupResources, Ec2Instance, Ec2InstanceDefinition, InstanceType};
use cargo_metadata::{Metadata, MetadataCommand};
use clap::Parser;
use rustyline::DefaultEditor;
use shellfish::{async_fn, handler::DefaultAsyncHandler, Command, Shell};
use shellfish::{async_fn, handler::DefaultAsyncHandler, rustyline::DefaultEditor, Command, Shell};
use std::error::Error;
use std::fs::Permissions;
use std::os::unix::prelude::PermissionsExt;
4 changes: 2 additions & 2 deletions shotover-proxy/benches/windsock/profilers/samply.rs
@@ -11,9 +11,9 @@ impl Samply {
"install",
"samply",
"--git",
"https://github.com/rukai/samply",
"https://github.com/mstange/samply",
"--rev",
"af14e56ac9e809a445e4f0bd80fcd4fefc45b552",
"4c8d5eb164e44c4eda1b29de116f5ea546d64c65",
],
)
.await;
1 change: 1 addition & 0 deletions shotover-proxy/benches/windsock/shotover.rs
@@ -10,6 +10,7 @@ pub async fn shotover_process_custom_topology(
let topology_path = std::env::temp_dir().join(Uuid::new_v4().to_string());
std::fs::write(&topology_path, topology_contents).unwrap();
ShotoverProcessBuilder::new_with_topology(topology_path.to_str().unwrap())
.with_config("config/config.yaml")
.with_bin(bin_path!("shotover-proxy"))
.with_profile(profiler.shotover_profile())
.start()
6 changes: 4 additions & 2 deletions shotover-proxy/config/config.yaml
@@ -1,3 +1,5 @@
---
main_log_level: "info,shotover_proxy=info"
# configure the first `info` to set the log level for dependencies
# configure `shotover=info` to set the log level for shotover itself
# change `shotover::connection_span=info` to `shotover::connection_span=debug` to attach connection info to most log events; this is disabled by default due to a minor performance hit.
main_log_level: "info, shotover=info, shotover::connection_span=info"
observability_interface: "0.0.0.0:9001"
12 changes: 6 additions & 6 deletions shotover-proxy/tests/cassandra_int_tests/mod.rs
@@ -263,19 +263,19 @@ async fn cluster_multi_rack_1_per_rack(#[case] driver: CassandraDriver) {
let shotover_rack1 =
shotover_process("tests/test-configs/cassandra/cluster-multi-rack/topology_rack1.yaml")
.with_log_name("Rack1")
.with_observability_port(9001)
.with_config("tests/test-configs/shotover-config/config1.yaml")
.start()
.await;
let shotover_rack2 =
shotover_process("tests/test-configs/cassandra/cluster-multi-rack/topology_rack2.yaml")
.with_log_name("Rack2")
.with_observability_port(9002)
.with_config("tests/test-configs/shotover-config/config2.yaml")
.start()
.await;
let shotover_rack3 =
shotover_process("tests/test-configs/cassandra/cluster-multi-rack/topology_rack3.yaml")
.with_log_name("Rack3")
.with_observability_port(9003)
.with_config("tests/test-configs/shotover-config/config3.yaml")
.start()
.await;

@@ -318,22 +318,22 @@ async fn cluster_multi_rack_2_per_rack(#[case] driver: CassandraDriver) {
let shotover_rack1 = shotover_process(
"tests/test-configs/cassandra/cluster-multi-rack-2-per-rack/topology_rack1.yaml",
)
.with_config("tests/test-configs/shotover-config/config1.yaml")
.with_log_name("Rack1")
.with_observability_port(9001)
.start()
.await;
let shotover_rack2 = shotover_process(
"tests/test-configs/cassandra/cluster-multi-rack-2-per-rack/topology_rack2.yaml",
)
.with_log_name("Rack2")
.with_observability_port(9002)
.with_config("tests/test-configs/shotover-config/config2.yaml")
.start()
.await;
let shotover_rack3 = shotover_process(
"tests/test-configs/cassandra/cluster-multi-rack-2-per-rack/topology_rack3.yaml",
)
.with_config("tests/test-configs/shotover-config/config3.yaml")
.with_log_name("Rack3")
.with_observability_port(9003)
.start()
.await;

12 changes: 3 additions & 9 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -1,18 +1,13 @@
#[cfg(feature = "alpha-transforms")]
#[cfg(feature = "rdkafka-driver-tests")]
use crate::shotover_process;
#[cfg(feature = "alpha-transforms")]
#[cfg(feature = "rdkafka-driver-tests")]
use std::time::Duration;
#[cfg(feature = "alpha-transforms")]
#[cfg(feature = "rdkafka-driver-tests")]
use test_helpers::docker_compose::docker_compose;

#[cfg(feature = "alpha-transforms")]
#[cfg(feature = "rdkafka-driver-tests")]
mod test_cases;

#[cfg(feature = "alpha-transforms")]
#[cfg(feature = "rdkafka-driver-tests")]
#[tokio::test]
async fn passthrough_standard() {
@@ -33,7 +28,6 @@ async fn passthrough_standard() {
}

#[cfg(feature = "rdkafka-driver-tests")]
#[cfg(feature = "alpha-transforms")]
#[tokio::test]
async fn passthrough_encode() {
let _docker_compose =
@@ -48,7 +42,6 @@ async fn passthrough_encode() {
}

#[cfg(feature = "rdkafka-driver-tests")]
#[cfg(feature = "alpha-transforms")]
#[tokio::test]
async fn cluster_single_shotover() {
let _docker_compose = docker_compose("tests/test-configs/kafka/cluster/docker-compose.yaml");
@@ -67,7 +60,6 @@ async fn cluster_multi_shotover() {
}

#[cfg(feature = "rdkafka-driver-tests")]
#[cfg(feature = "alpha-transforms")]
#[tokio::test]
async fn cluster_multi_shotover() {
let _docker_compose = docker_compose("tests/test-configs/kafka/cluster/docker-compose.yaml");
@@ -77,8 +69,10 @@ async fn cluster_multi_shotover() {
shotover_process(&format!(
"tests/test-configs/kafka/cluster/topology{i}.yaml"
))
.with_config(&format!(
"tests/test-configs/shotover-config/config{i}.yaml"
))
.with_log_name(&format!("shotover{i}"))
.with_observability_port(9000 + i)
.start()
.await,
);