This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit 4c2e920: fixup

zhztheplayer committed Apr 15, 2022
1 parent 197e95f
Showing 3 changed files with 98 additions and 37 deletions.
@@ -21,12 +21,14 @@ import java.nio.file.Files
 
 import com.intel.oap.tpc.util.TPCRunner
 import org.apache.log4j.{Level, LogManager}
+
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.execution.ColumnarShuffleExchangeExec
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.functions.{col, expr}
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.util.PackageAccessor
 
 class PayloadSuite extends QueryTest with SharedSparkSession {
 
@@ -75,20 +77,29 @@ class PayloadSuite extends QueryTest with SharedSparkSession {
     val lfile = Files.createTempFile("", ".parquet").toFile
     lfile.deleteOnExit()
     lPath = lfile.getAbsolutePath
-    spark.range(2).select(col("id"), expr("1").as("kind"),
-      expr("1").as("key"),
-      expr("array(1, 2)").as("arr_field"),
-      expr("array(array(1, 2), array(3, 4))").as("arr_arr_field"),
-      expr("array(struct(1, 2), struct(1, 2))").as("arr_struct_field"),
-      expr("array(map(1, 2), map(3,4))").as("arr_map_field"),
-      expr("struct(1, 2)").as("struct_field"),
-      expr("struct(1, struct(1, 2))").as("struct_struct_field"),
-      expr("struct(1, array(1, 2))").as("struct_array_field"),
-      expr("map(1, 2)").as("map_field"),
-      expr("map(1, map(3,4))").as("map_map_field"),
-      expr("map(1, array(1, 2))").as("map_arr_field"),
-      expr("map(struct(1, 2), 2)").as("map_struct_field"))
-      .coalesce(1)
+    val dfl = spark
+      .range(2)
+      .select(
+        col("id"),
+        expr("1").as("kind"),
+        expr("1").as("key"),
+        expr("array(1, 2)").as("arr_field"),
+        expr("array(\"hello\", \"world\")").as("arr_str_field"),
+        expr("array(array(1, 2), array(3, 4))").as("arr_arr_field"),
+        expr("array(struct(1, 2), struct(1, 2))").as("arr_struct_field"),
+        expr("array(map(1, 2), map(3,4))").as("arr_map_field"),
+        expr("struct(1, 2)").as("struct_field"),
+        expr("struct(1, struct(1, 2))").as("struct_struct_field"),
+        expr("struct(1, array(1, 2))").as("struct_array_field"),
+        expr("map(1, 2)").as("map_field"),
+        expr("map(1, map(3,4))").as("map_map_field"),
+        expr("map(1, array(1, 2))").as("map_arr_field"),
+        expr("map(struct(1, 2), 2)").as("map_struct_field"))
+
+    // Arrow scan doesn't support converting from non-null nested type to nullable as of now
+    val dflNullable = dfl.sqlContext.createDataFrame(dfl.rdd, PackageAccessor.asNullable(dfl.schema))
+
+    dflNullable.coalesce(1)
       .write
       .format("parquet")
       .mode("overwrite")
@@ -97,11 +108,19 @@ class PayloadSuite extends QueryTest with SharedSparkSession {
     val rfile = Files.createTempFile("", ".parquet").toFile
     rfile.deleteOnExit()
     rPath = rfile.getAbsolutePath
-    spark.range(2).select(col("id"), expr("id % 2").as("kind"),
-      expr("id % 2").as("key"),
-      expr("array(1, 2)").as("arr_field"),
-      expr("struct(1, 2)").as("struct_field"))
-      .coalesce(1)
+
+    val dfr = spark.range(2)
+      .select(
+        col("id"),
+        expr("id % 2").as("kind"),
+        expr("id % 2").as("key"),
+        expr("array(1, 2)").as("arr_field"),
+        expr("struct(1, 2)").as("struct_field"))
+
+    // Arrow scan doesn't support converting from non-null nested type to nullable as of now
+    val dfrNullable = dfr.sqlContext.createDataFrame(dfr.rdd, PackageAccessor.asNullable(dfr.schema))
+
+    dfrNullable.coalesce(1)
       .write
       .format("parquet")
       .mode("overwrite")
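Both hunks above apply the same pattern: build the test DataFrame, rebuild it against a fully nullable copy of its schema, and only then write it out, because the Arrow-backed scan cannot yet convert a non-null nested type to nullable at read time. A minimal sketch of that pattern, assuming an illustrative helper name writeNullableParquet and an explicit save path (neither appears in the commit):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.util.PackageAccessor

    object NullableWriteSketch {
      // Illustrative helper, not from the commit: rebuild `df` with every
      // field (including nested array/map/struct children) marked nullable,
      // then write a single parquet file, mirroring the test setup above.
      def writeNullableParquet(df: DataFrame, path: String): Unit = {
        // StructType.asNullable is private[spark]; code outside Spark's
        // namespace reaches it through the PackageAccessor added by this commit.
        val dfNullable =
          df.sqlContext.createDataFrame(df.rdd, PackageAccessor.asNullable(df.schema))
        dfNullable.coalesce(1)
          .write
          .format("parquet")
          .mode("overwrite")
          .save(path)
      }
    }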
@@ -73,20 +73,28 @@ class ComplexTypeSuite extends QueryTest with SharedSparkSession {
     val lfile = Files.createTempFile("", ".parquet").toFile
     lfile.deleteOnExit()
     lPath = lfile.getAbsolutePath
-    spark.range(2).select(col("id"), expr("1").as("kind"),
-      expr("array(1, 2)").as("arr_field"),
-      expr("array(\"hello\", \"world\")").as("arr_str_field"),
-      expr("array(array(1, 2), array(3, 4))").as("arr_arr_field"),
-      expr("array(struct(1, 2), struct(1, 2))").as("arr_struct_field"),
-      expr("array(map(1, 2), map(3,4))").as("arr_map_field"),
-      expr("struct(1, 2)").as("struct_field"),
-      expr("struct(1, struct(1, 2))").as("struct_struct_field"),
-      expr("struct(1, array(1, 2))").as("struct_array_field"),
-      expr("map(1, 2)").as("map_field"),
-      expr("map(1, map(3,4))").as("map_map_field"),
-      expr("map(1, array(1, 2))").as("map_arr_field"),
-      expr("map(struct(1, 2), 2)").as("map_struct_field"))
-      .coalesce(1)
+    val dfl = spark
+      .range(2)
+      .select(
+        col("id"),
+        expr("1").as("kind"),
+        expr("array(1, 2)").as("arr_field"),
+        expr("array(\"hello\", \"world\")").as("arr_str_field"),
+        expr("array(array(1, 2), array(3, 4))").as("arr_arr_field"),
+        expr("array(struct(1, 2), struct(1, 2))").as("arr_struct_field"),
+        expr("array(map(1, 2), map(3,4))").as("arr_map_field"),
+        expr("struct(1, 2)").as("struct_field"),
+        expr("struct(1, struct(1, 2))").as("struct_struct_field"),
+        expr("struct(1, array(1, 2))").as("struct_array_field"),
+        expr("map(1, 2)").as("map_field"),
+        expr("map(1, map(3,4))").as("map_map_field"),
+        expr("map(1, array(1, 2))").as("map_arr_field"),
+        expr("map(struct(1, 2), 2)").as("map_struct_field"))
+
+    // Arrow scan doesn't support converting from non-null nested type to nullable as of now
+    val dflNullable = dfl.sqlContext.createDataFrame(dfl.rdd, dfl.schema.asNullable)
+
+    dflNullable.coalesce(1)
       .write
       .format("parquet")
       .mode("overwrite")
@@ -95,10 +103,18 @@ class ComplexTypeSuite extends QueryTest with SharedSparkSession {
     val rfile = Files.createTempFile("", ".parquet").toFile
     rfile.deleteOnExit()
     rPath = rfile.getAbsolutePath
-    spark.range(2).select(col("id"), expr("id % 2").as("kind"),
-      expr("array(1, 2)").as("arr_field"),
-      expr("struct(1, 2)").as("struct_field"))
-      .coalesce(1)
+
+    val dfr = spark.range(2)
+      .select(
+        col("id"),
+        expr("id % 2").as("kind"),
+        expr("array(1, 2)").as("arr_field"),
+        expr("struct(1, 2)").as("struct_field"))
+
+    // Arrow scan doesn't support converting from non-null nested type to nullable as of now
+    val dfrNullable = dfr.sqlContext.createDataFrame(dfr.rdd, dfr.schema.asNullable)
+
+    dfrNullable.coalesce(1)
       .write
       .format("parquet")
      .mode("overwrite")
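Note the difference from PayloadSuite: ComplexTypeSuite calls dfl.schema.asNullable directly instead of going through PackageAccessor, which suggests this suite itself sits under an org.apache.spark package where the private[spark] member is visible. A hedged sketch of how the round trip could be checked (verifySchemaNullable is illustrative, not part of the commit):

    import org.apache.spark.sql.SparkSession

    object NullableReadSketch {
      // Illustrative check, not from the commit: parquet written from the
      // asNullable copy should report every top-level field as nullable, so
      // the Arrow scan never has to promote a non-null type while reading.
      def verifySchemaNullable(spark: SparkSession, path: String): Unit = {
        val schema = spark.read.format("parquet").load(path).schema
        assert(schema.fields.forall(_.nullable))
      }
    }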
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.apache.spark.sql.types.StructType
+
+object PackageAccessor {
+  def asNullable(schema: StructType): StructType = {
+    schema.asNullable
+  }
+}
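PackageAccessor is a small forwarder compiled into org.apache.spark.util so that code outside Spark's namespace, such as the com.intel.oap test suites, can reach the private[spark] StructType.asNullable member. A usage sketch from an external package (the schema contents are illustrative):

    import org.apache.spark.sql.types.{LongType, StructField, StructType}
    import org.apache.spark.util.PackageAccessor

    object PackageAccessorUsageSketch {
      def main(args: Array[String]): Unit = {
        // A schema with a non-nullable field, as spark.range-derived test
        // data produces.
        val strict = StructType(Seq(StructField("id", LongType, nullable = false)))
        // asNullable is private[spark]; the forwarder re-exports it to
        // callers outside org.apache.spark packages.
        val relaxed = PackageAccessor.asNullable(strict)
        assert(relaxed.fields.forall(_.nullable))
      }
    }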
