@@ -2716,7 +2716,11 @@ private static Map<String, LogDirDescription> logDirDescriptions(DescribeLogDirs
                        new ReplicaInfo(p.partitionSize(), p.offsetLag(), p.isFutureKey()));
                }
            }
-            result.put(logDirResult.logDir(), new LogDirDescription(Errors.forCode(logDirResult.errorCode()).exception(), replicaInfoMap));
+            result.put(logDirResult.logDir(), new LogDirDescription(
+                Errors.forCode(logDirResult.errorCode()).exception(),
+                replicaInfoMap,
+                logDirResult.totalBytes(),
+                logDirResult.usableBytes()));
        }
        return result;
    }
@@ -20,19 +20,29 @@
import org.apache.kafka.common.errors.ApiException;

import java.util.Map;
+import java.util.OptionalLong;

import static java.util.Collections.unmodifiableMap;
+import static org.apache.kafka.common.requests.DescribeLogDirsResponse.UNKNOWN_VOLUME_BYTES;
/**
 * A description of a log directory on a particular broker.
 */
public class LogDirDescription {
    private final Map<TopicPartition, ReplicaInfo> replicaInfos;
    private final ApiException error;
+    private final OptionalLong totalBytes;
+    private final OptionalLong usableBytes;

    public LogDirDescription(ApiException error, Map<TopicPartition, ReplicaInfo> replicaInfos) {
+        this(error, replicaInfos, UNKNOWN_VOLUME_BYTES, UNKNOWN_VOLUME_BYTES);
Review comment (Member): Instead of UNKNOWN_VOLUME_BYTES, this could be null or Optional.empty()? That way we could get rid of UNKNOWN_VOLUME_BYTES completely.
    }

+    public LogDirDescription(ApiException error, Map<TopicPartition, ReplicaInfo> replicaInfos, long totalBytes, long usableBytes) {
+        this.error = error;
+        this.replicaInfos = replicaInfos;
+        this.totalBytes = (totalBytes == UNKNOWN_VOLUME_BYTES) ? OptionalLong.empty() : OptionalLong.of(totalBytes);
+        this.usableBytes = (usableBytes == UNKNOWN_VOLUME_BYTES) ? OptionalLong.empty() : OptionalLong.of(usableBytes);
+    }

/**
@@ -54,11 +64,29 @@ public Map<TopicPartition, ReplicaInfo> replicaInfos() {
return unmodifiableMap(replicaInfos);
}

+    /**
+     * The total size of the volume this log directory is on or empty if the broker did not return a value.
+     * For volumes larger than Long.MAX_VALUE, Long.MAX_VALUE is returned.
+     */
+    public OptionalLong totalBytes() {
+        return totalBytes;
+    }
+
+    /**
+     * The usable size on the volume this log directory is on or empty if the broker did not return a value.
+     * For usable sizes larger than Long.MAX_VALUE, Long.MAX_VALUE is returned.
+     */
+    public OptionalLong usableBytes() {
Review comment (Member): Javadoc

Review comment (Member): Also, do we want to say something about the constraints on usableBytes and totalBytes (individually and compared with each other)?

Reply (Member, author): I've added some details, let me know if it's not what you had in mind.
+        return usableBytes;
+    }

    @Override
    public String toString() {
        return "LogDirDescription(" +
            "replicaInfos=" + replicaInfos +
            ", error=" + error +
+            ", totalBytes=" + totalBytes +
+            ", usableBytes=" + usableBytes +
            ')';
    }
}
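Note: for illustration only, a hypothetical caller-side sketch of the new accessors. The bootstrap address, broker id 0, and omitted exception handling are assumptions, not part of this change.

    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (Admin admin = Admin.create(props)) {
        // describeLogDirs returns one future per broker; get broker 0's map of dir -> description.
        Map<String, LogDirDescription> dirs =
            admin.describeLogDirs(Collections.singleton(0)).descriptions().get(0).get();
        dirs.forEach((dir, desc) -> {
            // Empty optionals mean the broker predates version 4 of DescribeLogDirs.
            System.out.println(dir
                + ": total=" + desc.totalBytes().orElse(-1L)
                + ", usable=" + desc.usableBytes().orElse(-1L));
        });
    }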
@@ -31,6 +31,7 @@
public class DescribeLogDirsResponse extends AbstractResponse {

    public static final long INVALID_OFFSET_LAG = -1L;
+    public static final long UNKNOWN_VOLUME_BYTES = -1L;
Review comment (Member): Do we still want this? Correct me if I am wrong, but I thought we reached the conclusion in the KIP that Optional would cover the scenario where the client is using a newer API and the broker is old.

Reply (Member, author): The wire protocol does not have nullable integers, so the new fields are defined to use -1 as the default value in the protocol:

    { "name": "TotalBytes", "type": "int64", "versions": "4+", "ignorable": true, "default": "-1",
      "about": "The total size in bytes of the volume the log directory is in." },
    { "name": "UsableBytes", "type": "int64", "versions": "4+", "ignorable": true, "default": "-1",
      "about": "The usable size in bytes of the volume the log directory is in." }

This constant is used to map the value in DescribeLogDirsResponse to an Optional:

    this.totalBytes = (totalBytes == UNKNOWN_VOLUME_BYTES) ? OptionalLong.empty() : OptionalLong.of(totalBytes);
    this.usableBytes = (usableBytes == UNKNOWN_VOLUME_BYTES) ? OptionalLong.empty() : OptionalLong.of(usableBytes);

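Note: to make the version-compatibility behavior concrete, a small sketch assuming the generated DescribeLogDirsResponseData classes from this patch. A result deserialized from a pre-v4 broker leaves the new fields at their schema default, which the LogDirDescription constructor turns into an empty OptionalLong.

    // Fields of a freshly-constructed generated message start at their schema defaults.
    DescribeLogDirsResponseData.DescribeLogDirsResult result =
        new DescribeLogDirsResponseData.DescribeLogDirsResult();
    long total = result.totalBytes();                        // -1, i.e. UNKNOWN_VOLUME_BYTES
    LogDirDescription desc = new LogDirDescription(
        null, Collections.emptyMap(), result.totalBytes(), result.usableBytes());
    boolean present = desc.totalBytes().isPresent();         // false: mapped to OptionalLong.empty()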

private final DescribeLogDirsResponseData data;

@@ -19,9 +19,10 @@
"listeners": ["zkBroker", "broker"],
"name": "DescribeLogDirsRequest",
// Version 1 is the same as version 0.
"validVersions": "0-3",
"validVersions": "0-4",
// Version 2 is the first flexible version.
// Version 3 is the same as version 2 (new field in response).
// Version 4 is the same as version 2 (new fields in response).
"flexibleVersions": "2+",
"fields": [
{ "name": "Topics", "type": "[]DescribableLogDirTopic", "versions": "0+", "nullableVersions": "0+",
@@ -18,9 +18,10 @@
"type": "response",
"name": "DescribeLogDirsResponse",
// Starting in version 1, on quota violation, brokers send out responses before throttling.
"validVersions": "0-3",
"validVersions": "0-4",
// Version 2 is the first flexible version.
// Version 3 adds the top-level ErrorCode field
// Version 4 adds the TotalBytes and UsableBytes fields
"flexibleVersions": "2+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
@@ -47,7 +48,13 @@
{ "name": "IsFutureKey", "type": "bool", "versions": "0+",
"about": "True if this log is created by AlterReplicaLogDirsRequest and will replace the current log of the replica in the future." }
]}
]}
]},
{ "name": "TotalBytes", "type": "int64", "versions": "4+", "ignorable": true, "default": "-1",
"about": "The total size in bytes of the volume the log directory is in."
},
{ "name": "UsableBytes", "type": "int64", "versions": "4+", "ignorable": true, "default": "-1",
"about": "The usable size in bytes of the volume the log directory is in."
}
]}
]
}
@@ -1589,6 +1589,11 @@ private static DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors err
            prepareDescribeLogDirsTopics(partitionSize, offsetLag, tp.topic(), tp.partition(), false));
    }

+    private static DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir, TopicPartition tp, long partitionSize, long offsetLag, long totalBytes, long usableBytes) {
+        return prepareDescribeLogDirsResponse(error, logDir,
+            prepareDescribeLogDirsTopics(partitionSize, offsetLag, tp.topic(), tp.partition(), false), totalBytes, usableBytes);
+    }
+
    private static List<DescribeLogDirsTopic> prepareDescribeLogDirsTopics(
        long partitionSize, long offsetLag, String topic, int partition, boolean isFuture) {
        return singletonList(new DescribeLogDirsTopic()
@@ -1610,6 +1615,19 @@ private static DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors err
)));
}

+    private static DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir,
+                                                                          List<DescribeLogDirsTopic> topics,
+                                                                          long totalBytes, long usableBytes) {
+        return new DescribeLogDirsResponse(
+            new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                .setErrorCode(error.code())
+                .setLogDir(logDir)
+                .setTopics(topics)
+                .setTotalBytes(totalBytes)
+                .setUsableBytes(usableBytes)
+            )));
+    }

private static DescribeLogDirsResponse prepareEmptyDescribeLogDirsResponse(Optional<Errors> error) {
DescribeLogDirsResponseData data = new DescribeLogDirsResponseData();
if (error.isPresent()) data.setErrorCode(error.get().code());
@@ -1661,6 +1679,11 @@ public void testDescribeLogDirs() throws ExecutionException, InterruptedExceptio

    private static void assertDescriptionContains(Map<String, LogDirDescription> descriptionsMap, String logDir,
                                                  TopicPartition tp, long partitionSize, long offsetLag) {
+        assertDescriptionContains(descriptionsMap, logDir, tp, partitionSize, offsetLag, OptionalLong.empty(), OptionalLong.empty());
+    }
+
+    private static void assertDescriptionContains(Map<String, LogDirDescription> descriptionsMap, String logDir,
+                                                  TopicPartition tp, long partitionSize, long offsetLag, OptionalLong totalBytes, OptionalLong usableBytes) {
assertNotNull(descriptionsMap);
assertEquals(singleton(logDir), descriptionsMap.keySet());
assertNull(descriptionsMap.get(logDir).error());
@@ -1669,6 +1692,53 @@ private static void assertDescriptionContains(Map<String, LogDirDescription> des
assertEquals(partitionSize, descriptionsReplicaInfos.get(tp).size());
assertEquals(offsetLag, descriptionsReplicaInfos.get(tp).offsetLag());
assertFalse(descriptionsReplicaInfos.get(tp).isFuture());
+        assertEquals(totalBytes, descriptionsMap.get(logDir).totalBytes());
+        assertEquals(usableBytes, descriptionsMap.get(logDir).usableBytes());
    }

+    @Test
+    public void testDescribeLogDirsWithVolumeBytes() throws ExecutionException, InterruptedException {
+        Set<Integer> brokers = singleton(0);
+        String logDir = "/var/data/kafka";
+        TopicPartition tp = new TopicPartition("topic", 12);
+        long partitionSize = 1234567890;
+        long offsetLag = 24;
+        long totalBytes = 123L;
+        long usableBytes = 456L;
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(
+                prepareDescribeLogDirsResponse(Errors.NONE, logDir, tp, partitionSize, offsetLag, totalBytes, usableBytes),
+                env.cluster().nodeById(0));
+
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+
+            Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions = result.descriptions();
+            assertEquals(brokers, descriptions.keySet());
+            assertNotNull(descriptions.get(0));
+            assertDescriptionContains(descriptions.get(0).get(), logDir, tp, partitionSize, offsetLag, OptionalLong.of(totalBytes), OptionalLong.of(usableBytes));
+
+            Map<Integer, Map<String, LogDirDescription>> allDescriptions = result.allDescriptions().get();
+            assertEquals(brokers, allDescriptions.keySet());
+            assertDescriptionContains(allDescriptions.get(0), logDir, tp, partitionSize, offsetLag, OptionalLong.of(totalBytes), OptionalLong.of(usableBytes));
+
+            // Empty results when not authorized with version < 3
+            env.kafkaClient().prepareResponseFrom(
+                prepareEmptyDescribeLogDirsResponse(Optional.empty()),
+                env.cluster().nodeById(0));
+            final DescribeLogDirsResult errorResult = env.adminClient().describeLogDirs(brokers);
+            ExecutionException exception = assertThrows(ExecutionException.class, () -> errorResult.allDescriptions().get());
+            assertTrue(exception.getCause() instanceof ClusterAuthorizationException);
+
+            // Empty results with an error with version >= 3
+            env.kafkaClient().prepareResponseFrom(
+                prepareEmptyDescribeLogDirsResponse(Optional.of(Errors.UNKNOWN_SERVER_ERROR)),
+                env.cluster().nodeById(0));
+            final DescribeLogDirsResult errorResult2 = env.adminClient().describeLogDirs(brokers);
+            exception = assertThrows(ExecutionException.class, () -> errorResult2.allDescriptions().get());
+            assertTrue(exception.getCause() instanceof UnknownServerException);
+        }
+    }

@SuppressWarnings("deprecation")
16 changes: 15 additions & 1 deletion core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -62,6 +62,7 @@ import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.server.common.MetadataVersion._

+import java.nio.file.{Files, Paths}
import scala.jdk.CollectionConverters._
import scala.collection.{Map, Seq, Set, mutable}
import scala.compat.java8.OptionConverters._
@@ -790,11 +791,15 @@
    val logsByDir = logManager.allLogs.groupBy(log => log.parentDir)

    config.logDirs.toSet.map { logDir: String =>
-      val absolutePath = new File(logDir).getAbsolutePath
+      val file = Paths.get(logDir)
+      val absolutePath = file.toAbsolutePath.toString
      try {
        if (!logManager.isLogDirOnline(absolutePath))
          throw new KafkaStorageException(s"Log directory $absolutePath is offline")

+        val fileStore = Files.getFileStore(file)
+        val totalBytes = adjustForLargeFileSystems(fileStore.getTotalSpace)
+        val usableBytes = adjustForLargeFileSystems(fileStore.getUsableSpace)
        logsByDir.get(absolutePath) match {
          case Some(logs) =>
            val topicInfos = logs.groupBy(_.topicPartition.topic).map{case (topic, logs) =>
@@ -812,9 +817,11 @@

            new DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath)
              .setErrorCode(Errors.NONE.code).setTopics(topicInfos)
+              .setTotalBytes(totalBytes).setUsableBytes(usableBytes)
          case None =>
            new DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath)
              .setErrorCode(Errors.NONE.code)
+              .setTotalBytes(totalBytes).setUsableBytes(usableBytes)
        }

} catch {
@@ -832,6 +839,13 @@
}.toList
}

+  // See: https://bugs.openjdk.java.net/browse/JDK-8162520
+  def adjustForLargeFileSystems(space: Long): Long = {
+    if (space < 0)
+      return Long.MaxValue
+    space
+  }
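Note: per JDK-8162520, FileStore.getTotalSpace and getUsableSpace can overflow to a negative value on very large volumes, so the broker clamps negative results to Long.MaxValue. A self-contained Java sketch of the same workaround; the "/tmp" path is just an example:

    import java.io.IOException;
    import java.nio.file.FileStore;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    public class VolumeSpace {
        // Mirrors ReplicaManager.adjustForLargeFileSystems: a negative value means overflow.
        static long adjustForLargeFileSystems(long space) {
            return space < 0 ? Long.MAX_VALUE : space;
        }

        public static void main(String[] args) throws IOException {
            FileStore store = Files.getFileStore(Paths.get("/tmp"));
            System.out.println("total=" + adjustForLargeFileSystems(store.getTotalSpace()));
            System.out.println("usable=" + adjustForLargeFileSystems(store.getUsableSpace()));
        }
    }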

def getLogEndOffsetLag(topicPartition: TopicPartition, logEndOffset: Long, isFuture: Boolean): Long = {
localLog(topicPartition) match {
case Some(log) =>
@@ -265,6 +265,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {

    assertEquals(expectedPartitions.toSet, replicaInfos.keys.map(_.partition).toSet)
    logDirInfos.forEach { (logDir, logDirInfo) =>
+      assertTrue(logDirInfo.totalBytes.isPresent)
+      assertTrue(logDirInfo.usableBytes.isPresent)
      logDirInfo.replicaInfos.asScala.keys.foreach(tp =>
        assertEquals(server.logManager.getLog(tp).get.dir.getParent, logDir)
      )
@@ -53,9 +53,13 @@ class DescribeLogDirsRequestTest extends BaseRequestTest {
    val offlineResult = response.data.results.asScala.find(logDirResult => logDirResult.logDir == offlineDir).get
    assertEquals(Errors.KAFKA_STORAGE_ERROR.code, offlineResult.errorCode)
    assertEquals(0, offlineResult.topics.asScala.map(t => t.partitions().size()).sum)
+    assertEquals(DescribeLogDirsResponse.UNKNOWN_VOLUME_BYTES, offlineResult.totalBytes)
+    assertEquals(DescribeLogDirsResponse.UNKNOWN_VOLUME_BYTES, offlineResult.usableBytes)

    val onlineResult = response.data.results.asScala.find(logDirResult => logDirResult.logDir == onlineDir).get
    assertEquals(Errors.NONE.code, onlineResult.errorCode)
+    assertTrue(onlineResult.totalBytes > 0)
+    assertTrue(onlineResult.usableBytes > 0)
    val onlinePartitionsMap = onlineResult.topics.asScala.flatMap { topic =>
      topic.partitions().asScala.map { partitionResult =>
        new TopicPartition(topic.name, partitionResult.partitionIndex) -> partitionResult
33 changes: 31 additions & 2 deletions core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -32,7 +32,7 @@ import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile}
import kafka.server.epoch.util.MockBlockingSender
import kafka.utils.timer.MockTimer
-import kafka.utils.{MockScheduler, MockTime, TestUtils}
+import kafka.utils.{MockScheduler, MockTime, Pool, TestUtils}
import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.message.FetchResponseData
@@ -65,7 +65,7 @@ import org.junit.jupiter.params.provider.ValueSource
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.mockito.ArgumentMatchers
-import org.mockito.ArgumentMatchers.{any, anyInt}
+import org.mockito.ArgumentMatchers.{any, anyInt, anyString}
import org.mockito.Mockito.{mock, never, reset, times, verify, when}

import scala.collection.{Map, Seq, mutable}
@@ -1954,6 +1954,10 @@ class ReplicaManagerTest {
    when(mockLogMgr.liveLogDirs).thenReturn(config.logDirs.map(new File(_).getAbsoluteFile))
    when(mockLogMgr.getOrCreateLog(ArgumentMatchers.eq(topicPartitionObj), ArgumentMatchers.eq(false), ArgumentMatchers.eq(false), any())).thenReturn(mockLog)
    when(mockLogMgr.getLog(topicPartitionObj, isFuture = true)).thenReturn(None)
+    val allLogs = new Pool[TopicPartition, UnifiedLog]()
+    allLogs.put(topicPartitionObj, mockLog)
+    when(mockLogMgr.allLogs).thenReturn(allLogs.values)
+    when(mockLogMgr.isLogDirOnline(anyString)).thenReturn(true)

val aliveBrokerIds = Seq[Integer](followerBrokerId, leaderBrokerId)
val aliveBrokers = aliveBrokerIds.map(brokerId => new Node(brokerId, s"host$brokerId", brokerId))
@@ -4144,6 +4148,31 @@ class ReplicaManagerTest {
replicaManager.shutdown(checkpointHW = false)
}
}

+  @Test
+  def testDescribeLogDirs(): Unit = {
+    val topicPartition = 0
+    val topicId = Uuid.randomUuid()
+    val followerBrokerId = 0
+    val leaderBrokerId = 1
+    val leaderEpoch = 1
+    val leaderEpochIncrement = 2
+    val countDownLatch = new CountDownLatch(1)
+    val offsetFromLeader = 5
+
+    // Prepare the mocked components for the test
+    val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(new MockTimer(time),
+      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch,
+      expectTruncation = false, localLogOffset = Some(10), offsetFromLeader = offsetFromLeader, topicId = Some(topicId))
+
+    val responses = replicaManager.describeLogDirs(Set(new TopicPartition(topic, topicPartition)))
+    assertEquals(mockLogMgr.liveLogDirs.size, responses.size)
+    responses.foreach { response =>
+      assertEquals(Errors.NONE.code, response.errorCode)
+      assertTrue(response.totalBytes > 0)
+      assertTrue(response.usableBytes >= 0)
+    }
+  }
}

class MockReplicaSelector extends ReplicaSelector {