KAFKA-12648: Pt. 2 - Introduce TopologyMetadata to wrap InternalTopologyBuilders of named topologies (#10683)
Conversation
Pt. 2 is fully ready for review @guozhangwang @wcarlson5
guozhangwang left a comment:

Made a first pass over the classes; I will try to make a second pass at the end of the week.
```java
        // If there are topologies but they are all empty, this indicates a bug in user code
        if (hasNoNonGlobalTopology() && !hasGlobalTopology()) {
```
guozhangwang: Based on the logic of evaluateConditionIsTrueForAnyBuilders, if builders.isEmpty() then it should always return false, right?
ableegoldman: Sure, but if builders.isEmpty() then we would enter the if block above and return before ever reaching this section of the code. But maybe you meant that hasNoNonGlobalTopology should return true only if all builders have no non-global topology, rather than if that's true for any one of them? There's some argument to be made over how to handle the case where some named topologies are legitimate while others are empty, but I would still advocate for throwing an exception when any topology is empty, since that is not a valid configuration. In that case the current code is correct, but the comment is not -- I'll fix the misleading comment.
ableegoldman: @guozhangwang WDYT? If the user has started up Streams with several named topologies, but a subset of them are completely empty, should this be considered user error and cause Streams to shut down, or should we just roll with it as long as at least one topology is non-empty? Take a look at the current state and lmk what you think.
guozhangwang: Yeah, what I meant is that if we can ever reach this condition, then the extra check seems redundant. I will check the current logic again.
```java
    }

    // public for testing only
    public synchronized final InternalTopologyBuilder setApplicationId(final String applicationId)
```
guozhangwang: Why remove synchronization here?
ableegoldman: I looked around and can't imagine why we would ever need it. setApplicationId should only ever be called once, from a single location/thread.
```java
        return stateFactories.containsKey(name) || globalStateStores.containsKey(name);
    }

    public boolean hasPersistentStores() {
```
guozhangwang: Seems we are moving this check/flag earlier, from the topology to the topology builder -- is there a motivation for it?
ableegoldman: Previously we would get a handle on the actual topology, which then had to iterate through all of its stores to check each one for persistence. But while you can now add and remove individual named topologies, you still can't change a topology or the stores in it while the app is running, so we may as well just keep track of whether we found any persistent stores as we go along, rather than iterating over all of them later. Also, this way we can keep and access this metadata easily through the TopologyMetadata/InternalTopologyBuilder, rather than ever having to go access the ProcessorTopology directly at all.

That said, I'm not too attached to this way of doing things, so if you have concerns I can go back to something like how it was before. Just lmk what you think.
guozhangwang: I'm not against this idea, just wondering what's the rationale behind it :) I'm happy with what you said.
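The eager-flag approach described above can be sketched roughly as follows (a simplified, hypothetical illustration; `StoreTrackingSketch` and its method signatures are invented, not the actual InternalTopologyBuilder API):

```java
// Hypothetical sketch: track store persistence eagerly as stores are added,
// instead of iterating over the built topology's stores afterwards.
class StoreTrackingSketch {
    private boolean hasPersistentStores = false;

    void addStateStore(final String name, final boolean persistent) {
        // topologies and their stores can't change while the app is running,
        // so a sticky boolean flag is sufficient
        if (persistent) {
            hasPersistentStores = true;
        }
    }

    boolean hasPersistentStores() {
        // O(1) lookup; no need to touch the built ProcessorTopology
        return hasPersistentStores;
    }
}
```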
guozhangwang left a comment:

Made a second pass on the non-testing part.
```java
        }
    }

    public void setTopologyName(final String namedTopology) {
```
guozhangwang: Could we move the logic of namedTopology.setTopologyName(topologyName) into the constructor of NamedTopologyStreamsBuilder(final String topologyName) itself, and then call the constructor of NamedTopology directly, so that we can still have a final field in line 138 above?
ableegoldman: I tried to, but just couldn't make it work. It has to do with Java subclassing quirks, like constructing the parent before the child. It seems to be pretty much impossible to set things up so that everything is final -- if we set the topologyName in the NamedTopology constructor, then it's not accessible (ie, always null) when we call the InternalTopologyBuilder's constructor, since that occurs during the parent Topology's construction.

It's definitely annoying, but at least we should be able to clean things up once we go through a KIP and no longer need to subclass like this.
```java
        } else if (maybeDecorateInternalSourceTopics(latestResetTopics).contains(topic) ||
                   latestResetPatterns.stream().anyMatch(p -> p.matcher(topic).matches())) {
            return LATEST;
        } else if (maybeDecorateInternalSourceTopics(sourceTopicNames).contains(topic)
```
guozhangwang: Could you elaborate a bit on the NONE case? Not sure I fully follow here.
ableegoldman: The NONE case means we do have this topic in this InternalTopologyBuilder (as opposed to that of a different NamedTopology), but it hasn't set the offset reset strategy to EARLIEST or LATEST. If we fail the first two if conditions above, then all that's left is to verify whether or not we have this topic at all -- which is true if we find it in either the source topic set or the source pattern.

Maybe you were wondering about the || !hasNamedTopology() part? Basically, if we don't have any NamedTopologies then there is only one InternalTopologyBuilder, so all topics should belong to it.
```java
        final ProcessorTopology globalTopology = builder.buildGlobalStateTopology();
        if (globalTopology != null) {
            if (builder.namedTopology() != null) {
                throw new IllegalStateException("Global state stores are not supported with Named Topologies");
```
guozhangwang: Okay, now I see why we do not count named topologies for global stores for caching. Is this final, or just not supported within Pt. 2?
ableegoldman: It's not likely to be compatible in the first phase, but I think we would want it to be fully supported in the end, ie as part of the KIP, once we have everything worked out and feeling ready for that.
guozhangwang left a comment:

Made a final pass. Overall LGTM.

My only concern is that keeping track of the topology builders, instead of the generated topologies, may restrict how we want to optimize the topology generation process. But this doesn't close the door anyway, as we can still refactor this piece of logic in the future, so it's not a blocker for this PR.
```java
    private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");

    private final StreamsConfig config;
    private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
```
guozhangwang: Very nit: maybe we can leave a comment on why we keep the map of topology builders here, instead of the built topologies (and hence move many of the checks ahead of the actual generation of the topology).

Also, as a meta thought for the future roadmap: one caveat of moving the checks ahead of time is that it may restrict what kinds of optimizations we can do during topology generation -- e.g. we cannot generate a topology with an in-memory store if the builder indicates persistent stores, etc. So just looking ahead to Pt. 3 here: do you think in the future (beyond V1), in registerAndBuildNewTopology, we can still just rebuild the topology immediately and track based on the topologies, not the topology builders? @ableegoldman
ableegoldman: Yeah, I think it would be fine (better, even) to swap in the topologies rather than the topology builders. The only reason for using the builders now is that a huge amount of topology-related functionality currently resides in the InternalTopologyBuilder, including pretty much all the metadata. I would 100% support cleaning this up and separating things out from this class, and I made sure it would be easy to do so here -- the builders are really only kept around after the topology is built because they still contain most of the metadata we need.
```java
        builder1.stream(inputStream1).selectKey((k, v) -> k).groupByKey().count(Materialized.as(Stores.persistentKeyValueStore("store"))).toStream().to(outputStream1);
        builder2.stream(inputStream2).selectKey((k, v) -> k).groupByKey().count(Materialized.as(Stores.persistentKeyValueStore("store"))).toStream().to(outputStream2);
        builder3.stream(inputStream3).selectKey((k, v) -> k).groupByKey().count(Materialized.as(Stores.persistentKeyValueStore("store"))).toStream().to(outputStream3);
        streams = new KafkaStreamsNamedTopologyWrapper(buildNamedTopologies(builder1, builder2, builder3), props, clientSupplier);
```
guozhangwang: Just curious, what if we just pass in buildNamedTopologies(builder1, builder1, builder1) here?
ableegoldman: If you pass in the same builder object, it will end up throwing IllegalStateException: Tried to set topologyName but the name was already set, since it's using the same underlying InternalTopologyBuilder. Actually, the same thing would have happened before named topologies, but for applicationId instead of topologyName -- except that we don't do this check in setApplicationId.

Personally, I would argue that we should also do this check in setApplicationId, but it would probably break hundreds of tests which happen to reuse the same builder across different KafkaStreams. Imo the real issue is that the StreamsBuilder class (or technically the InternalStreamsBuilder) has its own InternalTopologyBuilder that it creates during instantiation and then actively updates as the DSL topology is built, even though it should just be a basic builder class that simply specifies the DSL, with the InternalTopologyBuilder only created when calling StreamsBuilder#build.

There's some awkwardness in the logical vs physical topology plan: the StreamsBuilder feels like it should only cover the logical DSL topology, and cover it alone, but in reality this leaks into the InternalTopologyBuilder. Meanwhile, the InternalTopologyBuilder feels like it should just be responsible for the physical plan generated from the logical plan in StreamsBuilder#build, yet in fact this class contains pretty much everything -- the logical plan, the physical plan, and all the metadata that Streams needs to access.

Anyways, that's my long rant on the InternalTopologyBuilder. I think you actually filed a ticket for cleaning up this logical vs physical plan logic, so maybe that will cover it 🙂
ableegoldman: That's a fair point, however I think the TopologyMetadata/NamedTopology feature in general is pretty agnostic to whether the topologies have already been compiled or not; the only reason it tracks InternalTopologyBuilders is because that's where all the topology metadata currently resides. I think if we were to try any crazy optimizations on top of the current topology generation process, it would require some refactoring to decouple the generated topology from the topology metadata, in which case we would likely split the metadata out into a new, pre-compilation class and track that class instead.
Merged to trunk, I'll address the remaining small items in Pt. 3 -- thanks @wcarlson5 and @guozhangwang for getting this over the line!
Pt. 1: #10609
Pt. 2: #10683
Pt. 3: #10788

In Pt. 3 we implement the addNamedTopology API. This can be used to update the processing topology of a running Kafka Streams application without resetting the app, or even pausing/restarting the process. It's up to the user to ensure that this API is called on every instance of an application, so that all clients are able to run the newly added NamedTopology.

Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
Pt. 1: #10609
Pt. 2: #10683
Pt. 3: #10788
The TopologyMetadata is next up after Pt. 1 #10609. This PR sets up the basic architecture for running an app with multiple NamedTopologies, though the APIs to add/remove them dynamically are not implemented until Pt. 3
(Apologies for the length of this PR -- the vast majority of it is just refactoring/moving methods from the InternalTopologyBuilder to the new TopologyMetadata wrapper that now wraps all the individual builders. You should focus the review on the TopologyMetadata and how it interacts with the other classes)