KafkaSinkCluster: authorize_scram_over_mtls #1605

Merged
1 commit merged on May 14, 2024
26 changes: 26 additions & 0 deletions Cargo.lock


22 changes: 21 additions & 1 deletion docs/src/transforms.md
@@ -254,7 +254,14 @@ Instead Shotover will pretend to be either a single Kafka node or part of a cluster

This is achieved by rewriting the FindCoordinator, Metadata and DescribeCluster messages to contain the nodes in the shotover cluster instead of the kafka cluster.
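The rewriting described above can be sketched as a simple address substitution (hypothetical types and function names, not Shotover's actual implementation, which operates on parsed Kafka protocol frames):

```rust
// Sketch of the metadata-rewriting idea: broker addresses returned by
// the real kafka cluster are replaced with the configured shotover node
// addresses, so clients only ever see shotover. Names are illustrative.

#[derive(Debug, Clone, PartialEq)]
struct BrokerAddr {
    host: String,
    port: u16,
}

/// Replace each kafka broker address in a Metadata-style response with a
/// shotover node address, cycling through the shotover nodes when the
/// kafka cluster has more brokers than there are shotover nodes.
fn rewrite_brokers(kafka_brokers: &[BrokerAddr], shotover_nodes: &[BrokerAddr]) -> Vec<BrokerAddr> {
    kafka_brokers
        .iter()
        .enumerate()
        .map(|(i, _)| shotover_nodes[i % shotover_nodes.len()].clone())
        .collect()
}

fn main() {
    let kafka = vec![
        BrokerAddr { host: "172.16.1.2".into(), port: 9092 },
        BrokerAddr { host: "172.16.1.3".into(), port: 9092 },
    ];
    let shotover = vec![BrokerAddr { host: "127.0.0.1".into(), port: 9192 }];

    let rewritten = rewrite_brokers(&kafka, &shotover);
    // Every broker entry the client sees now points at shotover.
    assert!(rewritten.iter().all(|b| b.host == "127.0.0.1" && b.port == 9192));
    println!("rewritten: {rewritten:?}");
}
```

The same substitution applies to FindCoordinator and DescribeCluster responses, so every address a client learns routes back through Shotover.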

By default KafkaSinkCluster does not support SASL SCRAM authentication; if a client attempts to use SCRAM it will appear as if it is not enabled on the server.
SCRAM cannot be supported in the usual way because the mechanism protects against replay of auth messages, which prevents Shotover from opening multiple outgoing connections.

However, SCRAM support can be achieved by enabling the `authorize_scram_over_mtls` option.
When enabled, Shotover transparently generates delegation tokens over an mTLS connection corresponding to the username sent in the SCRAM auth requests.
First, the client's SCRAM auth requests are routed to a single kafka broker to verify that the user has the correct credentials.
Once authentication is confirmed, Shotover creates new outgoing connections to different brokers via delegation token authentication. (These outgoing connections are accessible only to that one incoming connection.)
If SCRAM authentication against the first kafka broker fails, Shotover terminates the connection before processing any non-auth requests, ensuring the client cannot escalate privileges.
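The sequence above can be sketched as a small per-connection state machine (hypothetical types and names; a sketch of the idea only, not Shotover's real code):

```rust
// Sketch of the authorize_scram_over_mtls connection lifecycle.
// All names here are illustrative, not Shotover's actual types.

#[derive(Debug, PartialEq)]
enum AuthState {
    /// SCRAM exchange with a single kafka broker is in progress;
    /// only auth-related requests are processed in this state.
    ScramInProgress { username: String },
    /// SCRAM succeeded; a delegation token created over the mTLS
    /// connection is used for this connection's outgoing broker links.
    Authenticated { delegation_token: String },
    /// SCRAM failed; the connection is terminated before any
    /// non-auth request is processed.
    Terminated,
}

/// Advance the state machine once the broker reports the SCRAM result.
/// `token_from_mtls` stands in for a delegation token obtained over mTLS.
fn on_scram_result(state: AuthState, success: bool, token_from_mtls: &str) -> AuthState {
    match state {
        AuthState::ScramInProgress { .. } if success => AuthState::Authenticated {
            delegation_token: token_from_mtls.to_string(),
        },
        AuthState::ScramInProgress { .. } => AuthState::Terminated,
        other => other,
    }
}

fn main() {
    // Successful SCRAM: the connection proceeds with a delegation token.
    let s = AuthState::ScramInProgress { username: "user".into() };
    let s = on_scram_result(s, true, "token-from-mtls-connection");
    assert!(matches!(s, AuthState::Authenticated { .. }));

    // Failed SCRAM: the connection is terminated, no privilege escalation.
    let f = AuthState::ScramInProgress { username: "user".into() };
    let f = on_scram_result(f, false, "");
    assert_eq!(f, AuthState::Terminated);
    println!("auth flow ok");
}
```

The key property is that the delegation token never reaches the client, and a failed SCRAM exchange can only ever lead to termination.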

```yaml
- KafkaSinkCluster:
@@ -293,6 +300,19 @@ KafkaSinkCluster does not support SASL SCRAM authentication, if a client attempt
# private_key_path: "tls/localhost.key"
# # Enable/disable verifying the hostname of the certificate provided by the destination.
# #verify_hostname: true

# When this field is provided authorization of SCRAM over mTLS is enabled.
# Removing this field will disable the feature.
#authorize_scram_over_mtls:
# # This must point at a kafka port that exposes mTLS authentication for a user capable of creating delegation tokens.
# mtls_port_contact_points: ["172.16.1.2:9094"]
# # The TLS certs for an mTLS user capable of creating delegation tokens
# tls:
# certificate_authority_path: "tls/mtls_localhost_CA.crt"
# certificate_path: "tls/mtls_localhost.crt"
# private_key_path: "tls/mtls_localhost.key"
# verify_hostname: true

```

### KafkaSinkSingle
1 change: 1 addition & 0 deletions shotover-proxy/benches/windsock/kafka/bench.rs
@@ -97,6 +97,7 @@ impl KafkaBench {
rack: "rack1".into(),
broker_id: 0,
}],
authorize_scram_over_mtls: None,
tls: None,
}),
});
28 changes: 28 additions & 0 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -346,6 +346,34 @@ async fn cluster_sasl_scram_single_shotover(#[case] driver: KafkaDriver) {
.expect("Shotover did not shutdown within 10s");
}

#[rstest]
//#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))] // CPP driver does not support scram
#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_sasl_scram_over_mtls_single_shotover(#[case] driver: KafkaDriver) {
test_helpers::cert::generate_kafka_test_certs();

let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-sasl-scram-over-mtls/docker-compose.yaml");

let shotover = shotover_process(
"tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml",
)
.start()
.await;

let connection_builder =
KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl_scram("user", "password");
test_cases::standard_test_suite(connection_builder).await;

tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[]),
)
.await
.expect("Shotover did not shutdown within 10s");
}

#[cfg(feature = "kafka-cpp-driver-tests")] // temporarily needed to avoid a warning
#[rstest]
#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))]
@@ -0,0 +1,66 @@
networks:
cluster_subnet:
name: cluster_subnet
driver: bridge
ipam:
driver: default
config:
- subnet: 172.16.1.0/24
gateway: 172.16.1.1

services:
kafka0:
image: &image 'bitnami/kafka:3.6.1-debian-11-r24'
networks:
cluster_subnet:
ipv4_address: 172.16.1.2
environment: &environment
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: "controller,broker"
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka0:9093,1@kafka1:9093,2@kafka2:9093"
KAFKA_CFG_LISTENERS: "BROKER://:9092,CONTROLLER://:9093,SHOTOVER_MTLS://:9094"
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:SSL,BROKER:SASL_SSL,SHOTOVER_MTLS:SSL"
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.2:9092,SHOTOVER_MTLS://172.16.1.2:9094"
KAFKA_CFG_DELEGATION_TOKEN_MASTER_KEY: THE_MASTER_KEY
KAFKA_CLIENT_USERS: "user"
KAFKA_CLIENT_PASSWORDS: "password"
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL: "PLAIN"
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "BROKER"
KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: "SCRAM-SHA-256"
KAFKA_INTER_BROKER_USER: "controller_user"
KAFKA_INTER_BROKER_PASSWORD: "controller_password"
KAFKA_CERTIFICATE_PASSWORD: "password"
KAFKA_KRAFT_CLUSTER_ID: "abcdefghijklmnopqrstuv"
KAFKA_TLS_CLIENT_AUTH: required
KAFKA_CFG_AUTHORIZER_CLASS_NAME: "org.apache.kafka.metadata.authorizer.StandardAuthorizer"
# Give the following super user access:
# * the user named `user`
# * any clients connected via a TLS certificate of `O=ShotoverTestCertificate,CN=Generic-Cert`
KAFKA_CFG_SUPER_USERS: "User:user;User:O=ShotoverTestCertificate,CN=Generic-Cert"
volumes: &volumes
- type: tmpfs
target: /bitnami/kafka
- type: bind
source: "../tls/certs"
target: "/opt/bitnami/kafka/config/certs"
kafka1:
image: *image
networks:
cluster_subnet:
ipv4_address: 172.16.1.3
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.3:9092,SHOTOVER_MTLS://172.16.1.3:9094"
KAFKA_CFG_NODE_ID: 1
volumes: *volumes
kafka2:
image: *image
networks:
cluster_subnet:
ipv4_address: 172.16.1.4
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.4:9092,SHOTOVER_MTLS://172.16.1.4:9094"
KAFKA_CFG_NODE_ID: 2
volumes: *volumes
@@ -0,0 +1,26 @@
---
sources:
- Kafka:
name: "kafka"
listen_addr: "127.0.0.1:9192"
chain:
- DebugPrinter
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9192"
rack: "rack0"
broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
authorize_scram_over_mtls:
mtls_port_contact_points: ["172.16.1.2:9094"]
tls:
certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt"
certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt"
private_key_path: "tests/test-configs/kafka/tls/certs/localhost.key"
verify_hostname: true
connect_timeout_ms: 3000
tls:
certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt"
certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt"
private_key_path: "tests/test-configs/kafka/tls/certs/localhost.key"
verify_hostname: true
5 changes: 5 additions & 0 deletions shotover/Cargo.toml
@@ -33,6 +33,8 @@ kafka = [
"dep:dashmap",
"dep:xxhash-rust",
"dep:string",
"dep:base64",
"dep:sasl",
]
redis = [
"dep:redis-protocol",
@@ -119,6 +121,9 @@ xxhash-rust = { version = "0.8.6", features = ["xxh3"], optional = true }
dashmap = { version = "5.4.0", optional = true }
atoi = { version = "2.0.0", optional = true }
fnv = "1.0.7"
# sasl fork hosted at https://github.com/shotover/xmpp-rs/tree/sasl_fork
# once https://gitlab.com/xmpp-rs/xmpp-rs/-/merge_requests/324 is merged and in a release we can replace this with upstream
sasl = { version = "0.5.1", optional = true, default-features = false, features = ["scram"], git = "https://github.com/shotover/xmpp-rs", rev = "a8da96aa9ee5ce956b7069f92a4ca762efc75133" }

[dev-dependencies]
criterion = { version = "0.5.0", features = ["async_tokio"] }