From 50e5b32a6d5f7940e932c6acb20a452535c0bf60 Mon Sep 17 00:00:00 2001
From: dengziming
Date: Sat, 13 Aug 2022 00:06:24 +0800
Subject: [PATCH] KAFKA-13959: Controller should unfence Broker with busy metadata log (#12274)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The cause of KAFKA-13959 is a little complex; the two keys to this problem are:

1. KafkaRaftClient.MAX_FETCH_WAIT_MS == MetadataMaxIdleIntervalMs == 500ms.
2. We rely on fetchPurgatory to complete a FetchRequest: if
   FetchRequest.fetchOffset >= log.endOffset, we wait up to 500ms before
   sending a FetchResponse, and the follower then needs to send one more
   FetchRequest to learn the new HW.

Here is the event sequence:

1. When the leader (active controller) starts, LEO=m+1, where m is the offset
   of the last record, and leader HW=m, because more than half of the voters
   must reach m+1 before the HW advances.
2. The follower (standby controller) and the observer (broker) send
   FetchRequest(fetchOffset=m).
   2.1. The leader receives the FetchRequest, sets leader HW=m, and waits
        500ms before sending a FetchResponse.
   2.2. The leader sends FetchResponse(HW=m).
   2.3. The broker receives FetchResponse(HW=m) and sets metadataOffset=m.
3. The leader appends a NoOpRecord: LEO=m+2, leader HW is still m.
4. Steps 2-3 repeat indefinitely, so the broker's metadata offset always
   trails the last committed offset and the broker is never unfenced.

If we change MAX_FETCH_WAIT_MS to 200ms (less than half of
MetadataMaxIdleIntervalMs), this problem can be solved, but only temporarily.

We plan to fix this problem in two ways. First, in this PR, we change the
controller to unfence a broker once the broker's high watermark has reached
the offset of that broker's registration record. Second, we will propagate
the HW to the replicas as quickly as possible in KAFKA-14145. (Illustrative
sketches of the timing loop, the new unfence check, and the completeAll
contract follow the patch below.)

Reviewers: Luke Chen, José Armando García Sancio
---
 .../main/scala/kafka/server/KafkaConfig.scala |  2 +-
 .../controller/BrokerHeartbeatManager.java    | 26 +++++++--------
 .../controller/ClusterControlManager.java     | 25 ++++++++++++--
 .../kafka/controller/QuorumController.java    | 27 +++++++++++----
 .../controller/ReplicationControlManager.java |  6 ++--
 .../controller/ClusterControlManagerTest.java | 19 ++++++-----
 .../ProducerIdControlManagerTest.java         |  2 +-
 .../kafka/metadata/RecordTestUtils.java       | 33 +++++++++++--------
 .../kafka/raft/internals/FuturePurgatory.java |  4 +--
 9 files changed, 92 insertions(+), 52 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 4e253047ee604..860056f9a3e4b 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -81,7 +81,7 @@ object Defaults {
   val BrokerHeartbeatIntervalMs = 2000
   val BrokerSessionTimeoutMs = 9000
   val MetadataSnapshotMaxNewRecordBytes = 20 * 1024 * 1024
-  val MetadataMaxIdleIntervalMs = 5000
+  val MetadataMaxIdleIntervalMs = 500
 
   /** KRaft mode configs */
   val EmptyNodeId: Int = -1
diff --git a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
index f31df917d7679..428f1c5833ea4 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
@@ -511,17 +511,17 @@ BrokerControlState currentBrokerState(BrokerHeartbeatState broker) {
     /**
      * Calculate the next broker state for a broker that just sent a heartbeat request.
      *
-     * @param brokerId The broker id.
-     * @param request The incoming heartbeat request.
-     * @param lastCommittedOffset The last committed offset of the quorum controller.
-     * @param hasLeaderships A callback which evaluates to true if the broker leads
-     *                       at least one partition.
+     * @param brokerId The broker id.
+     * @param request The incoming heartbeat request.
+     * @param registerBrokerRecordOffset The offset of the broker's {@link org.apache.kafka.common.metadata.RegisterBrokerRecord}.
+     * @param hasLeaderships A callback which evaluates to true if the broker leads
+     *                       at least one partition.
      *
-     * @return The current and next broker states.
+     * @return The current and next broker states.
      */
     BrokerControlStates calculateNextBrokerState(int brokerId,
                                                  BrokerHeartbeatRequestData request,
-                                                 long lastCommittedOffset,
+                                                 long registerBrokerRecordOffset,
                                                  Supplier<Boolean> hasLeaderships) {
         BrokerHeartbeatState broker = brokers.getOrDefault(brokerId,
             new BrokerHeartbeatState(brokerId));
@@ -533,17 +533,17 @@ BrokerControlStates calculateNextBrokerState(int brokerId,
                 "shutdown.", brokerId);
             return new BrokerControlStates(currentState, SHUTDOWN_NOW);
         } else if (!request.wantFence()) {
-            if (request.currentMetadataOffset() >= lastCommittedOffset) {
+            if (request.currentMetadataOffset() >= registerBrokerRecordOffset) {
                 log.info("The request from broker {} to unfence has been granted " +
-                    "because it has caught up with the last committed metadata " +
-                    "offset {}.", brokerId, lastCommittedOffset);
+                    "because it has caught up with the offset of its register " +
+                    "broker record {}.", brokerId, registerBrokerRecordOffset);
                 return new BrokerControlStates(currentState, UNFENCED);
             } else {
                 if (log.isDebugEnabled()) {
                     log.debug("The request from broker {} to unfence cannot yet " +
-                        "be granted because it has not caught up with the last " +
-                        "committed metadata offset {}. It is still at offset {}.",
-                        brokerId, lastCommittedOffset, request.currentMetadataOffset());
+                        "be granted because it has not caught up with the offset of " +
+                        "its register broker record {}. It is still at offset {}.",
+                        brokerId, registerBrokerRecordOffset, request.currentMetadataOffset());
                 }
                 return new BrokerControlStates(currentState, FENCED);
             }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 235f077cfff7a..d30f43242179f 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -60,6 +60,7 @@
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -217,6 +218,14 @@ boolean check() {
      */
     private final TimelineHashMap<Integer, BrokerRegistration> brokerRegistrations;
 
+    /**
+     * Save the offset of each broker registration record. We will only unfence a
+     * broker when its high watermark has reached its broker registration record;
+     * this is not necessarily the exact offset of each broker registration record,
+     * but it should not be smaller than it.
+     */
+    private final TimelineHashMap<Integer, Long> registerBrokerRecordOffsets;
+
     /**
      * A reference to the controller's metrics registry.
      */
@@ -255,6 +264,7 @@ private ClusterControlManager(
         this.sessionTimeoutNs = sessionTimeoutNs;
         this.replicaPlacer = replicaPlacer;
         this.brokerRegistrations = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.registerBrokerRecordOffsets = new TimelineHashMap<>(snapshotRegistry, 0);
         this.heartbeatManager = null;
         this.readyBrokersFuture = Optional.empty();
         this.controllerMetrics = metrics;
@@ -366,7 +376,15 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
         return ControllerResult.atomicOf(records, new BrokerRegistrationReply(brokerEpoch));
     }
 
-    public void replay(RegisterBrokerRecord record) {
+    public OptionalLong registerBrokerRecordOffset(int brokerId) {
+        if (registerBrokerRecordOffsets.containsKey(brokerId)) {
+            return OptionalLong.of(registerBrokerRecordOffsets.get(brokerId));
+        }
+        return OptionalLong.empty();
+    }
+
+    public void replay(RegisterBrokerRecord record, long offset) {
+        registerBrokerRecordOffsets.put(record.brokerId(), offset);
         int brokerId = record.brokerId();
         List<Endpoint> listeners = new ArrayList<>();
         for (BrokerEndpoint endpoint : record.endPoints()) {
@@ -401,14 +419,15 @@ public void replay(RegisterBrokerRecord record) {
     }
 
     public void replay(UnregisterBrokerRecord record) {
+        registerBrokerRecordOffsets.remove(record.brokerId());
         int brokerId = record.brokerId();
         BrokerRegistration registration = brokerRegistrations.get(brokerId);
         if (registration == null) {
             throw new RuntimeException(String.format("Unable to replay %s: no broker " +
-                "registration found for that id", record.toString()));
+                "registration found for that id", record));
         } else if (registration.epoch() != record.brokerEpoch()) {
             throw new RuntimeException(String.format("Unable to replay %s: no broker " +
-                "registration with that epoch found", record.toString()));
+                "registration with that epoch found", record));
         } else {
             if (heartbeatManager != null) heartbeatManager.remove(brokerId);
             brokerRegistrations.remove(brokerId);
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index ef87248f13420..3fee25841ba74 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -27,6 +27,7 @@
 import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.StaleBrokerEpochException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
@@ -759,7 +760,7 @@ public void run() throws Exception {
                     int i = 1;
                     for (ApiMessageAndVersion message : result.records()) {
                         try {
-                            replay(message.message(), Optional.empty());
+                            replay(message.message(), Optional.empty(), writeOffset + result.records().size());
                         } catch (Throwable e) {
                             String failureMessage = String.format("Unable to apply %s record, which was " +
                                 "%d of %d record(s) in the batch following last writeOffset %d.",
@@ -883,7 +884,7 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
                         int i = 1;
                         for (ApiMessageAndVersion message : messages) {
                             try {
-                                replay(message.message(), Optional.empty());
+                                replay(message.message(), Optional.empty(), offset);
                             } catch (Throwable e) {
                                 String failureMessage = String.format("Unable to apply %s record on standby " +
"controller, which was %d of %d record(s) in the batch with baseOffset %d.", @@ -938,7 +939,7 @@ public void handleSnapshot(SnapshotReader reader) { int i = 1; for (ApiMessageAndVersion message : messages) { try { - replay(message.message(), Optional.of(reader.snapshotId())); + replay(message.message(), Optional.of(reader.snapshotId()), reader.lastContainedLogOffset()); } catch (Throwable e) { String failureMessage = String.format("Unable to apply %s record " + "from snapshot %s on standby controller, which was %d of " + @@ -1305,12 +1306,19 @@ private void handleFeatureControlChange() { } } - @SuppressWarnings("unchecked") - private void replay(ApiMessage message, Optional snapshotId) { + /** + * Apply the metadata record to its corresponding in-memory state(s) + * + * @param message The metadata record + * @param snapshotId The snapshotId if this record is from a snapshot + * @param batchLastOffset The offset of the last record in the log batch, or the lastContainedLogOffset + * if this record is from a snapshot, this is used along with RegisterBrokerRecord + */ + private void replay(ApiMessage message, Optional snapshotId, long batchLastOffset) { MetadataRecordType type = MetadataRecordType.fromId(message.apiKey()); switch (type) { case REGISTER_BROKER_RECORD: - clusterControl.replay((RegisterBrokerRecord) message); + clusterControl.replay((RegisterBrokerRecord) message, batchLastOffset); break; case UNREGISTER_BROKER_RECORD: clusterControl.replay((UnregisterBrokerRecord) message); @@ -1874,8 +1882,13 @@ public CompletableFuture processBrokerHeartbeat( @Override public ControllerResult generateRecordsAndResult() { + OptionalLong offsetForRegisterBrokerRecord = clusterControl.registerBrokerRecordOffset(brokerId); + if (!offsetForRegisterBrokerRecord.isPresent()) { + throw new StaleBrokerEpochException( + String.format("Receive a heartbeat from broker %d before registration", brokerId)); + } ControllerResult result = replicationControl. 
-                    processBrokerHeartbeat(request, lastCommittedOffset);
+                    processBrokerHeartbeat(request, offsetForRegisterBrokerRecord.getAsLong());
                 inControlledShutdown = result.response().inControlledShutdown();
                 rescheduleMaybeFenceStaleBrokers();
                 return result;
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index bf3a679d2cef0..4ffb339967c2f 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -1355,13 +1355,13 @@ ApiError electLeader(String topic, int partitionId, ElectionType electionType,
     }
 
     ControllerResult<BrokerHeartbeatReply> processBrokerHeartbeat(
-            BrokerHeartbeatRequestData request, long lastCommittedOffset) {
+            BrokerHeartbeatRequestData request, long registerBrokerRecordOffset) {
         int brokerId = request.brokerId();
         long brokerEpoch = request.brokerEpoch();
         clusterControl.checkBrokerEpoch(brokerId, brokerEpoch);
         BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager();
         BrokerControlStates states = heartbeatManager.calculateNextBrokerState(brokerId,
-            request, lastCommittedOffset, () -> brokersToIsrs.hasLeaderships(brokerId));
+            request, registerBrokerRecordOffset, () -> brokersToIsrs.hasLeaderships(brokerId));
         List<ApiMessageAndVersion> records = new ArrayList<>();
         if (states.current() != states.next()) {
             switch (states.next()) {
@@ -1382,7 +1382,7 @@ ControllerResult<BrokerHeartbeatReply> processBrokerHeartbeat(
         heartbeatManager.touch(brokerId,
             states.next().fenced(),
             request.currentMetadataOffset());
-        boolean isCaughtUp = request.currentMetadataOffset() >= lastCommittedOffset;
+        boolean isCaughtUp = request.currentMetadataOffset() >= registerBrokerRecordOffset;
         BrokerHeartbeatReply reply = new BrokerHeartbeatReply(isCaughtUp,
             states.next().fenced(),
             states.next().inControlledShutdown(),
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 97d6c88377277..e47def81e6d5d 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -96,7 +96,7 @@ public void testReplay(MetadataVersion metadataVersion) {
                 setPort((short) 9092).
                 setName("PLAINTEXT").
                 setHost("example.com"));
-        clusterControl.replay(brokerRecord);
+        clusterControl.replay(brokerRecord, 100L);
         clusterControl.checkBrokerEpoch(1, 100);
         assertThrows(StaleBrokerEpochException.class,
             () -> clusterControl.checkBrokerEpoch(1, 101));
@@ -165,19 +165,20 @@ public void testReplayRegisterBrokerRecord() {
                 setPort((short) 9092).
                 setName("PLAINTEXT").
setHost("example.com")); - clusterControl.replay(brokerRecord); + clusterControl.replay(brokerRecord, 100L); assertFalse(clusterControl.unfenced(0)); assertTrue(clusterControl.inControlledShutdown(0)); brokerRecord.setInControlledShutdown(false); - clusterControl.replay(brokerRecord); + clusterControl.replay(brokerRecord, 100L); assertFalse(clusterControl.unfenced(0)); assertFalse(clusterControl.inControlledShutdown(0)); + assertEquals(100L, clusterControl.registerBrokerRecordOffset(brokerRecord.brokerId()).getAsLong()); brokerRecord.setFenced(false); - clusterControl.replay(brokerRecord); + clusterControl.replay(brokerRecord, 100L); assertTrue(clusterControl.unfenced(0)); assertFalse(clusterControl.inControlledShutdown(0)); @@ -217,7 +218,7 @@ public void testReplayBrokerRegistrationChangeRecord() { setPort((short) 9092). setName("PLAINTEXT"). setHost("example.com")); - clusterControl.replay(brokerRecord); + clusterControl.replay(brokerRecord, 100L); assertTrue(clusterControl.unfenced(0)); assertFalse(clusterControl.inControlledShutdown(0)); @@ -341,17 +342,19 @@ public void testUnregister() throws Exception { setFeatureControlManager(featureControl). build(); clusterControl.activate(); - clusterControl.replay(brokerRecord); + clusterControl.replay(brokerRecord, 100L); assertEquals(new BrokerRegistration(1, 100, Uuid.fromString("fPZv1VBsRFmnlRvmGcOW9w"), Collections.singletonMap("PLAINTEXT", new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "example.com", 9092)), Collections.emptyMap(), Optional.of("arack"), true, false), clusterControl.brokerRegistrations().get(1)); + assertEquals(100L, clusterControl.registerBrokerRecordOffset(brokerRecord.brokerId()).getAsLong()); UnregisterBrokerRecord unregisterRecord = new UnregisterBrokerRecord(). setBrokerId(1). setBrokerEpoch(100); clusterControl.replay(unregisterRecord); assertFalse(clusterControl.brokerRegistrations().containsKey(1)); + assertFalse(clusterControl.registerBrokerRecordOffset(brokerRecord.brokerId()).isPresent()); } @ParameterizedTest @@ -382,7 +385,7 @@ public void testPlaceReplicas(int numUsableBrokers) throws Exception { setPort((short) 9092). setName("PLAINTEXT"). setHost("example.com")); - clusterControl.replay(brokerRecord); + clusterControl.replay(brokerRecord, 100L); UnfenceBrokerRecord unfenceRecord = new UnfenceBrokerRecord().setId(i).setEpoch(100); clusterControl.replay(unfenceRecord); @@ -442,7 +445,7 @@ public void testIterator(MetadataVersion metadataVersion) throws Exception { setPort((short) 9092 + i). setName("PLAINTEXT"). setHost("example.com")); - clusterControl.replay(brokerRecord); + clusterControl.replay(brokerRecord, 100L); } for (int i = 0; i < 2; i++) { UnfenceBrokerRecord unfenceBrokerRecord = diff --git a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java index ccdd3a5b2331f..80c5c505ae0eb 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java @@ -76,7 +76,7 @@ public void setUp() { setPort((short) 9092). setName("PLAINTEXT"). 
                 setHost(String.format("broker-%02d.example.org", i)));
-            clusterControl.replay(brokerRecord);
+            clusterControl.replay(brokerRecord, 100L);
         }
 
         this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
index f5a8da5f8a2f9..c21bdb544789b 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
@@ -57,20 +57,25 @@ public static void replayAll(Object target,
         for (ApiMessageAndVersion recordAndVersion : recordsAndVersions) {
             ApiMessage record = recordAndVersion.message();
             try {
-                Method method = target.getClass().getMethod("replay", record.getClass());
-                method.invoke(target, record);
-            } catch (NoSuchMethodException e) {
                 try {
-                    Method method = target.getClass().getMethod("replay",
-                        record.getClass(),
-                        Optional.class);
-                    method.invoke(target, record, Optional.empty());
-                } catch (NoSuchMethodException t) {
-                    // ignore
-                } catch (InvocationTargetException t) {
-                    throw new RuntimeException(t);
-                } catch (IllegalAccessException t) {
-                    throw new RuntimeException(t);
+                    Method method = target.getClass().getMethod("replay", record.getClass());
+                    method.invoke(target, record);
+                } catch (NoSuchMethodException e) {
+                    try {
+                        Method method = target.getClass().getMethod("replay",
+                            record.getClass(),
+                            Optional.class);
+                        method.invoke(target, record, Optional.empty());
+                    } catch (NoSuchMethodException t) {
+                        try {
+                            Method method = target.getClass().getMethod("replay",
+                                record.getClass(),
+                                long.class);
+                            method.invoke(target, record, 0L);
+                        } catch (NoSuchMethodException i) {
+                            // ignore
+                        }
+                    }
                 }
             } catch (InvocationTargetException e) {
                 throw new RuntimeException(e);
@@ -119,7 +124,7 @@ public static void replayAllBatches(Object target,
      * @param delta the metadata delta on which to replay the records
      * @param highestOffset highest offset from the list of record batches
      * @param highestEpoch highest epoch from the list of record batches
-     * @param recordsAndVersions list of batches of records
+     * @param batches list of batches of records
      */
     public static void replayAllBatches(
         MetadataDelta delta,
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/FuturePurgatory.java b/raft/src/main/java/org/apache/kafka/raft/internals/FuturePurgatory.java
index b37fb3a3847c5..e5dceeaa0c30e 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/FuturePurgatory.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/FuturePurgatory.java
@@ -56,8 +56,8 @@ public interface FuturePurgatory<T extends Comparable<T>> {
     CompletableFuture<Long> await(T threshold, long maxWaitTimeMs);
 
     /**
-     * Complete awaiting futures whose associated values are larger than the given threshold value.
-     * The completion callbacks will be triggered from the calling thread.
+     * Complete awaiting futures whose threshold value from {@link FuturePurgatory#await} is smaller
+     * than the given threshold value. The completion callbacks will be triggered from the calling thread.
      *
      * @param value the threshold value used to determine which futures can be completed
      * @param currentTimeMs the current time in milliseconds that will be passed to
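
To make the event sequence in the commit message concrete, here is a minimal,
self-contained simulation of the timing loop. It is a hedged, illustrative
sketch, not Kafka code: the class name, the variables, and the scheduling
shortcut (exactly one NoOpRecord landing during every delayed fetch) are
assumptions made for illustration.

    // Hypothetical simulation of the KAFKA-13959 loop; all names are illustrative.
    public class BusyMetadataLogSketch {
        public static void main(String[] args) {
            final long maxFetchWaitMs = 500;   // KafkaRaftClient.MAX_FETCH_WAIT_MS
            final long noOpIntervalMs = 500;   // MetadataMaxIdleIntervalMs
            long timeMs = 0;
            long leo = 1;              // leader log end offset: m + 1, with m = 0
            long hw = 0;               // leader high watermark: m
            long brokerOffset = -1;    // broker's currentMetadataOffset

            for (int round = 1; round <= 5; round++) {
                // The broker fetches at the leader's HW; with no new data past that
                // offset, the leader parks the fetch in purgatory for the full wait.
                long fetchOffset = hw;
                timeMs += maxFetchWaitMs;
                // Because noOpIntervalMs <= maxFetchWaitMs, a NoOpRecord is appended
                // while the fetch waits, so LEO moves ahead and the quorum commits
                // the previous record.
                leo += 1;
                hw = leo - 1;
                // The delayed response carries the HW as of the fetch, leaving the
                // broker one record behind the freshly committed offset, forever.
                brokerOffset = fetchOffset;
                System.out.printf("t=%dms round=%d leo=%d hw=%d brokerOffset=%d caughtUp=%b%n",
                    timeMs, round, leo, hw, brokerOffset, brokerOffset >= hw);
            }
        }
    }

Running this prints caughtUp=false on every round: the broker's offset is
always exactly one record behind the committed offset, which is why the old
unfence check could never pass while the metadata log stayed busy.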
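The core of the fix is the changed comparison in
BrokerHeartbeatManager.calculateNextBrokerState, shown in the second hunk
above. Reduced to its essentials, the decision changes roughly as follows;
this is a paraphrase of the patched logic with hypothetical method names, not
a verbatim excerpt:

    // Paraphrase of the unfence check before and after this patch; the method
    // names are invented, the comparisons mirror the hunk above.
    public class UnfenceConditionSketch {
        // Old rule: chase the controller's last committed offset. With a
        // NoOpRecord appended every 500ms and FetchResponses delayed by up to
        // 500ms, a broker can trail this moving target forever.
        static boolean mayUnfenceBefore(long currentMetadataOffset, long lastCommittedOffset) {
            return currentMetadataOffset >= lastCommittedOffset;
        }

        // New rule: the target is pinned at the offset recorded when the
        // broker's own RegisterBrokerRecord was replayed (an upper bound on
        // that record's offset), so a live broker reaches it quickly even
        // while the metadata log stays busy.
        static boolean mayUnfenceAfter(long currentMetadataOffset, long registerBrokerRecordOffset) {
            return currentMetadataOffset >= registerBrokerRecordOffset;
        }
    }

The design point is that the new threshold is fixed per broker: once the
broker has replayed past its own registration record, nothing the busy
metadata log appends afterwards can push the goal further away.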
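The last hunk only corrects the FuturePurgatory#completeAll javadoc, but the
contract is easy to misread, so here is a toy in-memory illustration of the
documented semantics. This is an assumption-laden sketch (ToyPurgatory is
invented; the real implementation is ThresholdPurgatory and also handles
expiration), kept only to show which futures complete:

    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentSkipListMap;

    // Invented illustration of the corrected completeAll contract: a future
    // registered via await(threshold, ...) completes when completeAll(value)
    // is later called with the awaited threshold smaller than `value`.
    class ToyPurgatory {
        private final ConcurrentSkipListMap<Long, CompletableFuture<Long>> waiting =
            new ConcurrentSkipListMap<>();

        CompletableFuture<Long> await(long threshold) {
            return waiting.computeIfAbsent(threshold, t -> new CompletableFuture<>());
        }

        void completeAll(long value) {
            // headMap(value, false) views every entry whose threshold < value,
            // matching "smaller than the given threshold value" in the javadoc.
            Map<Long, CompletableFuture<Long>> ready = waiting.headMap(value, false);
            ready.values().forEach(f -> f.complete(value));
            ready.clear();
        }
    }

In the raft layer the leader does something like
fetchPurgatory.completeAll(logEndOffset) after appending, which releases
exactly the fetches parked at offsets below the new end of the log.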