Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -256,21 +256,22 @@ class ControllerServer(
}
val migrationClient = ZkMigrationClient(zkClient, zkConfigEncoder)
val propagator: LegacyPropagator = new MigrationPropagator(config.nodeId, config)
val migrationDriver = new KRaftMigrationDriver(
config.nodeId,
controller.asInstanceOf[QuorumController].zkRecordConsumer(),
migrationClient,
propagator,
publisher => sharedServer.loader.installPublishers(java.util.Collections.singletonList(publisher)),
sharedServer.faultHandlerFactory.build(
val migrationDriver = KRaftMigrationDriver.newBuilder()
.setNodeId(config.nodeId)
.setZkRecordConsumer(controller.asInstanceOf[QuorumController].zkRecordConsumer())
.setZkMigrationClient(migrationClient)
.setPropagator(propagator)
.setInitialZkLoadHandler(publisher => sharedServer.loader.installPublishers(java.util.Collections.singletonList(publisher)))
.setFaultHandler(sharedServer.faultHandlerFactory.build(
"zk migration",
fatal = false,
() => {}
),
quorumFeatures,
configSchema,
quorumControllerMetrics
)
))
.setQuorumFeatures(quorumFeatures)
.setConfigSchema(configSchema)
.setControllerMetrics(quorumControllerMetrics)
.setTime(time)
.build()
migrationDriver.start()
migrationSupport = Some(ControllerMigrationSupport(zkClient, migrationDriver, propagator))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public long nextPollTimeMs() {
private volatile MetadataImage image;
private volatile boolean firstPublish;

public KRaftMigrationDriver(
KRaftMigrationDriver(
int nodeId,
ZkRecordConsumer zkRecordConsumer,
MigrationClient zkMigrationClient,
Expand Down Expand Up @@ -145,21 +145,10 @@ public KRaftMigrationDriver(
this.recordRedactor = new RecordRedactor(configSchema);
}

public KRaftMigrationDriver(
int nodeId,
ZkRecordConsumer zkRecordConsumer,
MigrationClient zkMigrationClient,
LegacyPropagator propagator,
Consumer<MetadataPublisher> initialZkLoadHandler,
FaultHandler faultHandler,
QuorumFeatures quorumFeatures,
KafkaConfigSchema configSchema,
QuorumControllerMetrics controllerMetrics
) {
this(nodeId, zkRecordConsumer, zkMigrationClient, propagator, initialZkLoadHandler, faultHandler, quorumFeatures, configSchema, controllerMetrics, Time.SYSTEM);
public static Builder newBuilder() {
return new Builder();
}


public void start() {
eventQueue.prepend(new PollEvent());
}
Expand Down Expand Up @@ -756,4 +745,109 @@ static KRaftMigrationOperationConsumer countingOperationConsumer(
operationConsumer.accept(logMsg, operation);
};
}

public static class Builder {
private Integer nodeId;
private ZkRecordConsumer zkRecordConsumer;
private MigrationClient zkMigrationClient;
private LegacyPropagator propagator;
private Consumer<MetadataPublisher> initialZkLoadHandler;
private FaultHandler faultHandler;
private QuorumFeatures quorumFeatures;
private KafkaConfigSchema configSchema;
private QuorumControllerMetrics controllerMetrics;
private Time time;

public Builder setNodeId(int nodeId) {
this.nodeId = nodeId;
return this;
}

public Builder setZkRecordConsumer(ZkRecordConsumer zkRecordConsumer) {
this.zkRecordConsumer = zkRecordConsumer;
return this;
}

public Builder setZkMigrationClient(MigrationClient zkMigrationClient) {
this.zkMigrationClient = zkMigrationClient;
return this;
}

public Builder setPropagator(LegacyPropagator propagator) {
this.propagator = propagator;
return this;
}

public Builder setInitialZkLoadHandler(Consumer<MetadataPublisher> initialZkLoadHandler) {
this.initialZkLoadHandler = initialZkLoadHandler;
return this;
}

public Builder setFaultHandler(FaultHandler faultHandler) {
this.faultHandler = faultHandler;
return this;
}

public Builder setQuorumFeatures(QuorumFeatures quorumFeatures) {
this.quorumFeatures = quorumFeatures;
return this;
}

public Builder setConfigSchema(KafkaConfigSchema configSchema) {
this.configSchema = configSchema;
return this;
}

public Builder setControllerMetrics(QuorumControllerMetrics controllerMetrics) {
this.controllerMetrics = controllerMetrics;
return this;
}

public Builder setTime(Time time) {
this.time = time;
return this;
}

public KRaftMigrationDriver build() {
if (nodeId == null) {
throw new IllegalStateException("You must specify the node ID of this controller.");
}
if (zkRecordConsumer == null) {
throw new IllegalStateException("You must specify the ZkRecordConsumer.");
}
if (zkMigrationClient == null) {
throw new IllegalStateException("You must specify the MigrationClient.");
}
if (propagator == null) {
throw new IllegalStateException("You must specify the MetadataPropagator.");
}
if (initialZkLoadHandler == null) {
throw new IllegalStateException("You must specify the initial ZK load callback.");
}
if (faultHandler == null) {
throw new IllegalStateException("You must specify the FaultHandler.");
}
if (configSchema == null) {
throw new IllegalStateException("You must specify the KafkaConfigSchema.");
}
if (controllerMetrics == null) {
throw new IllegalStateException("You must specify the QuorumControllerMetrics.");
}
if (time == null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

time is the only argument that can be undefined?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, yea I figured this one was fine to have a default for, but actually we only have two usages of the builder so I'll just make it required.

throw new IllegalStateException("You must specify the Time.");
}
return new KRaftMigrationDriver(
nodeId,
zkRecordConsumer,
zkMigrationClient,
propagator,
initialZkLoadHandler,
faultHandler,
quorumFeatures,
configSchema,
controllerMetrics,
time
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,22 @@ public long nanoseconds() {
}
};

/**
 * Return a {@link org.apache.kafka.metadata.migration.KRaftMigrationDriver.Builder} that uses the mocks
 * defined in this class.
 *
 * <p>Deliberately does NOT set the MigrationClient or the LegacyPropagator: each test must
 * supply its own via {@code setZkMigrationClient(...)} and {@code setPropagator(...)} before
 * calling {@code build()}, otherwise the builder's required-field validation will throw.
 */
KRaftMigrationDriver.Builder defaultTestBuilder() {
return KRaftMigrationDriver.newBuilder()
.setNodeId(3000)
.setZkRecordConsumer(new NoOpRecordConsumer())
.setInitialZkLoadHandler(metadataPublisher -> { })
.setFaultHandler(new MockFaultHandler("test"))
.setQuorumFeatures(quorumFeatures)
.setConfigSchema(KafkaConfigSchema.EMPTY)
.setControllerMetrics(metrics)
.setTime(mockTime);
}

@BeforeEach
public void setup() {
apiVersions.update("4", new NodeApiVersions(Collections.emptyList(), Collections.emptyList(), true));
Expand Down Expand Up @@ -226,18 +242,12 @@ public void testOnlySendNeededRPCsToBrokers() throws Exception {
.setBrokersInZk(1, 2, 3)
.setConfigMigrationClient(configClient)
.build();
try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
3000,
new NoOpRecordConsumer(),
migrationClient,
metadataPropagator,
metadataPublisher -> { },
new MockFaultHandler("test"),
quorumFeatures,
KafkaConfigSchema.EMPTY,
metrics,
mockTime
)) {
KRaftMigrationDriver.Builder builder = defaultTestBuilder()
.setZkMigrationClient(migrationClient)
.setPropagator(metadataPropagator)
.setInitialZkLoadHandler(metadataPublisher -> { });

try (KRaftMigrationDriver driver = builder.build()) {
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);

Expand Down Expand Up @@ -312,18 +322,11 @@ public ZkMigrationLeadershipState claimControllerLeadership(ZkMigrationLeadershi
}
};
MockFaultHandler faultHandler = new MockFaultHandler("testMigrationClientExpiration");
try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
3000,
new NoOpRecordConsumer(),
migrationClient,
metadataPropagator,
metadataPublisher -> { },
faultHandler,
quorumFeatures,
KafkaConfigSchema.EMPTY,
metrics,
mockTime
)) {
KRaftMigrationDriver.Builder builder = defaultTestBuilder()
.setZkMigrationClient(migrationClient)
.setFaultHandler(faultHandler)
.setPropagator(metadataPropagator);
try (KRaftMigrationDriver driver = builder.build()) {
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);

Expand Down Expand Up @@ -358,19 +361,10 @@ public void testShouldNotMoveToNextStateIfControllerNodesAreNotReadyToMigrate()
CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder().setBrokersInZk(1).build();
apiVersions.remove("6");

try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
3000,
new NoOpRecordConsumer(),
migrationClient,
metadataPropagator,
metadataPublisher -> {
},
new MockFaultHandler("test"),
quorumFeatures,
KafkaConfigSchema.EMPTY,
metrics,
mockTime
)) {
KRaftMigrationDriver.Builder builder = defaultTestBuilder()
.setZkMigrationClient(migrationClient)
.setPropagator(metadataPropagator);
try (KRaftMigrationDriver driver = builder.build()) {
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);

Expand Down Expand Up @@ -406,18 +400,11 @@ public void testSkipWaitForBrokersInDualWrite() throws Exception {
CapturingMigrationClient migrationClient = new CapturingMigrationClient(Collections.emptySet(),
new CapturingTopicMigrationClient(), new CapturingConfigMigrationClient(), new CapturingAclMigrationClient());
MockFaultHandler faultHandler = new MockFaultHandler("testMigrationClientExpiration");
try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
3000,
new NoOpRecordConsumer(),
migrationClient,
metadataPropagator,
metadataPublisher -> { },
faultHandler,
quorumFeatures,
KafkaConfigSchema.EMPTY,
metrics,
mockTime
)) {
KRaftMigrationDriver.Builder builder = defaultTestBuilder()
.setZkMigrationClient(migrationClient)
.setPropagator(metadataPropagator)
.setFaultHandler(faultHandler);
try (KRaftMigrationDriver driver = builder.build()) {
MetadataImage image = MetadataImage.EMPTY;
MetadataDelta delta = new MetadataDelta(image);

Expand Down Expand Up @@ -478,19 +465,10 @@ public void iterateTopics(EnumSet<TopicVisitorInterest> interests, TopicVisitor
.setTopicMigrationClient(topicClient)
.setConfigMigrationClient(configClient)
.build();

try (KRaftMigrationDriver driver = new KRaftMigrationDriver(
3000,
new NoOpRecordConsumer(),
migrationClient,
metadataPropagator,
metadataPublisher -> { },
new MockFaultHandler("test"),
quorumFeatures,
KafkaConfigSchema.EMPTY,
metrics,
mockTime
)) {
KRaftMigrationDriver.Builder builder = defaultTestBuilder()
.setZkMigrationClient(migrationClient)
.setPropagator(metadataPropagator);
try (KRaftMigrationDriver driver = builder.build()) {
verifier.verify(driver, migrationClient, topicClient, configClient);
}
}
Expand Down