Merged
build.gradle (2 changes: 0 additions & 2 deletions)

@@ -42,8 +42,6 @@ dependencies {
testImplementation ("com.clickhouse:clickhouse-jdbc:${project.properties['clickhouse.jdbc.version']}${project.properties['clickhouse.jdbc.version']?.startsWith('0.9') ? ':all' : ''}") {
exclude group: "org.antlr", module: "antlr4-runtime"
}
testImplementation "org.apache.httpcomponents.client5:httpclient5:5.4.2"
testImplementation "org.apache.httpcomponents.core5:httpcore5:5.3.3"
}

ext {
@@ -19,12 +19,15 @@ private object ClickhouseDialectExtension extends JdbcDialect {
   private val dateTimeTypePattern: Regex = """(?i)^DateTime(\d+)?(?:\((\d+)\))?$""".r
   private val decimalTypePattern: Regex = """(?i)^Decimal\((\d+),\s*(\d+)\)$""".r
   private val decimalTypePattern2: Regex = """(?i)^Decimal(32|64|128|256)\((\d+)\)$""".r

   /**
-   * A pattern to match ClickHouse column definitions. This pattern captures the column name, data type, and whether it is nullable.
+   * A pattern to match ClickHouse column definitions. This pattern captures the column name, data
+   * type, and whether it is nullable.
    * @example
-   *   "column_name" String NOT NULL, "column_name" Int32, "column_name" Decimal(10,2) etc.
+   *   "column_name" String NOT NULL, "column_name" Int32, "column_name" Decimal(10,2) etc.
    */
-  private val columnPattern: Regex = """"([^"]+)"\s+(.+?)(?:\s+(NOT\s+NULL))?\s*(?=(?:\s*,\s*"|$))""".r
+  private val columnPattern: Regex =
+    """"([^"]+)"\s+(.+?)(?:\s+(NOT\s+NULL))?\s*(?=(?:\s*,\s*"|$))""".r

   override def canHandle(url: String): Boolean = {
     url.startsWith("jdbc:clickhouse")
@@ -49,28 +52,20 @@ private object ClickhouseDialectExtension extends JdbcDialect {
       typeName: String,
       size: Int,
       md: MetadataBuilder): Option[DataType] = {
-    val scale = md.build.getLong("scale").toInt
     sqlType match {
       case Types.ARRAY =>
         unwrapNullable(typeName) match {
           case (_, arrayTypePattern(nestType)) =>
-            // due to https://github.com/ClickHouse/clickhouse-java/issues/1754, spark is not able to read Arrays of
-            // any types except Decimal(...) and String
-            toCatalystType(Types.ARRAY, nestType, size, scale, md).map {
-              case (nullable, dataType) => ArrayType(dataType, nullable)
+            toCatalystType(nestType).map { case (nullable, dataType) =>
+              ArrayType(dataType, nullable)
             }
           case _ => None
         }
-      case _ => toCatalystType(sqlType, typeName, size, scale, md).map(_._2)
+      case _ => toCatalystType(typeName).map(_._2)
     }
   }

-  private def toCatalystType(
-      sqlType: Int,
-      typeName: String,
-      precision: Int,
-      scale: Int,
-      md: MetadataBuilder): Option[(Boolean, DataType)] = {
+  private def toCatalystType(typeName: String): Option[(Boolean, DataType)] = {
     val (nullable, _typeName) = unwrapNullable(typeName)
     val dataType = _typeName match {
       case "String" =>
@@ -194,14 +189,14 @@ private object ClickhouseDialectExtension extends JdbcDialect {
   /**
    * Custom implementation of `createTable` to handle specific ClickHouse table creation options.
    * This method ensures that the column schemas are formatted correctly for ClickHouse,
-   * particularly by wrapping nullable types appropriately, as the default implementation
-   * does not support `Nullable` types for column schemas in ClickHouse.
+   * particularly by wrapping nullable types appropriately, as the default implementation does not
+   * support `Nullable` types for column schemas in ClickHouse.
    *
    * @see
-   *   https://github.com/apache/spark/blob/branch-3.5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L919-L923
+   *   https://github.com/apache/spark/blob/branch-3.5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L919-L923
    *
    * @see
-   *   https://github.com/apache/spark/blob/branch-3.5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L823-L824
+   *   https://github.com/apache/spark/blob/branch-3.5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L823-L824
    *
    * @param statement
    *   The SQL statement object used to execute the table creation command.
@@ -217,34 +212,39 @@ private object ClickhouseDialectExtension extends JdbcDialect {
       tableName: String,
       strSchema: String,
       options: JdbcOptionsInWrite): Unit = {
-    statement.executeUpdate(s"CREATE TABLE $tableName (${parseColumnDefinitions(strSchema)}) ${options.createTableOptions}")
+    statement.executeUpdate(
+      s"CREATE TABLE $tableName (${parseColumnDefinitions(strSchema)}) ${options.createTableOptions}")
   }

   /**
-   * Parses column definitions from a raw string to format them for ClickHouse.
-   * This method transforms a string describing columns (including names, types, and constraints)
-   * into a proper SQL format, ensuring that NOT NULL constraints are applied correctly.
+   * Parses column definitions from a raw string to format them for ClickHouse. This method
+   * transforms a string describing columns (including names, types, and constraints) into a
+   * proper SQL format, ensuring that NOT NULL constraints are applied correctly.
    *
    * @param columnDefinitions
-   *   A raw string representing the column definitions, formatted as "column_name column_type [NOT NULL]".
+   *   A raw string representing the column definitions, formatted as "column_name column_type
+   *   [NOT NULL]".
    * @return
    *   A formatted string of column definitions ready for SQL execution.
    *
    * @example
-   *   Input: "id" Integer NOT NULL, "name" String, "tags" Array(Nullable(String)) <br>
-   *   Output: "id" Integer NOT NULL, "name" Nullable(String), "tags" Array(Nullable(String))
+   *   Input: "id" Integer NOT NULL, "name" String, "tags" Array(Nullable(String)) <br> Output:
+   *   "id" Integer NOT NULL, "name" Nullable(String), "tags" Array(Nullable(String))
    */
   private def parseColumnDefinitions(columnDefinitions: String): String = {
-    columnPattern.findAllMatchIn(columnDefinitions).flatMap { matchResult =>
-      val columnName = matchResult.group(1)
-      val columnType = matchResult.group(2)
-      val notNull = matchResult.group(3)
+    columnPattern
+      .findAllMatchIn(columnDefinitions)
+      .flatMap { matchResult =>
+        val columnName = matchResult.group(1)
+        val columnType = matchResult.group(2)
+        val notNull = matchResult.group(3)

-      if (arrayTypePattern.findFirstIn(columnType).isDefined || notNull != null) {
-        Some(s""""$columnName" $columnType""")
-      } else {
-        Some(s""""$columnName" Nullable($columnType)""")
-      }
+        if (arrayTypePattern.findFirstIn(columnType).isDefined || notNull != null) {
+          Some(s""""$columnName" $columnType""")
+        } else {
+          Some(s""""$columnName" Nullable($columnType)""")
+        }
+      }
-    }.mkString(", ")
+      .mkString(", ")
   }
 }
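Putting the two pieces together, the effect of the override on a hypothetical write looks like the following sketch. The table name, schema string, and engine options are made up for illustration; `strSchema` stands in for the kind of string Spark's JdbcUtils hands to `createTable`.

// Hypothetical inputs, for illustration only.
val strSchema = """"id" Integer NOT NULL, "name" String, "tags" Array(Nullable(String))"""

// parseColumnDefinitions wraps plain nullable columns in Nullable(...), leaves
// NOT NULL columns alone, and leaves Array columns as-is (their element type
// already carries the nullability):
val parsed = """"id" Integer NOT NULL, "name" Nullable(String), "tags" Array(Nullable(String))"""

// With a hypothetical createTableOptions of "ENGINE = MergeTree() ORDER BY id",
// the statement sent to ClickHouse would be:
val sql = s"CREATE TABLE demo_table ($parsed) ENGINE = MergeTree() ORDER BY id"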
@@ -183,15 +183,17 @@ class ClickhouseDialectTest

test("read ClickHouse Int32 as Spark IntegerType") {
setupTable("integerColumn Int32")
insertTestData(Seq("(-2147483648)", "(2147483647)")) // min and max values for a signed integer
insertTestData(
Seq("(-2147483648)", "(2147483647)")
) // min and max values for a signed integer

val df = spark.read
.format("jdbc")
.option("url", jdbcUrl)
.option("user", jdbcUser)
.option("password", jdbcPassword)
.option("dbtable", tableName)
.load()
.format("jdbc")
.option("url", jdbcUrl)
.option("user", jdbcUser)
.option("password", jdbcPassword)
.option("dbtable", tableName)
.load()

assert(df.schema.fields.head.dataType == IntegerType)

@@ -201,15 +203,17 @@ class ClickhouseDialectTest

test("read ClickHouse Int64 as Spark LongType") {
setupTable("longColumn Int64")
insertTestData(Seq("(-9223372036854775808)", "(9223372036854775807)")) // min and max values for a signed long
insertTestData(
Seq("(-9223372036854775808)", "(9223372036854775807)")
) // min and max values for a signed long

val df = spark.read
.format("jdbc")
.option("url", jdbcUrl)
.option("user", jdbcUser)
.option("password", jdbcPassword)
.option("dbtable", tableName)
.load()
.format("jdbc")
.option("url", jdbcUrl)
.option("user", jdbcUser)
.option("password", jdbcPassword)
.option("dbtable", tableName)
.load()

assert(df.schema.fields.head.dataType == LongType)

@@ -305,17 +309,20 @@ class ClickhouseDialectTest
insertTestData(Seq("(0)", "(18446744073709551615)")) // min and max values

val df = spark.read
.format("jdbc")
.option("url", jdbcUrl)
.option("user", jdbcUser)
.option("password", jdbcPassword)
.option("dbtable", tableName)
.load()
.format("jdbc")
.option("url", jdbcUrl)
.option("user", jdbcUser)
.option("password", jdbcPassword)
.option("dbtable", tableName)
.load()

assert(df.schema.fields.head.dataType == DecimalType(20,0))
assert(df.schema.fields.head.dataType == DecimalType(20, 0))

val data = df.collect().map(_.getDecimal(0)).sorted
assert(data sameElements Array(new java.math.BigDecimal(0), new java.math.BigDecimal("18446744073709551615")))
assert(
data sameElements Array(
new java.math.BigDecimal(0),
new java.math.BigDecimal("18446744073709551615")))
}

test("read ClickHouse Float32 as Spark FloatType") {
@@ -563,7 +570,8 @@ class ClickhouseDialectTest
   }

   test("write Spark Nullable(TimestampType) as ClickHouse Nullable(Datetime64(6))") {
-    val schema = StructType(Seq(StructField("nullableTimestampColumn", TimestampType, nullable = true)))
+    val schema =
+      StructType(Seq(StructField("nullableTimestampColumn", TimestampType, nullable = true)))
     val currentTime = new java.sql.Timestamp(System.currentTimeMillis())
     val data = Seq(Row(null), Row(currentTime))
     val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
@@ -585,7 +593,8 @@ class ClickhouseDialectTest
   }

   test("write Spark Nullable(BooleanType) as ClickHouse Nullable(Bool)") {
-    val schema = StructType(Seq(StructField("nullableBooleanColumn", BooleanType, nullable = true)))
+    val schema =
+      StructType(Seq(StructField("nullableBooleanColumn", BooleanType, nullable = true)))
     val data = Seq(Row(null), Row(true), Row(false))
     val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

@@ -642,8 +651,7 @@ class ClickhouseDialectTest
     (
       "longArrayColumn Array(UInt32)",
       "([1, 2, 3, 4, 5])",
-      ArrayType(LongType, containsNull = false)),
-  )
+      ArrayType(LongType, containsNull = false)))

   val testReadArrayCasesV0_7_X = Table(
     ("columnDefinition", "insertedData", "expectedType"),
@@ -656,7 +664,8 @@ class ClickhouseDialectTest
"([1.23, 2.34, 3.45, 4.56, 5.67])",
ArrayType(DecimalType(9, 2), containsNull = false)))

forAll(if (driverVersion.startsWith("0.9")) testReadArrayCasesV0_9_X else testReadArrayCasesV0_7_X) {
forAll(
if (driverVersion.startsWith("0.9")) testReadArrayCasesV0_9_X else testReadArrayCasesV0_7_X) {
(columnDefinition: String, insertedData: String, expectedType: DataType) =>
test(s"read ClickHouse Array for ${columnDefinition} column") {
setupTable(columnDefinition)
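For readers unfamiliar with the pattern: these suites use ScalaTest's table-driven property checks, where each row of a `Table` registers one named test. A minimal self-contained sketch of the same structure, with made-up cases and a placeholder assertion:

import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.prop.TableDrivenPropertyChecks._

// Illustrative sketch of the table-driven pattern used by these suites.
class TableDrivenSketch extends AnyFunSuite {
  private val cases = Table(
    ("columnDefinition", "expectedTypeName"), // header row
    ("intColumn Int32", "IntegerType"),
    ("longColumn Int64", "LongType"))

  forAll(cases) { (columnDefinition: String, expectedTypeName: String) =>
    test(s"maps $columnDefinition to $expectedTypeName") {
      // A real suite would create the table and compare df.schema here.
      assert(columnDefinition.nonEmpty && expectedTypeName.nonEmpty)
    }
  }
}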
@@ -728,9 +737,8 @@ class ClickhouseDialectTest
     (
       "UInt64Column Array(UInt64)",
       "([1, 2, 3, 4, 5])",
-      ArrayType(DecimalType(20,0), containsNull = false),
-      "class [J cannot be cast to class [Ljava.math.BigDecimal"
-    ),
+      ArrayType(DecimalType(20, 0), containsNull = false),
+      "class [J cannot be cast to class [Ljava.math.BigDecimal"),
     // https://github.com/ClickHouse/clickhouse-java/issues/1409
     (
       "dateArrayColumn Array(Date)",
@@ -772,12 +780,14 @@ class ClickhouseDialectTest
       ArrayType(DecimalType(20, 0), containsNull = false),
       "class [Ljava.lang.Object; cannot be cast to class [Ljava.math.BigDecimal;"))

-  forAll(if (driverVersion.startsWith("0.9")) testReadArrayUnsupportedCasesV0_9_X else testReadArrayUnsupportedCasesV0_7_X) {
+  forAll(
+    if (driverVersion.startsWith("0.9")) testReadArrayUnsupportedCasesV0_9_X
+    else testReadArrayUnsupportedCasesV0_7_X) {
     (
-        columnDefinition: String,
-        insertedData: String,
-        expectedType: DataType,
-        errorMessage: String) =>
+      columnDefinition: String,
+      insertedData: String,
+      expectedType: DataType,
+      errorMessage: String) =>
       test(s"cannot read ClickHouse Array for ${columnDefinition} column") {
         setupTable(columnDefinition)
         insertTestData(Seq(insertedData))
@@ -965,7 +975,8 @@ class ClickhouseDialectTest
       expectedType: DataType,
       expectedClickhouseType: String,
       nullable: Boolean) =>
-    test(s"write ClickHouse Array for ${if (nullable) s" ${columnName} Nullable" else columnName} column"){
+    test(
+      s"write ClickHouse Array for ${if (nullable) s" ${columnName} Nullable" else columnName} column") {

       val schema = StructType(Array(StructField(columnName, expectedType, nullable = nullable)))
       val df = spark.createDataFrame(spark.sparkContext.parallelize(insertedData), schema)
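For context on how these tests exercise the dialect end to end: a Spark JDBC read only routes through the extension once it is registered via `JdbcDialects`. A rough sketch of the wiring follows; the URL, credentials, and table name are placeholders, and the registration call is commented out because the PR keeps the dialect object private and registration is assumed to happen inside the library itself.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.jdbc.JdbcDialects

object DialectReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()

    // The extension only takes effect once registered, so that
    // canHandle("jdbc:clickhouse...") routes reads through it:
    // JdbcDialects.registerDialect(ClickhouseDialectExtension) // assuming access to the object

    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:clickhouse://localhost:8123/default") // placeholder URL
      .option("user", "default")                                 // placeholder credentials
      .option("password", "")
      .option("dbtable", "some_table")                           // hypothetical table
      .load()

    df.printSchema()
    spark.stop()
  }
}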