Skip to content

Conversation

@beliefer
Copy link
Contributor

@beliefer beliefer commented Jan 20, 2023

What changes were proposed in this pull request?

Currently, JDBCRDD uses fixed format for SELECT statement.

val sqlText = options.prepareQuery +
      s"SELECT $columnList FROM ${options.tableOrQuery} $myTableSampleClause" +
      s" $myWhereClause $getGroupByClause $getOrderByClause $myLimitClause $myOffsetClause"

But some databases have different syntax. For example, MS SQL Server uses keyword TOP to describe LIMIT clause or Top N.
The LIMIT clause of MS SQL Server show below.

SELECT TOP(1) Model, Color, Price  
      FROM dbo.Cars  
      WHERE Color = 'blue'

The Top N of MS SQL Server show below.

SELECT TOP(1) Model, Color, Price  
FROM dbo.Cars  
WHERE Color = 'blue'  
ORDER BY Price ASC

This PR lets JDBC dialect could define their own syntax.

Why are the changes needed?

Extract the function that construct the select statement for JDBC dialect.

Does this PR introduce any user-facing change?

'No'.
New feature.

How was this patch tested?

N/A

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have unit tests that cover it? If not, maybe it is a good time to add them to make sure the string is generated correctly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are already a lot of unit tests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you list them? I could not find them when I was trying to add a unit test.

Copy link
Contributor Author

@beliefer beliefer Jan 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Almost all tests of JDBCSuite or JDBCV2Suite will call getSQLText.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is accurate, those tests only run against H2 database, there are no unit tests that verify correctness for other dialects. Moreover, there are no tests at all to check the correctness of the generated SQL string.

@sadikovi
Copy link
Contributor

Again, I don't see how this makes it better than simple changes in #39660. If the aim of this PR to improve JdbcDialects, then I think we should have a small design doc + make the API extensible. For example, adding TOP creates a question whether to add another parameter to getSQLText or silently overwrite it in the dialect.

@beliefer
Copy link
Contributor Author

whether to add another parameter to getSQLText or silently overwrite it in the dialect.

We could think TOP n as the special LIMIT n.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with this approach is we need to change it every time we add a new feature (think about the table sample pushdown).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JdbcSQLBuilder has been created.

@beliefer
Copy link
Contributor Author

ping @cloud-fan @huaxingao cc @sadikovi

@cloud-fan
Copy link
Contributor

@beliefer can you implement a MsSqlServerJdbcSQLBuilder, override withLimit to implement the TOP N feature?

assert(msg.contains("UpdateColumnNullability is not supported"))
}

test("simple scan with LIMIT") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we move the tests to the parent test suite?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

case (SortDirection.ASCENDING, NullOrdering.NULLS_FIRST) =>
s"$sortKey $sortDirection"
case (SortDirection.ASCENDING, NullOrdering.NULLS_LAST) =>
s"CASE WHEN $sortKey IS NULL THEN 1 ELSE 0 END, $sortKey $sortDirection"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this a standard way to do NULLS LAST in mssql?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a standard way and just is a little skill.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

/**
* The builder to generate jdbc sql query.
*
* @since 3.4.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this for 3.4.0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fixes a flaw in the jdbc dialect API. It would be great to merge it before 3.4 rc cut, if there is no objection.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After a second thought, people may find problems of this API after using is more. Let's merge it to master branch only and change it to 3.5.0

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

val namespaceOpt: Option[String] = None

private def catalogAndNamespace =
protected def catalogAndNamespace =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary change?

Copy link
Contributor

@sadikovi sadikovi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have any mechanism to ensure that any builder that overrides build method needs to be updated if a new feature is introduced?

/**
* The builder to generate jdbc sql query.
*
* @since 3.4.0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove this?


class MsSqlServerSQLQueryBuilder(dialect: JdbcDialect, options: JDBCOptions)
extends JdbcSQLQueryBuilder(dialect, options) {
override def build(): String = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: new line before the method.

Copy link
Contributor Author

@beliefer beliefer Feb 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not necessary but is OK too.


override def getLimitClause(limit: Integer): String = {
""
if (limit > 0 ) s"TOP $limit" else ""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't limit = 0 valid?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spark optimizer returns empty relation directly.

@sadikovi
Copy link
Contributor

sadikovi commented Feb 2, 2023

Why can we not introduce getTopNExpression to the query builder? Although it acts like limit the semantics are slightly different.

}

/**
* returns the SQL builder for the SELECT statement.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Returns ...

this
}

def withGroupByColumns(groupByColumns: Option[Array[String]]): JdbcSQLQueryBuilder = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add javadoc for the API methods to explain the purpose and the input.

import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo

/**
* The builder to generate jdbc sql query.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a bit more API doc. This builder is to build a single SELECT query, so all the withXXX methods will be invoked at most once. The invocation order does not matter, as all these clauses follow the natural SQL order: sample the table first, then filter, then group by, then sort, then offset, then limit.

cc @sadikovi , this also means we don't need a dedicate topN API, If both sort and limit are present, it means top N. This is also how SQL language represents top N.

*/
protected var tableSampleClause: String = ""

def withColumns(columns: Array[String]): JdbcSQLQueryBuilder = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add api doc for each with method?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

like if the column name is the raw name or quoted name following dialect's SQL syntax?

this
}

def withTableSample(sample: Option[TableSampleInfo]): JdbcSQLQueryBuilder = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the parameter type should be Option. Spark should not invoke withTableSample at all if there is no table sample in the query.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

supportedFunctions.contains(funcName)

class MySQLSQLBuilder extends JDBCSQLBuilder {
override def visitSortOrder(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So when we push down top N to mysql before this PR, it throws syntax error?

Copy link
Contributor Author

@beliefer beliefer Feb 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Many dialects cannot treat it correctly. The reason is MySQL doesn't support NullOrdering.

Copy link
Contributor

@sadikovi sadikovi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good. I left a few questions.
What is the ETA to merge this PR?

override def getLimitClause(limit: Integer): String = {
// Oracle doesn't support LIMIT clause.
// We can use rownum <= n to limit the number of rows in the result set.
if (limit > 0 ) s"WHERE rownum <= $limit" else ""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: no space after 0.

override def build(): String = {
val selectStmt = s"SELECT $columnList FROM ${options.tableOrQuery} $tableSampleClause" +
s" $whereClause $groupByClause $orderByClause"
if (limit > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for fixing the limit. Could you add a test for Oracle JDBC limit? Those integration tests don't test limit, otherwise we would have encountered this issue before.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The two test case simple scan with LIMIT and simple scan with top N are in the parent trait.


/**
* The columns names that following dialect's SQL syntax.
* e.g. The column name is the raw name or quoted name.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so it's the raw name string or quoted name?

From API's perspective, I think it's better to pass raw name string like a#b instead of 'a#b'. And the implementation should call dialect API to quote it. We can change it later as this PR will be master branch only.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. It seems need more change.

/**
* Constructs the GROUP BY clause that following dialect's SQL syntax.
*/
def withGroupByColumns(groupByColumns: Option[Array[String]]): JdbcSQLQueryBuilder = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, the with method should take an optional parameter.

this
}

def build(): String = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

api doc here as well.

class MsSqlServerSQLQueryBuilder(dialect: JdbcDialect, options: JDBCOptions)
extends JdbcSQLQueryBuilder(dialect, options) {

// TODO[SPARK-42289]: DS V2 pushdown could let JDBC dialect decide to push down offset
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does this TODO mean?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spark force push down offset to databases, but some databases do not support offset.


val myLimitClause: String = dialect.getLimitClause(limit)
val myOffsetClause: String = dialect.getOffsetClause(offset)
groupByColumns.foreach(builder.withGroupByColumns)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

withXXX returns a new builder. We can's assume the implementation always returns this

let's do

var builder = ...
groupByColumns.foreach { groupBy =>
  builder = ...
}

@cloud-fan
Copy link
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 060a2b8 Feb 8, 2023
@beliefer
Copy link
Contributor Author

beliefer commented Feb 9, 2023

@cloud-fan @sadikovi Thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants