feat(spark): Refactoring datasources (#514)
### Reason for this PR
By moving the datasources under `org.apache.spark.sql`, we can access package-private Spark APIs. This was a blocker the last time I tried to fully migrate the datasources to DataSource V2. Detailed motivation is in #493.
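
As a rough illustration of why the package placement matters: Spark marks many internal helpers `private[sql]`, so they are only visible to code compiled somewhere under `org.apache.spark.sql`. The sketch below uses made-up names (`InternalHelper`, `normalize`) rather than real Spark classes, purely to show the visibility rule:

```scala
// Minimal sketch of Scala package-private visibility; InternalHelper is a
// placeholder, not a real Spark class.
package org.apache.spark.sql {

  // Visible only to code compiled under the org.apache.spark.sql package tree.
  private[sql] object InternalHelper {
    def normalize(paths: Seq[String]): Seq[String] = paths.distinct
  }

  package graphar {
    object GarSideExample {
      // Compiles: org.apache.spark.sql.graphar sits inside the sql package tree.
      def use(paths: Seq[String]): Seq[String] = InternalHelper.normalize(paths)
    }
  }
}

package org.apache.graphar.datasources {
  object OutsideExample {
    // Would NOT compile if uncommented: InternalHelper is private[sql].
    // def use(paths: Seq[String]) = org.apache.spark.sql.InternalHelper.normalize(paths)
  }
}
```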

### What changes are included in this PR?
Mostly refactoring.

### Are these changes tested?
Unit tests pass.

I manually checked the generated JARs:
![image](https://github.com/apache/incubator-graphar/assets/29755009/1b094516-88b1-490a-a2ea-8dcd092a3b1d)

### Are there any user-facing changes?
Mostly not, since `GarDataSource` remains in the same package.
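
For example, a job that loads data through the fully qualified class name should keep working unchanged after this PR. The snippet below is only a sketch; the path and the `option` key are placeholders, and the real options depend on the GraphAr files being read:

```scala
// Sketch: reading through GarDataSource by its (unchanged) fully qualified name.
import org.apache.spark.sql.SparkSession

object ReadWithGarDataSource {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("gar-read-example").getOrCreate()

    val df = spark.read
      .format("org.apache.graphar.datasources.GarDataSource")
      .option("fileFormat", "parquet")        // placeholder option
      .load("/path/to/graphar/vertex/chunks") // placeholder path
    df.show()

    spark.stop()
  }
}
```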


Close #493
SemyonSinchenko authored Jun 7, 2024
1 parent 8b315a7 commit d288b7e
Showing 21 changed files with 86 additions and 110 deletions.
licenserc.toml (2 changes: 2 additions & 0 deletions)
@@ -45,7 +45,9 @@ excludes = [
"cpp/thirdparty",
"cpp/misc/cpplint.py",
"spark/datasources-32/src/main/scala/org/apache/graphar/datasources",
"spark/datasources-32/src/main/scala/org/apache/spark/sql/graphar",
"spark/datasources-33/src/main/scala/org/apache/graphar/datasources",
"spark/datasources-33/src/main/scala/org/apache/spark/sql/graphar",
"java/src/main/java/org/apache/graphar/stdcxx/StdString.java",
"java/src/main/java/org/apache/graphar/stdcxx/StdVector.java",
"java/src/main/java/org/apache/graphar/stdcxx/StdSharedPtr.java",
@@ -16,24 +16,24 @@

package org.apache.graphar.datasources

import scala.collection.JavaConverters._
import scala.util.matching.Regex
import java.util

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.graphar.GarTable
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.connector.expressions.Transform

import java.util
import scala.collection.JavaConverters._
import scala.util.matching.Regex

// Derived from Apache Spark 3.1.1
// https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
@@ -17,16 +17,14 @@
// Derived from Apache Spark 3.1.1
// https://github.com/apache/spark/blob/1d550c4/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

package org.apache.graphar.datasources
package org.apache.spark.sql.graphar

import org.apache.graphar.GeneralParams

import org.json4s._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
import org.apache.hadoop.mapreduce._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
import org.json4s._
import org.json4s.jackson.JsonMethods._

object GarCommitProtocol {
private def binarySearchPair(aggNums: Array[Int], key: Int): (Int, Int) = {
@@ -17,40 +17,39 @@
// Derived from Apache Spark 3.1.1
// https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala

package org.apache.graphar.datasources

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
package org.apache.spark.sql.graphar

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetInputFormat

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Expression, ExprUtils}
import org.apache.spark.sql.catalyst.csv.CSVOptions
import org.apache.spark.sql.catalyst.expressions.{ExprUtils, Expression}
import org.apache.spark.sql.connector.read.PartitionReaderFactory
import org.apache.spark.sql.execution.PartitionedFileUtil
import org.apache.spark.sql.execution.datasources.{
FilePartition,
PartitioningAwareFileIndex,
PartitionedFile
}
import org.apache.spark.sql.execution.datasources.parquet.{
ParquetOptions,
ParquetReadSupport,
ParquetWriteSupport
}
import org.apache.spark.sql.execution.datasources.v2.FileScan
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory
import org.apache.spark.sql.execution.datasources.v2.orc.OrcPartitionReaderFactory
import org.apache.spark.sql.execution.datasources.v2.csv.CSVPartitionReaderFactory
import org.apache.spark.sql.execution.datasources.v2.orc.OrcPartitionReaderFactory
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory
import org.apache.spark.sql.execution.datasources.{
FilePartition,
PartitionedFile,
PartitioningAwareFileIndex
}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.SerializableConfiguration

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

/** GarScan is a class to implement the file scan for GarDataSource. */
case class GarScan(
sparkSession: SparkSession,
@@ -17,20 +17,19 @@
// Derived from Apache Spark 3.1.1
// https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala

package org.apache.graphar.datasources
package org.apache.spark.sql.graphar

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters}
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex

import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScanBuilder
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScanBuilder
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import scala.collection.JavaConverters._
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScanBuilder
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScanBuilder

/** GarScanBuilder is a class to build the file scan for GarDataSource. */
case class GarScanBuilder(
@@ -49,6 +48,7 @@ case class GarScanBuilder(
}

private var filters: Array[Filter] = Array.empty

override def pushFilters(filters: Array[Filter]): Array[Filter] = {
this.filters = filters
filters
@@ -17,26 +17,24 @@
// Derived from Apache Spark 3.1.1
// https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala

package org.apache.graphar.datasources

import scala.collection.JavaConverters._
package org.apache.spark.sql.graphar

import org.apache.hadoop.fs.FileStatus

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.catalyst.csv.CSVOptions
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.csv.CSVDataSource
import org.apache.spark.sql.execution.datasources.orc.OrcUtils
import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
import org.apache.spark.sql.execution.datasources.v2.FileTable
import org.apache.spark.sql.graphar.csv.CSVWriteBuilder
import org.apache.spark.sql.graphar.orc.OrcWriteBuilder
import org.apache.spark.sql.graphar.parquet.ParquetWriteBuilder
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import org.apache.graphar.datasources.csv.CSVWriteBuilder
import org.apache.graphar.datasources.parquet.ParquetWriteBuilder
import org.apache.graphar.datasources.orc.OrcWriteBuilder
import scala.collection.JavaConverters._

/** GarTable is a class to represent the graph data in GraphAr as a table. */
case class GarTable(
@@ -17,27 +17,22 @@
// Derived from Apache Spark 3.1.1
// https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala

package org.apache.graphar.datasources

import java.util.UUID

import scala.collection.JavaConverters._
package org.apache.spark.sql.graphar

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.Job

import org.apache.spark.sql.execution.datasources.OutputWriterFactory
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.connector.write.{
BatchWrite,
LogicalWriteInfo,
WriteBuilder
}
import org.apache.spark.sql.execution.datasources.v2.FileBatchWrite
import org.apache.spark.sql.execution.datasources.{
BasicWriteJobStatsTracker,
DataSource,
@@ -48,8 +43,9 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.util.SerializableConfiguration
import org.apache.spark.sql.execution.datasources.v2.FileBatchWrite
import org.apache.spark.sql.catalyst.expressions.AttributeReference

import java.util.UUID
import scala.collection.JavaConverters._

abstract class GarWriteBuilder(
paths: Seq[String],
@@ -17,23 +17,22 @@
// Derived from Apache Spark 3.1.1
// https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVWriteBuilder.scala

package org.apache.graphar.datasources.csv
package org.apache.spark.sql.graphar.csv

import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.sql.catalyst.csv.CSVOptions
import org.apache.spark.sql.catalyst.util.CompressionCodecs
import org.apache.spark.sql.connector.write.LogicalWriteInfo
import org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter
import org.apache.spark.sql.execution.datasources.{
CodecStreams,
OutputWriter,
OutputWriterFactory
}
import org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter
import org.apache.spark.sql.graphar.GarWriteBuilder
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, StructType}

import org.apache.graphar.datasources.GarWriteBuilder

class CSVWriteBuilder(
paths: Seq[String],
formatName: String,
@@ -18,18 +18,17 @@
// we have to reimplement it here.
// https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcOutputWriter.scala

package org.apache.graphar.datasources.orc
package org.apache.spark.sql.graphar.orc

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.orc.OrcFile
import org.apache.orc.mapred.{
OrcOutputFormat => OrcMapRedOutputFormat,
OrcStruct
OrcStruct,
OrcOutputFormat => OrcMapRedOutputFormat
}
import org.apache.orc.mapreduce.{OrcMapreduceRecordWriter, OrcOutputFormat}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.OutputWriter
import org.apache.spark.sql.execution.datasources.orc.{OrcSerializer, OrcUtils}
@@ -17,24 +17,22 @@
// Derived from Apache Spark 3.1.1
// https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/ORCWriteBuilder.scala

package org.apache.graphar.datasources.orc
package org.apache.spark.sql.graphar.orc

import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.orc.OrcConf.{COMPRESS, MAPRED_OUTPUT_SCHEMA}
import org.apache.orc.mapred.OrcStruct

import org.apache.spark.sql.connector.write.LogicalWriteInfo
import org.apache.spark.sql.execution.datasources.orc.{OrcOptions, OrcUtils}
import org.apache.spark.sql.execution.datasources.{
OutputWriter,
OutputWriterFactory
}
import org.apache.spark.sql.execution.datasources.orc.{OrcOptions, OrcUtils}
import org.apache.spark.sql.graphar.GarWriteBuilder
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

import org.apache.graphar.datasources.GarWriteBuilder

object OrcWriteBuilder {
// the getQuotedSchemaString method of spark OrcFileFormat
private def getQuotedSchemaString(dataType: DataType): String =
@@ -17,28 +17,25 @@
// Derived from Apache Spark 3.1.1
// https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWriteBuilder.scala

package org.apache.graphar.datasources.parquet
package org.apache.spark.sql.graphar.parquet

import org.apache.hadoop.mapreduce.{Job, OutputCommitter, TaskAttemptContext}
import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat}
import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel
import org.apache.parquet.hadoop.codec.CodecConfig
import org.apache.parquet.hadoop.util.ContextUtil

import org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter
import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.connector.write.LogicalWriteInfo
import org.apache.spark.sql.execution.datasources.parquet._
import org.apache.spark.sql.execution.datasources.{
OutputWriter,
OutputWriterFactory
}
import org.apache.spark.sql.execution.datasources.parquet._
import org.apache.spark.sql.graphar.GarWriteBuilder
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

import org.apache.graphar.datasources.GarWriteBuilder

class ParquetWriteBuilder(
paths: Seq[String],
formatName: String,
@@ -19,11 +19,9 @@ package org.apache.graphar.datasources
import scala.collection.JavaConverters._
import scala.util.matching.Regex
import java.util

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.SparkSession
@@ -34,6 +32,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.graphar.GarTable

// Derived from Apache Spark 3.1.1
// https://github.com/apache/spark/blob/1d550c4/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
@@ -17,7 +17,7 @@
// Derived from Apache Spark 3.3.4
// https://github.com/apache/spark/blob/18db204/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

package org.apache.graphar.datasources
package org.apache.spark.sql.graphar

import org.apache.graphar.GeneralParams

@@ -73,16 +73,14 @@ class GarCommitProtocol(
val partitionId = taskContext.getTaskAttemptID.getTaskID.getId
if (options.contains(GeneralParams.offsetStartChunkIndexKey)) {
// offset chunk file name, looks like chunk0
val chunk_index = options
.get(GeneralParams.offsetStartChunkIndexKey)
.get
.toInt + partitionId
val chunk_index =
options(GeneralParams.offsetStartChunkIndexKey).toInt + partitionId
return f"chunk$chunk_index"
}
if (options.contains(GeneralParams.aggNumListOfEdgeChunkKey)) {
// edge chunk file name, looks like part0/chunk0
val jValue = parse(
options.get(GeneralParams.aggNumListOfEdgeChunkKey).get
options(GeneralParams.aggNumListOfEdgeChunkKey)
)
implicit val formats =
DefaultFormats // initialize a default formats for json4s
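
As context for the hunk above: the change replaces the `options.get(key).get` chain with the direct `options(key)` apply call, which behaves the same when the key is present (and both throw when it is missing). Below is a standalone sketch with a placeholder key name, since the actual value of `GeneralParams.offsetStartChunkIndexKey` is not shown here:

```scala
// Standalone sketch of the Map-access simplification in the hunk above.
object OptionsAccessSketch {
  def main(args: Array[String]): Unit = {
    val options = Map("offset.start.chunk.index" -> "3") // placeholder key/value
    val partitionId = 2

    // Old style: Map#get returns an Option, then Option#get unwraps it.
    val oldStyle = options.get("offset.start.chunk.index").get.toInt + partitionId
    // New style: Map#apply, same result and same failure mode on a missing key.
    val newStyle = options("offset.start.chunk.index").toInt + partitionId

    assert(oldStyle == newStyle)
    println(f"chunk$newStyle") // prints chunk5
  }
}
```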