Skip to content

Commit

Permalink
KafkaSinkCluster: authorize_scram_over_mtls
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed May 9, 2024
1 parent 78b0173 commit 03552b0
Show file tree
Hide file tree
Showing 9 changed files with 604 additions and 50 deletions.
25 changes: 25 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions shotover-proxy/benches/windsock/kafka/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ impl KafkaBench {
rack: "rack1".into(),
broker_id: 0,
}],
authorize_scram_over_mtls: None,
tls: None,
}),
});
Expand Down
28 changes: 28 additions & 0 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,34 @@ async fn cluster_sasl_scram_single_shotover(#[case] driver: KafkaDriver) {
.expect("Shotover did not shutdown within 10s");
}

#[rstest]
//#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))] // CPP driver does not support scram
#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_sasl_scram_over_mtls_single_shotover(#[case] driver: KafkaDriver) {
test_helpers::cert::generate_kafka_test_certs();

let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-sasl-scram-over-mtls/docker-compose.yaml");

let shotover = shotover_process(
"tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml",
)
.start()
.await;

let connection_builder =
KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl_scram("user", "password");
test_cases::standard_test_suite(connection_builder).await;

tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[]),
)
.await
.expect("Shotover did not shutdown within 10s");
}

#[cfg(feature = "kafka-cpp-driver-tests")] // temporarily needed to avoid a warning
#[rstest]
#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
networks:
cluster_subnet:
name: cluster_subnet
driver: bridge
ipam:
driver: default
config:
- subnet: 172.16.1.0/24
gateway: 172.16.1.1

services:
kafka0:
image: &image 'bitnami/kafka:3.6.1-debian-11-r24'
networks:
cluster_subnet:
ipv4_address: 172.16.1.2
environment: &environment
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: "controller,broker"
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka0:9093,1@kafka1:9093,2@kafka2:9093"
KAFKA_CFG_LISTENERS: "BROKER://:9092,CONTROLLER://:9093,SHOTOVER_MTLS://:9094"
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:SSL,BROKER:SASL_SSL,SHOTOVER_MTLS:SSL"
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.2:9092,SHOTOVER_MTLS://172.16.1.2:9094"
KAFKA_CFG_DELEGATION_TOKEN_MASTER_KEY: THE_MASTER_KEY
KAFKA_CLIENT_USERS: "user"
KAFKA_CLIENT_PASSWORDS: "password"
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL: "PLAIN"
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "BROKER"
KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: "SCRAM-SHA-256"
KAFKA_INTER_BROKER_USER: "controller_user"
KAFKA_INTER_BROKER_PASSWORD: "controller_password"
KAFKA_CERTIFICATE_PASSWORD: "password"
KAFKA_KRAFT_CLUSTER_ID: "abcdefghijklmnopqrstuv"
# TODO: Does this enable MTLS auth for the SCRAM connection as well?
# If so, we need to be careful that kafka doesnt elevate our token'd SCRAM credentials to super user.
# I dont yet know how it would actually behave.
# If so, a possible solution could be seperate certificates for the token'd connections
KAFKA_TLS_CLIENT_AUTH: required
KAFKA_CFG_AUTHORIZER_CLASS_NAME: "org.apache.kafka.metadata.authorizer.StandardAuthorizer"
# Give the following super user access:
# * the user named `user`
# * any clients connected via a TLS certificate of `O=ShotoverTestCertificate,CN=Generic-Cert`
KAFKA_CFG_SUPER_USERS: "User:user;User:O=ShotoverTestCertificate,CN=Generic-Cert"
volumes: &volumes
- type: tmpfs
target: /bitnami/kafka
- type: bind
source: "../tls/certs"
target: "/opt/bitnami/kafka/config/certs"
kafka1:
image: *image
networks:
cluster_subnet:
ipv4_address: 172.16.1.3
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.3:9092,SHOTOVER_MTLS://172.16.1.3:9094"
KAFKA_CFG_NODE_ID: 1
volumes: *volumes
kafka2:
image: *image
networks:
cluster_subnet:
ipv4_address: 172.16.1.4
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.4:9092,SHOTOVER_MTLS://172.16.1.4:9094"
KAFKA_CFG_NODE_ID: 2
volumes: *volumes
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
---
sources:
- Kafka:
name: "kafka"
listen_addr: "127.0.0.1:9192"
chain:
- DebugPrinter
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9192"
rack: "rack0"
broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
authorize_scram_over_mtls:
mtls_port_contact_points: ["172.16.1.2:9094"]
tls:
certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt"
certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt"
private_key_path: "tests/test-configs/kafka/tls/certs/localhost.key"
verify_hostname: true
connect_timeout_ms: 3000
tls:
certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt"
certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt"
private_key_path: "tests/test-configs/kafka/tls/certs/localhost.key"
verify_hostname: true
4 changes: 4 additions & 0 deletions shotover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ kafka = [
"dep:dashmap",
"dep:xxhash-rust",
"dep:string",
"dep:base64",
"dep:sasl",
]
redis = [
"dep:redis-protocol",
Expand Down Expand Up @@ -119,6 +121,8 @@ xxhash-rust = { version = "0.8.6", features = ["xxh3"], optional = true }
dashmap = { version = "5.4.0", optional = true }
atoi = { version = "2.0.0", optional = true }
fnv = "1.0.7"
#sasl = { version = "0.5.1", optional = true, default-features = false, features = ["scram"] }
sasl = { version = "0.5.1", optional = true, default-features = false, features = ["scram"], git = "https://gitlab.com/rukai/xmpp-rs", branch = "sasl_scram_extensions" }

[dev-dependencies]
criterion = { version = "0.5.0", features = ["async_tokio"] }
Expand Down
Loading

0 comments on commit 03552b0

Please sign in to comment.