Conversation

xwu0226 (Contributor) commented May 25, 2016

What changes were proposed in this pull request?

Currently, DataFrameReader.csv(...) does not support converting a Dataset[String] of CSV data into a DataFrame.

This PR adds the API DataFrameReader.csv(rdd: Dataset[String]): DataFrame. Also, to make it easy to invoke the helper functions already implemented for CSV parsing, some of the private methods are moved from csv.DefaultSource to CSVRelation.
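
A minimal usage sketch of the proposed API, assuming a spark-shell session (the data and option here are illustrative, not from the PR):

import org.apache.spark.sql.{DataFrame, Dataset}
import spark.implicits._

// CSV rows already held in memory as a Dataset[String]
val lines: Dataset[String] = Seq("2012,Tesla,S", "1997,Ford,E350").toDS()

// Proposed API: parse the lines directly instead of reading from a file path
val df: DataFrame = spark.read.option("inferSchema", "true").csv(lines)
df.show()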

How was this patch tested?

A test case is added that loads CSV files into a Dataset[String], converts it to a DataFrame, and checks the results.
Regression tests were run.

xwu0226 (Author) commented May 25, 2016

@HyukjinKwon @falaki Could you review the PR? Thanks!

pjfanning (Member):

Takeshi Yamamuro suggested on https://issues.apache.org/jira/browse/SPARK-15463 that the new API should take a Dataset[String] as input instead of an RDD[String]

xwu0226 (Author) commented May 25, 2016

Yes, I am adding the Dataset[String] API as well. Will push soon.

xwu0226 changed the title from "[SPARK-15463][SQL] support creating dataframe out of RDD[String] for csv data" to "[SPARK-15463][SQL] support creating dataframe out of RDD[String] and Dataset[String] for csv data" on May 25, 2016
HyukjinKwon (Member) commented May 26, 2016

cc @rxin

(@xwu0226 For me, I think it might be better if the CSV and JSON data sources share a similar structure, so that we can easily fix similar issues together if the structure needs to change.)

maropu (Member) commented May 26, 2016

Do we still need the interface for RDD[String]?

xwu0226 (Author) commented May 26, 2016

@maropu The API that converts Dataset[String] to DataFrame uses the one for RDD[String], so I am thinking it could be beneficial to provide both?

xwu0226 (Author) commented May 26, 2016

@HyukjinKwon Let me try to understand your question. Right now, csv.DefaultSource implements DataSourceRegister while csv.CSVRelation contains some parsing helpers, whereas json.JSONRelation's DefaultSource implements DataSourceRegister and also contains its parsing helpers. Would it be better to combine csv.CSVRelation and csv.DefaultSource, just like json.JSONRelation? Is that what you mean? Thanks!

HyukjinKwon (Member):

@xwu0226 Ah, for example, a new method, CSVInferSchema.inferSchemaFromRDD(..), seems to be introduced here while CSVInferSchema.infer(...) is already implemented. For JSON, InferSchema.infer(...) appears to be responsible for both cases.
BTW, more importantly, whether to add new APIs at all seems arguable.

rxin (Contributor) commented May 26, 2016

Let's not add so many new APIs. You can just add the Dataset[String] one, since RDD[String] can be easily converted into Dataset.
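
(For reference, a sketch of the conversion rxin mentions, assuming a spark-shell session; the file path is illustrative:)

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
import spark.implicits._   // provides the implicit Encoder[String]

val rdd: RDD[String] = spark.sparkContext.textFile("cars.csv")
val ds: Dataset[String] = spark.createDataset(rdd)
// ds can then be passed to the proposed csv(Dataset[String]) API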

xwu0226 (Author) commented May 26, 2016

@rxin OK. Thanks! will do.

xwu0226 (Author) commented on the diff:

This method is used in both csv.DefaultSource and DataFrameReader.csv(ds: Dataset[String]). So I refactored it here to handle both the default schema type and the inferSchemaFlag=true cases.
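
(A hedged, self-contained sketch of the two schema paths such a helper has to cover; the function and its names are illustrative, not the PR's actual code:)

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// If inferSchemaFlag is set, infer column types from the data; otherwise
// fall back to the default of treating every column as a string.
def resolveSchema(header: Array[String], inferSchemaFlag: Boolean)
                 (inferFromData: => StructType): StructType = {
  if (inferSchemaFlag) inferFromData
  else StructType(header.map(name => StructField(name, StringType, nullable = true)))
}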

xwu0226 changed the title from "[SPARK-15463][SQL] support creating dataframe out of RDD[String] and Dataset[String] for csv data" to "[SPARK-15463][SQL] support creating dataframe out of Dataset[String] for csv data" on May 26, 2016
xwu0226 (Author) commented May 27, 2016

@rxin Please help double-check! Many thanks!

xwu0226 (Author) commented May 27, 2016

test this please

pjfanning (Member):

@xwu0226 the unit tests you added seem sufficient

xwu0226 (Author) commented Jun 2, 2016

@yhuai @rxin Thanks!

pjfanning (Member):

@HyukjinKwon @rxin @falaki Would it be feasible to get this merged for the Spark 2.0 release?

rxin (Contributor) commented Jun 8, 2016

@pjfanning we are now focusing on bug fixes and stability fixes rather than adding new features.

xwu0226 (Author) commented Aug 6, 2016

@rxin Do you think we can revisit this feature and have it in 2.1? Thanks!

pjfanning (Member):

Would it be feasible to get this merged for Spark 2.1.0?

maropu (Member) commented Nov 17, 2016

What's the status of this PR? It seems more natural to implement from_csv in a way similar to from_json in https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L2900
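
(For comparison, a minimal sketch of the from_json pattern referred to above, assuming a Spark 2.1 spark-shell session; the sample JSON is illustrative:)

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import spark.implicits._

val jsonDS = Seq("""{"a": 1, "b": "abc"}""").toDS()
val schema = new StructType().add("a", IntegerType).add("b", StringType)
// from_json parses a string column into a struct column, given the schema
jsonDS.select(from_json($"value", schema)).show()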

maropu (Member) commented Nov 21, 2016

I checked the feasibility of implementing from_csv in sql.functions; if we move the CSV parser code (CSVReader, CSVOptions, ...) from o.a.s.sql.execution.datasources.csv to o.a.s.sql.catalyst.csv, we can easily implement from_csv with a few modifications:
master...maropu:FromCsv#diff-80a6da9ac9681594543c70c837b12641R2890

scala> val ds1 = Seq("""1,abc""").toDS()
ds1: org.apache.spark.sql.Dataset[String] = [value: string]

scala> val schema = new StructType().add("a", IntegerType).add("b", StringType)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(a,IntegerType,true), StructField(b,StringType,true))

scala> val ds2 = ds1.select(from_csv($"value", schema))
ds2: org.apache.spark.sql.DataFrame = [csvtostruct(value): struct<a: int, b: string>]

scala> ds2.printSchema
root
 |-- csvtostruct(value): struct (nullable = true)
 |    |-- a: integer (nullable = true)
 |    |-- b: string (nullable = true)


scala> ds2.show
+------------------+
|csvtostruct(value)|
+------------------+
|           [1,abc]|
+------------------+
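
(The transcript above assumes import org.apache.spark.sql.types._ in the shell, plus a build of the linked FromCsv branch, since from_csv is not part of Spark at this point.)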

xwu0226 (Author) commented Nov 21, 2016

@maropu Thanks for the comments! It seems adding such a new data source API to DataFrameReader is not a priority now; that is why this PR has been relatively idle. What you are proposing can solve a similar problem; it is just that the user cannot use the data source API to do it.

maropu (Member) commented Jan 26, 2017

This PR seems stale and inactive. I know this kind of API change has lower priority now, so how about closing this PR for now and setting the corresponding JIRA to LATER? Thoughts? cc: @rxin @xwu0226 @HyukjinKwon

xwu0226 closed this on Jan 26, 2017
HyukjinKwon (Member) commented Jan 26, 2017

Actually, this feature might not be urgent, as said above, but IMO I like this feature, to be honest. I guess the reason it was held back is that, IMHO, it did not look like a clean fix.

I recently refactored this code path and have one PR left, #16680. After that is hopefully merged, there can be an easy, clean fix, consistent with the JSON one, within 15-ish lines of additions; for example, something like the snippet below:

def csv(csv: Dataset[String]): DataFrame = {
  val parsedOptions: CSVOptions = new CSVOptions(extraOptions.toMap)
  val caseSensitive = sparkSession.sessionState.conf.caseSensitive
  // Use the user-specified schema if given; otherwise infer it from the data.
  val schema = userSpecifiedSchema.getOrElse {
    InferSchema.infer(csv, caseSensitive, parsedOptions)
  }

  // Parse each partition of CSV lines into rows.
  val parsed = csv.mapPartitions { iter =>
    val parser = new UnivocityParser(schema, caseSensitive, parsedOptions)
    iter.flatMap(parser.parse)
  }

  // Wrap the parsed rows in a logical plan and return a DataFrame.
  Dataset.ofRows(
    sparkSession,
    LogicalRDD(schema.toAttributes, parsed)(sparkSession))
}
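
(Note that the sketch parses the input partitions directly with UnivocityParser and wraps the result in a LogicalRDD, bypassing the file-based data source path entirely; that is what keeps the change to roughly 15 lines.)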

I remember there have been quite a few questions about this feature in spark-csv as a third-party package (and in spark-xml too).

xwu0226 (Author) commented Jan 26, 2017

@HyukjinKwon Thanks! After your #16680 is merged, please submit a PR with the code you showed above, then.

maropu (Member) commented Jan 26, 2017

Yeah, I also think the JSON and CSV code should be consistent, and it would be better for them to share the same code structure and behaviour, as @HyukjinKwon said. Since we do not have DataFrameReader.json(Dataset[String]) in the current master, functions.from_json is the only interface for that conversion. If we add csv(Dataset[String]) to DataFrameReader, do we also need to add json(Dataset[String]) there, even though we already have functions.from_json?

HyukjinKwon (Member) commented Jan 26, 2017

Oh, I remember the answer to a similar question I asked before, which was that we should not add APIs just for consistency.

I have some references showing requests for this feature, but I don't have any for the others, so I am less sure.

maropu (Member) commented Jan 26, 2017

Aha, I see. Anyway, we need to keep the discussion on the JIRA rather than here (because this PR is closed).
