remove sasl config
conorbros committed Mar 21, 2024
1 parent 0a15793 commit ab552d7
Showing 8 changed files with 43 additions and 26 deletions.
3 changes: 0 additions & 3 deletions docs/src/transforms.md
@@ -309,9 +309,6 @@ This is achieved by rewriting the FindCoordinator, Metadata and DescribeCluster
# When a timeout occurs the connection to the client is immediately closed.
# read_timeout: 60
# When this field is enabled it allows the use of SASL authentication. If you intend to use SASL this field must be enabled, it is false by default.
sasl_enabled: false
# When this field is provided TLS is used when connecting to the remote address.
# Removing this field will disable TLS.
#tls:
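A minimal sketch of a KafkaSinkCluster transform config without the removed sasl_enabled field, assembled only from options that appear elsewhere in this diff (shotover_nodes, first_contact_points, connect_timeout_ms, read_timeout, tls); the addresses and values are illustrative:

  - KafkaSinkCluster:
      shotover_nodes:
        - address: "127.0.0.1:9191"
          rack: "rack0"
          broker_id: 0
      first_contact_points: ["172.16.1.2:9092"]
      connect_timeout_ms: 3000
      # When a timeout occurs the connection to the client is immediately closed.
      # read_timeout: 60
      # When this field is provided TLS is used when connecting to the remote address.
      # tls: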
1 change: 0 additions & 1 deletion shotover-proxy/benches/windsock/kafka/bench.rs
@@ -98,7 +98,6 @@ impl KafkaBench {
broker_id: 0,
}],
tls: None,
sasl_enabled: Some(false),
}),
});
common::generate_topology(SourceConfig::Kafka(shotover::sources::kafka::KafkaConfig {
2 changes: 1 addition & 1 deletion shotover-proxy/config/config.yaml
@@ -1,5 +1,5 @@
# configure the first `info` to set the log level for dependencies
# configure `shotover=info` to set the log level for shotover itself
# set `shotover::connection_span=info` to `shotover::connection_span=debug` to attach connection info to most log events, this is disabled by default due to a minor performance hit.
main_log_level: "info, shotover=info, shotover::connection_span=info"
main_log_level: "debug, shotover=debug, shotover::connection_span=debug"
observability_interface: "0.0.0.0:9001"
@@ -9,6 +9,5 @@ sources:
- address: "127.0.0.1:9192"
rack: "rack0"
broker_id: 0
sasl_enabled: true
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
@@ -5,7 +5,6 @@ sources:
listen_addr: "127.0.0.1:9191"
chain:
- KafkaSinkCluster:
sasl_enabled: true
shotover_nodes:
- address: "127.0.0.1:9191"
rack: "rack0"
@@ -5,7 +5,6 @@ sources:
listen_addr: "127.0.0.1:9192"
chain:
- KafkaSinkCluster:
sasl_enabled: true
shotover_nodes:
- address: "127.0.0.1:9191"
rack: "rack0"
@@ -5,7 +5,6 @@ sources:
listen_addr: "127.0.0.1:9193"
chain:
- KafkaSinkCluster:
sasl_enabled: true
shotover_nodes:
- address: "127.0.0.1:9191"
rack: "rack0"
59 changes: 42 additions & 17 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -51,7 +51,6 @@ pub struct KafkaSinkClusterConfig {
pub connect_timeout_ms: u64,
pub read_timeout: Option<u64>,
pub tls: Option<TlsConnectorConfig>,
pub sasl_enabled: Option<bool>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
@@ -99,7 +98,6 @@ impl TransformConfig for KafkaSinkClusterConfig {
self.connect_timeout_ms,
self.read_timeout,
tls,
self.sasl_enabled.unwrap_or(false),
)))
}
}
@@ -116,7 +114,6 @@ pub struct KafkaSinkClusterBuilder {
topic_by_id: Arc<DashMap<Uuid, Topic>>,
nodes_shared: Arc<RwLock<Vec<KafkaNode>>>,
tls: Option<TlsConnector>,
sasl_enabled: bool,
}

impl KafkaSinkClusterBuilder {
@@ -127,7 +124,6 @@ impl KafkaSinkClusterBuilder {
connect_timeout_ms: u64,
timeout: Option<u64>,
tls: Option<TlsConnector>,
sasl_enabled: bool,
) -> KafkaSinkClusterBuilder {
let receive_timeout = timeout.map(Duration::from_secs);

@@ -148,7 +144,6 @@ impl KafkaSinkClusterBuilder {
topic_by_id: Arc::new(DashMap::new()),
nodes_shared: Arc::new(RwLock::new(vec![])),
tls,
sasl_enabled,
}
}
}
@@ -166,7 +161,7 @@ impl TransformBuilder for KafkaSinkClusterBuilder {
topic_by_name: self.topic_by_name.clone(),
topic_by_id: self.topic_by_id.clone(),
rng: SmallRng::from_rng(rand::thread_rng()).unwrap(),
sasl_status: SaslStatus::new(self.sasl_enabled),
sasl_status: SaslStatus::new(),
connection_factory: ConnectionFactory::new(self.tls.clone(), self.connect_timeout),
first_contact_node: None,
fetch_session_id_to_broker: HashMap::new(),
@@ -207,28 +202,33 @@ impl AtomicBrokerId {

#[derive(Debug)]
struct SaslStatus {
enabled: bool,
handshake_complete: bool,
auth_complete: bool,
sasl_handshake_complete: bool,
}

impl SaslStatus {
fn new(enabled: bool) -> Self {
fn new() -> Self {
Self {
enabled,
handshake_complete: false,
auth_complete: false,
sasl_handshake_complete: false,
}
}

fn set_auth_complete(&mut self, v: bool) {
self.auth_complete = v;
}

fn set_handshake_complete(&mut self) {
self.handshake_complete = true;
self.sasl_handshake_complete = true;
}

fn is_handshake_complete(&self) -> bool {
if self.enabled {
self.handshake_complete
} else {
true
}
self.auth_complete
// if self.auth_complete {
// self.sasl_handshake_complete
// } else {
// false
// }
}
}

@@ -774,6 +774,29 @@ impl KafkaSinkCluster {
Ok(rx.await.unwrap().response.unwrap())
}

fn responses_contain_non_handshake_message(&mut self, responses: &mut Vec<Message>) {
for response in responses.iter_mut() {
match response.frame() {
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::ApiVersions(_),
..
})) => {}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::SaslHandshake(_),
..
})) => {}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::SaslAuthenticate(_),
..
})) => {}
_ => {
self.sasl_status.set_auth_complete(true);
return;
}
}
}
}

async fn receive_responses(
&mut self,
find_coordinator_requests: &[FindCoordinator],
@@ -786,6 +809,8 @@
read_responses(responses).await
}?;

self.responses_contain_non_handshake_message(&mut responses);

// TODO: Handle errors like NOT_COORDINATOR by removing element from self.topics and self.coordinator_broker_id

for (i, response) in responses.iter_mut().enumerate() {