Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,19 +165,6 @@ public CompletableFuture<MigrationDriverState> migrationState() {
return stateFuture;
}

private void recoverMigrationStateFromZK() {
applyMigrationOperation("Recovering migration state from ZK", zkMigrationClient::getOrCreateMigrationRecoveryState);
String maybeDone = migrationLeadershipState.initialZkMigrationComplete() ? "done" : "not done";
log.info("Initial migration of ZK metadata is {}.", maybeDone);

// Once we've recovered the migration state from ZK, install this class as a metadata publisher
// by calling the initialZkLoadHandler.
initialZkLoadHandler.accept(this);

// Transition to INACTIVE state and wait for leadership events.
transitionTo(MigrationDriverState.INACTIVE);
}

private boolean isControllerQuorumReadyForMigration() {
Optional<String> notReadyMsg = this.quorumFeatures.reasonAllControllersZkMigrationNotReady(
image.features().metadataVersion(), image.cluster().controllers());
Expand Down Expand Up @@ -414,7 +401,7 @@ public String toString() {
/**
* An event generated by a call to {@link MetadataPublisher#onControllerChange}. This will not be called until
* this class is registered with {@link org.apache.kafka.image.loader.MetadataLoader}. The registration happens
* after the migration state is loaded from ZooKeeper in {@link #recoverMigrationStateFromZK}.
* after the migration state is loaded from ZooKeeper in {@link RecoverMigrationStateFromZKEvent}.
*/
class KRaftLeaderEvent extends MigrationEvent {
private final LeaderAndEpoch leaderAndEpoch;
Expand Down Expand Up @@ -786,12 +773,31 @@ public void run() throws Exception {
}
}

class RecoverMigrationStateFromZKEvent extends MigrationEvent {
@Override
public void run() throws Exception {
if (checkDriverState(MigrationDriverState.UNINITIALIZED, this)) {
applyMigrationOperation("Recovering migration state from ZK", zkMigrationClient::getOrCreateMigrationRecoveryState);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my understanding, was this the line where uncaught exception is thrown? Can we handle the exception more gracefully and log and error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is where the uncaught exception thrown. The exception will be handled by its parent
MigrationEvent#handleException, and we'll log error there, and even call the faultHandler.handleFault to handle fatal errors.

public void handleException(Throwable e) {
if (e instanceof MigrationClientAuthException) {
KRaftMigrationDriver.this.faultHandler.handleFault("Encountered ZooKeeper authentication in " + this, e);
} else if (e instanceof MigrationClientException) {
log.info(String.format("Encountered ZooKeeper error during event %s. Will retry.", this), e.getCause());
} else if (e instanceof RejectedExecutionException) {
log.debug("Not processing {} because the event queue is closed.", this);
} else {
KRaftMigrationDriver.this.faultHandler.handleFault("Unhandled error in " + this, e);

Thanks.

String maybeDone = migrationLeadershipState.initialZkMigrationComplete() ? "done" : "not done";
log.info("Initial migration of ZK metadata is {}.", maybeDone);

// Once we've recovered the migration state from ZK, install this class as a metadata publisher
// by calling the initialZkLoadHandler.
initialZkLoadHandler.accept(KRaftMigrationDriver.this);

// Transition to INACTIVE state and wait for leadership events.
transitionTo(MigrationDriverState.INACTIVE);
}
}
}

class PollEvent extends MigrationEvent {

@Override
public void run() throws Exception {
switch (migrationState) {
case UNINITIALIZED:
recoverMigrationStateFromZK();
eventQueue.append(new RecoverMigrationStateFromZKEvent());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use prepend to make sure this event is executed ASAP

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need. Like I said in this comment, in the UNINITIALIZED state, the only event we will receive is the pollEvent. We'll receive additionalonControllerChange (KRaftLeaderEvent) and onMetadataUpdate (MetadataChangeEvent) after completing RecoverMigrationStateFromZKEvent. So, we don't have to worry about the order at this moment.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we allowing a race between the RecoverMigrationStateFromZKEvent and the next PollEvent scheduled after the switch? Maybe this could be more straightforward if we only schedule the next poll once RecoverMigrationStateFromZKEvent finishes, either normally or exceptionally? WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second thought, I don't think my question makes sense. The following PollEvent can only after RecoverMigrationStateFromZKEvent finishes.

break;
case INACTIVE:
// Nothing to do when the driver is inactive. We must wait until a KRaftLeaderEvent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public void testOnlySendNeededRPCsToBrokers(boolean registerControllers) throws
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);

driver.start();
startAndWaitForRecoveringMigrationStateFromZK(driver);
setupDeltaForMigration(delta, registerControllers);
delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
delta.replay(zkBrokerRecord(1));
Expand Down Expand Up @@ -338,7 +338,7 @@ public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershi
MetadataDelta delta = new MetadataDelta(image);
setupDeltaForMigration(delta, true);

driver.start();
startAndWaitForRecoveringMigrationStateFromZK(driver);
delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
delta.replay(zkBrokerRecord(1));
delta.replay(zkBrokerRecord(2));
Expand All @@ -363,6 +363,62 @@ public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershi
}
}

@Test
public void testMigrationWithClientExceptionWhileMigratingZnodeCreation() throws Exception {
CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
// suppose the ZNode creation failed 3 times
CountDownLatch createZnodeAttempts = new CountDownLatch(3);
CapturingMigrationClient migrationClient = new CapturingMigrationClient(new HashSet<>(Arrays.asList(1, 2, 3)),
new CapturingTopicMigrationClient(),
new CapturingConfigMigrationClient(),
new CapturingAclMigrationClient(),
new CapturingDelegationTokenMigrationClient(),
CapturingMigrationClient.EMPTY_BATCH_SUPPLIER) {
@Override
public ZkMigrationLeadershipState getOrCreateMigrationRecoveryState(ZkMigrationLeadershipState initialState) {
if (createZnodeAttempts.getCount() == 0) {
this.setMigrationRecoveryState(initialState);
return initialState;
} else {
createZnodeAttempts.countDown();
throw new MigrationClientException("Some kind of ZK error!");
}
}
};
MockFaultHandler faultHandler = new MockFaultHandler("testMigrationClientExpiration");
KRaftMigrationDriver.Builder builder = defaultTestBuilder()
.setZkMigrationClient(migrationClient)
.setFaultHandler(faultHandler)
.setPropagator(metadataPropagator);
try (KRaftMigrationDriver driver = builder.build()) {
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);
setupDeltaForMigration(delta, true);

startAndWaitForRecoveringMigrationStateFromZK(driver);

delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
delta.replay(zkBrokerRecord(1));
delta.replay(zkBrokerRecord(2));
delta.replay(zkBrokerRecord(3));
MetadataProvenance provenance = new MetadataProvenance(100, 1, 1);
image = delta.apply(provenance);
// Before leadership claiming, the getOrCreateMigrationRecoveryState should be able to get correct state
assertTrue(createZnodeAttempts.await(1, TimeUnit.MINUTES));

// Notify the driver that it is the leader
driver.onControllerChange(new LeaderAndEpoch(OptionalInt.of(3000), 1));
// Publish metadata of all the ZK brokers being ready
driver.onMetadataUpdate(delta, image, logDeltaManifestBuilder(provenance,
new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());

TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
"Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");

Assertions.assertNull(faultHandler.firstException());
}
}

private void setupDeltaForMigration(
MetadataDelta delta,
boolean registerControllers
Expand Down Expand Up @@ -413,7 +469,7 @@ public void testShouldNotMoveToNextStateIfControllerNodesAreNotReadyToMigrate(
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);

driver.start();
startAndWaitForRecoveringMigrationStateFromZK(driver);
if (allNodePresent) {
setupDeltaWithControllerRegistrations(delta, Arrays.asList(4, 5, 6), Arrays.asList());
} else {
Expand Down Expand Up @@ -469,7 +525,7 @@ public void testSkipWaitForBrokersInDualWrite() throws Exception {
migrationClient.setMigrationRecoveryState(
ZkMigrationLeadershipState.EMPTY.withKRaftMetadataOffsetAndEpoch(100, 1));

driver.start();
startAndWaitForRecoveringMigrationStateFromZK(driver);
delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
delta.replay(zkBrokerRecord(1));
delta.replay(zkBrokerRecord(2));
Expand All @@ -483,7 +539,7 @@ public void testSkipWaitForBrokersInDualWrite() throws Exception {
new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());

TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
"Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
"Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
}
}

Expand Down Expand Up @@ -546,7 +602,7 @@ public void testTopicDualWriteSnapshot() throws Exception {
DelegationTokenImage.EMPTY);
MetadataDelta delta = new MetadataDelta(image);

driver.start();
startAndWaitForRecoveringMigrationStateFromZK(driver);
setupDeltaForMigration(delta, true);
delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
delta.replay(zkBrokerRecord(0));
Expand All @@ -565,7 +621,7 @@ public void testTopicDualWriteSnapshot() throws Exception {

// Wait for migration
TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
"Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
"Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");

// Modify topics in a KRaft snapshot -- delete foo, modify bar, add baz, add new foo, add bam, delete bam
provenance = new MetadataProvenance(200, 1, 1);
Expand Down Expand Up @@ -601,7 +657,7 @@ public void testTopicDualWriteDelta() throws Exception {
DelegationTokenImage.EMPTY);
MetadataDelta delta = new MetadataDelta(image);

driver.start();
startAndWaitForRecoveringMigrationStateFromZK(driver);
setupDeltaForMigration(delta, true);
delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
delta.replay(zkBrokerRecord(0));
Expand Down Expand Up @@ -656,7 +712,7 @@ public void testNoDualWriteBeforeMigration() throws Exception {
DelegationTokenImage.EMPTY);
MetadataDelta delta = new MetadataDelta(image);

driver.start();
startAndWaitForRecoveringMigrationStateFromZK(driver);
setupDeltaForMigration(delta, true);
delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
delta.replay(zkBrokerRecord(0));
Expand All @@ -673,7 +729,7 @@ public void testNoDualWriteBeforeMigration() throws Exception {
driver.onControllerChange(newLeader);

TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM),
"Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
"Waiting for KRaftMigrationDriver to enter WAIT_FOR_CONTROLLER_QUORUM state");

driver.onMetadataUpdate(delta, image, logDeltaManifestBuilder(provenance, newLeader).build());

Expand Down Expand Up @@ -711,7 +767,7 @@ public void testControllerFailover() throws Exception {
DelegationTokenImage.EMPTY);
MetadataDelta delta = new MetadataDelta(image);

driver.start();
startAndWaitForRecoveringMigrationStateFromZK(driver);
setupDeltaForMigration(delta, true);
delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
delta.replay(zkBrokerRecord(0));
Expand Down Expand Up @@ -778,7 +834,7 @@ public CompletableFuture<?> beginMigration() {
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);

driver.start();
startAndWaitForRecoveringMigrationStateFromZK(driver);
setupDeltaForMigration(delta, true);
delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
delta.replay(zkBrokerRecord(1));
Expand All @@ -798,7 +854,7 @@ public CompletableFuture<?> beginMigration() {
new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());

TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
"Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
"Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
assertEquals(1, migrationBeginCalls.get());
}
}
Expand Down Expand Up @@ -864,7 +920,7 @@ public List<List<ApiMessageAndVersion>> recordBatches() {
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);

driver.start();
startAndWaitForRecoveringMigrationStateFromZK(driver);
setupDeltaForMigration(delta, true);
delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
delta.replay(zkBrokerRecord(1));
Expand All @@ -881,10 +937,18 @@ public List<List<ApiMessageAndVersion>> recordBatches() {
new LeaderAndEpoch(OptionalInt.of(3000), 1)).build());

TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
"Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
"Waiting for KRaftMigrationDriver to enter DUAL_WRITE state");
Comment on lines 939 to +940
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side fix.


assertEquals(expectedBatchCount, batchesPassedToController.size());
assertEquals(expectedRecordCount, batchesPassedToController.stream().mapToInt(List::size).sum());
}
}

// Wait until the driver has recovered MigrationState From ZK. This is to simulate the driver needs to be installed as the metadata publisher
// so that it can receive onControllerChange (KRaftLeaderEvent) and onMetadataUpdate (MetadataChangeEvent) events.
private void startAndWaitForRecoveringMigrationStateFromZK(KRaftMigrationDriver driver) throws InterruptedException {
driver.start();
TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.INACTIVE),
"Waiting for KRaftMigrationDriver to enter INACTIVE state");
Comment on lines +947 to +952
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is necessary now because in the test suite, we might invoke onControllerChange to append KRaftLeaderEvent before the RecoverMigrationStateFromZKEvent is appended. This won't happen in practice because the driver needs to wait until RecoverMigrationStateFromZKEvent completed to register metadata publisher to receive KRaftLeaderEvent and MetadataChangeEvent.

}
}