Skip to content

Commit

Permalink
Replace smoke_test with produce_consume_partitions1
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Jun 11, 2024
1 parent 48e9d0f commit 7af0a21
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 23 deletions.
5 changes: 3 additions & 2 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use pretty_assertions::assert_eq;
use rstest::rstest;
use std::time::Duration;
use std::time::Instant;
use test_cases::smoke_test;
use test_cases::produce_consume_partitions1;
use test_cases::{assert_topic_creation_is_denied_due_to_acl, setup_basic_user_acls};
use test_helpers::connection::kafka::{KafkaConnectionBuilder, KafkaDriver};
use test_helpers::docker_compose::docker_compose;
Expand Down Expand Up @@ -411,7 +411,8 @@ async fn cluster_sasl_scram_over_mtls_single_shotover(#[case] driver: KafkaDrive
// admin requests sent by regular user remain unsuccessful
let connection_super = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192")
.use_sasl_scram("super_user", "super_password");
smoke_test(&connection_super).await;
produce_consume_partitions1(&connection_super, "c3220ff0-9390-425d-a56d-9d880a339c8c")
.await;
assert_topic_creation_is_denied_due_to_acl(&connection_basic).await;
assert_connection_fails_with_incorrect_password(driver, "basic_user").await;
assert_connection_fails_with_incorrect_password(driver, "super_user").await;
Expand Down
17 changes: 0 additions & 17 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,20 +290,3 @@ pub async fn assert_topic_creation_is_denied_due_to_acl(connection: &KafkaConnec
"org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.\n"
)
}

// A quick test without running the whole test suite
// Assumes that admin_setup or standard_test_suite has already been run.
pub async fn smoke_test(connection_builder: &KafkaConnectionBuilder) {
let producer = connection_builder.connect_producer(1).await;

producer
.assert_produce(
Record {
payload: "initial",
topic_name: "partitions1",
key: Some("Key"),
},
None,
)
.await;
}
11 changes: 7 additions & 4 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1603,17 +1603,20 @@ routing message to a random node so that:
}

async fn add_node_if_new(&mut self, new_node: KafkaNode) {
let new = self
let missing_from_shared = self
.nodes_shared
.read()
.await
.iter()
.all(|node| node.broker_id != new_node.broker_id);
if new {
if missing_from_shared {
self.nodes_shared.write().await.push(new_node);

self.update_local_nodes().await;
}

// We need to run this every time, not just when `missing_from_shared`.
// This is because nodes_shared could already contain this node while its missing from local nodes.
// This could happen when another KafkaSinkCluster instance updates nodes_shared just before we read from it.
self.update_local_nodes().await;
}

fn broker_within_rack(&self, broker_id: BrokerId) -> bool {
Expand Down
4 changes: 4 additions & 0 deletions shotover/src/transforms/kafka/sink_cluster/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::tls::TlsConnector;
use crate::transforms::kafka::sink_cluster::SASL_SCRAM_MECHANISMS;
use anyhow::{anyhow, Context, Result};
use bytes::Bytes;
use derivative::Derivative;
use kafka_protocol::messages::{ApiKey, BrokerId, RequestHeader, SaslAuthenticateRequest};
use kafka_protocol::protocol::{Builder, StrBytes};
use kafka_protocol::ResponseError;
Expand Down Expand Up @@ -256,10 +257,13 @@ impl KafkaAddress {
}
}

#[derive(Derivative)]
#[derivative(Debug)]
pub struct KafkaNode {
pub broker_id: BrokerId,
pub rack: Option<StrBytes>,
pub kafka_address: KafkaAddress,
#[derivative(Debug = "ignore")]
connection: Option<SinkConnection>,
}

Expand Down

0 comments on commit 7af0a21

Please sign in to comment.