Skip to content

Commit

Permalink
Add new interBrokerTls boolean flag
Browse files Browse the repository at this point in the history
The new flag defaults to 'true' to preserve the existing behavior of
configuring the ControlPlane and Replication listeners with TLS.
When set to 'false', they are created in PLAINTEXT protocol mode.

strimzi#2266
  • Loading branch information
frankgrimes97 committed Jan 12, 2023
1 parent 230647b commit 6de9bcd
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* `ImageStream` validation for Kafka Connect builds on OpenShift
* Support for configuring the metadata for the Role / RoleBinding of Entity Operator
* Add liveness and readiness probes specifically for nodes running in KRaft combined mode
* Add new interBrokerTls boolean flag [#2266](https://github.com/strimzi/strimzi-kafka-operator/issues/2266)

### Changes, deprecations and removals

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({
"version", "replicas", "image", "listeners", "config", "storage", "authorization", "rack", "brokerRackInitImage",
"livenessProbe", "readinessProbe", "jvmOptions", "jmxOptions", "resources", "metricsConfig", "logging", "template"})
"livenessProbe", "readinessProbe", "jvmOptions", "jmxOptions", "resources", "metricsConfig", "logging", "template",
"interBrokerTls"})
@EqualsAndHashCode
public class KafkaClusterSpec implements HasConfigurableMetrics, UnknownPropertyPreserving, Serializable {

Expand Down Expand Up @@ -71,6 +72,7 @@ public class KafkaClusterSpec implements HasConfigurableMetrics, UnknownProperty
private List<GenericKafkaListener> listeners;
private KafkaAuthorization authorization;
private KafkaClusterTemplate template;
private boolean interBrokerTls = true;
private Map<String, Object> additionalProperties = new HashMap<>(0);

@Description("The kafka broker version. Defaults to {DefaultKafkaVersion}. " +
Expand Down Expand Up @@ -251,6 +253,16 @@ public void setTemplate(KafkaClusterTemplate template) {
this.template = template;
}

@Description("Inter-broker communication should be over TLS. Defaults to 'true'." +
"Affects CONTROLPLANE and REPLICATION listener creation.")
public boolean getInterBrokerTls() {
return interBrokerTls;
}

public void setInterBrokerTls(boolean interBrokerTls) {
this.interBrokerTls = interBrokerTls;
}

@Override
public Map<String, Object> getAdditionalProperties() {
return this.additionalProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ public KafkaBrokerConfigurationBuilder withListeners(String clusterName, String
() -> KafkaResources.kafkaStatefulSetName(clusterName) + "-" + brokerId,
(listenerId) -> String.format(PLACEHOLDER_ADVERTISED_HOSTNAME, listenerId),
(listenerId) -> String.format(PLACEHOLDER_ADVERTISED_PORT, listenerId),
false);
false,
true);
}

/**
Expand All @@ -265,6 +266,8 @@ public KafkaBrokerConfigurationBuilder withListeners(String clusterName, String
* @param advertisedPortProvider Lambda method which provides the advertised port for given listener and broker.
* This is used to configure the user-configurable listeners.
* @param useKRaft Use KRaft mode in the configuration
* @param interBrokerTls Inter-broker communication should be over TLS
* This affects CONTROLPLANE and REPLICATION listener creation.
*
* @return Returns the builder instance
*/
Expand All @@ -274,12 +277,15 @@ public KafkaBrokerConfigurationBuilder withListeners(
Supplier<String> podNameProvider,
Function<String, String> advertisedHostnameProvider,
Function<String, String> advertisedPortProvider,
boolean useKRaft
boolean useKRaft,
boolean interBrokerTls
) {
List<String> listeners = new ArrayList<>();
List<String> advertisedListeners = new ArrayList<>();
List<String> securityProtocol = new ArrayList<>();

final String interBrokerProtocol = interBrokerTls ? "SSL" : "PLAINTEXT";

// Control Plane listener
listeners.add(CONTROL_PLANE_LISTENER_NAME + "://0.0.0.0:9090");
if (!useKRaft) {
Expand All @@ -290,8 +296,10 @@ public KafkaBrokerConfigurationBuilder withListeners(
podNameProvider.get())
));
}
securityProtocol.add(CONTROL_PLANE_LISTENER_NAME + ":SSL");
configureControlPlaneListener();
securityProtocol.add(CONTROL_PLANE_LISTENER_NAME + ":" + interBrokerProtocol);
if (interBrokerTls) {
configureControlPlaneListener();
}

// Replication listener
listeners.add(REPLICATION_LISTENER_NAME + "://0.0.0.0:9091");
Expand All @@ -301,8 +309,10 @@ public KafkaBrokerConfigurationBuilder withListeners(
DnsNameGenerator.podDnsNameWithoutClusterDomain(namespace, KafkaResources.brokersServiceName(clusterName),
podNameProvider.get())
));
securityProtocol.add(REPLICATION_LISTENER_NAME + ":SSL");
configureReplicationListener();
securityProtocol.add(REPLICATION_LISTENER_NAME + ":" + interBrokerProtocol);
if (interBrokerTls) {
configureReplicationListener();
}

for (GenericKafkaListener listener : kafkaListeners) {
int port = listener.getPort();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ public class KafkaCluster extends AbstractModel {
private boolean isJmxAuthenticated;
private boolean useKRaft = false;
private String clusterId;
private boolean interBrokerTls;

// Templates
protected Map<String, String> templateExternalBootstrapServiceLabels;
Expand Down Expand Up @@ -529,6 +530,8 @@ public static KafkaCluster fromCrd(Reconciliation reconciliation, Kafka kafkaAss
KafkaSpecChecker specChecker = new KafkaSpecChecker(kafkaSpec, versions, result);
result.warningConditions.addAll(specChecker.run());

result.interBrokerTls = kafkaSpec.getKafka().getInterBrokerTls();

return result;
}

Expand Down Expand Up @@ -2103,7 +2106,8 @@ public String generatePerBrokerBrokerConfiguration(int brokerId, Map<Integer, Ma
() -> getPodName(brokerId),
listenerId -> advertisedHostnames.get(brokerId).get(listenerId),
listenerId -> advertisedPorts.get(brokerId).get(listenerId),
true)
true,
interBrokerTls)
.withAuthorization(cluster, authorization, true)
.withCruiseControl(cluster, cruiseControlSpec, ccNumPartitions, ccReplicationFactor, ccMinInSyncReplicas)
.withUserConfiguration(configuration)
Expand All @@ -2120,7 +2124,8 @@ public String generatePerBrokerBrokerConfiguration(int brokerId, Map<Integer, Ma
() -> getPodName(brokerId),
listenerId -> advertisedHostnames.get(brokerId).get(listenerId),
listenerId -> advertisedPorts.get(brokerId).get(listenerId),
false)
false,
interBrokerTls)
.withAuthorization(cluster, authorization, false)
.withCruiseControl(cluster, cruiseControlSpec, ccNumPartitions, ccReplicationFactor, ccMinInSyncReplicas)
.withUserConfiguration(configuration)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,12 @@ public void testPerBrokerWithPlainListenersWithoutAuth() {

String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION)
.withBrokerId("2")
.withListeners("my-cluster", "my-namespace", singletonList(listener), () -> "my-cluster-kafka-2", listenerId -> "my-cluster-kafka-2.my-cluster-kafka-brokers.my-namespace.svc", listenerId -> "9092", false)
.withListeners("my-cluster", "my-namespace", singletonList(listener), () -> "my-cluster-kafka-2",
listenerId -> "my-cluster-kafka-2.my-cluster-kafka-brokers.my-namespace.svc",
listenerId -> "9092",
false,
true
)
.build();

assertThat(configuration, isEquivalent("broker.id=2",
Expand Down Expand Up @@ -750,7 +755,12 @@ public void testKraftListeners() {
String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION)
.withBrokerId("2")
.withKRaft("my-cluster", "my-namespace", 3)
.withListeners("my-cluster", "my-namespace", singletonList(listener), () -> "my-cluster-kafka-2", listenerId -> "my-cluster-kafka-2.my-cluster-kafka-brokers.my-namespace.svc", listenerId -> "9092", true)
.withListeners("my-cluster", "my-namespace", singletonList(listener), () -> "my-cluster-kafka-2",
listenerId -> "my-cluster-kafka-2.my-cluster-kafka-brokers.my-namespace.svc",
listenerId -> "9092",
true,
true
)
.build();

assertThat(configuration, isEquivalent("broker.id=2",
Expand Down Expand Up @@ -780,6 +790,39 @@ public void testKraftListeners() {
"ssl.endpoint.identification.algorithm=HTTPS"));
}

@ParallelTest
public void testKraftListenersInterBrokerTlsDisabled() {
GenericKafkaListener listener = new GenericKafkaListenerBuilder()
.withName("plain")
.withPort(9092)
.withType(KafkaListenerType.INTERNAL)
.withTls(false)
.build();

String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION)
.withBrokerId("2")
.withKRaft("my-cluster", "my-namespace", 3)
.withListeners("my-cluster", "my-namespace", singletonList(listener), () -> "my-cluster-kafka-2",
listenerId -> "my-cluster-kafka-2.my-cluster-kafka-brokers.my-namespace.svc",
listenerId -> "9092",
true,
false
)
.build();

assertThat(configuration, isEquivalent("broker.id=2",
"node.id=2",
"process.roles=broker,controller",
"controller.listener.names=CONTROLPLANE-9090",
"controller.quorum.voters=0@my-cluster-kafka-0.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,1@my-cluster-kafka-1.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090,2@my-cluster-kafka-2.my-cluster-kafka-brokers.my-namespace.svc.cluster.local:9090",
"listeners=CONTROLPLANE-9090://0.0.0.0:9090,REPLICATION-9091://0.0.0.0:9091,PLAIN-9092://0.0.0.0:9092",
"advertised.listeners=REPLICATION-9091://my-cluster-kafka-2.my-cluster-kafka-brokers.my-namespace.svc:9091,PLAIN-9092://my-cluster-kafka-2.my-cluster-kafka-brokers.my-namespace.svc:9092",
"listener.security.protocol.map=CONTROLPLANE-9090:PLAINTEXT,REPLICATION-9091:PLAINTEXT,PLAIN-9092:PLAINTEXT",
"inter.broker.listener.name=REPLICATION-9091",
"sasl.enabled.mechanisms=",
"ssl.endpoint.identification.algorithm=HTTPS"));
}

@ParallelTest
public void testWithPlainListenersWithSaslAuth() {
GenericKafkaListener listener = new GenericKafkaListenerBuilder()
Expand Down Expand Up @@ -1173,7 +1216,12 @@ public void testPerBrokerWithExternalListeners() {

String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION)
.withBrokerId("2")
.withListeners("my-cluster", "my-namespace", singletonList(listener), () -> "my-cluster-kafka-2", listenerId -> "my-lb.com", listenerId -> "9094", false)
.withListeners("my-cluster", "my-namespace", singletonList(listener), () -> "my-cluster-kafka-2",
listenerId -> "my-lb.com",
listenerId -> "9094",
false,
true
)
.build();

assertThat(configuration, isEquivalent("broker.id=2",
Expand Down Expand Up @@ -1204,6 +1252,39 @@ public void testPerBrokerWithExternalListeners() {
"listener.name.external-9094.ssl.keystore.type=PKCS12"));
}

@ParallelTest
public void testPerBrokerWithExternalListenersInterBrokerTlsDisabled() {
GenericKafkaListener listener = new GenericKafkaListenerBuilder()
.withName("external")
.withPort(9094)
.withType(KafkaListenerType.LOADBALANCER)
.withTls(true)
.build();

String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION)
.withBrokerId("2")
.withListeners("my-cluster", "my-namespace", singletonList(listener), () -> "my-cluster-kafka-2",
listenerId -> "my-lb.com",
listenerId -> "9094",
false,
false
)
.build();

assertThat(configuration, isEquivalent("broker.id=2",
"node.id=2",
"listeners=CONTROLPLANE-9090://0.0.0.0:9090,REPLICATION-9091://0.0.0.0:9091,EXTERNAL-9094://0.0.0.0:9094",
"advertised.listeners=CONTROLPLANE-9090://my-cluster-kafka-2.my-cluster-kafka-brokers.my-namespace.svc:9090,REPLICATION-9091://my-cluster-kafka-2.my-cluster-kafka-brokers.my-namespace.svc:9091,EXTERNAL-9094://my-lb.com:9094",
"listener.security.protocol.map=CONTROLPLANE-9090:PLAINTEXT,REPLICATION-9091:PLAINTEXT,EXTERNAL-9094:SSL",
"control.plane.listener.name=CONTROLPLANE-9090",
"inter.broker.listener.name=REPLICATION-9091",
"sasl.enabled.mechanisms=",
"ssl.endpoint.identification.algorithm=HTTPS",
"listener.name.external-9094.ssl.keystore.location=/tmp/kafka/cluster.keystore.p12",
"listener.name.external-9094.ssl.keystore.password=${CERTS_STORE_PASSWORD}",
"listener.name.external-9094.ssl.keystore.type=PKCS12"));
}

@ParallelTest
public void testWithExternalListenersLoadBalancerWithoutTls() {
GenericKafkaListener listener = new GenericKafkaListenerBuilder()
Expand Down Expand Up @@ -1326,7 +1407,12 @@ public void testPerBrokerWithExternalListenersNodePortWithoutTls() {

String configuration = new KafkaBrokerConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION)
.withBrokerId("2")
.withListeners("my-cluster", "my-namespace", singletonList(listener), () -> "my-cluster-kafka-2", listenerId -> "${STRIMZI_NODEPORT_DEFAULT_ADDRESS}", listenerId -> "31234", false)
.withListeners("my-cluster", "my-namespace", singletonList(listener), () -> "my-cluster-kafka-2",
listenerId -> "${STRIMZI_NODEPORT_DEFAULT_ADDRESS}",
listenerId -> "31234",
false,
true
)
.build();

assertThat(configuration, isEquivalent("broker.id=2",
Expand Down
2 changes: 2 additions & 0 deletions documentation/modules/appendix_crds.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ include::../api/io.strimzi.api.kafka.model.KafkaClusterSpec.adoc[leveloffset=+1]
|xref:type-InlineLogging-{context}[`InlineLogging`], xref:type-ExternalLogging-{context}[`ExternalLogging`]
|template 1.2+<.<a|Template for Kafka cluster resources. The template allows users to specify how the `StatefulSet`, `Pods`, and `Services` are generated.
|xref:type-KafkaClusterTemplate-{context}[`KafkaClusterTemplate`]
|interBrokerTls 1.2+<.<a|Inter-broker communication should be over TLS. Defaults to 'true'.Affects CONTROLPLANE and REPLICATION listener creation.
|boolean
|====

[id='type-GenericKafkaListener-{context}']
Expand Down
3 changes: 3 additions & 0 deletions packaging/install/cluster-operator/040-Crd-kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1716,6 +1716,9 @@ spec:
description: Metadata applied to the resource.
description: Template for Kafka `StrimziPodSet` resource.
description: "Template for Kafka cluster resources. The template allows users to specify how the `StatefulSet`, `Pods`, and `Services` are generated."
interBrokerTls:
type: boolean
description: Inter-broker communication should be over TLS. Defaults to 'true'.Affects CONTROLPLANE and REPLICATION listener creation.
required:
- replicas
- listeners
Expand Down

0 comments on commit 6de9bcd

Please sign in to comment.