From ec525e702d8da16bea7078ba3c34ac03218020f9 Mon Sep 17 00:00:00 2001 From: shitou Date: Mon, 23 May 2022 11:49:17 +0800 Subject: [PATCH] [feat-855][doris] doris stream load change to json format. --- .../connector/doris/options/DorisConf.java | 20 ----------- .../doris/options/DorisConfBuilder.java | 10 ------ .../chunjun/connector/doris/rest/Carrier.java | 33 ++++++++++++------- .../connector/doris/rest/DorisLoadClient.java | 14 +++----- .../connector/doris/rest/DorisStreamLoad.java | 33 ++++++++++++------- .../doris/sink/DorisSinkFactory.java | 8 ----- .../doris/table/DorisDynamicTableFactory.java | 4 --- 7 files changed, 46 insertions(+), 76 deletions(-) diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConf.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConf.java index 1dc3c54836..c76e753a2e 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConf.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConf.java @@ -33,10 +33,6 @@ */ public class DorisConf extends ChunJunCommonConf { - private String fieldDelimiter; - - private String lineDelimiter; - private String database; private String table; @@ -58,22 +54,6 @@ public class DorisConf extends ChunJunCommonConf { private Properties loadProperties; - public String getFieldDelimiter() { - return fieldDelimiter; - } - - public void setFieldDelimiter(String fieldDelimiter) { - this.fieldDelimiter = fieldDelimiter; - } - - public String getLineDelimiter() { - return lineDelimiter; - } - - public void setLineDelimiter(String lineDelimiter) { - this.lineDelimiter = lineDelimiter; - } - public String getDatabase() { return database; } diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConfBuilder.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConfBuilder.java index 90768bd505..b6730fcfe3 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConfBuilder.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/options/DorisConfBuilder.java @@ -48,16 +48,6 @@ public DorisConfBuilder setFeNodes(List feNodes) { return this; } - public DorisConfBuilder setFieldDelimiter(String fieldDelimiter) { - this.dorisConf.setFieldDelimiter(fieldDelimiter); - return this; - } - - public DorisConfBuilder setLineDelimiter(String lineDelimiter) { - this.dorisConf.setLineDelimiter(lineDelimiter); - return this; - } - public DorisConfBuilder setUsername(String username) { this.dorisConf.setUsername(username); return this; diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/Carrier.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/Carrier.java index 52f99e5fa2..45d8f56803 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/Carrier.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/Carrier.java @@ -4,10 +4,13 @@ import java.io.Serializable; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.StringJoiner; +import java.util.stream.IntStream; /** * Company:www.dtstack.com. @@ -17,18 +20,16 @@ */ public class Carrier implements Serializable { private static final long serialVersionUID = 1L; - private final StringJoiner insertContent; + private final List> insertContent; private final StringJoiner deleteContent; - private final String fieldDelimiter; private int batch = 0; private String database; private String table; private List columns; private final Set rowDataIndexes = new HashSet<>(); - public Carrier(String fieldDelimiter, String lineDelimiter) { - this.fieldDelimiter = fieldDelimiter; - insertContent = new StringJoiner(lineDelimiter); + public Carrier() { + insertContent = new ArrayList<>(); deleteContent = new StringJoiner(" OR "); } @@ -48,8 +49,8 @@ public void setTable(String table) { this.table = table; } - public String getInsertContent() { - return insertContent.toString(); + public List> getInsertContent() { + return insertContent; } public String getDeleteContent() { @@ -78,14 +79,20 @@ public void addInsertContent(List insertV) { // It is certain that in this case, the size // of insertV is twice the size of column List forward = insertV.subList(0, columns.size()); - String forwardV = StringUtils.join(forward, fieldDelimiter); + final Map forwardV = new HashMap<>(columns.size()); + IntStream.range(0, columns.size()) + .forEach(i -> forwardV.put(columns.get(i), forward.get(i))); insertContent.add(forwardV); List behind = insertV.subList(columns.size(), insertV.size()); - String behindV = StringUtils.join(behind, fieldDelimiter); + final Map behindV = new HashMap<>(columns.size()); + IntStream.range(0, columns.size()) + .forEach(i -> behindV.put(columns.get(i), behind.get(i))); insertContent.add(behindV); } else { - String s = StringUtils.join(insertV, fieldDelimiter); - insertContent.add(s); + final Map values = new HashMap<>(columns.size()); + IntStream.range(0, columns.size()) + .forEach(i -> values.put(columns.get(i), insertV.get(i))); + insertContent.add(values); } } } @@ -110,7 +117,9 @@ private String buildMergeOnConditions(List columns, List values) List deleteOnStr = new ArrayList<>(); for (int i = 0, size = columns.size(); i < size; i++) { String s = - columns.get(i) + "`" + + columns.get(i) + + "`" + "<=>" + "'" + ((values.get(i)) == null ? "" : values.get(i)) diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/DorisLoadClient.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/DorisLoadClient.java index 651292a112..a4608e8cfd 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/DorisLoadClient.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/DorisLoadClient.java @@ -61,7 +61,6 @@ public class DorisLoadClient implements Serializable { .collect(Collectors.toCollection(HashSet::new)); private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load?"; - private static final String NULL_VALUE = "\\N"; private static final String KEY_SCHEMA = "schema"; private static final String KEY_TABLE = "table"; private static final String KEY_POINT = "."; @@ -69,8 +68,6 @@ public class DorisLoadClient implements Serializable { public static final String KEY_AFTER = "after_"; private final DorisStreamLoad dorisStreamLoad; - private final String fieldDelimiter; - private final String lineDelimiter; private final boolean nameMapped; private String hostPort; private final DorisConf conf; @@ -80,8 +77,6 @@ public DorisLoadClient(DorisStreamLoad dorisStreamLoad, DorisConf conf, String h this.hostPort = hostPort; this.conf = conf; this.nameMapped = conf.isNameMapped(); - this.fieldDelimiter = conf.getFieldDelimiter(); - this.lineDelimiter = conf.getLineDelimiter(); } public void setHostPort(String hostPort) { @@ -271,7 +266,7 @@ private void wrapValuesFromRowData( if (column.equalsIgnoreCase(trueCol)) { insertV.add(convert(value, i)); deleteV.add(convert(value, i)); - break; + continue; } } // case 2, need to insert. @@ -279,7 +274,7 @@ private void wrapValuesFromRowData( String trueCol = headers[i].substring(6); if (column.equalsIgnoreCase(trueCol)) { insertV.add(convert(value, i)); - break; + continue; } } // case 3. column name is obvious. @@ -288,7 +283,6 @@ private void wrapValuesFromRowData( if (delete) { deleteV.add(convert(value, i)); } - break; } } } @@ -301,7 +295,7 @@ private Carrier initCarrier( List deleteV, String schema, String table) { - Carrier carrier = new Carrier(fieldDelimiter, lineDelimiter); + Carrier carrier = new Carrier(); carrier.setColumns(columns); carrier.setDatabase(schema); carrier.setTable(table); @@ -347,6 +341,6 @@ private List getColumnName(List fields) { private String convert(@Nonnull ColumnRowData rowData, int index) { Object value = rowData.getField(index); - return (value == null || "".equals(value.toString())) ? NULL_VALUE : value.toString(); + return (value == null || "".equals(value.toString())) ? null : value.toString(); } } diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/DorisStreamLoad.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/DorisStreamLoad.java index 77b7a21d07..3da2174dfd 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/DorisStreamLoad.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/rest/DorisStreamLoad.java @@ -46,10 +46,12 @@ import java.util.Arrays; import java.util.Base64; import java.util.Date; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.UUID; +import java.util.stream.Collectors; /** * @author tiezhu@dtstack.com @@ -63,8 +65,6 @@ public class DorisStreamLoad implements Serializable { new ArrayList<>(Arrays.asList("Success", "Publish Timeout")); private final String authEncoding; private final Properties streamLoadProp; - private final String fieldDelimiter; - private final String lineDelimiter; public DorisStreamLoad(DorisConf options) { this.authEncoding = @@ -73,8 +73,6 @@ public DorisStreamLoad(DorisConf options) { String.format("%s:%s", options.getUsername(), options.getPassword()) .getBytes(StandardCharsets.UTF_8)); this.streamLoadProp = options.getLoadProperties(); - this.fieldDelimiter = options.getFieldDelimiter(); - this.lineDelimiter = options.getLineDelimiter(); } /** @@ -94,23 +92,34 @@ private HttpPut generatePut( httpPut.setHeader("Expect", "100-continue"); httpPut.setHeader("Content-Type", "text/plain; charset=UTF-8"); httpPut.setHeader("label", label); - httpPut.setHeader("columns", StringUtils.join(columnNames, ",")); + httpPut.setHeader("format", "json"); + // if body is list type ,strip_outer_array should be true + httpPut.setHeader("strip_outer_array", "true"); + List columns = + columnNames.stream() + .map(this::quoteColumn) + .collect(Collectors.toCollection(LinkedList::new)); + httpPut.setHeader("columns", StringUtils.join(columns, ",")); if (StringUtils.isNotBlank(mergeConditions)) { httpPut.setHeader("merge_type", "MERGE"); httpPut.setHeader("delete", mergeConditions); } else { httpPut.setHeader("merge_type", "APPEND"); } - httpPut.setHeader("column_separator", fieldDelimiter); - if (!"\n".equals(lineDelimiter)) { - httpPut.setHeader("line_delimiter", lineDelimiter); - } + // httpPut.setHeader("column_separator", fieldDelimiter); + // if (!"\n".equals(lineDelimiter)) { + // httpPut.setHeader("line_delimiter", lineDelimiter); + // } for (Map.Entry entry : streamLoadProp.entrySet()) { httpPut.setHeader(String.valueOf(entry.getKey()), String.valueOf(entry.getValue())); } return httpPut; } + private String quoteColumn(String column) { + return "`" + column + "`"; + } + public static class LoadResponse { public int status; public String respContent; @@ -138,9 +147,9 @@ public String toString() { */ public void load(Carrier carrier, String loadUrlStr) throws IOException { List columnNames = carrier.getColumns(); - String value = carrier.getInsertContent(); + String json = OM.writeValueAsString(carrier.getInsertContent()); String mergeConditions = carrier.getDeleteContent(); - LoadResponse loadResponse = loadBatch(columnNames, value, mergeConditions, loadUrlStr); + LoadResponse loadResponse = loadBatch(columnNames, json, mergeConditions, loadUrlStr); LOG.debug("StreamLoad Response:{}", loadResponse); if (loadResponse.status != 200) { throw new ConnectException("stream load error, detail : " + loadResponse); @@ -187,7 +196,7 @@ private String generateLabel() { String formatDate = sdf.format(new Date()); label = String.format( - "chunjun_connector_%s_%s", + "flinkx_connector_%s_%s", formatDate, UUID.randomUUID().toString().replaceAll("-", "")); } return label; diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisSinkFactory.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisSinkFactory.java index 9ab3c962a5..ead4536256 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisSinkFactory.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/sink/DorisSinkFactory.java @@ -49,11 +49,7 @@ import static com.dtstack.chunjun.connector.doris.options.DorisKeys.DORIS_WRITE_MODE_DEFAULT; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.EXEC_MEM_LIMIT_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.FE_NODES_KEY; -import static com.dtstack.chunjun.connector.doris.options.DorisKeys.FIELD_DELIMITER; -import static com.dtstack.chunjun.connector.doris.options.DorisKeys.FIELD_DELIMITER_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.FLUSH_INTERNAL_MS_KEY; -import static com.dtstack.chunjun.connector.doris.options.DorisKeys.LINE_DELIMITER; -import static com.dtstack.chunjun.connector.doris.options.DorisKeys.LINE_DELIMITER_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.LOAD_OPTIONS_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.LOAD_PROPERTIES_KEY; import static com.dtstack.chunjun.connector.doris.options.DorisKeys.PASSWORD_KEY; @@ -136,10 +132,6 @@ public DorisSinkFactory(SyncConf syncConf) { .setDatabase(parameter.getStringVal(DATABASE_KEY)) .setTable(parameter.getStringVal(TABLE_KEY)) .setFeNodes((List) parameter.getVal(FE_NODES_KEY)) - .setFieldDelimiter( - parameter.getStringVal(FIELD_DELIMITER_KEY, FIELD_DELIMITER)) - .setLineDelimiter( - parameter.getStringVal(LINE_DELIMITER_KEY, LINE_DELIMITER)) .setLoadOptions(loadConf) .setLoadProperties( parameter.getProperties(LOAD_PROPERTIES_KEY, new Properties())) diff --git a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/table/DorisDynamicTableFactory.java b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/table/DorisDynamicTableFactory.java index de9720252b..90486051e9 100644 --- a/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/table/DorisDynamicTableFactory.java +++ b/chunjun-connectors/chunjun-connector-doris/src/main/java/com/dtstack/chunjun/connector/doris/table/DorisDynamicTableFactory.java @@ -66,8 +66,6 @@ public class DorisDynamicTableFactory implements DynamicTableSinkFactory { DorisOptions.EXEC_MEM_LIMIT, DorisOptions.DESERIALIZE_QUEUE_SIZE, DorisOptions.DESERIALIZE_ARROW_ASYNC, - DorisOptions.FIELD_DELIMITER, - DorisOptions.LINE_DELIMITER, DorisOptions.MAX_RETRIES, DorisOptions.WRITE_MODE, DorisOptions.BATCH_SIZE) @@ -112,8 +110,6 @@ private static DorisConf getConfByOptions(ReadableConfig config) { LoadConf loadConf = getLoadConf(config); dorisConf.setLoadConf(loadConf); - dorisConf.setFieldDelimiter(config.get(DorisOptions.FIELD_DELIMITER)); - dorisConf.setLineDelimiter(config.get(DorisOptions.LINE_DELIMITER)); dorisConf.setLoadProperties(new Properties()); dorisConf.setMaxRetries(config.get(DorisOptions.MAX_RETRIES)); dorisConf.setWriteMode(config.get(DorisOptions.WRITE_MODE));