Skip to content

Commit

Permalink
KafkaSinkCluster scram_over_mtls - thorough integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Jun 4, 2024
1 parent ec2a06d commit b39162b
Show file tree
Hide file tree
Showing 5 changed files with 348 additions and 20 deletions.
84 changes: 70 additions & 14 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use crate::shotover_process;
use pretty_assertions::assert_eq;
use rstest::rstest;
use std::time::Duration;
use test_cases::smoke_test;
use test_cases::{assert_topic_creation_is_denied_due_to_acl, setup_basic_user_acls};
use test_helpers::connection::kafka::{KafkaConnectionBuilder, KafkaDriver};
use test_helpers::docker_compose::docker_compose;
use test_helpers::shotover_process::{Count, EventMatcher};
Expand Down Expand Up @@ -357,22 +359,76 @@ async fn cluster_sasl_scram_over_mtls_single_shotover(#[case] driver: KafkaDrive
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-sasl-scram-over-mtls/docker-compose.yaml");

let shotover = shotover_process(
"tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml",
)
.start()
.await;
// test concurrent connections with different access levels to ensure that:
// * clients with bad auth are not authorized
// * tokens are not mixed up
// * requests are not sent to the super user connection
{
// admin requests sent by admin user are successful
let shotover = shotover_process(
"tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml",
)
.start()
.await;
let connection_super = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192")
.use_sasl_scram("super_user", "super_password");
setup_basic_user_acls(&connection_super, "basic_user").await;
test_cases::standard_test_suite(connection_super).await;
assert_connection_fails_with_incorrect_password(driver, "super_user").await;

// admin requests sent by basic user are unsuccessful
let connection_basic = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192")
.use_sasl_scram("basic_user", "basic_password");
assert_topic_creation_is_denied_due_to_acl(&connection_basic).await;
assert_connection_fails_with_incorrect_password(driver, "basic_user").await;

let connection_builder =
KafkaConnectionBuilder::new(driver, "127.0.0.1:9192").use_sasl_scram("user", "password");
test_cases::standard_test_suite(connection_builder).await;
tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[]),
)
.await
.expect("Shotover did not shutdown within 10s");
}

tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[]),
)
.await
.expect("Shotover did not shutdown within 10s");
// rerun same tests as before with different ordering
{
// admin requests sent by regular user are unsuccessful
let shotover = shotover_process(
"tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml",
)
.start()
.await;
assert_connection_fails_with_incorrect_password(driver, "basic_user").await;
let connection_basic = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192")
.use_sasl_scram("basic_user", "basic_password");
assert_topic_creation_is_denied_due_to_acl(&connection_basic).await;
assert_connection_fails_with_incorrect_password(driver, "basic_user").await;

// admin requests sent by admin user are successful
// admin requests sent by regular user remain unsuccessful
let connection_super = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192")
.use_sasl_scram("super_user", "super_password");
smoke_test(&connection_super).await;
assert_topic_creation_is_denied_due_to_acl(&connection_basic).await;
assert_connection_fails_with_incorrect_password(driver, "basic_user").await;
assert_connection_fails_with_incorrect_password(driver, "super_user").await;

tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[]),
)
.await
.expect("Shotover did not shutdown within 10s");
}
}

/// Attempts to open an admin connection as `username` using a deliberately
/// wrong password, and asserts that SASL SCRAM authentication is rejected.
async fn assert_connection_fails_with_incorrect_password(driver: KafkaDriver, username: &str) {
    let builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192")
        .use_sasl_scram(username, "not_the_password");
    let reported_error = builder.assert_admin_error().await.to_string();
    assert_eq!(
        reported_error,
        "org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-256\n"
    );
}

#[rstest]
Expand Down
69 changes: 67 additions & 2 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use test_helpers::connection::kafka::{
AlterConfig, ConfigEntry, ExpectedResponse, KafkaConnectionBuilder, NewPartition, NewTopic,
Record, ResourceSpecifier,
Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ExpectedResponse,
KafkaConnectionBuilder, NewPartition, NewTopic, Record, ResourcePatternType, ResourceSpecifier,
ResourceType,
};

async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
Expand Down Expand Up @@ -243,3 +244,67 @@ pub async fn standard_test_suite(connection_builder: KafkaConnectionBuilder) {
produce_consume_acks0(&connection_builder).await;
connection_builder.admin_cleanup().await;
}

/// Grants `username` permission to describe any topic — and nothing else.
pub async fn setup_basic_user_acls(connection: &KafkaConnectionBuilder, username: &str) {
    // Allow `User:<username>`, connecting from any host, to Describe every topic.
    let describe_all_topics = Acl {
        resource_type: ResourceType::Topic,
        resource_name: "*".to_owned(),
        resource_pattern_type: ResourcePatternType::Literal,
        principal: format!("User:{username}"),
        host: "*".to_owned(),
        operation: AclOperation::Describe,
        permission_type: AclPermissionType::Allow,
    };
    connection
        .connect_admin()
        .await
        .create_acls(vec![describe_all_topics])
        .await;
}

/// A quick sanity check that avoids running the whole test suite:
/// produces a single record to a topic that already exists.
///
/// Assumes that `admin_setup` or `standard_test_suite` has already been run.
pub async fn smoke_test(connection_builder: &KafkaConnectionBuilder) {
    let record = Record {
        payload: "initial",
        topic_name: "partitions1",
        key: Some("Key"),
    };
    connection_builder
        .connect_producer(1)
        .await
        .assert_produce(record, None)
        .await;
}

/// Invariants:
/// * The passed connection is a user set up with the ACLs of `setup_basic_user_acls`
///
/// Assertions:
/// * Asserts that the user cannot perform the admin operation of creating new topics (not allowed by ACL)
/// * Asserts that the topic was not created as a result of the failed topic creation.
/// * Asserts that the user can perform the describe operation on topics (explicitly allowed by ACL)
pub async fn assert_topic_creation_is_denied_due_to_acl(connection: &KafkaConnectionBuilder) {
    let admin = connection.connect_admin().await;

    // Attempt to create a topic: must fail with an authorization error, since the
    // user only holds the Describe ACL, not Create.
    //
    // BUG FIX: previously this `assert_eq!` had three positional expressions —
    // it compared `connection.assert_admin_error()` against the create_topics
    // error, and the expected exception text sat in the format-message position,
    // so the expected string was never actually checked. Compare the
    // create_topics error against the expected string directly.
    assert_eq!(
        admin
            .create_topics_fallible(&[NewTopic {
                name: "acl_check_topic",
                num_partitions: 1,
                replication_factor: 1,
            }])
            .await
            .unwrap_err()
            .to_string(),
        "org.apache.kafka.common.errors.TopicAuthorizationException: Authorization failed.\n"
    );

    // Attempt to describe the topic:
    // * The request itself succeeds because the user has AclOperation::Describe.
    // * But no topic is found since the topic creation above was denied.
    assert_eq!(
        admin
            .describe_topic("acl_check_topic")
            .await
            .unwrap_err()
            .to_string(),
        "org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.\n"
    );
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ services:
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:SSL,BROKER:SASL_SSL,SHOTOVER_MTLS:SSL"
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.2:9092,SHOTOVER_MTLS://172.16.1.2:9094"
KAFKA_CFG_DELEGATION_TOKEN_MASTER_KEY: THE_MASTER_KEY
KAFKA_CLIENT_USERS: "user"
KAFKA_CLIENT_PASSWORDS: "password"
KAFKA_CLIENT_USERS: "super_user,basic_user"
KAFKA_CLIENT_PASSWORDS: "super_password,basic_password"
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL: "PLAIN"
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "BROKER"
Expand All @@ -37,7 +37,7 @@ services:
# Give the following super user access:
# * the user named `super_user`
# * any clients connected via a TLS certificate of `O=ShotoverTestCertificate,CN=Generic-Cert`
KAFKA_CFG_SUPER_USERS: "User:user;User:O=ShotoverTestCertificate,CN=Generic-Cert"
KAFKA_CFG_SUPER_USERS: "User:super_user;User:O=ShotoverTestCertificate,CN=Generic-Cert"
volumes: &volumes
- type: tmpfs
target: /bitnami/kafka
Expand Down
137 changes: 136 additions & 1 deletion test-helpers/src/connection/kafka/java.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use super::{AlterConfig, ExpectedResponse, NewPartition, NewTopic, Record, ResourceSpecifier};
use super::{
Acl, AclOperation, AclPermissionType, AlterConfig, ExpectedResponse, NewPartition, NewTopic,
Record, ResourcePatternType, ResourceSpecifier, ResourceType, TopicDescription,
};
use anyhow::Result;
use j4rs::{errors::J4RsError, Instance, InvocationArg, Jvm, JvmBuilder, MavenArtifact};
use pretty_assertions::assert_eq;
Expand Down Expand Up @@ -346,6 +349,20 @@ impl KafkaAdminJava {
self.create_topics_fallible(topics).await.unwrap();
}

/// Describes `topic_name` via the Java admin client's `describeTopics` API.
///
/// Returns `Err` when the broker reports a failure for the topic (e.g. it does
/// not exist) — the `?` on `invoke_async` propagates the Java-side error.
/// On success the Java result is discarded and an empty placeholder
/// `TopicDescription` is returned; callers currently only care about
/// success vs. the error message.
pub async fn describe_topic(&self, topic_name: &str) -> Result<TopicDescription> {
// Wrap the topic name in a java.util.List<String>, as describeTopics requires.
let topics = self
.jvm
.java_list("java.lang.String", vec![topic_name])
.unwrap();

// Issue the request; `result` holds the Java DescribeTopicsResult object.
let result = self
.jvm
.invoke(&self.admin, "describeTopics", &[&topics.into()])
.unwrap();
// Await completion via allTopicNames(); broker-side errors surface here and
// are propagated to the caller by `?`.
self.jvm.invoke_async(&result, "allTopicNames", &[]).await?;
Ok(TopicDescription {})
}

pub async fn create_topics_fallible(&self, topics: &[NewTopic<'_>]) -> Result<()> {
let topics: Vec<_> = topics
.iter()
Expand Down Expand Up @@ -518,6 +535,124 @@ impl KafkaAdminJava {
.unwrap();
}

/// Creates the given ACL bindings via the Java admin client's `createAcls` API.
///
/// Each Rust `Acl` is translated into a Java `AclBinding`, built from a
/// `ResourcePattern` (what the ACL applies to) and an `AccessControlEntry`
/// (who may do what). Panics (`unwrap`) on any JVM interop failure, which is
/// acceptable for test-helper code.
pub async fn create_acls(&self, acls: Vec<Acl>) {
// Look up the Java enum classes once, up front; their variants are read
// below via `self.jvm.field(...)`.
let resource_type = self
.jvm
.static_class("org.apache.kafka.common.resource.ResourceType")
.unwrap();
let resource_pattern_type = self
.jvm
.static_class("org.apache.kafka.common.resource.PatternType")
.unwrap();
let acl_operation = self
.jvm
.static_class("org.apache.kafka.common.acl.AclOperation")
.unwrap();
let acl_permission_type = self
.jvm
.static_class("org.apache.kafka.common.acl.AclPermissionType")
.unwrap();

// Translate every Rust Acl into a Java AclBinding instance.
let acls: Vec<_> = acls
.iter()
.map(|acl| {
// Map the Rust enum variant to the name of the matching Java enum field.
let resource_type_field = match acl.resource_type {
ResourceType::Any => "ANY",
ResourceType::Cluster => "CLUSTER",
ResourceType::DelegationToken => "DELEGATION_TOKEN",
ResourceType::Group => "GROUP",
ResourceType::Topic => "TOPIC",
ResourceType::TransactionalId => "TRANSACTIONAL_ID",
ResourceType::User => "USER",
};
let resource_pattern_type_field = match acl.resource_pattern_type {
ResourcePatternType::Literal => "LITERAL",
ResourcePatternType::Prefixed => "PREFIXED",
};
// ResourcePattern(resourceType, name, patternType) — what the ACL applies to.
let resource = self
.jvm
.create_instance(
"org.apache.kafka.common.resource.ResourcePattern",
&[
&self
.jvm
.field(&resource_type, resource_type_field)
.unwrap()
.into(),
&InvocationArg::try_from(acl.resource_name.as_str()).unwrap(),
&self
.jvm
.field(&resource_pattern_type, resource_pattern_type_field)
.unwrap()
.into(),
],
)
.unwrap();

let acl_operation_field = match acl.operation {
AclOperation::All => "ALL",
AclOperation::Alter => "ALTER",
AclOperation::AlterConfigs => "ALTER_CONFIGS",
AclOperation::ClusterAction => "CLUSTER_ACTION",
AclOperation::Create => "CREATE",
AclOperation::CreateTokens => "CREATE_TOKENS",
AclOperation::Delete => "DELETE",
AclOperation::Describe => "DESCRIBE",
AclOperation::DescribeConfigs => "DESCRIBE_CONFIGS",
AclOperation::DescribeTokens => "DESCRIBE_TOKENS",
AclOperation::Read => "READ",
AclOperation::Write => "WRITE",
};
let acl_permission_type_field = match acl.permission_type {
AclPermissionType::Allow => "ALLOW",
AclPermissionType::Deny => "DENY",
};
// AccessControlEntry(principal, host, operation, permissionType) — who may do what.
let entry = self
.jvm
.create_instance(
"org.apache.kafka.common.acl.AccessControlEntry",
&[
&InvocationArg::try_from(acl.principal.as_str()).unwrap(),
&InvocationArg::try_from(acl.host.as_str()).unwrap(),
&self
.jvm
.field(&acl_operation, acl_operation_field)
.unwrap()
.into(),
&self
.jvm
.field(&acl_permission_type, acl_permission_type_field)
.unwrap()
.into(),
],
)
.unwrap();

// Wrapped in Ok so the collected Vec has the item type java_list expects.
Ok(self
.jvm
.create_instance(
"org.apache.kafka.common.acl.AclBinding",
&[&resource.into(), &entry.into()],
)
.unwrap())
})
.collect();

// Bundle the bindings into a java.util.List<AclBinding> for createAcls.
let acls = self
.jvm
.java_list("org.apache.kafka.common.acl.AclBinding", acls)
.unwrap();

let result = self
.jvm
.invoke(&self.admin, "createAcls", &[&acls.into()])
.unwrap();
// Await completion via CreateAclsResult.all(); panic if the broker rejects any binding.
self.jvm
.invoke_async(&result, "all", InvocationArg::empty())
.await
.unwrap();
}

fn java_map(&self, key_values: Vec<(Instance, Instance)>) -> Instance {
let map = self
.jvm
Expand Down
Loading

0 comments on commit b39162b

Please sign in to comment.