Skip to content

Commit

Permalink
Split ShotoverNode::address into address_for_peers and address_for_cl…
Browse files Browse the repository at this point in the history
…ient
  • Loading branch information
rukai committed Oct 20, 2024
1 parent 26550b8 commit 59804a5
Show file tree
Hide file tree
Showing 20 changed files with 99 additions and 53 deletions.
10 changes: 7 additions & 3 deletions docs/src/transforms.md
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,14 @@ If SCRAM authentication against the first kafka broker fails, shotover will term
# A list of every Shotover node that will be proxying to the same kafka cluster.
# This field should be identical for all Shotover nodes proxying to the same kafka cluster.
shotover_nodes:
# Address of the Shotover node.
# Address of the Shotover node that is reported to the kafka clients.
# This is usually the same address as the Shotover source that is connected to this sink.
# But it may be different if you want Shotover to report a different address.
- address: "127.0.0.1:9092"
# But it may be different if you want Shotover to report a different address to its clients.
- address_for_client: "127.0.0.1:9092"
# Address of the shotover node as used to check for peers that are up.
# This is usually the same address as the Shotover source that is connected to this sink.
# But it may be different if you want Shotover to connect to its peers via a different address.
address_for_peers: "127.0.0.1:9092"
# The rack the Shotover node will report as and route messages to.
# For performance reasons, the Shotover node should be physically located in this rack.
rack: "rack0"
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/benches/windsock/kafka/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ impl KafkaBench {
check_shotover_peers_delay_ms: Some(3000),
first_contact_points: vec![kafka_address],
shotover_nodes: vec![ShotoverNodeConfig {
address: host_address.parse().unwrap(),
address_for_clients: host_address.parse().unwrap(),
rack: "rack1".into(),
broker_id: 0,
}],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9192"
- address_for_peers: "127.0.0.1:9192"
address_for_client: "127.0.0.1:9192"
rack: "rack0"
broker_id: 0
local_shotover_broker_id: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9191"
- address_for_peers: "127.0.0.1:9191"
address_for_client: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
- address_for_peers: "127.0.0.1:9192"
address_for_client: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
- address_for_peers: "127.0.0.1:9193"
address_for_client: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
local_shotover_broker_id: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9191"
- address_for_peers: "127.0.0.1:9191"
address_for_client: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
- address_for_peers: "127.0.0.1:9192"
address_for_client: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
- address_for_peers: "127.0.0.1:9193"
address_for_client: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
local_shotover_broker_id: 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9191"
- address_for_peers: "127.0.0.1:9191"
address_for_client: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
- address_for_peers: "127.0.0.1:9192"
address_for_client: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
- address_for_peers: "127.0.0.1:9193"
address_for_client: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
local_shotover_broker_id: 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "localhost:9191"
- address_for_peers: "localhost:9191"
address_for_client: "localhost:9191"
rack: "rack1"
broker_id: 0
- address: "localhost:9192"
- address_for_peers: "localhost:9192"
address_for_client: "localhost:9192"
rack: "rack2"
broker_id: 1
local_shotover_broker_id: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "localhost:9191"
- address_for_peers: "localhost:9191"
address_for_client: "localhost:9191"
rack: "rack1"
broker_id: 0
- address: "localhost:9192"
- address_for_peers: "localhost:9192"
address_for_client: "localhost:9192"
rack: "rack2"
broker_id: 1
local_shotover_broker_id: 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9192"
- address_for_peers: "127.0.0.1:9192"
address_for_client: "127.0.0.1:9192"
rack: "rack0"
broker_id: 0
local_shotover_broker_id: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9191"
- address_for_peers: "127.0.0.1:9191"
address_for_client: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
- address_for_peers: "127.0.0.1:9192"
address_for_client: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
- address_for_peers: "127.0.0.1:9193"
address_for_client: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
local_shotover_broker_id: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9191"
- address_for_peers: "127.0.0.1:9191"
address_for_client: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
- address_for_peers: "127.0.0.1:9192"
address_for_client: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
- address_for_peers: "127.0.0.1:9193"
address_for_client: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
local_shotover_broker_id: 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9191"
- address_for_peers: "127.0.0.1:9191"
address_for_client: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
- address_for_peers: "127.0.0.1:9192"
address_for_client: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
- address_for_peers: "127.0.0.1:9193"
address_for_client: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
local_shotover_broker_id: 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9192"
- address_for_peers: "127.0.0.1:9192"
address_for_client: "127.0.0.1:9192"
rack: "rack0"
broker_id: 0
local_shotover_broker_id: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9191"
- address_for_peers: "127.0.0.1:9191"
address_for_client: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
- address_for_peers: "127.0.0.1:9192"
address_for_client: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
- address_for_peers: "127.0.0.1:9193"
address_for_client: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
local_shotover_broker_id: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9191"
- address_for_peers: "127.0.0.1:9191"
address_for_client: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
- address_for_peers: "127.0.0.1:9192"
address_for_client: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
- address_for_peers: "127.0.0.1:9193"
address_for_client: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
local_shotover_broker_id: 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9191"
- address_for_peers: "127.0.0.1:9191"
address_for_client: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
- address_for_peers: "127.0.0.1:9192"
address_for_client: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
- address_for_peers: "127.0.0.1:9193"
address_for_client: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
local_shotover_broker_id: 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9192"
- address_for_peers: "127.0.0.1:9192"
address_for_client: "127.0.0.1:9192"
rack: "rack0"
broker_id: 0
local_shotover_broker_id: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9192"
- address_for_peers: "127.0.0.1:9192"
address_for_client: "127.0.0.1:9192"
rack: "rack0"
broker_id: 0
local_shotover_broker_id: 0
Expand Down
14 changes: 7 additions & 7 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl TransformConfig for KafkaSinkClusterConfig {
if !unique_broker_ids.insert(node.broker_id) {
return Err(anyhow::anyhow!(
"Duplicate broker_id found in shotover node {}",
node.address
node.address_for_clients
));
}
}
Expand Down Expand Up @@ -2644,8 +2644,8 @@ impl KafkaSinkCluster {
partition_shotover_nodes_by_rack(&up_shotover_nodes, coordinator_rack);
let shotover_node = select_shotover_node_by_hash(shotover_nodes_by_rack, hash);

find_coordinator.host = shotover_node.address.host.clone();
find_coordinator.port = shotover_node.address.port;
find_coordinator.host = shotover_node.address_for_clients.host.clone();
find_coordinator.port = shotover_node.address_for_clients.port;
find_coordinator.node_id = shotover_node.broker_id;
}
} else {
Expand Down Expand Up @@ -2674,8 +2674,8 @@ impl KafkaSinkCluster {
partition_shotover_nodes_by_rack(&up_shotover_nodes, coordinator_rack);
let shotover_node = select_shotover_node_by_hash(shotover_nodes_by_rack, hash);

coordinator.host = shotover_node.address.host.clone();
coordinator.port = shotover_node.address.port;
coordinator.host = shotover_node.address_for_clients.host.clone();
coordinator.port = shotover_node.address_for_clients.port;
coordinator.node_id = shotover_node.broker_id;
}
}
Expand Down Expand Up @@ -2705,8 +2705,8 @@ impl KafkaSinkCluster {
.map(|shotover_node| {
MetadataResponseBroker::default()
.with_node_id(shotover_node.broker_id)
.with_host(shotover_node.address.host.clone())
.with_port(shotover_node.address.port)
.with_host(shotover_node.address_for_clients.host.clone())
.with_port(shotover_node.address_for_clients.port)
.with_rack(Some(shotover_node.rack.clone()))
})
.collect();
Expand Down
18 changes: 12 additions & 6 deletions shotover/src/transforms/kafka/sink_cluster/shotover_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ use tokio::time::sleep;
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
pub struct ShotoverNodeConfig {
pub address: String,
pub address_for_clients: String,
pub address_for_peers: String,
pub rack: String,
pub broker_id: i32,
}

impl ShotoverNodeConfig {
pub(crate) fn build(self) -> anyhow::Result<ShotoverNode> {
Ok(ShotoverNode {
address: KafkaAddress::from_str(&self.address)?,
address_for_clients: KafkaAddress::from_str(&self.address_for_clients)?,
address_for_peers: KafkaAddress::from_str(&self.address_for_peers)?,
rack: StrBytes::from_string(self.rack),
broker_id: BrokerId(self.broker_id),
state: Arc::new(AtomicShotoverNodeState::new(ShotoverNodeState::Up)),
Expand All @@ -32,7 +34,8 @@ impl ShotoverNodeConfig {

#[derive(Clone)]
pub(crate) struct ShotoverNode {
pub address: KafkaAddress,
pub address_for_clients: KafkaAddress,
pub address_for_peers: KafkaAddress,
pub rack: StrBytes,
pub broker_id: BrokerId,
#[allow(unused)]
Expand Down Expand Up @@ -99,8 +102,8 @@ async fn check_shotover_peers(
let tcp_stream = tcp_stream(
connect_timeout,
(
shotover_peer.address.host.as_str(),
shotover_peer.address.port as u16,
shotover_peer.address_for_peers.host.as_str(),
shotover_peer.address_for_peers.port as u16,
),
)
.await;
Expand All @@ -109,7 +112,10 @@ async fn check_shotover_peers(
shotover_peer.set_state(ShotoverNodeState::Up);
}
Err(_) => {
tracing::warn!("Shotover peer {} is down", shotover_peer.address);
tracing::warn!(
"Shotover peer {} is down",
shotover_peer.address_for_clients
);
shotover_peer.set_state(ShotoverNodeState::Down);
}
}
Expand Down

0 comments on commit 59804a5

Please sign in to comment.