Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add hasNextPage to Rest Datasource Paging Strategy #1704

Merged
merged 4 commits into from
Mar 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 43 additions & 29 deletions streamingpro-mlsql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,24 @@
<modelVersion>4.0.0</modelVersion>
<properties>
<!-- spark 2.4 start -->
<!-- <scala.version>2.11.12</scala.version> -->
<!-- <scala.binary.version>2.11</scala.binary.version> -->
<!-- <spark.version>2.4.3</spark.version> -->
<!-- <spark.bigversion>2.4</spark.bigversion> -->
<!-- <scalatest.version>3.0.3</scalatest.version> -->
<!-- <scala.version>2.11.12</scala.version> -->
<!-- <scala.binary.version>2.11</scala.binary.version> -->
<!-- <spark.version>2.4.3</spark.version> -->
<!-- <spark.bigversion>2.4</spark.bigversion> -->
<!-- <scalatest.version>3.0.3</scalatest.version> -->
<!-- spark 2.4 end -->

<!-- spark 3.0 start -->
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.1.1</spark.version>
<spark.bigversion>3.0</spark.bigversion>
<scalatest.version>3.0.3</scalatest.version>
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.1.1</spark.version>
<spark.bigversion>3.0</spark.bigversion>
<scalatest.version>3.0.3</scalatest.version>
<!-- spark 3.0 end -->
</properties>
<artifactId>streamingpro-mlsql-spark_${spark.bigversion}_${scala.binary.version}</artifactId>
<profiles>

<profile>
<id>aliyun-oss</id>
<properties>
Expand All @@ -42,7 +42,7 @@

</dependencies>
</profile>

<profile>
<id>shade</id>
<build>
Expand Down Expand Up @@ -298,7 +298,7 @@
<artifactId>mlsql-healthy-${spark.bigversion}_${scala.binary.version}</artifactId>
<version>${project.parent.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
Expand Down Expand Up @@ -331,9 +331,9 @@
<profile>
<id>streamingpro-spark-3.0.0-adaptor</id>

<activation>
<activeByDefault>true</activeByDefault>
</activation>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>

<dependency>
Expand Down Expand Up @@ -481,7 +481,7 @@
<version>${kylin-jdbc.version}</version>
</dependency>


<dependency>
<groupId>tech.mlsql</groupId>
<artifactId>common-utils_${scala.binary.version}</artifactId>
Expand Down Expand Up @@ -547,18 +547,6 @@
<version>${project.parent.version}</version>
</dependency>

<dependency>
<groupId>org.scalactic</groupId>
<artifactId>scalactic_${scala.binary.version}</artifactId>
<version>3.0.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<version>3.0.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
Expand Down Expand Up @@ -720,6 +708,32 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<version>${scalatest.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>3.8.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>3.8.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>3.8.0</version>
<scope>test</scope>
</dependency>


</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,4 +241,11 @@ trait SparkOperationUtil {
}
}
}

/**
 * Loan pattern helper: hands the resource `a` to `f` and guarantees `a` is
 * closed once `f` finishes, whether it returns normally or throws.
 *
 * NOTE(review): the structural bound `{def close(): Unit}` is resolved via
 * runtime reflection; `java.lang.AutoCloseable` would be cheaper if all
 * callers qualify — confirm before changing the signature.
 */
def tryWithResource[A <: {def close(): Unit}, B](a: A)(f: A => B): B = {
  val resource = a
  try {
    f(resource)
  } finally {
    // Tolerate a null resource so callers may pass the result of a lookup directly.
    if (resource != null) {
      resource.close()
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,29 @@ import tech.mlsql.tool.Templates2
* 3/12/2021 WilliamZhu(allwefantasy@gmail.com)
*/
/**
 * Paging strategy that simply increments an integer page counter.
 *
 * Example config:
 *   config.page.values="auto-increment:0"
 *   config.page.stop="sizeZero:$.content"
 */
class AutoIncrementPageStrategy(params: Map[String, String]) extends PageStrategy {

  // Initial page number parsed from "auto-increment:<n>"; defaults to 0 when
  // config.page.values is absent. Uses the shared key constant instead of a
  // string literal so the check and the lookup cannot drift apart.
  val Array(_, initialPageNum) = params.get(PageStrategy.PAGE_CONFIG_VALUES) match {
    case Some(value) => value.split(":")
    case None => Array("", "0")
  }

  var pageNum = initialPageNum.toInt

  // Advance the counter; returns this strategy so calls can be chained.
  // ("nexPage" typo is kept — it is the trait's method name.)
  override def nexPage(_content: Option[Any]): AutoIncrementPageStrategy = {
    pageNum += 1
    this
  }

  // Substitute the current page number into the configured URL template.
  override def pageUrl(_content: Option[Any]): String = {
    val urlTemplate = params("config.page.next")
    Templates2.evaluate(urlTemplate, Array(pageNum.toString))
  }

  // Delegate to the shared config.page.stop evaluation.
  override def hasNextPage(_content: Option[Any]): Boolean = {
    PageStrategy.defaultHasNextPage(params, _content)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import tech.mlsql.tool.Templates2
*/
class DefaultPageStrategy(params: Map[String, String]) extends PageStrategy {

override def pageValues(_content: Option[Any]): Array[String] = {
def pageValues(_content: Option[Any]): Array[String] = {
try {
val content = _content.get.toString
params("config.page.values").split(",").map(path => JsonPath.read[String](content, path)).toArray
params("config.page.values").split(",").map(path => JsonPath.read[Object](content, path).toString).toArray
} catch {
case _: com.jayway.jsonpath.PathNotFoundException =>
Array[String]()
Expand All @@ -20,12 +20,22 @@ class DefaultPageStrategy(params: Map[String, String]) extends PageStrategy {
}
}

// Stateless advance: the next-page URL is derived from the response content,
// so there is no counter to bump — just return this strategy for chaining.
// ("nexPage" typo is kept — it is the trait's method name.)
override def nexPage(_content: Option[Any]): DefaultPageStrategy = {
  this
}

// Render the configured next-page URL template with the values extracted
// from the last response body via pageValues.
override def pageUrl(_content: Option[Any]): String = {
  Templates2.evaluate(params("config.page.next"), pageValues(_content))
}

// When an explicit stop condition is configured, delegate to the shared
// config.page.stop evaluation; otherwise keep paging only while every
// extracted page value is present and non-empty.
override def hasNextPage(_content: Option[Any]): Boolean = {
  if (params.contains("config.page.stop")) {
    PageStrategy.defaultHasNextPage(params, _content)
  } else {
    val values = pageValues(_content)
    // Equivalent to !(values.size == 0 || any value null/empty), written positively.
    values.nonEmpty && values.forall(value => value != null && value.nonEmpty)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@ class OffsetPageStrategy(params: Map[String, String]) extends PageStrategy {
var pageNum: Int = initialPageNum.trim.toInt
val pageAddend: Int = pageIncrement.trim.toInt

// Advance the offset by the configured increment; returns this strategy so
// calls can be chained. ("nexPage" typo is kept — it is the trait's method name.)
override def nexPage(_content: Option[Any]): OffsetPageStrategy = {
  pageNum += pageAddend
  this
}

// Substitute the current offset into the configured next-page URL template.
override def pageUrl(_content: Option[Any]): String = {
  val urlTemplate = params("config.page.next")
  Templates2.evaluate(urlTemplate, Array(pageNum.toString))
}

// Delegates next-page detection to the shared config.page.stop evaluation.
override def hasNextPage(_content: Option[Any]): Boolean = {
PageStrategy.defaultHasNextPage(params, _content)
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,61 @@
package tech.mlsql.datasource.helper.rest

import com.jayway.jsonpath.JsonPath
import org.apache.spark.sql.mlsql.session.MLSQLException

/**
* 3/12/2021 WilliamZhu(allwefantasy@gmail.com)
*/
trait PageStrategy {
  // Advance any internal paging state (e.g. a counter or offset) based on the
  // last response body; returns this strategy so calls can be chained.
  // NOTE(review): "nexPage" looks like a typo for "nextPage", but renaming it
  // would break every implementor — keep as-is.
  def nexPage(_content: Option[Any]): PageStrategy

  // Whether another page should be fetched, judged from the last response body.
  def hasNextPage(_content: Option[Any]): Boolean

  // Build the URL of the next page from the last response body.
  def pageUrl(_content: Option[Any]): String
}

object PageStrategy {
  val PAGE_CONFIG_VALUES = "config.page.values"

  /**
   * Evaluates the configured stop condition (`config.page.stop`) against the
   * last response body to decide whether another page should be fetched.
   *
   * Supported condition formats (`"<func>:<jsonPath>"`):
   *  - `size-zero` / `sizeZero`: keep paging while the JSON array at the path
   *    is non-empty; stop when it is empty or the path is missing.
   *  - `not-exists` / `notExists`: keep paging while the path exists.
   *  - `equals` (path argument is `"<jsonPath>,<value>"`): keep paging until
   *    the value at the path equals the given value; a missing path keeps paging.
   *
   * When no stop condition is configured this defaults to `true`; callers are
   * expected to bound paging with a maximum page count.
   *
   * @throws MLSQLException when the configured function name is not supported
   */
  def defaultHasNextPage(params: Map[String, String], _content: Option[Any]): Boolean = {
    params.get("config.page.stop") match {
      case None =>
        //todo: Choose a better default value
        true
      case Some(condition) =>
        // NOTE(review): assumes _content is present whenever a stop condition
        // is configured — a None here throws NoSuchElementException; confirm callers.
        val content = _content.get.toString
        // Split on the FIRST ':' only, so JSON paths that themselves contain
        // ':' (e.g. filter expressions) do not break the Array destructuring.
        val Array(func, jsonPath) = condition.split(":", 2)

        func match {
          case "size-zero" | "sizeZero" =>
            try {
              JsonPath.read[net.minidev.json.JSONArray](content, jsonPath).size() > 0
            } catch {
              case _: com.jayway.jsonpath.PathNotFoundException =>
                false
            }

          case "not-exists" | "notExists" =>
            try {
              JsonPath.read[Object](content, jsonPath)
              true
            } catch {
              case _: com.jayway.jsonpath.PathNotFoundException =>
                false
            }

          case "equals" =>
            try {
              val Array(realJsonPath, equalValue) = jsonPath.split(",")
              JsonPath.read[Object](content, realJsonPath).toString != equalValue
            } catch {
              case _: com.jayway.jsonpath.PathNotFoundException =>
                true
            }

          case _ => throw new MLSQLException(s"config.page.stop with ${func} is not supported")
        }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package tech.mlsql.datasource.helper.rest
object PageStrategyDispatcher {
def get(params: Map[String, String]): PageStrategy = {
params("config.page.values").trim.toLowerCase match {
case s if s.startsWith("auto-increment") =>
case s if s.startsWith("auto-increment") || s.startsWith("autoIncrement") =>
new AutoIncrementPageStrategy(params)
case s if s.startsWith("offset") =>
new OffsetPageStrategy(params)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class MLSQLRest(override val uid: String) extends MLSQLSource
* and `header.content-type`="application/json"
*
* and `config.page.next`="http://mlsql.tech/api?cursor={0}&wow={1}"
* and `config.page.skip-params`="true"
* and `config.page.skip-params`="true"
* and `config.page.values`="$.path,$.path2" -- json path
* and `config.page.interval`="10ms"
* and `config.page.retry`="3"
Expand Down Expand Up @@ -85,7 +85,7 @@ class MLSQLRest(override val uid: String) extends MLSQLSource

val pageStrategy = PageStrategyDispatcher.get(config.config)
if (debug) {
logInfo(format(s"Get Page ${count} ${config.path} started"))
logInfo(format(s"Started to get Page ${count} ${config.path} "))
}
val firstPageFetchTime = System.currentTimeMillis()
val (_, firstDfOpt) = RestUtils.executeWithRetrying[(Int, Option[DataFrame])](maxTries)((() => {
Expand Down Expand Up @@ -117,12 +117,13 @@ class MLSQLRest(override val uid: String) extends MLSQLSource

val uuid = UUID.randomUUID().toString.replaceAll("-", "")
val context = ScriptSQLExec.context()
val tmpTablePath = resourceRealPath(context.execListener, Option(context.owner), PathFun("__tmp__").add(uuid).toPath)
val tmpTablePath = resourceRealPath(context.execListener, Option(context.owner),
PathFun("__tmp__").add("rest").add(uuid).toPath)
context.execListener.addEnv(classOf[MLSQLRest].getName, tmpTablePath)
firstDf.write.format("parquet").mode(SaveMode.Append).save(tmpTablePath)

if (debug) {
logInfo(format(s"Get Page 1 ${config.path} Consume:${System.currentTimeMillis() - firstPageFetchTime}ms"))
logInfo(format(s"End to get Page ${count} ${config.path} Consume:${System.currentTimeMillis() - firstPageFetchTime}ms"))
}

while (count < maxSize) {
Expand All @@ -131,20 +132,21 @@ class MLSQLRest(override val uid: String) extends MLSQLSource

val row = firstDf.select(F.col("content").cast(StringType), F.col("status")).head
val content = row.getString(0)
val status = row.getInt(1)
val hasNextPage = pageStrategy.hasNextPage(Option(content))

val pageValues = pageStrategy.pageValues(Option(content))


val shouldStop = pageValues.size == 0 || pageValues.filter(value => value == null || value.isEmpty).size > 0
if (shouldStop) {
if (status != 200 || !hasNextPage) {
if (debug) {
logInfo(s"Stop paging. The last Page ${count - 1} ${config.path} status: ${status} hasNextPage: ${hasNextPage} ")
}
count = maxSize
} else {
val newUrl = pageStrategy.pageUrl(Option(content))
val tempTime = System.currentTimeMillis()
RestUtils.executeWithRetrying[(Int, Option[DataFrame])](maxTries)((() => {
try {
if (debug) {
logInfo(format(s"Get Page ${count} ${newUrl} started"))
logInfo(format(s"Started get Page ${count} ${newUrl}"))
}

val tempDF = _http(newUrl, config.config, skipParams, config.df.get.sparkSession)
Expand All @@ -166,10 +168,16 @@ class MLSQLRest(override val uid: String) extends MLSQLSource
},
failResp => logInfo(s"Fail request ${newUrl} failed after ${maxTries} attempts. the last response status is: ${failResp._1}. ")
)
firstDf.write.format("parquet").mode(SaveMode.Append).save(tmpTablePath)
pageStrategy.nexPage

val row = firstDf.select(F.col("content").cast(StringType), F.col("status")).head
val status = row.getInt(1)
//page should be 200 otherwise should not save in parquet file
if (status == 200) {
firstDf.write.format("parquet").mode(SaveMode.Append).save(tmpTablePath)
}
pageStrategy.nexPage(Option(content))
if (debug) {
logInfo(format(s"Get Page ${count} ${newUrl} Consume: ${System.currentTimeMillis() - tempTime}"))
logInfo(format(s"End to get Page ${count} ${newUrl} Consume: ${System.currentTimeMillis() - tempTime}"))
}
}

Expand Down
Loading