From 445b081690bd06967f7d5efad36b235cea7fec04 Mon Sep 17 00:00:00 2001 From: WilliamZhu Date: Wed, 8 Jun 2022 22:41:39 +0800 Subject: [PATCH] Revert "Support runscript callbackHeader (#1787)" (#1789) This reverts commit e56aaaf3b87ee3213ce0ba756039e236ce742a69. --- .../main/java/tech/mlsql/crawler/RestUtils.scala | 14 +++++--------- .../main/java/streaming/rest/RestController.scala | 12 ++---------- 2 files changed, 7 insertions(+), 19 deletions(-) diff --git a/streamingpro-core/src/main/java/tech/mlsql/crawler/RestUtils.scala b/streamingpro-core/src/main/java/tech/mlsql/crawler/RestUtils.scala index 57169b2ac..40b47cbe4 100644 --- a/streamingpro-core/src/main/java/tech/mlsql/crawler/RestUtils.scala +++ b/streamingpro-core/src/main/java/tech/mlsql/crawler/RestUtils.scala @@ -4,13 +4,13 @@ import net.csdn.common.path.Url import net.csdn.modules.transport.HttpTransportService.SResponse import net.csdn.modules.transport.{DefaultHttpTransportService, HttpTransportService} import org.apache.commons.lang3.exception.ExceptionUtils +import org.apache.http.{HttpEntity, HttpResponse} import org.apache.http.client.entity.UrlEncodedFormEntity import org.apache.http.client.fluent.{Form, Request} import org.apache.http.entity.ContentType import org.apache.http.entity.mime.{HttpMultipartMode, MultipartEntityBuilder} import org.apache.http.message.BasicNameValuePair import org.apache.http.util.EntityUtils -import org.apache.http.{HttpEntity, HttpResponse} import streaming.dsl.ScriptSQLExec import streaming.log.WowLog import tech.mlsql.common.JsonUtils @@ -22,21 +22,17 @@ import tech.mlsql.tool.{HDFSOperatorV2, Templates2} import java.nio.charset.Charset import scala.annotation.tailrec import scala.collection.JavaConversions._ +import scala.util.control.Breaks.{break, breakable} object RestUtils extends Logging with WowLog { - def httpClientPost(urlString: String, data: Map[String, String], headers: Map[String, String]): HttpResponse = { + def httpClientPost(urlString: String, data: Map[String, String]): HttpResponse = { val nameValuePairs = data.map { case (name, value) => new BasicNameValuePair(name, value) }.toList - val req = Request.Post(urlString) + Request.Post(urlString) .addHeader("Content-Type", "application/x-www-form-urlencoded") - - headers foreach { case (name, value) => - req.setHeader(name, value) - } - - req.body(new UrlEncodedFormEntity(nameValuePairs, DefaultHttpTransportService.charset)) + .body(new UrlEncodedFormEntity(nameValuePairs, DefaultHttpTransportService.charset)) .execute() .returnResponse() } diff --git a/streamingpro-mlsql/src/main/java/streaming/rest/RestController.scala b/streamingpro-mlsql/src/main/java/streaming/rest/RestController.scala index 179970541..4b902dcc3 100644 --- a/streamingpro-mlsql/src/main/java/streaming/rest/RestController.scala +++ b/streamingpro-mlsql/src/main/java/streaming/rest/RestController.scala @@ -38,7 +38,6 @@ import org.apache.spark.sql.mlsql.session.{MLSQLSparkSession, SparkSessionCacheM import org.apache.spark.{MLSQLConf, SparkInstanceService} import tech.mlsql.MLSQLEnvKey import tech.mlsql.app.{CustomController, ResultResp} -import tech.mlsql.common.JsonUtils import tech.mlsql.common.utils.log.Logging import tech.mlsql.common.utils.serder.json.JSONTool import tech.mlsql.crawler.RestUtils @@ -106,7 +105,6 @@ class RestController extends ApplicationController with WowLog with Logging { new Parameter(name = "sessionPerRequest", required = false, description = "by default false", `type` = "boolean", allowEmptyValue = false), new Parameter(name = "async", required = false, description = "If set true ,please also provide a callback url use `callback` parameter and the job will run in background and the API will return. default: false", `type` = "boolean", allowEmptyValue = false), new Parameter(name = "callback", required = false, description = "Used when async is set true. callback is a url. default: false", `type` = "string", allowEmptyValue = false), - new Parameter(name = "callbackHeader", required = false, description = "Provide a jsonString parameter to set the header parameter of the callback request. default: false", `type` = "string", allowEmptyValue = false), new Parameter(name = "maxRetries", required = false, description = "Max retries of request callback.", `type` = "int", allowEmptyValue = false), new Parameter(name = "skipInclude", required = false, description = "disable include statement. default: false", `type` = "boolean", allowEmptyValue = false), new Parameter(name = "skipAuth", required = false, description = "disable table authorize . default: true", `type` = "boolean", allowEmptyValue = false), @@ -149,12 +147,6 @@ class RestController extends ApplicationController with WowLog with Logging { if (paramAsBoolean("async", false)) { JobManager.asyncRun(sparkSession, jobInfo, () => { val urlString = param("callback") - val callbackHeaderString = param("callbackHeader") - var callbackHeader = Map[String,String]() - if (callbackHeaderString != null && callbackHeaderString.nonEmpty){ - callbackHeader = JsonUtils.fromJson[Map[String,String]](callbackHeaderString) - } - val maxTries = Math.max(0, paramAsInt("maxRetries", -1)) + 1 try { ScriptSQLExec.parse(param("sql"), context, @@ -166,7 +158,7 @@ class RestController extends ApplicationController with WowLog with Logging { outputResult = getScriptResult(context, sparkSession) executeWithRetrying[HttpResponse](maxTries)( - RestUtils.httpClientPost(urlString, callbackHeader, + RestUtils.httpClientPost(urlString, Map("stat" -> s"""succeeded""", "res" -> outputResult, "jobInfo" -> JSONTool.toJsonStr(jobInfo))), @@ -183,7 +175,7 @@ class RestController extends ApplicationController with WowLog with Logging { } executeWithRetrying[HttpResponse](maxTries)( - RestUtils.httpClientPost(urlString, Map(), + RestUtils.httpClientPost(urlString, Map("stat" -> s"""failed""", "msg" -> (e.getMessage + "\n" + msgBuffer.mkString("\n")), "jobInfo" -> JSONTool.toJsonStr(jobInfo))),