Skip to content

Commit

Permalink
[hotfix-#1497][chunjun-connector-http]Solve the problem that http-x p…
Browse files Browse the repository at this point in the history
…lug-in cannot be used in sql mode (#1500)

Co-authored-by: 443321070@qq.com <taotao0226.?>
  • Loading branch information
taoyameng authored Feb 13, 2023
1 parent 58fbb3b commit d65793b
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@

package com.dtstack.chunjun.connector.http.converter;

import com.dtstack.chunjun.connector.http.client.DefaultRestHandler;
import com.dtstack.chunjun.connector.http.common.HttpRestConfig;
import com.dtstack.chunjun.converter.AbstractRowConverter;
import com.dtstack.chunjun.converter.IDeserializationConverter;
import com.dtstack.chunjun.converter.ISerializationConverter;
import com.dtstack.chunjun.util.GsonUtil;
import com.dtstack.chunjun.util.MapUtil;

import org.apache.flink.table.data.DecimalData;
Expand All @@ -45,7 +43,8 @@
import java.util.Map;

/** Base class for all converters that convert between restapi body and Flink internal object. */
public class HttpRowConverter extends AbstractRowConverter<String, RowData, RowData, LogicalType> {
public class HttpRowConverter
extends AbstractRowConverter<Map<String, Object>, RowData, RowData, LogicalType> {

private HttpRestConfig httpRestConfig;

Expand All @@ -69,9 +68,7 @@ public HttpRowConverter(RowType rowType) {
}

@Override
public RowData toInternal(String input) throws Exception {
Map<String, Object> result =
DefaultRestHandler.gson.fromJson(input, GsonUtil.gsonMapTypeToken);
public RowData toInternal(Map<String, Object> result) throws Exception {
GenericRowData genericRowData = new GenericRowData(rowType.getFieldCount());
List<String> columns = rowType.getFieldNames();
for (int pos = 0; pos < columns.size(); pos++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(HttpOptions.INTERVALTIME);
options.add(HttpOptions.COLUMN);
options.add(HttpOptions.DELAY);
options.add(HttpOptions.DATA_SUBJECT);
options.add(HttpOptions.CYCLES);

return options;
}
Expand Down Expand Up @@ -142,6 +144,8 @@ private HttpRestConfig getRestapiConf(ReadableConfig config) {
httpRestConfig.setUrl(config.get(HttpOptions.URL));
httpRestConfig.setDecode(config.get(HttpOptions.DECODE));
httpRestConfig.setRequestMode(config.get(HttpOptions.METHOD));
httpRestConfig.setDataSubject(config.get(HttpOptions.DATA_SUBJECT));
httpRestConfig.setCycles(config.get(HttpOptions.CYCLES));
httpRestConfig.setParam(
gson.fromJson(
config.get(HttpOptions.PARAMS),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,16 @@ public class HttpOptions {

public static final ConfigOption<Integer> DELAY =
ConfigOptions.key("delay").intType().defaultValue(30).withDescription("request delay");

public static final ConfigOption<String> DATA_SUBJECT =
ConfigOptions.key("dataSubject")
.stringType()
.defaultValue("${data}")
.withDescription("response data subject");

public static final ConfigOption<Long> CYCLES =
ConfigOptions.key("cycles")
.longType()
.defaultValue(1L)
.withDescription("request cycle");
}

0 comments on commit d65793b

Please sign in to comment.