
Conversation

@HeartSaVioR (Contributor)

What changes were proposed in this pull request?

This patch deduplicates code blocks in the Kafka data source that are repeated multiple times within a method.

Why are the changes needed?

This change simplifies the code and opens up the possibility of simplifying future changes whenever fields are added to the Kafka writer schema.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing UTs.

@HeartSaVioR (Contributor, Author)

I found these duplications while reviewing #26153: #26153 adds a new field to the writer schema, and the code gets more and more redundant. It would be ideal to deduplicate first, to reduce the changes needed when addressing #26153.

@SparkQA commented Oct 18, 2019

Test build #112252 has finished for PR 26158 at commit eb8ab27.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi (Contributor)

I think the direction is good, but I'm not sure this is a minor thing.

@HeartSaVioR changed the title from [MINOR][SQL][SS] Deduplicate codes from Kafka data source to [SPARK-29509][SQL][SS] Deduplicate codes from Kafka data source on Oct 18, 2019
@HeartSaVioR (Contributor, Author)

Thanks for the suggestion. Filed an issue and changed the title.

@gaborgsomogyi (Contributor) left a comment:

Some minor issues found.

def assertDataType(attrName: String, desired: Seq[DataType], actual: DataType): Unit = {
  if (!desired.exists(_.sameType(actual))) {
    throw new IllegalStateException(s"$attrName attribute unsupported type " +
      s"${actual.catalogString}")
Contributor:

Maybe we can add ...$attrName must be a $desired?

  }
}
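
A minimal sketch of the suggested message, assuming assertDataType keeps the shape shown above:

    import org.apache.spark.sql.types.DataType

    def assertDataType(attrName: String, desired: Seq[DataType], actual: DataType): Unit = {
      if (!desired.exists(_.sameType(actual))) {
        // Name both the expected types and the actual type in the error.
        throw new IllegalStateException(s"$attrName attribute type must be one of " +
          s"${desired.map(_.catalogString).mkString(", ")}, but was ${actual.catalogString}")
      }
    }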

val topicExpression = topic.map(Literal(_)).orElse {
Contributor:

I was thinking about whether it's possible to put the expression function into assertDataType, but then I saw that topicExpression is calculated in a different way. Do you think we can do this somehow?

HeartSaVioR (Author):

Yeah, I thought it would be complicated or require more changes to expression, but it turned out it's not. I'll make a change like the one below:

    val topicExpression = topic.map(Literal(_)).getOrElse(
      expression(KafkaWriter.TOPIC_ATTRIBUTE_NAME) { () =>
        throw new IllegalStateException(s"topic option required when no " +
          s"'${KafkaWriter.TOPIC_ATTRIBUTE_NAME}' attribute is present")
      }
    )
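
(The thrown default type-checks because a throw expression has type Nothing, which conforms to Expression.)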

    defaultFn: () => Expression): Unit = {
  val attr = schema.find(_.name == attrName).getOrElse(defaultFn())
  if (!desired.exists(_.sameType(attr.dataType))) {
    throw new AnalysisException(s"$attrName attribute type must be a " +
Contributor:

I think it would be helpful to print the actual type and the expected types, just like in the previous case.

    )
  }
  assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("topic type must be a string"))
  assertWrongType(input.toDF(), Seq("CAST('1' as INT) as topic", "value"),
Contributor:

Nit: input.toDF() is repeated.
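
That is, convert once and reuse the result; a sketch, with the expected message taken from the assertion above:

    val df = input.toDF()
    assertWrongType(df, Seq("CAST('1' as INT) as topic", "value"),
      "topic type must be a string")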

  }

  test("streaming - write data with valid schema but wrong types") {
    def assertWrongType(df: DataFrame, selectExpr: Seq[String], expectErrorMsg: String): Unit = {
Contributor:

Nit: expectedErrorMsg

@SparkQA commented Oct 22, 2019

Test build #112493 has finished for PR 26158 at commit 47967d0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi (Contributor) left a comment:

LGTM.

@gaborgsomogyi (Contributor)

cc @srowen

}.getOrElse {
  throw new IllegalStateException(s"topic option required when no " +
    s"'${KafkaWriter.TOPIC_ATTRIBUTE_NAME}' attribute is present")

def expression(attrName: String)(defaultFn: () => Expression): Expression = {
Contributor:

Make defaultFn a by-name parameter, defaultFn: => Expression. Then you don't need () => everywhere.
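
A self-contained sketch of the by-name suggestion; String stands in for Expression here purely to keep the snippet runnable:

    object ByNameDemo {
      // Thunk version: every call site must wrap the default in `() => ...`.
      def expressionThunk(attrs: Map[String, String], attrName: String)(
          defaultFn: () => String): String =
        attrs.getOrElse(attrName, defaultFn())

      // By-name version: the call site passes a plain block, evaluated only when needed.
      def expressionByName(attrs: Map[String, String], attrName: String)(
          defaultFn: => String): String =
        attrs.getOrElse(attrName, defaultFn)

      def main(args: Array[String]): Unit = {
        val attrs = Map("topic" -> "t")
        println(expressionThunk(attrs, "topic")(() => sys.error("topic option required")))
        println(expressionByName(attrs, "topic")(sys.error("topic option required")))
      }
    }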

}
}

private def validateAttribute(
Contributor:

So this looks like exactly the same thing you have in KafkaWriterTask. You could even use the same one-method approach there, as far as I can see, instead of calling expression + assertDataType.

If the goal is to deduplicate code, then here's another one you can deduplicate.

HeartSaVioR (Author):

Nice suggestion. I'll need to see where the right place to put it is. Thanks!

    selectExpr: Seq[String],
    expectErrorMsg: String): Unit = {
  var writer: StreamingQuery = null
  var ex: Exception = null
Contributor:

val ex = try { ... }
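
A sketch of that pattern; the sentinel uses AssertionError (an Error, not an Exception) so the catch below cannot swallow it, and runQueryThatShouldFail is a hypothetical stand-in for the test body:

    val ex: Exception = try {
      runQueryThatShouldFail()  // hypothetical: start the query and wait for it
      throw new AssertionError("expected the query to throw")
    } catch {
      case e: Exception => e
    }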

    writer.processAllAvailable()
  }
} finally {
  writer.stop()
Contributor:

writer can be null here.
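
The straightforward fix, as a sketch:

    } finally {
      if (writer != null) {
        writer.stop()
      }
    }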

@SparkQA commented Oct 25, 2019

Test build #112628 has finished for PR 26158 at commit 848ad1a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR (Contributor, Author)

I spent some more time reducing duplication further, since I saw more spots that could be easily deduplicated while addressing the review comments. Please take a look again. Thanks!

@SparkQA commented Oct 25, 2019

Test build #112663 has finished for PR 26158 at commit 272d91a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi (Contributor)

#26153 has been merged, so this has to be adapted...

@HeartSaVioR force-pushed the MINOR-deduplicate-kafka-source branch from 272d91a to 5a2371b on October 25, 2019 17:14
@HeartSaVioR (Contributor, Author)

Rebased. Ready for the next round of review; please take a look. Thanks!

@SparkQA commented Oct 25, 2019

Test build #112683 has finished for PR 26158 at commit 5a2371b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}

def keyExpression(schema: Seq[Attribute]): Expression = {
  expression(schema, KEY_ATTRIBUTE_NAME, Seq(StringType, BinaryType))(
Contributor:

Prefer { ... } for the function argument (like in the case of throwing an exception).
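
That is, braces for the second argument list, assuming defaultFn became a by-name parameter as suggested earlier (the message text here is hypothetical):

    def keyExpression(schema: Seq[Attribute]): Expression = {
      expression(schema, KEY_ATTRIBUTE_NAME, Seq(StringType, BinaryType)) {
        throw new IllegalStateException(s"$KEY_ATTRIBUTE_NAME attribute is missing")
      }
    }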

import org.apache.spark.sql.types.{BinaryType, DataType}
import org.apache.spark.util.Utils


Contributor:

Remove this extra blank line.

    input: DataFrame,
    selectExpr: Seq[String],
    expectErrorMsg: String): Unit = {
  verifyException[AnalysisException](expectErrorMsg)(
Contributor:

Can you reuse runAndVerifyStreamingQueryException here like in the other suite? These seem to expect the error to happen when the writer is created, so the stuff that method does after it calls writeFn shouldn't influence the test result.

That could make it possible to reuse these for both test suites (e.g. by adding them to KafkaTestUtils).

HeartSaVioR (Author), Oct 26, 2019:

I separated the two because one needs an actual input topic and the other doesn't, but there's no harm in providing an input topic for the latter as well. I'll make the change.

Btw, there are some differences in how the query result is awaited and the exception is checked between batch/micro-batch and continuous, so it doesn't seem to be a complete duplication. The original purpose of this PR was deduplicating the code that scales with the number of fields, and the scope keeps growing. Maybe we can revisit deduplicating code between batch/micro-batch and continuous in a follow-up PR. WDYT?
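
For context, a hypothetical sketch of the shared helper discussed above; the name runAndVerifyStreamingQueryException comes from the comment, but this signature and body are assumed rather than taken from the actual code:

    import java.util.Locale
    import scala.reflect.ClassTag
    import org.scalatest.Assertions.intercept
    import org.apache.spark.sql.streaming.StreamingQuery

    def runAndVerifyStreamingQueryException[T <: Exception : ClassTag](
        expectedMsg: String)(writeFn: => StreamingQuery): Unit = {
      // Expect the exception while creating/running the query, then check its message.
      val ex = intercept[T] {
        val writer = writeFn
        try {
          writer.processAllAvailable()
        } finally {
          writer.stop()
        }
      }
      assert(ex.getMessage.toLowerCase(Locale.ROOT)
        .contains(expectedMsg.toLowerCase(Locale.ROOT)))
    }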

@SparkQA commented Oct 26, 2019

Test build #112703 has finished for PR 26158 at commit f05a6ca.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin (Contributor) commented Oct 28, 2019

Merging to master.

@vanzin closed this in 762db39 on Oct 28, 2019
@HeartSaVioR (Contributor, Author)

Thanks all for reviewing and merging!
