diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 78e48010e5240..216063ae6ead1 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -871,12 +871,17 @@ class LogManager(logDirs: Seq[File], * Update the configuration of the provided topic. */ def updateTopicConfig(topic: String, - newTopicConfig: Properties): Unit = { + newTopicConfig: Properties, + isRemoteLogStorageSystemEnabled: Boolean): Unit = { topicConfigUpdated(topic) val logs = logsByTopic(topic) + // Combine the default properties with the overrides in zk to create the new LogConfig + val newLogConfig = LogConfig.fromProps(currentDefaultConfig.originals, newTopicConfig) + // We would like to validate the configuration no matter whether the logs have materialised on disk or not. + // Otherwise we risk someone creating a tiered-topic, disabling Tiered Storage cluster-wide and the check + // failing since the logs for the topic are non-existent. + LogConfig.validateRemoteStorageOnlyIfSystemEnabled(newLogConfig.values(), isRemoteLogStorageSystemEnabled, true) if (logs.nonEmpty) { - // Combine the default properties with the overrides in zk to create the new LogConfig - val newLogConfig = LogConfig.fromProps(currentDefaultConfig.originals, newTopicConfig) logs.foreach { log => val oldLogConfig = log.updateConfig(newLogConfig) if (oldLogConfig.compact && !newLogConfig.compact) { diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index 0e8c55a1d6937..02e57b4009f6a 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -66,9 +66,11 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager, topicConfig.asScala.forKeyValue { (key, value) => if (!configNamesToExclude.contains(key)) props.put(key, value) } + val logs = logManager.logsByTopic(topic) val wasRemoteLogEnabledBeforeUpdate = logs.exists(_.remoteLogEnabled()) - logManager.updateTopicConfig(topic, props) + + logManager.updateTopicConfig(topic, props, kafkaConfig.isRemoteLogStorageSystemEnabled) maybeBootstrapRemoteLogComponents(topic, logs, wasRemoteLogEnabledBeforeUpdate) } diff --git a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala index 23439e120cf74..6d8fbe1bbe79e 100644 --- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala +++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala @@ -21,11 +21,10 @@ import kafka.server.KafkaConfig import kafka.utils.{TestInfoUtils, TestUtils} import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry} import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} -import org.apache.kafka.common.config.{ConfigResource, TopicConfig} +import org.apache.kafka.common.config.{ConfigException, ConfigResource, TopicConfig} import org.apache.kafka.common.errors.{InvalidConfigurationException, UnknownTopicOrPartitionException} import org.apache.kafka.common.utils.MockTime -import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, - RemoteLogManagerConfig, RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteLogSegmentState} +import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteLogSegmentState} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.function.Executable import org.junit.jupiter.api.{BeforeEach, Tag, TestInfo} @@ -299,6 +298,43 @@ class RemoteTopicCrudTest extends IntegrationTestHarness { "Remote log segments should be deleted only once by the leader") } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(quorum: String): Unit = { + val topicConfig = new Properties() + topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true") + + TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, brokerCount, + topicConfig = topicConfig) + + val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnectOrNull).head + instanceConfigs = List(KafkaConfig.fromProps(tsDisabledProps)) + + if (isKRaftTest()) { + recreateBrokers(startup = true) + assertTrue(faultHandler.firstException().getCause.isInstanceOf[ConfigException]) + // Normally the exception is thrown as part of the TearDown method of the parent class(es). We would like to not do this. + faultHandler.setIgnore(true) + } else { + assertThrows(classOf[ConfigException], () => recreateBrokers(startup = true)) + } + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testClusterWithoutTieredStorageStartsSuccessfullyIfTopicWithTieringDisabled(quorum: String): Unit = { + val topicConfig = new Properties() + topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, false.toString) + + TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, brokerCount, + topicConfig = topicConfig) + + val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnectOrNull).head + instanceConfigs = List(KafkaConfig.fromProps(tsDisabledProps)) + + recreateBrokers(startup = true) + } + private def assertThrowsException(exceptionType: Class[_ <: Throwable], executable: Executable, message: String = ""): Throwable = { diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 5cab80b18642e..824ce7ea3278d 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -630,7 +630,7 @@ class LogManagerTest { val newProperties = new Properties() newProperties.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE) - spyLogManager.updateTopicConfig(topic, newProperties) + spyLogManager.updateTopicConfig(topic, newProperties, false) assertTrue(log0.config.delete) assertTrue(log1.config.delete) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index e177dfcfcb7d4..d73a37485cb19 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -548,21 +548,26 @@ public static void validateBrokerLogConfigValues(Map props, * @param props The properties to be validated */ private static void validateTopicLogConfigValues(Map props, - boolean isRemoteLogStorageSystemEnabled) { + boolean isRemoteLogStorageSystemEnabled) { validateValues(props); boolean isRemoteLogStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); if (isRemoteLogStorageEnabled) { - validateRemoteStorageOnlyIfSystemEnabled(isRemoteLogStorageSystemEnabled); + validateRemoteStorageOnlyIfSystemEnabled(props, isRemoteLogStorageSystemEnabled, false); validateNoRemoteStorageForCompactedTopic(props); validateRemoteStorageRetentionSize(props); validateRemoteStorageRetentionTime(props); } } - private static void validateRemoteStorageOnlyIfSystemEnabled(boolean isRemoteLogStorageSystemEnabled) { - if (!isRemoteLogStorageSystemEnabled) { - throw new ConfigException("Tiered Storage functionality is disabled in the broker. " + - "Topic cannot be configured with remote log storage."); + public static void validateRemoteStorageOnlyIfSystemEnabled(Map props, boolean isRemoteLogStorageSystemEnabled, boolean isReceivingConfigFromStore) { + boolean isRemoteLogStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG); + if (isRemoteLogStorageEnabled && !isRemoteLogStorageSystemEnabled) { + if (isReceivingConfigFromStore) { + throw new ConfigException("You have to delete all topics with the property remote.storage.enable=true before disabling tiered storage cluster-wide"); + } else { + throw new ConfigException("Tiered Storage functionality is disabled in the broker. " + + "Topic cannot be configured with remote log storage."); + } } }