@@ -288,6 +288,7 @@ object FunctionRegistry {
    expression[StringLPad]("lpad"),
    expression[StringTrimLeft]("ltrim"),
    expression[JsonTuple]("json_tuple"),
    expression[ParseUrl]("parse_url"),
Contributor:
this should go before printf

Contributor Author:
OK, thank you for the review. I'll fix this.

    expression[FormatString]("printf"),
    expression[RegExpExtract]("regexp_extract"),
    expression[RegExpReplace]("regexp_replace"),
@@ -17,8 +17,10 @@

package org.apache.spark.sql.catalyst.expressions

import java.net.{MalformedURLException, URL}
import java.text.{BreakIterator, DecimalFormat, DecimalFormatSymbols}
import java.util.{HashMap, Locale, Map => JMap}
Member:
Also, imports need to be sorted.

import java.util.regex.Pattern

import scala.collection.mutable.ArrayBuffer

@@ -654,6 +656,154 @@ case class StringRPad(str: Expression, len: Expression, pad: Expression)
  override def prettyName: String = "rpad"
}

object ParseUrl {
  private val HOST = UTF8String.fromString("HOST")
  private val PATH = UTF8String.fromString("PATH")
  private val QUERY = UTF8String.fromString("QUERY")
  private val REF = UTF8String.fromString("REF")
  private val PROTOCOL = UTF8String.fromString("PROTOCOL")
  private val FILE = UTF8String.fromString("FILE")
  private val AUTHORITY = UTF8String.fromString("AUTHORITY")
  private val USERINFO = UTF8String.fromString("USERINFO")
  private val REGEXPREFIX = "(&|^)"
  private val REGEXSUBFIX = "=([^&]*)"
}

/**
 * Extracts a part from a URL.
 */
@ExpressionDescription(
  usage = "_FUNC_(url, partToExtract[, key]) - extracts a part from a URL",
  extended = """Parts: HOST, PATH, QUERY, REF, PROTOCOL, AUTHORITY, FILE, USERINFO.
    Key specifies which query to extract.
    Examples:
      > SELECT _FUNC_('http://spark.apache.org/path?query=1', 'HOST')
      'spark.apache.org'
      > SELECT _FUNC_('http://spark.apache.org/path?query=1', 'QUERY')
      'query=1'
      > SELECT _FUNC_('http://spark.apache.org/path?query=1', 'QUERY', 'query')
      '1'""")
Contributor:
We should probably .stripMargin here:

    """...
      |...
    """.stripMargin

Otherwise all leading whitespace is included in the extended description string.

Contributor Author:
OK, I'll fix this, thank you.

Contributor Author:
Compilation fails with the error
Error:(686, 9) annotation argument needs to be a constant; found: scala.this.Predef.augmentString
so I should probably keep the current extended description string.
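For reference, a minimal standalone sketch of the underlying rule (my illustration, not code from the PR): ExpressionDescription is a Java annotation, so its arguments must be compile-time constants. scalac constant-folds + over string literals, but stripMargin is a method call via Predef.augmentString, which matches the error above.

object ConstDemo extends App {
  // Constant: adjacent string literals joined with + are folded into a single
  // compile-time constant, so this form is legal as a Java annotation argument.
  final val Folded = "line one\n" + "line two"

  // Not a constant: the right-hand side is a stripMargin call, which is
  // exactly what the compiler reported ("found: scala.this.Predef.augmentString").
  final val Stripped = """line one
                         |line two""".stripMargin

  println(Folded == Stripped)  // true at runtime, but only Folded is a constant
}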

case class ParseUrl(children: Seq[Expression])
Contributor:
Again, we should not use Seq[Expression] here. We should just have a 3-arg ctor and then add a 2-arg ctor.
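A hypothetical sketch of that shape (assuming the Spark source tree on the classpath; the PR ultimately kept Seq[Expression], and the default for the 2-arg form is exactly the open question below):

case class ParseUrl(url: Expression, partToExtract: Expression, key: Expression)
  extends Expression with ExpectsInputTypes with CodegenFallback {

  // 2-arg auxiliary ctor; Literal("") as the default key is debated below.
  def this(url: Expression, partToExtract: Expression) =
    this(url, partToExtract, Literal(""))

  override def children: Seq[Expression] = Seq(url, partToExtract, key)
  override def nullable: Boolean = true
  override def dataType: DataType = StringType
  override def inputTypes: Seq[DataType] = Seq(StringType, StringType, StringType)
  override def eval(input: InternalRow): Any = null  // body elided in this sketch
}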

Contributor:
Then we should think of a good default value for the 3rd argument. We should avoid using null, as we assume in a lot of places that the children of an expression won't be null. How about using the empty string as the default value for key?

Contributor Author:
As I explained before, I can hardly find a magic key that lets us treat parse_url(url, part, magic_key) as parse_url(url, part). I have doubts about the empty string, e.g.

hive> select parse_url("http://spark/path?=1", "QUERY", "");
1

hive> select parse_url("http://spark/path?=1", "QUERY");
=1

Any suggestions on this?
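The ambiguity is reproducible outside Hive with the PR's own regex scheme, "(&|^)" + key + "=([^&]*)" (a quick standalone check, my illustration):

import java.util.regex.Pattern

object EmptyKeyDemo extends App {
  // The query string of "http://spark/path?=1" is "=1"; with an empty key the
  // pattern "(&|^)=([^&]*)" still matches and captures "1", as Hive shows.
  val m = Pattern.compile("(&|^)" + "" + "=([^&]*)").matcher("=1")
  if (m.find()) println(m.group(2))  // prints 1
}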

Contributor:
Well, I don't have a strong preference here; Seq[Expression] doesn't look so bad to me. @rxin what do you think?

Contributor:
What if we use # as the default value and check on that? It is not a valid URL key, is it?

Contributor:
Anyway, I don't have a super strong preference here either. It might be clearer not to use a hacky # value.

Contributor Author:
Yes, # is not a valid URL key. And I agree with you on not using a hacky value.
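Indeed, # cannot occur inside a query key at all: it terminates the query and starts the fragment (a quick check with java.net.URL, my illustration):

import java.net.URL

object HashKeyDemo extends App {
  val u = new URL("http://spark.apache.org/path?a=1#frag")
  println(u.getQuery)  // a=1, the '#' and everything after it are excluded
  println(u.getRef)    // frag
}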

  extends Expression with ExpectsInputTypes with CodegenFallback {

  override def nullable: Boolean = true
  override def inputTypes: Seq[DataType] = Seq.fill(children.size)(StringType)
  override def dataType: DataType = StringType
  override def prettyName: String = "parse_url"

  // If the url is a constant, cache the URL object so that we don't need to convert url
  // from UTF8String to String to URL for every row.
  @transient private lazy val cachedUrl = children(0) match {
    case Literal(url: UTF8String, _) if url ne null => getUrl(url)
    case _ => null
  }

  // If the key is a constant, cache the Pattern object so that we don't need to convert key
  // from UTF8String to String to StringBuilder to String to Pattern for every row.
  @transient private lazy val cachedPattern = children(2) match {
    case Literal(key: UTF8String, _) if key ne null => getPattern(key)
    case _ => null
  }

  // If the partToExtract is a constant, cache the Extract part function so that we don't need
  // to check the partToExtract for every row.
  @transient private lazy val cachedExtractPartFunc = children(1) match {
    case Literal(part: UTF8String, _) => getExtractPartFunc(part)
    case _ => null
  }

  import ParseUrl._

  override def checkInputDataTypes(): TypeCheckResult = {
    if (children.size > 3 || children.size < 2) {
      TypeCheckResult.TypeCheckFailure(s"$prettyName function requires two or three arguments")
    } else {
      super[ExpectsInputTypes].checkInputDataTypes()
    }
  }

  private def getPattern(key: UTF8String): Pattern = {
    Pattern.compile(REGEXPREFIX + key.toString + REGEXSUBFIX)
  }

  private def getUrl(url: UTF8String): URL = {
    try {
      new URL(url.toString)
    } catch {
      case e: MalformedURLException => null
    }
  }

  private def getExtractPartFunc(partToExtract: UTF8String): URL => String = {
    partToExtract match {
      case HOST => _.getHost
      case PATH => _.getPath
      case QUERY => _.getQuery
      case REF => _.getRef
      case PROTOCOL => _.getProtocol
      case FILE => _.getFile
      case AUTHORITY => _.getAuthority
      case USERINFO => _.getUserInfo
      case _ => (url: URL) => null
    }
  }

  private def extractValueFromQuery(query: UTF8String, pattern: Pattern): UTF8String = {
    val m = pattern.matcher(query.toString)
    if (m.find()) {
      UTF8String.fromString(m.group(2))
    } else {
      null
    }
  }

  private def extractFromUrl(url: URL, partToExtract: UTF8String): UTF8String = {
    if (cachedExtractPartFunc ne null) {
      UTF8String.fromString(cachedExtractPartFunc.apply(url))
    } else {
      UTF8String.fromString(getExtractPartFunc(partToExtract).apply(url))
    }
  }

  private def parseUrlWithoutKey(url: UTF8String, partToExtract: UTF8String): UTF8String = {
    if (cachedUrl ne null) {
      extractFromUrl(cachedUrl, partToExtract)
    } else {
      val currentUrl = getUrl(url)
      if (currentUrl ne null) {
        extractFromUrl(currentUrl, partToExtract)
      } else {
        null
      }
    }
  }

  override def eval(input: InternalRow): Any = {
Contributor:
This is somewhat convoluted, with 4 levels of nesting; I think you can rewrite it this way to make it easier to follow:

val evaluated = children.map(_.eval(input).asInstanceOf[UTF8String])
if (evaluated.contains(null)) {
  return null
}

if (evaluated.size == 2) {
  return parseUrlWithoutKey(evaluated(0), evaluated(1))
} else {
  // 3-arg, i.e. QUERY with key
  assert(evaluated.size == 3)
  if (evaluated(1) != QUERY) {
    return null
  }

  val query = parseUrlWithoutKey(evaluated(0), evaluated(1))
  if (query eq null) {
    return null
  }

  if (cachedPattern ne null) {
    return extractValueFromQuery(query, cachedPattern)
  } else {
    return extractValueFromQuery(query, getPattern(evaluated(2)))
  }
}

Contributor Author:
Makes sense, I'll fix this.

    val evaluated = children.map { e => e.eval(input).asInstanceOf[UTF8String] }
    if (evaluated.contains(null)) return null
    if (evaluated.size == 2) {
      parseUrlWithoutKey(evaluated(0), evaluated(1))
    } else {
      // 3-arg, i.e. QUERY with key
      assert(evaluated.size == 3)
      if (evaluated(1) != QUERY) {
        return null
      }

      val query = parseUrlWithoutKey(evaluated(0), evaluated(1))
      if (query eq null) {
        return null
      }

      if (cachedPattern ne null) {
        extractValueFromQuery(query, cachedPattern)
      } else {
        extractValueFromQuery(query, getPattern(evaluated(2)))
      }
    }
  }
}

/**
 * Returns the input formatted according to printf-style format strings.
 */
@@ -726,6 +726,57 @@ class StringExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
    checkEvaluation(FindInSet(Literal("ab,"), Literal("abc,b,ab,c,def")), 0)
  }

  test("ParseUrl") {
    def checkParseUrl(expected: String, urlStr: String, partToExtract: String): Unit = {
      checkEvaluation(
        ParseUrl(Seq(Literal(urlStr), Literal(partToExtract))), expected)
    }
    def checkParseUrlWithKey(
        expected: String,
        urlStr: String,
        partToExtract: String,
        key: String): Unit = {
      checkEvaluation(
        ParseUrl(Seq(Literal(urlStr), Literal(partToExtract), Literal(key))), expected)
    }

    checkParseUrl("spark.apache.org", "http://spark.apache.org/path?query=1", "HOST")
    checkParseUrl("/path", "http://spark.apache.org/path?query=1", "PATH")
    checkParseUrl("query=1", "http://spark.apache.org/path?query=1", "QUERY")
    checkParseUrl("Ref", "http://spark.apache.org/path?query=1#Ref", "REF")
    checkParseUrl("http", "http://spark.apache.org/path?query=1", "PROTOCOL")
    checkParseUrl("/path?query=1", "http://spark.apache.org/path?query=1", "FILE")
    checkParseUrl("spark.apache.org:8080", "http://spark.apache.org:8080/path?query=1", "AUTHORITY")
    checkParseUrl("userinfo", "http://userinfo@spark.apache.org/path?query=1", "USERINFO")
Contributor:
what will happen if there is no userinfo in the url?

Contributor Author:
Then the result is null. I'll add a test case for this.
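For reference, java.net.URL returns null for components that are absent, which is what surfaces here as a NULL result (a quick standalone check, my illustration):

import java.net.URL

object UserInfoDemo extends App {
  println(new URL("http://userinfo@spark.apache.org/path").getUserInfo)  // userinfo
  println(new URL("http://spark.apache.org/path").getUserInfo)           // null
}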

checkParseUrlWithKey("1", "http://spark.apache.org/path?query=1", "QUERY", "query")

// Null checking
checkParseUrl(null, null, "HOST")
checkParseUrl(null, "http://spark.apache.org/path?query=1", null)
checkParseUrl(null, null, null)
checkParseUrl(null, "test", "HOST")
checkParseUrl(null, "http://spark.apache.org/path?query=1", "NO")
checkParseUrl(null, "http://spark.apache.org/path?query=1", "USERINFO")
checkParseUrlWithKey(null, "http://spark.apache.org/path?query=1", "HOST", "query")
checkParseUrlWithKey(null, "http://spark.apache.org/path?query=1", "QUERY", "quer")
checkParseUrlWithKey(null, "http://spark.apache.org/path?query=1", "QUERY", null)
Member:
Could you add exceptional cases by using the following statement?

intercept[AnalysisException] {
  ...
}

Contributor Author:
I am not sure. Is there any exceptional case?

Contributor:
e.g. invalid url, invalid part

Contributor Author:
Oh sorry, I missed the point.

Contributor Author:
Since an invalid url or an invalid part just returns a null result, I wonder in what circumstance an exception would be thrown?

Member (@dongjoon-hyun, Jul 1, 2016):
Invalid key? Try this one.

SELECT parse_url('http://spark/?','QUERY', '???')

Member:
I'm not sure how to handle this kind of malformed query. Hive produces a reasonable message.

hive> SELECT parse_url('https://?1=1&?','QUERY', '???');
FAILED: SemanticException [Error 10014]: Line 1:7 Wrong arguments ''???'':

Contributor Author:
Yes, you are right. An invalid key does the job. I'll fix this.
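To spell out why the key "???" fails (my illustration, not code from the PR): the compiled pattern becomes "(&|^)???=([^&]*)", where the first ? quantifies the group, the second makes that quantifier reluctant, and the third is a dangling metacharacter.

import java.util.regex.Pattern

object BadKeyDemo extends App {
  // Throws java.util.regex.PatternSyntaxException (dangling meta character '?')
  Pattern.compile("(&|^)" + "???" + "=([^&]*)")
}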

    checkParseUrlWithKey(null, "http://spark.apache.org/path?query=1", "QUERY", "")

    // exceptional cases
    intercept[java.util.regex.PatternSyntaxException] {
Member:
Hi, @janplus.
I thought about this a little more. Currently, this exception happens on the executor side, which is not desirable. IMO, we had better make this an AnalysisException.
Could you add some simple validation logic for key?

Member (@dongjoon-hyun, Jul 2, 2016):
In the case of Hive, it's also a SemanticException, not a raw PatternSyntaxException.
You may need to investigate Hive's SemanticException-related logic.

Member:
In other words, Spark with this PR runs the execution for that problematic parameter while Hive does not.

Contributor Author:
I'll investigate this.
The behavior differs depending on whether key is a Literal.

hive> select parse_url("http://spark/path?", "QUERY", "???");
FAILED: SemanticException [Error 10014]: Line 1:7 Wrong arguments '"???"': org.apache.hadoop.hive.ql.metadata.HiveException: Unable to execute method public java.lang.String org.apache.hadoop.hive.ql.udf.UDFParseUrl.evaluate(java.lang.String,java.lang.String,java.lang.String) on object org.apache.hadoop.hive.ql.udf.UDFParseUrl@6682e6a5 of class org.apache.hadoop.hive.ql.udf.UDFParseUrl with arguments {http://spark/path?:java.lang.String, QUERY:java.lang.String, ???:java.lang.String} of size 3

hive> select parse_url("http://spark/path?", "QUERY", name) from test;
OK
Failed with exception java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: Unable to execute method public java.lang.String org.apache.hadoop.hive.ql.udf.UDFParseUrl.evaluate(java.lang.String,java.lang.String,java.lang.String) on object org.apache.hadoop.hive.ql.udf.UDFParseUrl@2035d65b of class org.apache.hadoop.hive.ql.udf.UDFParseUrl with arguments {http://spark/path?:java.lang.String, QUERY:java.lang.String, ???:java.lang.String} of size 3
Time taken: 0.039 seconds

Member:
Thank you, @janplus.

Contributor Author:
Hi, @dongjoon-hyun
It seems that only when url, partToExtract and key are all Literals does Hive give a SemanticException.

hive> select * from url_parse_data;
OK
http://spark/path? QUERY ???
Time taken: 0.054 seconds, Fetched: 1 row(s)

hive> select parse_url("http://spark/path?", "QUERY", "???") from url_parse_data;
FAILED: SemanticException [Error 10014]: Line 1:7 Wrong arguments '"???"': org.apache.hadoop.hive.ql.metadata.HiveException: Unable to execute method public java.lang.String org.apache.hadoop.hive.ql.udf.UDFParseUrl.evaluate(java.lang.String,java.lang.String,java.lang.String) on object org.apache.hadoop.hive.ql.udf.UDFParseUrl@59e082f8 of class org.apache.hadoop.hive.ql.udf.UDFParseUrl with arguments {http://spark/path?:java.lang.String, QUERY:java.lang.String, ???:java.lang.String} of size 3

hive> select parse_url(url, "QUERY", "???") from url_parse_data;
OK
Failed with exception java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: Unable to execute method public java.lang.String org.apache.hadoop.hive.ql.udf.UDFParseUrl.evaluate(java.lang.String,java.lang.String,java.lang.String) on object org.apache.hadoop.hive.ql.udf.UDFParseUrl@7d1f3fe9 of class org.apache.hadoop.hive.ql.udf.UDFParseUrl with arguments {http://spark/path?:java.lang.String, QUERY:java.lang.String, ???:java.lang.String} of size 3

hive> select parse_url("http://spark/path?", part, "???") from url_parse_data;
OK
Failed with exception java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: Unable to execute method public java.lang.String org.apache.hadoop.hive.ql.udf.UDFParseUrl.evaluate(java.lang.String,java.lang.String,java.lang.String) on object org.apache.hadoop.hive.ql.udf.UDFParseUrl@37fef327 of class org.apache.hadoop.hive.ql.udf.UDFParseUrl with arguments {http://spark/path?:java.lang.String, QUERY:java.lang.String, ???:java.lang.String} of size 3

hive> select parse_url("http://spark/path?", "QUERY", key) from url_parse_data;
OK
Failed with exception java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: Unable to execute method public java.lang.String org.apache.hadoop.hive.ql.udf.UDFParseUrl.evaluate(java.lang.String,java.lang.String,java.lang.String) on object org.apache.hadoop.hive.ql.udf.UDFParseUrl@1d944fc0 of class org.apache.hadoop.hive.ql.udf.UDFParseUrl with arguments {http://spark/path?:java.lang.String, QUERY:java.lang.String, ???:java.lang.String} of size 3

Given that, this optimization seems not that valuable.

Member (@dongjoon-hyun, Jul 3, 2016):
Thank you for the nice investigation. Yes, Hive's validation seems too limited.
I think you can do better than Hive if you support Literal key validation.
What do you think about that?

Contributor Author:
Yes, I can definitely do that. In fact, I have finished it.
But before I commit, let's think it through first.
In the checkAnalysis method for LogicalPlan, the only method that gets called on an Expression is checkInputDataTypes:

case e: Expression if e.checkInputDataTypes().isFailure =>

which means we can only implement this validation in ParseUrl's checkInputDataTypes. In that case Spark gives an AnalysisException like this:

org.apache.spark.sql.AnalysisException: cannot resolve 'parse_url("http://spark.apache.org/path?", "QUERY", "???")' due to data type mismatch: wrong key "???"; line 1 pos 0

But obviously this is not a data type mismatch, so the message may confuse users, and the different messages for a Literal key versus a non-Literal key may confuse them too.
On the other hand, if we do not validate the Literal key, the executor gets an exception at the first row, which does not seem that unacceptable.
Comparing both sides, I think we should not do the Literal key validation.
What do you think about this?
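For completeness, a hypothetical sketch of the Literal-key validation that was written and then dropped (my reconstruction from the description above, not the PR's code). It has to live in checkInputDataTypes, which is why the failure would surface under the misleading "data type mismatch" label:

override def checkInputDataTypes(): TypeCheckResult = {
  if (children.size > 3 || children.size < 2) {
    TypeCheckResult.TypeCheckFailure(s"$prettyName function requires two or three arguments")
  } else {
    children.lift(2) match {
      case Some(Literal(key: UTF8String, _)) if key ne null =>
        try {
          getPattern(key)  // compile the pattern eagerly to validate the key
          super[ExpectsInputTypes].checkInputDataTypes()
        } catch {
          case _: java.util.regex.PatternSyntaxException =>
            TypeCheckResult.TypeCheckFailure(s"wrong key \"$key\"")
        }
      case _ => super[ExpectsInputTypes].checkInputDataTypes()
    }
  }
}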

Contributor:
I think it's fine to throw the exception at the executor side; no need to specially handle literals here.

      evaluate(ParseUrl(Seq(Literal("http://spark.apache.org/path?"),
        Literal("QUERY"), Literal("???"))))
    }

    // arguments checking
    assert(ParseUrl(Seq(Literal("1"))).checkInputDataTypes().isFailure)
    assert(ParseUrl(Seq(Literal("1"), Literal("2"), Literal("3"), Literal("4")))
Contributor:
also add some cases with invalid-type parameters?

Contributor Author:
Since I declared ParseUrl with ImplicitCastInputTypes, I am not sure whether cases with invalid-type parameters are necessary.

Contributor:
ah right, no need to bother here

      .checkInputDataTypes().isFailure)
    assert(ParseUrl(Seq(Literal("1"), Literal(2))).checkInputDataTypes().isFailure)
    assert(ParseUrl(Seq(Literal(1), Literal("2"))).checkInputDataTypes().isFailure)
    assert(ParseUrl(Seq(Literal("1"), Literal("2"), Literal(3))).checkInputDataTypes().isFailure)
  }

  test("Sentences") {
    val nullString = Literal.create(null, StringType)
    checkEvaluation(Sentences(nullString, nullString, nullString), null)
@@ -228,6 +228,21 @@ class StringFunctionsSuite extends QueryTest with SharedSQLContext {
Row("???hi", "hi???", "h", "h"))
}

test("string parse_url function") {
val df = Seq[String](("http://userinfo@spark.apache.org/path?query=1#Ref"))
.toDF("url")

checkAnswer(
df.selectExpr(
"parse_url(url, 'HOST')", "parse_url(url, 'PATH')",
"parse_url(url, 'QUERY')", "parse_url(url, 'REF')",
"parse_url(url, 'PROTOCOL')", "parse_url(url, 'FILE')",
"parse_url(url, 'AUTHORITY')", "parse_url(url, 'USERINFO')",
"parse_url(url, 'QUERY', 'query')"),
Row("spark.apache.org", "/path", "query=1", "Ref",
"http", "/path?query=1", "userinfo@spark.apache.org", "userinfo", "1"))
}

test("string repeat function") {
val df = Seq(("hi", 2)).toDF("a", "b")

@@ -236,7 +236,7 @@ private[sql] class HiveSessionCatalog(
  // str_to_map, windowingtablefunction.
  private val hiveFunctions = Seq(
    "hash", "java_method", "histogram_numeric",
-   "parse_url", "percentile", "percentile_approx", "reflect", "str_to_map",
+   "percentile", "percentile_approx", "reflect", "str_to_map",
    "xpath", "xpath_double", "xpath_float", "xpath_int", "xpath_long",
    "xpath_number", "xpath_short", "xpath_string"
  )