Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature-#1775][connector][http] http supports offline mode #1872

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions chunjun-connectors/chunjun-connector-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,30 @@
<artifactId>javacsv</artifactId>
<version>2.0</version>
</dependency>

<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>2.8.0</version>
<exclusions>
<exclusion>
<groupId>net.minidev</groupId>
<artifactId>json-smart</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>net.minidev</groupId>
<artifactId>json-smart</artifactId>
<version>2.4.10</version>
</dependency>
<dependency>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
<version>1.7.36</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
Expand All @@ -56,6 +80,10 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.dtstack.chunjun.util.GsonUtil;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
Expand All @@ -41,9 +42,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static com.dtstack.chunjun.connector.http.common.ConstantValue.CSV_DECODE;
import static com.dtstack.chunjun.connector.http.common.ConstantValue.TEXT_DECODE;
import static com.dtstack.chunjun.connector.http.common.ConstantValue.XML_DECODE;
import static com.dtstack.chunjun.connector.http.common.ConstantValue.*;

@Slf4j
public class HttpClient {
Expand Down Expand Up @@ -134,6 +133,9 @@ public void initPosition(HttpRequestParam requestParam, String response) {
}

public void execute() {
if (restConfig.getLimitRequestTime() < restConfig.getRequestTime()) {
return;
}

if (!running) {
return;
Expand Down Expand Up @@ -182,6 +184,9 @@ public void execute() {
first = false;
requestRetryTime = 3;
requestNumber++;
// 子类和父类使用同一个对象,可以向上汇报请求次数进度,以便及时触发finish
Integer requestTime = restConfig.getRequestTime();
restConfig.setRequestTime(++requestTime);
}

public void doExecute(int retryTime) {
Expand All @@ -204,6 +209,21 @@ public void doExecute(int retryTime) {
String responseValue;
int responseStatus;
try {
Map<String, Object> requestParam = currentParam.getParam();
Map<String, Object> requestBody = currentParam.getBody();
if (StringUtils.isNotBlank(restConfig.getPageParamName())) {
Integer pagePosition =
restConfig.getStartIndex()
+ restConfig.getStep() * restConfig.getRequestTime();
if (pagePosition > restConfig.getEndIndex()) {
return;
}
if ("get".equals(restConfig.getRequestMode())) {
requestParam.put(restConfig.getPageParamName(), pagePosition);
} else {
requestBody.put(restConfig.getPageParamName(), pagePosition);
}
}

HttpUriRequest request =
HttpUtil.getRequest(
Expand All @@ -221,7 +241,8 @@ public void doExecute(int retryTime) {
return;
}

responseValue = EntityUtils.toString(httpResponse.getEntity());
// utf-8:支持中文
responseValue = EntityUtils.toString(httpResponse.getEntity(), "utf-8");
responseStatus = httpResponse.getStatusLine().getStatusCode();
} catch (Throwable e) {
// 只要本次请求中出现了异常 都会进行重试,如果重试次数达到了就真正结束任务
Expand Down Expand Up @@ -264,9 +285,16 @@ public void doExecute(int retryTime) {
}
}

responseParse.parse(responseValue, responseStatus, HttpRequestParam.copy(currentParam));
while (responseParse.hasNext()) {
processData(responseParse.next());
if (StringUtils.isBlank(responseValue)) {
reachEnd = true;
running = false;
} else {
responseParse.parse(
responseValue, responseStatus, HttpRequestParam.copy(currentParam));
while (responseParse.hasNext()) {
// 一条一条数据的增加
processData(responseParse.next());
}
}

if (-1 != restConfig.getCycles() && requestNumber >= restConfig.getCycles()) {
Expand Down Expand Up @@ -342,6 +370,8 @@ protected ResponseParse getResponseParse(AbstractRowConverter converter) {
return new XmlResponseParse(restConfig, converter);
case TEXT_DECODE:
return new TextResponseParse(restConfig, converter);
case OFFLINE_JSON_DECODE:
return new OfflineJsonResponseParse(restConfig, converter);
default:
return new JsonResponseParse(restConfig, converter);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package com.dtstack.chunjun.connector.http.client;

import com.dtstack.chunjun.connector.http.common.HttpRestConfig;
import com.dtstack.chunjun.converter.AbstractRowConverter;
import com.dtstack.chunjun.util.GsonUtil;

import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.jayway.jsonpath.JsonPath;
import org.apache.commons.lang3.StringUtils;

import java.util.Iterator;
import java.util.Map;

/** @Description 离线任务 @Author lianggao @Date 2023/6/1 下午5:50 */
public class OfflineJsonResponseParse extends ResponseParse {

private String responseValue;
private HttpRequestParam requestParam;
private final Gson gson;
private Iterator<Object> iterator;
/** true:single true:array false:single false:array */
String parserFlag;

String jsonPath;

public OfflineJsonResponseParse(HttpRestConfig config, AbstractRowConverter converter) {
super(config, converter);
this.gson = GsonUtil.GSON;
this.jsonPath = config.getJsonPath();
this.parserFlag = !StringUtils.isBlank(jsonPath) + ":" + config.getReturnedDataType();
}

@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public ResponseValue next() throws Exception {
String rowJson = gson.toJson(iterator.next());
Map<String, Object> data =
DefaultRestHandler.gson.fromJson(rowJson, GsonUtil.gsonMapTypeToken);
return new ResponseValue(converter.toInternal(data), requestParam, responseValue);
}

@Override
public void parse(String responseValue, int responseStatus, HttpRequestParam requestParam) {
this.responseValue = responseValue;
this.requestParam = requestParam;
runParseJson();
}

public void runParseJson() {
switch (parserFlag) {
case "false:array":
iterator = JsonPath.read(responseValue, "$");
break;
case "true:single":
Object read = JsonPath.read(responseValue, jsonPath);
iterator = Lists.newArrayList(read).iterator();
break;
case "true:array":
Object eval = JsonPath.read(responseValue, jsonPath);
if (eval instanceof net.minidev.json.JSONArray) {
iterator = ((net.minidev.json.JSONArray) eval).iterator();
} else {
// 如果为null 则直接报错,返回解析错误的数据
if (eval == null) {
throw new RuntimeException(
"response parsing is incorrect Please check the conf ,get response is"
+ responseValue);
}
iterator = Lists.newArrayList(eval).iterator();
}
break;
default:
Lists.newArrayList(responseValue);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class ConstantValue {
public static final String TEXT_DECODE = "text";

public static final String DEFAULT_DECODE = "json";
public static final String OFFLINE_JSON_DECODE = "offline-json";

public static final String PREFIX = "${";
public static final String SUFFIX = "}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,17 @@ public class HttpRestConfig extends CommonConfig {
/** 请求的超时时间 单位毫秒 */
private long timeOut = 10000;

private String returnedDataType;
private String jsonPath;

private String pageParamName;
private Integer endIndex;
private Integer startIndex;
private Integer Step;

private Integer limitRequestTime = 1;
private Integer requestTime = 0;

public String getFieldTypes() {
return fieldTypes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ protected void openInternal(InputSplit inputSplit) {
protected RowData nextRecordInternal(RowData rowData) throws ReadRecordException {
ResponseValue value = myHttpClient.takeEvent();
if (null == value) {
reachEnd = httpRestConfig.getLimitRequestTime().equals(httpRestConfig.getRequestTime());
return null;
}
if (value.isNormal()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.commons.lang3.StringUtils;

import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -69,7 +70,12 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(HttpOptions.DELAY);
options.add(HttpOptions.DATA_SUBJECT);
options.add(HttpOptions.CYCLES);

options.add(HttpOptions.RETURNEDDATATYPE);
options.add(HttpOptions.JSONPATH);
options.add(HttpOptions.PAGEPARAMNAME);
options.add(HttpOptions.STEP);
options.add(HttpOptions.STARTINDEX);
options.add(HttpOptions.ENDINDEX);
return options;
}

Expand Down Expand Up @@ -141,6 +147,24 @@ private HttpRestConfig getRestapiConf(ReadableConfig config) {
httpRestConfig.setRequestMode(config.get(HttpOptions.METHOD));
httpRestConfig.setDataSubject(config.get(HttpOptions.DATA_SUBJECT));
httpRestConfig.setCycles(config.get(HttpOptions.CYCLES));

httpRestConfig.setReturnedDataType(config.get(HttpOptions.RETURNEDDATATYPE));
httpRestConfig.setJsonPath(config.get(HttpOptions.JSONPATH));

if (StringUtils.isNotBlank(config.get(HttpOptions.PAGEPARAMNAME))) {
httpRestConfig.setPageParamName(config.get(HttpOptions.PAGEPARAMNAME));
httpRestConfig.setStep(config.get(HttpOptions.STEP));
httpRestConfig.setStartIndex(config.get(HttpOptions.STARTINDEX));
httpRestConfig.setEndIndex(config.get(HttpOptions.ENDINDEX));

Integer limitRequestTime =
(config.get(HttpOptions.ENDINDEX) - config.get(HttpOptions.STARTINDEX))
/ config.get(HttpOptions.STEP)
+ 1;
httpRestConfig.setLimitRequestTime(limitRequestTime);
httpRestConfig.setCycles(limitRequestTime);
}

httpRestConfig.setParam(
gson.fromJson(
config.get(HttpOptions.PARAMS),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,6 @@ public String asSummaryString() {

@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.UPDATE_BEFORE)
.addContainedKind(RowKind.UPDATE_AFTER)
.addContainedKind(RowKind.DELETE)
.build();
return ChangelogMode.newBuilder().addContainedKind(RowKind.INSERT).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,40 @@ public class HttpOptions {
.longType()
.defaultValue(1L)
.withDescription("request cycle");

public static final ConfigOption<String> RETURNEDDATATYPE =
ConfigOptions.key("returned-data-type")
.stringType()
.defaultValue("single")
.withDescription("The data structure returned is single or array");

public static final ConfigOption<String> JSONPATH =
ConfigOptions.key("json-path")
.stringType()
.defaultValue("")
.withDescription("json Path");

public static final ConfigOption<String> PAGEPARAMNAME =
ConfigOptions.key("page-param-name")
.stringType()
.defaultValue("")
.withDescription("Pagination request page name,for example: pageName");

public static final ConfigOption<Integer> STARTINDEX =
ConfigOptions.key("start-index")
.intType()
.defaultValue(1)
.withDescription("The initial page number of multiple requests");

public static final ConfigOption<Integer> ENDINDEX =
ConfigOptions.key("end-index")
.intType()
.defaultValue(1)
.withDescription("The final page number of multiple requests");

public static final ConfigOption<Integer> STEP =
ConfigOptions.key("step")
.intType()
.defaultValue(1)
.withDescription("The step size of the requested page number");
}
Loading
Loading