Refactoring + license headers + scalafmt
 On branch 493-datasources-spark-refactoring
 Changes to be committed:
	modified:   licenserc.toml
	modified:   maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala
	renamed:    maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarCommitProtocol.scala -> maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarCommitProtocol.scala
	renamed:    maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarScan.scala -> maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala
	renamed:    maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarScanBuilder.scala -> maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala
	renamed:    maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarTable.scala -> maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarTable.scala
	renamed:    maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarWriterBuilder.scala -> maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarWriteBuilder.scala
	renamed:    maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/csv/CSVWriterBuilder.scala -> maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/csv/CSVWriteBuilder.scala
	renamed:    maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/orc/OrcOutputWriter.scala -> maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/orc/OrcOutputWriter.scala
	renamed:    maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/orc/OrcWriteBuilder.scala -> maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/orc/OrcWriteBuilder.scala
	renamed:    maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/parquet/ParquetWriterBuilder.scala -> maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/parquet/ParquetWriteBuilder.scala
	modified:   maven-projects/spark/datasources-33/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala
	modified:   maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarCommitProtocol.scala
	modified:   maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala
	modified:   maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala
	modified:   maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarTable.scala
	modified:   maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/GarWriteBuilder.scala
	modified:   maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/csv/CSVWriteBuilder.scala
	modified:   maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/orc/OrcOutputWriter.scala
	modified:   maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/orc/OrcWriteBuilder.scala
	modified:   maven-projects/spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar/parquet/ParquetWriteBuilder.scala
SemyonSinchenko committed Jun 6, 2024
1 parent 6b484b4 commit d5cc3b7
Showing 21 changed files with 340 additions and 388 deletions.
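The renames listed above are a package move for the Spark datasource classes, from org.apache.graphar.datasources to org.apache.spark.sql.graphar. A hedged before/after sketch of the downstream effect, using GarTable from the file list only as an example (not code from this commit):

// Old location (before this commit):
// import org.apache.graphar.datasources.GarTable
// New location (after this commit):
import org.apache.spark.sql.graphar.GarTable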
licenserc.toml (4 changes: 2 additions & 2 deletions)
@@ -44,8 +44,8 @@ excludes = [
"cpp/apidoc",
"cpp/thirdparty",
"cpp/misc/cpplint.py",
"spark/datasources-32/src/main/scala/org/apache/graphar/datasources",
"spark/datasources-33/src/main/scala/org/apache/graphar/datasources",
"spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar",
"spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar",
"java/src/main/java/org/apache/graphar/stdcxx/StdString.java",
"java/src/main/java/org/apache/graphar/stdcxx/StdVector.java",
"java/src/main/java/org/apache/graphar/stdcxx/StdSharedPtr.java",
maven-projects/spark/datasources-32/src/main/scala/org/apache/graphar/datasources/GarDataSource.scala
@@ -16,24 +16,24 @@

package org.apache.graphar.datasources

import scala.collection.JavaConverters._
import scala.util.matching.Regex
import java.util

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.graphar.GarTable
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Derived from Apache Spark 3.1.1
// https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
@@ -79,18 +79,18 @@ class GarDataSource extends TableProvider with DataSourceRegister {
}

protected def getOptionsWithoutPaths(
    map: CaseInsensitiveStringMap
): CaseInsensitiveStringMap = {
  val withoutPath = map.asCaseSensitiveMap().asScala.filterKeys { k =>
    !k.equalsIgnoreCase("path") && !k.equalsIgnoreCase("paths")
  }
  new CaseInsensitiveStringMap(withoutPath.toMap.asJava)
}

protected def getTableName(
    map: CaseInsensitiveStringMap,
    paths: Seq[String]
): String = {
  val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(
    map.asCaseSensitiveMap().asScala.toMap
  )
@@ -101,9 +101,9 @@ class GarDataSource extends TableProvider with DataSourceRegister {
}

private def qualifiedPathName(
    path: String,
    hadoopConf: Configuration
): String = {
  val hdfsPath = new Path(path)
  val fs = hdfsPath.getFileSystem(hadoopConf)
  hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
@@ -149,16 +149,16 @@ class GarDataSource extends TableProvider with DataSourceRegister {
}

override def inferPartitioning(
    options: CaseInsensitiveStringMap
): Array[Transform] = {
  Array.empty
}

override def getTable(
    schema: StructType,
    partitioning: Array[Transform],
    properties: util.Map[String, String]
): Table = {
  // If the table is already loaded during schema inference, return it directly.
  if (t != null) {
    t
@@ -169,11 +169,11 @@ class GarDataSource extends TableProvider with DataSourceRegister {

// Get the actual fall back file format.
private def getFallbackFileFormat(
    options: CaseInsensitiveStringMap
): Class[_ <: FileFormat] = options.get("fileFormat") match {
  case "csv" => classOf[CSVFileFormat]
  case "orc" => classOf[OrcFileFormat]
  case "parquet" => classOf[ParquetFileFormat]
  case _ => throw new IllegalArgumentException
}
}
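Since getFallbackFileFormat above requires an explicit "fileFormat" option ("csv", "orc" or "parquet"), a read through this provider is expected to pass it. A minimal usage sketch, assuming an existing SparkSession named spark and an illustrative path (neither comes from this commit):

// Hedged sketch: load GraphAr chunks through the Gar data source by its
// fully-qualified provider class; "fileFormat" selects the fallback file format.
val df = spark.read
  .format("org.apache.graphar.datasources.GarDataSource")
  .option("fileFormat", "parquet") // assumed example value; "csv" and "orc" also match above
  .load("/tmp/graphar/person/vertex/chunk0") // hypothetical path
df.printSchema()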
maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarCommitProtocol.scala
@@ -17,16 +17,14 @@
// Derived from Apache Spark 3.1.1
// https://github.com/apache/spark/blob/1d550c4/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

-package org.apache.graphar.datasources
+package org.apache.spark.sql.graphar

import org.apache.graphar.GeneralParams

import org.json4s._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
import org.apache.hadoop.mapreduce._
import org.apache.spark.internal.Logging

object GarCommitProtocol {
private def binarySearchPair(aggNums: Array[Int], key: Int): (Int, Int) = {
@@ -52,23 +50,23 @@ object GarCommitProtocol {
}

class GarCommitProtocol(
    jobId: String,
    path: String,
    options: Map[String, String],
    dynamicPartitionOverwrite: Boolean = false
) extends SQLHadoopMapReduceCommitProtocol(
      jobId,
      path,
      dynamicPartitionOverwrite
    )
    with Serializable
    with Logging {

// override getFilename to customize the file name
override def getFilename(
    taskContext: TaskAttemptContext,
    ext: String
): String = {
val partitionId = taskContext.getTaskAttemptID.getTaskID.getId
if (options.contains(GeneralParams.offsetStartChunkIndexKey)) {
// offset chunk file name, looks like chunk0
maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScan.scala
@@ -17,63 +17,54 @@
// Derived from Apache Spark 3.1.1
// https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala

-package org.apache.graphar.datasources
+package org.apache.spark.sql.graphar

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetInputFormat

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Expression, ExprUtils}
import org.apache.spark.sql.catalyst.csv.CSVOptions
import org.apache.spark.sql.catalyst.expressions.{ExprUtils, Expression}
import org.apache.spark.sql.connector.read.PartitionReaderFactory
import org.apache.spark.sql.execution.PartitionedFileUtil
import org.apache.spark.sql.execution.datasources.{
FilePartition,
PartitioningAwareFileIndex,
PartitionedFile
}
import org.apache.spark.sql.execution.datasources.parquet.{
ParquetOptions,
ParquetReadSupport,
ParquetWriteSupport
}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, ParquetReadSupport, ParquetWriteSupport}
import org.apache.spark.sql.execution.datasources.v2.FileScan
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory
import org.apache.spark.sql.execution.datasources.v2.orc.OrcPartitionReaderFactory
import org.apache.spark.sql.execution.datasources.v2.csv.CSVPartitionReaderFactory
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, PartitioningAwareFileIndex}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.SerializableConfiguration

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

/** GarScan is a class to implement the file scan for GarDataSource. */
case class GarScan(
    sparkSession: SparkSession,
    hadoopConf: Configuration,
    fileIndex: PartitioningAwareFileIndex,
    dataSchema: StructType,
    readDataSchema: StructType,
    readPartitionSchema: StructType,
    pushedFilters: Array[Filter],
    options: CaseInsensitiveStringMap,
    formatName: String,
    partitionFilters: Seq[Expression] = Seq.empty,
    dataFilters: Seq[Expression] = Seq.empty
) extends FileScan {

/** The gar format is not splitable. */
override def isSplitable(path: Path): Boolean = false

/** Create the reader factory according to the actual file format. */
override def createReaderFactory(): PartitionReaderFactory =
  formatName match {
    case "csv" => createCSVReaderFactory()
    case "orc" => createOrcReaderFactory()
    case "parquet" => createParquetReaderFactory()
    case _ =>
      throw new IllegalArgumentException("Invalid format name: " + formatName)
@@ -232,9 +223,9 @@ case class GarScan(
* chunk file in GraphAr to a single partition.
*/
private def getFilePartitions(
    sparkSession: SparkSession,
    partitionedFiles: Seq[PartitionedFile]
): Seq[FilePartition] = {
val partitions = new ArrayBuffer[FilePartition]
val currentFiles = new ArrayBuffer[PartitionedFile]

@@ -270,8 +261,8 @@ case class GarScan(

/** Get the hash code of the object. */
override def hashCode(): Int = formatName match {
case "csv" => super.hashCode()
case "orc" => getClass.hashCode()
case "csv" => super.hashCode()
case "orc" => getClass.hashCode()
case "parquet" => getClass.hashCode()
case _ =>
throw new IllegalArgumentException("Invalid format name: " + formatName)
@@ -289,8 +280,8 @@ case class GarScan(

/** Construct the file scan with filters. */
override def withFilters(
    partitionFilters: Seq[Expression],
    dataFilters: Seq[Expression]
): FileScan =
this.copy(partitionFilters = partitionFilters, dataFilters = dataFilters)
}
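The scaladoc of getFilePartitions above says each GraphAr chunk file is mapped to a single partition. A minimal sketch of that idea, assuming Spark 3.2-era APIs (FilePartition and PartitionedFile); it is not the method from this commit, which builds its partitions through the ArrayBuffers shown in the diff:

import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}

// Hedged sketch: one FilePartition per chunk file, mirroring the one-file-per-partition
// intent described above.
def onePartitionPerFile(files: Seq[PartitionedFile]): Seq[FilePartition] =
  files.zipWithIndex.map { case (file, i) => FilePartition(i, Array(file)) }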
maven-projects/spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar/GarScanBuilder.scala
@@ -17,46 +17,46 @@
// Derived from Apache Spark 3.1.1
// https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala

-package org.apache.graphar.datasources
+package org.apache.spark.sql.graphar

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters}
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex

import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScanBuilder
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScanBuilder
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import scala.collection.JavaConverters._

/** GarScanBuilder is a class to build the file scan for GarDataSource. */
case class GarScanBuilder(
    sparkSession: SparkSession,
    fileIndex: PartitioningAwareFileIndex,
    schema: StructType,
    dataSchema: StructType,
    options: CaseInsensitiveStringMap,
    formatName: String
) extends FileScanBuilder(sparkSession, fileIndex, dataSchema)
    with SupportsPushDownFilters {
lazy val hadoopConf = {
val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
// Hadoop Configurations are case sensitive.
sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
}

private var filters: Array[Filter] = Array.empty

override def pushFilters(filters: Array[Filter]): Array[Filter] = {
this.filters = filters
filters
}

override def pushedFilters(): Array[Filter] = formatName match {
case "csv" => Array.empty[Filter]
case "orc" => pushedOrcFilters
case "csv" => Array.empty[Filter]
case "orc" => pushedOrcFilters
case "parquet" => pushedParquetFilters
case _ =>
throw new IllegalArgumentException("Invalid format name: " + formatName)
(The remaining changed file diffs in this commit are not shown here.)
