diff --git a/build.gradle b/build.gradle
index fb857da..3ca5a52 100644
--- a/build.gradle
+++ b/build.gradle
@@ -42,8 +42,6 @@ dependencies {
     testImplementation ("com.clickhouse:clickhouse-jdbc:${project.properties['clickhouse.jdbc.version']}${project.properties['clickhouse.jdbc.version']?.startsWith('0.9') ? ':all' : ''}") {
         exclude group: "org.antlr", module: "antlr4-runtime"
     }
-    testImplementation "org.apache.httpcomponents.client5:httpclient5:5.4.2"
-    testImplementation "org.apache.httpcomponents.core5:httpcore5:5.3.3"
 }
 
 ext {
diff --git a/src/main/scala/io/github/mtsongithub/doetl/sparkdialectextensions/clickhouse/spark35/ClickhouseDialectExtension.scala b/src/main/scala/io/github/mtsongithub/doetl/sparkdialectextensions/clickhouse/spark35/ClickhouseDialectExtension.scala
index 7881ed0..8392a1c 100644
--- a/src/main/scala/io/github/mtsongithub/doetl/sparkdialectextensions/clickhouse/spark35/ClickhouseDialectExtension.scala
+++ b/src/main/scala/io/github/mtsongithub/doetl/sparkdialectextensions/clickhouse/spark35/ClickhouseDialectExtension.scala
@@ -19,12 +19,15 @@ private object ClickhouseDialectExtension extends JdbcDialect {
   private val dateTimeTypePattern: Regex = """(?i)^DateTime(\d+)?(?:\((\d+)\))?$""".r
   private val decimalTypePattern: Regex = """(?i)^Decimal\((\d+),\s*(\d+)\)$""".r
   private val decimalTypePattern2: Regex = """(?i)^Decimal(32|64|128|256)\((\d+)\)$""".r
+
   /**
-   * A pattern to match ClickHouse column definitions. This pattern captures the column name, data type, and whether it is nullable.
+   * A pattern to match ClickHouse column definitions. This pattern captures the column name, data
+   * type, and whether it is nullable.
    * @example
-   * "column_name" String NOT NULL, "column_name" Int32, "column_name" Decimal(10,2) etc.
+   *   "column_name" String NOT NULL, "column_name" Int32, "column_name" Decimal(10,2) etc.
    */
-  private val columnPattern: Regex = """"([^"]+)"\s+(.+?)(?:\s+(NOT\s+NULL))?\s*(?=(?:\s*,\s*"|$))""".r
+  private val columnPattern: Regex =
+    """"([^"]+)"\s+(.+?)(?:\s+(NOT\s+NULL))?\s*(?=(?:\s*,\s*"|$))""".r
 
   override def canHandle(url: String): Boolean = {
     url.startsWith("jdbc:clickhouse")
@@ -49,28 +52,20 @@ private object ClickhouseDialectExtension extends JdbcDialect {
       typeName: String,
       size: Int,
       md: MetadataBuilder): Option[DataType] = {
-    val scale = md.build.getLong("scale").toInt
     sqlType match {
       case Types.ARRAY =>
         unwrapNullable(typeName) match {
           case (_, arrayTypePattern(nestType)) =>
-            // due to https://github.com/ClickHouse/clickhouse-java/issues/1754, spark is not able to read Arrays of
-            // any types except Decimal(...) and String
-            toCatalystType(Types.ARRAY, nestType, size, scale, md).map {
-              case (nullable, dataType) => ArrayType(dataType, nullable)
+            toCatalystType(nestType).map { case (nullable, dataType) =>
+              ArrayType(dataType, nullable)
             }
           case _ => None
         }
-      case _ => toCatalystType(sqlType, typeName, size, scale, md).map(_._2)
+      case _ => toCatalystType(typeName).map(_._2)
     }
   }
 
-  private def toCatalystType(
-      sqlType: Int,
-      typeName: String,
-      precision: Int,
-      scale: Int,
-      md: MetadataBuilder): Option[(Boolean, DataType)] = {
+  private def toCatalystType(typeName: String): Option[(Boolean, DataType)] = {
     val (nullable, _typeName) = unwrapNullable(typeName)
     val dataType = _typeName match {
       case "String" =>
@@ -194,14 +189,14 @@ private object ClickhouseDialectExtension extends JdbcDialect {
   /**
    * Custom implementation of `createTable` to handle specific ClickHouse table creation options.
    * This method ensures that the column schemas are formatted correctly for ClickHouse,
-   * particularly by wrapping nullable types appropriately, as the default implementation
-   * does not support `Nullable` types for column schemas in ClickHouse.
+   * particularly by wrapping nullable types appropriately, as the default implementation does not
+   * support `Nullable` types for column schemas in ClickHouse.
    *
    * @see
-   * https://github.com/apache/spark/blob/branch-3.5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L919-L923
+   *   https://github.com/apache/spark/blob/branch-3.5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L919-L923
    *
    * @see
-   * https://github.com/apache/spark/blob/branch-3.5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L823-L824
+   *   https://github.com/apache/spark/blob/branch-3.5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L823-L824
    *
    * @param statement
    *   The SQL statement object used to execute the table creation command.
@@ -217,34 +212,39 @@ private object ClickhouseDialectExtension extends JdbcDialect {
       tableName: String,
       strSchema: String,
       options: JdbcOptionsInWrite): Unit = {
-    statement.executeUpdate(s"CREATE TABLE $tableName (${parseColumnDefinitions(strSchema)}) ${options.createTableOptions}")
+    statement.executeUpdate(
+      s"CREATE TABLE $tableName (${parseColumnDefinitions(strSchema)}) ${options.createTableOptions}")
   }
 
   /**
-   * Parses column definitions from a raw string to format them for ClickHouse.
-   * This method transforms a string describing columns (including names, types, and constraints)
-   * into a proper SQL format, ensuring that NOT NULL constraints are applied correctly.
+   * Parses column definitions from a raw string to format them for ClickHouse. This method
+   * transforms a string describing columns (including names, types, and constraints) into a
+   * proper SQL format, ensuring that NOT NULL constraints are applied correctly.
    *
    * @param columnDefinitions
-   *   A raw string representing the column definitions, formatted as "column_name column_type [NOT NULL]".
+   *   A raw string representing the column definitions, formatted as "column_name column_type
+   *   [NOT NULL]".
    * @return
    *   A formatted string of column definitions ready for SQL execution.
    *
    * @example
-   *   Input: "id" Integer NOT NULL, "name" String, "tags" Array(Nullable(String))
-   *   Output: "id" Integer NOT NULL, "name" Nullable(String), "tags" Array(Nullable(String))
+   *   Input: "id" Integer NOT NULL, "name" String, "tags" Array(Nullable(String)) Output:
+   *   "id" Integer NOT NULL, "name" Nullable(String), "tags" Array(Nullable(String))
    */
   private def parseColumnDefinitions(columnDefinitions: String): String = {
-    columnPattern.findAllMatchIn(columnDefinitions).flatMap { matchResult =>
-      val columnName = matchResult.group(1)
-      val columnType = matchResult.group(2)
-      val notNull = matchResult.group(3)
+    columnPattern
+      .findAllMatchIn(columnDefinitions)
+      .flatMap { matchResult =>
+        val columnName = matchResult.group(1)
+        val columnType = matchResult.group(2)
+        val notNull = matchResult.group(3)
 
-      if (arrayTypePattern.findFirstIn(columnType).isDefined || notNull != null) {
-        Some(s""""$columnName" $columnType""")
-      } else {
-        Some(s""""$columnName" Nullable($columnType)""")
+        if (arrayTypePattern.findFirstIn(columnType).isDefined || notNull != null) {
+          Some(s""""$columnName" $columnType""")
+        } else {
+          Some(s""""$columnName" Nullable($columnType)""")
+        }
       }
-    }.mkString(", ")
+      .mkString(", ")
   }
 }
diff --git a/src/test/scala/io/github/mtsongithub/doetl/sparkdialectextensions/clickhouse/ClickhouseDialectTest.scala b/src/test/scala/io/github/mtsongithub/doetl/sparkdialectextensions/clickhouse/ClickhouseDialectTest.scala
index f415444..52f55c2 100644
--- a/src/test/scala/io/github/mtsongithub/doetl/sparkdialectextensions/clickhouse/ClickhouseDialectTest.scala
+++ b/src/test/scala/io/github/mtsongithub/doetl/sparkdialectextensions/clickhouse/ClickhouseDialectTest.scala
@@ -183,15 +183,17 @@ class ClickhouseDialectTest
 
   test("read ClickHouse Int32 as Spark IntegerType") {
     setupTable("integerColumn Int32")
-    insertTestData(Seq("(-2147483648)", "(2147483647)")) // min and max values for a signed integer
+    insertTestData(
+      Seq("(-2147483648)", "(2147483647)")
+    ) // min and max values for a signed integer
 
     val df = spark.read
-        .format("jdbc")
-        .option("url", jdbcUrl)
-        .option("user", jdbcUser)
-        .option("password", jdbcPassword)
-        .option("dbtable", tableName)
-        .load()
+      .format("jdbc")
+      .option("url", jdbcUrl)
+      .option("user", jdbcUser)
+      .option("password", jdbcPassword)
+      .option("dbtable", tableName)
+      .load()
 
     assert(df.schema.fields.head.dataType == IntegerType)
 
@@ -201,15 +203,17 @@ class ClickhouseDialectTest
 
   test("read ClickHouse Int64 as Spark LongType") {
     setupTable("longColumn Int64")
-    insertTestData(Seq("(-9223372036854775808)", "(9223372036854775807)")) // min and max values for a signed long
+    insertTestData(
+      Seq("(-9223372036854775808)", "(9223372036854775807)")
+    ) // min and max values for a signed long
 
     val df = spark.read
-        .format("jdbc")
-        .option("url", jdbcUrl)
-        .option("user", jdbcUser)
-        .option("password", jdbcPassword)
-        .option("dbtable", tableName)
-        .load()
+      .format("jdbc")
+      .option("url", jdbcUrl)
+      .option("user", jdbcUser)
+      .option("password", jdbcPassword)
+      .option("dbtable", tableName)
+      .load()
 
     assert(df.schema.fields.head.dataType == LongType)
 
@@ -305,17 +309,20 @@ class ClickhouseDialectTest
     insertTestData(Seq("(0)", "(18446744073709551615)")) // min and max values
 
     val df = spark.read
-        .format("jdbc")
-        .option("url", jdbcUrl)
-        .option("user", jdbcUser)
-        .option("password", jdbcPassword)
-        .option("dbtable", tableName)
-        .load()
+      .format("jdbc")
+      .option("url", jdbcUrl)
+      .option("user", jdbcUser)
+      .option("password", jdbcPassword)
+      .option("dbtable", tableName)
+      .load()
 
-    assert(df.schema.fields.head.dataType == DecimalType(20,0))
+    assert(df.schema.fields.head.dataType == DecimalType(20, 0))
 
     val data = df.collect().map(_.getDecimal(0)).sorted
-    assert(data sameElements Array(new java.math.BigDecimal(0), new java.math.BigDecimal("18446744073709551615")))
+    assert(
+      data sameElements Array(
+        new java.math.BigDecimal(0),
+        new java.math.BigDecimal("18446744073709551615")))
   }
 
   test("read ClickHouse Float32 as Spark FloatType") {
@@ -563,7 +570,8 @@ class ClickhouseDialectTest
   }
 
   test("write Spark Nullable(TimestampType) as ClickHouse Nullable(Datetime64(6))") {
-    val schema = StructType(Seq(StructField("nullableTimestampColumn", TimestampType, nullable = true)))
+    val schema =
+      StructType(Seq(StructField("nullableTimestampColumn", TimestampType, nullable = true)))
     val currentTime = new java.sql.Timestamp(System.currentTimeMillis())
     val data = Seq(Row(null), Row(currentTime))
     val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
@@ -585,7 +593,8 @@ class ClickhouseDialectTest
   }
 
   test("write Spark Nullable(BooleanType) as ClickHouse Nullable(Bool)") {
-    val schema = StructType(Seq(StructField("nullableBooleanColumn", BooleanType, nullable = true)))
+    val schema =
+      StructType(Seq(StructField("nullableBooleanColumn", BooleanType, nullable = true)))
     val data = Seq(Row(null), Row(true), Row(false))
     val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
 
@@ -642,8 +651,7 @@ class ClickhouseDialectTest
     (
       "longArrayColumn Array(UInt32)",
       "([1, 2, 3, 4, 5])",
-      ArrayType(LongType, containsNull = false)),
-  )
+      ArrayType(LongType, containsNull = false)))
 
   val testReadArrayCasesV0_7_X = Table(
     ("columnDefinition", "insertedData", "expectedType"),
@@ -656,7 +664,8 @@ class ClickhouseDialectTest
       "([1.23, 2.34, 3.45, 4.56, 5.67])",
      ArrayType(DecimalType(9, 2), containsNull = false)))
 
-  forAll(if (driverVersion.startsWith("0.9")) testReadArrayCasesV0_9_X else testReadArrayCasesV0_7_X) {
+  forAll(
+    if (driverVersion.startsWith("0.9")) testReadArrayCasesV0_9_X else testReadArrayCasesV0_7_X) {
     (columnDefinition: String, insertedData: String, expectedType: DataType) =>
       test(s"read ClickHouse Array for ${columnDefinition} column") {
         setupTable(columnDefinition)
@@ -728,9 +737,8 @@ class ClickhouseDialectTest
     (
       "UInt64Column Array(UInt64)",
       "([1, 2, 3, 4, 5])",
-      ArrayType(DecimalType(20,0), containsNull = false),
-      "class [J cannot be cast to class [Ljava.math.BigDecimal"
-    ),
+      ArrayType(DecimalType(20, 0), containsNull = false),
+      "class [J cannot be cast to class [Ljava.math.BigDecimal"),
     // https://github.com/ClickHouse/clickhouse-java/issues/1409
     (
       "dateArrayColumn Array(Date)",
@@ -772,12 +780,14 @@ class ClickhouseDialectTest
       ArrayType(DecimalType(20, 0), containsNull = false),
       "class [Ljava.lang.Object; cannot be cast to class [Ljava.math.BigDecimal;"))
 
-  forAll(if (driverVersion.startsWith("0.9")) testReadArrayUnsupportedCasesV0_9_X else testReadArrayUnsupportedCasesV0_7_X) {
+  forAll(
+    if (driverVersion.startsWith("0.9")) testReadArrayUnsupportedCasesV0_9_X
+    else testReadArrayUnsupportedCasesV0_7_X) {
     (
-      columnDefinition: String,
-      insertedData: String,
-      expectedType: DataType,
-      errorMessage: String) =>
+        columnDefinition: String,
+        insertedData: String,
+        expectedType: DataType,
+        errorMessage: String) =>
       test(s"cannot read ClickHouse Array for ${columnDefinition} column") {
         setupTable(columnDefinition)
         insertTestData(Seq(insertedData))
@@ -965,7 +975,8 @@ class ClickhouseDialectTest
         expectedType: DataType,
         expectedClickhouseType: String,
         nullable: Boolean) =>
-      test(s"write ClickHouse Array for ${if (nullable) s" ${columnName} Nullable" else columnName} column"){
+      test(
+        s"write ClickHouse Array for ${if (nullable) s" ${columnName} Nullable" else columnName} column") {
         val schema = StructType(Array(StructField(columnName, expectedType, nullable = nullable)))
         val df = spark.createDataFrame(spark.sparkContext.parallelize(insertedData), schema)
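
Two self-contained sketches follow for illustration; they are sketches under stated assumptions, not code from the repository.

First, the read path. The simplified toCatalystType above is a function of the ClickHouse type name alone: nullability is unwrapped from Nullable(...), arrays recurse on their element type, and Decimal precision/scale are parsed from the name itself, which is why the sqlType, size, scale, and md parameters could be dropped. The real unwrapNullable and arrayTypePattern are declared outside the hunks shown in this patch, so the definitions used here are assumptions:

import scala.util.matching.Regex

import org.apache.spark.sql.types._

// Sketch only: nullableTypePattern, arrayTypePattern, and unwrapNullable are
// assumed definitions; the dialect declares them outside the hunks above.
object TypeMappingSketch {
  private val nullableTypePattern: Regex = """(?i)^Nullable\((.*)\)$""".r
  private val arrayTypePattern: Regex = """(?i)^Array\((.*)\)$""".r

  // Strip a single Nullable(...) wrapper, returning (nullable, innerTypeName).
  private def unwrapNullable(typeName: String): (Boolean, String) =
    typeName match {
      case nullableTypePattern(inner) => (true, inner)
      case _ => (false, typeName)
    }

  // The patched signature: the mapping depends only on the type name.
  def toCatalystType(typeName: String): Option[(Boolean, DataType)] = {
    val (nullable, inner) = unwrapNullable(typeName)
    val dataType: Option[DataType] = inner match {
      case "String" => Some(StringType)
      case "Int32" => Some(IntegerType)
      case "Int64" => Some(LongType)
      case arrayTypePattern(nestType) =>
        // Arrays recurse on the element type; element nullability becomes containsNull.
        toCatalystType(nestType).map { case (elemNullable, elemType) =>
          ArrayType(elemType, elemNullable)
        }
      case _ => None
    }
    dataType.map((nullable, _))
  }

  def main(args: Array[String]): Unit = {
    println(toCatalystType("Array(Nullable(Int32))"))
    // Some((false,ArrayType(IntegerType,true)))
  }
}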
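
Second, the write path. parseColumnDefinitions wraps every column type in Nullable(...) unless the column is an array (ClickHouse does not allow Nullable(Array(...)); element nullability lives inside Array(...)) or carries an explicit NOT NULL. The columnPattern regex below is copied verbatim from the patch; arrayTypePattern is again an assumed definition:

import scala.util.matching.Regex

object ColumnDefinitionSketch {
  // Copied from the patch above.
  private val columnPattern: Regex =
    """"([^"]+)"\s+(.+?)(?:\s+(NOT\s+NULL))?\s*(?=(?:\s*,\s*"|$))""".r
  // Assumed definition; declared outside the hunks shown above.
  private val arrayTypePattern: Regex = """(?i)^Array\((.*)\)$""".r

  def parseColumnDefinitions(columnDefinitions: String): String =
    columnPattern
      .findAllMatchIn(columnDefinitions)
      .map { m =>
        val (columnName, columnType, notNull) = (m.group(1), m.group(2), m.group(3))
        // Arrays stay unwrapped, and explicit NOT NULL columns keep their
        // non-nullable type; everything else becomes Nullable(...).
        if (arrayTypePattern.findFirstIn(columnType).isDefined || notNull != null)
          s""""$columnName" $columnType"""
        else
          s""""$columnName" Nullable($columnType)"""
      }
      .mkString(", ")

  def main(args: Array[String]): Unit = {
    val input = """"id" Integer NOT NULL, "name" String, "tags" Array(Nullable(String))"""
    println(parseColumnDefinitions(input))
    // "id" Integer NOT NULL, "name" Nullable(String), "tags" Array(Nullable(String))
  }
}

Run against the scaladoc example, this prints "id" Integer NOT NULL, "name" Nullable(String), "tags" Array(Nullable(String)): the plain String column is wrapped, while the NOT NULL and array columns pass through unchanged.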