Skip to content

Commit

Permalink
Support runscript callbackHeader
Browse files Browse the repository at this point in the history
添加callback header

添加callback header

添加callback header

添加callback header
  • Loading branch information
hellozepp committed Aug 8, 2022
1 parent 445b081 commit 514b8f5
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import net.csdn.common.path.Url
import net.csdn.modules.transport.HttpTransportService.SResponse
import net.csdn.modules.transport.{DefaultHttpTransportService, HttpTransportService}
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.http.{HttpEntity, HttpResponse}
import org.apache.http.client.entity.UrlEncodedFormEntity
import org.apache.http.client.fluent.{Form, Request}
import org.apache.http.entity.ContentType
import org.apache.http.entity.mime.{HttpMultipartMode, MultipartEntityBuilder}
import org.apache.http.message.BasicNameValuePair
import org.apache.http.util.EntityUtils
import org.apache.http.{HttpEntity, HttpResponse}
import streaming.dsl.ScriptSQLExec
import streaming.log.WowLog
import tech.mlsql.common.JsonUtils
Expand All @@ -22,17 +22,21 @@ import tech.mlsql.tool.{HDFSOperatorV2, Templates2}
import java.nio.charset.Charset
import scala.annotation.tailrec
import scala.collection.JavaConversions._
import scala.util.control.Breaks.{break, breakable}

object RestUtils extends Logging with WowLog {
def httpClientPost(urlString: String, data: Map[String, String]): HttpResponse = {
def httpClientPost(urlString: String, data: Map[String, String], headers: Map[String, String]): HttpResponse = {
val nameValuePairs = data.map { case (name, value) =>
new BasicNameValuePair(name, value)
}.toList

Request.Post(urlString)
val req = Request.Post(urlString)
.addHeader("Content-Type", "application/x-www-form-urlencoded")
.body(new UrlEncodedFormEntity(nameValuePairs, DefaultHttpTransportService.charset))

headers foreach { case (name, value) =>
req.setHeader(name, value)
}

req.body(new UrlEncodedFormEntity(nameValuePairs, DefaultHttpTransportService.charset))
.execute()
.returnResponse()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package tech.mlsql.it

import net.csdn.modules.transport.DefaultHttpTransportService
import org.apache.http.HttpEntity
import org.apache.http.util.EntityUtils
import tech.mlsql.common.utils.log.Logging
import tech.mlsql.crawler.RestUtils
import tech.mlsql.it.contiainer.ByzerCluster
Expand All @@ -8,6 +11,7 @@ import tech.mlsql.it.utils.DockerUtils.getCurProjectRootPath

import java.io.File
import java.util.UUID
import scala.collection.mutable

/**
* 23/02/2022 hellozepp(lisheng.zhanglin@163.com)
Expand Down Expand Up @@ -54,11 +58,27 @@ class ByzerScriptTestSuite extends LocalBaseTestSuite with Logging {
})
}

def runScript(url: String, user: String, code: String): (Int, String) = {
def runScript(url: String, user: String, code: String, callbackHeader: String = ""): (Int, String) = {
val jobName = UUID.randomUUID().toString
val params = mutable.Map("sql" -> code, "owner" -> user,
"jobName" -> jobName, "sessionPerUser" -> "true", "sessionPerRequest" -> "true")
if (callbackHeader != "") params.put("callbackHeader", callbackHeader)
logInfo(s"The test submits a script to the container through Rest, url:$url, sql:$code")
val (status, result) = RestUtils.rest_request_string(url, "post", Map("sql" -> code, "owner" -> user,
"jobName" -> jobName, "sessionPerUser" -> "true", "sessionPerRequest" -> "true"),
val (status, result) = RestUtils.rest_request_string(url, "post", params.toMap,
Map("Content-Type" -> "application/x-www-form-urlencoded"), Map("socket-timeout" -> "1800s",
"connect-timeout" -> "1800s", "retry" -> "1")
)
logInfo(s"status:$status,result:$result")
(status, result)
}

def runScriptWithHeader(url: String, user: String, code: String, callbackHeader: String = ""): (Int, HttpEntity) = {
val jobName = UUID.randomUUID().toString
val params = mutable.Map("sql" -> code, "owner" -> user,
"jobName" -> jobName, "sessionPerUser" -> "true", "sessionPerRequest" -> "true")
if (callbackHeader != "") params.put("callbackHeader", callbackHeader)
logInfo(s"The test submits a script to the container through Rest, url:$url, sql:$code")
val (status, result) = RestUtils.rest_request(url, "post", params.toMap,
Map("Content-Type" -> "application/x-www-form-urlencoded"), Map("socket-timeout" -> "1800s",
"connect-timeout" -> "1800s", "retry" -> "1")
)
Expand Down Expand Up @@ -101,6 +121,18 @@ class ByzerScriptTestSuite extends LocalBaseTestSuite with Logging {
}

test("Execute yarn sql file") {
try {
val (_, result) = runScriptWithHeader(url, user, "select 1 as a,'jack' as b as bbc;",
"""{"Authorization":"Bearer acc"}""")
val _result = EntityUtils.toString(result, DefaultHttpTransportService.charset)
println("With callbackHeader result:" + _result)
assert(_result === "[{\"a\":1,\"b\":\"jack\"}]")
} catch {
case _: Exception =>
logError(s"callbackHeader should be returned normally in the byzer callback!")
System.exit(1)
}

TestManager.testCases.foreach(testCase => {
try {
val (status, result) = runScript(url, user, testCase.sql)
Expand All @@ -110,7 +142,6 @@ class ByzerScriptTestSuite extends LocalBaseTestSuite with Logging {
TestManager.acceptRest(testCase, 500, null, e)
}
})

TestManager.report()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.apache.spark.sql.mlsql.session.{MLSQLSparkSession, SparkSessionCacheM
import org.apache.spark.{MLSQLConf, SparkInstanceService}
import tech.mlsql.MLSQLEnvKey
import tech.mlsql.app.{CustomController, ResultResp}
import tech.mlsql.common.JsonUtils
import tech.mlsql.common.utils.log.Logging
import tech.mlsql.common.utils.serder.json.JSONTool
import tech.mlsql.crawler.RestUtils
Expand Down Expand Up @@ -105,6 +106,7 @@ class RestController extends ApplicationController with WowLog with Logging {
new Parameter(name = "sessionPerRequest", required = false, description = "by default false", `type` = "boolean", allowEmptyValue = false),
new Parameter(name = "async", required = false, description = "If set true ,please also provide a callback url use `callback` parameter and the job will run in background and the API will return. default: false", `type` = "boolean", allowEmptyValue = false),
new Parameter(name = "callback", required = false, description = "Used when async is set true. callback is a url. default: false", `type` = "string", allowEmptyValue = false),
new Parameter(name = "callbackHeader", required = false, description = "Provide a jsonString parameter to set the header parameter of the callback request. default: false", `type` = "string", allowEmptyValue = false),
new Parameter(name = "maxRetries", required = false, description = "Max retries of request callback.", `type` = "int", allowEmptyValue = false),
new Parameter(name = "skipInclude", required = false, description = "disable include statement. default: false", `type` = "boolean", allowEmptyValue = false),
new Parameter(name = "skipAuth", required = false, description = "disable table authorize . default: true", `type` = "boolean", allowEmptyValue = false),
Expand Down Expand Up @@ -147,6 +149,12 @@ class RestController extends ApplicationController with WowLog with Logging {
if (paramAsBoolean("async", false)) {
JobManager.asyncRun(sparkSession, jobInfo, () => {
val urlString = param("callback")
val callbackHeaderString = param("callbackHeader")
var callbackHeader = Map[String,String]()
if (callbackHeaderString != null && callbackHeaderString.nonEmpty){
callbackHeader = JsonUtils.fromJson[Map[String,String]](callbackHeaderString)
}

val maxTries = Math.max(0, paramAsInt("maxRetries", -1)) + 1
try {
ScriptSQLExec.parse(param("sql"), context,
Expand All @@ -161,7 +169,8 @@ class RestController extends ApplicationController with WowLog with Logging {
RestUtils.httpClientPost(urlString,
Map("stat" -> s"""succeeded""",
"res" -> outputResult,
"jobInfo" -> JSONTool.toJsonStr(jobInfo))),
"jobInfo" -> JSONTool.toJsonStr(jobInfo)),
callbackHeader),
HttpStatus.SC_OK == _.getStatusLine.getStatusCode,
response => logger.error(s"Succeeded SQL callback request failed after ${maxTries} attempts, " +
s"the last response status is: ${response.getStatusLine.getStatusCode}.")
Expand All @@ -178,7 +187,8 @@ class RestController extends ApplicationController with WowLog with Logging {
RestUtils.httpClientPost(urlString,
Map("stat" -> s"""failed""",
"msg" -> (e.getMessage + "\n" + msgBuffer.mkString("\n")),
"jobInfo" -> JSONTool.toJsonStr(jobInfo))),
"jobInfo" -> JSONTool.toJsonStr(jobInfo)),
Map()),
HttpStatus.SC_OK == _.getStatusLine.getStatusCode,
response => logger.error(s"Fail SQL callback request failed after ${maxTries} attempts, " +
s"the last response status is: ${response.getStatusLine.getStatusCode}.")
Expand Down

0 comments on commit 514b8f5

Please sign in to comment.