Skip to content

Commit

Permalink
Fix port handling in the Kafka Agent (#10516)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Scholz <www@scholzj.com>
  • Loading branch information
scholzj authored Aug 30, 2024
1 parent bf1f12e commit c872720
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1234,6 +1234,8 @@ public Secret generateCertificatesSecret(ClusterCa clusterCa, ClientsCa clientsC
/* test */ List<ContainerPort> getContainerPortList(KafkaPool pool) {
List<ContainerPort> ports = new ArrayList<>(listeners.size() + 3);

ports.add(ContainerUtils.createContainerPort(KAFKA_AGENT_PORT_NAME, KAFKA_AGENT_PORT));

if (kafkaMetadataConfigState.isZooKeeperToMigration() || pool.isController()) {
// The control plane listener is on all nodes in ZooKeeper based clusters and on nodes with controller role in KRaft
// this excludes all the KRaft broker-only nodes even during the migration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,29 +255,35 @@ public void testPortsOnMigration() {

// controllers
List<ContainerPort> ports = kc.getContainerPortList(KAFKA_POOL_CONTROLLERS);
// control plane port is always set
assertThat(ports.get(0).getContainerPort(), is(9090));

if (state.isZooKeeperToPostMigration()) {
assertThat(ports.size(), is(3));
assertThat(ports.size(), is(4));
// Agent and control plane ports are always set
assertThat(ports.get(0).getContainerPort(), is(8443));
assertThat(ports.get(1).getContainerPort(), is(9090));
// replication and clients only up to post-migration to contact brokers
assertThat(ports.get(1).getContainerPort(), is(9091));
assertThat(ports.get(2).getContainerPort(), is(9092));
assertThat(ports.get(2).getContainerPort(), is(9091));
assertThat(ports.get(3).getContainerPort(), is(9092));
} else {
assertThat(ports.size(), is(1));
assertThat(ports.size(), is(2));
// Agent and control plane ports are always set
assertThat(ports.get(0).getContainerPort(), is(8443));
assertThat(ports.get(1).getContainerPort(), is(9090));
}

// brokers
ports = kc.getContainerPortList(KAFKA_POOL_BROKERS);
if (state.isZooKeeperToMigration()) {
assertThat(ports.size(), is(4));
assertThat(ports.get(0).getContainerPort(), is(8443));
assertThat(ports.get(1).getContainerPort(), is(9090)); // control plane port exposed up to migration when it's still ZooKeeper in the configuration
assertThat(ports.get(2).getContainerPort(), is(9091));
assertThat(ports.get(3).getContainerPort(), is(9092));
} else {
assertThat(ports.size(), is(3));
// control plane port exposed up to migration when it's still ZooKeeper in the configuration
assertThat(ports.get(0).getContainerPort(), is(9090));
assertThat(ports.get(0).getContainerPort(), is(8443));
assertThat(ports.get(1).getContainerPort(), is(9091));
assertThat(ports.get(2).getContainerPort(), is(9092));
} else {
assertThat(ports.size(), is(2));
assertThat(ports.get(0).getContainerPort(), is(9091));
assertThat(ports.get(1).getContainerPort(), is(9092));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -968,22 +968,25 @@ public void testContainerPorts() {
List<ContainerPort> ports = kc.createContainer(null, pool).getPorts();

if ("controllers".equals(pool.poolName)) {
assertThat(ports.size(), is(2));
assertThat(ports.get(0).getContainerPort(), is(9090));
assertThat(ports.get(1).getContainerPort(), is(9404));
assertThat(ports.size(), is(3));
assertThat(ports.get(0).getContainerPort(), is(8443));
assertThat(ports.get(1).getContainerPort(), is(9090));
assertThat(ports.get(2).getContainerPort(), is(9404));
} else if ("mixed".equals(pool.poolName)) {
assertThat(ports.size(), is(6));
assertThat(ports.get(0).getContainerPort(), is(8443));
assertThat(ports.get(1).getContainerPort(), is(9090));
assertThat(ports.get(2).getContainerPort(), is(9091));
assertThat(ports.get(3).getContainerPort(), is(9093));
assertThat(ports.get(4).getContainerPort(), is(9094));
assertThat(ports.get(5).getContainerPort(), is(9404));
} else {
assertThat(ports.size(), is(5));
assertThat(ports.get(0).getContainerPort(), is(9090));
assertThat(ports.get(0).getContainerPort(), is(8443));
assertThat(ports.get(1).getContainerPort(), is(9091));
assertThat(ports.get(2).getContainerPort(), is(9093));
assertThat(ports.get(3).getContainerPort(), is(9094));
assertThat(ports.get(4).getContainerPort(), is(9404));
} else {
assertThat(ports.size(), is(4));
assertThat(ports.get(0).getContainerPort(), is(9091));
assertThat(ports.get(1).getContainerPort(), is(9093));
assertThat(ports.get(2).getContainerPort(), is(9094));
assertThat(ports.get(3).getContainerPort(), is(9404));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,15 @@ private void startHttpServer() throws Exception {
ServerConnector httpsConn = new ServerConnector(server,
new SslConnectionFactory(getSSLContextFactory(), "http/1.1"),
new HttpConnectionFactory(https));
httpsConn.setHost("0.0.0.0");
httpsConn.setPort(HTTPS_PORT);

ContextHandler brokerStateContext = new ContextHandler(BROKER_STATE_PATH);
brokerStateContext.setHandler(getBrokerStateHandler());

ServerConnector httpConn = new ServerConnector(server);
// The HTTP port should not be exposed outside the Pod, so it listens only on localhost
httpConn.setHost("localhost");
httpConn.setPort(HTTP_PORT);

ContextHandler readinessContext = new ContextHandler(READINESS_ENDPOINT_PATH);
Expand Down

0 comments on commit c872720

Please sign in to comment.