Skip to content

Commit

Permalink
Issue #390: Allowing additional white-space types in the query (#392)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidrabinowitz authored May 13, 2021
1 parent 2f910c6 commit 73d1b8e
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public static SparkBigQueryConfig from(
date -> validateDateFormat(date, config.getPartitionTypeOrDefault(), DATE_PARTITION_PARAM));
// checking for query
if (tableParam.isPresent()) {
String tableParamStr = tableParam.get().trim();
String tableParamStr = tableParam.get().trim().replaceAll("\\s+", " ");
if (tableParamStr.toLowerCase().startsWith("select ")) {
// it is a query in practice
config.query = com.google.common.base.Optional.of(tableParamStr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,11 @@ class SparkBigQueryEndToEndReadFromQueryITSuite extends FunSuite
IntegrationTestUtils.createDataset(testDataset)
}

def testReadFromQuery(format: String) {
// the query suffix is to make sure that each format will have
// a different table createddue to the destination table cache
val sql = "SELECT corpus, corpus_date FROM `bigquery-public-data.samples.shakespeare` " +
s"WHERE word='spark' AND '$format'='$format'";
private def testReadFromQueryInternal(format: String, query: String) {
val df = spark.read.format(format)
.option("viewsEnabled", true)
.option("materializationDataset", testDataset)
.load(sql)
.load(query)

val totalRows = df.count
totalRows should equal(9)
Expand All @@ -65,11 +61,30 @@ class SparkBigQueryEndToEndReadFromQueryITSuite extends FunSuite
val expectedCorpuses = Array("2kinghenryvi", "3kinghenryvi", "allswellthatendswell", "hamlet",
"juliuscaesar", "kinghenryv", "kinglear", "periclesprinceoftyre", "troilusandcressida")
corpuses should equal(expectedCorpuses)

}

// Runs the shared read-from-query check using a single-line SQL statement.
// The format-dependent suffix guarantees a distinct destination table per
// format, working around the destination table cache.
private def testReadFromQuery(format: String): Unit = {
  val query =
    "SELECT corpus, corpus_date FROM `bigquery-public-data.samples.shakespeare` " +
      s"WHERE word='spark' AND '$format'='$format'"
  testReadFromQueryInternal(format, query)
}

/** Like `testReadFromQuery`, but the SQL spans multiple lines to verify that
  * queries containing newline whitespace are accepted (issue #390).
  *
  * Fixed: declared with deprecated procedure syntax (`def f(...) { }`);
  * now uses an explicit `: Unit =` result type, consistent with
  * `testReadFromQuery`.
  */
private def testReadFromQueryWithNewLine(format: String): Unit = {
  // the query suffix is to make sure that each format will have
  // a different table created due to the destination table cache
  testReadFromQueryInternal(format,
    """SELECT
|corpus, corpus_date
|FROM `bigquery-public-data.samples.shakespeare` """.stripMargin +
    s"WHERE word='spark' AND '$format'='$format'")
}

def testQueryOption(format: String) {
// the query suffix is to make sure that each format will have
// a different table createddue to the destination table cache
// a different table created due to the destination table cache
val sql = "SELECT corpus, word_count FROM `bigquery-public-data.samples.shakespeare` " +
s"WHERE word='spark' AND '$format'='$format'";
val df = spark.read.format(format)
Expand Down Expand Up @@ -107,6 +122,14 @@ class SparkBigQueryEndToEndReadFromQueryITSuite extends FunSuite
testReadFromQuery("com.google.cloud.spark.bigquery.v2.BigQueryDataSourceV2")
}

// Multi-line query against the v1 ("bigquery") connector.
test("test read from query with newline - v1") { testReadFromQueryWithNewLine("bigquery") }

// Multi-line query against the v2 DataSource implementation.
test("test read from query with newline - v2") { testReadFromQueryWithNewLine("com.google.cloud.spark.bigquery.v2.BigQueryDataSourceV2") }

// Reads via the explicit "query" option on the v1 ("bigquery") connector.
test("test query option - v1") { testQueryOption("bigquery") }
Expand Down

0 comments on commit 73d1b8e

Please sign in to comment.