Skip to content

Commit

Permalink
Merge f13653e into 960afca
Browse files Browse the repository at this point in the history
  • Loading branch information
niksaveliev authored Mar 4, 2024
2 parents 960afca + f13653e commit 3a46186
Show file tree
Hide file tree
Showing 23 changed files with 198 additions and 105 deletions.
5 changes: 5 additions & 0 deletions ydb/core/kafka_proxy/actors/actors.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@

namespace NKafka {

static constexpr int ProxyNodeId = 1;
static constexpr char UnderlayPrefix[] = "u-";

static_assert(sizeof(UnderlayPrefix) == 3);

enum EAuthSteps {
WAIT_HANDSHAKE,
WAIT_AUTH,
Expand Down
28 changes: 14 additions & 14 deletions ydb/core/kafka_proxy/actors/kafka_api_versions_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,21 @@ TApiVersionsResponseData::TPtr GetApiVersions() {
TApiVersionsResponseData::TPtr response = std::make_shared<TApiVersionsResponseData>();
response->ErrorCode = EKafkaErrors::NONE_ERROR;

AddApiKey<TProduceRequestData>(response->ApiKeys, PRODUCE, {.MinVersion=3});
AddApiKey<TApiVersionsRequestData>(response->ApiKeys, API_VERSIONS);
AddApiKey<TMetadataRequestData>(response->ApiKeys, METADATA);
AddApiKey<TInitProducerIdRequestData>(response->ApiKeys, INIT_PRODUCER_ID);
AddApiKey<TSaslHandshakeRequestData>(response->ApiKeys, SASL_HANDSHAKE);
AddApiKey<TSaslAuthenticateRequestData>(response->ApiKeys, SASL_AUTHENTICATE);
AddApiKey<TListOffsetsRequestData>(response->ApiKeys, LIST_OFFSETS);
AddApiKey<TProduceRequestData>(response->ApiKeys, PRODUCE, {.MinVersion=3, .MaxVersion=9});
AddApiKey<TApiVersionsRequestData>(response->ApiKeys, API_VERSIONS, {.MaxVersion=2});
AddApiKey<TMetadataRequestData>(response->ApiKeys, METADATA, {.MaxVersion=9});
AddApiKey<TInitProducerIdRequestData>(response->ApiKeys, INIT_PRODUCER_ID, {.MaxVersion=4});
AddApiKey<TSaslHandshakeRequestData>(response->ApiKeys, SASL_HANDSHAKE, {.MaxVersion=1});
AddApiKey<TSaslAuthenticateRequestData>(response->ApiKeys, SASL_AUTHENTICATE, {.MaxVersion=2});
AddApiKey<TListOffsetsRequestData>(response->ApiKeys, LIST_OFFSETS, {.MinVersion=1, .MaxVersion=1});
AddApiKey<TFetchRequestData>(response->ApiKeys, FETCH, {.MaxVersion=3});
AddApiKey<TJoinGroupRequestData>(response->ApiKeys, JOIN_GROUP);
AddApiKey<TSyncGroupRequestData>(response->ApiKeys, SYNC_GROUP);
AddApiKey<TLeaveGroupRequestData>(response->ApiKeys, LEAVE_GROUP);
AddApiKey<THeartbeatRequestData>(response->ApiKeys, HEARTBEAT);
AddApiKey<TFindCoordinatorRequestData>(response->ApiKeys, FIND_COORDINATOR);
AddApiKey<TOffsetCommitRequestData>(response->ApiKeys, OFFSET_COMMIT, {.MaxVersion=1});
AddApiKey<TOffsetFetchRequestData>(response->ApiKeys, OFFSET_FETCH);
AddApiKey<TJoinGroupRequestData>(response->ApiKeys, JOIN_GROUP, {.MaxVersion=9});
AddApiKey<TSyncGroupRequestData>(response->ApiKeys, SYNC_GROUP, {.MaxVersion=3});
AddApiKey<TLeaveGroupRequestData>(response->ApiKeys, LEAVE_GROUP, {.MaxVersion=5});
AddApiKey<THeartbeatRequestData>(response->ApiKeys, HEARTBEAT, {.MaxVersion=4});
AddApiKey<TFindCoordinatorRequestData>(response->ApiKeys, FIND_COORDINATOR, {.MaxVersion=0});
AddApiKey<TOffsetCommitRequestData>(response->ApiKeys, OFFSET_COMMIT, {.MaxVersion=0});
AddApiKey<TOffsetFetchRequestData>(response->ApiKeys, OFFSET_FETCH, {.MaxVersion=8});

return response;
}
Expand Down
17 changes: 15 additions & 2 deletions ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ void TKafkaFindCoordinatorActor::Bootstrap(const NActors::TActorContext& ctx) {

bool withProxy = Context->Config.HasProxy() && !Context->Config.GetProxy().GetHostname().Empty();
if (withProxy) {
SendResponseOkAndDie(Context->Config.GetProxy().GetHostname(), Context->Config.GetProxy().GetPort(), -1, ctx);
SendResponseOkAndDie(Context->Config.GetProxy().GetHostname(), Context->Config.GetProxy().GetPort(), NKafka::ProxyNodeId, ctx);
return;
}

Expand All @@ -49,6 +49,13 @@ void TKafkaFindCoordinatorActor::SendResponseOkAndDie(const TString& host, i32 p
response->Coordinators.push_back(coordinator);
}

response->ErrorCode = NONE_ERROR;
response->Host = host;
response->Port = port;
response->NodeId = nodeId;

KAFKA_LOG_D("FIND_COORDINATOR response. Host#: " << host << ", Port#: " << port << ", NodeId# " << nodeId);

Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast<EKafkaErrors>(response->ErrorCode)));
Die(ctx);
}
Expand All @@ -66,15 +73,21 @@ void TKafkaFindCoordinatorActor::SendResponseFailAndDie(EKafkaErrors error, cons

response->Coordinators.push_back(coordinator);
}


response->ErrorCode = error;

Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast<EKafkaErrors>(response->ErrorCode)));
Die(ctx);
}

void TKafkaFindCoordinatorActor::Handle(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev, const NActors::TActorContext& ctx) {
auto iter = ev->Get()->NodeIdsMapping->find(ctx.SelfID.NodeId());
Y_ABORT_UNLESS(!iter.IsEnd());

auto host = (*ev->Get()->Nodes)[iter->second].Host;
if (host.StartsWith(UnderlayPrefix)) {
host = host.substr(sizeof(UnderlayPrefix) - 1);
}
KAFKA_LOG_D("FIND_COORDINATOR incoming TEvGetAllNodesInfoResponse. Host#: " << host);
SendResponseOkAndDie(host, Context->Config.GetListeningPort(), ctx.SelfID.NodeId(), ctx);
}
Expand Down
5 changes: 0 additions & 5 deletions ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@
namespace NKafka {
using namespace NKikimr::NGRpcProxy::V1;

static constexpr int ProxyNodeId = 1;
static constexpr char UnderlayPrefix[] = "u-";

static_assert(sizeof(UnderlayPrefix) == 3);

NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context,
const ui64 correlationId,
const TMessagePtr<TMetadataRequestData>& message) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kafka_proxy/actors/kafka_offset_fetch_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ TOffsetFetchResponseData::TPtr TKafkaOffsetFetchActor::GetOffsetFetchResponse()
partition.CommittedOffset = sourcePartition.CommittedOffset;
partition.PartitionIndex = sourcePartition.PartitionIndex;
partition.ErrorCode = sourcePartition.ErrorCode;
topic.Partitions.push_back(partition);
}
response->Topics.push_back(topic);
}
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/kafka_proxy/actors/kafka_read_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void TKafkaReadSessionActor::HandleWakeup(TEvKafka::TEvWakeup::TPtr, const TActo
for (auto& topicToPartitions: NewPartitionsToLockOnTime) {
auto& partitions = topicToPartitions.second;
for (auto partitionsIt = partitions.begin(); partitionsIt != partitions.end(); ) {
if (partitionsIt->LockOn >= ctx.Now()) {
if (partitionsIt->LockOn <= ctx.Now()) {
TopicPartitions[topicToPartitions.first].ToLock.emplace(partitionsIt->PartitionId);
NeedRebalance = true;
partitionsIt = partitions.erase(partitionsIt);
Expand Down Expand Up @@ -86,8 +86,8 @@ void TKafkaReadSessionActor::HandleJoinGroup(TEvKafka::TEvJoinGroupRequest::TPtr

switch (ReadStep) {
case WAIT_JOIN_GROUP: { // join first time
if (joinGroupRequest->ProtocolType != SUPPORTED_JOIN_GROUP_PROTOCOL) {
SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << joinGroupRequest->ProtocolType);
if (joinGroupRequest->ProtocolType.has_value() && !joinGroupRequest->ProtocolType.value().empty() && joinGroupRequest->ProtocolType.value() != SUPPORTED_JOIN_GROUP_PROTOCOL) {
SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << joinGroupRequest->ProtocolType.value());
CloseReadSession(ctx);
return;
}
Expand Down Expand Up @@ -156,8 +156,8 @@ void TKafkaReadSessionActor::HandleSyncGroup(TEvKafka::TEvSyncGroupRequest::TPtr
return;
}

if (syncGroupRequest->ProtocolType != SUPPORTED_JOIN_GROUP_PROTOCOL) {
SendJoinGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << syncGroupRequest->ProtocolType);
if (syncGroupRequest->ProtocolType.has_value() && !syncGroupRequest->ProtocolType.value().empty() && syncGroupRequest->ProtocolType.value() != SUPPORTED_JOIN_GROUP_PROTOCOL) {
SendSyncGroupResponseFail(ctx, ev->Get()->CorrelationId, INVALID_REQUEST, TStringBuilder() << "unknown protocolType# " << syncGroupRequest->ProtocolType.value());
CloseReadSession(ctx);
return;
}
Expand Down Expand Up @@ -361,9 +361,9 @@ TConsumerProtocolAssignment TKafkaReadSessionActor::BuildAssignmentAndInformBala
for (auto part: finalPartitionsToRead) {
KAFKA_LOG_D("SYNC_GROUP assigned partition number: " << part);
topicPartition.Partitions.push_back(part);
assignment.AssignedPartitions.push_back(topicPartition);
partitions.ReadingNow.emplace(part);
}
assignment.AssignedPartitions.push_back(topicPartition);
}

return assignment;
Expand Down
9 changes: 8 additions & 1 deletion ydb/core/kafka_proxy/kafka_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,14 @@ class TKafkaConnection: public TActorBootstrapped<TKafkaConnection>, public TNet
KAFKA_LOG_D("process message: ApiKey=" << Request->Header.RequestApiKey << ", ExpectedSize=" << Request->ExpectedSize
<< ", Size=" << Request->Size);

Request->Method = EApiKeyNames.find(static_cast<EApiKey>(Request->Header.RequestApiKey))->second;
auto apiKeyNameIt = EApiKeyNames.find(static_cast<EApiKey>(Request->Header.RequestApiKey));
if (apiKeyNameIt == EApiKeyNames.end()) {
KAFKA_LOG_ERROR("Unsupported message: ApiKey=" << Request->Header.RequestApiKey);
PassAway();
return false;
}

Request->Method = apiKeyNameIt->second;

PendingRequestsQueue.push_back(Request);
PendingRequests[Request->Header.CorrelationId] = Request;
Expand Down
10 changes: 9 additions & 1 deletion ydb/core/kafka_proxy/kafka_consumer_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,20 @@ void TConsumerProtocolAssignment::Read(TKafkaReadable& _readable, TKafkaVersion
}

void TConsumerProtocolAssignment::Write(TKafkaWritable& _writable, TKafkaVersion _version) const {
auto useVarintSize = _version > 3;
_version = ASSIGNMENT_VERSION;

if (!NPrivate::VersionCheck<MessageMeta::PresentVersions.Min, MessageMeta::PresentVersions.Max>(_version)) {
ythrow yexception() << "Can't write version " << _version << " of TConsumerProtocolAssignment";
}

_writable.writeUnsignedVarint(Size(ASSIGNMENT_VERSION) + 1);
if (useVarintSize) {
_writable.writeUnsignedVarint(Size(_version) + 1);
} else {
TKafkaInt32 size = Size(_version);
_writable << size;
}

_writable << _version;
NPrivate::TWriteCollector _collector;
NPrivate::Write<AssignedPartitionsMeta>(_collector, _writable, _version, AssignedPartitions);
Expand Down
7 changes: 6 additions & 1 deletion ydb/core/kafka_proxy/kafka_messages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4517,7 +4517,12 @@ i32 TSyncGroupResponseData::Size(TKafkaVersion _version) const {
if (NPrivate::VersionCheck<MessageMeta::FlexibleVersions.Min, MessageMeta::FlexibleVersions.Max>(_version)) {
_collector.Size += NPrivate::SizeOfUnsignedVarint(_collector.NumTaggedFields);
}
return _collector.Size + NPrivate::SizeOfUnsignedVarint(_assignmentCollector.Size + 1);
auto useVarintSize = _version > 3;
if (useVarintSize) {
return _collector.Size + NPrivate::SizeOfUnsignedVarint(_assignmentCollector.Size + 1);
} else {
return _collector.Size + sizeof(TKafkaInt32);
}
}


Expand Down
7 changes: 6 additions & 1 deletion ydb/core/kafka_proxy/ut/ut_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,12 @@ class TTestClient {
TReadInfo readInfo;
for (;;) {
readInfo = JoinAndSyncGroup(topics, groupId);
if (readInfo.Partitions.size() == expectedPartitionsCount) {
ui32 partitionsCount = 0;
for (auto topicPartitions: readInfo.Partitions) {
partitionsCount += topicPartitions.Partitions.size();
}

if (partitionsCount == expectedPartitionsCount) {
break;
}
WaitRebalance(readInfo.MemberId, readInfo.GenerationId, groupId);
Expand Down
20 changes: 17 additions & 3 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
auto& userInfo = userInfoPair.second;
if (!userInfo.LabeledCounters)
continue;
if (!userInfo.HasReadRule && !userInfo.Important)
if (userInfoPair.first != CLIENTID_WITHOUT_CONSUMER && !userInfo.HasReadRule && !userInfo.Important)
continue;
auto* cac = ac->AddConsumerAggregatedCounters();
cac->SetConsumer(userInfo.User);
Expand Down Expand Up @@ -1083,7 +1083,7 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) {
auto& userInfo = userInfoPair.second;
if (!userInfo.LabeledCounters)
continue;
if (!userInfo.HasReadRule && !userInfo.Important)
if (userInfoPair.first != CLIENTID_WITHOUT_CONSUMER && !userInfo.HasReadRule && !userInfo.Important)
continue;
bool haveChanges = false;
userInfo.EndOffset = EndOffset;
Expand Down Expand Up @@ -1187,6 +1187,12 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) {
userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_USAGE].Set(quotaUsage);
}
}

if (userInfoPair.first == CLIENTID_WITHOUT_CONSUMER ) {
PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_NO_CONSUMER_BYTES].Set(userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_BYTES].Get());
PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_NO_CONSUMER_USAGE].Set(userInfo.LabeledCounters->GetCounters()[METRIC_READ_QUOTA_PER_CONSUMER_USAGE].Get());
}

if (haveChanges) {
ctx.Send(Tablet, new TEvPQ::TEvPartitionLabeledCounters(Partition, *userInfo.LabeledCounters));
}
Expand Down Expand Up @@ -1299,6 +1305,14 @@ bool TPartition::UpdateCounters(const TActorContext& ctx, bool force) {
PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Set(quotaUsage);
}
}

if (PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_NO_CONSUMER_BYTES].Get()) {
ui64 quotaUsage = ui64(AvgReadBytes.GetValue()) * 1000000 / PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_BYTES].Get() / 60;
if (quotaUsage != PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Get()) {
haveChanges = true;
PartitionCountersLabeled->GetCounters()[METRIC_READ_QUOTA_PARTITION_TOTAL_USAGE].Set(quotaUsage);
}
}
return haveChanges;
}

Expand Down Expand Up @@ -1805,7 +1819,7 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorC
} else {
TabletCounters.Cumulative()[COUNTER_PQ_WRITE_TIMESTAMP_CACHE_HIT].Increment(1);
}
} else {
} else if (user != CLIENTID_WITHOUT_CONSUMER) {
auto ui = UsersInfoStorage->GetIfExists(user);
if (ui && ui->LabeledCounters) {
ScheduleDropPartitionLabeledCounters(ui->LabeledCounters->GetGroup());
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/persqueue/partition_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -885,8 +885,8 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
5000, 10'000, 30'000, 99'999'999});
SLIBigLatency = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WriteBigLatency"}, true, "name", false);
WritesTotal = NKikimr::NPQ::TMultiCounter(subGroup, aggr, {}, {"WritesTotal"}, true, "name", false);
if (IsQuotingEnabled() && !TopicWriteQuotaResourcePath.empty()) {
subgroups.push_back({"name", "api.grpc.topic.stream_write.topic_throttled_milliseconds"});
if (IsQuotingEnabled()) {
subgroups.push_back({"name", "topic.write.topic_throttled_milliseconds"});
TopicWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
new NKikimr::NPQ::TPercentileCounter(
NPersQueue::GetCountersForTopic(counters, IsServerless), {},
Expand All @@ -899,7 +899,7 @@ void TPartition::SetupStreamCounters(const TActorContext& ctx) {
subgroups.pop_back();
}

subgroups.push_back({"name", "api.grpc.topic.stream_write.partition_throttled_milliseconds"});
subgroups.push_back({"name", "topic.write.partition_throttled_milliseconds"});
PartitionWriteQuotaWaitCounter = THolder<NKikimr::NPQ::TPercentileCounter>(
new NKikimr::NPQ::TPercentileCounter(
NPersQueue::GetCountersForTopic(counters, IsServerless), {}, subgroups, "bin",
Expand Down
38 changes: 38 additions & 0 deletions ydb/core/persqueue/ut/counters_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,44 @@ Y_UNIT_TEST(Partition) {
}
}

Y_UNIT_TEST(PartitionWriteQuota) {
TTestContext tc;

TFinalizer finalizer(tc);
bool activeZone{false};
tc.Prepare("", [](TTestActorRuntime&) {}, activeZone, false, true);
tc.Runtime->SetScheduledLimit(100);
tc.Runtime->GetAppData(0).PQConfig.MutableQuotingConfig()->SetEnableQuoting(true);

PQTabletPrepare({.partitions = 1, .writeSpeed = 50_KB}, {}, tc);
TVector<std::pair<ui64, TString>> data;
TString s{50_KB, 'c'};
data.push_back({1, s});
for (auto i = 0u; i < 7; i++) {
CmdWrite(0, "sourceid0", data, tc, false);
data[0].first++;
}

{
auto counters = tc.Runtime->GetAppData(0).Counters;
Y_ABORT_UNLESS(counters);
auto dbGroup = GetServiceCounters(counters, "pqproxy");

auto quotaWait = dbGroup->FindSubgroup("subsystem", "partitionWriteQuotaWait")
->FindSubgroup("Account", "total")
->FindSubgroup("Producer", "total")
->FindSubgroup("Topic", "total")
->FindSubgroup("TopicPath", "total")
->FindSubgroup("OriginDC", "cluster");
auto histogram = quotaWait->FindSubgroup("sensor", "PartitionWriteQuotaWaitOriginal");
TStringStream histogramStr;
histogram->OutputHtml(histogramStr);
Cerr << "**** Total histogram: **** \n " << histogramStr.Str() << "**** **** **** ****" << Endl;
UNIT_ASSERT_VALUES_EQUAL(histogram->FindNamedCounter("Interval", "1000ms")->Val(), 3);
UNIT_ASSERT_VALUES_EQUAL(histogram->FindNamedCounter("Interval", "2500ms")->Val(), 1);
}
}

Y_UNIT_TEST(PartitionFirstClass) {
TTestContext tc;
TFinalizer finalizer(tc);
Expand Down
30 changes: 15 additions & 15 deletions ydb/core/persqueue/ut/resources/counters_datastreams.html
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,6 @@
bin=60000: 0
bin=999999: 0

name=api.grpc.topic.stream_write.partition_throttled_milliseconds:
bin=0: 30
bin=1: 0
bin=10: 0
bin=100: 0
bin=1000: 0
bin=10000: 0
bin=20: 0
bin=2500: 0
bin=5: 0
bin=50: 0
bin=500: 0
bin=5000: 0
bin=999999: 0

name=topic.write.lag_milliseconds:
bin=100: 0
bin=1000: 10
Expand Down Expand Up @@ -68,4 +53,19 @@
bin=5242880: 0
bin=67108864: 0
bin=99999999: 0

name=topic.write.partition_throttled_milliseconds:
bin=0: 30
bin=1: 0
bin=10: 0
bin=100: 0
bin=1000: 0
bin=10000: 0
bin=20: 0
bin=2500: 0
bin=5: 0
bin=50: 0
bin=500: 0
bin=5000: 0
bin=999999: 0
</pre>
2 changes: 2 additions & 0 deletions ydb/core/persqueue/ut/resources/counters_topics.html
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
name=topic.partition.read.inflight_throttled_microseconds_max: 0
name=topic.partition.read.speed_limit_bytes_per_second: 20000000000
name=topic.partition.read.throttled_microseconds_max: 0
name=topic.partition.read_without_consumer.speed_limit_bytes_per_second: 0
name=topic.partition.read_without_consumer.throttled_microseconds_max: 0
name=topic.partition.storage_bytes_max: 0
name=topic.partition.total_count: 2
name=topic.partition.uptime_milliseconds_min: 30000
Expand Down
Loading

0 comments on commit 3a46186

Please sign in to comment.