KAFKA-14612: Make sure to write a new topics ConfigRecords to metadata log iff the topic is created #13104

Merged
hachikuji merged 6 commits into apache:trunk from andrewgrantcflt:KAFKA-14612 on Jan 13, 2023

Conversation

@andrewgrantcflt (Contributor) commented on Jan 10, 2023

JIRA

https://issues.apache.org/jira/browse/KAFKA-14612

Details

Makes sure we emit ConfigRecords for a topic iff it actually gets created. Currently, we might emit ConfigRecords even if the topic creation fails later in the createTopics method.

I created a new method incrementalAlterConfig in ConfigurationControlManager that is similar to the existing incrementalAlterConfigs but handles only one resource's configs at a time. This is used in ReplicationControlManager for each topic. By handling one topic's configs at a time, it is easier to isolate each topic's config records, which lets us make sure we only write config records for topics that actually get created.

I refactored incrementalAlterConfigResource to return an ApiError. This made it easier to implement the new method incrementalAlterConfig in ConfigurationControlManager because it then doesn't have to search in the Map for the result.
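
To make the intended flow concrete, here is a minimal, self-contained sketch of the pattern described above. The type and method names (`SketchConfigRecord`, `SketchControllerResult`, `CreateTopicSketch`) are simplified stand-ins invented for illustration, not the real `ControllerResult`, `ConfigRecord`, or `ReplicationControlManager` code:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-ins for the controller types; NOT the real Kafka classes.
final class SketchConfigRecord {
    final String resourceName;
    final String key;
    final String value;

    SketchConfigRecord(String resourceName, String key, String value) {
        this.resourceName = resourceName;
        this.key = key;
        this.value = value;
    }
}

final class SketchControllerResult<T> {
    final List<SketchConfigRecord> records;
    final T response;

    SketchControllerResult(List<SketchConfigRecord> records, T response) {
        this.records = records;
        this.response = response;
    }
}

final class CreateTopicSketch {
    // Per-resource variant: returns records for ONE topic only, so the caller
    // can simply drop them if that topic's creation fails later on.
    static SketchControllerResult<String> incrementalAlterConfig(String topic, Map<String, String> configs) {
        List<SketchConfigRecord> records = new ArrayList<>();
        for (Map.Entry<String, String> e : configs.entrySet()) {
            records.add(new SketchConfigRecord(topic, e.getKey(), e.getValue()));
        }
        return new SketchControllerResult<>(records, "NONE"); // "NONE" stands in for ApiError.NONE
    }

    // Sketch of the per-topic createTopics flow: config records are appended only
    // when the topic actually gets created, after its TopicRecord and before its
    // PartitionRecords.
    static List<SketchConfigRecord> createTopic(String topic, Map<String, String> configs, boolean creationFails) {
        SketchControllerResult<String> configResult = incrementalAlterConfig(topic, configs);
        if (creationFails) {
            return Collections.emptyList(); // failed topic: none of its ConfigRecords are written
        }
        List<SketchConfigRecord> records = new ArrayList<>();
        // TopicRecord would be appended here first ...
        records.addAll(configResult.records); // ... then this topic's ConfigRecords ...
        // ... and finally its PartitionRecords.
        return records;
    }

    public static void main(String[] args) {
        Map<String, String> configs = new LinkedHashMap<>();
        configs.put("cleanup.policy", "compact");
        System.out.println(createTopic("foo", configs, false).size()); // 1 record
        System.out.println(createTopic("bar", configs, true).size());  // 0 records
    }
}
```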

Testing

Enhanced pre-existing test ReplicationControlManagerTest.testCreateTopicsWithConfigs. I ran the tests without the changes to ReplicationControlManager and made sure each assertion ends up failing. Also ran ./gradlew metadata:test --tests org.apache.kafka.controller.ReplicationControlManagerTest.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

    ControllerResult<CreateTopicsResponseData> result3 =
        replicationControl.createTopics(request3, Collections.singleton("baz"));
    assertEquals(INVALID_REPLICATION_FACTOR.code(), result3.response().topics().find("baz").errorCode());
    assertEquals(0, result3.records().size());
Contributor:

nit: a better assertion is this:

    assertEquals(Collections.emptyList(), result3.records());

It gives more information in case of a failure.

    setName(topic.name()).
    setTopicId(topicId), (short) 0));
    // ConfigRecords go after TopicRecord but before PartitionRecord(s).
    records.addAll(configResult.records());
@hachikuji (Contributor) commented on Jan 10, 2023:

This looks a little suspicious. Don't we need to pull only the respective configs for the topic that is being created? We should have a test case with multiple topics.

@andrewgrantcflt (Contributor, Author) replied:

You're right - let me fix that.
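
To picture the concern, here is a small, purely illustrative sketch of the per-topic isolation idea, using a made-up `TopicConfigRecord` type; the merged change achieves the same effect by computing each topic's config records separately via the new per-resource `incrementalAlterConfig`, not by filtering a shared list:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Made-up stand-in for a config record that carries the topic it belongs to.
final class TopicConfigRecord {
    final String topicName;
    final String key;
    final String value;

    TopicConfigRecord(String topicName, String key, String value) {
        this.topicName = topicName;
        this.key = key;
        this.value = value;
    }
}

final class PerTopicRecords {
    // Keep only the records belonging to the topic currently being created, so a
    // batched createTopics request never appends another topic's ConfigRecords.
    static List<TopicConfigRecord> recordsFor(String topic, List<TopicConfigRecord> all) {
        List<TopicConfigRecord> out = new ArrayList<>();
        for (TopicConfigRecord r : all) {
            if (r.topicName.equals(topic)) {
                out.add(r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<TopicConfigRecord> all = Arrays.asList(
            new TopicConfigRecord("foo", "cleanup.policy", "compact"),
            new TopicConfigRecord("bar", "retention.ms", "1000"));
        System.out.println(recordsFor("foo", all).size()); // 1: only foo's record
    }
}
```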

@andrewgrantcflt changed the title from "KAFKA-14612: Make sure to write ConfigRecords to metadata log iff topic is created" to "KAFKA-14612: Make sure to write a new topics ConfigRecords to metadata log iff the topic is created" on Jan 11, 2023
@hachikuji (Contributor) left a review:

LGTM overall. Left some minor comments.


    private void incrementalAlterConfigResource(ConfigResource configResource,
    ControllerResult<ApiError> incrementalAlterConfig(
        ConfigResource configResource, Map<String, Entry<OpType, String>> keyToOps,
Contributor:

nit: move second argument to next line. Not everyone agrees, but I also like to put the closing parenthesis on a new line.

ControllerResult<ApiError> incrementalAlterConfig(
    ConfigResource configResource, 
    Map<String, Entry<OpType, String>> keyToOps,
    boolean newlyCreatedResource
) {
...

@andrewgrantcflt (Contributor, Author) replied:

Changed to that. I agree the closing parenthesis on the next line reads a bit better.

    boolean newlyCreatedResource,
    List<ApiMessageAndVersion> outputRecords,
    Map<ConfigResource, ApiError> outputResults) {
    List<ApiMessageAndVersion> outputRecords) {
Contributor:

nit: can we align the arguments?

@andrewgrantcflt (Contributor, Author) replied:

Yup. Fixed.

    TopicRecord batchedTopic1Record = (TopicRecord) result4.records().get(0).message();
    assertEquals(batchedTopic1, batchedTopic1Record.name());
    assertEquals(ConfigRecord.class, result4.records().get(1).message().getClass());
    assertEquals(batchedTopic1, ((ConfigRecord) result4.records().get(1).message()).resourceName());
Contributor:

Wonder if it would be better to assert the full ConfigRecord instead of just checking one field?

@andrewgrantcflt (Contributor, Author) replied on Jan 12, 2023:

I'd argue we already have that sort of deeper validation above in the form of:

        ctx.replay(result1.records());
        assertEquals(
            "notNull",
            ctx.configurationControl.getConfigs(new ConfigResource(ConfigResource.Type.TOPIC, "foo")).get("foo")
        );

The testing I'm doing here is more about verifying that we only emit records for topic batchedTopic1, so my thought process was that there's no need to duplicate the deeper validation.

Thoughts?

Contributor:

Hmm, it is a little different though. In one case, we're validating the result of applying the records. In the other, we're validating the records themselves. You can argue one implies the other, but maybe there's no harm in being thorough? By the way, I was also thinking this might simplify the assertions and get rid of the messy-looking cast:

        assertEquals(new ConfigRecord()
            .setResourceName(batchedTopic1)
            .setResourceType(ConfigResource.Type.TOPIC.id())
            .setName("foo")
            .setValue("notNull"),
            result4.records().get(1).message());

Anyway, will leave it up to you.

@andrewgrantcflt (Contributor, Author) replied:

Fair enough. It does make sure we are emitting the exact ConfigRecord expected. Updated.

@hachikuji (Contributor) left a review:

Thanks, LGTM

@hachikuji merged commit 0d9a702 into apache:trunk on Jan 13, 2023
hachikuji pushed a commit that referenced this pull request Jan 13, 2023
…a log iff the topic is created (#13104)

### JIRA
https://issues.apache.org/jira/browse/KAFKA-14612

### Details
Makes sure we emit `ConfigRecord`s for a topic iff it actually gets created. Currently, we might emit `ConfigRecord`s even if the topic creation fails later in the `createTopics` method.

I created a new method `incrementalAlterConfig` in `ConfigurationControlManager` that is similar to the existing `incrementalAlterConfigs` but handles only one resource's configs at a time. This is used in `ReplicationControlManager` for each topic. By handling one topic's configs at a time, it is easier to isolate each topic's config records, which lets us make sure we only write config records for topics that actually get created.

I refactored `incrementalAlterConfigResource` to return an `ApiError`. This made it easier to implement the new method `incrementalAlterConfig` in `ConfigurationControlManager` because it then doesn't have to search in the `Map` for the result.

### Testing
Enhanced pre-existing test `ReplicationControlManagerTest.testCreateTopicsWithConfigs`. I ran the tests without the changes to `ReplicationControlManager` and made sure each assertion ends up failing. Also ran `./gradlew metadata:test --tests org.apache.kafka.controller.ReplicationControlManagerTest`.

Reviewers: Jason Gustafson <jason@confluent.io>
ijuma added a commit to confluentinc/kafka that referenced this pull request Jan 17, 2023
…master

* apache-github/trunk: (23 commits)
  MINOR: Include the inner exception stack trace when re-throwing an exception (apache#12229)
  MINOR: Fix docs to state that sendfile implemented in `TransferableRecords` instead of `MessageSet` (apache#13109)
  Update ProducerConfig.java (apache#13115)
  KAFKA-14618; Fix off by one error in snapshot id (apache#13108)
  KAFKA-13709 (follow-up): Avoid mention of 'exactly-once delivery' or 'delivery guarantees' in Connect (apache#13106)
  KAFKA-14367; Add `TxnOffsetCommit` to the new `GroupCoordinator` interface (apache#12901)
  KAFKA-14568: Move FetchDataInfo and related to storage module (apache#13085)
  KAFKA-14612: Make sure to write a new topics ConfigRecords to metadata log iff the topic is created (apache#13104)
  KAFKA-14601: Improve exception handling in KafkaEventQueue apache#13089
  KAFKA-14367; Add `OffsetCommit` to the new `GroupCoordinator` interface (apache#12886)
  KAFKA-14530: Check state updater more often (apache#13017)
  KAFKA-14304 Use boolean for ZK migrating brokers in RPC/record (apache#13103)
  KAFKA-14003 Kafka Streams JUnit4 to JUnit5 migration part 2 (apache#12301)
  KAFKA-14607: Move Scheduler/KafkaScheduler to server-common (apache#13092)
  KAFKA-14367; Add `OffsetFetch` to the new `GroupCoordinator` interface (apache#12870)
  KAFKA-14557; Lock metadata log dir (apache#13058)
  MINOR: Implement toString method for TopicAssignment and PartitionAssignment (apache#13101)
  KAFKA-12558: Do not prematurely mutate internal partition state in Mirror Maker 2 (apache#11818)
  KAFKA-14540: Fix DataOutputStreamWritable#writeByteBuffer (apache#13032)
  KAFKA-14600: Reduce flakiness in ProducerIdExpirationTest (apache#13087)
  ...
guozhangwang pushed a commit to guozhangwang/kafka that referenced this pull request Jan 25, 2023
…a log iff the topic is created (apache#13104)
