diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs
index 34de2a307..6f5ae56f6 100644
--- a/shotover-proxy/tests/kafka_int_tests/mod.rs
+++ b/shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -4,6 +4,8 @@ use crate::shotover_process;
 use pretty_assertions::assert_eq;
 use rstest::rstest;
 use std::time::Duration;
+use test_cases::smoke_test;
+use test_cases::{assert_topic_creation_is_denied_due_to_acl, setup_basic_user_acls};
 use test_helpers::connection::kafka::{KafkaConnectionBuilder, KafkaDriver};
 use test_helpers::docker_compose::docker_compose;
 use test_helpers::shotover_process::{Count, EventMatcher};
@@ -357,22 +359,76 @@ async fn cluster_sasl_scram_over_mtls_single_shotover(#[case] driver: KafkaDrive
     let _docker_compose =
         docker_compose("tests/test-configs/kafka/cluster-sasl-scram-over-mtls/docker-compose.yaml");
 
-    let shotover = shotover_process(
-        "tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml",
-    )
-    .start()
-    .await;
+    // test concurrent connections with different access levels to ensure that:
+    // * clients with bad auth are not authorized
+    // * tokens are not mixed up
+    // * requests are not sent to the super user connection
+    {
+        // admin requests sent by the admin user are successful
+        let shotover = shotover_process(
+            "tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml",
+        )
+        .start()
+        .await;
+        let connection_super = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192")
+            .use_sasl_scram("super_user", "super_password");
+        setup_basic_user_acls(&connection_super, "basic_user").await;
+        test_cases::standard_test_suite(connection_super).await;
+        assert_connection_fails_with_incorrect_password(driver, "super_user").await;
+
+        // admin requests sent by the basic user are unsuccessful
+        let connection_basic = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192")
+            .use_sasl_scram("basic_user", "basic_password");
+        assert_topic_creation_is_denied_due_to_acl(&connection_basic).await;
+        assert_connection_fails_with_incorrect_password(driver, "basic_user").await;
 
-    let connection_builder =
-        KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl_scram("user", "password");
-    test_cases::standard_test_suite(connection_builder).await;
+        tokio::time::timeout(
+            Duration::from_secs(10),
+            shotover.shutdown_and_then_consume_events(&[]),
+        )
+        .await
+        .expect("Shotover did not shutdown within 10s");
+    }
 
-    tokio::time::timeout(
-        Duration::from_secs(10),
-        shotover.shutdown_and_then_consume_events(&[]),
-    )
-    .await
-    .expect("Shotover did not shutdown within 10s");
+    // rerun the same tests as before, but with a different ordering
+    {
+        // admin requests sent by the basic user are unsuccessful
+        let shotover = shotover_process(
+            "tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml",
+        )
+        .start()
+        .await;
+        assert_connection_fails_with_incorrect_password(driver, "basic_user").await;
+        let connection_basic = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192")
+            .use_sasl_scram("basic_user", "basic_password");
+        assert_topic_creation_is_denied_due_to_acl(&connection_basic).await;
+        assert_connection_fails_with_incorrect_password(driver, "basic_user").await;
+
+        // admin requests sent by the admin user are successful
+        // admin requests sent by the basic user remain unsuccessful
+        let connection_super = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192")
+            .use_sasl_scram("super_user", "super_password");
+        smoke_test(&connection_super).await;
+        assert_topic_creation_is_denied_due_to_acl(&connection_basic).await;
+        assert_connection_fails_with_incorrect_password(driver, "basic_user").await;
+        assert_connection_fails_with_incorrect_password(driver, "super_user").await;
+
+        tokio::time::timeout(
+            Duration::from_secs(10),
+            shotover.shutdown_and_then_consume_events(&[]),
+        )
+        .await
+        .expect("Shotover did not shutdown within 10s");
+    }
+}
+
+async fn assert_connection_fails_with_incorrect_password(driver: KafkaDriver, username: &str) {
+    let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192")
+        .use_sasl_scram(username, "not_the_password");
+    assert_eq!(
+        connection_builder.assert_admin_error().await.to_string(),
+        "org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-256\n"
+    );
 }
 
 #[rstest]
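A note on the helper above: it pins the client's full rendered exception string, trailing newline included, so any change to how auth failures surface through shotover fails the test loudly. A minimal standalone sketch of the same assert-on-`unwrap_err` pattern (the `connect` function and `AuthError` type are hypothetical stand-ins, not part of this PR):

```rust
use std::fmt;

// Hypothetical error type standing in for the Java client's exception;
// not part of this PR.
#[derive(Debug)]
struct AuthError(String);

impl fmt::Display for AuthError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Mirror the Java client, whose rendered exceptions end with a newline.
        writeln!(f, "{}", self.0)
    }
}

// Hypothetical stand-in for connecting as an admin with SCRAM credentials.
fn connect(_username: &str, password: &str) -> Result<(), AuthError> {
    if password == "not_the_password" {
        return Err(AuthError("Authentication failed".to_owned()));
    }
    Ok(())
}

fn main() {
    // Same shape as assert_connection_fails_with_incorrect_password:
    // force the failure, then compare the full rendered error string.
    let err = connect("basic_user", "not_the_password").unwrap_err();
    assert_eq!(err.to_string(), "Authentication failed\n");
    println!("auth failure surfaced exactly as expected");
}
```

Matching on the exact string rather than an error variant is deliberate here: the error originates in the JVM, so the message text is the only stable surface the Rust test can observe.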
diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
index 1cad5dc81..157673a42 100644
--- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs
+++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -1,6 +1,7 @@
 use test_helpers::connection::kafka::{
-    AlterConfig, ConfigEntry, ExpectedResponse, KafkaConnectionBuilder, NewPartition, NewTopic,
-    Record, ResourceSpecifier,
+    Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ExpectedResponse,
+    KafkaConnectionBuilder, NewPartition, NewTopic, Record, ResourcePatternType, ResourceSpecifier,
+    ResourceType,
 };
 
 async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
@@ -243,3 +244,67 @@ pub async fn standard_test_suite(connection_builder: KafkaConnectionBuilder) {
     produce_consume_acks0(&connection_builder).await;
     connection_builder.admin_cleanup().await;
 }
+
+pub async fn setup_basic_user_acls(connection: &KafkaConnectionBuilder, username: &str) {
+    let admin = connection.connect_admin().await;
+    admin
+        .create_acls(vec![Acl {
+            resource_type: ResourceType::Topic,
+            resource_name: "*".to_owned(),
+            resource_pattern_type: ResourcePatternType::Literal,
+            principal: format!("User:{username}"),
+            host: "*".to_owned(),
+            operation: AclOperation::Describe,
+            permission_type: AclPermissionType::Allow,
+        }])
+        .await;
+}
+
+/// A quick test that produces a single record, without running the whole test suite.
+/// Assumes that admin_setup or standard_test_suite has already been run.
+pub async fn smoke_test(connection_builder: &KafkaConnectionBuilder) {
+    let producer = connection_builder.connect_producer(1).await;
+
+    producer
+        .assert_produce(
+            Record {
+                payload: "initial",
+                topic_name: "partitions1",
+                key: Some("Key"),
+            },
+            None,
+        )
+        .await;
+}
+
+/// Invariants:
+/// * The passed connection is for a user set up with the ACLs of `setup_basic_user_acls`
+/// Assertions:
+/// * Asserts that the user cannot perform the admin operation of creating new topics (not allowed by ACL)
+///   + Asserts that the topic was not created as a result of the failed topic creation.
+/// * Asserts that the user can perform the describe operation on topics (explicitly allowed by ACL)
+pub async fn assert_topic_creation_is_denied_due_to_acl(connection: &KafkaConnectionBuilder) {
+    let admin = connection.connect_admin().await;
+    // attempt to create topic and get auth failure due to missing ACL
+    assert_eq!(
+        admin
+            .create_topics_fallible(&[NewTopic {
+                name: "acl_check_topic",
+                num_partitions: 1,
+                replication_factor: 1,
+            }])
+            .await
+            .unwrap_err()
+            .to_string(),
+        "org.apache.kafka.common.errors.TopicAuthorizationException: Authorization failed.\n"
+    );
+
+    // attempt to describe topic:
+    // * The request succeeds because the user has AclOperation::Describe.
+    // * But no topic is found since the topic creation was denied.
+    assert_eq!(
+        admin.describe_topic("acl_check_topic").await.unwrap_err().to_string(),
+        "org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.\n"
+    )
+}
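`setup_basic_user_acls` grants `basic_user` only `Describe` on every topic, which is exactly what `assert_topic_creation_is_denied_due_to_acl` relies on. Broader grants compose the same way; here is a hedged sketch (not part of this PR) of a hypothetical helper that would let a principal read from any topic under a name prefix, using the `Acl` types this PR adds to `test-helpers`:

```rust
use test_helpers::connection::kafka::{
    Acl, AclOperation, AclPermissionType, KafkaConnectionBuilder, ResourcePatternType,
    ResourceType,
};

// Hypothetical helper, not part of this PR: allow `username` to read from
// any topic whose name starts with `topic_prefix`.
pub async fn setup_reader_acls(
    connection: &KafkaConnectionBuilder,
    username: &str,
    topic_prefix: &str,
) {
    let admin = connection.connect_admin().await;
    admin
        .create_acls(vec![Acl {
            resource_type: ResourceType::Topic,
            resource_name: topic_prefix.to_owned(),
            // Prefixed: matches every topic starting with `resource_name`,
            // unlike the Literal "*" match used by setup_basic_user_acls.
            resource_pattern_type: ResourcePatternType::Prefixed,
            principal: format!("User:{username}"),
            host: "*".to_owned(),
            operation: AclOperation::Read,
            permission_type: AclPermissionType::Allow,
        }])
        .await;
}
```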
diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/docker-compose.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/docker-compose.yaml
index 5536a51bb..83567db4b 100644
--- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/docker-compose.yaml
+++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/docker-compose.yaml
@@ -22,8 +22,8 @@ services:
       KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:SSL,BROKER:SASL_SSL,SHOTOVER_MTLS:SSL"
       KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.2:9092,SHOTOVER_MTLS://172.16.1.2:9094"
       KAFKA_CFG_DELEGATION_TOKEN_MASTER_KEY: THE_MASTER_KEY
-      KAFKA_CLIENT_USERS: "user"
-      KAFKA_CLIENT_PASSWORDS: "password"
+      KAFKA_CLIENT_USERS: "super_user,basic_user"
+      KAFKA_CLIENT_PASSWORDS: "super_password,basic_password"
       KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
       KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL: "PLAIN"
       KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "BROKER"
@@ -37,7 +37,7 @@ services:
       # Give the following super user access:
-      # * the user named `user`
+      # * the user named `super_user`
       # * any clients connected via a TLS certificate of `O=ShotoverTestCertificate,CN=Generic-Cert`
-      KAFKA_CFG_SUPER_USERS: "User:user;User:O=ShotoverTestCertificate,CN=Generic-Cert"
+      KAFKA_CFG_SUPER_USERS: "User:super_user;User:O=ShotoverTestCertificate,CN=Generic-Cert"
     volumes: &volumes
       - type: tmpfs
        target: /bitnami/kafka
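For context on the compose change: the bitnami image pairs `KAFKA_CLIENT_USERS` and `KAFKA_CLIENT_PASSWORDS` positionally, so `super_user` gets `super_password` and `basic_user` gets `basic_password`. A toy sketch of that positional pairing (illustrative only; this is not how the image actually parses its environment):

```rust
fn main() {
    // The values from the compose file above.
    let users = "super_user,basic_user";
    let passwords = "super_password,basic_password";

    // Zip the comma-separated lists index by index, yielding one
    // SCRAM credential per (user, password) pair.
    for (user, password) in users.split(',').zip(passwords.split(',')) {
        println!("creating SCRAM credential: {user} / {password}");
    }
}
```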
diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs
index 77086ccd6..07802314b 100644
--- a/test-helpers/src/connection/kafka/java.rs
+++ b/test-helpers/src/connection/kafka/java.rs
@@ -1,4 +1,7 @@
-use super::{AlterConfig, ExpectedResponse, NewPartition, NewTopic, Record, ResourceSpecifier};
+use super::{
+    Acl, AclOperation, AclPermissionType, AlterConfig, ExpectedResponse, NewPartition, NewTopic,
+    Record, ResourcePatternType, ResourceSpecifier, ResourceType, TopicDescription,
+};
 use anyhow::Result;
 use j4rs::{errors::J4RsError, Instance, InvocationArg, Jvm, JvmBuilder, MavenArtifact};
 use pretty_assertions::assert_eq;
@@ -346,6 +349,20 @@ impl KafkaAdminJava {
         self.create_topics_fallible(topics).await.unwrap();
     }
 
+    pub async fn describe_topic(&self, topic_name: &str) -> Result<TopicDescription> {
+        let topics = self
+            .jvm
+            .java_list("java.lang.String", vec![topic_name])
+            .unwrap();
+
+        let result = self
+            .jvm
+            .invoke(&self.admin, "describeTopics", &[&topics.into()])
+            .unwrap();
+        self.jvm.invoke_async(&result, "allTopicNames", &[]).await?;
+        Ok(TopicDescription {})
+    }
+
     pub async fn create_topics_fallible(&self, topics: &[NewTopic<'_>]) -> Result<()> {
         let topics: Vec<_> = topics
             .iter()
@@ -518,6 +535,124 @@ impl KafkaAdminJava {
             .unwrap();
     }
 
+    pub async fn create_acls(&self, acls: Vec<Acl>) {
+        let resource_type = self
+            .jvm
+            .static_class("org.apache.kafka.common.resource.ResourceType")
+            .unwrap();
+        let resource_pattern_type = self
+            .jvm
+            .static_class("org.apache.kafka.common.resource.PatternType")
+            .unwrap();
+        let acl_operation = self
+            .jvm
+            .static_class("org.apache.kafka.common.acl.AclOperation")
+            .unwrap();
+        let acl_permission_type = self
+            .jvm
+            .static_class("org.apache.kafka.common.acl.AclPermissionType")
+            .unwrap();
+
+        let acls: Vec<_> = acls
+            .iter()
+            .map(|acl| {
+                let resource_type_field = match acl.resource_type {
+                    ResourceType::Any => "ANY",
+                    ResourceType::Cluster => "CLUSTER",
+                    ResourceType::DelegationToken => "DELEGATION_TOKEN",
+                    ResourceType::Group => "GROUP",
+                    ResourceType::Topic => "TOPIC",
+                    ResourceType::TransactionalId => "TRANSACTIONAL_ID",
+                    ResourceType::User => "USER",
+                };
+                let resource_pattern_type_field = match acl.resource_pattern_type {
+                    ResourcePatternType::Literal => "LITERAL",
+                    ResourcePatternType::Prefixed => "PREFIXED",
+                };
+                let resource = self
+                    .jvm
+                    .create_instance(
+                        "org.apache.kafka.common.resource.ResourcePattern",
+                        &[
+                            &self
+                                .jvm
+                                .field(&resource_type, resource_type_field)
+                                .unwrap()
+                                .into(),
+                            &InvocationArg::try_from(acl.resource_name.as_str()).unwrap(),
+                            &self
+                                .jvm
+                                .field(&resource_pattern_type, resource_pattern_type_field)
+                                .unwrap()
+                                .into(),
+                        ],
+                    )
+                    .unwrap();
+
+                let acl_operation_field = match acl.operation {
+                    AclOperation::All => "ALL",
+                    AclOperation::Alter => "ALTER",
+                    AclOperation::AlterConfigs => "ALTER_CONFIGS",
+                    AclOperation::ClusterAction => "CLUSTER_ACTION",
+                    AclOperation::Create => "CREATE",
+                    AclOperation::CreateTokens => "CREATE_TOKENS",
+                    AclOperation::Delete => "DELETE",
+                    AclOperation::Describe => "DESCRIBE",
+                    AclOperation::DescribeConfigs => "DESCRIBE_CONFIGS",
+                    AclOperation::DescribeTokens => "DESCRIBE_TOKENS",
+                    AclOperation::Read => "READ",
+                    AclOperation::Write => "WRITE",
+                };
+                let acl_permission_type_field = match acl.permission_type {
+                    AclPermissionType::Allow => "ALLOW",
+                    AclPermissionType::Deny => "DENY",
+                };
+                let entry = self
+                    .jvm
+                    .create_instance(
+                        "org.apache.kafka.common.acl.AccessControlEntry",
+                        &[
+                            &InvocationArg::try_from(acl.principal.as_str()).unwrap(),
+                            &InvocationArg::try_from(acl.host.as_str()).unwrap(),
+                            &self
+                                .jvm
+                                .field(&acl_operation, acl_operation_field)
+                                .unwrap()
+                                .into(),
+                            &self
+                                .jvm
+                                .field(&acl_permission_type, acl_permission_type_field)
+                                .unwrap()
+                                .into(),
+                        ],
+                    )
+                    .unwrap();
+
+                Ok(self
+                    .jvm
+                    .create_instance(
+                        "org.apache.kafka.common.acl.AclBinding",
+                        &[&resource.into(), &entry.into()],
+                    )
+                    .unwrap())
+            })
+            .collect();
+
+        let acls = self
+            .jvm
+            .java_list("org.apache.kafka.common.acl.AclBinding", acls)
+            .unwrap();
+
+        let result = self
+            .jvm
+            .invoke(&self.admin, "createAcls", &[&acls.into()])
+            .unwrap();
+        self.jvm
+            .invoke_async(&result, "all", InvocationArg::empty())
+            .await
+            .unwrap();
+    }
+
     fn java_map(&self, key_values: Vec<(Instance, Instance)>) -> Instance {
         let map = self
             .jvm
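A note on the j4rs pattern in `create_acls` above: each Rust enum variant is translated to its Java counterpart by looking up a same-named static field on the Java enum class. If these `match` blocks grow, one option is to hang the field name off the enum itself; a self-contained refactor sketch (not applied in this PR, shown with a trimmed copy of the enum):

```rust
// Trimmed copy of the AclOperation enum, for illustration only.
#[allow(dead_code)]
enum AclOperation {
    All,
    Alter,
    Create,
    Describe,
    Read,
    Write,
}

impl AclOperation {
    // Name of the matching static field on
    // org.apache.kafka.common.acl.AclOperation.
    fn java_field(&self) -> &'static str {
        match self {
            AclOperation::All => "ALL",
            AclOperation::Alter => "ALTER",
            AclOperation::Create => "CREATE",
            AclOperation::Describe => "DESCRIBE",
            AclOperation::Read => "READ",
            AclOperation::Write => "WRITE",
        }
    }
}

fn main() {
    // create_acls could then call jvm.field(&acl_operation, op.java_field())
    // instead of matching inline at each call site.
    let op = AclOperation::Describe;
    assert_eq!(op.java_field(), "DESCRIBE");
    println!("AclOperation::Describe -> {}", op.java_field());
}
```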
diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs
index 270931958..a4d5e2b9a 100644
--- a/test-helpers/src/connection/kafka/mod.rs
+++ b/test-helpers/src/connection/kafka/mod.rs
@@ -212,6 +212,14 @@ impl KafkaAdmin {
         }
     }
 
+    pub async fn describe_topic(&self, topic_name: &str) -> Result<TopicDescription> {
+        match self {
+            #[cfg(feature = "kafka-cpp-driver-tests")]
+            KafkaAdmin::Cpp(_) => unimplemented!(),
+            KafkaAdmin::Java(java) => java.describe_topic(topic_name).await,
+        }
+    }
+
     pub async fn delete_topics(&self, to_delete: &[&str]) {
         match self {
             #[cfg(feature = "kafka-cpp-driver-tests")]
@@ -243,6 +251,14 @@ impl KafkaAdmin {
             KafkaAdmin::Java(java) => java.alter_configs(alter_configs).await,
         }
     }
+
+    pub async fn create_acls(&self, acls: Vec<Acl>) {
+        match self {
+            #[cfg(feature = "kafka-cpp-driver-tests")]
+            KafkaAdmin::Cpp(_) => unimplemented!("CPP driver does not support creating ACLs"),
+            KafkaAdmin::Java(java) => java.create_acls(acls).await,
+        }
+    }
 }
 
 pub struct NewTopic<'a> {
@@ -269,3 +285,59 @@ pub struct ConfigEntry {
     pub key: String,
     pub value: String,
 }
+
+pub struct Acl {
+    pub resource_type: ResourceType,
+    pub resource_name: String,
+    pub resource_pattern_type: ResourcePatternType,
+    pub principal: String,
+    pub host: String,
+    pub operation: AclOperation,
+    pub permission_type: AclPermissionType,
+}
+
+/// https://docs.confluent.io/platform/current/clients/javadocs/javadoc/org/apache/kafka/common/resource/ResourceType.html
+pub enum ResourceType {
+    Any,
+    Cluster,
+    DelegationToken,
+    Group,
+    Topic,
+    TransactionalId,
+    User,
+}
+
+/// https://docs.confluent.io/platform/current/clients/javadocs/javadoc/org/apache/kafka/common/resource/PatternType.html
+pub enum ResourcePatternType {
+    Literal,
+    Prefixed,
+}
+
+/// https://docs.confluent.io/platform/current/clients/javadocs/javadoc/org/apache/kafka/common/acl/AclOperation.html
+pub enum AclOperation {
+    All,
+    Alter,
+    AlterConfigs,
+    ClusterAction,
+    Create,
+    CreateTokens,
+    Delete,
+    Describe,
+    DescribeConfigs,
+    DescribeTokens,
+    Read,
+    Write,
+}
+
+/// https://docs.confluent.io/platform/current/clients/javadocs/javadoc/org/apache/kafka/common/acl/AclPermissionType.html
+pub enum AclPermissionType {
+    Allow,
+    Deny,
+}
+
+#[derive(Debug)]
+pub struct TopicDescription {
+    // None of our tests actually make use of the contents of TopicDescription;
+    // they just check whether the describe call succeeded or failed,
+    // so this struct is intentionally left empty for now.
+}
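Putting the new surface together, a hedged end-to-end sketch of how a test could combine `create_acls` and `describe_topic` through the `KafkaAdmin` dispatch above (Java driver only, since the CPP arms are `unimplemented!()`; the `deny_then_describe` function is illustrative and not part of this PR):

```rust
use test_helpers::connection::kafka::{
    Acl, AclOperation, AclPermissionType, KafkaConnectionBuilder, KafkaDriver,
    ResourcePatternType, ResourceType,
};

// Illustrative only: install a Deny ACL as the super user, then confirm
// describe still resolves for a topic that exists. Assumes the docker-compose
// setup from this PR and that admin_setup has already created `partitions1`.
async fn deny_then_describe() {
    let connection = KafkaConnectionBuilder::new(KafkaDriver::Java, "127.0.0.1:9192")
        .use_sasl_scram("super_user", "super_password");
    let admin = connection.connect_admin().await;

    admin
        .create_acls(vec![Acl {
            resource_type: ResourceType::Topic,
            resource_name: "forbidden_topic".to_owned(),
            resource_pattern_type: ResourcePatternType::Literal,
            principal: "User:basic_user".to_owned(),
            host: "*".to_owned(),
            operation: AclOperation::Write,
            permission_type: AclPermissionType::Deny,
        }])
        .await;

    // TopicDescription is currently empty, so only success/failure of the
    // describe call itself is observable.
    let result = admin.describe_topic("partitions1").await;
    assert!(result.is_ok());
}
```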