Skip to content

Commit

Permalink
[feat-855][doris] doris stream load change to json format.
Browse files Browse the repository at this point in the history
  • Loading branch information
shitou authored and FlechazoW committed May 23, 2022
1 parent c7b7f0e commit ec525e7
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@
*/
public class DorisConf extends ChunJunCommonConf {

private String fieldDelimiter;

private String lineDelimiter;

private String database;

private String table;
Expand All @@ -58,22 +54,6 @@ public class DorisConf extends ChunJunCommonConf {

private Properties loadProperties;

public String getFieldDelimiter() {
return fieldDelimiter;
}

public void setFieldDelimiter(String fieldDelimiter) {
this.fieldDelimiter = fieldDelimiter;
}

public String getLineDelimiter() {
return lineDelimiter;
}

public void setLineDelimiter(String lineDelimiter) {
this.lineDelimiter = lineDelimiter;
}

public String getDatabase() {
return database;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,6 @@ public DorisConfBuilder setFeNodes(List<String> feNodes) {
return this;
}

public DorisConfBuilder setFieldDelimiter(String fieldDelimiter) {
this.dorisConf.setFieldDelimiter(fieldDelimiter);
return this;
}

public DorisConfBuilder setLineDelimiter(String lineDelimiter) {
this.dorisConf.setLineDelimiter(lineDelimiter);
return this;
}

public DorisConfBuilder setUsername(String username) {
this.dorisConf.setUsername(username);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;
import java.util.stream.IntStream;

/**
* Company:www.dtstack.com.
Expand All @@ -17,18 +20,16 @@
*/
public class Carrier implements Serializable {
private static final long serialVersionUID = 1L;
private final StringJoiner insertContent;
private final List<Map<String, Object>> insertContent;
private final StringJoiner deleteContent;
private final String fieldDelimiter;
private int batch = 0;
private String database;
private String table;
private List<String> columns;
private final Set<Integer> rowDataIndexes = new HashSet<>();

public Carrier(String fieldDelimiter, String lineDelimiter) {
this.fieldDelimiter = fieldDelimiter;
insertContent = new StringJoiner(lineDelimiter);
public Carrier() {
insertContent = new ArrayList<>();
deleteContent = new StringJoiner(" OR ");
}

Expand All @@ -48,8 +49,8 @@ public void setTable(String table) {
this.table = table;
}

public String getInsertContent() {
return insertContent.toString();
public List<Map<String, Object>> getInsertContent() {
return insertContent;
}

public String getDeleteContent() {
Expand Down Expand Up @@ -78,14 +79,20 @@ public void addInsertContent(List<String> insertV) {
// It is certain that in this case, the size
// of insertV is twice the size of column
List<String> forward = insertV.subList(0, columns.size());
String forwardV = StringUtils.join(forward, fieldDelimiter);
final Map<String, Object> forwardV = new HashMap<>(columns.size());
IntStream.range(0, columns.size())
.forEach(i -> forwardV.put(columns.get(i), forward.get(i)));
insertContent.add(forwardV);
List<String> behind = insertV.subList(columns.size(), insertV.size());
String behindV = StringUtils.join(behind, fieldDelimiter);
final Map<String, Object> behindV = new HashMap<>(columns.size());
IntStream.range(0, columns.size())
.forEach(i -> behindV.put(columns.get(i), behind.get(i)));
insertContent.add(behindV);
} else {
String s = StringUtils.join(insertV, fieldDelimiter);
insertContent.add(s);
final Map<String, Object> values = new HashMap<>(columns.size());
IntStream.range(0, columns.size())
.forEach(i -> values.put(columns.get(i), insertV.get(i)));
insertContent.add(values);
}
}
}
Expand All @@ -110,7 +117,9 @@ private String buildMergeOnConditions(List<String> columns, List<String> values)
List<String> deleteOnStr = new ArrayList<>();
for (int i = 0, size = columns.size(); i < size; i++) {
String s =
columns.get(i)
"`"
+ columns.get(i)
+ "`"
+ "<=>"
+ "'"
+ ((values.get(i)) == null ? "" : values.get(i))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,13 @@ public class DorisLoadClient implements Serializable {
.collect(Collectors.toCollection(HashSet::new));

private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load?";
private static final String NULL_VALUE = "\\N";
private static final String KEY_SCHEMA = "schema";
private static final String KEY_TABLE = "table";
private static final String KEY_POINT = ".";
public static final String KEY_BEFORE = "before_";
public static final String KEY_AFTER = "after_";

private final DorisStreamLoad dorisStreamLoad;
private final String fieldDelimiter;
private final String lineDelimiter;
private final boolean nameMapped;
private String hostPort;
private final DorisConf conf;
Expand All @@ -80,8 +77,6 @@ public DorisLoadClient(DorisStreamLoad dorisStreamLoad, DorisConf conf, String h
this.hostPort = hostPort;
this.conf = conf;
this.nameMapped = conf.isNameMapped();
this.fieldDelimiter = conf.getFieldDelimiter();
this.lineDelimiter = conf.getLineDelimiter();
}

public void setHostPort(String hostPort) {
Expand Down Expand Up @@ -271,15 +266,15 @@ private void wrapValuesFromRowData(
if (column.equalsIgnoreCase(trueCol)) {
insertV.add(convert(value, i));
deleteV.add(convert(value, i));
break;
continue;
}
}
// case 2, need to insert.
if (headers[i].startsWith(KEY_AFTER)) {
String trueCol = headers[i].substring(6);
if (column.equalsIgnoreCase(trueCol)) {
insertV.add(convert(value, i));
break;
continue;
}
}
// case 3. column name is obvious.
Expand All @@ -288,7 +283,6 @@ private void wrapValuesFromRowData(
if (delete) {
deleteV.add(convert(value, i));
}
break;
}
}
}
Expand All @@ -301,7 +295,7 @@ private Carrier initCarrier(
List<String> deleteV,
String schema,
String table) {
Carrier carrier = new Carrier(fieldDelimiter, lineDelimiter);
Carrier carrier = new Carrier();
carrier.setColumns(columns);
carrier.setDatabase(schema);
carrier.setTable(table);
Expand Down Expand Up @@ -347,6 +341,6 @@ private List<String> getColumnName(List<FieldConf> fields) {

private String convert(@Nonnull ColumnRowData rowData, int index) {
Object value = rowData.getField(index);
return (value == null || "".equals(value.toString())) ? NULL_VALUE : value.toString();
return (value == null || "".equals(value.toString())) ? null : value.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@
import java.util.Arrays;
import java.util.Base64;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;

/**
* @author tiezhu@dtstack.com
Expand All @@ -63,8 +65,6 @@ public class DorisStreamLoad implements Serializable {
new ArrayList<>(Arrays.asList("Success", "Publish Timeout"));
private final String authEncoding;
private final Properties streamLoadProp;
private final String fieldDelimiter;
private final String lineDelimiter;

public DorisStreamLoad(DorisConf options) {
this.authEncoding =
Expand All @@ -73,8 +73,6 @@ public DorisStreamLoad(DorisConf options) {
String.format("%s:%s", options.getUsername(), options.getPassword())
.getBytes(StandardCharsets.UTF_8));
this.streamLoadProp = options.getLoadProperties();
this.fieldDelimiter = options.getFieldDelimiter();
this.lineDelimiter = options.getLineDelimiter();
}

/**
Expand All @@ -94,23 +92,34 @@ private HttpPut generatePut(
httpPut.setHeader("Expect", "100-continue");
httpPut.setHeader("Content-Type", "text/plain; charset=UTF-8");
httpPut.setHeader("label", label);
httpPut.setHeader("columns", StringUtils.join(columnNames, ","));
httpPut.setHeader("format", "json");
// if body is list type ,strip_outer_array should be true
httpPut.setHeader("strip_outer_array", "true");
List<String> columns =
columnNames.stream()
.map(this::quoteColumn)
.collect(Collectors.toCollection(LinkedList::new));
httpPut.setHeader("columns", StringUtils.join(columns, ","));
if (StringUtils.isNotBlank(mergeConditions)) {
httpPut.setHeader("merge_type", "MERGE");
httpPut.setHeader("delete", mergeConditions);
} else {
httpPut.setHeader("merge_type", "APPEND");
}
httpPut.setHeader("column_separator", fieldDelimiter);
if (!"\n".equals(lineDelimiter)) {
httpPut.setHeader("line_delimiter", lineDelimiter);
}
// httpPut.setHeader("column_separator", fieldDelimiter);
// if (!"\n".equals(lineDelimiter)) {
// httpPut.setHeader("line_delimiter", lineDelimiter);
// }
for (Map.Entry<Object, Object> entry : streamLoadProp.entrySet()) {
httpPut.setHeader(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
}
return httpPut;
}

private String quoteColumn(String column) {
return "`" + column + "`";
}

public static class LoadResponse {
public int status;
public String respContent;
Expand Down Expand Up @@ -138,9 +147,9 @@ public String toString() {
*/
public void load(Carrier carrier, String loadUrlStr) throws IOException {
List<String> columnNames = carrier.getColumns();
String value = carrier.getInsertContent();
String json = OM.writeValueAsString(carrier.getInsertContent());
String mergeConditions = carrier.getDeleteContent();
LoadResponse loadResponse = loadBatch(columnNames, value, mergeConditions, loadUrlStr);
LoadResponse loadResponse = loadBatch(columnNames, json, mergeConditions, loadUrlStr);
LOG.debug("StreamLoad Response:{}", loadResponse);
if (loadResponse.status != 200) {
throw new ConnectException("stream load error, detail : " + loadResponse);
Expand Down Expand Up @@ -187,7 +196,7 @@ private String generateLabel() {
String formatDate = sdf.format(new Date());
label =
String.format(
"chunjun_connector_%s_%s",
"flinkx_connector_%s_%s",
formatDate, UUID.randomUUID().toString().replaceAll("-", ""));
}
return label;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,7 @@
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DORIS_WRITE_MODE_DEFAULT;
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.EXEC_MEM_LIMIT_KEY;
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.FE_NODES_KEY;
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.FIELD_DELIMITER;
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.FIELD_DELIMITER_KEY;
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.FLUSH_INTERNAL_MS_KEY;
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.LINE_DELIMITER;
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.LINE_DELIMITER_KEY;
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.LOAD_OPTIONS_KEY;
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.LOAD_PROPERTIES_KEY;
import static com.dtstack.chunjun.connector.doris.options.DorisKeys.PASSWORD_KEY;
Expand Down Expand Up @@ -136,10 +132,6 @@ public DorisSinkFactory(SyncConf syncConf) {
.setDatabase(parameter.getStringVal(DATABASE_KEY))
.setTable(parameter.getStringVal(TABLE_KEY))
.setFeNodes((List<String>) parameter.getVal(FE_NODES_KEY))
.setFieldDelimiter(
parameter.getStringVal(FIELD_DELIMITER_KEY, FIELD_DELIMITER))
.setLineDelimiter(
parameter.getStringVal(LINE_DELIMITER_KEY, LINE_DELIMITER))
.setLoadOptions(loadConf)
.setLoadProperties(
parameter.getProperties(LOAD_PROPERTIES_KEY, new Properties()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ public class DorisDynamicTableFactory implements DynamicTableSinkFactory {
DorisOptions.EXEC_MEM_LIMIT,
DorisOptions.DESERIALIZE_QUEUE_SIZE,
DorisOptions.DESERIALIZE_ARROW_ASYNC,
DorisOptions.FIELD_DELIMITER,
DorisOptions.LINE_DELIMITER,
DorisOptions.MAX_RETRIES,
DorisOptions.WRITE_MODE,
DorisOptions.BATCH_SIZE)
Expand Down Expand Up @@ -112,8 +110,6 @@ private static DorisConf getConfByOptions(ReadableConfig config) {

LoadConf loadConf = getLoadConf(config);
dorisConf.setLoadConf(loadConf);
dorisConf.setFieldDelimiter(config.get(DorisOptions.FIELD_DELIMITER));
dorisConf.setLineDelimiter(config.get(DorisOptions.LINE_DELIMITER));
dorisConf.setLoadProperties(new Properties());
dorisConf.setMaxRetries(config.get(DorisOptions.MAX_RETRIES));
dorisConf.setWriteMode(config.get(DorisOptions.WRITE_MODE));
Expand Down

0 comments on commit ec525e7

Please sign in to comment.