From 58fbb3b79e9aca5b50864ac61a20d4ebdfd5aba8 Mon Sep 17 00:00:00 2001 From: yanghuaiGit <38883656+yanghuaiGit@users.noreply.github.com> Date: Mon, 6 Feb 2023 16:43:38 +0800 Subject: [PATCH] #1494 [feat-1494][http] http support array data (#1495) --- .../chunjun-connector-http/pom.xml | 15 ++ .../http/client/CsvResponseParse.java | 84 ++++++++++ .../http/client/DefaultRestHandler.java | 6 - .../connector/http/client/HttpClient.java | 57 +++++-- .../http/client/JsonResponseParse.java | 121 ++++++++++++++ .../connector/http/client/ResponseParse.java | 95 +++++++++++ .../connector/http/client/ResponseValue.java | 10 +- .../connector/http/client/RestHandler.java | 11 -- .../http/client/TextResponseParse.java | 58 +++++++ .../http/client/XmlResponseParse.java | 98 +++++++++++ .../connector/http/common/ConstantValue.java | 4 + .../connector/http/common/HttpRestConfig.java | 78 ++++++++- .../connector/http/common/HttpUtil.java | 17 +- .../http/converter/HttpColumnConverter.java | 47 +++--- .../http/inputformat/HttpInputFormat.java | 13 +- .../connector/http/util/JsonPathUtil.java | 40 +++++ .../chunjun/connector/http/util/XmlUtil.java | 156 ++++++++++++++++++ 17 files changed, 834 insertions(+), 76 deletions(-) create mode 100644 chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/CsvResponseParse.java create mode 100644 chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/JsonResponseParse.java create mode 100644 chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/ResponseParse.java create mode 100644 chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/TextResponseParse.java create mode 100644 chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/XmlResponseParse.java create mode 100644 chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/util/JsonPathUtil.java create mode 100644 chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/util/XmlUtil.java diff --git a/chunjun-connectors/chunjun-connector-http/pom.xml b/chunjun-connectors/chunjun-connector-http/pom.xml index 3a32617257..f1724cdcf1 100644 --- a/chunjun-connectors/chunjun-connector-http/pom.xml +++ b/chunjun-connectors/chunjun-connector-http/pom.xml @@ -31,6 +31,21 @@ chunjun-connector-http ChunJun : Connectors : Http + + + + org.dom4j + dom4j + 2.1.3 + + + + net.sourceforge.javacsv + javacsv + 2.0 + + + diff --git a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/CsvResponseParse.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/CsvResponseParse.java new file mode 100644 index 0000000000..08beb1887b --- /dev/null +++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/CsvResponseParse.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.http.client; + +import com.dtstack.chunjun.connector.http.common.HttpRestConfig; +import com.dtstack.chunjun.converter.AbstractRowConverter; + +import com.csvreader.CsvReader; +import org.apache.commons.collections.CollectionUtils; + +import java.io.IOException; +import java.io.Reader; +import java.io.StringReader; +import java.util.HashMap; +import java.util.Map; + +public class CsvResponseParse extends ResponseParse { + private CsvReader csvReader; + private Reader reader; + private String responseValue; + private HttpRequestParam requestParam; + + public CsvResponseParse(HttpRestConfig config, AbstractRowConverter converter) { + super(config, converter); + if (CollectionUtils.isEmpty(columns)) { + throw new RuntimeException("please configure column when decode is csv"); + } + } + + @Override + public boolean hasNext() throws IOException { + return csvReader.readRecord(); + } + + @Override + public ResponseValue next() throws Exception { + String[] data = csvReader.getValues(); + HashMap stringObjectHashMap = new HashMap<>(columns.size()); + for (int i = 0; i < columns.size(); i++) { + if (columns.get(i).getValue() != null) { + stringObjectHashMap.put(columns.get(i).getName(), columns.get(i).getValue()); + } else { + stringObjectHashMap.put(columns.get(i).getName(), data[i]); + } + } + + return new ResponseValue( + converter.toInternal(stringObjectHashMap), requestParam, responseValue); + } + + @Override + public void parse(String responseValue, int responseStatus, HttpRequestParam requestParam) { + this.responseValue = responseValue; + this.requestParam = requestParam; + this.reader = new StringReader(responseValue); + this.csvReader = new CsvReader(reader); + csvReader.setDelimiter(config.getCsvDelimiter().charAt(0)); + + Map csvConfig = config.getCsvConfig(); + // 是否跳过空行 + csvReader.setSkipEmptyRecords((Boolean) csvConfig.getOrDefault("skipEmptyRecords", true)); + // 是否使用csv转义字符 + csvReader.setUseTextQualifier((Boolean) csvConfig.getOrDefault("useTextQualifier", true)); + csvReader.setTrimWhitespace((Boolean) csvConfig.getOrDefault("trimWhitespace", false)); + // 单列长度是否限制100000字符 + csvReader.setSafetySwitch((Boolean) csvConfig.getOrDefault("safetySwitch", false)); + } +} diff --git a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/DefaultRestHandler.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/DefaultRestHandler.java index 2c31477a93..2ad2bca056 100644 --- a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/DefaultRestHandler.java +++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/DefaultRestHandler.java @@ -163,12 +163,6 @@ public HttpRequestParam buildRequestParam( return requestParam; } - @Override - public ResponseValue buildResponseValue( - String decode, String responseValue, String fields, HttpRequestParam requestParam) { - return new ResponseValue(responseValue, requestParam, responseValue); - } - /** * 根据指定的key 构建一个新的response * diff --git a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/HttpClient.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/HttpClient.java index 821d5a0fc1..cb6fce77ae 100644 --- a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/HttpClient.java +++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/HttpClient.java @@ -21,6 +21,7 @@ import com.dtstack.chunjun.connector.http.common.HttpRestConfig; import com.dtstack.chunjun.connector.http.common.HttpUtil; import com.dtstack.chunjun.connector.http.common.MetaParam; +import com.dtstack.chunjun.converter.AbstractRowConverter; import com.dtstack.chunjun.util.ExceptionUtil; import com.dtstack.chunjun.util.GsonUtil; @@ -41,6 +42,10 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import static com.dtstack.chunjun.connector.http.common.ConstantValue.CSV_DECODE; +import static com.dtstack.chunjun.connector.http.common.ConstantValue.TEXT_DECODE; +import static com.dtstack.chunjun.connector.http.common.ConstantValue.XML_DECODE; + /** * httpClient * @@ -60,6 +65,8 @@ public class HttpClient { private final RestHandler restHandler; + protected final ResponseParse responseParse; + private int requestRetryTime; /** origin body */ @@ -86,11 +93,14 @@ public class HttpClient { private boolean running; + protected long requestNumber; + public HttpClient( HttpRestConfig httpRestConfig, List originalBodyList, List originalParamList, - List originalHeaderList) { + List originalHeaderList, + AbstractRowConverter converter) { this.restConfig = httpRestConfig; this.originalHeaderList = originalHeaderList; this.originalBodyList = originalBodyList; @@ -102,14 +112,16 @@ public HttpClient( this.queue = new LinkedBlockingQueue<>(); this.scheduledExecutorService = new ScheduledThreadPoolExecutor(1, r -> new Thread(r, THREAD_NAME)); - this.httpClient = HttpUtil.getHttpsClient(); + this.httpClient = HttpUtil.getHttpsClient((int) restConfig.getTimeOut()); this.restHandler = new DefaultRestHandler(); + this.responseParse = getResponseParse(converter); this.prevResponse = ""; this.first = true; this.currentParam = new HttpRequestParam(); this.reachEnd = false; this.requestRetryTime = 2; + this.requestNumber = 1; } public void start() { @@ -172,6 +184,7 @@ public void execute() { doExecute(ConstantValue.REQUEST_RETRY_TIME); first = false; requestRetryTime = 3; + requestNumber++; } public void doExecute(int retryTime) { @@ -192,6 +205,7 @@ public void doExecute(int retryTime) { // 执行请求 String responseValue = null; + int responseStatus; try { HttpUriRequest request = @@ -211,6 +225,7 @@ public void doExecute(int retryTime) { } responseValue = EntityUtils.toString(httpResponse.getEntity()); + responseStatus = httpResponse.getStatusLine().getStatusCode(); } catch (Throwable e) { // 只要本次请求中出现了异常 都会进行重试,如果重试次数达到了就真正结束任务 LOG.warn( @@ -244,27 +259,28 @@ public void doExecute(int retryTime) { case ConstantValue.STRATEGY_STOP: reachEnd = true; running = false; + // stop 此次请求数据有问题 任务直接异常结束 + processData(new ResponseValue(0, null, strategy.toString(), null, null)); break; default: break; } } - ResponseValue value = - restHandler.buildResponseValue( - restConfig.getDecode(), - responseValue, - restConfig.getFields(), - HttpRequestParam.copy(currentParam)); + responseParse.parse(responseValue, responseStatus, HttpRequestParam.copy(currentParam)); + while (responseParse.hasNext()) { + processData(responseParse.next()); + } + + if (-1 != restConfig.getCycles() && requestNumber >= restConfig.getCycles()) { + reachEnd = true; + running = false; + } + if (reachEnd) { // 如果结束了 需要告诉format 结束了 - if (value.isNormal()) { - value.setStatus(0); - // 触发的策略信息返回上游 - value.setErrorMsg(strategy.toString()); - } + processData(new ResponseValue(2, null, null, null, null)); } - processData(value); prevParam = currentParam; prevResponse = responseValue; @@ -321,6 +337,19 @@ public void close() { } } + protected ResponseParse getResponseParse(AbstractRowConverter converter) { + switch (restConfig.getDecode()) { + case CSV_DECODE: + return new CsvResponseParse(restConfig, converter); + case XML_DECODE: + return new XmlResponseParse(restConfig, converter); + case TEXT_DECODE: + return new TextResponseParse(restConfig, converter); + default: + return new JsonResponseParse(restConfig, converter); + } + } + @Override public String toString() { return "HttpClient{" diff --git a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/JsonResponseParse.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/JsonResponseParse.java new file mode 100644 index 0000000000..67768815a1 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/JsonResponseParse.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.http.client; + +import com.dtstack.chunjun.conf.FieldConf; +import com.dtstack.chunjun.connector.http.common.HttpRestConfig; +import com.dtstack.chunjun.connector.http.util.JsonPathUtil; +import com.dtstack.chunjun.constants.ConstantValue; +import com.dtstack.chunjun.converter.AbstractRowConverter; +import com.dtstack.chunjun.util.GsonUtil; +import com.dtstack.chunjun.util.MapUtil; + +import com.google.common.collect.Lists; +import com.google.gson.Gson; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class JsonResponseParse extends ResponseParse { + private String responseValue; + private HttpRequestParam requestParam; + private final Gson gson; + private final List fields; + private Iterator> iterator; + + public JsonResponseParse(HttpRestConfig config, AbstractRowConverter converter) { + super(config, converter); + this.gson = GsonUtil.setTypeAdapter(new Gson()); + if (StringUtils.isNotBlank(config.getFields())) { + fields = + Arrays.stream(config.getFields().split(",")) + .map( + i -> { + FieldConf fieldConf = new FieldConf(); + fieldConf.setName(i); + return fieldConf; + }) + .collect(Collectors.toList()); + } else { + fields = null; + } + } + + @Override + public boolean hasNext() throws IOException { + return iterator.hasNext(); + } + + @Override + public ResponseValue next() throws Exception { + Map next = iterator.next(); + + if (CollectionUtils.isEmpty(columns)) { + if (CollectionUtils.isNotEmpty(fields)) { + LinkedHashMap map = + buildResponseByKey(next, fields, ConstantValue.POINT_SYMBOL); + HashMap data = new HashMap<>(); + // 需要拆分key + ((Map) map) + .forEach( + (k, v) -> { + MapUtil.buildMap(k, ConstantValue.POINT_SYMBOL, v, data); + }); + return new ResponseValue(converter.toInternal(data), requestParam, responseValue); + } else { + return new ResponseValue(converter.toInternal(next), requestParam, responseValue); + } + + } else { + LinkedHashMap data = + buildResponseByKey(next, columns, ConstantValue.POINT_SYMBOL); + return new ResponseValue(converter.toInternal(data), requestParam, responseValue); + } + } + + @Override + public void parse(String responseValue, int responseStatus, HttpRequestParam requestParam) { + this.responseValue = responseValue; + this.requestParam = requestParam; + + Map map = gson.fromJson(responseValue, GsonUtil.gsonMapTypeToken); + if (StringUtils.isNotBlank(config.getDataSubject())) { + Object valueByKey = + MapUtil.getValueByKey( + map, + JsonPathUtil.parseJsonPath(config.getDataSubject()), + ConstantValue.POINT_SYMBOL); + if (valueByKey instanceof List) { + this.iterator = ((List) valueByKey).iterator(); + } else { + throw new RuntimeException(config.getDataSubject() + " in response is not array"); + } + } else { + this.iterator = Lists.newArrayList(map).iterator(); + } + } +} diff --git a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/ResponseParse.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/ResponseParse.java new file mode 100644 index 0000000000..b3d4cfc5b1 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/ResponseParse.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.http.client; + +import com.dtstack.chunjun.conf.FieldConf; +import com.dtstack.chunjun.connector.http.common.HttpRestConfig; +import com.dtstack.chunjun.converter.AbstractRowConverter; +import com.dtstack.chunjun.util.GsonUtil; +import com.dtstack.chunjun.util.MapUtil; + +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public abstract class ResponseParse { + protected final HttpRestConfig config; + protected final List columns; + protected final AbstractRowConverter converter; + + public ResponseParse(HttpRestConfig config, AbstractRowConverter converter) { + this.config = config; + this.columns = config.getColumn(); + this.converter = converter; + } + + public abstract boolean hasNext() throws IOException; + + public abstract ResponseValue next() throws Exception; + + public abstract void parse( + String responseValue, int responseStatus, HttpRequestParam requestParam); + + /** + * 根据指定的key 构建一个新的response + * + * @param map 返回值 + * @param columns 指定字段 + */ + protected LinkedHashMap buildResponseByKey( + Map map, List columns, String nested) { + LinkedHashMap filedValue = new LinkedHashMap<>(columns.size() << 2); + for (FieldConf key : columns) { + if (null != key.getValue()) { + filedValue.put(key.getName(), key.getValue()); + } else { + Object value = MapUtil.getValueByKey(map, key.getName(), nested); + filedValue.put(key.getName(), value); + } + } + return filedValue; + } + + protected LinkedHashMap buildResponseByKey( + Map map, List columns, boolean useNullReplaceNotExists) { + LinkedHashMap filedValue = new LinkedHashMap<>(columns.size() << 2); + for (FieldConf key : columns) { + if (null != key.getValue()) { + filedValue.put(key.getName(), key.getValue()); + } else { + if (!map.containsKey(key.getName())) { + if (useNullReplaceNotExists) { + filedValue.put(key.getName(), null); + } else { + throw new RuntimeException( + "not exists column " + + key.getName() + + " in map: " + + GsonUtil.GSON.toJson(map)); + } + } else { + Object o = map.get(key.getName()); + filedValue.put(key.getName(), o); + } + } + } + return filedValue; + } +} diff --git a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/ResponseValue.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/ResponseValue.java index b589cadf3a..073cd5a278 100644 --- a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/ResponseValue.java +++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/ResponseValue.java @@ -18,6 +18,8 @@ package com.dtstack.chunjun.connector.http.client; +import org.apache.flink.table.data.RowData; + /** * 返回值 * @@ -28,7 +30,7 @@ public class ResponseValue { /** 本次请求状态 -1 不正常,代表出现了异常 0 代表结束任务 strategy 出现了stop 1 代表任务正常 */ private int status; /** 返回值 */ - private String data; + private RowData data; /** 如果是异常数据 这个是异常数据 */ private String errorMsg; /** 请求参数 */ @@ -39,7 +41,7 @@ public class ResponseValue { public ResponseValue( int status, - String data, + RowData data, String errorMsg, HttpRequestParam requestParam, String originResponseValue) { @@ -50,7 +52,7 @@ public ResponseValue( this.originResponseValue = originResponseValue; } - public ResponseValue(String data, HttpRequestParam requestParam, String originResponseValue) { + public ResponseValue(RowData data, HttpRequestParam requestParam, String originResponseValue) { this(1, data, null, requestParam, originResponseValue); } @@ -58,7 +60,7 @@ public boolean isNormal() { return status != -1; } - public String getData() { + public RowData getData() { return data; } diff --git a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/RestHandler.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/RestHandler.java index 3e49ac7946..fa231377d0 100644 --- a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/RestHandler.java +++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/RestHandler.java @@ -68,15 +68,4 @@ HttpRequestParam buildRequestParam( Map prevResponseValue, HttpRestConfig restConfig, boolean first); - - /** - * 根据返回的response 构建出ResponseValue * json格式 会指定字段解析 - * - * @param decode 解析格式 json还是text - * @param responseValue 返回值 - * @param fields 解析字段 - * @return 返回值 - */ - ResponseValue buildResponseValue( - String decode, String responseValue, String fields, HttpRequestParam requestParam); } diff --git a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/TextResponseParse.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/TextResponseParse.java new file mode 100644 index 0000000000..2aca7a9920 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/TextResponseParse.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.http.client; + +import com.dtstack.chunjun.connector.http.common.HttpRestConfig; +import com.dtstack.chunjun.converter.AbstractRowConverter; +import com.dtstack.chunjun.element.ColumnRowData; +import com.dtstack.chunjun.element.column.StringColumn; + +import com.google.common.collect.Lists; + +import java.io.IOException; +import java.util.Iterator; + +public class TextResponseParse extends ResponseParse { + private Iterator iterator; + private HttpRequestParam requestParam; + private String responseValue; + + public TextResponseParse(HttpRestConfig config, AbstractRowConverter converter) { + super(config, converter); + } + + @Override + public boolean hasNext() throws IOException { + return iterator.hasNext(); + } + + @Override + public ResponseValue next() throws Exception { + ColumnRowData columnRowData = new ColumnRowData(1); + columnRowData.addField(new StringColumn(iterator.next())); + return new ResponseValue(columnRowData, requestParam, responseValue); + } + + @Override + public void parse(String responseValue, int responseStatus, HttpRequestParam requestParam) { + this.responseValue = responseValue; + this.requestParam = requestParam; + this.iterator = Lists.newArrayList(responseValue).iterator(); + } +} diff --git a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/XmlResponseParse.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/XmlResponseParse.java new file mode 100644 index 0000000000..fd5e6e88e1 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/client/XmlResponseParse.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.http.client; + +import com.dtstack.chunjun.connector.http.common.HttpRestConfig; +import com.dtstack.chunjun.connector.http.util.JsonPathUtil; +import com.dtstack.chunjun.connector.http.util.XmlUtil; +import com.dtstack.chunjun.constants.ConstantValue; +import com.dtstack.chunjun.converter.AbstractRowConverter; +import com.dtstack.chunjun.util.MapUtil; + +import com.google.common.collect.Lists; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public class XmlResponseParse extends ResponseParse { + private String responseValue; + private HttpRequestParam requestParam; + private Iterator> iterator; + + public XmlResponseParse(HttpRestConfig config, AbstractRowConverter converter) { + super(config, converter); + if (CollectionUtils.isEmpty(columns)) { + throw new RuntimeException("please configure column when decode is csv"); + } + } + + @Override + public boolean hasNext() throws IOException { + return iterator.hasNext(); + } + + @Override + public ResponseValue next() throws Exception { + Map next = iterator.next(); + if (StringUtils.isBlank(config.getDataSubject()) + && next.containsKey(columns.get(0).getName())) { + // rootKey + String key = columns.get(0).getName(); + HashMap data = new HashMap<>(); + data.put(key, next.get(key)); + return new ResponseValue(converter.toInternal(data), requestParam, responseValue); + } + LinkedHashMap data = + buildResponseByKey(next, columns, ConstantValue.POINT_SYMBOL); + return new ResponseValue(converter.toInternal(data), requestParam, responseValue); + } + + @Override + public void parse(String responseValue, int responseStatus, HttpRequestParam requestParam) { + this.responseValue = responseValue; + this.requestParam = requestParam; + + Map xmlData = new HashMap<>(); + XmlUtil.xmlParse(responseValue, xmlData); + if (StringUtils.isBlank(config.getDataSubject())) { + this.iterator = Lists.newArrayList(xmlData).iterator(); + } else { + Object valueByKey = + MapUtil.getValueByKey( + xmlData, + JsonPathUtil.parseJsonPath(config.getDataSubject()), + ConstantValue.POINT_SYMBOL); + if (valueByKey instanceof List) { + this.iterator = ((List) valueByKey).iterator(); + } else if (valueByKey instanceof Map) { + Map data = (Map) valueByKey; + this.iterator = Lists.newArrayList(data).iterator(); + } else { + throw new RuntimeException( + config.getDataSubject() + " in response is not array or map"); + } + } + } +} diff --git a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/common/ConstantValue.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/common/ConstantValue.java index ee5ad97979..c0d22b7979 100644 --- a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/common/ConstantValue.java +++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/common/ConstantValue.java @@ -35,6 +35,10 @@ public class ConstantValue { public static final String SYSTEM_FUNCTION_CURRENT_TIME = "currentTime"; public static final String SYSTEM_FUNCTION_INTERVAL_TIME = "intervalTime"; + public static final String CSV_DECODE = "csv"; + public static final String XML_DECODE = "xml"; + public static final String TEXT_DECODE = "text"; + public static final String DEFAULT_DECODE = "json"; public static String PREFIX = "${"; diff --git a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/common/HttpRestConfig.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/common/HttpRestConfig.java index 4cec16ccf4..cc76833d0c 100644 --- a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/common/HttpRestConfig.java +++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/common/HttpRestConfig.java @@ -21,7 +21,9 @@ import com.dtstack.chunjun.connector.http.client.Strategy; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * HttpRestConfig @@ -43,6 +45,13 @@ public class HttpRestConfig extends ChunJunCommonConf { private String fieldDelimiter = com.dtstack.chunjun.constants.ConstantValue.POINT_SYMBOL; + /** 数据主体,代表json或者xml返回数据里,数据主体字段对应的数据一定是一个数组,里面的数据需要拆分 */ + private String dataSubject; + + private String csvDelimiter = com.dtstack.chunjun.constants.ConstantValue.COMMA_SYMBOL; + + private Map csvConfig = new HashMap<>(); + /** response text/json */ private String decode = "text"; @@ -53,7 +62,11 @@ public class HttpRestConfig extends ChunJunCommonConf { private String fieldTypes; /** 请求的间隔时间 单位毫秒 */ - private Long intervalTime; + private Long intervalTime = 3000L; + + // allow request num cycles 为-1时,除非异常或者strategy生效导致任务结束,否则任务会一直循环请求,如果 大于 + // 0,则代表循环请求的次数,如配置为3,则会发送三次http请求 + private long cycles = -1; /** 请求的header头 */ private List header = new ArrayList<>(2); @@ -67,6 +80,9 @@ public class HttpRestConfig extends ChunJunCommonConf { /** 返回结果的处理策略 */ protected List strategy = new ArrayList<>(2); + /** 请求的超时时间 单位毫秒 */ + private long timeOut = 10000; + public String getFieldTypes() { return fieldTypes; } @@ -167,6 +183,46 @@ public void setFieldDelimiter(String fieldDelimiter) { this.fieldDelimiter = fieldDelimiter; } + public String getDataSubject() { + return dataSubject; + } + + public void setDataSubject(String dataSubject) { + this.dataSubject = dataSubject; + } + + public String getCsvDelimiter() { + return csvDelimiter; + } + + public void setCsvDelimiter(String csvDelimiter) { + this.csvDelimiter = csvDelimiter; + } + + public Map getCsvConfig() { + return csvConfig; + } + + public void setCsvConfig(Map csvConfig) { + this.csvConfig = csvConfig; + } + + public long getCycles() { + return cycles; + } + + public void setCycles(long cycles) { + this.cycles = cycles; + } + + public long getTimeOut() { + return timeOut; + } + + public void setTimeOut(long timeOut) { + this.timeOut = timeOut; + } + @Override public String toString() { return "HttpRestConfig{" @@ -179,16 +235,30 @@ public String toString() { + ", requestMode='" + requestMode + '\'' + + ", fieldDelimiter='" + + fieldDelimiter + + '\'' + + ", dataSubject='" + + dataSubject + + '\'' + + ", csvDelimiter='" + + csvDelimiter + + '\'' + + ", csvConfig=" + + csvConfig + ", decode='" + decode + '\'' + ", fields='" + fields + '\'' + + ", fieldTypes='" + + fieldTypes + + '\'' + ", intervalTime=" + intervalTime - + ", fieldDelimiter=" - + fieldDelimiter + + ", cycles=" + + cycles + ", header=" + header + ", param=" @@ -197,6 +267,8 @@ public String toString() { + body + ", strategy=" + strategy + + ", timeOut=" + + timeOut + '}'; } } diff --git a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/common/HttpUtil.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/common/HttpUtil.java index 785235dc33..00a99f5b40 100644 --- a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/common/HttpUtil.java +++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/common/HttpUtil.java @@ -58,10 +58,10 @@ public class HttpUtil { public static CloseableHttpClient getHttpClient() { - return getBaseBuilder().build(); + return getBaseBuilder(TIME_OUT).build(); } - public static CloseableHttpClient getHttpsClient() { + public static CloseableHttpClient getHttpsClient(int timeOut) { // 设置Http连接池 SSLContext sslContext; @@ -74,13 +74,16 @@ public static CloseableHttpClient getHttpsClient() { LOG.warn(ExceptionUtil.getErrorMessage(e)); throw new RuntimeException(e); } - return getBaseBuilder() + return getBaseBuilder(timeOut) .setSSLContext(sslContext) .setSSLHostnameVerifier(new NoopHostnameVerifier()) .build(); } - public static HttpClientBuilder getBaseBuilder() { + public static HttpClientBuilder getBaseBuilder(int timeOut) { + if (timeOut <= 0) { + timeOut = TIME_OUT; + } // 设置自定义的重试策略 ServiceUnavailableRetryStrategyImpl strategy = new ServiceUnavailableRetryStrategyImpl.Builder() @@ -93,9 +96,9 @@ public static HttpClientBuilder getBaseBuilder() { // 设置超时时间 RequestConfig requestConfig = RequestConfig.custom() - .setConnectTimeout(TIME_OUT) - .setConnectionRequestTimeout(TIME_OUT) - .setSocketTimeout(TIME_OUT) + .setConnectTimeout(timeOut) + .setConnectionRequestTimeout(timeOut) + .setSocketTimeout(timeOut) .build(); // 设置Http连接池 PoolingHttpClientConnectionManager pcm = new PoolingHttpClientConnectionManager(); diff --git a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/converter/HttpColumnConverter.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/converter/HttpColumnConverter.java index ed1e977ad3..729fc668dd 100644 --- a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/converter/HttpColumnConverter.java +++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/converter/HttpColumnConverter.java @@ -18,8 +18,7 @@ package com.dtstack.chunjun.connector.http.converter; -import com.dtstack.chunjun.connector.http.client.DefaultRestHandler; -import com.dtstack.chunjun.connector.http.common.ConstantValue; +import com.dtstack.chunjun.conf.FieldConf; import com.dtstack.chunjun.connector.http.common.HttpRestConfig; import com.dtstack.chunjun.converter.AbstractRowConverter; import com.dtstack.chunjun.converter.IDeserializationConverter; @@ -32,8 +31,6 @@ import com.dtstack.chunjun.element.column.StringColumn; import com.dtstack.chunjun.element.column.TimestampColumn; import com.dtstack.chunjun.util.DateUtil; -import com.dtstack.chunjun.util.GsonUtil; -import com.dtstack.chunjun.util.MapUtil; import org.apache.flink.table.data.RowData; @@ -41,6 +38,7 @@ import java.math.BigDecimal; import java.util.ArrayList; +import java.util.List; import java.util.Locale; import java.util.Map; @@ -50,7 +48,7 @@ * @description */ public class HttpColumnConverter - extends AbstractRowConverter, String> { + extends AbstractRowConverter, Object, Map, String> { /** restapi Conf */ private HttpRestConfig httpRestConfig; @@ -77,34 +75,27 @@ protected ISerializationConverter> wrapIntoNullableExternalC } @Override - public RowData toInternal(String input) throws Exception { + public RowData toInternal(Map input) throws Exception { ColumnRowData row; - if (httpRestConfig.getDecode().equals(ConstantValue.DEFAULT_DECODE)) { - Map result = - DefaultRestHandler.gson.fromJson(input, GsonUtil.gsonMapTypeToken); - if (toInternalConverters != null && toInternalConverters.size() > 0) { - // 同步任务配置了field参数(对应的类型转换都是string) 需要对每个字段进行类型转换 - row = new ColumnRowData(toInternalConverters.size()); - String fields = httpRestConfig.getFields(); - String[] split = fields.split(","); - - for (int i = 0; i < split.length; i++) { - Object value = - MapUtil.getValueByKey( - result, split[i], httpRestConfig.getFieldDelimiter()); - row.addField( - (AbstractBaseColumn) toInternalConverters.get(i).deserialize(value)); - } - } else { - // 直接作为mapColumn - row = new ColumnRowData(1); - row.addField(new MapColumn(result)); - } + if (toInternalConverters != null && toInternalConverters.size() > 0) { + List fieldConfList = commonConf.getColumn(); + // 同步任务配置了field参数(对应的类型转换都是string) 需要对每个字段进行类型转换 + row = new ColumnRowData(toInternalConverters.size()); + for (int i = 0; i < toInternalConverters.size(); i++) { + String name = httpRestConfig.getColumn().get(i).getName(); + AbstractBaseColumn baseColumn = + (AbstractBaseColumn) + toInternalConverters.get(i).deserialize(input.get(name)); + + row.addField(assembleFieldProps(fieldConfList.get(i), baseColumn)); + } } else { + // 实时直接作为mapColumn row = new ColumnRowData(1); - row.addField(new StringColumn(input)); + row.addField(new MapColumn(input)); } + return row; } diff --git a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/inputformat/HttpInputFormat.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/inputformat/HttpInputFormat.java index daf3e96e2f..b0ace723b1 100644 --- a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/inputformat/HttpInputFormat.java +++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/inputformat/HttpInputFormat.java @@ -69,7 +69,8 @@ public void openInputFormat() throws IOException { @Override @SuppressWarnings("unchecked") protected void openInternal(InputSplit inputSplit) { - myHttpClient = new HttpClient(httpRestConfig, metaBodys, metaParams, metaHeaders); + myHttpClient = + new HttpClient(httpRestConfig, metaBodys, metaParams, metaHeaders, rowConverter); if (state != null) { myHttpClient.initPosition(state.getRequestParam(), state.getOriginResponseValue()); } @@ -96,14 +97,20 @@ protected RowData nextRecordInternal(RowData rowData) throws ReadRecordException + value.getOriginResponseValue() + " job end"); } + // finished 任务正常结束 + if (value.getStatus() == 2) { + reachEnd = true; + return null; + } + // todo 离线任务后期需要加上一个finished策略 这样就是代表任务正常结束 而不是异常stop state = new ResponseValue( - "", + null, HttpRequestParam.copy(value.getRequestParam()), value.getOriginResponseValue()); try { - return rowConverter.toInternal(value.getData()); + return value.getData(); } catch (Exception e) { throw new ReadRecordException(e.getMessage(), e); } diff --git a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/util/JsonPathUtil.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/util/JsonPathUtil.java new file mode 100644 index 0000000000..e680364629 --- /dev/null +++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/util/JsonPathUtil.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.http.util; + +import org.apache.commons.lang3.StringUtils; + +public class JsonPathUtil { + public static String parseJsonPath(String path) { + // + if (path.startsWith("$.")) { + return path.substring(2); + } else if (path.startsWith("${") && path.endsWith("}")) { + return path.substring(2, path.length() - 1); + } + throw new IllegalArgumentException("just support parse ${xxx.xxx} or $.xxx.xx format"); + } + + public static boolean isJsonPath(String path) { + if (StringUtils.isBlank(path)) { + throw new IllegalArgumentException("path is not empty"); + } + return path.startsWith("$.") || (path.startsWith("${") && path.endsWith("}")); + } +} diff --git a/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/util/XmlUtil.java b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/util/XmlUtil.java new file mode 100644 index 0000000000..d62d31d22a --- /dev/null +++ b/chunjun-connectors/chunjun-connector-http/src/main/java/com/dtstack/chunjun/connector/http/util/XmlUtil.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.chunjun.connector.http.util; + +import org.apache.commons.lang3.tuple.Pair; +import org.dom4j.Attribute; +import org.dom4j.Document; +import org.dom4j.DocumentException; +import org.dom4j.DocumentHelper; +import org.dom4j.Element; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public class XmlUtil { + + public static Map xmlParse(String xml, Map outmap) { + Document document = null; + try { + document = DocumentHelper.parseText(xml); + } catch (DocumentException e) { + throw new RuntimeException( + "parse xml failed,the response values is not xml format and value is ->" + xml); + } + // 3.获取根节点 + Element rootElement = document.getRootElement(); + + return elementTomap(rootElement, outmap); + } + + @SuppressWarnings("unchecked") + public static Map elementTomap(Element outele, Map outmap) { + List list = outele.elements(); + int size = list.size(); + if (size == 0) { + outmap.put(outele.getName(), outele.getTextTrim()); + } else { + Map innermap = new HashMap<>(); + for (Element ele1 : list) { + String eleName = ele1.getName(); + Object obj = innermap.get(eleName); + if (obj == null) { + elementTomap(ele1, innermap); + } else { + if (obj instanceof Map) { + List> list1 = new ArrayList>(); + list1.add((Map) innermap.remove(eleName)); + elementTomap(ele1, innermap); + list1.add((Map) innermap.remove(eleName)); + innermap.put(eleName, list1); + } else if (obj instanceof String) { + List list1 = new ArrayList(); + list1.add((String) innermap.remove(eleName)); + elementTomap(ele1, innermap); + list1.add((String) innermap.remove(eleName)); + innermap.put(eleName, list1); + } else { + elementTomap(ele1, innermap); + if (innermap.get(eleName) instanceof Map) { + Map listValue = + (Map) innermap.get(eleName); + ((List>) obj).add(listValue); + innermap.put(eleName, obj); + } else if (innermap.get(eleName) instanceof String) { + String listValue = (String) innermap.get(eleName); + ((List) obj).add(listValue); + innermap.put(eleName, obj); + } + } + } + } + outmap.put(outele.getName(), innermap); + } + return outmap; + } + + public static LinkedHashMap>> hbaseXmlParse(String xml) { + Document document = null; + try { + document = DocumentHelper.parseText(xml); + } catch (DocumentException e) { + throw new RuntimeException( + "parse xml failed,the response values is not xml format and value is ->" + xml); + } + // 3.获取根节点 + Element rootElement = document.getRootElement(); + + return hbaseElementTomap(rootElement); + } + + // example + // + // + // MTM= + // a3lvdG9t + // + // + // MTY= + // + // + + public static LinkedHashMap>> hbaseElementTomap( + Element outele) { + + LinkedHashMap>> data = new LinkedHashMap<>(); + List list = outele.elements(); + for (Element ele1 : list) { + String eleName = ele1.getName(); + if (!eleName.equalsIgnoreCase("Row")) { + continue; + } + Attribute key = ele1.attribute("key"); + if (key == null) { + throw new RuntimeException( + "parse hbase xml data error, row element not has attribute key"); + } + // cell + ArrayList> columns = new ArrayList<>(); + // add rowkey + columns.add(Pair.of("rowkey", key.getValue())); + for (Element element : ele1.elements()) { + if (element.getName().equalsIgnoreCase("Cell")) { + Attribute column = element.attribute("column"); + if (column == null) { + throw new RuntimeException( + "parse hbase xml data error, cell element has not attribute column"); + } + columns.add(Pair.of(column.getValue(), element.getText())); + } + } + + data.put(key.getValue(), columns); + } + + return data; + } +}