Skip to content
Merged
11 changes: 8 additions & 3 deletions core/src/main/scala/kafka/log/LogManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -871,12 +871,17 @@ class LogManager(logDirs: Seq[File],
* Update the configuration of the provided topic.
*/
def updateTopicConfig(topic: String,
newTopicConfig: Properties): Unit = {
newTopicConfig: Properties,
isRemoteLogStorageSystemEnabled: Boolean): Unit = {
topicConfigUpdated(topic)
val logs = logsByTopic(topic)
// Combine the default properties with the overrides in zk to create the new LogConfig
val newLogConfig = LogConfig.fromProps(currentDefaultConfig.originals, newTopicConfig)
// We would like to validate the configuration no matter whether the logs have materialised on disk or not.
// Otherwise we risk someone creating a tiered-topic, disabling Tiered Storage cluster-wide and the check
// failing since the logs for the topic are non-existent.
LogConfig.validateRemoteStorageOnlyIfSystemEnabled(newLogConfig.values(), isRemoteLogStorageSystemEnabled, true)
Copy link
Contributor

@kamalcph kamalcph Aug 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we allow the tiered storage functionality disablement since no data is in remote when logs is empty?

Copy link
Contributor Author

@clolov clolov Aug 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mmm, I would prefer we don't. Otherwise one can end up in a situation where they have topics with remote.storage.enable=true and tiered storage cluster-wide disabled.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

it's beneficial to have named arguments for readibility. See:

Use named arguments when passing in literal values if the meaning is at all unclear, for example instead of Utils.delete(true) prefer Utils.delete(recursive=true).

from https://kafka.apache.org/coding-guide

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this doesn't work in this situation since the method I am calling is from a Java class

if (logs.nonEmpty) {
// Combine the default properties with the overrides in zk to create the new LogConfig
val newLogConfig = LogConfig.fromProps(currentDefaultConfig.originals, newTopicConfig)
logs.foreach { log =>
val oldLogConfig = log.updateConfig(newLogConfig)
if (oldLogConfig.compact && !newLogConfig.compact) {
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/scala/kafka/server/ConfigHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,11 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager,
topicConfig.asScala.forKeyValue { (key, value) =>
if (!configNamesToExclude.contains(key)) props.put(key, value)
}

val logs = logManager.logsByTopic(topic)
val wasRemoteLogEnabledBeforeUpdate = logs.exists(_.remoteLogEnabled())
logManager.updateTopicConfig(topic, props)

logManager.updateTopicConfig(topic, props, kafkaConfig.isRemoteLogStorageSystemEnabled)
maybeBootstrapRemoteLogComponents(topic, logs, wasRemoteLogEnabledBeforeUpdate)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ import kafka.server.KafkaConfig
import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.config.{ConfigException, ConfigResource, TopicConfig}
import org.apache.kafka.common.errors.{InvalidConfigurationException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.utils.MockTime
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager,
RemoteLogManagerConfig, RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteLogSegmentState}
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteLogSegmentState}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{BeforeEach, Tag, TestInfo}
Expand Down Expand Up @@ -299,6 +298,43 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
"Remote log segments should be deleted only once by the leader")
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testClusterWideDisablementOfTieredStorageWithEnabledTieredTopic(quorum: String): Unit = {
val topicConfig = new Properties()
topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")

TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, brokerCount,
topicConfig = topicConfig)

val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnectOrNull).head
instanceConfigs = List(KafkaConfig.fromProps(tsDisabledProps))

if (isKRaftTest()) {
recreateBrokers(startup = true)
assertTrue(faultHandler.firstException().getCause.isInstanceOf[ConfigException])
// Normally the exception is thrown as part of the TearDown method of the parent class(es). We would like to not do this.
faultHandler.setIgnore(true)
} else {
assertThrows(classOf[ConfigException], () => recreateBrokers(startup = true))
}
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testClusterWithoutTieredStorageStartsSuccessfullyIfTopicWithTieringDisabled(quorum: String): Unit = {
val topicConfig = new Properties()
topicConfig.setProperty(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, false.toString)

TestUtils.createTopicWithAdmin(createAdminClient(), testTopicName, brokers, numPartitions, brokerCount,
topicConfig = topicConfig)

val tsDisabledProps = TestUtils.createBrokerConfigs(1, zkConnectOrNull).head
instanceConfigs = List(KafkaConfig.fromProps(tsDisabledProps))

recreateBrokers(startup = true)
}

private def assertThrowsException(exceptionType: Class[_ <: Throwable],
executable: Executable,
message: String = ""): Throwable = {
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/log/LogManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ class LogManagerTest {
val newProperties = new Properties()
newProperties.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)

spyLogManager.updateTopicConfig(topic, newProperties)
spyLogManager.updateTopicConfig(topic, newProperties, false)

assertTrue(log0.config.delete)
assertTrue(log1.config.delete)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,21 +548,26 @@ public static void validateBrokerLogConfigValues(Map<?, ?> props,
* @param props The properties to be validated
*/
private static void validateTopicLogConfigValues(Map<?, ?> props,
boolean isRemoteLogStorageSystemEnabled) {
boolean isRemoteLogStorageSystemEnabled) {
validateValues(props);
boolean isRemoteLogStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
if (isRemoteLogStorageEnabled) {
validateRemoteStorageOnlyIfSystemEnabled(isRemoteLogStorageSystemEnabled);
validateRemoteStorageOnlyIfSystemEnabled(props, isRemoteLogStorageSystemEnabled, false);
validateNoRemoteStorageForCompactedTopic(props);
validateRemoteStorageRetentionSize(props);
validateRemoteStorageRetentionTime(props);
}
}

private static void validateRemoteStorageOnlyIfSystemEnabled(boolean isRemoteLogStorageSystemEnabled) {
if (!isRemoteLogStorageSystemEnabled) {
throw new ConfigException("Tiered Storage functionality is disabled in the broker. " +
"Topic cannot be configured with remote log storage.");
public static void validateRemoteStorageOnlyIfSystemEnabled(Map<?, ?> props, boolean isRemoteLogStorageSystemEnabled, boolean isReceivingConfigFromStore) {
boolean isRemoteLogStorageEnabled = (Boolean) props.get(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG);
if (isRemoteLogStorageEnabled && !isRemoteLogStorageSystemEnabled) {
if (isReceivingConfigFromStore) {
throw new ConfigException("You have to delete all topics with the property remote.storage.enable=true before disabling tiered storage cluster-wide");
} else {
throw new ConfigException("Tiered Storage functionality is disabled in the broker. " +
"Topic cannot be configured with remote log storage.");
}
}
}

Expand Down