diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
index f70b500f974a..83b665853c53 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/PostgresIntegrationSuite.scala
@@ -107,24 +107,46 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT
     connection.prepareStatement("CREATE TABLE array_timestamptz (col timestamptz[])")
       .executeUpdate()
-    connection.prepareStatement("INSERT INTO array_int VALUES (array[array[10]])").executeUpdate()
-    connection.prepareStatement("INSERT INTO array_bigint VALUES (array[array[10]])")
+    connection.prepareStatement("INSERT INTO array_int VALUES (array[10]), (array[array[10]])")
       .executeUpdate()
-    connection.prepareStatement("INSERT INTO array_smallint VALUES (array[array[10]])")
-      .executeUpdate()
-    connection.prepareStatement("INSERT INTO array_boolean VALUES (array[array[true]])")
-      .executeUpdate()
-    connection.prepareStatement("INSERT INTO array_float VALUES (array[array[10.5]])")
-      .executeUpdate()
-    connection.prepareStatement("INSERT INTO array_double VALUES (array[array[10.1]])")
-      .executeUpdate()
-    connection.prepareStatement("INSERT INTO array_timestamp VALUES (" +
-      "array[array['2022-01-01 09:15'::timestamp]])").executeUpdate()
+    connection.prepareStatement("INSERT INTO array_bigint VALUES (array[10]), " +
+      "(array[array[10]])").executeUpdate()
+    connection.prepareStatement("INSERT INTO array_smallint VALUES (array[10]), " +
+      "(array[array[10]])").executeUpdate()
+    connection.prepareStatement("INSERT INTO array_boolean VALUES (array[true]), " +
+      "(array[array[true]])").executeUpdate()
+    connection.prepareStatement("INSERT INTO array_float VALUES (array[10.5]), " +
+      "(array[array[10.5]])").executeUpdate()
+    connection.prepareStatement("INSERT INTO array_double VALUES (array[10.1]), " +
+      "(array[array[10.1]])").executeUpdate()
+    connection.prepareStatement("INSERT INTO array_timestamp VALUES " +
+      "(array['2022-01-01 09:15'::timestamp]), " +
+      "(array[array['2022-01-01 09:15'::timestamp]])").executeUpdate()
     connection.prepareStatement("INSERT INTO array_timestamptz VALUES " +
+      "(array['2022-01-01 09:15'::timestamptz]), " +
       "(array[array['2022-01-01 09:15'::timestamptz]])").executeUpdate()
     connection.prepareStatement(
       "CREATE TABLE datetime (name VARCHAR(32), date1 DATE, time1 TIMESTAMP)")
       .executeUpdate()
+
+    connection.prepareStatement("CREATE TABLE array_of_int (col int[])")
+      .executeUpdate()
+    connection.prepareStatement("INSERT INTO array_of_int " +
+      "VALUES (array[1])").executeUpdate()
+    connection.prepareStatement("CREATE TABLE ctas_array_of_int " +
+      "AS SELECT * FROM array_of_int").executeUpdate()
+
+    connection.prepareStatement("CREATE TABLE array_of_array_of_int (col int[][])")
+      .executeUpdate()
+    connection.prepareStatement("INSERT INTO array_of_array_of_int " +
+      "VALUES (array[array[1],array[2]])").executeUpdate()
+    connection.prepareStatement("CREATE TABLE ctas_array_of_array_of_int " +
+      "AS SELECT * FROM array_of_array_of_int").executeUpdate()
+
+    connection.prepareStatement("CREATE TABLE unsupported_array_of_array_of_int (col int[][])")
+      .executeUpdate()
+    connection.prepareStatement("INSERT INTO unsupported_array_of_array_of_int " +
+      "VALUES (array[array[1],array[2]]), (array[3])").executeUpdate()
   }
 
   test("Test multi-dimensional column types") {
@@ -302,4 +324,34 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT
     assert(rows10(0).getString(0) === "amy")
     assert(rows10(1).getString(0) === "alex")
   }
+
+  test("Test reading 1d array from table created via CTAS command - positive test") {
+    val dfNoCTASTable = sql(s"SELECT * FROM $catalogName.array_of_int")
+    val dfWithCTASTable = sql(s"SELECT * FROM $catalogName.ctas_array_of_int")
+
+    checkAnswer(dfWithCTASTable, dfNoCTASTable.collect())
+  }
+
+  test("Test reading 2d array from table created via CTAS command - negative test") {
+    val dfNoCTASTable = sql(s"SELECT * FROM $catalogName.array_of_int")
+
+    checkError(
+      exception = intercept[org.apache.spark.SparkSQLException] {
+        // This should fail: arrays in CTAS-created tables are always read as 1D
+        sql(s"SELECT * FROM $catalogName.ctas_array_of_array_of_int").collect()
+      },
+      condition = "COLUMN_ARRAY_ELEMENT_TYPE_MISMATCH",
+      parameters = Map("pos" -> "0", "type" -> "\"ARRAY<INT>\"")
+    )
+  }
+
+  test("Test reading array with mixed dimensions across rows") {
+    checkError(
+      exception = intercept[org.apache.spark.SparkSQLException] {
+        sql(s"SELECT * FROM $catalogName.unsupported_array_of_array_of_int").collect()
+      },
+      condition = "COLUMN_ARRAY_ELEMENT_TYPE_MISMATCH",
+      parameters = Map("pos" -> "0", "type" -> "\"ARRAY<ARRAY<INT>>\"")
+    )
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index 1265550b3f19..04a62298a49f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -393,7 +393,9 @@ private case class PostgresDialect()
     try {
       Using.resource(conn.createStatement()) { stmt =>
         Using.resource(stmt.executeQuery(query)) { rs =>
-          if (rs.next()) metadata.putLong("arrayDimension", rs.getLong(1))
+          // Metadata can report 0 dimensions for CTAS tables; such tables are
+          // always read as 1D arrays.
+          if (rs.next()) metadata.putLong("arrayDimension", Math.max(1L, rs.getLong(1)))
         }
       }
     } catch {
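
Note on the PostgresDialect change: PostgreSQL stores a declared array column's dimensionality in pg_catalog.pg_attribute.attndims (1 for "col int[]", 2 for "col int[][]"), but records 0 for columns of tables created via CTAS, which is why the dimension metadata is coerced to at least 1. Below is a minimal standalone JDBC sketch of that behavior; the connection URL, credentials, object names, and the attndims query are illustrative placeholders and assume the dialect's dimension query reads attndims, as the surrounding code suggests.

import java.sql.DriverManager
import scala.util.Using

object CtasAttndimsSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder connection details; point at any local PostgreSQL instance.
    Using.resource(DriverManager.getConnection(
        "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres")) { conn =>
      Using.resource(conn.createStatement()) { stmt =>
        stmt.executeUpdate("CREATE TABLE src (col int[])")
        stmt.executeUpdate("INSERT INTO src VALUES (array[1])")
        stmt.executeUpdate("CREATE TABLE ctas AS SELECT * FROM src")
        // attndims holds only the declared dimensionality: 'src' reports 1,
        // while the CTAS copy reports 0 -- the case Math.max(1L, ...) handles.
        val rs = stmt.executeQuery(
          "SELECT c.relname, a.attndims FROM pg_catalog.pg_attribute a " +
            "JOIN pg_catalog.pg_class c ON a.attrelid = c.oid " +
            "WHERE a.attname = 'col' AND c.relname IN ('src', 'ctas')")
        while (rs.next()) {
          // Expected output: "src: attndims = 1" and "ctas: attndims = 0".
          println(s"${rs.getString(1)}: attndims = ${rs.getInt(2)}")
        }
      }
    }
  }
}

Because attndims is purely declarative (PostgreSQL does not enforce it on insert), values of a different dimensionality can still land in the column, which is what the negative tests above exercise.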