Merge pull request #1704 from byzer-org/william-dev
Add hasNextPage to Rest Datasource Paging Strategy
chncaesar authored Mar 2, 2022
2 parents c9e1389 + 6b0f2ed commit 261974a
Showing 9 changed files with 392 additions and 59 deletions.
72 changes: 43 additions & 29 deletions streamingpro-mlsql/pom.xml
@@ -10,24 +10,24 @@
<modelVersion>4.0.0</modelVersion>
<properties>
<!-- spark 2.4 start -->
<!-- <scala.version>2.11.12</scala.version> -->
<!-- <scala.binary.version>2.11</scala.binary.version> -->
<!-- <spark.version>2.4.3</spark.version> -->
<!-- <spark.bigversion>2.4</spark.bigversion> -->
<!-- <scalatest.version>3.0.3</scalatest.version> -->
<!-- <scala.version>2.11.12</scala.version> -->
<!-- <scala.binary.version>2.11</scala.binary.version> -->
<!-- <spark.version>2.4.3</spark.version> -->
<!-- <spark.bigversion>2.4</spark.bigversion> -->
<!-- <scalatest.version>3.0.3</scalatest.version> -->
<!-- spark 2.4 end -->

<!-- spark 3.0 start -->
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.1.1</spark.version>
<spark.bigversion>3.0</spark.bigversion>
<scalatest.version>3.0.3</scalatest.version>
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.1.1</spark.version>
<spark.bigversion>3.0</spark.bigversion>
<scalatest.version>3.0.3</scalatest.version>
<!-- spark 3.0 end -->
</properties>
<artifactId>streamingpro-mlsql-spark_${spark.bigversion}_${scala.binary.version}</artifactId>
<profiles>

<profile>
<id>aliyun-oss</id>
<properties>
@@ -42,7 +42,7 @@

</dependencies>
</profile>

<profile>
<id>shade</id>
<build>
@@ -298,7 +298,7 @@
<artifactId>mlsql-healthy-${spark.bigversion}_${scala.binary.version}</artifactId>
<version>${project.parent.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_${scala.binary.version}</artifactId>
@@ -331,9 +331,9 @@
<profile>
<id>streamingpro-spark-3.0.0-adaptor</id>

<activation>
<activeByDefault>true</activeByDefault>
</activation>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>

<dependency>
@@ -481,7 +481,7 @@
<version>${kylin-jdbc.version}</version>
</dependency>


<dependency>
<groupId>tech.mlsql</groupId>
<artifactId>common-utils_${scala.binary.version}</artifactId>
@@ -547,18 +547,6 @@
<version>${project.parent.version}</version>
</dependency>

<dependency>
<groupId>org.scalactic</groupId>
<artifactId>scalactic_${scala.binary.version}</artifactId>
<version>3.0.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<version>3.0.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
@@ -720,6 +708,32 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<version>${scalatest.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>3.8.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>3.8.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>3.8.0</version>
<scope>test</scope>
</dependency>


</dependencies>

@@ -241,4 +241,11 @@ trait SparkOperationUtil
}
}
}

def tryWithResource[A <: {def close(): Unit}, B](a: A)(f: A => B): B = {
try f(a)
finally {
if (a != null) a.close()
}
}
}
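The helper above accepts anything with a close(): Unit method via a structural type. For illustration only, here is a self-contained sketch of the same pattern with a usage call; the helper body is copied from the diff so the snippet runs without the rest of the repository, and the input is invented.

import scala.language.reflectiveCalls

object TryWithResourceSketch {
  // Same structural-typed helper as added to SparkOperationUtil above:
  // the resource is closed in the finally block even if the body throws.
  def tryWithResource[A <: {def close(): Unit}, B](a: A)(f: A => B): B = {
    try f(a)
    finally {
      if (a != null) a.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // scala.io.Source exposes close(): Unit, so it satisfies the bound.
    val head = tryWithResource(scala.io.Source.fromString("first\nsecond")) { src =>
      src.getLines().next()
    }
    println(head) // prints "first"
  }
}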
@@ -6,20 +6,29 @@ import tech.mlsql.tool.Templates2
* 3/12/2021 WilliamZhu(allwefantasy@gmail.com)
*/
class AutoIncrementPageStrategy(params: Map[String, String]) extends PageStrategy {
val Array(_, initialPageNum) = params("config.page.values").split(":")
var pageNum = initialPageNum.toInt

override def pageValues(_content: Option[Any]) = Array[String](pageNum.toString)
// config.page.values="auto-increment:0"
// config.page.stop="sizeZero:$.content"
val Array(_, initialPageNum) = {
if (!params.contains(PageStrategy.PAGE_CONFIG_VALUES)) {
Array("", "0")
} else {
params("config.page.values").split(":")
}

}
var pageNum = initialPageNum.toInt

override def nexPage: AutoIncrementPageStrategy = {
override def nexPage(_content: Option[Any]): AutoIncrementPageStrategy = {
pageNum += 1
this
}

override def pageUrl(_content: Option[Any]): String = {
val urlTemplate = params("config.page.next")
Templates2.evaluate(urlTemplate, pageValues(None))
Templates2.evaluate(urlTemplate, Array(pageNum.toString))
}

override def hasNextPage(_content: Option[Any]): Boolean = {
PageStrategy.defaultHasNextPage(params, _content)
}
}
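For readers new to the paging config, a rough sketch (not part of the commit) of the walk this strategy produces: the counter starts at the value parsed from config.page.values="auto-increment:0" and nexPage bumps it by one before each request. Plain string substitution stands in for Templates2.evaluate, and the URL below is invented.

object AutoIncrementWalkSketch {
  def main(args: Array[String]): Unit = {
    val nextTemplate = "http://mlsql.tech/api?page={0}" // hypothetical config.page.next
    var pageNum = 0                                      // from config.page.values="auto-increment:0"
    // nexPage increments the counter; pageUrl substitutes it into the {0} placeholder.
    for (_ <- 0 until 3) {
      println(nextTemplate.replace("{0}", pageNum.toString))
      pageNum += 1
    }
    // http://mlsql.tech/api?page=0
    // http://mlsql.tech/api?page=1
    // http://mlsql.tech/api?page=2
  }
}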
@@ -8,10 +8,10 @@ import tech.mlsql.tool.Templates2
*/
class DefaultPageStrategy(params: Map[String, String]) extends PageStrategy {

override def pageValues(_content: Option[Any]): Array[String] = {
def pageValues(_content: Option[Any]): Array[String] = {
try {
val content = _content.get.toString
params("config.page.values").split(",").map(path => JsonPath.read[String](content, path)).toArray
params("config.page.values").split(",").map(path => JsonPath.read[Object](content, path).toString).toArray
} catch {
case _: com.jayway.jsonpath.PathNotFoundException =>
Array[String]()
@@ -20,12 +20,22 @@ class DefaultPageStrategy(params: Map[String, String]) extends PageStrategy {
}
}

override def nexPage: DefaultPageStrategy = {
override def nexPage(_content: Option[Any]): DefaultPageStrategy = {
this
}

override def pageUrl(_content: Option[Any]): String = {
val urlTemplate = params("config.page.next")
Templates2.evaluate(urlTemplate, pageValues(_content))
}

override def hasNextPage(_content: Option[Any]): Boolean = {
if (params.get("config.page.stop").isDefined) {
PageStrategy.defaultHasNextPage(params, _content)
} else {
val pageValues = this.pageValues(_content)
!(pageValues.size == 0 || pageValues.filter(value => value == null || value.isEmpty).size > 0)
}

}
}
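To make the fallback rule in hasNextPage concrete, here is a small sketch (not in the commit) that extracts json-path values the same way pageValues does, using the jayway JsonPath calls shown above, then applies the empty-or-missing stop check. The response body and paths are invented.

import com.jayway.jsonpath.JsonPath

object DefaultPageValuesSketch {
  def main(args: Array[String]): Unit = {
    val content = """{"cursor":"abc123","page":2}"""   // hypothetical response body
    val paths = "$.cursor,$.page".split(",")           // hypothetical config.page.values
    // Each json path yields one positional value, mirroring pageValues.
    val values = paths.map(p => JsonPath.read[Object](content, p).toString)
    println(values.mkString(","))                       // abc123,2
    // Without config.page.stop, paging stops once any value is missing or empty.
    val hasNext = !(values.isEmpty || values.exists(v => v == null || v.isEmpty))
    println(hasNext)                                    // true
  }
}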
@@ -17,16 +17,17 @@ class OffsetPageStrategy(params: Map[String, String]) extends PageStrategy {
var pageNum: Int = initialPageNum.trim.toInt
val pageAddend: Int = pageIncrement.trim.toInt

override def pageValues(_content: Option[Any]) = Array[String](pageNum.toString)


override def nexPage: OffsetPageStrategy = {
override def nexPage(_content: Option[Any]): OffsetPageStrategy = {
pageNum += pageAddend
this
}

override def pageUrl(_content: Option[Any]): String = {
val urlTemplate = params("config.page.next")
Templates2.evaluate(urlTemplate, pageValues(None))
Templates2.evaluate(urlTemplate, Array(pageNum.toString))
}

override def hasNextPage(_content: Option[Any]): Boolean = {
PageStrategy.defaultHasNextPage(params, _content)
}
}
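A similar sketch for the offset walk (not part of the diff): pageNum starts at initialPageNum and nexPage advances it by pageAddend before each request. The hunk that parses config.page.values is collapsed above, so the start value, increment, and URL here are all invented.

object OffsetWalkSketch {
  def main(args: Array[String]): Unit = {
    val nextTemplate = "http://mlsql.tech/api?offset={0}" // hypothetical config.page.next
    var pageNum = 0                                        // assumed initialPageNum
    val pageAddend = 10                                    // assumed pageIncrement
    // nexPage adds pageAddend; pageUrl substitutes the new offset into {0}.
    for (_ <- 0 until 3) {
      println(nextTemplate.replace("{0}", pageNum.toString))
      pageNum += pageAddend
    }
    // offset=0, offset=10, offset=20
  }
}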
@@ -1,12 +1,61 @@
package tech.mlsql.datasource.helper.rest

import com.jayway.jsonpath.JsonPath
import org.apache.spark.sql.mlsql.session.MLSQLException

/**
* 3/12/2021 WilliamZhu(allwefantasy@gmail.com)
*/
trait PageStrategy {
def pageValues(_content: Option[Any]): Array[String]

def nexPage: PageStrategy
def nexPage(_content: Option[Any]): PageStrategy

def hasNextPage(_content: Option[Any]): Boolean

def pageUrl(_content: Option[Any]): String
}

object PageStrategy {
val PAGE_CONFIG_VALUES = "config.page.values"

def defaultHasNextPage(params: Map[String, String], _content: Option[Any]): Boolean = {
val stopPagingCondition = params.get("config.page.stop")

if (stopPagingCondition.isEmpty) {
//todo: Choose a better default value
return true
}
val content = _content.get.toString
val Array(func, jsonPath) = stopPagingCondition.get.split(":")

func match {
case "size-zero" | "sizeZero" =>
try {
val targetValue = JsonPath.read[net.minidev.json.JSONArray](content, jsonPath)
targetValue.size() > 0
} catch {
case _: com.jayway.jsonpath.PathNotFoundException =>
false
}

case "not-exists" | "notExists" => try {
JsonPath.read[Object](content, jsonPath)
true
} catch {
case _: com.jayway.jsonpath.PathNotFoundException =>
false
}
case "equals" =>
try {
val Array(realJsonPath, equalValue) = jsonPath.split(",")
val targetValue = JsonPath.read[Object](content, realJsonPath).toString
targetValue != equalValue
} catch {
case _: com.jayway.jsonpath.PathNotFoundException =>
true
}

case _ => throw new MLSQLException(s"config.page.stop with ${func} is not supported")
}
}
}
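The three config.page.stop conditions can be exercised directly against a sample body. Below is a sketch (not part of the commit) that assumes the PageStrategy object from this diff is on the classpath; the response body is invented.

import tech.mlsql.datasource.helper.rest.PageStrategy

object StopConditionSketch {
  def main(args: Array[String]): Unit = {
    val body = """{"content":[{"id":1}],"cursor":"abc","done":"false"}"""

    // sizeZero: keep paging while $.content is a non-empty JSON array.
    println(PageStrategy.defaultHasNextPage(
      Map("config.page.stop" -> "sizeZero:$.content"), Some(body)))  // true

    // not-exists: keep paging while $.cursor is still present in the response.
    println(PageStrategy.defaultHasNextPage(
      Map("config.page.stop" -> "not-exists:$.cursor"), Some(body))) // true

    // equals: stop once $.done equals "true"; here it is "false", so keep paging.
    println(PageStrategy.defaultHasNextPage(
      Map("config.page.stop" -> "equals:$.done,true"), Some(body)))  // true
  }
}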
@@ -6,7 +6,7 @@ package tech.mlsql.datasource.helper.rest
object PageStrategyDispatcher {
def get(params: Map[String, String]): PageStrategy = {
params("config.page.values").trim.toLowerCase match {
case s if s.startsWith("auto-increment") =>
case s if s.startsWith("auto-increment") || s.startsWith("autoIncrement") =>
new AutoIncrementPageStrategy(params)
case s if s.startsWith("offset") =>
new OffsetPageStrategy(params)
@@ -44,7 +44,7 @@ class MLSQLRest(override val uid: String) extends MLSQLSource
* and `header.content-type`="application/json"
*
* and `config.page.next`="http://mlsql.tech/api?cursor={0}&wow={1}"
* and `config.page.skip-params`="true"
* and `config.page.skip-params`="true"
* and `config.page.values`="$.path,$.path2" -- json path
* and `config.page.interval`="10ms"
* and `config.page.retry`="3"
@@ -85,7 +85,7 @@ class MLSQLRest(override val uid: String) extends MLSQLSource

val pageStrategy = PageStrategyDispatcher.get(config.config)
if (debug) {
logInfo(format(s"Get Page ${count} ${config.path} started"))
logInfo(format(s"Started to get Page ${count} ${config.path} "))
}
val firstPageFetchTime = System.currentTimeMillis()
val (_, firstDfOpt) = RestUtils.executeWithRetrying[(Int, Option[DataFrame])](maxTries)((() => {
@@ -117,12 +117,13 @@

val uuid = UUID.randomUUID().toString.replaceAll("-", "")
val context = ScriptSQLExec.context()
val tmpTablePath = resourceRealPath(context.execListener, Option(context.owner), PathFun("__tmp__").add(uuid).toPath)
val tmpTablePath = resourceRealPath(context.execListener, Option(context.owner),
PathFun("__tmp__").add("rest").add(uuid).toPath)
context.execListener.addEnv(classOf[MLSQLRest].getName, tmpTablePath)
firstDf.write.format("parquet").mode(SaveMode.Append).save(tmpTablePath)

if (debug) {
logInfo(format(s"Get Page 1 ${config.path} Consume:${System.currentTimeMillis() - firstPageFetchTime}ms"))
logInfo(format(s"End to get Page ${count} ${config.path} Consume:${System.currentTimeMillis() - firstPageFetchTime}ms"))
}

while (count < maxSize) {
@@ -131,20 +132,21 @@

val row = firstDf.select(F.col("content").cast(StringType), F.col("status")).head
val content = row.getString(0)
val status = row.getInt(1)
val hasNextPage = pageStrategy.hasNextPage(Option(content))

val pageValues = pageStrategy.pageValues(Option(content))


val shouldStop = pageValues.size == 0 || pageValues.filter(value => value == null || value.isEmpty).size > 0
if (shouldStop) {
if (status != 200 || !hasNextPage) {
if (debug) {
logInfo(s"Stop paging. The last Page ${count - 1} ${config.path} status: ${status} hasNextPage: ${hasNextPage} ")
}
count = maxSize
} else {
val newUrl = pageStrategy.pageUrl(Option(content))
val tempTime = System.currentTimeMillis()
RestUtils.executeWithRetrying[(Int, Option[DataFrame])](maxTries)((() => {
try {
if (debug) {
logInfo(format(s"Get Page ${count} ${newUrl} started"))
logInfo(format(s"Started get Page ${count} ${newUrl}"))
}

val tempDF = _http(newUrl, config.config, skipParams, config.df.get.sparkSession)
@@ -166,10 +168,16 @@
},
failResp => logInfo(s"Fail request ${newUrl} failed after ${maxTries} attempts. the last response status is: ${failResp._1}. ")
)
firstDf.write.format("parquet").mode(SaveMode.Append).save(tmpTablePath)
pageStrategy.nexPage

val row = firstDf.select(F.col("content").cast(StringType), F.col("status")).head
val status = row.getInt(1)
// Only persist the page to the parquet file when the response status is 200.
if (status == 200) {
firstDf.write.format("parquet").mode(SaveMode.Append).save(tmpTablePath)
}
pageStrategy.nexPage(Option(content))
if (debug) {
logInfo(format(s"Get Page ${count} ${newUrl} Consume: ${System.currentTimeMillis() - tempTime}"))
logInfo(format(s"End to get Page ${count} ${newUrl} Consume: ${System.currentTimeMillis() - tempTime}"))
}
}
