KAFKA-9076: support consumer offset sync across clusters in MM 2.0 #7577
Conversation
```java
consumerOffsetSync
    .assign(new ArrayList<>(allOffsets.get(consumerGroupId).keySet()));
consumerOffsetSync.commitSync(allOffsets.get(consumerGroupId));
consumerOffsetSync.close();
```
We can use the new reset offsets API for this: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97551484
new reset offsets API looks awesome, will make this change in the next iteration
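For reference, committing the translated offsets through the Admin API (the reset-offsets API linked above) could look roughly like this minimal sketch; the wrapper method and variable names here are illustrative, not part of the PR:

```java
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Sketch: write a group's translated offsets on the target cluster via the Admin API,
// instead of spinning up a short-lived KafkaConsumer just to call commitSync().
void commitOffsetsViaAdmin(Admin targetAdmin, String consumerGroupId,
                           Map<TopicPartition, OffsetAndMetadata> groupOffsets)
        throws InterruptedException, ExecutionException {
    targetAdmin.alterConsumerGroupOffsets(consumerGroupId, groupOffsets)
            .all()
            .get(); // wait until the broker has applied the new committed offsets
}
```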
```java
// track the consumers that are not actively consuming from the replicated topic with the same consumer group id
Set<String> idleConsumers = new HashSet<>();
DescribeConsumerGroupsResult describedGroups = targetAdminClient.describeConsumerGroups(allOffsets.keySet());
for (Map.Entry<String, KafkaFuture<ConsumerGroupDescription>> group : describedGroups.describedGroups().entrySet()) {
```
We should also filter with the configured GroupFilter s.t. only whitelisted groups are sync'd this way. As written, anything with a checkpoint is sync'd, which is problematic when e.g. a user blacklists a group: we should stop sync'ing the group even tho a previous checkpoint may exist.
I think once we move the "consumer offset sync" to MirrorCheckpointTask, groups in the blacklist will be filtered out by the existing logic in MirrorCheckpointTask.
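For illustration only, that filtering could look like the sketch below; it assumes the MM2 GroupFilter interface's shouldReplicateGroup(String) check (how group whitelisting/blacklisting is applied elsewhere), and allOffsets/groupFilter come from the surrounding code:

```java
// Sketch: only keep checkpointed groups that the configured GroupFilter still allows,
// so a blacklisted group with an old checkpoint stops being synced.
Set<String> groupsToSync = allOffsets.keySet().stream()
        .filter(groupFilter::shouldReplicateGroup) // groupFilter is assumed to be configured
        .collect(Collectors.toSet());
```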
```java
    return offsets;
}

public Map<String, Map<TopicPartition, OffsetAndMetadata>> remoteConsumerOffsets(String remoteClusterAlias, Duration timeout) {
```
This is a new public method, so please mention in the KIP.
```java
@@ -67,6 +67,7 @@
protected static final String SYNC_TOPIC_ACLS = "sync.topic.acls";
protected static final String EMIT_HEARTBEATS = "emit.heartbeats";
protected static final String EMIT_CHECKPOINTS = "emit.checkpoints";
protected static final String SYNC_CONSUMER_OFFSETS = "sync.consumer.offsets";
```
elsewhere in MM2 we refer to "groups" but not "consumers". Should this be sync.group.offsets?
totally agree
```java
private void syncCommitOffset() throws InterruptedException, ExecutionException {
    MirrorClient mirrorClient = new MirrorClient(config.targetConsumerConfig());
    Map<String, Map<TopicPartition, OffsetAndMetadata>> allOffsets =
        mirrorClient.remoteConsumerOffsets(config.sourceClusterAlias(), config.adminTimeout());
```
Can we use the OffsetSyncStore class to do offset translation instead of reading back the checkpoint? The MirrorCheckpointTask class is generating the checkpoint -- why not sync the offsets at the same time?
thanks, it indeed saves effort; will attempt the consumer offset sync in MirrorCheckpointTask in the next iteration
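A rough sketch of what translating inside MirrorCheckpointTask could look like, assuming OffsetSyncStore#translateDownstream returns the translated downstream offset (as the existing checkpoint() path uses it) and a helper that maps the source partition to its remote topic name; the variable and helper names are illustrative:

```java
// Sketch: translate a group's upstream committed offsets with the OffsetSyncStore
// that MirrorCheckpointTask already maintains, instead of reading checkpoints back.
Map<TopicPartition, OffsetAndMetadata> translated = new HashMap<>();
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : upstreamGroupOffsets.entrySet()) {
    TopicPartition upstream = entry.getKey();
    long downstreamOffset = offsetSyncStore.translateDownstream(upstream, entry.getValue().offset());
    if (downstreamOffset > 0) { // ignore offsets we cannot translate accurately
        translated.put(renameTopicPartition(upstream), new OffsetAndMetadata(downstreamOffset));
    }
}
```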
```java
// track the consumers that are not actively consuming from the replicated topic with the same consumer group id
Set<String> idleConsumers = new HashSet<>();
DescribeConsumerGroupsResult describedGroups = targetAdminClient.describeConsumerGroups(allOffsets.keySet());
```
Should we split groups across Tasks instead of syncing all groups in a Connector? For example, MirrorCheckpointConnector divides all groups among MirrorCheckpointTasks, and each Task generates checkpoints for its assigned groups only. This is potentially more scalable.
thanks, that sounds like a scalable approach; will attempt this change in the next iteration
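For context, this is roughly how MirrorCheckpointConnector already splits groups across tasks (a simplified sketch from memory; helper names such as taskConfigForConsumerGroups may differ slightly from the actual source):

```java
// Sketch: divide the known consumer groups across at most maxTasks task configs,
// so each MirrorCheckpointTask checkpoints (and would sync) only its own slice.
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    if (knownConsumerGroups.isEmpty()) {
        return Collections.emptyList();
    }
    int numTasks = Math.min(maxTasks, knownConsumerGroups.size());
    return ConnectorUtils.groupPartitions(new ArrayList<>(knownConsumerGroups), numTasks).stream()
            .map(config::taskConfigForConsumerGroups)
            .collect(Collectors.toList());
}
```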
Force-pushed b6858b2 to 4d0477b.
@ryannedolan revised the PR; please take another pass of review when you have time. Thanks
This looks close, but I don't think we should be sending these syncs in the poll() loop. Instead, let's set up a scheduled task like in the Connectors, with a configurable interval.
```java
ConsumerGroupDescription consumerGroupDescription = entry.getValue().get();
// only sync the offsets to the target cluster if the consumer group is not active
if (consumerGroupDescription.state().equals(ConsumerGroupState.EMPTY)) {
    alterGroupOffset(group, checkpoints);
```
This would be a side-effect in an otherwise pure function.
@ryannedolan do you suggest adding a new connector, like "GroupOffsetSyncConnector", or using an existing Connector? Also, do you suggest doing the actual consumer offset sync job as a scheduled task in the Connector (like …

I was thinking we'd add a Scheduler to MirrorCheckpointTask (not the Connector) and periodically write offsets from there. MirrorCheckpointTask already has an OffsetSyncStore, which lets you translate offsets for any group assigned to the Task. So you just need to loop through the assigned groups, translate their offsets, and write them downstream. This is very close to what you have now -- just that we should do it periodically (in a Scheduler), not as part of the poll() loop. I don't think we should create a new Connector or Task, since all the info we need (OffsetSyncStore, group assignment) is already in MirrorCheckpointTask.
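Roughly what that wiring could look like in MirrorCheckpointTask#start — a sketch assuming the internal Scheduler class's scheduleRepeating() (as the Connectors use it) and an illustrative syncGroupOffset() method and interval config:

```java
// Sketch: run the group offset sync periodically on the task's Scheduler,
// on a configurable interval, rather than inside the poll() loop.
scheduler = new Scheduler(MirrorCheckpointTask.class, config.adminTimeout());
scheduler.scheduleRepeating(this::syncGroupOffset, config.syncGroupOffsetsInterval(),
        "sync consumer group offsets to target cluster");
```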
Force-pushed 507648a to 5f5694a.
@ryannedolan thanks for your valuable and concrete feedback. I made the changes you suggested; please take another review when you have time. Thanks
I think you are missing a close()/shutdown(), but otherwise lgtm.
```java
metrics = config.metrics();
scheduler = new Scheduler(MirrorCheckpointTask.class, config.adminTimeout());
if (config.groupOffsetSyncEnabled()) {
```
Nit: the scheduler understands negative intervals as "disabled", so you don't need to have this extra check here. Just have syncGroupOffsetInterval return -1 if it's disabled.
removed the if condition
```java
if (getBoolean(SYNC_GROUP_OFFSETS_ENABLED)) {
    return Duration.ofSeconds(getLong(SYNC_GROUP_OFFSETS_INTERVAL_SECONDS));
} else {
    // negative interval to disable
```
👍
```java
long offsetFromTarget = entry.getValue().offset();
long translatedOffset = offset.get(entry.getKey()).offset();
// do not alter the entire consumer offsets if any translated offset from upstream
// is smaller than the current consumer offset in the target, per partition
```
Why skip the entire checkpoint if a single downstream partition is ahead of the checkpoint? I guess that's the safest approach -- but are we sure we can't just skip that partition and write the rest?
> but are we sure we can't just skip that partition and write the rest?

The latest version skips such partitions and writes the rest.
@ryannedolan thanks for another round of review feedback; updated the PR based on your latest comments.
Force-pushed 8d962d7 to ae1ce2f.
```java
List<Checkpoint> checkpoints = listConsumerGroupOffsets(group).entrySet().stream()
    .filter(x -> shouldCheckpointTopic(x.getKey().topic()))
    .map(x -> checkpoint(group, x.getKey(), x.getValue()))
    .filter(x -> x.downstreamOffset() > 0) // ignore offsets we cannot translate accurately
```
This part is duplicated -- can we make it DRYer? I suggest adding a checkpointsForGroup() method and using it in both places.
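For example, the shared helper could be extracted along these lines (a sketch that simply wraps the pipeline shown above):

```java
// Sketch: one helper used both when emitting checkpoint records and when syncing
// group offsets, so the translation pipeline lives in a single place.
private List<Checkpoint> checkpointsForGroup(String group) throws ExecutionException, InterruptedException {
    return listConsumerGroupOffsets(group).entrySet().stream()
            .filter(x -> shouldCheckpointTopic(x.getKey().topic()))
            .map(x -> checkpoint(group, x.getKey(), x.getValue()))
            .filter(x -> x.downstreamOffset() > 0) // ignore offsets we cannot translate accurately
            .collect(Collectors.toList());
}
```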
```java
    .filter(x -> x.downstreamOffset() > 0) // ignore offsets we cannot translate accurately
    .collect(Collectors.toList());

DescribeConsumerGroupsResult describedGroups = targetAdminClient.describeConsumerGroups(
```
It might be slightly more efficient to describe all consumerGroups once (outside the for-loop) and then use the resulting map here.
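Something like the following sketch, for example (variable names are illustrative):

```java
// Sketch: issue a single describeConsumerGroups() call for all groups up front,
// then look each group's description up from the map inside the loop.
Map<String, KafkaFuture<ConsumerGroupDescription>> describedGroups =
        targetAdminClient.describeConsumerGroups(allGroups).describedGroups();
for (String group : allGroups) {
    ConsumerGroupDescription description = describedGroups.get(group).get();
    // ... inspect description.state(), etc., per group
}
```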
Force-pushed cce6116 to 7cd2419.
Can I upvote so this ticket gets prioritized? :)

@nils-getdreams sounds like you may be interested in this feature. @ryannedolan may be testing it in house. How about cherry-picking this into your fork and letting me know any feedback based on your testing?

@ning2008wisc @ryannedolan with this automated consumer offset sync, I was wondering if the user can also set an option to ensure the mirrored topics in the target cluster are created without the source cluster prefix.

@amuraru thanks for your comments. To directly answer your first question, I think currently no, meaning the source cluster prefix has to be added. The reason is detailed here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP-382:MirrorMaker2.0-Cycledetection The consumer will still need to subscribe to the prefixed remote topic.
```java
for (Entry<TopicPartition, OffsetAndMetadata> entry : group.getValue()) {
    long latestDownstreamOffset = entry.getValue().offset();
    TopicPartition topicPartition = entry.getKey();
    if (!convertedUpstreamOffset.containsKey(topicPartition)) {
        log.trace("convertedUpstreamOffset does not contain TopicPartition: {}", topicPartition.toString());
        continue;
    }

    // if translated offset from upstream is smaller than the current consumer offset
    // in the target, skip updating the offset for that partition
    long convertedOffset = convertedUpstreamOffset.get(topicPartition).offset();
    if (latestDownstreamOffset >= convertedOffset) {
        log.trace("latestDownstreamOffset {} is larger than convertedUpstreamOffset {} for "
            + "TopicPartition {}", latestDownstreamOffset, convertedOffset, topicPartition);
        continue;
    }
    offsetToSync.put(entry.getKey(), convertedUpstreamOffset.get(topicPartition));
```
I would like to propose the following change to take care of the source consumer group changes
The loop above would be replaced with:

```java
for (Map.Entry<TopicPartition, OffsetAndMetadata> convertedEntry : convertedUpstreamOffset.entrySet()) {
    TopicPartition topicPartition = convertedEntry.getKey();
    for (Entry<TopicPartition, OffsetAndMetadata> idleEntry : group.getValue()) {
        if (idleEntry.getKey() == topicPartition) {
            long latestDownstreamOffset = idleEntry.getValue().offset();
            // if translated offset from upstream is smaller than the current consumer offset
            // in the target, skip updating the offset for that partition
            long convertedOffset = convertedUpstreamOffset.get(topicPartition).offset();
            if (latestDownstreamOffset >= convertedOffset) {
                log.trace("latestDownstreamOffset {} is larger than convertedUpstreamOffset {} for "
                    + "TopicPartition {}", latestDownstreamOffset, convertedOffset, topicPartition);
                continue;
            }
        }
    }
    offsetToSync.put(convertedEntry.getKey(), convertedUpstreamOffset.get(topicPartition));
```
Force-pushed 81b41bc to 35a7b1f.
@ryannedolan @mimaison I added integration tests for this automated consumer offset sync in MM 2.0. When you have time, I would appreciate your first pass of review. Thanks
Thanks for the updates. I've made another pass and left a few comments
```java
@@ -16,6 +16,13 @@
 */
package org.apache.kafka.connect.mirror;

import java.util.HashMap;
```
can we move these imports with the other java.util imports?
done
```java
@@ -79,7 +93,15 @@ public void start(Map<String, String> props) {
    pollTimeout = config.consumerPollTimeout();
    offsetSyncStore = new OffsetSyncStore(config);
    sourceAdminClient = AdminClient.create(config.sourceAdminConfig());
    targetAdminClient = (KafkaAdminClient) Admin.create(config.targetAdminConfig());
```
can we change the type definition of these 2 to be Admin? Then we don't need the cast
updated
```java
private void refreshIdleConsumerGroupOffset() {
    Map<String, KafkaFuture<ConsumerGroupDescription>> consumerGroupsDesc = targetAdminClient
        .describeConsumerGroups(consumerGroups).describedGroups();
```
It looks like we describe all groups just to get groups in the EMPTY state. Can we use the new listGroups() method introduced in KIP-518 to only get groups in that specific state?
On second thought, using describeConsumerGroups() may be more predictable in terms of work to do, as you describe only the groups assigned to this task.
great to know about that KIP; then I will keep using describeConsumerGroups() here
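For reference, the KIP-518 call mentioned above would look roughly like this sketch (it filters to EMPTY groups cluster-wide, whereas describeConsumerGroups() can be scoped to just the groups assigned to this task):

```java
// Sketch: list only the consumer groups currently in the EMPTY state on the
// target cluster, using the state filter introduced by KIP-518 (Kafka 2.6+).
ListConsumerGroupsOptions options = new ListConsumerGroupsOptions()
        .inStates(Collections.singleton(ConsumerGroupState.EMPTY));
Collection<ConsumerGroupListing> emptyGroups =
        targetAdminClient.listConsumerGroups(options).all().get();
```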
```java
// in the target, skip updating the offset for that partition
long latestDownstreamOffset = targetConsumerOffset.get(topicPartition).offset();
if (latestDownstreamOffset >= convertedOffset.offset()) {
    log.trace("latestDownstreamOffset {} is larger than convertedUpstreamOffset {} for "
```
larger or equal?
updated to `latestDownstreamOffset {} is larger than or equal to convertedUpstreamOffset {} for ...`
```java
props.putAll(originalsWithPrefix(TARGET_CLUSTER_PREFIX));
props.keySet().retainAll(MirrorClientConfig.CLIENT_CONFIG_DEF.names());
props.putAll(originalsWithPrefix(CONSUMER_CLIENT_PREFIX));
props.put("enable.auto.commit", "false");
```
Can we use the existing constants for the config names?
done
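For example, using the standard consumer config constant instead of the raw string:

```java
// Sketch: ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG resolves to "enable.auto.commit".
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
```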
```java
@@ -29,7 +35,7 @@
@Test
public void testDownstreamTopicRenaming() {
    MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
        new DefaultReplicationPolicy(), null);
        new DefaultReplicationPolicy(), null, new HashMap<>(), new HashMap<>());
```
We could use Collections.emptyMap() here and in a few places below
updated
```java
// 't1p0' denotes topic1, partition 0
TopicPartition t1p0 = new TopicPartition(topic1, 0);
// 'c1t1p0' denotes the consumer offsets for topic1, partition 0 for consumer1
Entry<TopicPartition, OffsetAndMetadata> c1t1p0 = new SimpleEntry<>(t1p0, new OffsetAndMetadata(100, ""));
```
This looks unused, same below for c2t2p0
removed the unused variables
```java
Map<TopicPartition, OffsetAndMetadata> c2t2 = new HashMap<>();
TopicPartition t2p0 = new TopicPartition(topic2, 0);
Entry<TopicPartition, OffsetAndMetadata> c2t2p0 = new SimpleEntry<>(t2p0, new OffsetAndMetadata(50, ""));
```
We can use new OffsetAndMetadata(50) if we don't set any metadata. Same below.
updated
```java
@Test
public void testReplication() throws InterruptedException {

    // create consumers before starting the connectors so we don't need to wait for discovery
    Consumer<byte[], byte[]> consumer3 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
```
It's a bit unusual to have consumer3 and consumer4 without 1 and 2 =)
updated to consumer1 and consumer2
```java
records = consumer.poll(Duration.ofMillis(500));
// similar reasoning as above, no more records to consume by the same consumer group at backup cluster
assertTrue("consumer record size is not zero", records.count() == 0);
```
what about assertEquals("consumer record size is not zero", 0, records.count());? It can also be applied in a few other places
updated
Hello @mimaison, thanks for your comments. I have addressed them in the latest push; please take another review. Thanks

bump for attention @mimaison ^ given that https://issues.apache.org/jira/browse/KAFKA-9076 has slipped to the next release (2.7.0) and some people may already be testing/using this feature, I hope it is possible to revisit this PR soon so that it can formally become part of Kafka. Thanks
ok to test
Thanks for the updates. I've taken another look and left a few more minor comments. I think I'd be happy to merge it once these are addressed
```java
@@ -94,6 +114,7 @@ public void stop() {
    Utils.closeQuietly(offsetSyncStore, "offset sync store");
    Utils.closeQuietly(sourceAdminClient, "source admin client");
    Utils.closeQuietly(metrics, "metrics");
    Utils.closeQuietly(scheduler, "scheduler");
```
We should also close targetAdminClient
done
```java
@@ -42,6 +47,7 @@
private static final Logger log = LoggerFactory.getLogger(MirrorCheckpointTask.class);

private AdminClient sourceAdminClient;
private AdminClient targetAdminClient;
```
Can we use Admin instead of AdminClient for both of these?
done
```java
@@ -61,16 +63,18 @@
private static final int RECORD_TRANSFER_DURATION_MS = 10_000;
private static final int CHECKPOINT_DURATION_MS = 20_000;

private SystemTime time = new SystemTime();
```
Use Time.SYSTEM instead of creating a new instance
done
In order to let Kafka consumer and streams applications migrate from the source to the target cluster transparently and conveniently, e.g. in the event of a source cluster failure, a background job is proposed to periodically sync the consumer offsets from the source to the target cluster, so that when the consumer and streams applications switch to the target cluster, they will resume consuming from where they left off at the source cluster.

@mimaison thanks so much for your fast response :) I have addressed your 3 comments above; please take the final review pass
retest this please
ok to test
unrelated test failures:
retest this please
ok to test
Some of the failures look related:
Let's retest |
ok to test |
seems an unrelated test failed:

@mimaison If the one failed test is not relevant, are we ready to merge, or is there anything I can do? Thanks
Huge thanks to all reviewers and committers for providing valuable comments and testing results |
* 'trunk' of github.com:apache/kafka:
  - KAFKA-10180: Fix security_config caching in system tests (apache#8917)
  - KAFKA-10173: Fix suppress changelog binary schema compatibility (apache#8905)
  - KAFKA-10166: always write checkpoint before closing an (initialized) task (apache#8926)
  - MINOR: Rename SslTransportLayer.State."NOT_INITALIZED" enum value to "NOT_INITIALIZED"
  - MINOR: Update Scala to 2.13.3 (apache#8931)
  - KAFKA-9076: support consumer sync across clusters in MM 2.0 (apache#7577)
  - MINOR: Remove Diamond and code code Alignment (apache#8107)
  - KAFKA-10198: guard against recycling dirty state (apache#8924)
In order to let Kafka consumer and streams applications migrate from the source to the target cluster transparently and conveniently, e.g. in the event of a source cluster failure, a background task is proposed to periodically sync the consumer offsets from the source to the target cluster, so that when the consumer and streams applications switch to the target cluster, they will resume consuming from where they left off at the source cluster.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0
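For anyone trying the feature out, enabling it in an MM2 properties file would look roughly like the sketch below; the property names follow the ones discussed in this PR, but check the KIP for the final names and defaults:

```properties
# Sketch: periodically sync consumer group offsets from the source to the target cluster.
source->target.sync.group.offsets.enabled = true
source->target.sync.group.offsets.interval.seconds = 60
# Offsets are only written for groups matched by the group whitelist/filter.
source->target.groups = my-group-.*
```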