diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index cc913bfda0e75..613b0c3a9f385 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2716,7 +2716,11 @@ private static Map<String, LogDirDescription> logDirDescriptions(DescribeLogDirs
                         new ReplicaInfo(p.partitionSize(), p.offsetLag(), p.isFutureKey()));
                 }
             }
-            result.put(logDirResult.logDir(), new LogDirDescription(Errors.forCode(logDirResult.errorCode()).exception(), replicaInfoMap));
+            result.put(logDirResult.logDir(), new LogDirDescription(
+                    Errors.forCode(logDirResult.errorCode()).exception(),
+                    replicaInfoMap,
+                    logDirResult.totalBytes(),
+                    logDirResult.usableBytes()));
         }
         return result;
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java
index 1c326ec43b926..665c86649ba37 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java
@@ -20,8 +20,10 @@
 import org.apache.kafka.common.errors.ApiException;
 
 import java.util.Map;
+import java.util.OptionalLong;
 
 import static java.util.Collections.unmodifiableMap;
+import static org.apache.kafka.common.requests.DescribeLogDirsResponse.UNKNOWN_VOLUME_BYTES;
 
 /**
  * A description of a log directory on a particular broker.
@@ -29,10 +31,18 @@
 public class LogDirDescription {
     private final Map<TopicPartition, ReplicaInfo> replicaInfos;
     private final ApiException error;
+    private final OptionalLong totalBytes;
+    private final OptionalLong usableBytes;
 
     public LogDirDescription(ApiException error, Map<TopicPartition, ReplicaInfo> replicaInfos) {
+        this(error, replicaInfos, UNKNOWN_VOLUME_BYTES, UNKNOWN_VOLUME_BYTES);
+    }
+
+    public LogDirDescription(ApiException error, Map<TopicPartition, ReplicaInfo> replicaInfos, long totalBytes, long usableBytes) {
         this.error = error;
         this.replicaInfos = replicaInfos;
+        this.totalBytes = (totalBytes == UNKNOWN_VOLUME_BYTES) ? OptionalLong.empty() : OptionalLong.of(totalBytes);
+        this.usableBytes = (usableBytes == UNKNOWN_VOLUME_BYTES) ? OptionalLong.empty() : OptionalLong.of(usableBytes);
     }
 
     /**
@@ -54,11 +64,29 @@ public Map<TopicPartition, ReplicaInfo> replicaInfos() {
         return unmodifiableMap(replicaInfos);
     }
 
+    /**
+     * The total size of the volume this log directory is on or empty if the broker did not return a value.
+     * For volumes larger than Long.MAX_VALUE, Long.MAX_VALUE is returned.
+     */
+    public OptionalLong totalBytes() {
+        return totalBytes;
+    }
+
+    /**
+     * The usable size on the volume this log directory is on or empty if the broker did not return a value.
+     * For usable sizes larger than Long.MAX_VALUE, Long.MAX_VALUE is returned.
+     */
+    public OptionalLong usableBytes() {
+        return usableBytes;
+    }
+
     @Override
     public String toString() {
         return "LogDirDescription(" +
                 "replicaInfos=" + replicaInfos +
                 ", error=" + error +
+                ", totalBytes=" + totalBytes +
+                ", usableBytes=" + usableBytes +
                 ')';
     }
 }
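For context, these two accessors are surfaced through Admin#describeLogDirs. A minimal usage sketch follows; the bootstrap address and broker id 0 are placeholders, not part of this patch, and an empty OptionalLong simply means the broker did not return the version 4 fields:

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.LogDirDescription;

    public class LogDirSpaceSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Placeholder address; point this at a real broker.
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                // Query broker 0; allDescriptions() maps broker id -> (log dir -> description).
                Map<String, LogDirDescription> dirs =
                    admin.describeLogDirs(Collections.singleton(0)).allDescriptions().get().get(0);
                // totalBytes()/usableBytes() are empty if the broker predates version 4.
                dirs.forEach((dir, desc) ->
                    System.out.printf("%s: total=%s usable=%s%n", dir, desc.totalBytes(), desc.usableBytes()));
            }
        }
    }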
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
index 537d188ec120e..fe8aebbc4f6b8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeLogDirsResponse.java
@@ -31,6 +31,7 @@
 public class DescribeLogDirsResponse extends AbstractResponse {
 
     public static final long INVALID_OFFSET_LAG = -1L;
+    public static final long UNKNOWN_VOLUME_BYTES = -1L;
 
     private final DescribeLogDirsResponseData data;
diff --git a/clients/src/main/resources/common/message/DescribeLogDirsRequest.json b/clients/src/main/resources/common/message/DescribeLogDirsRequest.json
index a133b6c68e392..41cc9e2289571 100644
--- a/clients/src/main/resources/common/message/DescribeLogDirsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeLogDirsRequest.json
@@ -19,9 +19,10 @@
   "listeners": ["zkBroker", "broker"],
   "name": "DescribeLogDirsRequest",
   // Version 1 is the same as version 0.
-  "validVersions": "0-3",
+  "validVersions": "0-4",
   // Version 2 is the first flexible version.
   // Version 3 is the same as version 2 (new field in response).
+  // Version 4 is the same as version 3 (new fields in response).
   "flexibleVersions": "2+",
   "fields": [
     { "name": "Topics", "type": "[]DescribableLogDirTopic", "versions": "0+", "nullableVersions": "0+",
diff --git a/clients/src/main/resources/common/message/DescribeLogDirsResponse.json b/clients/src/main/resources/common/message/DescribeLogDirsResponse.json
index c79e756aada1d..fec69d17a030c 100644
--- a/clients/src/main/resources/common/message/DescribeLogDirsResponse.json
+++ b/clients/src/main/resources/common/message/DescribeLogDirsResponse.json
@@ -18,9 +18,10 @@
   "type": "response",
   "name": "DescribeLogDirsResponse",
   // Starting in version 1, on quota violation, brokers send out responses before throttling.
-  "validVersions": "0-3",
+  "validVersions": "0-4",
   // Version 2 is the first flexible version.
   // Version 3 adds the top-level ErrorCode field
+  // Version 4 adds the TotalBytes and UsableBytes fields
   "flexibleVersions": "2+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
@@ -47,7 +48,13 @@
         { "name": "IsFutureKey", "type": "bool", "versions": "0+",
           "about": "True if this log is created by AlterReplicaLogDirsRequest and will replace the current log of the replica in the future." }
       ]}
-    ]}
+    ]},
+    { "name": "TotalBytes", "type": "int64", "versions": "4+", "ignorable": true, "default": "-1",
+      "about": "The total size in bytes of the volume the log directory is in."
+    },
+    { "name": "UsableBytes", "type": "int64", "versions": "4+", "ignorable": true, "default": "-1",
+      "about": "The usable size in bytes of the volume the log directory is in."
+    }
   ]}
 ]
 }
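Because the new int64 fields are marked ignorable with a default of "-1" (the UNKNOWN_VOLUME_BYTES sentinel above), a result whose fields were never set by the broker reports -1, which logDirDescriptions() maps to OptionalLong.empty(). A minimal sketch, assuming the generated DescribeLogDirsResponseData message classes are on the classpath:

    import org.apache.kafka.common.message.DescribeLogDirsResponseData.DescribeLogDirsResult;

    public class DefaultVolumeBytesSketch {
        public static void main(String[] args) {
            // Volume fields are never set here, so they keep the schema default of -1
            // (UNKNOWN_VOLUME_BYTES); the admin client maps that to OptionalLong.empty().
            DescribeLogDirsResult result = new DescribeLogDirsResult().setLogDir("/tmp/kafka-logs");
            System.out.println(result.totalBytes());  // -1
            System.out.println(result.usableBytes()); // -1
        }
    }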
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 637c3d15cb927..6eb936f5b457d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -1589,6 +1589,11 @@ private static DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors err
             prepareDescribeLogDirsTopics(partitionSize, offsetLag, tp.topic(), tp.partition(), false));
     }
 
+    private static DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir, TopicPartition tp, long partitionSize, long offsetLag, long totalBytes, long usableBytes) {
+        return prepareDescribeLogDirsResponse(error, logDir,
+            prepareDescribeLogDirsTopics(partitionSize, offsetLag, tp.topic(), tp.partition(), false), totalBytes, usableBytes);
+    }
+
     private static List<DescribeLogDirsTopic> prepareDescribeLogDirsTopics(
             long partitionSize, long offsetLag, String topic, int partition, boolean isFuture) {
         return singletonList(new DescribeLogDirsTopic()
@@ -1610,6 +1615,19 @@ private static DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors err
             )));
     }
 
+    private static DescribeLogDirsResponse prepareDescribeLogDirsResponse(Errors error, String logDir,
+                                                                          List<DescribeLogDirsTopic> topics,
+                                                                          long totalBytes, long usableBytes) {
+        return new DescribeLogDirsResponse(
+                new DescribeLogDirsResponseData().setResults(singletonList(new DescribeLogDirsResponseData.DescribeLogDirsResult()
+                        .setErrorCode(error.code())
+                        .setLogDir(logDir)
+                        .setTopics(topics)
+                        .setTotalBytes(totalBytes)
+                        .setUsableBytes(usableBytes)
+                )));
+    }
+
     private static DescribeLogDirsResponse prepareEmptyDescribeLogDirsResponse(Optional<Errors> error) {
         DescribeLogDirsResponseData data = new DescribeLogDirsResponseData();
         if (error.isPresent()) data.setErrorCode(error.get().code());
@@ -1661,6 +1679,11 @@ public void testDescribeLogDirs() throws ExecutionException, InterruptedExceptio
 
     private static void assertDescriptionContains(Map<String, LogDirDescription> descriptionsMap, String logDir,
                                                   TopicPartition tp, long partitionSize, long offsetLag) {
+        assertDescriptionContains(descriptionsMap, logDir, tp, partitionSize, offsetLag, OptionalLong.empty(), OptionalLong.empty());
+    }
+
+    private static void assertDescriptionContains(Map<String, LogDirDescription> descriptionsMap, String logDir,
+                                                  TopicPartition tp, long partitionSize, long offsetLag, OptionalLong totalBytes, OptionalLong usableBytes) {
         assertNotNull(descriptionsMap);
         assertEquals(singleton(logDir), descriptionsMap.keySet());
         assertNull(descriptionsMap.get(logDir).error());
@@ -1669,6 +1692,53 @@ private static void assertDescriptionContains(Map<String, LogDirDescription> des
         assertEquals(partitionSize, descriptionsReplicaInfos.get(tp).size());
         assertEquals(offsetLag, descriptionsReplicaInfos.get(tp).offsetLag());
         assertFalse(descriptionsReplicaInfos.get(tp).isFuture());
+        assertEquals(totalBytes, descriptionsMap.get(logDir).totalBytes());
+        assertEquals(usableBytes, descriptionsMap.get(logDir).usableBytes());
+    }
+
+    @Test
+    public void testDescribeLogDirsWithVolumeBytes() throws ExecutionException, InterruptedException {
+        Set<Integer> brokers = singleton(0);
+        String logDir = "/var/data/kafka";
+        TopicPartition tp = new TopicPartition("topic", 12);
+        long partitionSize = 1234567890;
+        long offsetLag = 24;
+        long totalBytes = 123L;
+        long usableBytes = 456L;
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(
+                prepareDescribeLogDirsResponse(Errors.NONE, logDir, tp, partitionSize, offsetLag, totalBytes, usableBytes),
+                env.cluster().nodeById(0));
+
+            DescribeLogDirsResult result = env.adminClient().describeLogDirs(brokers);
+
+            Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> descriptions = result.descriptions();
+            assertEquals(brokers, descriptions.keySet());
+            assertNotNull(descriptions.get(0));
+            assertDescriptionContains(descriptions.get(0).get(), logDir, tp, partitionSize, offsetLag, OptionalLong.of(totalBytes), OptionalLong.of(usableBytes));
+
+            Map<Integer, Map<String, LogDirDescription>> allDescriptions = result.allDescriptions().get();
+            assertEquals(brokers, allDescriptions.keySet());
+            assertDescriptionContains(allDescriptions.get(0), logDir, tp, partitionSize, offsetLag, OptionalLong.of(totalBytes), OptionalLong.of(usableBytes));
+
+            // Empty results when not authorized with version < 3
+            env.kafkaClient().prepareResponseFrom(
+                prepareEmptyDescribeLogDirsResponse(Optional.empty()),
+                env.cluster().nodeById(0));
+            final DescribeLogDirsResult errorResult = env.adminClient().describeLogDirs(brokers);
+            ExecutionException exception = assertThrows(ExecutionException.class, () -> errorResult.allDescriptions().get());
+            assertTrue(exception.getCause() instanceof ClusterAuthorizationException);
+
+            // Empty results with an error with version >= 3
+            env.kafkaClient().prepareResponseFrom(
+                prepareEmptyDescribeLogDirsResponse(Optional.of(Errors.UNKNOWN_SERVER_ERROR)),
+                env.cluster().nodeById(0));
+            final DescribeLogDirsResult errorResult2 = env.adminClient().describeLogDirs(brokers);
+            exception = assertThrows(ExecutionException.class, () -> errorResult2.allDescriptions().get());
+            assertTrue(exception.getCause() instanceof UnknownServerException);
+        }
     }
 
     @SuppressWarnings("deprecation")
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 03983ad98d089..dee3a20aba79d 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -62,6 +62,7 @@ import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.server.common.MetadataVersion._
 
+import java.nio.file.{Files, Paths}
 import scala.jdk.CollectionConverters._
 import scala.collection.{Map, Seq, Set, mutable}
 import scala.compat.java8.OptionConverters._
@@ -790,11 +791,15 @@ class ReplicaManager(val config: KafkaConfig,
     val logsByDir = logManager.allLogs.groupBy(log => log.parentDir)
 
     config.logDirs.toSet.map { logDir: String =>
-      val absolutePath = new File(logDir).getAbsolutePath
+      val file = Paths.get(logDir)
+      val absolutePath = file.toAbsolutePath.toString
       try {
         if (!logManager.isLogDirOnline(absolutePath))
           throw new KafkaStorageException(s"Log directory $absolutePath is offline")
 
+        val fileStore = Files.getFileStore(file)
+        val totalBytes = adjustForLargeFileSystems(fileStore.getTotalSpace)
+        val usableBytes = adjustForLargeFileSystems(fileStore.getUsableSpace)
         logsByDir.get(absolutePath) match {
           case Some(logs) =>
             val topicInfos = logs.groupBy(_.topicPartition.topic).map{case (topic, logs) =>
@@ -812,9 +817,11 @@ class ReplicaManager(val config: KafkaConfig,
 
             new DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath)
               .setErrorCode(Errors.NONE.code).setTopics(topicInfos)
+              .setTotalBytes(totalBytes).setUsableBytes(usableBytes)
           case None =>
             new DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath)
               .setErrorCode(Errors.NONE.code)
+              .setTotalBytes(totalBytes).setUsableBytes(usableBytes)
         }
 
       } catch {
@@ -832,6 +839,13 @@ class ReplicaManager(val config: KafkaConfig,
     }.toList
   }
 
+  // See: https://bugs.openjdk.java.net/browse/JDK-8162520
+  def adjustForLargeFileSystems(space: Long): Long = {
+    if (space < 0)
+      return Long.MaxValue
+    space
+  }
+
   def getLogEndOffsetLag(topicPartition: TopicPartition, logEndOffset: Long, isFuture: Boolean): Long = {
     localLog(topicPartition) match {
       case Some(log) =>
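The adjustForLargeFileSystems guard works around JDK-8162520: FileStore space queries can overflow to negative values for volumes larger than Long.MAX_VALUE, so negative readings are clamped to Long.MAX_VALUE. A standalone Java sketch of the same pattern; the /tmp path is a placeholder:

    import java.io.IOException;
    import java.nio.file.FileStore;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    public class VolumeSpaceSketch {
        // Mirrors ReplicaManager.adjustForLargeFileSystems: clamp overflowed
        // (negative) readings to Long.MAX_VALUE; see JDK-8162520.
        static long adjustForLargeFileSystems(long space) {
            return space < 0 ? Long.MAX_VALUE : space;
        }

        public static void main(String[] args) throws IOException {
            // Placeholder path; the broker does this for each configured log directory.
            FileStore store = Files.getFileStore(Paths.get("/tmp"));
            System.out.println("totalBytes="  + adjustForLargeFileSystems(store.getTotalSpace()));
            System.out.println("usableBytes=" + adjustForLargeFileSystems(store.getUsableSpace()));
        }
    }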
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index d563bcb0c82c3..203c04a68a7a9 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -265,6 +265,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       assertEquals(expectedPartitions.toSet, replicaInfos.keys.map(_.partition).toSet)
 
       logDirInfos.forEach { (logDir, logDirInfo) =>
+        assertTrue(logDirInfo.totalBytes.isPresent)
+        assertTrue(logDirInfo.usableBytes.isPresent)
         logDirInfo.replicaInfos.asScala.keys.foreach(tp =>
           assertEquals(server.logManager.getLog(tp).get.dir.getParent, logDir)
         )
diff --git a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
index 9ab3f86d77090..0ad110dcdbfa8 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
@@ -53,9 +53,13 @@ class DescribeLogDirsRequestTest extends BaseRequestTest {
     val offlineResult = response.data.results.asScala.find(logDirResult => logDirResult.logDir == offlineDir).get
     assertEquals(Errors.KAFKA_STORAGE_ERROR.code, offlineResult.errorCode)
     assertEquals(0, offlineResult.topics.asScala.map(t => t.partitions().size()).sum)
+    assertEquals(DescribeLogDirsResponse.UNKNOWN_VOLUME_BYTES, offlineResult.totalBytes)
+    assertEquals(DescribeLogDirsResponse.UNKNOWN_VOLUME_BYTES, offlineResult.usableBytes)
 
     val onlineResult = response.data.results.asScala.find(logDirResult => logDirResult.logDir == onlineDir).get
     assertEquals(Errors.NONE.code, onlineResult.errorCode)
+    assertTrue(onlineResult.totalBytes > 0)
+    assertTrue(onlineResult.usableBytes > 0)
     val onlinePartitionsMap = onlineResult.topics.asScala.flatMap { topic =>
       topic.partitions().asScala.map { partitionResult =>
         new TopicPartition(topic.name, partitionResult.partitionIndex) -> partitionResult
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index e5eef65d0aa0e..a48f43e860f0b 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -32,7 +32,7 @@ import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
 import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile}
 import kafka.server.epoch.util.MockBlockingSender
 import kafka.utils.timer.MockTimer
-import kafka.utils.{MockScheduler, MockTime, TestUtils}
+import kafka.utils.{MockScheduler, MockTime, Pool, TestUtils}
 import org.apache.kafka.clients.FetchSessionHandler
 import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.message.FetchResponseData
@@ -65,7 +65,7 @@ import org.junit.jupiter.params.provider.ValueSource
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.mockito.ArgumentMatchers
-import org.mockito.ArgumentMatchers.{any, anyInt}
+import org.mockito.ArgumentMatchers.{any, anyInt, anyString}
 import org.mockito.Mockito.{mock, never, reset, times, verify, when}
 
 import scala.collection.{Map, Seq, mutable}
@@ -1954,6 +1954,10 @@
     when(mockLogMgr.liveLogDirs).thenReturn(config.logDirs.map(new File(_).getAbsoluteFile))
     when(mockLogMgr.getOrCreateLog(ArgumentMatchers.eq(topicPartitionObj), ArgumentMatchers.eq(false), ArgumentMatchers.eq(false), any())).thenReturn(mockLog)
     when(mockLogMgr.getLog(topicPartitionObj, isFuture = true)).thenReturn(None)
+    val allLogs = new Pool[TopicPartition, UnifiedLog]()
+    allLogs.put(topicPartitionObj, mockLog)
+    when(mockLogMgr.allLogs).thenReturn(allLogs.values)
+    when(mockLogMgr.isLogDirOnline(anyString)).thenReturn(true)
 
     val aliveBrokerIds = Seq[Integer](followerBrokerId, leaderBrokerId)
     val aliveBrokers = aliveBrokerIds.map(brokerId => new Node(brokerId, s"host$brokerId", brokerId))
@@ -4144,6 +4148,31 @@
       replicaManager.shutdown(checkpointHW = false)
     }
   }
+
+  @Test
+  def testDescribeLogDirs(): Unit = {
+    val topicPartition = 0
+    val topicId = Uuid.randomUuid()
+    val followerBrokerId = 0
+    val leaderBrokerId = 1
+    val leaderEpoch = 1
+    val leaderEpochIncrement = 2
+    val countDownLatch = new CountDownLatch(1)
+    val offsetFromLeader = 5
+
+    // Prepare the mocked components for the test
+    val (replicaManager, mockLogMgr) = prepareReplicaManagerAndLogManager(new MockTimer(time),
+      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId, leaderBrokerId, countDownLatch,
+      expectTruncation = false, localLogOffset = Some(10), offsetFromLeader = offsetFromLeader, topicId = Some(topicId))
+
+    val responses = replicaManager.describeLogDirs(Set(new TopicPartition(topic, topicPartition)))
+    assertEquals(mockLogMgr.liveLogDirs.size, responses.size)
+    responses.foreach { response =>
+      assertEquals(Errors.NONE.code, response.errorCode)
+      assertTrue(response.totalBytes > 0)
+      assertTrue(response.usableBytes >= 0)
+    }
+  }
 }
 
 class MockReplicaSelector extends ReplicaSelector {