KAFKA-13959: Controller should unfence Broker with busy metadata log (apache#12274)

The root cause of KAFKA-13959 is a little complex; the two keys to this problem are:

1. KafkaRaftClient.MAX_FETCH_WAIT_MS == MetadataMaxIdleIntervalMs == 500ms.
2. We rely on the fetch purgatory to complete a FetchRequest: if FetchRequest.fetchOffset >= log.endOffset, the leader waits 500ms before sending the FetchResponse, so the follower needs to send one more FetchRequest to learn the new HW.
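Below is a minimal, illustrative sketch of this timing interplay; the class and method names are assumptions made for this example, not the actual KafkaRaftClient internals.

    // Illustrative sketch only; the real logic lives in KafkaRaftClient's fetch purgatory.
    final class FetchPurgatorySketch {
        static final long MAX_FETCH_WAIT_MS = 500;     // KafkaRaftClient.MAX_FETCH_WAIT_MS
        static final long METADATA_MAX_IDLE_MS = 500;  // MetadataMaxIdleIntervalMs (NoOpRecord interval)

        /** How long the leader parks a fetch before answering it. */
        static long fetchDelayMs(long fetchOffset, long logEndOffset) {
            // A fetch at or beyond the log end offset has no new records to return,
            // so it waits in purgatory for up to MAX_FETCH_WAIT_MS; the eventual
            // response is what carries the updated HW back to the fetcher.
            return fetchOffset >= logEndOffset ? MAX_FETCH_WAIT_MS : 0L;
        }

        public static void main(String[] args) {
            // A fully caught-up broker waits the full 500ms for every response, which is
            // exactly the interval at which the controller appends the next NoOpRecord,
            // so the HW the broker learns is always one record stale.
            System.out.println(fetchDelayMs(10, 10)); // 500
            System.out.println(fetchDelayMs(9, 10));  // 0, new records are returned immediately
        }
    }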

Here is the event sequence:

1. When the leader (active controller) starts, LEO=m+1 (m is the offset of the last record) and leader HW=m (because more than half of the voters must reach m+1 before the HW can advance).
2. The follower (standby controller) and observer (broker) send FetchRequest(fetchOffset=m).
    2.1. The leader receives the FetchRequest, sets leader HW=m, and waits 500ms before sending a FetchResponse.
    2.2. The leader sends FetchResponse(HW=m).
    2.3. The broker receives FetchResponse(HW=m) and sets metadataOffset=m.
3. The leader appends a NoOpRecord, so LEO=m+2 while leader HW stays at m.
4. Steps 2-3 repeat, so the broker's currentMetadataOffset never catches up with the controller's last committed offset and the broker is never unfenced.

If we change MAX_FETCH_WAIT_MS to 200ms (less than half of MetadataMaxIdleIntervalMs), the problem goes away, but only as a temporary workaround.

We plan to improve this in two ways. First, in this PR, we change the controller to unfence a broker once the broker's high watermark has reached the offset of that broker's registration record. Second, in KAFKA-14145 we will propagate the HWM to the replicas as quickly as possible.
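A condensed, illustrative sketch of the check this PR changes (adapted from the BrokerHeartbeatManager diff below; the class and method names here are hypothetical, and the surrounding state lookups and logging are omitted):

    final class UnfenceCheckSketch {
        // Before this PR the broker had to reach the controller's lastCommittedOffset, a moving
        // target while NoOpRecords are appended every 500ms (MetadataMaxIdleIntervalMs). After
        // this PR it only has to reach the offset of its own RegisterBrokerRecord, which is
        // fixed at registration time.
        static boolean mayUnfence(long currentMetadataOffset, long registerBrokerRecordOffset) {
            return currentMetadataOffset >= registerBrokerRecordOffset;
        }

        public static void main(String[] args) {
            System.out.println(mayUnfence(95, 90)); // true: caught up past its registration record
            System.out.println(mayUnfence(85, 90)); // false: still fenced
        }
    }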

Reviewers: Luke Chen <showuon@gmail.com>, José Armando García Sancio <jsancio@users.noreply.github.com>
dengziming authored Aug 12, 2022
1 parent 4a3e92b commit 50e5b32
Showing 9 changed files with 92 additions and 52 deletions.
core/src/main/scala/kafka/server/KafkaConfig.scala (1 addition & 1 deletion)
@@ -81,7 +81,7 @@ object Defaults {
val BrokerHeartbeatIntervalMs = 2000
val BrokerSessionTimeoutMs = 9000
val MetadataSnapshotMaxNewRecordBytes = 20 * 1024 * 1024
val MetadataMaxIdleIntervalMs = 5000
val MetadataMaxIdleIntervalMs = 500

/** KRaft mode configs */
val EmptyNodeId: Int = -1
@@ -511,17 +511,17 @@ BrokerControlState currentBrokerState(BrokerHeartbeatState broker) {
/**
* Calculate the next broker state for a broker that just sent a heartbeat request.
*
* @param brokerId The broker id.
* @param request The incoming heartbeat request.
* @param lastCommittedOffset The last committed offset of the quorum controller.
* @param hasLeaderships A callback which evaluates to true if the broker leads
* at least one partition.
* @param brokerId The broker id.
* @param request The incoming heartbeat request.
* @param registerBrokerRecordOffset The offset of the broker's {@link org.apache.kafka.common.metadata.RegisterBrokerRecord}.
* @param hasLeaderships A callback which evaluates to true if the broker leads
* at least one partition.
*
* @return The current and next broker states.
* @return The current and next broker states.
*/
BrokerControlStates calculateNextBrokerState(int brokerId,
BrokerHeartbeatRequestData request,
long lastCommittedOffset,
long registerBrokerRecordOffset,
Supplier<Boolean> hasLeaderships) {
BrokerHeartbeatState broker = brokers.getOrDefault(brokerId,
new BrokerHeartbeatState(brokerId));
@@ -533,17 +533,17 @@ BrokerControlStates calculateNextBrokerState(int brokerId,
"shutdown.", brokerId);
return new BrokerControlStates(currentState, SHUTDOWN_NOW);
} else if (!request.wantFence()) {
if (request.currentMetadataOffset() >= lastCommittedOffset) {
if (request.currentMetadataOffset() >= registerBrokerRecordOffset) {
log.info("The request from broker {} to unfence has been granted " +
"because it has caught up with the last committed metadata " +
"offset {}.", brokerId, lastCommittedOffset);
"because it has caught up with the offset of it's register " +
"broker record {}.", brokerId, registerBrokerRecordOffset);
return new BrokerControlStates(currentState, UNFENCED);
} else {
if (log.isDebugEnabled()) {
log.debug("The request from broker {} to unfence cannot yet " +
"be granted because it has not caught up with the last " +
"committed metadata offset {}. It is still at offset {}.",
brokerId, lastCommittedOffset, request.currentMetadataOffset());
"be granted because it has not caught up with the offset of " +
"it's register broker record {}. It is still at offset {}.",
brokerId, registerBrokerRecordOffset, request.currentMetadataOffset());
}
return new BrokerControlStates(currentState, FENCED);
}
@@ -60,6 +60,7 @@
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -217,6 +218,14 @@ boolean check() {
*/
private final TimelineHashMap<Integer, BrokerRegistration> brokerRegistrations;

/**
* Save the offset of each broker registration record, we will only unfence a
* broker when its high watermark has reached its broker registration record,
* this is not necessarily the exact offset of each broker registration record
* but should not be smaller than it.
*/
private final TimelineHashMap<Integer, Long> registerBrokerRecordOffsets;

/**
* A reference to the controller's metrics registry.
*/
@@ -255,6 +264,7 @@ private ClusterControlManager(
this.sessionTimeoutNs = sessionTimeoutNs;
this.replicaPlacer = replicaPlacer;
this.brokerRegistrations = new TimelineHashMap<>(snapshotRegistry, 0);
this.registerBrokerRecordOffsets = new TimelineHashMap<>(snapshotRegistry, 0);
this.heartbeatManager = null;
this.readyBrokersFuture = Optional.empty();
this.controllerMetrics = metrics;
@@ -366,7 +376,15 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
return ControllerResult.atomicOf(records, new BrokerRegistrationReply(brokerEpoch));
}

public void replay(RegisterBrokerRecord record) {
public OptionalLong registerBrokerRecordOffset(int brokerId) {
if (registerBrokerRecordOffsets.containsKey(brokerId)) {
return OptionalLong.of(registerBrokerRecordOffsets.get(brokerId));
}
return OptionalLong.empty();
}

public void replay(RegisterBrokerRecord record, long offset) {
registerBrokerRecordOffsets.put(record.brokerId(), offset);
int brokerId = record.brokerId();
List<Endpoint> listeners = new ArrayList<>();
for (BrokerEndpoint endpoint : record.endPoints()) {
@@ -401,14 +419,15 @@ public void replay(RegisterBrokerRecord record) {
}

public void replay(UnregisterBrokerRecord record) {
registerBrokerRecordOffsets.remove(record.brokerId());
int brokerId = record.brokerId();
BrokerRegistration registration = brokerRegistrations.get(brokerId);
if (registration == null) {
throw new RuntimeException(String.format("Unable to replay %s: no broker " +
"registration found for that id", record.toString()));
"registration found for that id", record));
} else if (registration.epoch() != record.brokerEpoch()) {
throw new RuntimeException(String.format("Unable to replay %s: no broker " +
"registration with that epoch found", record.toString()));
"registration with that epoch found", record));
} else {
if (heartbeatManager != null) heartbeatManager.remove(brokerId);
brokerRegistrations.remove(brokerId);
@@ -27,6 +27,7 @@
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
@@ -759,7 +760,7 @@ public void run() throws Exception {
int i = 1;
for (ApiMessageAndVersion message : result.records()) {
try {
replay(message.message(), Optional.empty());
replay(message.message(), Optional.empty(), writeOffset + result.records().size());
} catch (Throwable e) {
String failureMessage = String.format("Unable to apply %s record, which was " +
"%d of %d record(s) in the batch following last writeOffset %d.",
@@ -883,7 +884,7 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
int i = 1;
for (ApiMessageAndVersion message : messages) {
try {
replay(message.message(), Optional.empty());
replay(message.message(), Optional.empty(), offset);
} catch (Throwable e) {
String failureMessage = String.format("Unable to apply %s record on standby " +
"controller, which was %d of %d record(s) in the batch with baseOffset %d.",
@@ -938,7 +939,7 @@ public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
int i = 1;
for (ApiMessageAndVersion message : messages) {
try {
replay(message.message(), Optional.of(reader.snapshotId()));
replay(message.message(), Optional.of(reader.snapshotId()), reader.lastContainedLogOffset());
} catch (Throwable e) {
String failureMessage = String.format("Unable to apply %s record " +
"from snapshot %s on standby controller, which was %d of " +
@@ -1305,12 +1306,19 @@ private void handleFeatureControlChange() {
}
}

@SuppressWarnings("unchecked")
private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId) {
/**
* Apply the metadata record to its corresponding in-memory state(s)
*
* @param message The metadata record
* @param snapshotId The snapshotId if this record is from a snapshot
* @param batchLastOffset The offset of the last record in the log batch, or the lastContainedLogOffset
* if this record is from a snapshot, this is used along with RegisterBrokerRecord
*/
private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long batchLastOffset) {
MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
switch (type) {
case REGISTER_BROKER_RECORD:
clusterControl.replay((RegisterBrokerRecord) message);
clusterControl.replay((RegisterBrokerRecord) message, batchLastOffset);
break;
case UNREGISTER_BROKER_RECORD:
clusterControl.replay((UnregisterBrokerRecord) message);
@@ -1874,8 +1882,13 @@ public CompletableFuture<BrokerHeartbeatReply> processBrokerHeartbeat(

@Override
public ControllerResult<BrokerHeartbeatReply> generateRecordsAndResult() {
OptionalLong offsetForRegisterBrokerRecord = clusterControl.registerBrokerRecordOffset(brokerId);
if (!offsetForRegisterBrokerRecord.isPresent()) {
throw new StaleBrokerEpochException(
String.format("Receive a heartbeat from broker %d before registration", brokerId));
}
ControllerResult<BrokerHeartbeatReply> result = replicationControl.
processBrokerHeartbeat(request, lastCommittedOffset);
processBrokerHeartbeat(request, offsetForRegisterBrokerRecord.getAsLong());
inControlledShutdown = result.response().inControlledShutdown();
rescheduleMaybeFenceStaleBrokers();
return result;
@@ -1355,13 +1355,13 @@ ApiError electLeader(String topic, int partitionId, ElectionType electionType,
}

ControllerResult<BrokerHeartbeatReply> processBrokerHeartbeat(
BrokerHeartbeatRequestData request, long lastCommittedOffset) {
BrokerHeartbeatRequestData request, long registerBrokerRecordOffset) {
int brokerId = request.brokerId();
long brokerEpoch = request.brokerEpoch();
clusterControl.checkBrokerEpoch(brokerId, brokerEpoch);
BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager();
BrokerControlStates states = heartbeatManager.calculateNextBrokerState(brokerId,
request, lastCommittedOffset, () -> brokersToIsrs.hasLeaderships(brokerId));
request, registerBrokerRecordOffset, () -> brokersToIsrs.hasLeaderships(brokerId));
List<ApiMessageAndVersion> records = new ArrayList<>();
if (states.current() != states.next()) {
switch (states.next()) {
@@ -1382,7 +1382,7 @@ ControllerResult<BrokerHeartbeatReply> processBrokerHeartbeat(
heartbeatManager.touch(brokerId,
states.next().fenced(),
request.currentMetadataOffset());
boolean isCaughtUp = request.currentMetadataOffset() >= lastCommittedOffset;
boolean isCaughtUp = request.currentMetadataOffset() >= registerBrokerRecordOffset;
BrokerHeartbeatReply reply = new BrokerHeartbeatReply(isCaughtUp,
states.next().fenced(),
states.next().inControlledShutdown(),
@@ -96,7 +96,7 @@ public void testReplay(MetadataVersion metadataVersion) {
setPort((short) 9092).
setName("PLAINTEXT").
setHost("example.com"));
clusterControl.replay(brokerRecord);
clusterControl.replay(brokerRecord, 100L);
clusterControl.checkBrokerEpoch(1, 100);
assertThrows(StaleBrokerEpochException.class,
() -> clusterControl.checkBrokerEpoch(1, 101));
@@ -165,19 +165,20 @@ public void testReplayRegisterBrokerRecord() {
setPort((short) 9092).
setName("PLAINTEXT").
setHost("example.com"));
clusterControl.replay(brokerRecord);
clusterControl.replay(brokerRecord, 100L);

assertFalse(clusterControl.unfenced(0));
assertTrue(clusterControl.inControlledShutdown(0));

brokerRecord.setInControlledShutdown(false);
clusterControl.replay(brokerRecord);
clusterControl.replay(brokerRecord, 100L);

assertFalse(clusterControl.unfenced(0));
assertFalse(clusterControl.inControlledShutdown(0));
assertEquals(100L, clusterControl.registerBrokerRecordOffset(brokerRecord.brokerId()).getAsLong());

brokerRecord.setFenced(false);
clusterControl.replay(brokerRecord);
clusterControl.replay(brokerRecord, 100L);

assertTrue(clusterControl.unfenced(0));
assertFalse(clusterControl.inControlledShutdown(0));
@@ -217,7 +218,7 @@ public void testReplayBrokerRegistrationChangeRecord() {
setPort((short) 9092).
setName("PLAINTEXT").
setHost("example.com"));
clusterControl.replay(brokerRecord);
clusterControl.replay(brokerRecord, 100L);

assertTrue(clusterControl.unfenced(0));
assertFalse(clusterControl.inControlledShutdown(0));
@@ -341,17 +342,19 @@ public void testUnregister() throws Exception {
setFeatureControlManager(featureControl).
build();
clusterControl.activate();
clusterControl.replay(brokerRecord);
clusterControl.replay(brokerRecord, 100L);
assertEquals(new BrokerRegistration(1, 100,
Uuid.fromString("fPZv1VBsRFmnlRvmGcOW9w"), Collections.singletonMap("PLAINTEXT",
new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "example.com", 9092)),
Collections.emptyMap(), Optional.of("arack"), true, false),
clusterControl.brokerRegistrations().get(1));
assertEquals(100L, clusterControl.registerBrokerRecordOffset(brokerRecord.brokerId()).getAsLong());
UnregisterBrokerRecord unregisterRecord = new UnregisterBrokerRecord().
setBrokerId(1).
setBrokerEpoch(100);
clusterControl.replay(unregisterRecord);
assertFalse(clusterControl.brokerRegistrations().containsKey(1));
assertFalse(clusterControl.registerBrokerRecordOffset(brokerRecord.brokerId()).isPresent());
}

@ParameterizedTest
@@ -382,7 +385,7 @@ public void testPlaceReplicas(int numUsableBrokers) throws Exception {
setPort((short) 9092).
setName("PLAINTEXT").
setHost("example.com"));
clusterControl.replay(brokerRecord);
clusterControl.replay(brokerRecord, 100L);
UnfenceBrokerRecord unfenceRecord =
new UnfenceBrokerRecord().setId(i).setEpoch(100);
clusterControl.replay(unfenceRecord);
@@ -442,7 +445,7 @@ public void testIterator(MetadataVersion metadataVersion) throws Exception {
setPort((short) 9092 + i).
setName("PLAINTEXT").
setHost("example.com"));
clusterControl.replay(brokerRecord);
clusterControl.replay(brokerRecord, 100L);
}
for (int i = 0; i < 2; i++) {
UnfenceBrokerRecord unfenceBrokerRecord =
@@ -76,7 +76,7 @@ public void setUp() {
setPort((short) 9092).
setName("PLAINTEXT").
setHost(String.format("broker-%02d.example.org", i)));
clusterControl.replay(brokerRecord);
clusterControl.replay(brokerRecord, 100L);
}

this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
@@ -57,20 +57,25 @@ public static void replayAll(Object target,
for (ApiMessageAndVersion recordAndVersion : recordsAndVersions) {
ApiMessage record = recordAndVersion.message();
try {
Method method = target.getClass().getMethod("replay", record.getClass());
method.invoke(target, record);
} catch (NoSuchMethodException e) {
try {
Method method = target.getClass().getMethod("replay",
record.getClass(),
Optional.class);
method.invoke(target, record, Optional.empty());
} catch (NoSuchMethodException t) {
// ignore
} catch (InvocationTargetException t) {
throw new RuntimeException(t);
} catch (IllegalAccessException t) {
throw new RuntimeException(t);
Method method = target.getClass().getMethod("replay", record.getClass());
method.invoke(target, record);
} catch (NoSuchMethodException e) {
try {
Method method = target.getClass().getMethod("replay",
record.getClass(),
Optional.class);
method.invoke(target, record, Optional.empty());
} catch (NoSuchMethodException t) {
try {
Method method = target.getClass().getMethod("replay",
record.getClass(),
long.class);
method.invoke(target, record, 0L);
} catch (NoSuchMethodException i) {
// ignore
}
}
}
} catch (InvocationTargetException e) {
throw new RuntimeException(e);
@@ -119,7 +124,7 @@ public static void replayAllBatches(Object target,
* @param delta the metadata delta on which to replay the records
* @param highestOffset highest offset from the list of record batches
* @param highestEpoch highest epoch from the list of record batches
* @param recordsAndVersions list of batches of records
* @param batches list of batches of records
*/
public static void replayAllBatches(
MetadataDelta delta,