Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@
package kafka.server.metadata

import java.util.concurrent.RejectedExecutionException

import kafka.utils.Logging
import org.apache.kafka.image.MetadataImage
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.snapshot.SnapshotWriter


trait SnapshotWriterBuilder {
def build(committedOffset: Long,
committedEpoch: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,11 @@ class BrokerMetadataListenerTest {
assertEquals(200L, newImage.highestOffsetAndEpoch().offset)
assertEquals(new BrokerRegistration(0, 100L,
Uuid.fromString("GFBwlTcpQUuLYQ2ig05CSg"), Collections.emptyList[Endpoint](),
Collections.emptyMap[String, VersionRange](), Optional.empty[String](), false),
Collections.emptyMap[String, VersionRange](), Optional.empty[String](), false, false),
delta.clusterDelta().broker(0))
assertEquals(new BrokerRegistration(1, 200L,
Uuid.fromString("QkOQtNKVTYatADcaJ28xDg"), Collections.emptyList[Endpoint](),
Collections.emptyMap[String, VersionRange](), Optional.empty[String](), true),
Collections.emptyMap[String, VersionRange](), Optional.empty[String](), true, false),
delta.clusterDelta().broker(1))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.VersionRange;
import org.apache.kafka.metadata.placement.ReplicaPlacer;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.slf4j.Logger;
Expand All @@ -64,8 +66,8 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.kafka.common.metadata.MetadataRecordType.REGISTER_BROKER_RECORD;


/**
Expand All @@ -84,6 +86,7 @@ static class Builder {
private long sessionTimeoutNs = DEFAULT_SESSION_TIMEOUT_NS;
private ReplicaPlacer replicaPlacer = null;
private ControllerMetrics controllerMetrics = null;
private FeatureControlManager featureControl = null;

Builder setLogContext(LogContext logContext) {
this.logContext = logContext;
Expand Down Expand Up @@ -120,8 +123,15 @@ Builder setControllerMetrics(ControllerMetrics controllerMetrics) {
return this;
}

Builder setFeatureControlManager(FeatureControlManager featureControl) {
this.featureControl = featureControl;
return this;
}

ClusterControlManager build() {
if (logContext == null) logContext = new LogContext();
if (logContext == null) {
logContext = new LogContext();
}
if (clusterId == null) {
clusterId = Uuid.randomUuid().toString();
}
Expand All @@ -132,15 +142,20 @@ ClusterControlManager build() {
replicaPlacer = new StripedReplicaPlacer(new Random());
}
if (controllerMetrics == null) {
throw new RuntimeException("You must specify controllerMetrics");
throw new RuntimeException("You must specify ControllerMetrics");
}
if (featureControl == null) {
throw new RuntimeException("You must specify FeatureControlManager");
}
return new ClusterControlManager(logContext,
clusterId,
time,
snapshotRegistry,
sessionTimeoutNs,
replicaPlacer,
controllerMetrics);
controllerMetrics,
featureControl
);
}
}

Expand Down Expand Up @@ -218,14 +233,20 @@ boolean check() {
*/
private Optional<ReadyBrokersFuture> readyBrokersFuture;

/**
* The feature control manager.
*/
private final FeatureControlManager featureControl;

private ClusterControlManager(
LogContext logContext,
String clusterId,
Time time,
SnapshotRegistry snapshotRegistry,
long sessionTimeoutNs,
ReplicaPlacer replicaPlacer,
ControllerMetrics metrics
ControllerMetrics metrics,
FeatureControlManager featureControl
) {
this.logContext = logContext;
this.clusterId = clusterId;
Expand All @@ -237,6 +258,7 @@ private ClusterControlManager(
this.heartbeatManager = null;
this.readyBrokersFuture = Optional.empty();
this.controllerMetrics = metrics;
this.featureControl = featureControl;
}

ReplicaPlacer replicaPlacer() {
Expand Down Expand Up @@ -339,7 +361,8 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
heartbeatManager.register(brokerId, record.fenced());

List<ApiMessageAndVersion> records = new ArrayList<>();
records.add(new ApiMessageAndVersion(record, REGISTER_BROKER_RECORD.highestSupportedVersion()));
records.add(new ApiMessageAndVersion(record, featureControl.metadataVersion().
registerBrokerRecordVersion()));
return ControllerResult.atomicOf(records, new BrokerRegistrationReply(brokerEpoch));
}

Expand All @@ -361,7 +384,8 @@ public void replay(RegisterBrokerRecord record) {
BrokerRegistration prevRegistration = brokerRegistrations.put(brokerId,
new BrokerRegistration(brokerId, record.brokerEpoch(),
record.incarnationId(), listeners, features,
Optional.ofNullable(record.rack()), record.fenced()));
Optional.ofNullable(record.rack()), record.fenced(),
record.inControlledShutdown()));
updateMetrics(prevRegistration, brokerRegistrations.get(brokerId));
if (heartbeatManager != null) {
if (prevRegistration != null) heartbeatManager.remove(brokerId);
Expand Down Expand Up @@ -394,31 +418,49 @@ public void replay(UnregisterBrokerRecord record) {
}

public void replay(FenceBrokerRecord record) {
replayRegistrationChange(record, record.id(), record.epoch(),
BrokerRegistrationFencingChange.UNFENCE);
replayRegistrationChange(
record,
record.id(),
record.epoch(),
BrokerRegistrationFencingChange.UNFENCE.asBoolean(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to Jose mentioned, this should be noticed if #12236 merged first.

BrokerRegistrationInControlledShutdownChange.NONE.asBoolean()
);
}

public void replay(UnfenceBrokerRecord record) {
replayRegistrationChange(record, record.id(), record.epoch(),
BrokerRegistrationFencingChange.FENCE);
replayRegistrationChange(
record,
record.id(),
record.epoch(),
BrokerRegistrationFencingChange.FENCE.asBoolean(),
BrokerRegistrationInControlledShutdownChange.NONE.asBoolean()
);
}

public void replay(BrokerRegistrationChangeRecord record) {
Optional<BrokerRegistrationFencingChange> fencingChange =
BrokerRegistrationFencingChange.fromValue(record.fenced());
if (!fencingChange.isPresent()) {
throw new RuntimeException(String.format("Unable to replay %s: unknown " +
"value for fenced field: %d", record.toString(), record.fenced()));
}
replayRegistrationChange(record, record.brokerId(), record.brokerEpoch(),
fencingChange.get());
BrokerRegistrationFencingChange fencingChange =
BrokerRegistrationFencingChange.fromValue(record.fenced()).orElseThrow(
() -> new IllegalStateException(String.format("Unable to replay %s: unknown " +
"value for fenced field: %d", record, record.fenced())));
BrokerRegistrationInControlledShutdownChange inControlledShutdownChange =
BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown()).orElseThrow(
() -> new IllegalStateException(String.format("Unable to replay %s: unknown " +
"value for inControlledShutdown field: %d", record, record.inControlledShutdown())));
replayRegistrationChange(
record,
record.brokerId(),
record.brokerEpoch(),
fencingChange.asBoolean(),
inControlledShutdownChange.asBoolean()
);
}

private void replayRegistrationChange(
ApiMessage record,
int brokerId,
long brokerEpoch,
BrokerRegistrationFencingChange fencingChange
Optional<Boolean> fencingChange,
Optional<Boolean> inControlledShutdownChange
) {
BrokerRegistration curRegistration = brokerRegistrations.get(brokerId);
if (curRegistration == null) {
Expand All @@ -428,10 +470,10 @@ private void replayRegistrationChange(
throw new RuntimeException(String.format("Unable to replay %s: no broker " +
"registration with that epoch found", record.toString()));
} else {
BrokerRegistration nextRegistration = curRegistration;
if (fencingChange != BrokerRegistrationFencingChange.NONE) {
nextRegistration = nextRegistration.cloneWithFencing(fencingChange.asBoolean().get());
}
BrokerRegistration nextRegistration = curRegistration.cloneWith(
fencingChange,
inControlledShutdownChange
);
if (!curRegistration.equals(nextRegistration)) {
brokerRegistrations.put(brokerId, nextRegistration);
updateMetrics(curRegistration, nextRegistration);
Expand Down Expand Up @@ -485,12 +527,36 @@ Iterator<UsableBroker> usableBrokers() {
id -> brokerRegistrations.get(id).rack());
}

/**
* Returns true if the broker is in fenced state; Returns false if it is
* not or if it does not exist.
*/
public boolean unfenced(int brokerId) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor but I am pretty sure that we don't use this method anymore in src/main. If so, I say that we just remove it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's right. However, it is used in many places in the tests. I haven't found a good way to replace it in tests that is as convenient as this predicate. I would keep it.

BrokerRegistration registration = brokerRegistrations.get(brokerId);
if (registration == null) return false;
return !registration.fenced();
}

/**
* Returns true if the broker is in controlled shutdown state; Returns false
* if it is not or if it does not exist.
*/
public boolean inControlledShutdown(int brokerId) {
BrokerRegistration registration = brokerRegistrations.get(brokerId);
if (registration == null) return false;
return registration.inControlledShutdown();
}

/**
* Returns true if the broker is active. Active means not fenced nor in controlled
* shutdown; Returns false if it is not active or if it does not exist.
*/
public boolean active(int brokerId) {
BrokerRegistration registration = brokerRegistrations.get(brokerId);
if (registration == null) return false;
return !registration.inControlledShutdown() && !registration.fenced();
}

BrokerHeartbeatManager heartbeatManager() {
if (heartbeatManager == null) {
throw new RuntimeException("ClusterControlManager is not active.");
Expand Down Expand Up @@ -520,9 +586,15 @@ public void addReadyBrokersFuture(CompletableFuture<Void> future, int minBrokers

class ClusterControlIterator implements Iterator<List<ApiMessageAndVersion>> {
private final Iterator<Entry<Integer, BrokerRegistration>> iterator;
private final MetadataVersion metadataVersion;

ClusterControlIterator(long epoch) {
this.iterator = brokerRegistrations.entrySet(epoch).iterator();
if (featureControl.metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
this.metadataVersion = MetadataVersion.IBP_3_0_IV1;
} else {
this.metadataVersion = featureControl.metadataVersion();
}
}

@Override
Expand All @@ -549,16 +621,19 @@ public List<ApiMessageAndVersion> next() {
setMaxSupportedVersion(featureEntry.getValue().max()).
setMinSupportedVersion(featureEntry.getValue().min()));
}
List<ApiMessageAndVersion> batch = new ArrayList<>();
batch.add(new ApiMessageAndVersion(new RegisterBrokerRecord().
RegisterBrokerRecord record = new RegisterBrokerRecord().
setBrokerId(brokerId).
setIncarnationId(registration.incarnationId()).
setBrokerEpoch(registration.epoch()).
setEndPoints(endpoints).
setFeatures(features).
setRack(registration.rack().orElse(null)).
setFenced(registration.fenced()), REGISTER_BROKER_RECORD.highestSupportedVersion()));
return batch;
setFenced(registration.fenced());
if (metadataVersion.isInControlledShutdownStateSupported()) {
record.setInControlledShutdown(registration.inControlledShutdown());
}
return singletonList(new ApiMessageAndVersion(record,
metadataVersion.registerBrokerRecordVersion()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,11 +622,16 @@ public String toString() {
}
}

// VisibleForTesting
// Visible for testing
ReplicationControlManager replicationControl() {
return replicationControl;
}

// Visible for testing
ClusterControlManager clusterControl() {
return clusterControl;
}

<T> CompletableFuture<T> appendReadEvent(
String name,
OptionalLong deadlineNs,
Expand Down Expand Up @@ -1557,6 +1562,11 @@ private QuorumController(LogContext logContext,
setNodeId(nodeId).
build();
this.clientQuotaControlManager = new ClientQuotaControlManager(snapshotRegistry);
this.featureControl = new FeatureControlManager.Builder().
setLogContext(logContext).
setQuorumFeatures(quorumFeatures).
setSnapshotRegistry(snapshotRegistry).
build();
this.clusterControl = new ClusterControlManager.Builder().
setLogContext(logContext).
setClusterId(clusterId).
Expand All @@ -1565,12 +1575,8 @@ private QuorumController(LogContext logContext,
setSessionTimeoutNs(sessionTimeoutNs).
setReplicaPlacer(replicaPlacer).
setControllerMetrics(controllerMetrics).
setFeatureControlManager(featureControl).
build();
this.featureControl = new FeatureControlManager.Builder().
setLogContext(logContext).
setQuorumFeatures(quorumFeatures).
setSnapshotRegistry(snapshotRegistry).
build();
this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
this.snapshotMaxNewRecordBytes = snapshotMaxNewRecordBytes;
this.leaderImbalanceCheckIntervalNs = leaderImbalanceCheckIntervalNs;
Expand Down
Loading