diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/pom.xml b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/pom.xml
new file mode 100644
index 00000000000..8bf150aa724
--- /dev/null
+++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/pom.xml
@@ -0,0 +1,90 @@
+
+
+
+ 4.0.0
+
+ org.apache.inlong
+ format-rowdata
+ 1.12.0-SNAPSHOT
+
+
+ sort-format-inlongmsg-rowdata-csv
+ jar
+ Apache InLong - Sort Format-InLongMsg-RowData-CSV
+
+
+ ${project.parent.parent.parent.parent.basedir}
+
+
+
+
+ org.apache.inlong
+ sort-format-inlongmsg-rowdata-base
+ ${project.version}
+
+
+ org.apache.flink
+ flink-table-common
+ ${flink.version}
+ provided
+
+
+
+
+
+ org.apache.flink
+ flink-table-common
+ ${flink.version}
+ test-jar
+ test
+
+
+
+
+ org.apache.flink
+ flink-table-planner_${scala.binary.version}
+ ${flink.version}
+ test
+
+
+
+
+ org.apache.flink
+ flink-test-utils
+ ${flink.version}
+ test
+
+
+ org.apache.logging.log4j
+ log4j-slf4j-impl
+
+
+
+
+
+ org.apache.flink
+ flink-table-planner_${scala.binary.version}
+ ${flink.version}
+ test-jar
+ test
+
+
+
diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvDecodingFormat.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvDecodingFormat.java
new file mode 100644
index 00000000000..7898367647a
--- /dev/null
+++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvDecodingFormat.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgcsv;
+
+import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgDecodingFormat;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import static org.apache.inlong.sort.formats.base.TableFormatOptions.ROW_FORMAT_INFO;
+import static org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeRowFormatInfo;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.ATTRIBUTE_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CHARSET;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.DELETE_HEAD_DELIMITER;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.ESCAPE_CHARACTER;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.FIELD_DELIMITER;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.IGNORE_ERRORS;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.LINE_DELIMITER;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.NULL_LITERAL;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.QUOTE_CHARACTER;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.TIME_FIELD_NAME;
+
+/**
+ * InLongMsgCsvDecodingFormat.
+ */
+public class InLongMsgCsvDecodingFormat extends AbstractInLongMsgDecodingFormat {
+
+ private final ReadableConfig formatOptions;
+
+ public InLongMsgCsvDecodingFormat(ReadableConfig formatOptions) {
+ this.formatOptions = formatOptions;
+ }
+
+ @Override
+ public DeserializationSchema createRuntimeDecoder(
+ DynamicTableSource.Context context, DataType dataType) {
+ InLongMsgCsvRowDataDeserializationSchema.Builder builder =
+ new InLongMsgCsvRowDataDeserializationSchema.Builder(
+ deserializeRowFormatInfo(formatOptions.get(ROW_FORMAT_INFO)));
+ configureDeserializationSchema(formatOptions, builder);
+ return builder.build();
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.insertOnly();
+ }
+
+ private void configureDeserializationSchema(
+ ReadableConfig formatOptions,
+ InLongMsgCsvRowDataDeserializationSchema.Builder schemaBuilder) {
+ schemaBuilder.setCharset(formatOptions.getOptional(CHARSET).orElse(CHARSET.defaultValue()))
+ .setMetadataKeys(metadataKeys);
+
+ formatOptions
+ .getOptional(TIME_FIELD_NAME)
+ .ifPresent(schemaBuilder::setTimeFieldName);
+
+ formatOptions
+ .getOptional(ATTRIBUTE_FIELD_NAME)
+ .ifPresent(schemaBuilder::setAttributesFieldName);
+
+ formatOptions
+ .getOptional(CHARSET)
+ .ifPresent(schemaBuilder::setCharset);
+
+ formatOptions
+ .getOptional(LINE_DELIMITER)
+ .map(delimiter -> StringEscapeUtils.unescapeJava(delimiter).charAt(0))
+ .ifPresent(schemaBuilder::setLineDelimiter);
+
+ formatOptions
+ .getOptional(FIELD_DELIMITER)
+ .map(delimiter -> StringEscapeUtils.unescapeJava(delimiter).charAt(0))
+ .ifPresent(schemaBuilder::setFieldDelimiter);
+
+ formatOptions
+ .getOptional(QUOTE_CHARACTER)
+ .map(quote -> quote.charAt(0))
+ .ifPresent(schemaBuilder::setQuoteCharacter);
+
+ formatOptions
+ .getOptional(ESCAPE_CHARACTER)
+ .map(escape -> escape.charAt(0))
+ .ifPresent(schemaBuilder::setEscapeCharacter);
+
+ formatOptions
+ .getOptional(NULL_LITERAL)
+ .ifPresent(schemaBuilder::setNullLiteral);
+
+ formatOptions
+ .getOptional(IGNORE_ERRORS)
+ .ifPresent(schemaBuilder::setIgnoreErrors);
+
+ formatOptions
+ .getOptional(DELETE_HEAD_DELIMITER)
+ .ifPresent(schemaBuilder::setDeleteHeadDelimiter);
+ }
+}
diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
new file mode 100644
index 00000000000..60417f9ce6c
--- /dev/null
+++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgcsv;
+
+import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
+import org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.base.TableFormatUtils;
+import org.apache.inlong.sort.formats.base.TextFormatBuilder;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
+import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_LINE_DELIMITER;
+import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_RETAIN_PREDEFINED_FIELD;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvUtils.DEFAULT_DELETE_HEAD_DELIMITER;
+
+/**
+ * The deserializer for the records in InLongMsgCsv format.
+ */
+public final class InLongMsgCsvFormatDeserializer extends AbstractInLongMsgFormatDeserializer {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Format information describing the result type.
+ */
+ @Nonnull
+ private final RowFormatInfo rowFormatInfo;
+
+ /**
+ * The name of the time field.
+ */
+ @Nullable
+ private final String timeFieldName;
+
+ /**
+ * The name of the attributes field.
+ */
+ @Nullable
+ private final String attributesFieldName;
+
+ /**
+ * The charset of the text.
+ */
+ @Nonnull
+ private final String charset;
+
+ /**
+ * The delimiter between fields.
+ */
+ @Nonnull
+ private final Character delimiter;
+
+ /**
+ * The delimiter between lines.
+ */
+ @Nullable
+ private final Character lineDelimiter;
+
+ /**
+ * Escape character. Null if escaping is disabled.
+ */
+ @Nullable
+ private final Character escapeChar;
+
+ /**
+ * Quote character. Null if quoting is disabled.
+ */
+ @Nullable
+ private final Character quoteChar;
+
+ /**
+ * The literal represented null values, default "".
+ */
+ @Nullable
+ private final String nullLiteral;
+
+ /**
+ * True if the head delimiter should be removed.
+ */
+ private final boolean deleteHeadDelimiter;
+
+ private final List metadataKeys;
+
+ private final FieldToRowDataConverter[] converters;
+
+ /**
+ * True if the predefinedField existed, default true.
+ */
+ private boolean retainPredefinedField = true;
+
+ public InLongMsgCsvFormatDeserializer(
+ @Nonnull RowFormatInfo rowFormatInfo,
+ @Nullable String timeFieldName,
+ @Nullable String attributesFieldName,
+ @Nonnull String charset,
+ @Nonnull Character delimiter,
+ @Nullable Character lineDelimiter,
+ @Nullable Character escapeChar,
+ @Nullable Character quoteChar,
+ @Nullable String nullLiteral,
+ boolean deleteHeadDelimiter,
+ List metadataKeys,
+ boolean ignoreErrors,
+ boolean retainPredefinedField) {
+ this(
+ rowFormatInfo,
+ timeFieldName,
+ attributesFieldName,
+ charset,
+ delimiter,
+ lineDelimiter,
+ escapeChar,
+ quoteChar,
+ nullLiteral,
+ deleteHeadDelimiter,
+ metadataKeys,
+ InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
+ this.retainPredefinedField = retainPredefinedField;
+ }
+
+ public InLongMsgCsvFormatDeserializer(
+ @Nonnull RowFormatInfo rowFormatInfo,
+ @Nullable String timeFieldName,
+ @Nullable String attributesFieldName,
+ @Nonnull String charset,
+ @Nonnull Character delimiter,
+ @Nullable Character lineDelimiter,
+ @Nullable Character escapeChar,
+ @Nullable Character quoteChar,
+ @Nullable String nullLiteral,
+ boolean deleteHeadDelimiter,
+ List metadataKeys,
+ boolean ignoreErrors) {
+ this(
+ rowFormatInfo,
+ timeFieldName,
+ attributesFieldName,
+ charset,
+ delimiter,
+ lineDelimiter,
+ escapeChar,
+ quoteChar,
+ nullLiteral,
+ deleteHeadDelimiter,
+ metadataKeys,
+ InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
+ }
+
+ public InLongMsgCsvFormatDeserializer(
+ @Nonnull RowFormatInfo rowFormatInfo,
+ @Nullable String timeFieldName,
+ @Nullable String attributesFieldName,
+ @Nonnull String charset,
+ @Nonnull Character delimiter,
+ @Nullable Character lineDelimiter,
+ @Nullable Character escapeChar,
+ @Nullable Character quoteChar,
+ @Nullable String nullLiteral,
+ boolean deleteHeadDelimiter,
+ List metadataKeys,
+ @Nonnull FailureHandler failureHandler) {
+ super(failureHandler);
+
+ this.rowFormatInfo = rowFormatInfo;
+ this.timeFieldName = timeFieldName;
+ this.attributesFieldName = attributesFieldName;
+ this.delimiter = delimiter;
+ this.lineDelimiter = lineDelimiter;
+ this.charset = charset;
+ this.escapeChar = escapeChar;
+ this.quoteChar = quoteChar;
+ this.nullLiteral = nullLiteral;
+ this.deleteHeadDelimiter = deleteHeadDelimiter;
+ this.metadataKeys = metadataKeys;
+
+ converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
+ .map(formatInfo -> FieldToRowDataConverters.createConverter(
+ TableFormatUtils.deriveLogicalType(formatInfo)))
+ .toArray(FieldToRowDataConverter[]::new);
+ }
+
+ @Override
+ public TypeInformation getProducedType() {
+ return InLongMsgUtils.decorateRowTypeWithNeededHeadFieldsAndMetadata(
+ timeFieldName,
+ attributesFieldName,
+ rowFormatInfo,
+ metadataKeys);
+ }
+
+ @Override
+ protected InLongMsgHead parseHead(String attr) {
+ return InLongMsgCsvUtils.parseHead(attr);
+ }
+
+ @Override
+ protected List parseBodyList(byte[] bytes) {
+ return InLongMsgCsvUtils.parseBodyList(
+ bytes,
+ charset,
+ delimiter,
+ lineDelimiter,
+ escapeChar,
+ quoteChar,
+ deleteHeadDelimiter);
+ }
+
+ @Override
+ protected List convertRowDataList(InLongMsgHead head, InLongMsgBody body) {
+ GenericRowData genericRowData = InLongMsgCsvUtils.deserializeRowData(
+ rowFormatInfo,
+ nullLiteral,
+ retainPredefinedField ? head.getPredefinedFields() : Collections.emptyList(),
+ body.getFields(),
+ converters);
+
+ // Decorate result with time and attributes fields if needed
+ genericRowData = InLongMsgUtils.decorateRowDataWithNeededHeadFields(
+ timeFieldName,
+ attributesFieldName,
+ head.getTime(),
+ head.getAttributes(),
+ genericRowData);
+
+ // Decorate result with metadata if needed
+ return Collections.singletonList(InLongMsgUtils.decorateRowWithMetaData(genericRowData, head, metadataKeys));
+ }
+
+ /**
+ * The builder for {@link InLongMsgCsvFormatDeserializer}.
+ */
+ public static class Builder extends TextFormatBuilder {
+
+ private String timeFieldName = DEFAULT_TIME_FIELD_NAME;
+ private String attributesFieldName = DEFAULT_ATTRIBUTES_FIELD_NAME;
+ private Character delimiter = DEFAULT_DELIMITER;
+ private Character lineDelimiter = DEFAULT_LINE_DELIMITER;
+ private Boolean deleteHeadDelimiter = DEFAULT_DELETE_HEAD_DELIMITER;
+ private List metadataKeys = Collections.emptyList();
+ private Boolean retainPredefinedField = DEFAULT_RETAIN_PREDEFINED_FIELD;
+
+ public Builder(RowFormatInfo rowFormatInfo) {
+ super(rowFormatInfo);
+ }
+
+ public Builder setTimeFieldName(String timeFieldName) {
+ this.timeFieldName = timeFieldName;
+ return this;
+ }
+
+ public Builder setAttributesFieldName(String attributesFieldName) {
+ this.attributesFieldName = attributesFieldName;
+ return this;
+ }
+
+ public Builder setDelimiter(Character delimiter) {
+ this.delimiter = delimiter;
+ return this;
+ }
+
+ public Builder setLineDelimiter(Character lineDelimiter) {
+ this.lineDelimiter = lineDelimiter;
+ return this;
+ }
+
+ public Builder setDeleteHeadDelimiter(Boolean deleteHeadDelimiter) {
+ this.deleteHeadDelimiter = deleteHeadDelimiter;
+ return this;
+ }
+
+ public Builder setMetadataKeys(List metadataKeys) {
+ this.metadataKeys = metadataKeys;
+ return this;
+ }
+
+ public Builder setRetainPredefinedField(Boolean retainPredefinedField) {
+ this.retainPredefinedField = retainPredefinedField;
+ return this;
+ }
+
+ public InLongMsgCsvFormatDeserializer build() {
+ return new InLongMsgCsvFormatDeserializer(
+ rowFormatInfo,
+ timeFieldName,
+ attributesFieldName,
+ charset,
+ delimiter,
+ lineDelimiter,
+ escapeChar,
+ quoteChar,
+ nullLiteral,
+ deleteHeadDelimiter,
+ metadataKeys,
+ ignoreErrors,
+ retainPredefinedField);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ if (!super.equals(o)) {
+ return false;
+ }
+
+ InLongMsgCsvFormatDeserializer that = (InLongMsgCsvFormatDeserializer) o;
+ return deleteHeadDelimiter == that.deleteHeadDelimiter &&
+ rowFormatInfo.equals(that.rowFormatInfo) &&
+ Objects.equals(timeFieldName, that.timeFieldName) &&
+ Objects.equals(attributesFieldName, that.attributesFieldName) &&
+ charset.equals(that.charset) &&
+ delimiter.equals(that.delimiter) &&
+ Objects.equals(lineDelimiter, that.lineDelimiter) &&
+ Objects.equals(escapeChar, that.escapeChar) &&
+ Objects.equals(quoteChar, that.quoteChar) &&
+ Objects.equals(nullLiteral, that.nullLiteral) &&
+ Objects.equals(metadataKeys, that.metadataKeys);
+
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), rowFormatInfo, timeFieldName,
+ attributesFieldName, charset, delimiter, lineDelimiter, escapeChar, quoteChar,
+ nullLiteral, deleteHeadDelimiter, metadataKeys);
+ }
+
+ @Override
+ public String toString() {
+ return "InLongMsgCsvFormatDeserializer{" +
+ "rowFormatInfo=" + rowFormatInfo +
+ ", timeFieldName='" + timeFieldName + '\'' +
+ ", attributesFieldName='" + attributesFieldName + '\'' +
+ ", charset='" + charset + '\'' +
+ ", delimiter=" + delimiter +
+ ", lineDelimiter=" + lineDelimiter +
+ ", escapeChar=" + escapeChar +
+ ", quoteChar=" + quoteChar +
+ ", nullLiteral='" + nullLiteral + '\'' +
+ ", deleteHeadDelimiter=" + deleteHeadDelimiter +
+ ", metadataKeys=" + metadataKeys +
+ '}';
+ }
+}
diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactory.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactory.java
new file mode 100644
index 00000000000..3800f5440f5
--- /dev/null
+++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactory.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgcsv;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.inlong.sort.formats.base.TableFormatOptions.ROW_FORMAT_INFO;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.ATTRIBUTE_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.CHARSET;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.DELETE_HEAD_DELIMITER;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.ESCAPE_CHARACTER;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.FIELD_DELIMITER;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.IGNORE_ERRORS;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.LINE_DELIMITER;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.NULL_LITERAL;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.QUOTE_CHARACTER;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.RETAIN_PREDEFINED_FIELD;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.TIME_FIELD_NAME;
+
+/**
+ * Table format factory for providing configured instances of InLongMsgCsv-to-rowdata
+ * serializer and deserializer.
+ */
+public final class InLongMsgCsvFormatFactory implements DeserializationFormatFactory {
+
+ public static final String IDENTIFIER = "inlong-msg-csv";
+
+ @Override
+ public DecodingFormat> createDecodingFormat(
+ DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+
+ FactoryUtil.validateFactoryOptions(this, formatOptions);
+ validateFormatOptions(formatOptions);
+
+ return new InLongMsgCsvDecodingFormat(formatOptions);
+ }
+
+ // ------------------------------------------------------------------------
+ // Validation
+ // ------------------------------------------------------------------------
+
+ static void validateFormatOptions(ReadableConfig tableOptions) {
+ // Validate the option value must be a single char.
+ validateCharacterVal(tableOptions, FIELD_DELIMITER, true);
+ validateCharacterVal(tableOptions, LINE_DELIMITER, true);
+ validateCharacterVal(tableOptions, QUOTE_CHARACTER);
+ validateCharacterVal(tableOptions, ESCAPE_CHARACTER);
+ }
+
+ /**
+ * Validates the option {@code option} value must be a Character.
+ */
+ private static void validateCharacterVal(
+ ReadableConfig tableOptions, ConfigOption option) {
+ validateCharacterVal(tableOptions, option, false);
+ }
+
+ /**
+ * Validates the option {@code option} value must be a Character.
+ *
+ * @param tableOptions the table options
+ * @param option the config option
+ * @param unescape whether to unescape the option value
+ */
+ private static void validateCharacterVal(
+ ReadableConfig tableOptions, ConfigOption option, boolean unescape) {
+ if (tableOptions.getOptional(option).isPresent()) {
+ final String value =
+ unescape
+ ? StringEscapeUtils.unescapeJava(tableOptions.get(option))
+ : tableOptions.get(option);
+ if (value.length() != 1) {
+ throw new ValidationException(
+ String.format(
+ "Option '%s.%s' must be a string with single character, but was: %s",
+ IDENTIFIER, option.key(), tableOptions.get(option)));
+ }
+ }
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set> requiredOptions() {
+ return Stream.of(ROW_FORMAT_INFO).collect(Collectors.toSet());
+ }
+
+ @Override
+ public Set> optionalOptions() {
+ Set> options = new HashSet<>();
+ options.add(TIME_FIELD_NAME);
+ options.add(ATTRIBUTE_FIELD_NAME);
+ options.add(CHARSET);
+ options.add(FIELD_DELIMITER);
+ options.add(LINE_DELIMITER);
+ options.add(QUOTE_CHARACTER);
+ options.add(ESCAPE_CHARACTER);
+ options.add(NULL_LITERAL);
+ options.add(IGNORE_ERRORS);
+ options.add(DELETE_HEAD_DELIMITER);
+ options.add(RETAIN_PREDEFINED_FIELD);
+ return options;
+ }
+}
diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvRowDataDeserializationSchema.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvRowDataDeserializationSchema.java
new file mode 100644
index 00000000000..5c021e8be93
--- /dev/null
+++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvRowDataDeserializationSchema.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgcsv;
+
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgDeserializationSchema;
+import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_CHARSET;
+import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
+import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ESCAPE_CHARACTER;
+import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_LINE_DELIMITER;
+import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_NULL_LITERAL;
+import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_QUOTE_CHARACTER;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+
+/**
+ * Deserialization schema from InLongMsg-CSV to Flink Table & SQL internal data structures.
+ */
+public class InLongMsgCsvRowDataDeserializationSchema extends AbstractInLongMsgDeserializationSchema {
+
+ private static final long serialVersionUID = 1L;
+
+ public InLongMsgCsvRowDataDeserializationSchema(AbstractInLongMsgFormatDeserializer formatDeserializer) {
+ super(formatDeserializer);
+ }
+
+ /**
+ * A builder for creating a {@link InLongMsgCsvRowDataDeserializationSchema}.
+ */
+ @PublicEvolving
+ public static class Builder {
+
+ private final RowFormatInfo rowFormatInfo;
+
+ private String timeFieldName = DEFAULT_TIME_FIELD_NAME;
+ private String attributesFieldName = DEFAULT_ATTRIBUTES_FIELD_NAME;
+ private String charset = DEFAULT_CHARSET;
+ private Character fieldDelimiter = DEFAULT_DELIMITER;
+ private Character escapeChar = DEFAULT_ESCAPE_CHARACTER;
+ private Character quoteChar = DEFAULT_QUOTE_CHARACTER;
+ private String nullLiteral = DEFAULT_NULL_LITERAL;
+ private Character lineDelimiter = DEFAULT_LINE_DELIMITER;
+ private boolean deleteHeadDelimiter;
+ private boolean retainPredefinedField;
+ private boolean ignoreErrors = false;
+ private List metadataKeys = Collections.emptyList();
+
+ protected Builder(RowFormatInfo rowFormatInfo) {
+ this.rowFormatInfo = rowFormatInfo;
+ }
+
+ public Builder setTimeFieldName(String timeFieldName) {
+ this.timeFieldName = timeFieldName;
+ return this;
+ }
+
+ public Builder setAttributesFieldName(String attributesFieldName) {
+ this.attributesFieldName = attributesFieldName;
+ return this;
+ }
+
+ public Builder setFieldDelimiter(char fieldDelimiter) {
+ this.fieldDelimiter = fieldDelimiter;
+ return this;
+ }
+
+ public Builder setCharset(String charset) {
+ this.charset = charset;
+ return this;
+ }
+
+ public Builder setEscapeCharacter(char escapeChar) {
+ this.escapeChar = escapeChar;
+ return this;
+ }
+
+ public Builder setQuoteCharacter(char quoteChar) {
+ this.quoteChar = quoteChar;
+ return this;
+ }
+
+ public Builder setNullLiteral(String nullLiteral) {
+ this.nullLiteral = nullLiteral;
+ return this;
+ }
+
+ public Builder setDeleteHeadDelimiter(boolean deleteHeadDelimiter) {
+ this.deleteHeadDelimiter = deleteHeadDelimiter;
+ return this;
+ }
+
+ public Builder setRetainPredefinedField(boolean retainPredefinedField) {
+ this.retainPredefinedField = retainPredefinedField;
+ return this;
+ }
+
+ public Builder setLineDelimiter(char lineDelimiter) {
+ this.lineDelimiter = lineDelimiter;
+ return this;
+ }
+
+ public Builder setIgnoreErrors(boolean ignoreErrors) {
+ this.ignoreErrors = ignoreErrors;
+ return this;
+ }
+
+ public Builder setMetadataKeys(List metadataKeys) {
+ this.metadataKeys = metadataKeys;
+ return this;
+ }
+
+ public InLongMsgCsvRowDataDeserializationSchema build() {
+ AbstractInLongMsgFormatDeserializer formatDeserializer = new InLongMsgCsvFormatDeserializer(
+ rowFormatInfo,
+ timeFieldName,
+ attributesFieldName,
+ charset,
+ fieldDelimiter,
+ lineDelimiter,
+ escapeChar,
+ quoteChar,
+ nullLiteral,
+ deleteHeadDelimiter,
+ metadataKeys,
+ ignoreErrors,
+ retainPredefinedField);
+
+ return new InLongMsgCsvRowDataDeserializationSchema(formatDeserializer);
+ }
+ }
+}
diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
new file mode 100644
index 00000000000..24883a65bd0
--- /dev/null
+++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgcsv;
+
+import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
+import org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_ID;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_TID;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_STREAMID;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseAttr;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseDateTime;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseEpochTime;
+import static org.apache.inlong.sort.formats.util.StringUtils.splitCsv;
+
+/**
+ * Utilities for InLongMsgCSV.
+ */
+public class InLongMsgCsvUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(InLongMsgCsvUtils.class);
+
+ public static final String FORMAT_DELETE_HEAD_DELIMITER = "format.delete-head-delimiter";
+ public static final boolean DEFAULT_DELETE_HEAD_DELIMITER = true;
+
+ public static InLongMsgHead parseHead(String attr) {
+ Map attributes = parseAttr(attr);
+
+ // Extracts interface from the attributes.
+ String streamId;
+
+ if (attributes.containsKey(INLONGMSG_ATTR_INTERFACE_NAME)) {
+ streamId = attributes.get(INLONGMSG_ATTR_INTERFACE_NAME);
+ } else if (attributes.containsKey(INLONGMSG_ATTR_INTERFACE_ID)) {
+ streamId = attributes.get(INLONGMSG_ATTR_INTERFACE_ID);
+ } else if (attributes.containsKey(INLONGMSG_ATTR_INTERFACE_TID)) {
+ streamId = attributes.get(INLONGMSG_ATTR_INTERFACE_TID);
+ } else if (attributes.containsKey(INLONGMSG_ATTR_STREAMID)) {
+ streamId = attributes.get(INLONGMSG_ATTR_STREAMID);
+ } else {
+ throw new IllegalArgumentException(
+ "Could not find " + INLONGMSG_ATTR_INTERFACE_NAME +
+ " or " + INLONGMSG_ATTR_INTERFACE_ID +
+ " or " + INLONGMSG_ATTR_INTERFACE_TID +
+ " or " + INLONGMSG_ATTR_STREAMID + " in attributes!");
+ }
+
+ // Extracts time from the attributes
+ Timestamp time;
+
+ if (attributes.containsKey(INLONGMSG_ATTR_TIME_T)) {
+ String date = attributes.get(INLONGMSG_ATTR_TIME_T).trim();
+ time = parseDateTime(date);
+ } else if (attributes.containsKey(INLONGMSG_ATTR_TIME_DT)) {
+ String epoch = attributes.get(INLONGMSG_ATTR_TIME_DT).trim();
+ time = parseEpochTime(epoch);
+ } else {
+ throw new IllegalArgumentException(
+ "Could not find " + INLONGMSG_ATTR_TIME_T +
+ " or " + INLONGMSG_ATTR_TIME_DT + " in attributes!");
+ }
+
+ // Extracts predefined fields from the attributes
+ List predefinedFields = getPredefinedFields(attributes);
+
+ return new InLongMsgHead(attributes, streamId, time, predefinedFields);
+ }
+
+ public static List parseBodyList(
+ byte[] bytes,
+ String charset,
+ char delimiter,
+ Character lineDelimiter,
+ Character escapeChar,
+ Character quoteChar,
+ boolean deleteHeadDelimiter) {
+ String bodyStr = new String(bytes, Charset.forName(charset));
+
+ String[][] split =
+ splitCsv(bodyStr, delimiter, escapeChar, quoteChar, lineDelimiter, deleteHeadDelimiter);
+
+ return Arrays.stream(split)
+ .map((line) -> {
+ // Only parsed fields will be used by downstream, so it's safe to leave
+ // the other parameters empty.
+ return new InLongMsgBody(
+ null,
+ null,
+ Arrays.asList(line),
+ Collections.emptyMap());
+ }).collect(Collectors.toList());
+ }
+
+ public static GenericRowData deserializeRowData(
+ RowFormatInfo rowFormatInfo,
+ String nullLiteral,
+ List predefinedFields,
+ List fields,
+ FieldToRowDataConverters.FieldToRowDataConverter[] converters) {
+ String[] fieldNames = rowFormatInfo.getFieldNames();
+ FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
+
+ int actualNumFields = predefinedFields.size() + fields.size();
+ if (actualNumFields != fieldNames.length) {
+ LOG.warn("The number of fields mismatches: expected={}, actual={}. " +
+ "PredefinedFields=[{}], Fields=[{}]", fieldNames.length, actualNumFields,
+ predefinedFields, fields);
+ }
+
+ GenericRowData rowData = new GenericRowData(fieldNames.length);
+
+ // Deserialize pre-defined fields
+ for (int i = 0; i < predefinedFields.size(); ++i) {
+ if (i >= fieldNames.length) {
+ break;
+ }
+
+ String fieldName = fieldNames[i];
+ FormatInfo fieldFormatInfo = fieldFormatInfos[i];
+ FieldToRowDataConverter converter = converters[i];
+ String fieldText = predefinedFields.get(i);
+
+ Object field = converter.convert(deserializeBasicField(
+ fieldName,
+ fieldFormatInfo,
+ fieldText,
+ nullLiteral));
+ rowData.setField(i, field);
+ }
+
+ // Deserialize fields
+ for (int i = 0; i < fields.size(); ++i) {
+
+ if (i + predefinedFields.size() >= fieldNames.length) {
+ break;
+ }
+
+ String fieldName = fieldNames[i + predefinedFields.size()];
+ FormatInfo fieldFormatInfo = fieldFormatInfos[i + predefinedFields.size()];
+ FieldToRowDataConverter converter = converters[i + predefinedFields.size()];
+ String fieldText = fields.get(i);
+
+ Object field = converter.convert(deserializeBasicField(
+ fieldName,
+ fieldFormatInfo,
+ fieldText,
+ nullLiteral));
+ rowData.setField(i + predefinedFields.size(), field);
+ }
+
+ // If schema length is larger than fields' length, use `null` to fill in the blanks
+ for (int i = predefinedFields.size() + fields.size(); i < fieldNames.length; ++i) {
+ rowData.setField(i, null);
+ }
+
+ return rowData;
+ }
+}
diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 00000000000..52a23d9ac87
--- /dev/null
+++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvFormatFactory
diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
new file mode 100644
index 00000000000..47e3fe0595d
--- /dev/null
+++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializerTest.java
@@ -0,0 +1,861 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgcsv;
+
+import org.apache.inlong.common.msg.InLongMsg;
+import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
+import org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.MapFormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
+import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
+
+import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import java.nio.charset.Charset;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.api.DataTypes.MAP;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_CHARSET;
+import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_DELIMITER;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgMetadata.ReadableMetadata.STREAMID;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME;
+import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for {@link InLongMsgCsvFormatDeserializer}.
+ */
+public class InLongMsgCsvFormatDeserializerTest {
+
+ private final FieldToRowDataConverter mapConvert =
+ FieldToRowDataConverters.createConverter(MAP(STRING(), STRING()).getLogicalType());
+
+ private static final RowFormatInfo TEST_ROW_INFO =
+ new RowFormatInfo(
+ new String[]{"f1", "f2", "f3", "f4", "f5", "f6"},
+ new FormatInfo[]{
+ IntFormatInfo.INSTANCE,
+ IntFormatInfo.INSTANCE,
+ IntFormatInfo.INSTANCE,
+ StringFormatInfo.INSTANCE,
+ StringFormatInfo.INSTANCE,
+ StringFormatInfo.INSTANCE
+ });
+
+ private static final RowFormatInfo TEST_NO_PREDEFINE_ROW_INFO =
+ new RowFormatInfo(
+ new String[]{"inlongmsg_time", "inlongmsg_attributes", "f1", "f2", "f3", "f4", "f5", "f6"},
+ new FormatInfo[]{
+ new TimestampFormatInfo(),
+ new MapFormatInfo(StringFormatInfo.INSTANCE, StringFormatInfo.INSTANCE),
+ IntFormatInfo.INSTANCE,
+ IntFormatInfo.INSTANCE,
+ IntFormatInfo.INSTANCE,
+ StringFormatInfo.INSTANCE,
+ StringFormatInfo.INSTANCE,
+ StringFormatInfo.INSTANCE
+ });
+
+ @Test
+ public void testRowType() {
+ InLongMsgCsvFormatDeserializer deserializer =
+ new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO)
+ .setTimeFieldName("inlongmsg_time")
+ .setAttributesFieldName("inlongmsg_attributes")
+ .setMetadataKeys(Collections.singletonList(STREAMID.getKey()))
+ .build();
+
+ String[] expectedFieldNames = new String[]{
+ "inlongmsg_time",
+ "inlongmsg_attributes",
+ "f1",
+ "f2",
+ "f3",
+ "f4",
+ "f5",
+ "f6",
+ "metadata-streamId"
+ };
+
+ LogicalType[] expectedFieldTypes = new LogicalType[]{
+ new TimestampType(),
+ new MapType(new VarCharType(), new VarCharType()),
+ new IntType(),
+ new IntType(),
+ new IntType(),
+ new VarCharType(),
+ new VarCharType(),
+ new VarCharType(),
+ new VarCharType()
+ };
+ RowType expectedRowType = RowType.of(expectedFieldTypes, expectedFieldNames);
+ assertEquals(InternalTypeInfo.of(expectedRowType), deserializer.getProducedType());
+ }
+
+ @Test
+ public void testRowTypeWithoutHeadFields() {
+ InLongMsgCsvFormatDeserializer deserializer =
+ new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO)
+ .build();
+
+ String[] fieldNames = new String[]{
+ "f1",
+ "f2",
+ "f3",
+ "f4",
+ "f5",
+ "f6"
+ };
+
+ LogicalType[] fieldTypes = new LogicalType[]{
+ new IntType(),
+ new IntType(),
+ new IntType(),
+ new VarCharType(),
+ new VarCharType(),
+ new VarCharType()
+ };
+ RowType rowType = RowType.of(fieldTypes, fieldNames);
+ assertEquals(InternalTypeInfo.of(rowType), deserializer.getProducedType());
+ }
+
+ @Test
+ public void testNormal() throws Exception {
+
+ InLongMsgCsvFormatDeserializer deserializer =
+ new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO)
+ .setTimeFieldName("inlongmsg_time")
+ .setAttributesFieldName("inlongmsg_attributes")
+ .build();
+
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String attrs = "m=0&streamId=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+ String body1 = "123,field11,field12,field13";
+ String body2 = "123,field21,field22,field23";
+ inLongMsg.addMsg(attrs, body1.getBytes());
+ inLongMsg.addMsg(attrs, body2.getBytes());
+
+ Map expectedAttributes = new HashMap<>();
+ expectedAttributes.put("m", "0");
+ expectedAttributes.put("streamId", "testInterfaceId");
+ expectedAttributes.put("t", "20200322");
+ expectedAttributes.put("__addcol1__", "1");
+ expectedAttributes.put("__addcol2__", "2");
+
+ GenericRowData expectRowData1 = new GenericRowData(8);
+ expectRowData1.setField(0, TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData1.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData1.setField(2, 1);
+ expectRowData1.setField(3, 2);
+ expectRowData1.setField(4, 123);
+ expectRowData1.setField(5, StringData.fromString("field11"));
+ expectRowData1.setField(6, StringData.fromString("field12"));
+ expectRowData1.setField(7, StringData.fromString("field13"));
+
+ GenericRowData expectRowData2 = new GenericRowData(8);
+ expectRowData2.setField(0, TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData2.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData2.setField(2, 1);
+ expectRowData2.setField(3, 2);
+ expectRowData2.setField(4, 123);
+ expectRowData2.setField(5, StringData.fromString("field21"));
+ expectRowData2.setField(6, StringData.fromString("field22"));
+ expectRowData2.setField(7, StringData.fromString("field23"));
+
+ testRowDeserialization(
+ deserializer,
+ inLongMsg.buildArray(),
+ Arrays.asList(expectRowData1, expectRowData2));
+ }
+
+ @Test
+ public void testInlongMsg() throws Exception {
+
+ InLongMsgCsvFormatDeserializer deserializer =
+ new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO)
+ .setTimeFieldName("inlongmsg_time")
+ .setAttributesFieldName("inlongmsg_attributes")
+ .build();
+
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String attrs = "m=0&streamId=testStreamId&t=20200322&__addcol1__=1&__addcol2__=2";
+ String body1 = "123,field11,field12,field13";
+ String body2 = "123,field21,field22,field23";
+ inLongMsg.addMsg(attrs, body1.getBytes());
+ inLongMsg.addMsg(attrs, body2.getBytes());
+
+ Map expectedAttributes = new HashMap<>();
+ expectedAttributes.put("m", "0");
+ expectedAttributes.put("streamId", "testStreamId");
+ expectedAttributes.put("t", "20200322");
+ expectedAttributes.put("__addcol1__", "1");
+ expectedAttributes.put("__addcol2__", "2");
+
+ GenericRowData expectedRow1 = new GenericRowData(8);
+ expectedRow1.setField(0, TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectedRow1.setField(1, mapConvert.convert(expectedAttributes));
+ expectedRow1.setField(2, 1);
+ expectedRow1.setField(3, 2);
+ expectedRow1.setField(4, 123);
+ expectedRow1.setField(5, StringData.fromString("field11"));
+ expectedRow1.setField(6, StringData.fromString("field12"));
+ expectedRow1.setField(7, StringData.fromString("field13"));
+
+ GenericRowData expectedRow2 = new GenericRowData(8);
+ expectedRow2.setField(0, TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectedRow2.setField(1, mapConvert.convert(expectedAttributes));
+ expectedRow2.setField(2, 1);
+ expectedRow2.setField(3, 2);
+ expectedRow2.setField(4, 123);
+ expectedRow2.setField(5, StringData.fromString("field21"));
+ expectedRow2.setField(6, StringData.fromString("field22"));
+ expectedRow2.setField(7, StringData.fromString("field23"));
+
+ testRowDeserialization(
+ deserializer,
+ inLongMsg.buildArray(),
+ Arrays.asList(expectedRow1, expectedRow2));
+ }
+
+ @Test
+ public void testExceptionHandler() throws Exception {
+ TestFailureHandler errorHandler = new TestFailureHandler();
+ InLongMsgCsvFormatDeserializer deserializer =
+ new InLongMsgCsvFormatDeserializer(
+ TEST_ROW_INFO,
+ "inlongmsg_time",
+ "inlongmsg_attributes",
+ DEFAULT_CHARSET,
+ DEFAULT_DELIMITER,
+ null,
+ null,
+ null,
+ null,
+ InLongMsgCsvUtils.DEFAULT_DELETE_HEAD_DELIMITER,
+ Collections.emptyList(),
+ errorHandler);
+
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String attrs = "m=0&streamId=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+ String body1 = "test,field11,field12,field13";
+ String body2 = "123,field21,field22,field23";
+ inLongMsg.addMsg(attrs, body1.getBytes());
+ inLongMsg.addMsg(attrs, body2.getBytes());
+
+ List actualRows = new ArrayList<>();
+ Collector collector = new ListCollector<>(actualRows);
+ deserializer.flatMap(inLongMsg.buildArray(), collector);
+ assertEquals(1, errorHandler.getRowCount());
+
+ InLongMsg inLongMsg1Head = InLongMsg.newInLongMsg();
+ String abNormalAttrs = "m=0&streamId=testInterfaceId&__addcol1__=1&__addcol2__=2";
+ inLongMsg1Head.addMsg(abNormalAttrs, body1.getBytes());
+ inLongMsg1Head.addMsg(abNormalAttrs, body2.getBytes());
+ deserializer.flatMap(inLongMsg1Head.buildArray(), collector);
+ assertEquals(1, errorHandler.getHeadCount());
+ }
+
+ @Test
+ public void testEmptyField() throws Exception {
+
+ InLongMsgCsvFormatDeserializer deserializer =
+ new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO)
+ .setTimeFieldName("inlongmsg_time")
+ .setAttributesFieldName("inlongmsg_attributes")
+ .build();
+
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String attrs = "m=0&streamId=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+ String body1 = "123,field11,field12,";
+ String body2 = "123,field21,,field23";
+ inLongMsg.addMsg(attrs, body1.getBytes());
+ inLongMsg.addMsg(attrs, body2.getBytes());
+
+ Map expectedAttributes = new HashMap<>();
+ expectedAttributes.put("m", "0");
+ expectedAttributes.put("streamId", "testInterfaceId");
+ expectedAttributes.put("t", "20200322");
+ expectedAttributes.put("__addcol1__", "1");
+ expectedAttributes.put("__addcol2__", "2");
+
+ GenericRowData expectRowData1 = new GenericRowData(8);
+ expectRowData1.setField(0, TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData1.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData1.setField(2, 1);
+ expectRowData1.setField(3, 2);
+ expectRowData1.setField(4, 123);
+ expectRowData1.setField(5, StringData.fromString("field11"));
+ expectRowData1.setField(6, StringData.fromString("field12"));
+ expectRowData1.setField(7, StringData.fromString(""));
+
+ GenericRowData expectRowData2 = new GenericRowData(8);
+ expectRowData2.setField(0, TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData2.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData2.setField(2, 1);
+ expectRowData2.setField(3, 2);
+ expectRowData2.setField(4, 123);
+ expectRowData2.setField(5, StringData.fromString("field21"));
+ expectRowData2.setField(6, StringData.fromString(""));
+ expectRowData2.setField(7, StringData.fromString("field23"));
+
+ testRowDeserialization(
+ deserializer,
+ inLongMsg.buildArray(),
+ Arrays.asList(expectRowData1, expectRowData2));
+ }
+
+ @Test
+ public void testNoPredefinedFields() throws Exception {
+
+ InLongMsgCsvFormatDeserializer deserializer =
+ new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO)
+ .setTimeFieldName("inlongmsg_time")
+ .setAttributesFieldName("inlongmsg_attributes")
+ .build();
+
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String attrs = "m=0&streamId=testInterfaceId&t=20200322";
+ String body1 = "1,2,123,field11,field12,";
+ String body2 = "1,2,123,field21,,field23";
+ inLongMsg.addMsg(attrs, body1.getBytes());
+ inLongMsg.addMsg(attrs, body2.getBytes());
+
+ Map expectedAttributes = new HashMap<>();
+ expectedAttributes.put("m", "0");
+ expectedAttributes.put("streamId", "testInterfaceId");
+ expectedAttributes.put("t", "20200322");
+
+ GenericRowData expectRowData1 = new GenericRowData(8);
+ expectRowData1.setField(0, TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData1.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData1.setField(2, 1);
+ expectRowData1.setField(3, 2);
+ expectRowData1.setField(4, 123);
+ expectRowData1.setField(5, StringData.fromString("field11"));
+ expectRowData1.setField(6, StringData.fromString("field12"));
+ expectRowData1.setField(7, StringData.fromString(""));
+
+ GenericRowData expectRowData2 = new GenericRowData(8);
+ expectRowData2.setField(0, TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData2.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData2.setField(2, 1);
+ expectRowData2.setField(3, 2);
+ expectRowData2.setField(4, 123);
+ expectRowData2.setField(5, StringData.fromString("field21"));
+ expectRowData2.setField(6, StringData.fromString(""));
+ expectRowData2.setField(7, StringData.fromString("field23"));
+
+ testRowDeserialization(
+ deserializer,
+ inLongMsg.buildArray(),
+ Arrays.asList(expectRowData1, expectRowData2));
+ }
+
+ @Test
+ public void testPredefinedFieldWithFlagOn() throws Exception {
+
+ InLongMsgCsvFormatDeserializer deserializer =
+ new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO)
+ .setTimeFieldName("inlongmsg_time")
+ .setAttributesFieldName("inlongmsg_attributes")
+ .setRetainPredefinedField(true)
+ .build();
+
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String attrs = "m=0&streamId=testInterfaceId&t=20200322" +
+ "&__addcol1__repdate=20220224&__addcol2__hour=1517";
+ String body1 = "123,field11,field12,";
+ String body2 = "123,field21,,field23";
+ inLongMsg.addMsg(attrs, body1.getBytes());
+ inLongMsg.addMsg(attrs, body2.getBytes());
+
+ Map expectedAttributes = new HashMap<>();
+ expectedAttributes.put("m", "0");
+ expectedAttributes.put("streamId", "testInterfaceId");
+ expectedAttributes.put("t", "20200322");
+ expectedAttributes.put("__addcol1__repdate", "20220224");
+ expectedAttributes.put("__addcol2__hour", "1517");
+
+ GenericRowData expectRowData1 = new GenericRowData(8);
+ expectRowData1.setField(0, TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData1.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData1.setField(2, 20220224);
+ expectRowData1.setField(3, 1517);
+ expectRowData1.setField(4, 123);
+ expectRowData1.setField(5, StringData.fromString("field11"));
+ expectRowData1.setField(6, StringData.fromString("field12"));
+ expectRowData1.setField(7, StringData.fromString(""));
+
+ GenericRowData expectRowData2 = new GenericRowData(8);
+ expectRowData2.setField(0, TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData2.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData2.setField(2, 20220224);
+ expectRowData2.setField(3, 1517);
+ expectRowData2.setField(4, 123);
+ expectRowData2.setField(5, StringData.fromString("field21"));
+ expectRowData2.setField(6, StringData.fromString(""));
+ expectRowData2.setField(7, StringData.fromString("field23"));
+
+ testRowDeserialization(
+ deserializer,
+ inLongMsg.buildArray(),
+ Arrays.asList(expectRowData1, expectRowData2));
+ }
+
+ @Test
+ public void testPredefinedFieldWithFlagOff() throws Exception {
+
+ InLongMsgCsvFormatDeserializer deserializer =
+ new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO)
+ .setTimeFieldName("inlongmsg_time")
+ .setAttributesFieldName("inlongmsg_attributes")
+ .setRetainPredefinedField(false)
+ .build();
+
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String attrs = "m=0&streamId=testInterfaceId&t=20200322" +
+ "&__addcol1__repdate=20220224&__addcol2__hour=1517";
+ String body1 = "1,2,123,field11,field12,";
+ String body2 = "1,2,123,field21,,field23";
+ inLongMsg.addMsg(attrs, body1.getBytes());
+ inLongMsg.addMsg(attrs, body2.getBytes());
+
+ Map expectedAttributes = new HashMap<>();
+ expectedAttributes.put("m", "0");
+ expectedAttributes.put("streamId", "testInterfaceId");
+ expectedAttributes.put("t", "20200322");
+ expectedAttributes.put("__addcol1__repdate", "20220224");
+ expectedAttributes.put("__addcol2__hour", "1517");
+
+ GenericRowData expectRowData1 = new GenericRowData(8);
+ expectRowData1.setField(0, TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData1.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData1.setField(2, 1);
+ expectRowData1.setField(3, 2);
+ expectRowData1.setField(4, 123);
+ expectRowData1.setField(5, StringData.fromString("field11"));
+ expectRowData1.setField(6, StringData.fromString("field12"));
+ expectRowData1.setField(7, StringData.fromString(""));
+
+ GenericRowData expectRowData2 = new GenericRowData(8);
+ expectRowData2.setField(0, TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData2.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData2.setField(2, 1);
+ expectRowData2.setField(3, 2);
+ expectRowData2.setField(4, 123);
+ expectRowData2.setField(5, StringData.fromString("field21"));
+ expectRowData2.setField(6, StringData.fromString(""));
+ expectRowData2.setField(7, StringData.fromString("field23"));
+
+ testRowDeserialization(
+ deserializer,
+ inLongMsg.buildArray(),
+ Arrays.asList(expectRowData1, expectRowData2));
+ }
+
+ @Test
+ public void testIgnoreAttributeErrors() throws Exception {
+ InLongMsgCsvFormatDeserializer deserializer =
+ new InLongMsgCsvFormatDeserializer(
+ TEST_ROW_INFO,
+ DEFAULT_TIME_FIELD_NAME,
+ DEFAULT_ATTRIBUTES_FIELD_NAME,
+ Charset.defaultCharset().name(),
+ DEFAULT_DELIMITER,
+ null,
+ null,
+ null,
+ null,
+ true,
+ Collections.emptyList(),
+ true);
+
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String attrs = "m=0&&&&";
+ String body1 = "123,field11,field12,field13";
+ inLongMsg.addMsg(attrs, body1.getBytes());
+
+ testRowDeserialization(
+ deserializer,
+ inLongMsg.buildArray(),
+ Collections.emptyList());
+ }
+
+ @Test
+ public void testIgnoreBodyErrors() throws Exception {
+ InLongMsgCsvFormatDeserializer deserializer =
+ new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO)
+ .setTimeFieldName("inlongmsg_time")
+ .setAttributesFieldName("inlongmsg_attributes")
+ .setIgnoreErrors(true)
+ .build();
+
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String attrs = "m=0&streamId=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+ String body1 = "aaa,field11,field12,field13";
+ String body2 = "123,field21,field22,field23";
+ inLongMsg.addMsg(attrs, body1.getBytes());
+ inLongMsg.addMsg(attrs, body2.getBytes());
+
+ Map expectedAttributes = new HashMap<>();
+ expectedAttributes.put("m", "0");
+ expectedAttributes.put("streamId", "testInterfaceId");
+ expectedAttributes.put("t", "20200322");
+ expectedAttributes.put("__addcol1__", "1");
+ expectedAttributes.put("__addcol2__", "2");
+
+ GenericRowData expectRowData = new GenericRowData(8);
+ expectRowData.setField(0, TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData.setField(2, 1);
+ expectRowData.setField(3, 2);
+ expectRowData.setField(4, 123);
+ expectRowData.setField(5, StringData.fromString("field21"));
+ expectRowData.setField(6, StringData.fromString("field22"));
+ expectRowData.setField(7, StringData.fromString("field23"));
+
+ testRowDeserialization(
+ deserializer,
+ inLongMsg.buildArray(),
+ Collections.singletonList(expectRowData));
+ }
+
+ @Test
+ public void testDeleteHeadDelimiter() throws Exception {
+ InLongMsgCsvFormatDeserializer deserializer =
+ new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO)
+ .setTimeFieldName("inlongmsg_time")
+ .setAttributesFieldName("inlongmsg_attributes")
+ .setDeleteHeadDelimiter(true)
+ .build();
+
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String attrs = "m=0&streamId=testInterfaceId&t=20200322";
+ String body = ",1,2,3,field1,field2,field3";
+
+ inLongMsg.addMsg(attrs, body.getBytes());
+
+ Map expectedAttributes = new HashMap<>();
+ expectedAttributes.put("m", "0");
+ expectedAttributes.put("streamId", "testInterfaceId");
+ expectedAttributes.put("t", "20200322");
+
+ GenericRowData expectRowData = new GenericRowData(8);
+ expectRowData.setField(0, TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData.setField(2, 1);
+ expectRowData.setField(3, 2);
+ expectRowData.setField(4, 3);
+ expectRowData.setField(5, StringData.fromString("field1"));
+ expectRowData.setField(6, StringData.fromString("field2"));
+ expectRowData.setField(7, StringData.fromString("field3"));
+
+ testRowDeserialization(
+ deserializer,
+ inLongMsg.buildArray(),
+ Collections.singletonList(expectRowData));
+ }
+
+ @Test
+ public void testRetainHeadDelimiter() throws Exception {
+ InLongMsgCsvFormatDeserializer deserializer =
+ new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO)
+ .setTimeFieldName("inlongmsg_time")
+ .setAttributesFieldName("inlongmsg_attributes")
+ .setDeleteHeadDelimiter(false)
+ .build();
+
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String attrs = "m=0&streamId=testInterfaceId&t=20200322";
+ String body = ",1,2,field1,field2,field3";
+
+ inLongMsg.addMsg(attrs, body.getBytes());
+
+ Map expectedAttributes = new HashMap<>();
+ expectedAttributes.put("m", "0");
+ expectedAttributes.put("streamId", "testInterfaceId");
+ expectedAttributes.put("t", "20200322");
+
+ GenericRowData expectRowData = new GenericRowData(8);
+ expectRowData.setField(0, TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData.setField(2, null);
+ expectRowData.setField(3, 1);
+ expectRowData.setField(4, 2);
+ expectRowData.setField(5, StringData.fromString("field1"));
+ expectRowData.setField(6, StringData.fromString("field2"));
+ expectRowData.setField(7, StringData.fromString("field3"));
+
+ testRowDeserialization(
+ deserializer,
+ inLongMsg.buildArray(),
+ Collections.singletonList(expectRowData));
+ }
+
+ @Test
+ public void testUnmatchedFields1() throws Exception {
+ InLongMsgCsvFormatDeserializer deserializer =
+ new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO)
+ .setTimeFieldName("inlongmsg_time")
+ .setAttributesFieldName("inlongmsg_attributes")
+ .build();
+
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String attrs = "m=0&streamId=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+ String body1 = "123,field11,field12";
+ String body2 = "123,field21,field22,field23,field24";
+ inLongMsg.addMsg(attrs, body1.getBytes());
+ inLongMsg.addMsg(attrs, body2.getBytes());
+
+ Map expectedAttributes = new HashMap<>();
+ expectedAttributes.put("m", "0");
+ expectedAttributes.put("streamId", "testInterfaceId");
+ expectedAttributes.put("t", "20200322");
+ expectedAttributes.put("__addcol1__", "1");
+ expectedAttributes.put("__addcol2__", "2");
+
+ GenericRowData expectRowData1 = new GenericRowData(8);
+ expectRowData1.setField(0, TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData1.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData1.setField(2, 1);
+ expectRowData1.setField(3, 2);
+ expectRowData1.setField(4, 123);
+ expectRowData1.setField(5, StringData.fromString("field11"));
+ expectRowData1.setField(6, StringData.fromString("field12"));
+ expectRowData1.setField(7, null);
+
+ GenericRowData expectRowData2 = new GenericRowData(8);
+ expectRowData2.setField(0, TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData2.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData2.setField(2, 1);
+ expectRowData2.setField(3, 2);
+ expectRowData2.setField(4, 123);
+ expectRowData2.setField(5, StringData.fromString("field21"));
+ expectRowData2.setField(6, StringData.fromString("field22"));
+ expectRowData2.setField(7, StringData.fromString("field23"));
+
+ testRowDeserialization(
+ deserializer,
+ inLongMsg.buildArray(),
+ Arrays.asList(expectRowData1, expectRowData2));
+ }
+
+ @Test
+ public void testUnmatchedFields2() throws Exception {
+ InLongMsgCsvFormatDeserializer deserializer =
+ new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO)
+ .setTimeFieldName("inlongmsg_time")
+ .setAttributesFieldName("inlongmsg_attributes")
+ .build();
+
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String attrs = "m=0&streamId=testInterfaceId&t=20200322&__addcol1__=1&" +
+ "__addcol2__=2&__addcol3__=3&__addcol4__=4&__addcol5__=5&" +
+ "__addcol6__=6&__addcol7__=7";
+ String body = "field11,field12";
+ inLongMsg.addMsg(attrs, body.getBytes());
+
+ Map expectedAttributes = new HashMap<>();
+ expectedAttributes.put("m", "0");
+ expectedAttributes.put("streamId", "testInterfaceId");
+ expectedAttributes.put("t", "20200322");
+ expectedAttributes.put("__addcol1__", "1");
+ expectedAttributes.put("__addcol2__", "2");
+ expectedAttributes.put("__addcol3__", "3");
+ expectedAttributes.put("__addcol4__", "4");
+ expectedAttributes.put("__addcol5__", "5");
+ expectedAttributes.put("__addcol6__", "6");
+ expectedAttributes.put("__addcol7__", "7");
+
+ GenericRowData expectRowData = new GenericRowData(8);
+ expectRowData.setField(0, TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData.setField(2, 1);
+ expectRowData.setField(3, 2);
+ expectRowData.setField(4, 3);
+ expectRowData.setField(5, StringData.fromString("4"));
+ expectRowData.setField(6, StringData.fromString("5"));
+ expectRowData.setField(7, StringData.fromString("6"));
+
+ testRowDeserialization(
+ deserializer,
+ inLongMsg.buildArray(),
+ Collections.singletonList(expectRowData));
+ }
+
+ @Test
+ public void testLineDelimiter() throws Exception {
+
+ InLongMsgCsvFormatDeserializer deserializer =
+ new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO)
+ .setTimeFieldName("inlongmsg_time")
+ .setAttributesFieldName("inlongmsg_attributes")
+ .setLineDelimiter('\n')
+ .build();
+
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String attrs = "m=0&streamId=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+ String body = "123,field11,field12,field13\n123,field21,field22,field23";
+ inLongMsg.addMsg(attrs, body.getBytes());
+
+ Map expectedAttributes = new HashMap<>();
+ expectedAttributes.put("m", "0");
+ expectedAttributes.put("streamId", "testInterfaceId");
+ expectedAttributes.put("t", "20200322");
+ expectedAttributes.put("__addcol1__", "1");
+ expectedAttributes.put("__addcol2__", "2");
+
+ GenericRowData expectRowData1 = new GenericRowData(8);
+ expectRowData1.setField(0, TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData1.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData1.setField(2, 1);
+ expectRowData1.setField(3, 2);
+ expectRowData1.setField(4, 123);
+ expectRowData1.setField(5, StringData.fromString("field11"));
+ expectRowData1.setField(6, StringData.fromString("field12"));
+ expectRowData1.setField(7, StringData.fromString("field13"));
+
+ GenericRowData expectRowData2 = new GenericRowData(8);
+ expectRowData2.setField(0, TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData2.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData2.setField(2, 1);
+ expectRowData2.setField(3, 2);
+ expectRowData2.setField(4, 123);
+ expectRowData2.setField(5, StringData.fromString("field21"));
+ expectRowData2.setField(6, StringData.fromString("field22"));
+ expectRowData2.setField(7, StringData.fromString("field23"));
+
+ testRowDeserialization(
+ deserializer,
+ inLongMsg.buildArray(),
+ Arrays.asList(expectRowData1, expectRowData2));
+ }
+
+ @Test
+ public void testMetadata() throws Exception {
+
+ InLongMsgCsvFormatDeserializer deserializer =
+ new InLongMsgCsvFormatDeserializer.Builder(TEST_ROW_INFO)
+ .setTimeFieldName("inlongmsg_time")
+ .setAttributesFieldName("inlongmsg_attributes")
+ .setMetadataKeys(Collections.singletonList(STREAMID.getKey()))
+ .build();
+
+ InLongMsg inLongMsg = InLongMsg.newInLongMsg();
+ String attrs = "m=0&streamId=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2";
+ String body = "123,field11,field12,field13";
+ inLongMsg.addMsg(attrs, body.getBytes());
+
+ Map expectedAttributes = new HashMap<>();
+ expectedAttributes.put("m", "0");
+ expectedAttributes.put("streamId", "testInterfaceId");
+ expectedAttributes.put("t", "20200322");
+ expectedAttributes.put("__addcol1__", "1");
+ expectedAttributes.put("__addcol2__", "2");
+
+ GenericRowData expectRowData = new GenericRowData(9);
+ expectRowData.setField(0, TimestampData.fromTimestamp(Timestamp.valueOf("2020-03-22 00:00:00")));
+ expectRowData.setField(1, mapConvert.convert(expectedAttributes));
+ expectRowData.setField(2, 1);
+ expectRowData.setField(3, 2);
+ expectRowData.setField(4, 123);
+ expectRowData.setField(5, StringData.fromString("field11"));
+ expectRowData.setField(6, StringData.fromString("field12"));
+ expectRowData.setField(7, StringData.fromString("field13"));
+ expectRowData.setField(8, StringData.fromString("testInterfaceId"));
+
+ testRowDeserialization(
+ deserializer,
+ inLongMsg.buildArray(),
+ Collections.singletonList(expectRowData));
+ }
+
+ private void testRowDeserialization(
+ InLongMsgCsvFormatDeserializer deserializer,
+ byte[] bytes,
+ List expectedRows) throws Exception {
+
+ List actualRows = new ArrayList<>();
+ Collector collector = new ListCollector<>(actualRows);
+
+ deserializer.flatMap(bytes, collector);
+
+ assertEquals(expectedRows, actualRows);
+ }
+
+ private static class TestFailureHandler implements FailureHandler {
+
+ private int headCount = 0;
+ private int bodyCount = 0;
+ private int rowCount = 0;
+
+ public int getHeadCount() {
+ return headCount;
+ }
+
+ public int getBodyCount() {
+ return bodyCount;
+ }
+
+ public int getRowCount() {
+ return rowCount;
+ }
+
+ @Override
+ public void onParsingHeadFailure(String attribute, Exception exception) throws Exception {
+ headCount++;
+ }
+
+ @Override
+ public void onParsingBodyFailure(byte[] body, Exception exception) throws Exception {
+ bodyCount++;
+ }
+
+ @Override
+ public void onConvertingRowFailure(InLongMsgHead head, InLongMsgBody body,
+ Exception exception) throws Exception {
+ rowCount++;
+ }
+ }
+}
diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactoryTest.java b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactoryTest.java
new file mode 100644
index 00000000000..bc524678a3c
--- /dev/null
+++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatFactoryTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.inlongmsgcsv;
+
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
+import org.apache.inlong.sort.formats.common.FormatInfo;
+import org.apache.inlong.sort.formats.common.FormatUtils;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.MapFormatInfo;
+import org.apache.inlong.sort.formats.common.RowFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.TestDynamicTableFactory;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link InLongMsgCsvFormatFactory}.
+ */
+public class InLongMsgCsvFormatFactoryTest {
+
+ public RowFormatInfo testFormatInfo;
+
+ public TypeInformation testTypeInformation;
+
+ public ResolvedSchema resolvedSchema;
+
+ public DataType dataType;
+
+ @Before
+ public void setup() {
+ resolvedSchema =
+ ResolvedSchema.of(
+ Column.physical("time", DataTypes.TIMESTAMP()),
+ Column.physical("attributes", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
+ Column.physical("student_name", DataTypes.STRING()),
+ Column.physical("score", DataTypes.INT()),
+ Column.physical("date", DataTypes.DATE()));
+ dataType = resolvedSchema.toPhysicalRowDataType();
+ RowType rowType = (RowType) dataType.getLogicalType();
+ testTypeInformation = InternalTypeInfo.of(rowType);
+ testFormatInfo = new RowFormatInfo(
+ new String[]{"time", "attributes", "student_name", "score", "date"},
+ new FormatInfo[]{
+ new TimestampFormatInfo(),
+ new MapFormatInfo(StringFormatInfo.INSTANCE, StringFormatInfo.INSTANCE),
+ StringFormatInfo.INSTANCE,
+ IntFormatInfo.INSTANCE,
+ new DateFormatInfo("yyyy-MM-dd")
+ });
+ }
+
+ @Test
+ public void testDeSchema() {
+ final InLongMsgCsvRowDataDeserializationSchema expectedDeSer =
+ new InLongMsgCsvRowDataDeserializationSchema.Builder(
+ testFormatInfo)
+ .setCharset("UTF-8")
+ .setFieldDelimiter(';')
+ .setQuoteCharacter('\'')
+ .setEscapeCharacter('\\')
+ .setNullLiteral("n/a")
+ .build();
+ final Map options = getAllOptions();
+ DeserializationSchema actualDeser = createDeserializationSchema(options);
+ assertEquals(expectedDeSer, actualDeser);
+ }
+
+ private DeserializationSchema createDeserializationSchema(
+ Map options) {
+ final DynamicTableSource actualSource = createTableSource(resolvedSchema, options);
+ assert actualSource instanceof TestDynamicTableFactory.DynamicTableSourceMock;
+ TestDynamicTableFactory.DynamicTableSourceMock sourceMock =
+ (TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
+
+ return sourceMock.valueFormat.createRuntimeDecoder(
+ ScanRuntimeProviderContext.INSTANCE, dataType);
+ }
+
+ private Map getAllOptions() {
+ final Map options = new HashMap<>();
+ options.put("connector", TestDynamicTableFactory.IDENTIFIER);
+ options.put("target", "MyTarget");
+ options.put("buffer-size", "1000");
+
+ options.put("format", InLongMsgCsvFormatFactory.IDENTIFIER);
+ options.put("inlong-msg-csv.row.format.info", FormatUtils.marshall(testFormatInfo));
+ options.put("inlong-msg-csv.format.field-delimiter", ";");
+ options.put("inlong-msg-csv.format.quote-character", "'");
+ options.put("inlong-msg-csv.format.escape-character", "\\");
+ options.put("inlong-msg-csv.format.null-literal", "n/a");
+ return options;
+ }
+}
diff --git a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/resources/log4j-test.properties b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/resources/log4j-test.properties
new file mode 100644
index 00000000000..e07d62aefe4
--- /dev/null
+++ b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=INFO, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=[%t] %-5p %l %x - %m%n
diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/pom.xml b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/pom.xml
index d3f477164b9..25b69037a01 100644
--- a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/pom.xml
+++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/pom.xml
@@ -96,52 +96,4 @@
test
-
-
- japicmp-report
-
-
- japicmp-report
-
-
-
-
-
- com.github.siom79.japicmp
- japicmp-maven-plugin
-
-
- false
- false
-
-
-
-
-
-
-
- japicmp-check
-
-
- !japicmp-report
-
-
-
-
-
- com.github.siom79.japicmp
- japicmp-maven-plugin
-
-
-
- org.apache.inlong.sort.flink.formats.csv.CsvRowDataDeserializationSchema
- org.apache.inlong.sort.flink.formats.csv.CsvRowDataSerializationSchema
-
-
-
-
-
-
-
-
diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/pom.xml b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/pom.xml
index 4be8115d88d..7f7e854faff 100644
--- a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/pom.xml
+++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/pom.xml
@@ -114,52 +114,4 @@
test
-
-
- japicmp-report
-
-
- japicmp-report
-
-
-
-
-
- com.github.siom79.japicmp
- japicmp-maven-plugin
-
-
- false
- false
-
-
-
-
-
-
-
- japicmp-check
-
-
- !japicmp-report
-
-
-
-
-
- com.github.siom79.japicmp
- japicmp-maven-plugin
-
-
-
- org.apache.inlong.sort.formats.json.JsonRowDataDeserializationSchema
- org.apache.inlong.sort.formats.json.JsonRowDataSerializationSchema
-
-
-
-
-
-
-
-
diff --git a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/pom.xml b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/pom.xml
index 8156a8b18f4..859a8b88168 100644
--- a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/pom.xml
+++ b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/pom.xml
@@ -91,52 +91,4 @@
test
-
-
- japicmp-report
-
-
- japicmp-report
-
-
-
-
-
- com.github.siom79.japicmp
- japicmp-maven-plugin
-
-
- false
- false
-
-
-
-
-
-
-
- japicmp-check
-
-
- !japicmp-report
-
-
-
-
-
- com.github.siom79.japicmp
- japicmp-maven-plugin
-
-
-
- org.apache.inlong.sort.formats.kv.KvRowDataDeserializationSchema
- org.apache.inlong.sort.formats.kv.KvRowDataSerializationSchema
-
-
-
-
-
-
-
-
diff --git a/inlong-sort/sort-formats/format-rowdata/pom.xml b/inlong-sort/sort-formats/format-rowdata/pom.xml
index 5f5811b4bbd..27528c0dc27 100644
--- a/inlong-sort/sort-formats/format-rowdata/pom.xml
+++ b/inlong-sort/sort-formats/format-rowdata/pom.xml
@@ -39,6 +39,7 @@
format-inlongmsg-rowdata-base
format-inlongmsg-rowdata-binlog
format-inlongmsg-rowdata-pb
+ format-inlongmsg-rowdata-csv