From d5cc3b7f5fed8ea7e1253ebccca7deb7cff06fcc Mon Sep 17 00:00:00 2001
From: semyonsinchenko
Date: Thu, 6 Jun 2024 12:17:04 +0200
Subject: [PATCH] Refactoring + license headers + scalafmt

On branch 493-datasources-spark-refactoring
Changes to be committed:
    modified:   licenserc.toml
    modified:   maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala
    renamed:    maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarCommitProtocol.scala -> maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarCommitProtocol.scala
    renamed:    maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarScan.scala -> maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala
    renamed:    maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarScanBuilder.scala -> maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala
    renamed:    maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarTable.scala -> maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarTable.scala
    renamed:    maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarWriterBuilder.scala -> maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarWriteBuilder.scala
    renamed:    maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/csv/CSVWriterBuilder.scala -> maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/csv/CSVWriteBuilder.scala
    renamed:    maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/orc/OrcOutputWriter.scala -> maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/orc/OrcOutputWriter.scala
    renamed:    maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/orc/OrcWriteBuilder.scala -> maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/orc/OrcWriteBuilder.scala
    renamed:    maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/parquet/ParquetWriterBuilder.scala -> maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/parquet/ParquetWriteBuilder.scala
    modified:   maven-projects/spark/datasources-33/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala
    modified:   maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarCommitProtocol.scala
    modified:   maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala
    modified:   maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala
    modified:   maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarTable.scala
    modified:   maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarWriteBuilder.scala
    modified:   maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/csv/CSVWriteBuilder.scala
    modified:   maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/orc/OrcOutputWriter.scala
    modified:   maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/orc/OrcWriteBuilder.scala
    modified:   maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/parquet/ParquetWriteBuilder.scala
---
 licenserc.toml | 4 +-
 .../graphar/datasources/GarDataSource.scala | 54 +++++++-------
 .../sql/graphar}/GarCommitProtocol.scala | 38 +++++-----
 .../sql/graphar}/GarScan.scala | 71 ++++++++-----------
 .../sql/graphar}/GarScanBuilder.scala | 28 ++++----
 .../sql/graphar}/GarTable.scala | 36 +++++-----
 .../sql/graphar/GarWriteBuilder.scala} | 61 +++++++---------
 .../sql/graphar/csv/CSVWriteBuilder.scala} | 39 +++++-----
 .../sql/graphar}/orc/OrcOutputWriter.scala | 22 +++---
 .../sql/graphar}/orc/OrcWriteBuilder.scala | 39 +++++-----
 .../parquet/ParquetWriteBuilder.scala} | 48 ++++++-------
 .../graphar/datasources/GarDataSource.scala | 38 +++++-----
 .../spark/sql/graphar/GarCommitProtocol.scala | 28 ++++----
 .../apache/spark/sql/graphar/GarScan.scala | 44 ++++++------
 .../spark/sql/graphar/GarScanBuilder.scala | 18 ++---
 .../apache/spark/sql/graphar/GarTable.scala | 22 +++---
 .../spark/sql/graphar/GarWriteBuilder.scala | 34 +++++----
 .../sql/graphar/csv/CSVWriteBuilder.scala | 28 ++++----
 .../sql/graphar/orc/OrcOutputWriter.scala | 14 ++--
 .../sql/graphar/orc/OrcWriteBuilder.scala | 28 ++++----
 .../graphar/parquet/ParquetWriteBuilder.scala | 34 ++++-----
 21 files changed, 340 insertions(+), 388 deletions(-)
 rename maven-projects/spark/datasources-32/src/main/scala/org/apache/{graphar/datasources => spark/sql/graphar}/GarCommitProtocol.scala (84%)
 rename maven-projects/spark/datasources-32/src/main/scala/org/apache/{graphar/datasources => spark/sql/graphar}/GarScan.scala (86%)
 rename maven-projects/spark/datasources-32/src/main/scala/org/apache/{graphar/datasources => spark/sql/graphar}/GarScanBuilder.scala (86%)
 rename maven-projects/spark/datasources-32/src/main/scala/org/apache/{graphar/datasources => spark/sql/graphar}/GarTable.scala (84%)
 rename maven-projects/spark/datasources-32/src/main/scala/org/apache/{graphar/datasources/GarWriterBuilder.scala => spark/sql/graphar/GarWriteBuilder.scala} (82%)
 rename maven-projects/spark/datasources-32/src/main/scala/org/apache/{graphar/datasources/csv/CSVWriterBuilder.scala => spark/sql/graphar/csv/CSVWriteBuilder.scala} (68%)
 rename maven-projects/spark/datasources-32/src/main/scala/org/apache/{graphar/datasources => spark/sql/graphar}/orc/OrcOutputWriter.scala (82%)
 rename maven-projects/spark/datasources-32/src/main/scala/org/apache/{graphar/datasources => spark/sql/graphar}/orc/OrcWriteBuilder.scala (76%)
 rename maven-projects/spark/datasources-32/src/main/scala/org/apache/{graphar/datasources/parquet/ParquetWriterBuilder.scala => spark/sql/graphar/parquet/ParquetWriteBuilder.scala} (80%)

diff --git a/licenserc.toml b/licenserc.toml
index b6e0919a2..ad2bdf77b 100644
--- a/licenserc.toml
+++ b/licenserc.toml
@@ -44,8 +44,8 @@ excludes = [
   "cpp/apidoc",
   "cpp/thirdparty",
   "cpp/misc/cpplint.py",
-  "spark/datasources-32/src/main/scala/org/apache/graphar/datasources",
-  "spark/datasources-33/src/main/scala/org/apache/graphar/datasources",
+  "spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar",
+  "spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar",
   "java/src/main/java/org/apache/graphar/stdcxx/StdString.java",
   "java/src/main/java/org/apache/graphar/stdcxx/StdVector.java",
   "java/src/main/java/org/apache/graphar/stdcxx/StdSharedPtr.java",
diff --git a/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala b/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala
index 38a3c183d..eab9b337e 100644
---
a/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala +++ b/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala @@ -16,24 +16,24 @@ package org.apache.graphar.datasources -import scala.collection.JavaConverters._ -import scala.util.matching.Regex -import java.util - import com.fasterxml.jackson.databind.ObjectMapper import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path - +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.catalog.{Table, TableProvider} +import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.graphar.GarTable +import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap -import org.apache.spark.sql.sources.DataSourceRegister -import org.apache.spark.sql.connector.expressions.Transform + +import java.util +import scala.collection.JavaConverters._ +import scala.util.matching.Regex // Derived from Apache Spark 3.1.1 // https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala @@ -79,8 +79,8 @@ class GarDataSource extends TableProvider with DataSourceRegister { } protected def getOptionsWithoutPaths( - map: CaseInsensitiveStringMap - ): CaseInsensitiveStringMap = { + map: CaseInsensitiveStringMap + ): CaseInsensitiveStringMap = { val withoutPath = map.asCaseSensitiveMap().asScala.filterKeys { k => !k.equalsIgnoreCase("path") && !k.equalsIgnoreCase("paths") } @@ -88,9 +88,9 @@ class GarDataSource extends TableProvider with DataSourceRegister { } protected def getTableName( - map: CaseInsensitiveStringMap, - paths: Seq[String] - ): String = { + map: CaseInsensitiveStringMap, + paths: Seq[String] + ): String = { val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions( map.asCaseSensitiveMap().asScala.toMap ) @@ -101,9 +101,9 @@ class GarDataSource extends TableProvider with DataSourceRegister { } private def qualifiedPathName( - path: String, - hadoopConf: Configuration - ): String = { + path: String, + hadoopConf: Configuration + ): String = { val hdfsPath = new Path(path) val fs = hdfsPath.getFileSystem(hadoopConf) hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toString @@ -149,16 +149,16 @@ class GarDataSource extends TableProvider with DataSourceRegister { } override def inferPartitioning( - options: CaseInsensitiveStringMap - ): Array[Transform] = { + options: CaseInsensitiveStringMap + ): Array[Transform] = { Array.empty } override def getTable( - schema: StructType, - partitioning: Array[Transform], - properties: util.Map[String, String] - ): Table = { + schema: StructType, + partitioning: Array[Transform], + properties: util.Map[String, String] + ): Table = { // If the table is already loaded during schema inference, return it directly. if (t != null) { t @@ -169,11 +169,11 @@ class GarDataSource extends TableProvider with DataSourceRegister { // Get the actual fall back file format. 
private def getFallbackFileFormat( - options: CaseInsensitiveStringMap - ): Class[_ <: FileFormat] = options.get("fileFormat") match { - case "csv" => classOf[CSVFileFormat] - case "orc" => classOf[OrcFileFormat] + options: CaseInsensitiveStringMap + ): Class[_ <: FileFormat] = options.get("fileFormat") match { + case "csv" => classOf[CSVFileFormat] + case "orc" => classOf[OrcFileFormat] case "parquet" => classOf[ParquetFileFormat] - case _ => throw new IllegalArgumentException + case _ => throw new IllegalArgumentException } } diff --git a/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarCommitProtocol.scala b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarCommitProtocol.scala similarity index 84% rename from maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarCommitProtocol.scala rename to maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarCommitProtocol.scala index 07cff02ee..f3da5e067 100644 --- a/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarCommitProtocol.scala +++ b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarCommitProtocol.scala @@ -17,16 +17,14 @@ // Derived from Apache Spark 3.1.1 // https://github.com/apache/spark/blob/1d550c4/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala -package org.apache.graphar.datasources +package org.apache.spark.sql.graphar import org.apache.graphar.GeneralParams - -import org.json4s._ -import org.json4s.jackson.JsonMethods._ - -import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol import org.apache.hadoop.mapreduce._ import org.apache.spark.internal.Logging +import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol +import org.json4s._ +import org.json4s.jackson.JsonMethods._ object GarCommitProtocol { private def binarySearchPair(aggNums: Array[Int], key: Int): (Int, Int) = { @@ -52,23 +50,23 @@ object GarCommitProtocol { } class GarCommitProtocol( - jobId: String, - path: String, - options: Map[String, String], - dynamicPartitionOverwrite: Boolean = false -) extends SQLHadoopMapReduceCommitProtocol( - jobId, - path, - dynamicPartitionOverwrite - ) - with Serializable - with Logging { + jobId: String, + path: String, + options: Map[String, String], + dynamicPartitionOverwrite: Boolean = false + ) extends SQLHadoopMapReduceCommitProtocol( + jobId, + path, + dynamicPartitionOverwrite +) + with Serializable + with Logging { // override getFilename to customize the file name override def getFilename( - taskContext: TaskAttemptContext, - ext: String - ): String = { + taskContext: TaskAttemptContext, + ext: String + ): String = { val partitionId = taskContext.getTaskAttemptID.getTaskID.getId if (options.contains(GeneralParams.offsetStartChunkIndexKey)) { // offset chunk file name, looks like chunk0 diff --git a/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarScan.scala b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala similarity index 86% rename from maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarScan.scala rename to maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala index 4b063db78..c5ba01fda 100644 --- 
a/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarScan.scala +++ b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala @@ -17,54 +17,45 @@ // Derived from Apache Spark 3.1.1 // https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala -package org.apache.graphar.datasources - -import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer +package org.apache.spark.sql.graphar import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.parquet.hadoop.ParquetInputFormat - import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.expressions.{Expression, ExprUtils} import org.apache.spark.sql.catalyst.csv.CSVOptions +import org.apache.spark.sql.catalyst.expressions.{ExprUtils, Expression} import org.apache.spark.sql.connector.read.PartitionReaderFactory import org.apache.spark.sql.execution.PartitionedFileUtil -import org.apache.spark.sql.execution.datasources.{ - FilePartition, - PartitioningAwareFileIndex, - PartitionedFile -} -import org.apache.spark.sql.execution.datasources.parquet.{ - ParquetOptions, - ParquetReadSupport, - ParquetWriteSupport -} +import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, ParquetReadSupport, ParquetWriteSupport} import org.apache.spark.sql.execution.datasources.v2.FileScan -import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory -import org.apache.spark.sql.execution.datasources.v2.orc.OrcPartitionReaderFactory import org.apache.spark.sql.execution.datasources.v2.csv.CSVPartitionReaderFactory +import org.apache.spark.sql.execution.datasources.v2.orc.OrcPartitionReaderFactory +import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, PartitioningAwareFileIndex} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.SerializableConfiguration +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + /** GarScan is a class to implement the file scan for GarDataSource. */ case class GarScan( - sparkSession: SparkSession, - hadoopConf: Configuration, - fileIndex: PartitioningAwareFileIndex, - dataSchema: StructType, - readDataSchema: StructType, - readPartitionSchema: StructType, - pushedFilters: Array[Filter], - options: CaseInsensitiveStringMap, - formatName: String, - partitionFilters: Seq[Expression] = Seq.empty, - dataFilters: Seq[Expression] = Seq.empty -) extends FileScan { + sparkSession: SparkSession, + hadoopConf: Configuration, + fileIndex: PartitioningAwareFileIndex, + dataSchema: StructType, + readDataSchema: StructType, + readPartitionSchema: StructType, + pushedFilters: Array[Filter], + options: CaseInsensitiveStringMap, + formatName: String, + partitionFilters: Seq[Expression] = Seq.empty, + dataFilters: Seq[Expression] = Seq.empty + ) extends FileScan { /** The gar format is not splitable. */ override def isSplitable(path: Path): Boolean = false @@ -72,8 +63,8 @@ case class GarScan( /** Create the reader factory according to the actual file format. 
*/ override def createReaderFactory(): PartitionReaderFactory = formatName match { - case "csv" => createCSVReaderFactory() - case "orc" => createOrcReaderFactory() + case "csv" => createCSVReaderFactory() + case "orc" => createOrcReaderFactory() case "parquet" => createParquetReaderFactory() case _ => throw new IllegalArgumentException("Invalid format name: " + formatName) @@ -232,9 +223,9 @@ case class GarScan( * chunk file in GraphAr to a single partition. */ private def getFilePartitions( - sparkSession: SparkSession, - partitionedFiles: Seq[PartitionedFile] - ): Seq[FilePartition] = { + sparkSession: SparkSession, + partitionedFiles: Seq[PartitionedFile] + ): Seq[FilePartition] = { val partitions = new ArrayBuffer[FilePartition] val currentFiles = new ArrayBuffer[PartitionedFile] @@ -270,8 +261,8 @@ case class GarScan( /** Get the hash code of the object. */ override def hashCode(): Int = formatName match { - case "csv" => super.hashCode() - case "orc" => getClass.hashCode() + case "csv" => super.hashCode() + case "orc" => getClass.hashCode() case "parquet" => getClass.hashCode() case _ => throw new IllegalArgumentException("Invalid format name: " + formatName) @@ -289,8 +280,8 @@ case class GarScan( /** Construct the file scan with filters. */ override def withFilters( - partitionFilters: Seq[Expression], - dataFilters: Seq[Expression] - ): FileScan = + partitionFilters: Seq[Expression], + dataFilters: Seq[Expression] + ): FileScan = this.copy(partitionFilters = partitionFilters, dataFilters = dataFilters) } diff --git a/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarScanBuilder.scala b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala similarity index 86% rename from maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarScanBuilder.scala rename to maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala index 1e83c7737..bc21614c0 100644 --- a/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarScanBuilder.scala +++ b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala @@ -17,31 +17,30 @@ // Derived from Apache Spark 3.1.1 // https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala -package org.apache.graphar.datasources +package org.apache.spark.sql.graphar import org.apache.spark.sql.SparkSession import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters} import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex - import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder +import org.apache.spark.sql.execution.datasources.v2.orc.OrcScanBuilder +import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScanBuilder import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap import scala.collection.JavaConverters._ -import org.apache.spark.sql.execution.datasources.v2.orc.OrcScanBuilder -import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScanBuilder /** GarScanBuilder is a class to build the file scan for GarDataSource. 
*/ case class GarScanBuilder( - sparkSession: SparkSession, - fileIndex: PartitioningAwareFileIndex, - schema: StructType, - dataSchema: StructType, - options: CaseInsensitiveStringMap, - formatName: String -) extends FileScanBuilder(sparkSession, fileIndex, dataSchema) - with SupportsPushDownFilters { + sparkSession: SparkSession, + fileIndex: PartitioningAwareFileIndex, + schema: StructType, + dataSchema: StructType, + options: CaseInsensitiveStringMap, + formatName: String + ) extends FileScanBuilder(sparkSession, fileIndex, dataSchema) + with SupportsPushDownFilters { lazy val hadoopConf = { val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap // Hadoop Configurations are case sensitive. @@ -49,14 +48,15 @@ case class GarScanBuilder( } private var filters: Array[Filter] = Array.empty + override def pushFilters(filters: Array[Filter]): Array[Filter] = { this.filters = filters filters } override def pushedFilters(): Array[Filter] = formatName match { - case "csv" => Array.empty[Filter] - case "orc" => pushedOrcFilters + case "csv" => Array.empty[Filter] + case "orc" => pushedOrcFilters case "parquet" => pushedParquetFilters case _ => throw new IllegalArgumentException("Invalid format name: " + formatName) diff --git a/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarTable.scala b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarTable.scala similarity index 84% rename from maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarTable.scala rename to maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarTable.scala index 8aa231794..df3d51fa9 100644 --- a/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarTable.scala +++ b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarTable.scala @@ -17,41 +17,39 @@ // Derived from Apache Spark 3.1.1 // https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala -package org.apache.graphar.datasources - -import scala.collection.JavaConverters._ +package org.apache.spark.sql.graphar import org.apache.hadoop.fs.FileStatus - import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} import org.apache.spark.sql.catalyst.csv.CSVOptions +import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder} import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.execution.datasources.csv.CSVDataSource import org.apache.spark.sql.execution.datasources.orc.OrcUtils import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils import org.apache.spark.sql.execution.datasources.v2.FileTable +import org.apache.spark.sql.graphar.csv.CSVWriteBuilder +import org.apache.spark.sql.graphar.orc.OrcWriteBuilder +import org.apache.spark.sql.graphar.parquet.ParquetWriteBuilder import org.apache.spark.sql.types._ import org.apache.spark.sql.util.CaseInsensitiveStringMap -import org.apache.graphar.datasources.csv.CSVWriteBuilder -import org.apache.graphar.datasources.parquet.ParquetWriteBuilder -import org.apache.graphar.datasources.orc.OrcWriteBuilder +import scala.collection.JavaConverters._ /** GarTable is a class to represent the graph data in GraphAr as a table. 
*/ case class GarTable( - name: String, - sparkSession: SparkSession, - options: CaseInsensitiveStringMap, - paths: Seq[String], - userSpecifiedSchema: Option[StructType], - fallbackFileFormat: Class[_ <: FileFormat] -) extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + name: String, + sparkSession: SparkSession, + options: CaseInsensitiveStringMap, + paths: Seq[String], + userSpecifiedSchema: Option[StructType], + fallbackFileFormat: Class[_ <: FileFormat] + ) extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { /** Construct a new scan builder. */ override def newScanBuilder( - options: CaseInsensitiveStringMap - ): GarScanBuilder = + options: CaseInsensitiveStringMap + ): GarScanBuilder = new GarScanBuilder( sparkSession, fileIndex, @@ -116,9 +114,9 @@ case class GarTable( case ArrayType(elementType, _) => formatName match { - case "orc" => supportsDataType(elementType) + case "orc" => supportsDataType(elementType) case "parquet" => supportsDataType(elementType) - case _ => false + case _ => false } // case MapType(keyType, valueType, _) => diff --git a/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarWriterBuilder.scala b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarWriteBuilder.scala similarity index 82% rename from maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarWriterBuilder.scala rename to maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarWriteBuilder.scala index 3acd9247c..cc8bfc916 100644 --- a/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarWriterBuilder.scala +++ b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarWriteBuilder.scala @@ -17,46 +17,33 @@ // Derived from Apache Spark 3.1.1 // https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala -package org.apache.graphar.datasources - -import java.util.UUID - -import scala.collection.JavaConverters._ +package org.apache.spark.sql.graphar import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat -import org.apache.hadoop.mapreduce.Job - -import org.apache.spark.sql.execution.datasources.OutputWriterFactory import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} -import org.apache.spark.sql.connector.write.{ - BatchWrite, - LogicalWriteInfo, - WriteBuilder -} -import org.apache.spark.sql.execution.datasources.{ - BasicWriteJobStatsTracker, - DataSource, - OutputWriterFactory, - WriteJobDescription -} +import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, WriteBuilder} +import org.apache.spark.sql.execution.datasources.v2.FileBatchWrite +import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, DataSource, OutputWriterFactory, WriteJobDescription} import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.util.SerializableConfiguration -import org.apache.spark.sql.execution.datasources.v2.FileBatchWrite -import 
org.apache.spark.sql.catalyst.expressions.AttributeReference + +import java.util.UUID +import scala.collection.JavaConverters._ abstract class GarWriteBuilder( - paths: Seq[String], - formatName: String, - supportsDataType: DataType => Boolean, - info: LogicalWriteInfo -) extends WriteBuilder { + paths: Seq[String], + formatName: String, + supportsDataType: DataType => Boolean, + info: LogicalWriteInfo + ) extends WriteBuilder { private val schema = info.schema() private val queryId = info.queryId() private val options = info.options() @@ -90,11 +77,11 @@ abstract class GarWriteBuilder( } def prepareWrite( - sqlConf: SQLConf, - job: Job, - options: Map[String, String], - dataSchema: StructType - ): OutputWriterFactory + sqlConf: SQLConf, + job: Job, + options: Map[String, String], + dataSchema: StructType + ): OutputWriterFactory private def validateInputs(caseSensitiveAnalysis: Boolean): Unit = { assert(schema != null, "Missing input data schema") @@ -127,12 +114,12 @@ abstract class GarWriteBuilder( } private def createWriteJobDescription( - sparkSession: SparkSession, - hadoopConf: Configuration, - job: Job, - pathName: String, - options: Map[String, String] - ): WriteJobDescription = { + sparkSession: SparkSession, + hadoopConf: Configuration, + job: Job, + pathName: String, + options: Map[String, String] + ): WriteJobDescription = { val caseInsensitiveOptions = CaseInsensitiveMap(options) // Note: prepareWrite has side effect. It sets "job". val outputWriterFactory = diff --git a/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/csv/CSVWriterBuilder.scala b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/csv/CSVWriteBuilder.scala similarity index 68% rename from maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/csv/CSVWriterBuilder.scala rename to maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/csv/CSVWriteBuilder.scala index c0a38d52f..6e9101155 100644 --- a/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/csv/CSVWriterBuilder.scala +++ b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/csv/CSVWriteBuilder.scala @@ -17,35 +17,30 @@ // Derived from Apache Spark 3.1.1 // https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWriteBuilder.scala -package org.apache.graphar.datasources.csv +package org.apache.spark.sql.graphar.csv import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.spark.sql.catalyst.csv.CSVOptions import org.apache.spark.sql.catalyst.util.CompressionCodecs import org.apache.spark.sql.connector.write.LogicalWriteInfo -import org.apache.spark.sql.execution.datasources.{ - CodecStreams, - OutputWriter, - OutputWriterFactory -} import org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter +import org.apache.spark.sql.execution.datasources.{CodecStreams, OutputWriter, OutputWriterFactory} +import org.apache.spark.sql.graphar.GarWriteBuilder import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, StructType} -import org.apache.graphar.datasources.GarWriteBuilder - class CSVWriteBuilder( - paths: Seq[String], - formatName: String, - supportsDataType: DataType => Boolean, - info: LogicalWriteInfo -) extends GarWriteBuilder(paths, formatName, supportsDataType, info) { + paths: Seq[String], + formatName: String, + supportsDataType: DataType => Boolean, + 
info: LogicalWriteInfo + ) extends GarWriteBuilder(paths, formatName, supportsDataType, info) { override def prepareWrite( - sqlConf: SQLConf, - job: Job, - options: Map[String, String], - dataSchema: StructType - ): OutputWriterFactory = { + sqlConf: SQLConf, + job: Job, + options: Map[String, String], + dataSchema: StructType + ): OutputWriterFactory = { val conf = job.getConfiguration val csvOptions = new CSVOptions( options, @@ -58,10 +53,10 @@ class CSVWriteBuilder( new OutputWriterFactory { override def newInstance( - path: String, - dataSchema: StructType, - context: TaskAttemptContext - ): OutputWriter = { + path: String, + dataSchema: StructType, + context: TaskAttemptContext + ): OutputWriter = { new CsvOutputWriter(path, dataSchema, context, csvOptions) } diff --git a/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/orc/OrcOutputWriter.scala b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/orc/OrcOutputWriter.scala similarity index 82% rename from maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/orc/OrcOutputWriter.scala rename to maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/orc/OrcOutputWriter.scala index c1d2ff820..872e7ef70 100644 --- a/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/orc/OrcOutputWriter.scala +++ b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/orc/OrcOutputWriter.scala @@ -18,37 +18,33 @@ // we have to reimplement it here. // https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala -package org.apache.graphar.datasources.orc +package org.apache.spark.sql.graphar.orc import org.apache.hadoop.fs.Path import org.apache.hadoop.io.NullWritable import org.apache.hadoop.mapreduce.TaskAttemptContext import org.apache.orc.OrcFile -import org.apache.orc.mapred.{ - OrcOutputFormat => OrcMapRedOutputFormat, - OrcStruct -} +import org.apache.orc.mapred.{OrcStruct, OrcOutputFormat => OrcMapRedOutputFormat} import org.apache.orc.mapreduce.{OrcMapreduceRecordWriter, OrcOutputFormat} - import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.OutputWriter import org.apache.spark.sql.execution.datasources.orc.{OrcSerializer, OrcUtils} import org.apache.spark.sql.types._ class OrcOutputWriter( - val path: String, - dataSchema: StructType, - context: TaskAttemptContext -) extends OutputWriter { + val path: String, + dataSchema: StructType, + context: TaskAttemptContext + ) extends OutputWriter { private[this] val serializer = new OrcSerializer(dataSchema) private val recordWriter = { val orcOutputFormat = new OrcOutputFormat[OrcStruct]() { override def getDefaultWorkFile( - context: TaskAttemptContext, - extension: String - ): Path = { + context: TaskAttemptContext, + extension: String + ): Path = { new Path(path) } } diff --git a/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/orc/OrcWriteBuilder.scala b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/orc/OrcWriteBuilder.scala similarity index 76% rename from maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/orc/OrcWriteBuilder.scala rename to maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/orc/OrcWriteBuilder.scala index 9bdf796b9..9b78e1d79 100644 --- 
a/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/orc/OrcWriteBuilder.scala +++ b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/orc/OrcWriteBuilder.scala @@ -17,24 +17,19 @@ // Derived from Apache Spark 3.1.1 // https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/ORCWriteBuilder.scala -package org.apache.graphar.datasources.orc +package org.apache.spark.sql.graphar.orc import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} import org.apache.orc.OrcConf.{COMPRESS, MAPRED_OUTPUT_SCHEMA} import org.apache.orc.mapred.OrcStruct - import org.apache.spark.sql.connector.write.LogicalWriteInfo -import org.apache.spark.sql.execution.datasources.{ - OutputWriter, - OutputWriterFactory -} import org.apache.spark.sql.execution.datasources.orc.{OrcOptions, OrcUtils} +import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory} +import org.apache.spark.sql.graphar.GarWriteBuilder import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import org.apache.graphar.datasources.GarWriteBuilder - object OrcWriteBuilder { // the getQuotedSchemaString method of spark OrcFileFormat private def getQuotedSchemaString(dataType: DataType): String = @@ -53,18 +48,18 @@ object OrcWriteBuilder { } class OrcWriteBuilder( - paths: Seq[String], - formatName: String, - supportsDataType: DataType => Boolean, - info: LogicalWriteInfo -) extends GarWriteBuilder(paths, formatName, supportsDataType, info) { + paths: Seq[String], + formatName: String, + supportsDataType: DataType => Boolean, + info: LogicalWriteInfo + ) extends GarWriteBuilder(paths, formatName, supportsDataType, info) { override def prepareWrite( - sqlConf: SQLConf, - job: Job, - options: Map[String, String], - dataSchema: StructType - ): OutputWriterFactory = { + sqlConf: SQLConf, + job: Job, + options: Map[String, String], + dataSchema: StructType + ): OutputWriterFactory = { val orcOptions = new OrcOptions(options, sqlConf) val conf = job.getConfiguration @@ -84,10 +79,10 @@ class OrcWriteBuilder( new OutputWriterFactory { override def newInstance( - path: String, - dataSchema: StructType, - context: TaskAttemptContext - ): OutputWriter = { + path: String, + dataSchema: StructType, + context: TaskAttemptContext + ): OutputWriter = { new OrcOutputWriter(path, dataSchema, context) } diff --git a/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/parquet/ParquetWriterBuilder.scala b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/parquet/ParquetWriteBuilder.scala similarity index 80% rename from maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/parquet/ParquetWriterBuilder.scala rename to maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/parquet/ParquetWriteBuilder.scala index 8d7feceb9..4ea2299a7 100644 --- a/maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/parquet/ParquetWriterBuilder.scala +++ b/maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/parquet/ParquetWriteBuilder.scala @@ -17,42 +17,36 @@ // Derived from Apache Spark 3.1.1 // https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWriteBuilder.scala -package org.apache.graphar.datasources.parquet +package 
org.apache.spark.sql.graphar.parquet import org.apache.hadoop.mapreduce.{Job, OutputCommitter, TaskAttemptContext} -import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat} import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel import org.apache.parquet.hadoop.codec.CodecConfig import org.apache.parquet.hadoop.util.ContextUtil - -import org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter +import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat} import org.apache.spark.internal.Logging import org.apache.spark.sql.Row import org.apache.spark.sql.connector.write.LogicalWriteInfo -import org.apache.spark.sql.execution.datasources.{ - OutputWriter, - OutputWriterFactory -} import org.apache.spark.sql.execution.datasources.parquet._ +import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory} +import org.apache.spark.sql.graphar.GarWriteBuilder import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import org.apache.graphar.datasources.GarWriteBuilder - class ParquetWriteBuilder( - paths: Seq[String], - formatName: String, - supportsDataType: DataType => Boolean, - info: LogicalWriteInfo -) extends GarWriteBuilder(paths, formatName, supportsDataType, info) - with Logging { + paths: Seq[String], + formatName: String, + supportsDataType: DataType => Boolean, + info: LogicalWriteInfo + ) extends GarWriteBuilder(paths, formatName, supportsDataType, info) + with Logging { override def prepareWrite( - sqlConf: SQLConf, - job: Job, - options: Map[String, String], - dataSchema: StructType - ): OutputWriterFactory = { + sqlConf: SQLConf, + job: Job, + options: Map[String, String], + dataSchema: StructType + ): OutputWriterFactory = { val parquetOptions = new ParquetOptions(options, sqlConf) val conf = ContextUtil.getConfiguration(job) @@ -113,14 +107,14 @@ class ParquetWriteBuilder( // SPARK-15719: Disables writing Parquet summary files by default. 
if ( conf.get(ParquetOutputFormat.JOB_SUMMARY_LEVEL) == null - && conf.get(ParquetOutputFormat.ENABLE_JOB_SUMMARY) == null + && conf.get(ParquetOutputFormat.ENABLE_JOB_SUMMARY) == null ) { conf.setEnum(ParquetOutputFormat.JOB_SUMMARY_LEVEL, JobSummaryLevel.NONE) } if ( ParquetOutputFormat.getJobSummaryLevel(conf) == JobSummaryLevel.NONE - && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass) + && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass) ) { // output summary is requested, but the class is not a Parquet Committer logWarning( @@ -132,10 +126,10 @@ class ParquetWriteBuilder( new OutputWriterFactory { override def newInstance( - path: String, - dataSchema: StructType, - context: TaskAttemptContext - ): OutputWriter = { + path: String, + dataSchema: StructType, + context: TaskAttemptContext + ): OutputWriter = { new ParquetOutputWriter(path, context) } diff --git a/maven-projects/spark/datasources-33/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala b/maven-projects/spark/datasources-33/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala index b6094914f..21932d4d7 100644 --- a/maven-projects/spark/datasources-33/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala +++ b/maven-projects/spark/datasources-33/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala @@ -78,8 +78,8 @@ class GarDataSource extends TableProvider with DataSourceRegister { } protected def getOptionsWithoutPaths( - map: CaseInsensitiveStringMap - ): CaseInsensitiveStringMap = { + map: CaseInsensitiveStringMap + ): CaseInsensitiveStringMap = { val withoutPath = map.asCaseSensitiveMap().asScala.filterKeys { k => !k.equalsIgnoreCase("path") && !k.equalsIgnoreCase("paths") } @@ -87,9 +87,9 @@ class GarDataSource extends TableProvider with DataSourceRegister { } protected def getTableName( - map: CaseInsensitiveStringMap, - paths: Seq[String] - ): String = { + map: CaseInsensitiveStringMap, + paths: Seq[String] + ): String = { val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions( map.asCaseSensitiveMap().asScala.toMap ) @@ -100,9 +100,9 @@ class GarDataSource extends TableProvider with DataSourceRegister { } private def qualifiedPathName( - path: String, - hadoopConf: Configuration - ): String = { + path: String, + hadoopConf: Configuration + ): String = { val hdfsPath = new Path(path) val fs = hdfsPath.getFileSystem(hadoopConf) hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toString @@ -148,16 +148,16 @@ class GarDataSource extends TableProvider with DataSourceRegister { } override def inferPartitioning( - options: CaseInsensitiveStringMap - ): Array[Transform] = { + options: CaseInsensitiveStringMap + ): Array[Transform] = { Array.empty } override def getTable( - schema: StructType, - partitioning: Array[Transform], - properties: util.Map[String, String] - ): Table = { + schema: StructType, + partitioning: Array[Transform], + properties: util.Map[String, String] + ): Table = { // If the table is already loaded during schema inference, return it directly. if (t != null) { t @@ -168,11 +168,11 @@ class GarDataSource extends TableProvider with DataSourceRegister { // Get the actual fall back file format. 
private def getFallbackFileFormat( - options: CaseInsensitiveStringMap - ): Class[_ <: FileFormat] = options.get("fileFormat") match { - case "csv" => classOf[CSVFileFormat] - case "orc" => classOf[OrcFileFormat] + options: CaseInsensitiveStringMap + ): Class[_ <: FileFormat] = options.get("fileFormat") match { + case "csv" => classOf[CSVFileFormat] + case "orc" => classOf[OrcFileFormat] case "parquet" => classOf[ParquetFileFormat] - case _ => throw new IllegalArgumentException + case _ => throw new IllegalArgumentException } } diff --git a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarCommitProtocol.scala b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarCommitProtocol.scala index e1f68a4e8..3722b838f 100644 --- a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarCommitProtocol.scala +++ b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarCommitProtocol.scala @@ -53,23 +53,23 @@ object GarCommitProtocol { } class GarCommitProtocol( - jobId: String, - path: String, - options: Map[String, String], - dynamicPartitionOverwrite: Boolean = false -) extends SQLHadoopMapReduceCommitProtocol( - jobId, - path, - dynamicPartitionOverwrite - ) - with Serializable - with Logging { + jobId: String, + path: String, + options: Map[String, String], + dynamicPartitionOverwrite: Boolean = false + ) extends SQLHadoopMapReduceCommitProtocol( + jobId, + path, + dynamicPartitionOverwrite +) + with Serializable + with Logging { // override getFilename to customize the file name override def getFilename( - taskContext: TaskAttemptContext, - spec: FileNameSpec - ): String = { + taskContext: TaskAttemptContext, + spec: FileNameSpec + ): String = { val partitionId = taskContext.getTaskAttemptID.getTaskID.getId if (options.contains(GeneralParams.offsetStartChunkIndexKey)) { // offset chunk file name, looks like chunk0 diff --git a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala index 0832c8584..645fdf4b3 100644 --- a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala +++ b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala @@ -44,18 +44,18 @@ import scala.jdk.CollectionConverters._ /** GarScan is a class to implement the file scan for GarDataSource. */ case class GarScan( - sparkSession: SparkSession, - hadoopConf: Configuration, - fileIndex: PartitioningAwareFileIndex, - dataSchema: StructType, - readDataSchema: StructType, - readPartitionSchema: StructType, - pushedFilters: Array[Filter], - options: CaseInsensitiveStringMap, - formatName: String, - partitionFilters: Seq[Expression] = Seq.empty, - dataFilters: Seq[Expression] = Seq.empty -) extends FileScan { + sparkSession: SparkSession, + hadoopConf: Configuration, + fileIndex: PartitioningAwareFileIndex, + dataSchema: StructType, + readDataSchema: StructType, + readPartitionSchema: StructType, + pushedFilters: Array[Filter], + options: CaseInsensitiveStringMap, + formatName: String, + partitionFilters: Seq[Expression] = Seq.empty, + dataFilters: Seq[Expression] = Seq.empty + ) extends FileScan { /** The gar format is not splitable. */ override def isSplitable(path: Path): Boolean = false @@ -63,8 +63,8 @@ case class GarScan( /** Create the reader factory according to the actual file format. 
*/ override def createReaderFactory(): PartitionReaderFactory = formatName match { - case "csv" => createCSVReaderFactory() - case "orc" => createOrcReaderFactory() + case "csv" => createCSVReaderFactory() + case "orc" => createOrcReaderFactory() case "parquet" => createParquetReaderFactory() case _ => throw new IllegalArgumentException("Invalid format name: " + formatName) @@ -233,9 +233,9 @@ case class GarScan( * chunk file in GraphAr to a single partition. */ private def getFilePartitions( - sparkSession: SparkSession, - partitionedFiles: Seq[PartitionedFile] - ): Seq[FilePartition] = { + sparkSession: SparkSession, + partitionedFiles: Seq[PartitionedFile] + ): Seq[FilePartition] = { val partitions = new ArrayBuffer[FilePartition] val currentFiles = new ArrayBuffer[PartitionedFile] @@ -271,8 +271,8 @@ case class GarScan( /** Get the hash code of the object. */ override def hashCode(): Int = formatName match { - case "csv" => super.hashCode() - case "orc" => getClass.hashCode() + case "csv" => super.hashCode() + case "orc" => getClass.hashCode() case "parquet" => getClass.hashCode() case _ => throw new IllegalArgumentException("Invalid format name: " + formatName) @@ -290,8 +290,8 @@ case class GarScan( /** Construct the file scan with filters. */ def withFilters( - partitionFilters: Seq[Expression], - dataFilters: Seq[Expression] - ): FileScan = + partitionFilters: Seq[Expression], + dataFilters: Seq[Expression] + ): FileScan = this.copy(partitionFilters = partitionFilters, dataFilters = dataFilters) } diff --git a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala index 94fe57525..71ec67794 100644 --- a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala +++ b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala @@ -33,13 +33,13 @@ import scala.collection.JavaConverters._ /** GarScanBuilder is a class to build the file scan for GarDataSource. */ case class GarScanBuilder( - sparkSession: SparkSession, - fileIndex: PartitioningAwareFileIndex, - schema: StructType, - dataSchema: StructType, - options: CaseInsensitiveStringMap, - formatName: String -) extends FileScanBuilder(sparkSession, fileIndex, dataSchema) { + sparkSession: SparkSession, + fileIndex: PartitioningAwareFileIndex, + schema: StructType, + dataSchema: StructType, + options: CaseInsensitiveStringMap, + formatName: String + ) extends FileScanBuilder(sparkSession, fileIndex, dataSchema) { lazy val hadoopConf = { val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap // Hadoop Configurations are case sensitive. 
@@ -51,8 +51,8 @@ case class GarScanBuilder( override def pushDataFilters(dataFilters: Array[Filter]): Array[Filter] = { this.filters = dataFilters formatName match { - case "csv" => Array.empty[Filter] - case "orc" => pushedOrcFilters + case "csv" => Array.empty[Filter] + case "orc" => pushedOrcFilters case "parquet" => pushedParquetFilters case _ => throw new IllegalArgumentException("Invalid format name: " + formatName) diff --git a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarTable.scala b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarTable.scala index acf4943cf..df3d51fa9 100644 --- a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarTable.scala +++ b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarTable.scala @@ -38,18 +38,18 @@ import scala.collection.JavaConverters._ /** GarTable is a class to represent the graph data in GraphAr as a table. */ case class GarTable( - name: String, - sparkSession: SparkSession, - options: CaseInsensitiveStringMap, - paths: Seq[String], - userSpecifiedSchema: Option[StructType], - fallbackFileFormat: Class[_ <: FileFormat] -) extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + name: String, + sparkSession: SparkSession, + options: CaseInsensitiveStringMap, + paths: Seq[String], + userSpecifiedSchema: Option[StructType], + fallbackFileFormat: Class[_ <: FileFormat] + ) extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { /** Construct a new scan builder. */ override def newScanBuilder( - options: CaseInsensitiveStringMap - ): GarScanBuilder = + options: CaseInsensitiveStringMap + ): GarScanBuilder = new GarScanBuilder( sparkSession, fileIndex, @@ -114,9 +114,9 @@ case class GarTable( case ArrayType(elementType, _) => formatName match { - case "orc" => supportsDataType(elementType) + case "orc" => supportsDataType(elementType) case "parquet" => supportsDataType(elementType) - case _ => false + case _ => false } // case MapType(keyType, valueType, _) => diff --git a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarWriteBuilder.scala b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarWriteBuilder.scala index 94b070f19..4179b404b 100644 --- a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarWriteBuilder.scala +++ b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarWriteBuilder.scala @@ -27,7 +27,6 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat -import org.apache.hadoop.mapreduce.Job import org.apache.spark.sql.execution.datasources.OutputWriterFactory import org.apache.spark.sql.SparkSession @@ -41,7 +40,6 @@ import org.apache.spark.sql.connector.write.{ import org.apache.spark.sql.execution.datasources.{ BasicWriteJobStatsTracker, DataSource, - OutputWriterFactory, WriteJobDescription } import org.apache.spark.sql.execution.metric.SQLMetric @@ -52,11 +50,11 @@ import org.apache.spark.sql.execution.datasources.v2.FileBatchWrite import org.apache.spark.sql.catalyst.expressions.AttributeReference abstract class GarWriteBuilder( - paths: Seq[String], - formatName: String, - supportsDataType: DataType => Boolean, - info: LogicalWriteInfo -) extends WriteBuilder { + paths: Seq[String], + formatName: String, + 
supportsDataType: DataType => Boolean, + info: LogicalWriteInfo + ) extends WriteBuilder { private val schema = info.schema() private val queryId = info.queryId() private val options = info.options() @@ -90,11 +88,11 @@ abstract class GarWriteBuilder( } def prepareWrite( - sqlConf: SQLConf, - job: Job, - options: Map[String, String], - dataSchema: StructType - ): OutputWriterFactory + sqlConf: SQLConf, + job: Job, + options: Map[String, String], + dataSchema: StructType + ): OutputWriterFactory private def validateInputs(caseSensitiveAnalysis: Boolean): Unit = { assert(schema != null, "Missing input data schema") @@ -127,12 +125,12 @@ abstract class GarWriteBuilder( } private def createWriteJobDescription( - sparkSession: SparkSession, - hadoopConf: Configuration, - job: Job, - pathName: String, - options: Map[String, String] - ): WriteJobDescription = { + sparkSession: SparkSession, + hadoopConf: Configuration, + job: Job, + pathName: String, + options: Map[String, String] + ): WriteJobDescription = { val caseInsensitiveOptions = CaseInsensitiveMap(options) // Note: prepareWrite has side effect. It sets "job". val outputWriterFactory = diff --git a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/csv/CSVWriteBuilder.scala b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/csv/CSVWriteBuilder.scala index 7fe3066b8..c24f12a14 100644 --- a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/csv/CSVWriteBuilder.scala +++ b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/csv/CSVWriteBuilder.scala @@ -30,17 +30,17 @@ import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.sql.graphar.GarWriteBuilder class CSVWriteBuilder( - paths: Seq[String], - formatName: String, - supportsDataType: DataType => Boolean, - info: LogicalWriteInfo -) extends GarWriteBuilder(paths, formatName, supportsDataType, info) { + paths: Seq[String], + formatName: String, + supportsDataType: DataType => Boolean, + info: LogicalWriteInfo + ) extends GarWriteBuilder(paths, formatName, supportsDataType, info) { override def prepareWrite( - sqlConf: SQLConf, - job: Job, - options: Map[String, String], - dataSchema: StructType - ): OutputWriterFactory = { + sqlConf: SQLConf, + job: Job, + options: Map[String, String], + dataSchema: StructType + ): OutputWriterFactory = { val conf = job.getConfiguration val csvOptions = new CSVOptions( options, @@ -53,10 +53,10 @@ class CSVWriteBuilder( new OutputWriterFactory { override def newInstance( - path: String, - dataSchema: StructType, - context: TaskAttemptContext - ): OutputWriter = { + path: String, + dataSchema: StructType, + context: TaskAttemptContext + ): OutputWriter = { new CsvOutputWriter(path, dataSchema, context, csvOptions) } diff --git a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/orc/OrcOutputWriter.scala b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/orc/OrcOutputWriter.scala index ccc7a48e1..5dd21f5a3 100644 --- a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/orc/OrcOutputWriter.scala +++ b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/orc/OrcOutputWriter.scala @@ -36,19 +36,19 @@ import org.apache.spark.sql.execution.datasources.orc.{OrcSerializer, OrcUtils} import org.apache.spark.sql.types._ class OrcOutputWriter( - val path: String, - dataSchema: StructType, - context: 
TaskAttemptContext -) extends OutputWriter { + val path: String, + dataSchema: StructType, + context: TaskAttemptContext + ) extends OutputWriter { private[this] val serializer = new OrcSerializer(dataSchema) private val recordWriter = { val orcOutputFormat = new OrcOutputFormat[OrcStruct]() { override def getDefaultWorkFile( - context: TaskAttemptContext, - extension: String - ): Path = { + context: TaskAttemptContext, + extension: String + ): Path = { new Path(path) } } diff --git a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/orc/OrcWriteBuilder.scala b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/orc/OrcWriteBuilder.scala index 287162f8e..3a3636888 100644 --- a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/orc/OrcWriteBuilder.scala +++ b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/orc/OrcWriteBuilder.scala @@ -53,18 +53,18 @@ object OrcWriteBuilder { } class OrcWriteBuilder( - paths: Seq[String], - formatName: String, - supportsDataType: DataType => Boolean, - info: LogicalWriteInfo -) extends GarWriteBuilder(paths, formatName, supportsDataType, info) { + paths: Seq[String], + formatName: String, + supportsDataType: DataType => Boolean, + info: LogicalWriteInfo + ) extends GarWriteBuilder(paths, formatName, supportsDataType, info) { override def prepareWrite( - sqlConf: SQLConf, - job: Job, - options: Map[String, String], - dataSchema: StructType - ): OutputWriterFactory = { + sqlConf: SQLConf, + job: Job, + options: Map[String, String], + dataSchema: StructType + ): OutputWriterFactory = { val orcOptions = new OrcOptions(options, sqlConf) val conf = job.getConfiguration @@ -84,10 +84,10 @@ class OrcWriteBuilder( new OutputWriterFactory { override def newInstance( - path: String, - dataSchema: StructType, - context: TaskAttemptContext - ): OutputWriter = { + path: String, + dataSchema: StructType, + context: TaskAttemptContext + ): OutputWriter = { new OrcOutputWriter(path, dataSchema, context) } diff --git a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/parquet/ParquetWriteBuilder.scala b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/parquet/ParquetWriteBuilder.scala index 8e53dc5f8..b9e2726e5 100644 --- a/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/parquet/ParquetWriteBuilder.scala +++ b/maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/parquet/ParquetWriteBuilder.scala @@ -39,19 +39,19 @@ import org.apache.spark.sql.types._ import org.apache.spark.sql.graphar.GarWriteBuilder class ParquetWriteBuilder( - paths: Seq[String], - formatName: String, - supportsDataType: DataType => Boolean, - info: LogicalWriteInfo -) extends GarWriteBuilder(paths, formatName, supportsDataType, info) - with Logging { + paths: Seq[String], + formatName: String, + supportsDataType: DataType => Boolean, + info: LogicalWriteInfo + ) extends GarWriteBuilder(paths, formatName, supportsDataType, info) + with Logging { override def prepareWrite( - sqlConf: SQLConf, - job: Job, - options: Map[String, String], - dataSchema: StructType - ): OutputWriterFactory = { + sqlConf: SQLConf, + job: Job, + options: Map[String, String], + dataSchema: StructType + ): OutputWriterFactory = { val parquetOptions = new ParquetOptions(options, sqlConf) val conf = ContextUtil.getConfiguration(job) @@ -118,14 +118,14 @@ class ParquetWriteBuilder( // SPARK-15719: 
Disables writing Parquet summary files by default. if ( conf.get(ParquetOutputFormat.JOB_SUMMARY_LEVEL) == null - && conf.get(ParquetOutputFormat.ENABLE_JOB_SUMMARY) == null + && conf.get(ParquetOutputFormat.ENABLE_JOB_SUMMARY) == null ) { conf.setEnum(ParquetOutputFormat.JOB_SUMMARY_LEVEL, JobSummaryLevel.NONE) } if ( ParquetOutputFormat.getJobSummaryLevel(conf) == JobSummaryLevel.NONE - && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass) + && !classOf[ParquetOutputCommitter].isAssignableFrom(committerClass) ) { // output summary is requested, but the class is not a Parquet Committer logWarning( @@ -137,10 +137,10 @@ class ParquetWriteBuilder( new OutputWriterFactory { override def newInstance( - path: String, - dataSchema: StructType, - context: TaskAttemptContext - ): OutputWriter = { + path: String, + dataSchema: StructType, + context: TaskAttemptContext + ): OutputWriter = { new ParquetOutputWriter(path, context) }
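Editor's note: the patch above only relocates classes and reformats code; the entry point, GarDataSource, stays in org.apache.graphar.datasources. The sketch below is a minimal, hedged illustration of how that entry point is typically driven from Spark after the refactoring. It is not part of the change: the fully qualified format name, the "fileFormat" option values ("csv", "orc", "parquet", as handled by getFallbackFileFormat in the diff), and the input path are assumptions for illustration; real GraphAr reads normally go through the library's higher-level graph readers.

import org.apache.spark.sql.SparkSession

object GarDataSourceUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("gar-datasource-usage-sketch")
      .master("local[*]")
      .getOrCreate()

    // GarDataSource itself remains in org.apache.graphar.datasources; this patch only
    // moves the Table/Scan/Write classes it delegates to into org.apache.spark.sql.graphar.
    val df = spark.read
      .format("org.apache.graphar.datasources.GarDataSource")
      // "fileFormat" selects the payload format; "csv", "orc" and "parquet" are accepted,
      // anything else hits the IllegalArgumentException seen in getFallbackFileFormat.
      .option("fileFormat", "parquet")
      .load("/tmp/graphar/person/vertex/chunk0") // hypothetical chunk path

    df.show()
    spark.stop()
  }
}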