[SPARK-20597][SQL][SS][WIP] KafkaSourceProvider falls back on path as synonym for topic #23791
Conversation
@jaceklaskowski could you please look at this patch and give your opinion on the question asked in the TODO section? Thank you
If something can be configured in multiple places and the values differ, I would throw an exception. If you asked 10 people, I'm pretty sure several different expectations would be stated.
@gaborgsomogyi thank you for your tip; an exception will be added. Docs will also be added, as stated in the PR description and in the reply to the review.
Added docs and error handling for the case when topics do not match.
jaceklaskowski
left a comment
I'd like to have another example that shows what happens when all the possible options are in use.
```java
// Write key-value data from a DataFrame to Kafka using the topic specified in the path option
StreamingQuery ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
```
nit: Since you added it I'd change it to .select($"key" cast "string", $"value" cast "string") even if it's the only place in the doc. It's simply more type-safe.
Would it be better to make this change in all places in this doc file, since we're touching it anyway?
Why make a deviation compared to all other places?
With the latest change I don't fully understand the direction: now half of the doc uses selectExpr and the other half uses select. I'm still questioning the need. What is the benefit? If the selectExpr in the doc executes and is properly formed, it is no less stable than a simple select, right?
We are trying to make this expression type-safe, so that in such a case the user sees mistakes at compile time if there are any. select is used only for Scala, as we cannot use the same shortened ($) syntax for Java and Python, but we can use select(col("key").cast("string")) for all three languages if we really want it to be both type-safe and look the same in all three cases. What do you think?
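To illustrate the trade-off, a small sketch (assuming a DataFrame `df` with key and value columns and a `spark` session in scope; not part of the patch itself):

```scala
import org.apache.spark.sql.functions.col
import spark.implicits._ // needed for the $"..." syntax

// String-based expression: a typo such as "CAST(key AS STRNG)" fails only at analysis time
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

// $-syntax: checked at compile time, but available only in Scala
df.select($"key" cast "string", $"value" cast "string")

// col()-based form: checked at compile time and looks the same in Scala, Java and Python
df.select(col("key").cast("string"), col("value").cast("string"))
```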
I see the intention now, thanks for the explanation. I personally would leave it as it is in the original code, but I'm not saying the direction is wrong. Let's leave the decision to the committer who eventually picks this up.
@jaceklaskowski what do you think: should we revert the changes here, or use select(col("key").cast("string")) in all places to provide type safety?
Resolved review threads on external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
gaborgsomogyi
left a comment
Thanks for the efforts! I've added my questions/thoughts.
Resolved review threads on external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
```diff
 <td>streaming and batch</td>
 <td>Sets the topic that all rows will be written to in Kafka. This option overrides any
-topic column that may exist in the data.</td>
+topic column that may exist in the data and is overridden by ```topic``` option.
```
It looks odd to have 3 params but only 2 of them checked for differing content. Is it hard to check the topic column?
I would personally drop the fallback behavior altogether and throw an exception if these 3 things differ. Without a fallback, the feature wouldn't have to be explained in detail and the user wouldn't have to remember what overrides what.
How about the use-case where you want a 'topic' column in your data with different topic values in it (e.g. "win", "click", "visit"), but want to write that data to a single topic, so you override the topic column with the topic (or path) option? I agree with you that the user should not have to worry about the path option being overridden by the topic option, but in my opinion we should not include the topic column in this check, and should leave the user the possibility to override the topic column with the topic or path option, whichever they like most.
I don't understand the use-case yet; can you show the smallest example?
I rethought the situation and see that you have a point; such a situation is impossible, as we can only have "key", "value" and "topic" columns in the DataFrame that is written to Kafka.
I agree with you that all three of them should be checked. However, the validation is currently split and duplicated between KafkaWriter#validateQuery, KafkaWriteTask#createProjection and KafkaSourceProvider#resolveTopic. We can either refactor this and move the validation to one place, or leave it as is and add the topic column and topic/path option comparison to KafkaSourceProvider#validateQuery. The first option is more complicated and error-prone but will result in more readable code; the second will not require as much code to be rewritten. Which way do you think is right?
I agree the first solution is more complicated, but I don't fully understand why it is error-prone. Having checks that belong together in 2 different places is the error-prone solution. Of course, good unit test coverage is required.
KafkaWriteTask and KafkaWriter are not public interfaces, and everywhere KafkaRowWriter#createProjection is called, it is preceded by KafkaWriter#validateQuery. I do not understand the purpose of duplicating the validation between these methods. It was introduced in #17043; @tcondie @tdas, could you please explain the purpose of this duplication?

@gaborgsomogyi the reason the first solution is error-prone is that I don't actually understand the reasoning behind the decisions taken: when I removed the validation from KafkaRowWriter#createProjection, nothing failed.
My second concern is as follows: in the topic column we can specify different values, and our messages will be sent to different topics based on the value in each row. When the topic option is specified, it is applied to all messages, even if the topic column is present and has different values. So I think these two mechanisms have different purposes and do not intersect, and checking them for equality is simply redundant. I can imagine only one situation where I would specify both the column and the option: when I want to use the column approach with different values in production, but for testing purposes specify the topic option so that all messages go to one topic for a while.
I also think it is better to leave the KafkaSourceProvider#resolveTopic function where it is: we must not only validate that the topic path and option match when both are defined, but also trim the values and decide which one to take (as one of them, or both, may be null), so we cannot just fold it into validateQuery. If we moved the method to KafkaWriter, we would have to call it from several other classes and pass an additional path parameter. Now all of that logic is encapsulated inside KafkaSourceProvider, and IMHO that is the right way to do it.
I introduced the approach described here in my last commit.
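For reference, the resolution described above can be sketched roughly as follows (a simplified sketch; the real KafkaSourceProvider#resolveTopic signature and messages may differ):

```scala
// 'topic' option wins; 'path' is the fallback; differing non-empty values are an error
private def resolveTopic(
    topicOption: Option[String],
    pathOption: Option[String]): Option[String] = {
  val topic = topicOption.map(_.trim).filter(_.nonEmpty)
  val path = pathOption.map(_.trim).filter(_.nonEmpty)
  (topic, path) match {
    case (Some(t), Some(p)) if t != p =>
      throw new IllegalArgumentException(
        s"Topic in 'path' option ($p) does not match 'topic' option ($t)")
    case (t, p) => t.orElse(p)
  }
}
```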
I've taken a deeper look, and as you mentioned, it's a valid use-case where different topics are written because the topic column value differs. As a result, the originally added fallback solution can be the way to go.
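A minimal sketch of that use-case (topic names and the bootstrap server are illustrative, and a `spark` session is assumed): each row carries its own target topic in the 'topic' column, and setting the 'topic' option would override the column and route everything to one topic.

```scala
import spark.implicits._

// Each row names its own target topic in the 'topic' column
val events = Seq(("k1", "v1", "win"), ("k2", "v2", "click"), ("k3", "v3", "visit"))
  .toDF("key", "value", "topic")

events.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  // no 'topic' option: rows are routed per-row by the 'topic' column;
  // adding .option("topic", "all-events") would override the column instead
  .save()
```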
Resolved review threads on external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala and external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
Thank you for the review @gaborfeher @jaceklaskowski. I have addressed all the comments except two of them, where we have slightly different opinions. Please share your opinion on these controversial changes.
It seems another person was tagged, and I've only just seen the last comment. Could you please resolve the conversations that you think are solved? It would help a lot.
@gaborgsomogyi I have resolved all addressed comments, and have several questions for you: (1, 2). Thank you!
@jaceklaskowski @gaborgsomogyi I kindly ask you to review my answers once more. Thank you!
@jaceklaskowski kindly ping
@jaceklaskowski @tcondie @tdas could you please look at these changes?
Can one of the admins verify this patch?
We're closing this PR because it hasn't been updated in a while. If you'd like to revive this PR, please reopen it!
What changes were proposed in this pull request?
KafkaSourceProvider supports a 'topic' option that sets the target Kafka topic.
KafkaSourceProvider can also use a 'topic' column to assign rows to Kafka topics when writing.
It is proposed to treat the 'path' option in start(path: String) and save(path: String) as an alternative to setting the 'topic' option. The path designates the target topic when the 'topic' option is not specified, and overrides the value of the 'topic' column.
Currently, if the topic is specified via the 'path' option, it overrides the topic column and is in turn overridden by the 'topic' option (if present). IMHO this relation is more straightforward than the one specified in the ticket (where 'path' has the lowest precedence).
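For illustration, a sketch with all three topic sources set at once under this precedence (the bootstrap server and names are placeholders, and `df` is assumed to have key, value and topic columns):

```scala
import org.apache.spark.sql.functions.col

// Precedence: 'topic' option > path > per-row 'topic' column.
// If the 'topic' option and the path are both given with different values,
// the patch throws an exception instead of silently picking one.
df.select(col("key").cast("string"), col("value").cast("string"), col("topic"))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("topic", "topic1") // overrides both the path and the 'topic' column
  .save("topic1")            // must match the 'topic' option, or an exception is thrown
```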
For streaming, it will look as follows:
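A sketch of the streaming case (the bootstrap server and topic name are placeholders):

```scala
// The path passed to start() names the target topic when no 'topic' option is set
val query = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .start("topic1") // 'path' = "topic1" acts as the topic
```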
For batch, it will look as follows:
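A sketch of the batch case (same placeholder names):

```scala
// The path passed to save() names the target topic when no 'topic' option is set
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .save("topic1") // 'path' = "topic1" acts as the topic
```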
How was this patch tested?
Added and extended unit tests; built and tested locally.