From 58fbb3b79e9aca5b50864ac61a20d4ebdfd5aba8 Mon Sep 17 00:00:00 2001
From: yanghuaiGit <38883656+yanghuaiGit@users.noreply.github.com>
Date: Mon, 6 Feb 2023 16:43:38 +0800
Subject: [PATCH] #1494 [feat-1494][http] http support array data (#1495)
---
.../chunjun-connector-http/pom.xml | 15 ++
.../http/client/CsvResponseParse.java | 84 ++++++++++
.../http/client/DefaultRestHandler.java | 6 -
.../connector/http/client/HttpClient.java | 57 +++++--
.../http/client/JsonResponseParse.java | 121 ++++++++++++++
.../connector/http/client/ResponseParse.java | 95 +++++++++++
.../connector/http/client/ResponseValue.java | 10 +-
.../connector/http/client/RestHandler.java | 11 --
.../http/client/TextResponseParse.java | 58 +++++++
.../http/client/XmlResponseParse.java | 98 +++++++++++
.../connector/http/common/ConstantValue.java | 4 +
.../connector/http/common/HttpRestConfig.java | 78 ++++++++-
.../connector/http/common/HttpUtil.java | 17 +-
.../http/converter/HttpColumnConverter.java | 47 +++---
.../http/inputformat/HttpInputFormat.java | 13 +-
.../connector/http/util/JsonPathUtil.java | 40 +++++
.../chunjun/connector/http/util/XmlUtil.java | 156 ++++++++++++++++++
17 files changed, 834 insertions(+), 76 deletions(-)
create mode 100644 chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/CsvResponseParse.java
create mode 100644 chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/JsonResponseParse.java
create mode 100644 chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/ResponseParse.java
create mode 100644 chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/TextResponseParse.java
create mode 100644 chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/XmlResponseParse.java
create mode 100644 chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/util/JsonPathUtil.java
create mode 100644 chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/util/XmlUtil.java
diff --git a/chunjun-connectors/chunjun-connector-http/pom.xml b/chunjun-connectors/chunjun-connector-http/pom.xml
index 3a32617257..f1724cdcf1 100644
--- a/chunjun-connectors/chunjun-connector-http/pom.xml
+++ b/chunjun-connectors/chunjun-connector-http/pom.xml
@@ -31,6 +31,21 @@
chunjun-connector-http
ChunJun : Connectors : Http
+
+
+
+ org.dom4j
+ dom4j
+ 2.1.3
+
+
+
+ net.sourceforge.javacsv
+ javacsv
+ 2.0
+
+
+
diff --git a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/CsvResponseParse.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/CsvResponseParse.java
new file mode 100644
index 0000000000..08beb1887b
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/CsvResponseParse.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.http.client;
+
+import com.dtstack.chunjun.connector.http.common.HttpRestConfig;
+import com.dtstack.chunjun.converter.AbstractRowConverter;
+
+import com.csvreader.CsvReader;
+import org.apache.commons.collections.CollectionUtils;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
+import java.util.HashMap;
+import java.util.Map;
+
+public class CsvResponseParse extends ResponseParse {
+ private CsvReader csvReader;
+ private Reader reader;
+ private String responseValue;
+ private HttpRequestParam requestParam;
+
+ public CsvResponseParse(HttpRestConfig config, AbstractRowConverter converter) {
+ super(config, converter);
+ if (CollectionUtils.isEmpty(columns)) {
+ throw new RuntimeException("please configure column when decode is csv");
+ }
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ return csvReader.readRecord();
+ }
+
+ @Override
+ public ResponseValue next() throws Exception {
+ String[] data = csvReader.getValues();
+ HashMap stringObjectHashMap = new HashMap<>(columns.size());
+ for (int i = 0; i < columns.size(); i++) {
+ if (columns.get(i).getValue() != null) {
+ stringObjectHashMap.put(columns.get(i).getName(), columns.get(i).getValue());
+ } else {
+ stringObjectHashMap.put(columns.get(i).getName(), data[i]);
+ }
+ }
+
+ return new ResponseValue(
+ converter.toInternal(stringObjectHashMap), requestParam, responseValue);
+ }
+
+ @Override
+ public void parse(String responseValue, int responseStatus, HttpRequestParam requestParam) {
+ this.responseValue = responseValue;
+ this.requestParam = requestParam;
+ this.reader = new StringReader(responseValue);
+ this.csvReader = new CsvReader(reader);
+ csvReader.setDelimiter(config.getCsvDelimiter().charAt(0));
+
+ Map csvConfig = config.getCsvConfig();
+ // 是否跳过空行
+ csvReader.setSkipEmptyRecords((Boolean) csvConfig.getOrDefault("skipEmptyRecords", true));
+ // 是否使用csv转义字符
+ csvReader.setUseTextQualifier((Boolean) csvConfig.getOrDefault("useTextQualifier", true));
+ csvReader.setTrimWhitespace((Boolean) csvConfig.getOrDefault("trimWhitespace", false));
+ // 单列长度是否限制100000字符
+ csvReader.setSafetySwitch((Boolean) csvConfig.getOrDefault("safetySwitch", false));
+ }
+}
diff --git a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/DefaultRestHandler.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/DefaultRestHandler.java
index 2c31477a93..2ad2bca056 100644
--- a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/DefaultRestHandler.java
+++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/DefaultRestHandler.java
@@ -163,12 +163,6 @@ public HttpRequestParam buildRequestParam(
return requestParam;
}
- @Override
- public ResponseValue buildResponseValue(
- String decode, String responseValue, String fields, HttpRequestParam requestParam) {
- return new ResponseValue(responseValue, requestParam, responseValue);
- }
-
/**
* 根据指定的key 构建一个新的response
*
diff --git a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/HttpClient.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/HttpClient.java
index 821d5a0fc1..cb6fce77ae 100644
--- a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/HttpClient.java
+++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/HttpClient.java
@@ -21,6 +21,7 @@
import com.dtstack.chunjun.connector.http.common.HttpRestConfig;
import com.dtstack.chunjun.connector.http.common.HttpUtil;
import com.dtstack.chunjun.connector.http.common.MetaParam;
+import com.dtstack.chunjun.converter.AbstractRowConverter;
import com.dtstack.chunjun.util.ExceptionUtil;
import com.dtstack.chunjun.util.GsonUtil;
@@ -41,6 +42,10 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import static com.dtstack.chunjun.connector.http.common.ConstantValue.CSV_DECODE;
+import static com.dtstack.chunjun.connector.http.common.ConstantValue.TEXT_DECODE;
+import static com.dtstack.chunjun.connector.http.common.ConstantValue.XML_DECODE;
+
/**
* httpClient
*
@@ -60,6 +65,8 @@ public class HttpClient {
private final RestHandler restHandler;
+ protected final ResponseParse responseParse;
+
private int requestRetryTime;
/** origin body */
@@ -86,11 +93,14 @@ public class HttpClient {
private boolean running;
+ protected long requestNumber;
+
public HttpClient(
HttpRestConfig httpRestConfig,
List originalBodyList,
List originalParamList,
- List originalHeaderList) {
+ List originalHeaderList,
+ AbstractRowConverter converter) {
this.restConfig = httpRestConfig;
this.originalHeaderList = originalHeaderList;
this.originalBodyList = originalBodyList;
@@ -102,14 +112,16 @@ public HttpClient(
this.queue = new LinkedBlockingQueue<>();
this.scheduledExecutorService =
new ScheduledThreadPoolExecutor(1, r -> new Thread(r, THREAD_NAME));
- this.httpClient = HttpUtil.getHttpsClient();
+ this.httpClient = HttpUtil.getHttpsClient((int) restConfig.getTimeOut());
this.restHandler = new DefaultRestHandler();
+ this.responseParse = getResponseParse(converter);
this.prevResponse = "";
this.first = true;
this.currentParam = new HttpRequestParam();
this.reachEnd = false;
this.requestRetryTime = 2;
+ this.requestNumber = 1;
}
public void start() {
@@ -172,6 +184,7 @@ public void execute() {
doExecute(ConstantValue.REQUEST_RETRY_TIME);
first = false;
requestRetryTime = 3;
+ requestNumber++;
}
public void doExecute(int retryTime) {
@@ -192,6 +205,7 @@ public void doExecute(int retryTime) {
// 执行请求
String responseValue = null;
+ int responseStatus;
try {
HttpUriRequest request =
@@ -211,6 +225,7 @@ public void doExecute(int retryTime) {
}
responseValue = EntityUtils.toString(httpResponse.getEntity());
+ responseStatus = httpResponse.getStatusLine().getStatusCode();
} catch (Throwable e) {
// 只要本次请求中出现了异常 都会进行重试,如果重试次数达到了就真正结束任务
LOG.warn(
@@ -244,27 +259,28 @@ public void doExecute(int retryTime) {
case ConstantValue.STRATEGY_STOP:
reachEnd = true;
running = false;
+ // stop 此次请求数据有问题 任务直接异常结束
+ processData(new ResponseValue(0, null, strategy.toString(), null, null));
break;
default:
break;
}
}
- ResponseValue value =
- restHandler.buildResponseValue(
- restConfig.getDecode(),
- responseValue,
- restConfig.getFields(),
- HttpRequestParam.copy(currentParam));
+ responseParse.parse(responseValue, responseStatus, HttpRequestParam.copy(currentParam));
+ while (responseParse.hasNext()) {
+ processData(responseParse.next());
+ }
+
+ if (-1 != restConfig.getCycles() && requestNumber >= restConfig.getCycles()) {
+ reachEnd = true;
+ running = false;
+ }
+
if (reachEnd) {
// 如果结束了 需要告诉format 结束了
- if (value.isNormal()) {
- value.setStatus(0);
- // 触发的策略信息返回上游
- value.setErrorMsg(strategy.toString());
- }
+ processData(new ResponseValue(2, null, null, null, null));
}
- processData(value);
prevParam = currentParam;
prevResponse = responseValue;
@@ -321,6 +337,19 @@ public void close() {
}
}
+ protected ResponseParse getResponseParse(AbstractRowConverter converter) {
+ switch (restConfig.getDecode()) {
+ case CSV_DECODE:
+ return new CsvResponseParse(restConfig, converter);
+ case XML_DECODE:
+ return new XmlResponseParse(restConfig, converter);
+ case TEXT_DECODE:
+ return new TextResponseParse(restConfig, converter);
+ default:
+ return new JsonResponseParse(restConfig, converter);
+ }
+ }
+
@Override
public String toString() {
return "HttpClient{"
diff --git a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/JsonResponseParse.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/JsonResponseParse.java
new file mode 100644
index 0000000000..67768815a1
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/JsonResponseParse.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.http.client;
+
+import com.dtstack.chunjun.conf.FieldConf;
+import com.dtstack.chunjun.connector.http.common.HttpRestConfig;
+import com.dtstack.chunjun.connector.http.util.JsonPathUtil;
+import com.dtstack.chunjun.constants.ConstantValue;
+import com.dtstack.chunjun.converter.AbstractRowConverter;
+import com.dtstack.chunjun.util.GsonUtil;
+import com.dtstack.chunjun.util.MapUtil;
+
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class JsonResponseParse extends ResponseParse {
+ private String responseValue;
+ private HttpRequestParam requestParam;
+ private final Gson gson;
+ private final List fields;
+ private Iterator