Remove sasl config (#1545)
conorbros authored Apr 2, 2024
1 parent 29e713d commit 1ae9f50
Showing 7 changed files with 70 additions and 88 deletions.
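
In short: the `sasl_enabled` option is removed from the KafkaSinkCluster transform. Rather than being told up front whether SASL is in use, the transform now inspects the first requests on each client connection — ApiVersions, SaslHandshake and SaslAuthenticate are routed to the first contact node, and authentication is considered complete as soon as a non-handshake request arrives. As the test configs below show, the only change needed in existing topology files is deleting the `sasl_enabled` line. A minimal sketch of the resulting transform config (addresses, rack and timeout values are placeholders taken from those test configs):

    chain:
      - KafkaSinkCluster:
          # sasl_enabled: true   <- no longer accepted; SASL handshakes are detected automatically
          shotover_nodes:
            - address: "127.0.0.1:9191"
              rack: "rack0"
              broker_id: 0
          first_contact_points: ["172.16.1.2:9092"]
          connect_timeout_ms: 3000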
3 changes: 0 additions & 3 deletions docs/src/transforms.md
@@ -309,9 +309,6 @@ This is achieved by rewriting the FindCoordinator, Metadata and DescribeCluster
# When a timeout occurs the connection to the client is immediately closed.
# read_timeout: 60
# When this field is enabled it allows the use of SASL authentication. If you intend to use SASL this field must be enabled, it is false by default.
sasl_enabled: false
# When this field is provided TLS is used when connecting to the remote address.
# Removing this field will disable TLS.
#tls:
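
Since the removed/kept markers are not visible above, note that of the lines shown, the SASL comment and `sasl_enabled: false` are the ones deleted; the remaining documented fields are unchanged context, so after this commit the excerpt reads:

    # When a timeout occurs the connection to the client is immediately closed.
    # read_timeout: 60
    # When this field is provided TLS is used when connecting to the remote address.
    # Removing this field will disable TLS.
    #tls: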
1 change: 0 additions & 1 deletion shotover-proxy/benches/windsock/kafka/bench.rs
@@ -98,7 +98,6 @@ impl KafkaBench {
broker_id: 0,
}],
tls: None,
sasl_enabled: Some(false),
}),
});
common::generate_topology(SourceConfig::Kafka(shotover::sources::kafka::KafkaConfig {
@@ -9,6 +9,5 @@ sources:
- address: "127.0.0.1:9192"
rack: "rack0"
broker_id: 0
sasl_enabled: true
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
@@ -5,7 +5,6 @@ sources:
listen_addr: "127.0.0.1:9191"
chain:
- KafkaSinkCluster:
sasl_enabled: true
shotover_nodes:
- address: "127.0.0.1:9191"
rack: "rack0"
@@ -5,7 +5,6 @@ sources:
listen_addr: "127.0.0.1:9192"
chain:
- KafkaSinkCluster:
sasl_enabled: true
shotover_nodes:
- address: "127.0.0.1:9191"
rack: "rack0"
@@ -5,7 +5,6 @@ sources:
listen_addr: "127.0.0.1:9193"
chain:
- KafkaSinkCluster:
sasl_enabled: true
shotover_nodes:
- address: "127.0.0.1:9191"
rack: "rack0"
150 changes: 70 additions & 80 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -49,7 +49,6 @@ pub struct KafkaSinkClusterConfig {
pub connect_timeout_ms: u64,
pub read_timeout: Option<u64>,
pub tls: Option<TlsConnectorConfig>,
pub sasl_enabled: Option<bool>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
@@ -97,7 +96,6 @@ impl TransformConfig for KafkaSinkClusterConfig {
self.connect_timeout_ms,
self.read_timeout,
tls,
self.sasl_enabled.unwrap_or(false),
)))
}
}
@@ -114,7 +112,6 @@ pub struct KafkaSinkClusterBuilder {
topic_by_id: Arc<DashMap<Uuid, Topic>>,
nodes_shared: Arc<RwLock<Vec<KafkaNode>>>,
tls: Option<TlsConnector>,
sasl_enabled: bool,
}

impl KafkaSinkClusterBuilder {
@@ -125,7 +122,6 @@ impl KafkaSinkClusterBuilder {
connect_timeout_ms: u64,
timeout: Option<u64>,
tls: Option<TlsConnector>,
sasl_enabled: bool,
) -> KafkaSinkClusterBuilder {
let receive_timeout = timeout.map(Duration::from_secs);

@@ -146,7 +142,6 @@ impl KafkaSinkClusterBuilder {
topic_by_id: Arc::new(DashMap::new()),
nodes_shared: Arc::new(RwLock::new(vec![])),
tls,
sasl_enabled,
}
}
}
@@ -163,7 +158,7 @@ impl TransformBuilder for KafkaSinkClusterBuilder {
topic_by_name: self.topic_by_name.clone(),
topic_by_id: self.topic_by_id.clone(),
rng: SmallRng::from_rng(rand::thread_rng()).unwrap(),
sasl_status: SaslStatus::new(self.sasl_enabled),
auth_complete: false,
connection_factory: ConnectionFactory::new(
self.tls.clone(),
self.connect_timeout,
@@ -210,33 +205,6 @@ impl AtomicBrokerId {
}
}

#[derive(Debug)]
struct SaslStatus {
enabled: bool,
handshake_complete: bool,
}

impl SaslStatus {
fn new(enabled: bool) -> Self {
Self {
enabled,
handshake_complete: false,
}
}

fn set_handshake_complete(&mut self) {
self.handshake_complete = true;
}

fn is_handshake_complete(&self) -> bool {
if self.enabled {
self.handshake_complete
} else {
true
}
}
}

pub struct KafkaSinkCluster {
first_contact_points: Vec<String>,
shotover_nodes: Vec<ShotoverNode>,
@@ -247,7 +215,7 @@ pub struct KafkaSinkCluster {
topic_by_name: Arc<DashMap<TopicName, Topic>>,
topic_by_id: Arc<DashMap<Uuid, Topic>>,
rng: SmallRng,
sasl_status: SaslStatus,
auth_complete: bool,
connection_factory: ConnectionFactory,
first_contact_node: Option<KafkaAddress>,
control_connection: Option<SinkConnection>,
@@ -382,6 +350,54 @@ impl KafkaSinkCluster {
}

async fn route_requests(&mut self, mut requests: Vec<Message>) -> Result<()> {
if !self.auth_complete {
let mut handshake_request_count = 0;
for request in &mut requests {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::SaslHandshake(_),
..
})) => {
self.connection_factory
.add_handshake_message(request.clone());
handshake_request_count += 1;
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::SaslAuthenticate(_),
..
})) => {
self.connection_factory.add_auth_message(request.clone());
handshake_request_count += 1;
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ApiVersions(_),
..
})) => {
handshake_request_count += 1;
}
_ => {
// The client is no longer performing authentication
self.auth_complete = true;
break;
}
}
}
// route all handshake messages
for _ in 0..handshake_request_count {
let request = requests.remove(0);
self.route_to_first_contact_node(request.clone());
}

if requests.is_empty() {
// all messages received in this batch are handshake messages,
// so don't continue with regular message handling
return Ok(());
} else {
// the later messages in this batch are not handshake messages,
// so continue onto the regular message handling
}
}

let mut topics = vec![];
let mut groups = vec![];
for request in &mut requests {
@@ -435,9 +451,7 @@ impl KafkaSinkCluster {
}

// request and process metadata if we are missing topics or the controller broker id
if (!topics.is_empty() || self.controller_broker.get().is_none())
&& self.sasl_status.is_handshake_complete()
{
if !topics.is_empty() || self.controller_broker.get().is_none() {
let mut metadata = self.get_metadata_of_topics(topics).await?;
match metadata.frame() {
Some(Frame::Kafka(KafkaFrame::Response {
@@ -616,30 +630,6 @@ impl KafkaSinkCluster {
body: RequestBody::CreateTopics(_),
..
})) => self.route_to_controller(message),

Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ApiVersions(_),
..
})) => self.route_to_first_contact_node(message.clone()),

Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::SaslHandshake(_),
..
})) => {
self.route_to_first_contact_node(message.clone());
self.connection_factory
.add_handshake_message(message.clone());
}

Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::SaslAuthenticate(_),
..
})) => {
self.route_to_first_contact_node(message.clone());
self.connection_factory.add_auth_message(message.clone());
self.sasl_status.set_handshake_complete();
}

// route to random node
_ => {
let destination = self.nodes.choose(&mut self.rng).unwrap().broker_id;
@@ -653,25 +643,6 @@ impl KafkaSinkCluster {
Ok(())
}

fn route_to_first_contact_node(&mut self, message: Message) {
let destination = if let Some(first_contact_node) = &self.first_contact_node {
self.nodes
.iter_mut()
.find(|node| node.kafka_address == *first_contact_node)
.unwrap()
.broker_id
} else {
let node = self.nodes.get_mut(0).unwrap();
self.first_contact_node = Some(node.kafka_address.clone());
node.broker_id
};

self.pending_requests.push_back(PendingRequest::Routed {
destination,
request: message,
});
}

async fn find_coordinator_of_group(
&mut self,
group: GroupId,
@@ -892,6 +863,25 @@ impl KafkaSinkCluster {
Ok(())
}

fn route_to_first_contact_node(&mut self, message: Message) {
let destination = if let Some(first_contact_node) = &self.first_contact_node {
self.nodes
.iter_mut()
.find(|node| node.kafka_address == *first_contact_node)
.unwrap()
.broker_id
} else {
let node = self.nodes.get_mut(0).unwrap();
self.first_contact_node = Some(node.kafka_address.clone());
node.broker_id
};

self.pending_requests.push_back(PendingRequest::Routed {
destination,
request: message,
});
}

fn route_to_controller(&mut self, message: Message) {
let broker_id = self.controller_broker.get().unwrap();
