Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#501 Fix Jdbc Native treating of nullable fields. #502

Merged
merged 1 commit into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,27 @@ class ResultSetToRowIterator(rs: ResultSet, sanitizeDateTime: Boolean, incorrect

// WARNING. Do not forget that `null` is a valid value returned by ResultSet methods that return reference type objects.
dataType match {
case BIT | BOOLEAN => rs.getBoolean(columnIndex)
case TINYINT => rs.getByte(columnIndex)
case SMALLINT => rs.getShort(columnIndex)
case INTEGER => rs.getInt(columnIndex)
case BIGINT => rs.getLong(columnIndex)
case FLOAT => rs.getFloat(columnIndex)
case DOUBLE => rs.getDouble(columnIndex)
case BIT | BOOLEAN =>
val v = rs.getBoolean(columnIndex)
if (rs.wasNull()) null else v
Comment on lines +116 to +117
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very ugly, but it seems to be the way to go, at least for maximum performance.

Copy link
Collaborator

@VladimirRybalko VladimirRybalko Oct 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

val rsValue = dataType match {
  case BIT | BOOLEAN => rs.getBoolean(columnIndex)
  case TINYINT =>  rs.getByte(columnIndex)
  case SMALLINT      => rs.getShort(columnIndex)
  case INTEGER       => rs.getInt(columnIndex)
  case BIGINT        => rs.getLong(columnIndex)
  case FLOAT         => rs.getFloat(columnIndex)
  case DOUBLE        => rs.getDouble(columnIndex)        
  case REAL          => rs.getBigDecimal(columnIndex)
  case NUMERIC       => rs.getBigDecimal(columnIndex)
  case DATE          => sanitizeDate(rs.getDate(columnIndex))
  case TIMESTAMP     => sanitizeTimestamp(rs.getTimestamp(columnIndex))
  case _             => rs.getString(columnIndex)
}
if (rs.wasNull()) null else rsValue

?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

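Yes, this looks much nicer indeed! However, it would call rs.wasNull() even when it is not necessary (the getters that return reference types, such as getBigDecimal, already return null themselves), so I still prefer the original approach.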

case TINYINT =>
val v = rs.getByte(columnIndex)
if (rs.wasNull()) null else v
case SMALLINT =>
val v = rs.getShort(columnIndex)
if (rs.wasNull()) null else v
case INTEGER =>
val v = rs.getInt(columnIndex)
if (rs.wasNull()) null else v
case BIGINT =>
val v = rs.getLong(columnIndex)
if (rs.wasNull()) null else v
case FLOAT =>
val v = rs.getFloat(columnIndex)
if (rs.wasNull()) null else v
case DOUBLE =>
val v = rs.getDouble(columnIndex)
if (rs.wasNull()) null else v
case REAL => rs.getBigDecimal(columnIndex)
case NUMERIC => rs.getBigDecimal(columnIndex)
case DATE => sanitizeDate(rs.getDate(columnIndex))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ class IngestionJobSuite extends AnyWordSpec with SparkTestBase with TextComparis
assert(df.schema.fields(2).name == "DESCRIPTION")
assert(df.schema.fields(3).name == "EMAIL")
assert(df.schema.fields(4).name == "FOUNDED")
assert(df.schema.fields(5).name == "LAST_UPDATED")
assert(df.schema.fields(5).name == "IS_TAX_FREE")
}

"get the source data frame for source with disabled count query" in {
Expand All @@ -301,7 +301,7 @@ class IngestionJobSuite extends AnyWordSpec with SparkTestBase with TextComparis
assert(df.schema.fields(2).name == "DESCRIPTION")
assert(df.schema.fields(3).name == "EMAIL")
assert(df.schema.fields(4).name == "FOUNDED")
assert(df.schema.fields(5).name == "LAST_UPDATED")
assert(df.schema.fields(5).name == "IS_TAX_FREE")

TransientTableManager.reset()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ object RdbExampleTable {
| description VARCHAR NOT NULL,
| email VARCHAR(50) NOT NULL,
| founded DATE NOT NULL,
| is_tax_free BOOLEAN,
| tax_id BIGINT,
| last_updated TIMESTAMP NOT NULL,
| info_date VARCHAR(10) NOT NULL,
| PRIMARY KEY (id))
Expand All @@ -75,10 +77,10 @@ object RdbExampleTable {
)

val inserts: Seq[String] = Seq(
s"INSERT INTO $tableName VALUES (1,'Company1', 'description1', 'company1@example.com', DATE '2000-10-11', TIMESTAMP '2020-11-04 10:11:00+02:00', '2022-02-18')",
s"INSERT INTO $tableName VALUES (2,'Company2', 'description2', 'company2@example.com', DATE '2005-03-29', TIMESTAMP '2020-11-04 10:22:33+02:00', '2022-02-18')",
s"INSERT INTO $tableName VALUES (3,'Company3', 'description3', 'company3@example.com', DATE '2016-12-30', TIMESTAMP '2020-11-04 10:33:59+02:00', '2022-02-18')",
s"INSERT INTO $tableName VALUES (4,'Company4', 'description4', 'company4@example.com', DATE '2016-12-31', TIMESTAMP '2020-11-04 10:34:22+02:00', '2022-02-19')"
s"INSERT INTO $tableName VALUES (1,'Company1', 'description1', 'company1@example.com', DATE '2000-10-11', FALSE, 123, TIMESTAMP '2020-11-04 10:11:00+02:00', '2022-02-18')",
s"INSERT INTO $tableName VALUES (2,'Company2', 'description2', 'company2@example.com', DATE '2005-03-29', TRUE, 456, TIMESTAMP '2020-11-04 10:22:33+02:00', '2022-02-18')",
s"INSERT INTO $tableName VALUES (3,'Company3', 'description3', 'company3@example.com', DATE '2016-12-30', FALSE, NULL, TIMESTAMP '2020-11-04 10:33:59+02:00', '2022-02-18')",
s"INSERT INTO $tableName VALUES (4,'Company4', 'description4', 'company4@example.com', DATE '2016-12-31', NULL, NULL, TIMESTAMP '2020-11-04 10:34:22+02:00', '2022-02-19')"
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ class TableReaderJdbcSuite extends AnyWordSpec with BeforeAndAfterAll with Spark
val df = reader.getData(Query.Table("company"), null, null, Seq.empty[String])

assert(df.count() == 4)
assert(df.schema.fields.length == 7)
assert(df.schema.fields.length == 9)
}

"return selected column for a table snapshot-like query" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -711,15 +711,15 @@ class SqlGeneratorLoaderSuite extends AnyWordSpec with RelationalDbFixture {
}

"generate data queries without date ranges" in {
assert(gen.getDataQuery("company", Nil, None) == "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company")
assert(gen.getDataQuery("company", Nil, None) == "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company")
}

"generate data queries when list of columns is specified" in {
assert(genEscaped.getDataQuery("company", columns, None) == "SELECT 'A'n, 'D'n, 'Column with spaces'n FROM 'company'n")
}

"generate data queries with limit clause date ranges" in {
assert(gen.getDataQuery("company", Nil, Some(100)) == "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company LIMIT 100")
assert(gen.getDataQuery("company", Nil, Some(100)) == "SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company LIMIT 100")
}

"generate ranged count queries" when {
Expand Down Expand Up @@ -761,30 +761,30 @@ class SqlGeneratorLoaderSuite extends AnyWordSpec with RelationalDbFixture {
"generate ranged data queries" when {
"date is in DATE format" in {
assert(gen.getDataQuery("company", date1, date1, Nil, None) ==
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = date'2020-08-17'")
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = date'2020-08-17'")
assert(gen.getDataQuery("company", date1, date2, Nil, None) ==
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= date'2020-08-17' AND D <= date'2020-08-30'")
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= date'2020-08-17' AND D <= date'2020-08-30'")
}

"date is in STRING format" in {
assert(genStr.getDataQuery("company", date1, date1, Nil, None) ==
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = '2020-08-17'")
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = '2020-08-17'")
assert(genStr.getDataQuery("company", date1, date2, Nil, None) ==
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= '2020-08-17' AND D <= '2020-08-30'")
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= '2020-08-17' AND D <= '2020-08-30'")
}

"date is in NUMBER format" in {
assert(genNum.getDataQuery("company", date1, date1, Nil, None) ==
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = 20200817")
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = 20200817")
assert(genNum.getDataQuery("company", date1, date2, Nil, None) ==
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= 20200817 AND D <= 20200830")
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= 20200817 AND D <= 20200830")
}

"with limit records" in {
assert(gen.getDataQuery("company", date1, date1, Nil, Some(100)) ==
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = date'2020-08-17' LIMIT 100")
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D = date'2020-08-17' LIMIT 100")
assert(gen.getDataQuery("company", date1, date2, Nil, Some(100)) ==
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= date'2020-08-17' AND D <= date'2020-08-30' LIMIT 100")
"SELECT ID 'ID', NAME 'NAME', DESCRIPTION 'DESCRIPTION', EMAIL 'EMAIL', FOUNDED 'FOUNDED', IS_TAX_FREE 'IS_TAX_FREE', TAX_ID 'TAX_ID', LAST_UPDATED 'LAST_UPDATED', INFO_DATE 'INFO_DATE' FROM company WHERE D >= date'2020-08-17' AND D <= date'2020-08-30' LIMIT 100")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ class JdbcNativeUtilsSuite extends AnyWordSpec with RelationalDbFixture with Spa
| "nullable" : true,
| "metadata" : { }
| }, {
| "name" : "IS_TAX_FREE",
| "type" : "boolean",
| "nullable" : true,
| "metadata" : { }
| }, {
| "name" : "TAX_ID",
| "type" : "long",
| "nullable" : true,
| "metadata" : { }
| }, {
| "name" : "LAST_UPDATED",
| "type" : "timestamp",
| "nullable" : true,
Expand All @@ -163,25 +173,30 @@ class JdbcNativeUtilsSuite extends AnyWordSpec with RelationalDbFixture with Spa
| "ID" : 1,
| "NAME" : "Company1",
| "EMAIL" : "company1@example.com",
| "FOUNDED" : "2000-10-11"
| "FOUNDED" : "2000-10-11",
| "IS_TAX_FREE" : false,
| "TAX_ID" : 123
|}, {
| "ID" : 2,
| "NAME" : "Company2",
| "EMAIL" : "company2@example.com",
| "FOUNDED" : "2005-03-29"
| "FOUNDED" : "2005-03-29",
| "IS_TAX_FREE" : true,
| "TAX_ID" : 456
|}, {
| "ID" : 3,
| "NAME" : "Company3",
| "EMAIL" : "company3@example.com",
| "FOUNDED" : "2016-12-30"
| "FOUNDED" : "2016-12-30",
| "IS_TAX_FREE" : false
|}, {
| "ID" : 4,
| "NAME" : "Company4",
| "EMAIL" : "company4@example.com",
| "FOUNDED" : "2016-12-31"
|} ]""".stripMargin

val df = JdbcNativeUtils.getJdbcNativeDataFrame(jdbcConfig, jdbcConfig.primaryUrl.get, s"SELECT id, name, email, founded FROM $tableName")
val df = JdbcNativeUtils.getJdbcNativeDataFrame(jdbcConfig, jdbcConfig.primaryUrl.get, s"SELECT id, name, email, founded, is_tax_free, tax_id FROM $tableName")
val actual = SparkUtils.convertDataFrameToPrettyJSON(df)

compareText(actual, expected)
Expand Down Expand Up @@ -236,35 +251,35 @@ class JdbcNativeUtilsSuite extends AnyWordSpec with RelationalDbFixture with Spa
}

"getDecimalDataType" should {
val resultSet = mock(classOf[ResultSet])
val resultSetMetaData = mock(classOf[ResultSetMetaData])
val resultSet = mock(classOf[ResultSet])
val resultSetMetaData = mock(classOf[ResultSetMetaData])

when(resultSetMetaData.getColumnCount).thenReturn(1)
when(resultSet.getMetaData).thenReturn(resultSetMetaData)
when(resultSetMetaData.getColumnCount).thenReturn(1)
when(resultSet.getMetaData).thenReturn(resultSetMetaData)

"return normal decimal for correct precision and scale" in {
val iterator = new ResultSetToRowIterator(resultSet, true, incorrectDecimalsAsString = false)
when(resultSetMetaData.getPrecision(0)).thenReturn(10)
when(resultSetMetaData.getScale(0)).thenReturn(2)
"return normal decimal for correct precision and scale" in {
val iterator = new ResultSetToRowIterator(resultSet, true, incorrectDecimalsAsString = false)
when(resultSetMetaData.getPrecision(0)).thenReturn(10)
when(resultSetMetaData.getScale(0)).thenReturn(2)

assert(iterator.getDecimalDataType(0) == NUMERIC)
}
assert(iterator.getDecimalDataType(0) == NUMERIC)
}

"return fixed decimal for incorrect precision and scale" in {
val iterator = new ResultSetToRowIterator(resultSet, true, incorrectDecimalsAsString = false)
when(resultSetMetaData.getPrecision(0)).thenReturn(0)
when(resultSetMetaData.getScale(0)).thenReturn(2)
"return fixed decimal for incorrect precision and scale" in {
val iterator = new ResultSetToRowIterator(resultSet, true, incorrectDecimalsAsString = false)
when(resultSetMetaData.getPrecision(0)).thenReturn(0)
when(resultSetMetaData.getScale(0)).thenReturn(2)

assert(iterator.getDecimalDataType(0) == NUMERIC)
}
assert(iterator.getDecimalDataType(0) == NUMERIC)
}

"return string type for incorrect precision and scale" in {
val iterator = new ResultSetToRowIterator(resultSet, true, incorrectDecimalsAsString = true)
when(resultSetMetaData.getPrecision(0)).thenReturn(0)
when(resultSetMetaData.getScale(0)).thenReturn(2)
"return string type for incorrect precision and scale" in {
val iterator = new ResultSetToRowIterator(resultSet, true, incorrectDecimalsAsString = true)
when(resultSetMetaData.getPrecision(0)).thenReturn(0)
when(resultSetMetaData.getScale(0)).thenReturn(2)

assert(iterator.getDecimalDataType(0) == VARCHAR)
}
assert(iterator.getDecimalDataType(0) == VARCHAR)
}
}

"sanitizeDateTime" when {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,6 @@ class SparkUtilsSuite extends AnyWordSpec with SparkTestBase with TempDirFixture
val newField2 = schema1Orig.fields.head.copy(metadata = metadata2)
val schema2 = schema1Orig.copy(fields = newField2 +: schema1Orig.fields.tail)

println(schema1.prettyJson)
println(schema2.prettyJson)

val diff = compareSchemas(schema1, schema2)

assert(diff.length == 1)
Expand Down
Loading