Check and apply configurations for controllers via Admin API #10913

Draft · wants to merge 2 commits into base: main
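
This PR moves away from a hard-coded allowlist of controller-relevant options (removed below) towards checking and applying controller configurations through the Kafka Admin API. For orientation, here is a minimal, hypothetical sketch of what that interaction looks like; it is not code from this PR, the controller address and node ID are made up, and talking to controllers directly via bootstrap.controllers assumes Kafka 3.7+ (KIP-919).

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class ControllerConfigSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Talk to the KRaft controller quorum directly (KIP-919, Kafka 3.7+); the address is made up.
        props.put(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, "my-cluster-controller-3.example:9090");

        try (Admin admin = Admin.create(props)) {
            // A controller node's configuration is addressed as a BROKER resource with its node ID.
            ConfigResource node = new ConfigResource(ConfigResource.Type.BROKER, "3");

            // Check: read the configuration currently active on the controller.
            Config current = admin.describeConfigs(List.of(node)).all().get().get(node);
            current.entries().forEach(e -> System.out.println(e.name() + "=" + e.value()));

            // Apply: update an option dynamically instead of rolling the node.
            // Whether a given option can be changed without a restart depends on Kafka's dynamic-config support.
            AlterConfigOp op = new AlterConfigOp(
                    new ConfigEntry("max.connections", "1000"), AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(node, List.of(op))).all().get();
        }
    }
}
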
@@ -52,148 +52,6 @@ public class KafkaConfiguration extends AbstractConfiguration {
FORBIDDEN_PREFIX_EXCEPTIONS = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaClusterSpec.FORBIDDEN_PREFIX_EXCEPTIONS);
}

/**
* List of configuration options that are relevant to controllers and should be considered when deciding whether
* a controller-only node needs to be rolled or not.
*/
private static final Set<String> CONTROLLER_RELEVANT_CONFIGS = Set.of(
"alter.config.policy.class.name",
"authorizer.class.name",
"auto.create.topics.enable",
"background.threads",
"broker.heartbeat.interval.ms",
"broker.rack",
"broker.session.timeout.ms",
"connection.failed.authentication.delay.ms",
"connections.max.idle.ms",
"connections.max.reauth.ms",
"controlled.shutdown.enable",
"controlled.shutdown.max.retries",
"controlled.shutdown.retry.backoff.ms",
"controller.listener.names",
"controller.quorum.append.linger.ms",
"controller.quorum.election.backoff.max.ms",
"controller.quorum.election.timeout.ms",
"controller.quorum.fetch.timeout.ms",
"controller.quorum.request.timeout.ms",
"controller.quorum.retry.backoff.ms",
"controller.quorum.voters",
"controller.quota.window.num",
"controller.quota.window.size.seconds",
"controller.socket.timeout.ms",
"create.topic.policy.class.name",
"default.replication.factor",
"delete.topic.enable",
"early.start.listeners",
"kafka.metrics.polling.interval.secs",
"kafka.metrics.reporters",
"leader.imbalance.check.interval.seconds",
"leader.imbalance.per.broker.percentage",
"listener.name.controlplane-9090.ssl.keystore.location",
"listener.name.controlplane-9090.ssl.keystore.password",
"listener.name.controlplane-9090.ssl.keystore.type",
"listener.name.controlplane-9090.ssl.truststore.location",
"listener.name.controlplane-9090.ssl.truststore.password",
"listener.name.controlplane-9090.ssl.truststore.type",
"listener.name.controlplane-9090.ssl.client.auth",
"listener.security.protocol.map",
"listeners",
"log.dir",
"log.dirs",
"min.insync.replicas",
"max.connection.creation.rate",
"max.connections.per.ip.overrides",
"max.connections.per.ip",
"max.connections",
"metadata.log.dir",
"metadata.log.max.record.bytes.between.snapshots",
"metadata.log.max.snapshot.interval.ms",
"metadata.log.segment.bytes",
"metadata.log.segment.min.bytes",
"metadata.log.segment.ms",
"metadata.max.idle.interval.ms",
"metadata.max.retention.bytes",
"metadata.max.retention.ms",
"metric.reporters",
"metrics.num.samples",
"metrics.recording.level",
"metrics.sample.window.ms",
"node.id",
"num.io.threads",
"num.network.threads",
"num.partitions",
"offsets.topic.replication.factor",
"principal.builder.class",
"process.roles",
"replica.selector.class",
"reserved.broker.max.id",
"sasl.enabled.mechanisms",
"sasl.kerberos.kinit.cmd",
"sasl.kerberos.min.time.before.relogin",
"sasl.kerberos.principal.to.local.rules",
"sasl.kerberos.service.name",
"sasl.kerberos.ticket.renew.jitter",
"sasl.kerberos.ticket.renew.window.factor",
"sasl.login.callback.handler.class",
"sasl.login.class",
"sasl.login.connect.timeout.ms",
"sasl.login.read.timeout.ms",
"sasl.login.refresh.buffer.seconds",
"sasl.login.refresh.min.period.seconds",
"sasl.login.refresh.window.factor",
"sasl.login.refresh.window.jitter",
"sasl.login.retry.backoff.max.ms",
"sasl.login.retry.backoff.ms",
"sasl.mechanism.controller.protocol",
"sasl.oauthbearer.clock.skew.seconds",
"sasl.oauthbearer.expected.audience",
"sasl.oauthbearer.expected.issuer",
"sasl.oauthbearer.jwks.endpoint.refresh.ms",
"sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms",
"sasl.oauthbearer.jwks.endpoint.retry.backoff.ms",
"sasl.oauthbearer.jwks.endpoint.url",
"sasl.oauthbearer.scope.claim.name",
"sasl.oauthbearer.sub.claim.name",
"sasl.oauthbearer.token.endpoint.url",
"sasl.server.callback.handler.class",
"sasl.server.max.receive.size",
"security.providers",
"server.max.startup.time.ms",
"socket.connection.setup.timeout.max.ms",
"socket.connection.setup.timeout.ms",
"socket.listen.backlog.size",
"socket.receive.buffer.bytes",
"socket.request.max.bytes",
"socket.send.buffer.bytes",
"ssl.cipher.suites",
"ssl.client.auth",
"ssl.enabled.protocols",
"ssl.endpoint.identification.algorithm",
"ssl.engine.factory.class",
"ssl.key.password",
"ssl.keymanager.algorithm",
"ssl.keystore.certificate.chain",
"ssl.keystore.key",
"ssl.keystore.location",
"ssl.keystore.password",
"ssl.keystore.type",
"ssl.principal.mapping.rules",
"ssl.protocol",
"ssl.provider",
"ssl.secure.random.implementation",
"ssl.trustmanager.algorithm",
"ssl.truststore.certificates",
"ssl.truststore.location",
"ssl.truststore.password",
"ssl.truststore.type",
"super.users",
"transaction.state.log.min.isr",
"transaction.state.log.replication.factor",
"queued.max.requests",
"queued.max.requests.bytes",
"unclean.leader.election.enable"
);
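
The allowlist above is part of what this PR removes. For orientation, a minimal, hypothetical sketch (not Strimzi code) of how such an allowlist would typically be consulted when deciding whether a controller-only node has to be rolled:

import java.util.Set;

public class ControllerRollCheck {
    // Hypothetical helper: a controller-only node only needs a restart when at least one of
    // the changed option keys appears on the controller-relevant allowlist shown above.
    static boolean needsRoll(Set<String> changedOptionKeys, Set<String> controllerRelevantConfigs) {
        return changedOptionKeys.stream().anyMatch(controllerRelevantConfigs::contains);
    }
}

With such a helper, a change to "background.threads" would trigger a roll of controller-only nodes, while a broker-only option such as "log.retention.ms" (not on the list) would not.
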

/**
* Copy constructor which creates a new instance of the Kafka Configuration from an existing configuration. It is
* useful when you need to modify an instance of the configuration without permanently changing the original.
@@ -294,24 +152,6 @@ public Set<String> unknownConfigsWithValues(KafkaVersion kafkaVersion) {
return result;
}

/**
* Return the config properties with their values in this KafkaConfiguration which are known to be relevant for the
* Kafka controller nodes.
*
* @return The configuration options relevant for controllers
*/
public Set<String> controllerConfigsWithValues() {
Set<String> result = new HashSet<>();

for (Map.Entry<String, String> e : this.asOrderedProperties().asMap().entrySet()) {
if (CONTROLLER_RELEVANT_CONFIGS.contains(e.getKey())) {
result.add(e.getKey() + "=" + e.getValue());
}
}

return result;
}

/**
* @return True if the configuration is empty. False otherwise.
*/
@@ -161,7 +161,7 @@ public class KafkaReconciler {
private final boolean continueOnManualRUFailure;

private String logging = "";
private final Map<Integer, String> brokerLoggingHash = new HashMap<>();
private final Map<Integer, String> nodeLoggingHash = new HashMap<>();
private final Map<Integer, String> brokerConfigurationHash = new HashMap<>();
private final Map<Integer, String> kafkaServerCertificateHash = new HashMap<>();
/* test */ TlsPemIdentity coTlsPemIdentity;
@@ -709,15 +709,7 @@ protected Future<Void> perBrokerKafkaConfiguration(MetricsAndLogging metricsAndL
// We collect the configuration options related to various plugins
nodeConfiguration += kc.unknownConfigsWithValues(kafka.getKafkaVersion()).toString();

// We collect the information relevant to controller-only nodes
if (pool.isController() && !pool.isBroker()) {
// For controllers only, we extract the controller-relevant configurations and use it in the configuration annotations
nodeConfiguration = kc.controllerConfigsWithValues().toString();
// For controllers only, we use the full logging configuration in the logging annotation
this.brokerLoggingHash.put(nodeId, Util.hashStub(logging));
} else {
this.brokerLoggingHash.put(nodeId, Util.hashStub(Util.getLoggingDynamicallyUnmodifiableEntries(logging)));
}
this.nodeLoggingHash.put(nodeId, Util.hashStub(Util.getLoggingDynamicallyUnmodifiableEntries(logging)));

// We store hash of the broker configurations for later use in Pod and in rolling updates
this.brokerConfigurationHash.put(nodeId, Util.hashStub(nodeConfiguration));
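
The hash computed here later ends up on the Pod as an annotation (see podSetPodAnnotations below), which is how the reconciler detects that a node's configuration has drifted. A simplified, hypothetical sketch of that comparison, not the actual reconciler code; the literal annotation key is illustrative and corresponds to KafkaCluster.ANNO_STRIMZI_BROKER_CONFIGURATION_HASH in the real code.

import java.util.Map;
import java.util.Objects;

public class ConfigHashCheck {
    // Hypothetical helper: a node has to be rolled when the hash of its desired configuration
    // no longer matches the hash recorded in its Pod annotations during the last reconciliation.
    static boolean needsRoll(Map<String, String> podAnnotations, String desiredConfigurationHash) {
        String recordedHash = podAnnotations.get("strimzi.io/broker-configuration-hash"); // illustrative key
        return !Objects.equals(recordedHash, desiredConfigurationHash);
    }
}
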
@@ -810,7 +802,7 @@ private Map<String, String> podSetPodAnnotations(NodeRef node) {
podAnnotations.put(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_CERT_GENERATION, String.valueOf(this.clusterCa.caCertGeneration()));
podAnnotations.put(Ca.ANNO_STRIMZI_IO_CLUSTER_CA_KEY_GENERATION, String.valueOf(this.clusterCa.caKeyGeneration()));
podAnnotations.put(Ca.ANNO_STRIMZI_IO_CLIENTS_CA_CERT_GENERATION, String.valueOf(this.clientsCa.caCertGeneration()));
podAnnotations.put(Annotations.ANNO_STRIMZI_LOGGING_HASH, brokerLoggingHash.get(node.nodeId()));
podAnnotations.put(Annotations.ANNO_STRIMZI_LOGGING_HASH, nodeLoggingHash.get(node.nodeId()));
podAnnotations.put(KafkaCluster.ANNO_STRIMZI_BROKER_CONFIGURATION_HASH, brokerConfigurationHash.get(node.nodeId()));
podAnnotations.put(ANNO_STRIMZI_IO_KAFKA_VERSION, kafka.getKafkaVersion().version());
