Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,10 @@ class ZkMigrationIntegrationTest {
readyFuture.get(30, TimeUnit.SECONDS)

val zkClient = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient
TestUtils.waitUntilTrue(() => zkClient.getControllerId.contains(3000), "Timed out waiting for KRaft controller to take over")
TestUtils.waitUntilTrue(
() => zkClient.getControllerId.contains(3000),
"Timed out waiting for KRaft controller to take over",
30_000)

def inDualWrite(): Boolean = {
val migrationState = kraftCluster.controllers().get(3000).migrationSupport.get.migrationDriver.migrationState().get(10, TimeUnit.SECONDS)
Expand Down Expand Up @@ -286,7 +289,10 @@ class ZkMigrationIntegrationTest {

// Wait for migration to begin
log.info("Waiting for ZK migration to begin")
TestUtils.waitUntilTrue(() => zkClient.getControllerId.contains(3000), "Timed out waiting for KRaft controller to take over")
TestUtils.waitUntilTrue(
() => zkClient.getControllerId.contains(3000),
"Timed out waiting for KRaft controller to take over",
30_000)

// Alter the metadata
log.info("Updating metadata with AdminClient")
Expand Down Expand Up @@ -358,7 +364,10 @@ class ZkMigrationIntegrationTest {

// Wait for migration to begin
log.info("Waiting for ZK migration to begin")
TestUtils.waitUntilTrue(() => zkClient.getControllerId.contains(3000), "Timed out waiting for KRaft controller to take over")
TestUtils.waitUntilTrue(
() => zkClient.getControllerId.contains(3000),
"Timed out waiting for KRaft controller to take over",
30_000)

// Alter the metadata
log.info("Updating metadata with AdminClient")
Expand Down Expand Up @@ -422,7 +431,10 @@ class ZkMigrationIntegrationTest {

// Wait for migration to begin
log.info("Waiting for ZK migration to begin")
TestUtils.waitUntilTrue(() => zkClient.getControllerId.contains(3000), "Timed out waiting for KRaft controller to take over")
TestUtils.waitUntilTrue(
() => zkClient.getControllerId.contains(3000),
"Timed out waiting for KRaft controller to take over",
30_000)

// Alter the metadata
log.info("Updating metadata with AdminClient")
Expand Down Expand Up @@ -481,7 +493,10 @@ class ZkMigrationIntegrationTest {

// Wait for migration to begin
log.info("Waiting for ZK migration to begin")
TestUtils.waitUntilTrue(() => zkClient.getControllerId.contains(3000), "Timed out waiting for KRaft controller to take over")
TestUtils.waitUntilTrue(
() => zkClient.getControllerId.contains(3000),
"Timed out waiting for KRaft controller to take over",
30_000)

// Alter the metadata
log.info("Create new topic with AdminClient")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,16 @@ private boolean isValidStateChange(MigrationDriverState newState) {
}
}

private boolean checkDriverState(MigrationDriverState expectedState) {
if (migrationState.equals(expectedState)) {
return true;
} else {
log.info("Expected driver state {} but found {}. Not running this event {}.",
expectedState, migrationState, this.getClass().getSimpleName());
return false;
}
}

private void transitionTo(MigrationDriverState newState) {
if (!isValidStateChange(newState)) {
throw new IllegalStateException(
Expand Down Expand Up @@ -460,11 +470,21 @@ public void run() throws Exception {
KRaftMigrationDriver.this.image = image;
String metadataType = isSnapshot ? "snapshot" : "delta";

if (migrationState.equals(MigrationDriverState.INACTIVE)) {
// No need to log anything if this node is not the active controller
completionHandler.accept(null);
return;
}

if (!migrationState.allowDualWrite()) {
log.trace("Received metadata {}, but the controller is not in dual-write " +
"mode. Ignoring the change to be replicated to Zookeeper", metadataType);
completionHandler.accept(null);
wakeup();
// If the driver is active and dual-write is not yet enabled, then the migration has not yet begun.
// Only wake up the thread if the broker registrations have changed
if (delta.clusterDelta() != null) {
wakeup();
}
return;
}

Expand Down Expand Up @@ -520,7 +540,7 @@ class WaitForControllerQuorumEvent extends MigrationEvent {

@Override
public void run() throws Exception {
if (migrationState.equals(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM)) {
if (checkDriverState(MigrationDriverState.WAIT_FOR_CONTROLLER_QUORUM)) {
if (!firstPublish) {
log.trace("Waiting until we have received metadata before proceeding with migration");
return;
Expand Down Expand Up @@ -564,24 +584,19 @@ public void run() throws Exception {
class WaitForZkBrokersEvent extends MigrationEvent {
@Override
public void run() throws Exception {
switch (migrationState) {
case WAIT_FOR_BROKERS:
if (areZkBrokersReadyForMigration()) {
log.info("Zk brokers are registered and ready for migration");
transitionTo(MigrationDriverState.BECOME_CONTROLLER);
}
break;
default:
// Ignore the event as we're not in the appropriate state anymore.
break;
if (checkDriverState(MigrationDriverState.WAIT_FOR_BROKERS)) {
if (areZkBrokersReadyForMigration()) {
log.info("Zk brokers are registered and ready for migration");
transitionTo(MigrationDriverState.BECOME_CONTROLLER);
}
}
}
}

class BecomeZkControllerEvent extends MigrationEvent {
@Override
public void run() throws Exception {
if (migrationState == MigrationDriverState.BECOME_CONTROLLER) {
if (checkDriverState(MigrationDriverState.BECOME_CONTROLLER)) {
applyMigrationOperation("Claiming ZK controller leadership", zkMigrationClient::claimControllerLeadership);
if (migrationLeadershipState.zkControllerEpochZkVersion() == -1) {
log.info("Unable to claim leadership, will retry until we learn of a different KRaft leader");
Expand All @@ -599,6 +614,9 @@ public void run() throws Exception {
class MigrateMetadataEvent extends MigrationEvent {
@Override
public void run() throws Exception {
if (!checkDriverState(MigrationDriverState.ZK_MIGRATION)) {
return;
}
Comment on lines +617 to +619
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the actual fix. The rest of the changes are for cleanliness/readability

Set<Integer> brokersInMetadata = new HashSet<>();
log.info("Starting ZK migration");
MigrationManifest.Builder manifestBuilder = MigrationManifest.newBuilder(time);
Expand Down Expand Up @@ -655,7 +673,7 @@ public void run() throws Exception {
class SyncKRaftMetadataEvent extends MigrationEvent {
@Override
public void run() throws Exception {
if (migrationState == MigrationDriverState.SYNC_KRAFT_TO_ZK) {
if (checkDriverState(MigrationDriverState.SYNC_KRAFT_TO_ZK)) {
log.info("Performing a full metadata sync from KRaft to ZK.");
Map<String, Integer> dualWriteCounts = new TreeMap<>();
zkMetadataWriter.handleSnapshot(image, countingOperationConsumer(
Expand All @@ -671,7 +689,7 @@ class SendRPCsToBrokersEvent extends MigrationEvent {
@Override
public void run() throws Exception {
// Ignore sending RPCs to the brokers since we're no longer in the state.
if (migrationState == MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM) {
if (checkDriverState(MigrationDriverState.KRAFT_CONTROLLER_TO_BROKER_COMM)) {
if (image.highestOffsetAndEpoch().compareTo(migrationLeadershipState.offsetAndEpoch()) >= 0) {
log.trace("Sending RPCs to broker before moving to dual-write mode using " +
"at offset and epoch {}", image.highestOffsetAndEpoch());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -634,4 +635,49 @@ public void testControllerFailover() throws Exception {
assertEquals(new ConfigResource(ConfigResource.Type.TOPIC, "foo"), configClient.deletedResources.get(0));
});
}

@Test
public void testBeginMigrationOnce() throws Exception {
AtomicInteger migrationBeginCalls = new AtomicInteger(0);
NoOpRecordConsumer recordConsumer = new NoOpRecordConsumer() {
@Override
public void beginMigration() {
migrationBeginCalls.incrementAndGet();
}
};
CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator();
CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder().setBrokersInZk(1, 2, 3).build();
MockFaultHandler faultHandler = new MockFaultHandler("testTwoMigrateMetadataEvents");
KRaftMigrationDriver.Builder builder = defaultTestBuilder()
.setZkMigrationClient(migrationClient)
.setZkRecordConsumer(recordConsumer)
.setPropagator(metadataPropagator)
.setFaultHandler(faultHandler);
try (KRaftMigrationDriver driver = builder.build()) {
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);

driver.start();
delta.replay(ZkMigrationState.PRE_MIGRATION.toRecord().message());
delta.replay(zkBrokerRecord(1));
delta.replay(zkBrokerRecord(2));
delta.replay(zkBrokerRecord(3));
MetadataProvenance provenance = new MetadataProvenance(100, 1, 1);
image = delta.apply(provenance);

driver.onControllerChange(new LeaderAndEpoch(OptionalInt.of(3000), 1));

// Call onMetadataUpdate twice. The first call will trigger the migration to begin (due to presence of brokers)
// Both calls will "wakeup" the driver and cause a PollEvent to be run. Calling these back-to-back effectively
// causes two MigrateMetadataEvents to be enqueued. Ensure only one is actually run.
driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance,
new LeaderAndEpoch(OptionalInt.of(3000), 1), 1, 100, 42));
driver.onMetadataUpdate(delta, image, new LogDeltaManifest(provenance,
new LeaderAndEpoch(OptionalInt.of(3000), 1), 1, 100, 42));

TestUtils.waitForCondition(() -> driver.migrationState().get(1, TimeUnit.MINUTES).equals(MigrationDriverState.DUAL_WRITE),
"Waiting for KRaftMigrationDriver to enter ZK_MIGRATION state");
assertEquals(1, migrationBeginCalls.get());
}
}
}