@ryandielhenn
Contributor

@ryandielhenn ryandielhenn commented Apr 21, 2021

The metrics are calculated by counting records as they are replayed, e.g. replay(TopicRecord) and replay(RemoveTopicRecord).

This was unit tested using MockControllerMetrics.

https://issues.apache.org/jira/browse/KAFKA-12697

@cmccabe
Contributor

cmccabe commented Apr 21, 2021

Hi @dielhennr, thanks for the PR!

I think it would be simpler to have ControllerMetrics#setGlobalTopicsCount / ControllerMetrics#globalTopicsCount rather than inc / dec / get. We can just look at the size of ReplicationControlManager#topics to get the relevant number in O(1) time. This also makes it harder for our count to get out of sync somehow.

For the partition count, we have no choice but to maintain our own count, since we can't efficiently get that number just by taking the size() of any existing data structure. However, we should do this in ReplicationControlManager rather than in the metrics object. There are other reasons why we might want this count besides exposing metrics. For example, we might not want to allow more than a certain number of partitions to be created. (There is a KIP proposing such a feature, I believe.) So again, for consistency, we can just have ControllerMetrics#setGlobalPartitionsCount / ControllerMetrics#globalPartitionsCount.
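A minimal sketch of the bookkeeping described above: the topic count is read from the size of the topics map, while the partition count is maintained by hand as records are replayed. `TopicRegistry` and its methods are hypothetical stand-ins for ReplicationControlManager, and a plain `HashMap` is used only for brevity; it is not how the controller stores topics.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for ReplicationControlManager.
final class TopicRegistry {
    // Topic name -> number of partitions. Assumption: the real manager keeps richer state.
    private final Map<String, Integer> topics = new HashMap<>();

    // Maintained explicitly, because no existing structure's size() yields it.
    private int globalPartitionCount = 0;

    void replayTopic(String name, int numPartitions) {
        Integer previous = topics.put(name, numPartitions);
        if (previous != null) {
            globalPartitionCount -= previous;
        }
        globalPartitionCount += numPartitions;
    }

    void replayRemoveTopic(String name) {
        Integer previous = topics.remove(name);
        if (previous != null) {
            globalPartitionCount -= previous;
        }
    }

    // O(1): derived from the map, so it cannot drift out of sync.
    int globalTopicCount() {
        return topics.size();
    }

    int globalPartitionCount() {
        return globalPartitionCount;
    }
}
```

After each replay, the manager would push both values to the metrics object through the suggested setters.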

Keep in mind that anything accessed by a metrics callback function needs to be set in a thread-safe way. The metrics system uses a different set of threads than the controller. So you need to use a lock, a volatile, or an atomic variable when passing information from one to the other. A volatile is usually the best way to do things since it has the lowest overhead (certainly it's the best in this particular case, I think.)
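A rough sketch of the volatile approach, assuming a single writer (the controller thread) and any number of readers (metrics threads). `SimpleControllerMetrics` and `VolatileGaugeDemo` are hypothetical names; only the setter and getter names are taken from the comment above.

```java
// Hypothetical stand-in for a ControllerMetrics implementation; not Kafka's actual class.
final class SimpleControllerMetrics {
    // volatile: written by the controller thread, read by metrics reporter threads.
    private volatile int globalTopicsCount = 0;
    private volatile int globalPartitionsCount = 0;

    void setGlobalTopicsCount(int count) {
        this.globalTopicsCount = count;
    }

    int globalTopicsCount() {
        return globalTopicsCount;
    }

    void setGlobalPartitionsCount(int count) {
        this.globalPartitionsCount = count;
    }

    int globalPartitionsCount() {
        return globalPartitionsCount;
    }
}

final class VolatileGaugeDemo {
    public static void main(String[] args) throws InterruptedException {
        SimpleControllerMetrics metrics = new SimpleControllerMetrics();

        // Plays the role of the controller thread publishing fresh counts.
        Thread controller = new Thread(() -> {
            metrics.setGlobalTopicsCount(3);
            metrics.setGlobalPartitionsCount(12);
        });
        controller.start();
        controller.join();

        // Plays the role of a metrics callback reading the latest values.
        System.out.println(metrics.globalTopicsCount() + " topics, "
            + metrics.globalPartitionsCount() + " partitions");
    }
}
```

The setters overwrite an absolute value instead of incrementing, which is why a volatile is enough here: a volatile `count++` would not be atomic, but a plain write from one thread is safely visible to readers.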

@ryandielhenn ryandielhenn changed the title Add GlobalTopicCount and GlobalPartitionCount metrics to Quorum Controller KAFKA-12697 Add GlobalTopicCount and GlobalPartitionCount metrics to Quorum Controller May 7, 2021
@ryandielhenn ryandielhenn changed the title KAFKA-12697 Add GlobalTopicCount and GlobalPartitionCount metrics to Quorum Controller KAFKA-12697: Add GlobalTopicCount and GlobalPartitionCount metrics to Quorum Controller May 7, 2021
@ryandielhenn ryandielhenn changed the title KAFKA-12697: Add GlobalTopicCount and GlobalPartitionCount metrics to Quorum Controller KAFKA-12697: Add metrics to Quorum Controller May 7, 2021
@cmccabe
Contributor

cmccabe commented May 12, 2021

Can you create a PR that has just the global topics count and global partitions count? Then we can keep the remaining metrics in this PR. That will make it easier to review and get in.

@ryandielhenn ryandielhenn changed the title KAFKA-12697: Add metrics to Quorum Controller KAFKA-12697: Add OfflinePartitionCount and PreferredReplicaImbalanceCount metrics to Quorum Controller May 13, 2021
*/
private final TimelineHashMap<Integer, TimelineHashMap<Uuid, int[]>> isrMembers;

private final Map<Uuid, Integer> offlinePartitionCounts;
Contributor

We cannot use regular maps here because they will not roll back to the desired state during a snapshot restore.

In any case, I don't see why we need this map. It's enough to know how many offline partitions there are, which we already have a count of below.
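To illustrate the rollback concern, here is a toy counter that can be reverted to an earlier snapshot. It is only a sketch: `RevertibleCounter` and its epoch-keyed design are assumptions made for this example and do not reflect how Kafka's timeline data structures are implemented. A plain field or `HashMap` kept next to it would have no such history and would keep its post-snapshot value after a revert.

```java
import java.util.TreeMap;

// Toy illustration only; Kafka's timeline data structures work differently.
final class RevertibleCounter {
    // Snapshot epoch -> value of the counter when that snapshot was taken.
    private final TreeMap<Long, Integer> snapshots = new TreeMap<>();
    private int value = 0;

    void add(int delta) {
        value += delta;
    }

    int get() {
        return value;
    }

    void snapshot(long epoch) {
        snapshots.put(epoch, value);
    }

    // Restores the value recorded at the given epoch and drops later snapshots.
    void revertTo(long epoch) {
        Integer saved = snapshots.get(epoch);
        if (saved == null) {
            throw new IllegalArgumentException("no snapshot for epoch " + epoch);
        }
        value = saved;
        snapshots.tailMap(epoch, false).clear();
    }
}
```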

Contributor Author

This was so that when a topic is removed, any offline partitions for that topic are decremented from the counter.

Contributor

@cmccabe cmccabe May 17, 2021

All the information that is needed is already here. If you delete X partitions that had a leader of -1, you decrement the counter by X.
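The suggestion above could look roughly like this. `OfflinePartitionCounter` and `onTopicRemoved` are hypothetical names, and the array of leader IDs stands in for whatever per-partition state the manager already holds for the removed topic.

```java
// Hypothetical helper; in the PR this logic would live in ReplicationControlManager.
final class OfflinePartitionCounter {
    static final int NO_LEADER = -1;

    private int offlinePartitionCount;

    OfflinePartitionCounter(int initialCount) {
        this.offlinePartitionCount = initialCount;
    }

    // leaders: the current leader of each partition of the topic being removed.
    void onTopicRemoved(int[] leaders) {
        int removedOffline = 0;
        for (int leader : leaders) {
            if (leader == NO_LEADER) {
                removedOffline++;
            }
        }
        // Deleting X leaderless partitions lowers the offline count by X.
        offlinePartitionCount -= removedOffline;
    }

    int offlinePartitionCount() {
        return offlinePartitionCount;
    }
}
```

No per-topic map is needed, because the leaders are inspected at the moment the topic is removed.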

String topicPart = topicInfo.name + "-" + record.partitionId() + " with topic ID " +
record.topicId();
newPartitionInfo.maybeLogPartitionChange(log, topicPart, prevPartitionInfo);
if ((newPartitionInfo.leader != newPartitionInfo.preferredReplica()) &&
Contributor

Can we have a function like "hasPreferredLeader" on PartitionControlInfo, to make this simpler?

Contributor

Can do this in a follow-on
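For reference, the suggested `hasPreferredLeader` helper might look something like the sketch below. `PartitionInfoSketch` is a hypothetical, trimmed-down stand-in for PartitionControlInfo, and treating the first replica as the preferred one is an assumption of this example.

```java
// Hypothetical, trimmed-down stand-in for PartitionControlInfo.
final class PartitionInfoSketch {
    static final int NO_LEADER = -1;

    final int[] replicas;
    final int leader;

    PartitionInfoSketch(int[] replicas, int leader) {
        this.replicas = replicas;
        this.leader = leader;
    }

    // Assumption: the preferred replica is the first entry in the replica list.
    int preferredReplica() {
        return replicas.length == 0 ? NO_LEADER : replicas[0];
    }

    // True when the current leader is the preferred replica.
    boolean hasPreferredLeader() {
        return leader == preferredReplica();
    }
}
```

The condition in the diff would then read `!newPartitionInfo.hasPreferredLeader()` instead of comparing `leader` with `preferredReplica()` inline.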

unfenceBroker(i, ctx);
}

CreatableTopicResult foo = ctx.createTestTopic("foo",
Contributor

This is definitely a nitpick, but can you put this on fewer lines? 2 lines should be enough for this. Same for the ones below.

// nothing to do
}

@Override
Contributor

@cmccabe cmccabe May 20, 2021

Weird, did this @Override line get deleted by accident?

Contributor

@cmccabe cmccabe left a comment

LGTM

@cmccabe cmccabe merged commit 80ec8fb into apache:trunk May 20, 2021
ijuma added a commit to ijuma/kafka that referenced this pull request May 26, 2021
…e-allocations-lz4

* apache-github/trunk: (43 commits)
  KAFKA-12800: Configure generator to fail on trailing JSON tokens (apache#10717)
  MINOR: clarify message ordering with max in-flight requests and idempotent producer (apache#10690)
  MINOR: Add log identifier/prefix printing in Log layer static functions (apache#10742)
  MINOR: update java doc for deprecated methods (apache#10722)
  MINOR: Fix deprecation warnings in SlidingWindowedCogroupedKStreamImplTest (apache#10703)
  KAFKA-12499: add transaction timeout verification (apache#10482)
  KAFKA-12620 Allocate producer ids on the controller (apache#10504)
  MINOR: Kafka Streams code samples formating unification (apache#10651)
  KAFKA-12808: Remove Deprecated Methods under StreamsMetrics (apache#10724)
  KAFKA-12522: Cast SMT should allow null value records to pass through (apache#10375)
  KAFKA-12820: Upgrade maven-artifact dependency to resolve CVE-2021-26291
  HOTFIX: fix checkstyle issue in KAFKA-12697
  KAFKA-12697: Add OfflinePartitionCount and PreferredReplicaImbalanceCount metrics to Quorum Controller (apache#10572)
  KAFKA-12342: Remove MetaLogShim and use RaftClient directly (apache#10705)
  KAFKA-12779: KIP-740, Clean up public API in TaskId and fix TaskMetadata#taskId() (apache#10735)
  KAFKA-12814: Remove Deprecated Method StreamsConfig getConsumerConfigs (apache#10737)
  MINOR: Eliminate redundant functions in LogTest suite (apache#10732)
  MINOR: Remove unused maxProducerIdExpirationMs parameter in Log constructor (apache#10723)
  MINOR: Updating files with release 2.7.1 (apache#10660)
  KAFKA-12809: Remove deprecated methods of Stores factory (apache#10729)
  ...