KAFKA-14557; Lock metadata log dir#13058
Conversation
| val raftManager = createRaftManager( | ||
| new TopicPartition("__raft_id_test", 0), | ||
| createConfig( | ||
| Set(BrokerRole), |
There was a problem hiding this comment.
We are testing controller error so should this be ControllerRole.
There was a problem hiding this comment.
I wanted to test having different metadata.log.dir and log.dirs with the broker. The matrix in testLogDirLockWhenControllerOnly already tests having different metadata.log.dir and log.dirs for the controller.
| netChannel.close() | ||
| replicatedLog.close() | ||
|
|
||
| dataDirLock.foreach(_.destroy()) |
There was a problem hiding this comment.
In LogManager we do this in a finally block, should we do it here?
There was a problem hiding this comment.
Okay. I'll add CoreUtils.swallow to all of this calls.
|
I should mention that the long term solution would be to extend the LogManager to support the kraft metadata log but there are issues like https://issues.apache.org/jira/browse/KAFKA-14241 that need to be fixed to make that possible. |
5a0630d to
9a36355
Compare
rondagostino
left a comment
There was a problem hiding this comment.
LGTM, left a few comments.
| props.setProperty(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9092,SSL://localhost:9093") | ||
| props.setProperty(KafkaConfig.QuorumVotersProp, s"${nodeId}@localhost:9093") | ||
| } else { // broker-only | ||
| val voterId = (nodeId.toInt + 1) |
There was a problem hiding this comment.
s/(nodeId.toInt + 1)/nodeId + 1/
| def testNodeIdPresentIfColocated(): Unit = { | ||
| val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test", 0), "controller,broker", "1") | ||
| assertEquals(1, raftManager.client.nodeId.getAsInt) | ||
| def testLogDirLockWhenMetadataDir(): Unit = { |
There was a problem hiding this comment.
A better name might be testLogDirLockWhenBrokerOnlyWithSeparateMetadataDir
| @ValueSource(strings = Array("metadata", "log", "metadata,log")) | ||
| def testLogDirLockWhenControllerOnly(dirType: String): Unit = { | ||
| val logDir = if (dirType.contains("metadata")) { | ||
| Some(TestUtils.tempDir().toPath) | ||
| } else { | ||
| None | ||
| } | ||
|
|
||
| val metadataDir = if (dirType.contains("log")) { | ||
| Some(TestUtils.tempDir().toPath) | ||
| } else { | ||
| None | ||
| } |
There was a problem hiding this comment.
I think this might be clearer.
@ValueSource(strings = Array("metadata-only", "log-only", "both"))
def testLogDirLockWhenControllerOnly(dirType: String): Unit = {
val logDir = if (!dirType.equals("metadata-only")) {
Some(TestUtils.tempDir().toPath)
} else {
None
}
val metadataDir = if (!dirType.equals("log-only")) {
Some(TestUtils.tempDir().toPath)
} else {
None
}
rondagostino
left a comment
There was a problem hiding this comment.
LGTM, just one minor comment.
| val metadataDir = if (dirType.contains("log")) { | ||
| Some(TestUtils.tempDir().toPath) | ||
| } else { | ||
| val metadataDir = if (dirType.contains("log-only")) { |
This change makes sure that Kafka grabs a log dir lock in the following additional cases: 1. When a Kafka node runs in controller only. The current implementation doesn't grab a file lock because the LogManager is never instantiated. 2. When the metadata log dir is different from the log dir(s). The current implementation of LogManager doesn't load or grab a lock on the metadata dir. Reviewers: Ron Dagostino <rdagostino@confluent.io> , dengziming <dengziming1993@gmail.com>
This change makes sure that Kafka grabs a log dir lock in the following additional cases: 1. When a Kafka node runs in controller only. The current implementation doesn't grab a file lock because the LogManager is never instantiated. 2. When the metadata log dir is different from the log dir(s). The current implementation of LogManager doesn't load or grab a lock on the metadata dir. Reviewers: Ron Dagostino <rdagostino@confluent.io> , dengziming <dengziming1993@gmail.com>
* apache-github/trunk: KAFKA-14601: Improve exception handling in KafkaEventQueue apache#13089 KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface (apache#12886) KAFKA-14530: Check state updater more often (apache#13017) KAFKA-14304 Use boolean for ZK migrating brokers in RPC/record (apache#13103) KAFKA-14003 Kafka Streams JUnit4 to JUnit5 migration part 2 (apache#12301) KAFKA-14607: Move Scheduler/KafkaScheduler to server-common (apache#13092) KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface (apache#12870) KAFKA-14557; Lock metadata log dir (apache#13058) MINOR: Implement toString method for TopicAssignment and PartitionAssignment (apache#13101) KAFKA-12558: Do not prematurely mutate internal partition state in Mirror Maker 2 (apache#11818) KAFKA-14540: Fix DataOutputStreamWritable#writeByteBuffer (apache#13032) KAFKA-14600: Reduce flakiness in ProducerIdExpirationTest (apache#13087) KAFKA-14279: Add 3.3.x streams system tests (apache#13077) MINOR: bump streams quickstart pom versions and add to list in gradle.properties (apache#13064) MINOR: Update KRaft cluster upgrade documentation for 3.4 (apache#13063) KAFKA-14493: Introduce Zk to KRaft migration state machine STUBs in KRaft controller. (apache#12998) KAFKA-14570: Fix parenthesis in verifyFullFetchResponsePartitions output (apache#13072) MINOR: Remove public mutable fields from ProducerAppendInfo (apache#13091)
…master * apache-github/trunk: (23 commits) MINOR: Include the inner exception stack trace when re-throwing an exception (apache#12229) MINOR: Fix docs to state that sendfile implemented in `TransferableRecords` instead of `MessageSet` (apache#13109) Update ProducerConfig.java (apache#13115) KAFKA-14618; Fix off by one error in snapshot id (apache#13108) KAFKA-13709 (follow-up): Avoid mention of 'exactly-once delivery' or 'delivery guarantees' in Connect (apache#13106) KAFKA-14367; Add `TxnOffsetCommit` to the new `GroupCoordinator` interface (apache#12901) KAFKA-14568: Move FetchDataInfo and related to storage module (apache#13085) KAFKA-14612: Make sure to write a new topics ConfigRecords to metadata log iff the topic is created (apache#13104) KAFKA-14601: Improve exception handling in KafkaEventQueue apache#13089 KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface (apache#12886) KAFKA-14530: Check state updater more often (apache#13017) KAFKA-14304 Use boolean for ZK migrating brokers in RPC/record (apache#13103) KAFKA-14003 Kafka Streams JUnit4 to JUnit5 migration part 2 (apache#12301) KAFKA-14607: Move Scheduler/KafkaScheduler to server-common (apache#13092) KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface (apache#12870) KAFKA-14557; Lock metadata log dir (apache#13058) MINOR: Implement toString method for TopicAssignment and PartitionAssignment (apache#13101) KAFKA-12558: Do not prematurely mutate internal partition state in Mirror Maker 2 (apache#11818) KAFKA-14540: Fix DataOutputStreamWritable#writeByteBuffer (apache#13032) KAFKA-14600: Reduce flakiness in ProducerIdExpirationTest (apache#13087) ...
This change makes sure that Kafka grabs a log dir lock in the following additional cases: 1. When a Kafka node runs in controller only. The current implementation doesn't grab a file lock because the LogManager is never instantiated. 2. When the metadata log dir is different from the log dir(s). The current implementation of LogManager doesn't load or grab a lock on the metadata dir. Reviewers: Ron Dagostino <rdagostino@confluent.io> , dengziming <dengziming1993@gmail.com>
This change makes sure that Kafka grabs a log dir lock in the following additional cases:
Committer Checklist (excluded from commit message)