diff --git a/inlong-sort/sort-dist/pom.xml b/inlong-sort/sort-dist/pom.xml index e7a387b4d62..2fcba9b316b 100644 --- a/inlong-sort/sort-dist/pom.xml +++ b/inlong-sort/sort-dist/pom.xml @@ -75,6 +75,11 @@ sort-format-inlongmsg-csv ${project.version} + + org.apache.inlong + sort-format-inlongmsg-kv + ${project.version} + org.apache.inlong sort-format-kv diff --git a/inlong-sort/sort-formats/format-inlongmsg-kv/pom.xml b/inlong-sort/sort-formats/format-inlongmsg-kv/pom.xml new file mode 100644 index 00000000000..4c105fc19b7 --- /dev/null +++ b/inlong-sort/sort-formats/format-inlongmsg-kv/pom.xml @@ -0,0 +1,129 @@ + + + + 4.0.0 + + org.apache.inlong + sort-formats + 1.11.0-SNAPSHOT + + + sort-format-inlongmsg-kv + Apache InLong - Sort Format-inlongmsg-kv + + + ${project.parent.parent.parent.basedir} + + + + + + org.apache.inlong + sort-format-common + ${project.version} + provided + + + + org.apache.inlong + sort-format-base + ${project.version} + provided + + + + org.apache.inlong + sort-format-inlongmsg-base + ${project.version} + provided + + + + org.xerial.snappy + snappy-java + ${snappy.version} + provided + + + + org.apache.flink + flink-shaded-jackson + ${flink.jackson.version} + provided + + + + org.apache.flink + flink-core + ${flink.version} + provided + + + + org.apache.flink + flink-table-common + ${flink.version} + provided + + + + + + org.apache.flink + flink-table-common + ${flink.version} + test-jar + test + + + + + + + + sql-jars + + + !skipSqlJars + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + jar + + package + + sql-jar + + + + + + + + + diff --git a/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKv.java b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKv.java new file mode 100644 index 00000000000..52eb340ae17 --- /dev/null +++ b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKv.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.formats.inlongmsgkv; + +import org.apache.inlong.sort.formats.base.TextFormatDescriptor; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LINE_DELIMITER; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME; + +/** + * Format descriptor for KVs. + */ +public class InLongMsgKv extends TextFormatDescriptor { + + public static final String FORMAT_TYPE_VALUE = "InLongMsg-KV"; + + public InLongMsgKv() { + super(FORMAT_TYPE_VALUE, 1); + } + + /** + * Sets the entry delimiter character ('&' by default). + * + * @param delimiter the entry delimiter character + * @return The instance which has delimiter set. + */ + public InLongMsgKv entryDelimiter(char delimiter) { + internalProperties.putCharacter(FORMAT_ENTRY_DELIMITER, delimiter); + return this; + } + + /** + * Sets the kv delimiter character ('=' by default). + * + * @param delimiter the kv delimiter character. + * @return The instance which has delimiter set. + */ + public InLongMsgKv kvDelimiter(char delimiter) { + internalProperties.putCharacter(FORMAT_KV_DELIMITER, delimiter); + return this; + } + + /** + * Sets the delimiter between lines. + * + * @param lineDelimiter The delimiter between lines (e.g. '\n'). + * @return The instance which has lineDelimiter set. + */ + public InLongMsgKv lineDelimiter(char lineDelimiter) { + internalProperties.putCharacter(FORMAT_LINE_DELIMITER, lineDelimiter); + return this; + } + + /** + * Sets the name of the time field. + * + * @param timeFieldName The name of the time field. + * @return The instance which has timeFieldName set. + */ + public InLongMsgKv timeFieldName(String timeFieldName) { + checkNotNull(timeFieldName); + internalProperties.putString(FORMAT_TIME_FIELD_NAME, timeFieldName); + return this; + } + + /** + * Sets the name of the attributes field. + * + * @param attributesFieldName The name of the attributes field. + * @return The instance which has attributes field set. + */ + public InLongMsgKv attributesFieldName(String attributesFieldName) { + checkNotNull(attributesFieldName); + internalProperties.putString(FORMAT_ATTRIBUTES_FIELD_NAME, attributesFieldName); + return this; + } + + /** + * Skips the predefined Field. + */ + public InLongMsgKv skipPredefinedField() { + internalProperties.putBoolean(FORMAT_RETAIN_PREDEFINED_FIELD, false); + return this; + } +} diff --git a/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java new file mode 100644 index 00000000000..c7c16dcf307 --- /dev/null +++ b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.formats.inlongmsgkv; + +import org.apache.inlong.sort.formats.base.TextFormatBuilder; +import org.apache.inlong.sort.formats.common.RowFormatInfo; +import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer; +import org.apache.inlong.sort.formats.inlongmsg.FailureHandler; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.types.Row; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ENTRY_DELIMITER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_KV_DELIMITER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_LINE_DELIMITER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LINE_DELIMITER; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_PREDEFINED_FIELD; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsgkv.InLongMsgKvUtils.DEFAULT_INLONGMSGKV_CHARSET; + +/** + * The deserializer for the records in InLongMsgKv format. + */ +public final class InLongMsgKvFormatDeserializer extends AbstractInLongMsgFormatDeserializer { + + private static final long serialVersionUID = 1L; + + /** + * Format information describing the result type. + */ + @Nonnull + private final RowFormatInfo rowFormatInfo; + + /** + * The name of the time field. + */ + @Nullable + private final String timeFieldName; + + /** + * The name of the attributes field. + */ + @Nullable + private final String attributesFieldName; + + /** + * The charset of the text. + */ + @Nonnull + private final String charset; + + /** + * The delimiter between entries. + */ + @Nonnull + private final Character entryDelimiter; + + /** + * The delimiter between key and value. + */ + @Nonnull + private final Character kvDelimiter; + + /** + * The delimiter between lines. + */ + @Nullable + private final Character lineDelimiter; + + /** + * Escape character. Null if escaping is disabled. + */ + @Nullable + private final Character escapeChar; + + /** + * Quote character. Null if quoting is disabled. + */ + @Nullable + private final Character quoteChar; + + /** + * The literal represented null values, default "". + */ + @Nullable + private final String nullLiteral; + + /** + * True if the predefinedField existed, default true. + */ + private boolean retainPredefinedField = true; + + public InLongMsgKvFormatDeserializer( + @Nonnull RowFormatInfo rowFormatInfo, + @Nullable String timeFieldName, + @Nullable String attributesFieldName, + @Nonnull String charset, + @Nonnull Character entryDelimiter, + @Nonnull Character kvDelimiter, + @Nullable Character lineDelimiter, + @Nullable Character escapeChar, + @Nullable Character quoteChar, + @Nullable String nullLiteral, + @Nonnull Boolean ignoreErrors, + boolean retainPredefinedField) { + this( + rowFormatInfo, + timeFieldName, + attributesFieldName, + charset, + entryDelimiter, + kvDelimiter, + lineDelimiter, + escapeChar, + quoteChar, + nullLiteral, + InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors)); + this.retainPredefinedField = retainPredefinedField; + } + + public InLongMsgKvFormatDeserializer( + @Nonnull RowFormatInfo rowFormatInfo, + @Nullable String timeFieldName, + @Nullable String attributesFieldName, + @Nonnull String charset, + @Nonnull Character entryDelimiter, + @Nonnull Character kvDelimiter, + @Nullable Character lineDelimiter, + @Nullable Character escapeChar, + @Nullable Character quoteChar, + @Nullable String nullLiteral, + @Nonnull Boolean ignoreErrors) { + this( + rowFormatInfo, + timeFieldName, + attributesFieldName, + charset, + entryDelimiter, + kvDelimiter, + lineDelimiter, + escapeChar, + quoteChar, + nullLiteral, + InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors)); + } + + public InLongMsgKvFormatDeserializer( + @Nonnull RowFormatInfo rowFormatInfo, + @Nullable String timeFieldName, + @Nullable String attributesFieldName, + @Nonnull String charset, + @Nonnull Character entryDelimiter, + @Nonnull Character kvDelimiter, + @Nullable Character lineDelimiter, + @Nullable Character escapeChar, + @Nullable Character quoteChar, + @Nullable String nullLiteral, + @Nonnull FailureHandler failureHandler) { + super(failureHandler); + + this.rowFormatInfo = rowFormatInfo; + this.timeFieldName = timeFieldName; + this.attributesFieldName = attributesFieldName; + this.charset = charset; + this.entryDelimiter = entryDelimiter; + this.kvDelimiter = kvDelimiter; + this.lineDelimiter = lineDelimiter; + this.escapeChar = escapeChar; + this.quoteChar = quoteChar; + this.nullLiteral = nullLiteral; + } + + @Override + public TypeInformation getProducedType() { + return InLongMsgUtils.decorateRowTypeWithNeededHeadFields(timeFieldName, attributesFieldName, rowFormatInfo); + } + + @Override + protected InLongMsgHead parseHead(String attr) { + return InLongMsgKvUtils.parseHead(attr); + } + + @Override + protected List parseBodyList(byte[] bytes) { + return InLongMsgKvUtils.parseBodyList( + bytes, + charset, + entryDelimiter, + kvDelimiter, + lineDelimiter, + escapeChar, + quoteChar); + } + + @Override + protected List convertRows(InLongMsgHead head, InLongMsgBody body) { + Row dataRow = + InLongMsgKvUtils.deserializeRow( + rowFormatInfo, + nullLiteral, + retainPredefinedField ? head.getPredefinedFields() : Collections.emptyList(), + body.getEntries()); + + Row row = InLongMsgUtils.decorateRowWithNeededHeadFields( + timeFieldName, + attributesFieldName, + head.getTime(), + head.getAttributes(), + dataRow); + + return Collections.singletonList(row); + } + + /** + * The builder for {@link InLongMsgKvFormatDeserializer}. + */ + public static class Builder extends TextFormatBuilder { + + private String timeFieldName = DEFAULT_TIME_FIELD_NAME; + private String attributesFieldName = DEFAULT_ATTRIBUTES_FIELD_NAME; + private Character entryDelimiter = DEFAULT_ENTRY_DELIMITER; + private Character kvDelimiter = DEFAULT_KV_DELIMITER; + private Character lineDelimiter = DEFAULT_LINE_DELIMITER; + private Boolean retainPredefinedField = DEFAULT_PREDEFINED_FIELD; + + public Builder(RowFormatInfo rowFormatInfo) { + super(rowFormatInfo); + + this.charset = DEFAULT_INLONGMSGKV_CHARSET; + } + + public Builder setTimeFieldName(String timeFieldName) { + this.timeFieldName = timeFieldName; + return this; + } + + public Builder setAttributesFieldName(String attributesFieldName) { + this.attributesFieldName = attributesFieldName; + return this; + } + + public Builder setEntryDelimiter(Character entryDelimiter) { + this.entryDelimiter = entryDelimiter; + return this; + } + + public Builder setKvDelimiter(Character kvDelimiter) { + this.kvDelimiter = kvDelimiter; + return this; + } + + public Builder setLineDelimiter(Character lineDelimiter) { + this.lineDelimiter = lineDelimiter; + return this; + } + + public Builder setRetainPredefinedField(Boolean retainPredefinedField) { + this.retainPredefinedField = retainPredefinedField; + return this; + } + + public Builder configure(DescriptorProperties descriptorProperties) { + super.configure(descriptorProperties); + + descriptorProperties.getOptionalString(FORMAT_TIME_FIELD_NAME) + .ifPresent(this::setTimeFieldName); + descriptorProperties.getOptionalString(FORMAT_ATTRIBUTES_FIELD_NAME) + .ifPresent(this::setAttributesFieldName); + descriptorProperties.getOptionalCharacter(FORMAT_ENTRY_DELIMITER) + .ifPresent(this::setEntryDelimiter); + descriptorProperties.getOptionalCharacter(FORMAT_KV_DELIMITER) + .ifPresent(this::setKvDelimiter); + descriptorProperties.getOptionalCharacter(FORMAT_LINE_DELIMITER) + .ifPresent(this::setLineDelimiter); + descriptorProperties.getOptionalBoolean(FORMAT_RETAIN_PREDEFINED_FIELD) + .ifPresent(this::setRetainPredefinedField); + + return this; + } + + public InLongMsgKvFormatDeserializer build() { + return new InLongMsgKvFormatDeserializer( + rowFormatInfo, + timeFieldName, + attributesFieldName, + charset, + entryDelimiter, + kvDelimiter, + lineDelimiter, + escapeChar, + quoteChar, + nullLiteral, + ignoreErrors, + retainPredefinedField); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + if (!super.equals(o)) { + return false; + } + + InLongMsgKvFormatDeserializer that = (InLongMsgKvFormatDeserializer) o; + return rowFormatInfo.equals(that.rowFormatInfo) && + Objects.equals(timeFieldName, that.timeFieldName) && + Objects.equals(attributesFieldName, that.attributesFieldName) && + charset.equals(that.charset) && + entryDelimiter.equals(that.entryDelimiter) && + kvDelimiter.equals(that.kvDelimiter) && + Objects.equals(lineDelimiter, that.lineDelimiter) && + Objects.equals(escapeChar, that.escapeChar) && + Objects.equals(quoteChar, that.quoteChar) && + Objects.equals(nullLiteral, that.nullLiteral) && + Objects.equals(retainPredefinedField, that.retainPredefinedField); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), rowFormatInfo, timeFieldName, + attributesFieldName, charset, entryDelimiter, kvDelimiter, lineDelimiter, + escapeChar, quoteChar, nullLiteral, retainPredefinedField); + } +} diff --git a/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactory.java b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactory.java new file mode 100644 index 00000000000..b3a0b54c9d5 --- /dev/null +++ b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactory.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.formats.inlongmsgkv; + +import org.apache.inlong.sort.formats.base.TableFormatDeserializer; +import org.apache.inlong.sort.formats.base.TableFormatDeserializer.TableFormatContext; +import org.apache.inlong.sort.formats.base.TableFormatDeserializerFactory; +import org.apache.inlong.sort.formats.common.RowFormatInfo; +import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatConverter; +import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverterValidator; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatDeserializerValidator; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatFactory; + +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.table.factories.TableFormatFactoryBase; +import org.apache.flink.types.Row; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_CHARSET; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ESCAPE_CHARACTER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_IGNORE_ERRORS; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LINE_DELIMITER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_NULL_LITERAL; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_QUOTE_CHARACTER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_SCHEMA; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_ATTRIBUTES_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_RETAIN_PREDEFINED_FIELD; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIME_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getDataRowFormatInfo; + +/** + * Table format factory for providing configured instances of InLongMsgKv-to-row + * serializer and deserializer. + */ +public final class InLongMsgKvFormatFactory + extends + TableFormatFactoryBase + implements + TableFormatDeserializerFactory, + InLongMsgMixedFormatFactory { + + public InLongMsgKvFormatFactory() { + super(InLongMsgKv.FORMAT_TYPE_VALUE, 1, false); + } + + @Override + public List supportedFormatProperties() { + final List properties = new ArrayList<>(); + properties.add(FORMAT_CHARSET); + properties.add(FORMAT_ENTRY_DELIMITER); + properties.add(FORMAT_KV_DELIMITER); + properties.add(FORMAT_LINE_DELIMITER); + properties.add(FORMAT_ESCAPE_CHARACTER); + properties.add(FORMAT_QUOTE_CHARACTER); + properties.add(FORMAT_NULL_LITERAL); + properties.add(FORMAT_IGNORE_ERRORS); + properties.add(FORMAT_SCHEMA); + properties.add(FORMAT_TIME_FIELD_NAME); + properties.add(FORMAT_ATTRIBUTES_FIELD_NAME); + properties.add(FORMAT_RETAIN_PREDEFINED_FIELD); + return properties; + } + + @Override + public InLongMsgKvFormatDeserializer createFormatDeserializer( + Map properties) { + final DescriptorProperties descriptorProperties = + new DescriptorProperties(true); + descriptorProperties.putProperties(properties); + + InLongMsgKvValidator validator = new InLongMsgKvValidator(); + validator.validate(descriptorProperties); + + RowFormatInfo rowFormatInfo = getDataRowFormatInfo(descriptorProperties); + + InLongMsgKvFormatDeserializer.Builder builder = + new InLongMsgKvFormatDeserializer.Builder(rowFormatInfo); + builder.configure(descriptorProperties); + + return builder.build(); + } + + @Override + public TableFormatDeserializer createFormatDeserializer(TableFormatContext context) { + TableFormatDeserializer deserializer = + createFormatDeserializer(context.getFormatProperties()); + deserializer.init(context); + return deserializer; + } + + @Override + public InLongMsgKvMixedFormatDeserializer createMixedFormatDeserializer( + Map properties) { + final DescriptorProperties descriptorProperties = + new DescriptorProperties(true); + descriptorProperties.putProperties(properties); + + InLongMsgMixedFormatDeserializerValidator validator = + new InLongMsgMixedFormatDeserializerValidator(); + validator.validate(descriptorProperties); + + InLongMsgKvMixedFormatDeserializer.Builder builder = + new InLongMsgKvMixedFormatDeserializer.Builder(); + builder.configure(descriptorProperties); + + return builder.build(); + } + + @Override + public AbstractInLongMsgMixedFormatConverter createMixedFormatConverter( + AbstractInLongMsgMixedFormatConverter.TableFormatContext context) { + return createMixedFormatConverter(context.getFormatProperties()); + } + + @Override + public AbstractInLongMsgMixedFormatDeserializer createMixedFormatDeserializer( + TableFormatContext context) { + InLongMsgKvMixedFormatDeserializer deserializer = + createMixedFormatDeserializer(context.getFormatProperties()); + deserializer.init(context); + return deserializer; + } + + @Override + public InLongMsgKvMixedFormatConverter createMixedFormatConverter( + Map properties) { + final DescriptorProperties descriptorProperties = + new DescriptorProperties(true); + descriptorProperties.putProperties(properties); + + InLongMsgMixedFormatConverterValidator validator = + new InLongMsgMixedFormatConverterValidator(); + validator.validate(descriptorProperties); + + RowFormatInfo rowFormatInfo = getDataRowFormatInfo(descriptorProperties); + InLongMsgKvMixedFormatConverter.Builder builder = + new InLongMsgKvMixedFormatConverter.Builder(rowFormatInfo); + builder.configure(descriptorProperties); + + return builder.build(); + } +} diff --git a/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java new file mode 100644 index 00000000000..59195b87e18 --- /dev/null +++ b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.formats.inlongmsgkv; + +import org.apache.inlong.sort.formats.common.RowFormatInfo; +import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatConverter; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgMixedFormatConverterBuilder; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.types.Row; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.sql.Timestamp; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Converter used to deserialize a mixed row in InLongMsg-kv format. + */ +public class InLongMsgKvMixedFormatConverter extends AbstractInLongMsgMixedFormatConverter { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(InLongMsgKvMixedFormatConverter.class); + + /** + * The schema of the rows. + */ + @Nonnull + private final RowFormatInfo rowFormatInfo; + + /** + * The name of the time field. + */ + @Nullable + private final String timeFieldName; + + /** + * The name of the attributes field. + */ + @Nullable + private final String attributesFieldName; + + /** + * The literal representing null values. + */ + private final String nullLiteral; + + public InLongMsgKvMixedFormatConverter( + @Nonnull RowFormatInfo rowFormatInfo, + @Nullable String timeFieldName, + @Nullable String attributesFieldName, + @Nullable String nullLiteral, + boolean ignoreErrors) { + super(ignoreErrors); + + this.rowFormatInfo = rowFormatInfo; + this.timeFieldName = timeFieldName; + this.attributesFieldName = attributesFieldName; + this.nullLiteral = nullLiteral; + } + + @Override + public TypeInformation getProducedType() { + return InLongMsgUtils.decorateRowTypeWithNeededHeadFields(timeFieldName, attributesFieldName, rowFormatInfo); + } + + @Override + public List convertRows( + Map attributes, + byte[] data, + String tid, + Timestamp time, + List predefinedFields, + List fields, + Map entries) { + Row dataRow = + InLongMsgKvUtils.deserializeRow( + rowFormatInfo, + nullLiteral, + predefinedFields, + entries); + + Row row = InLongMsgUtils.decorateRowWithNeededHeadFields( + timeFieldName, + attributesFieldName, + time, + attributes, + dataRow); + + return Collections.singletonList(row); + } + + /** + * The builder for {@link InLongMsgKvMixedFormatConverter}. + */ + public static class Builder extends InLongMsgMixedFormatConverterBuilder { + + public Builder(RowFormatInfo rowFormatInfo) { + super(rowFormatInfo); + } + + public InLongMsgKvMixedFormatConverter build() { + return new InLongMsgKvMixedFormatConverter( + rowFormatInfo, + timeFieldName, + attributesFieldName, + nullLiteral, + ignoreErrors); + } + } + + @Override + public boolean equals(Object object) { + if (this == object) { + return true; + } + + if (object == null || getClass() != object.getClass()) { + return false; + } + + if (!super.equals(object)) { + return false; + } + + InLongMsgKvMixedFormatConverter that = (InLongMsgKvMixedFormatConverter) object; + return rowFormatInfo.equals(that.rowFormatInfo) && + Objects.equals(timeFieldName, that.timeFieldName) && + Objects.equals(attributesFieldName, that.attributesFieldName) && + Objects.equals(nullLiteral, that.nullLiteral); + } + + @Override + public int hashCode() { + return Objects.hash(rowFormatInfo, timeFieldName, attributesFieldName, nullLiteral); + } +} diff --git a/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatDeserializer.java b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatDeserializer.java new file mode 100644 index 00000000000..34e67780a31 --- /dev/null +++ b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatDeserializer.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.formats.inlongmsgkv; + +import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgMixedFormatDeserializer; +import org.apache.inlong.sort.formats.inlongmsg.FailureHandler; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgTextMixedFormatDeserializerBuilder; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.descriptors.DescriptorProperties; +import org.apache.flink.types.Row; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ENTRY_DELIMITER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_KV_DELIMITER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_LINE_DELIMITER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LINE_DELIMITER; +import static org.apache.inlong.sort.formats.inlongmsgkv.InLongMsgKvUtils.DEFAULT_INLONGMSGKV_CHARSET; + +/** + * The deserializer for the records in InLongMsgKv format. + */ +public final class InLongMsgKvMixedFormatDeserializer + extends + AbstractInLongMsgMixedFormatDeserializer { + + private static final long serialVersionUID = 1L; + + /** + * The charset of the text. + */ + @Nonnull + private final String charset; + + /** + * The delimiter between entries. + */ + @Nonnull + private final Character entryDelimiter; + + /** + * The delimiter between key and value. + */ + @Nonnull + private final Character kvDelimiter; + + /** + * The delimiter between lines. + */ + @Nullable + private final Character lineDelimiter; + + /** + * Escape character. Null if escaping is disabled. + */ + @Nullable + private final Character escapeChar; + + /** + * Quote character. Null if quoting is disabled. + */ + @Nullable + private final Character quoteChar; + + public InLongMsgKvMixedFormatDeserializer( + @Nonnull String charset, + @Nonnull Character entryDelimiter, + @Nonnull Character kvDelimiter, + @Nullable Character lineDelimiter, + @Nullable Character escapeChar, + @Nullable Character quoteChar, + @Nonnull Boolean ignoreErrors) { + this( + charset, + entryDelimiter, + kvDelimiter, + lineDelimiter, + escapeChar, + quoteChar, + InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors)); + } + + public InLongMsgKvMixedFormatDeserializer( + @Nonnull String charset, + @Nonnull Character entryDelimiter, + @Nonnull Character kvDelimiter, + @Nullable Character lineDelimiter, + @Nullable Character escapeChar, + @Nullable Character quoteChar, + @Nonnull FailureHandler failureHandler) { + super(failureHandler); + + this.entryDelimiter = entryDelimiter; + this.kvDelimiter = kvDelimiter; + this.lineDelimiter = lineDelimiter; + this.charset = charset; + this.escapeChar = escapeChar; + this.quoteChar = quoteChar; + } + + @Override + public TypeInformation getProducedType() { + return InLongMsgUtils.MIXED_ROW_TYPE; + } + + @Override + protected InLongMsgHead parseHead(String attr) { + return InLongMsgKvUtils.parseHead(attr); + } + + @Override + protected List parseBodyList(byte[] bytes) throws Exception { + return InLongMsgKvUtils.parseBodyList( + bytes, + charset, + entryDelimiter, + kvDelimiter, + lineDelimiter, + escapeChar, + quoteChar); + } + + @Override + protected List convertRows(InLongMsgHead head, InLongMsgBody body) { + Row row = InLongMsgUtils.buildMixedRow(head, body, head.getTid()); + return Collections.singletonList(row); + } + + /** + * The builder for {@link InLongMsgKvMixedFormatDeserializer}. + */ + public static class Builder extends InLongMsgTextMixedFormatDeserializerBuilder { + + private Character entryDelimiter = DEFAULT_ENTRY_DELIMITER; + private Character kvDelimiter = DEFAULT_KV_DELIMITER; + private Character lineDelimiter = DEFAULT_LINE_DELIMITER; + + public Builder() { + super(); + + this.charset = DEFAULT_INLONGMSGKV_CHARSET; + } + + public Builder setEntryDelimiter(Character entryDelimiter) { + this.entryDelimiter = entryDelimiter; + return this; + } + + public Builder setKvDelimiter(Character kvDelimiter) { + this.kvDelimiter = kvDelimiter; + return this; + } + + public Builder setLineDelimiter(Character lineDelimiter) { + this.lineDelimiter = lineDelimiter; + return this; + } + + @Override + public Builder configure(DescriptorProperties descriptorProperties) { + super.configure(descriptorProperties); + + descriptorProperties.getOptionalCharacter(FORMAT_ENTRY_DELIMITER) + .ifPresent(this::setEntryDelimiter); + descriptorProperties.getOptionalCharacter(FORMAT_KV_DELIMITER) + .ifPresent(this::setKvDelimiter); + descriptorProperties.getOptionalCharacter(FORMAT_LINE_DELIMITER) + .ifPresent(this::setLineDelimiter); + + return this; + } + + public InLongMsgKvMixedFormatDeserializer build() { + return new InLongMsgKvMixedFormatDeserializer( + charset, + entryDelimiter, + kvDelimiter, + lineDelimiter, + escapeChar, + quoteChar, + ignoreErrors); + } + } + + @Override + public boolean equals(Object object) { + if (this == object) { + return true; + } + + if (object == null || getClass() != object.getClass()) { + return false; + } + + if (!super.equals(object)) { + return false; + } + + InLongMsgKvMixedFormatDeserializer that = (InLongMsgKvMixedFormatDeserializer) object; + return charset.equals(that.charset) && + entryDelimiter.equals(that.entryDelimiter) && + kvDelimiter.equals(that.kvDelimiter) && + Objects.equals(lineDelimiter, that.lineDelimiter) && + Objects.equals(escapeChar, that.escapeChar) && + Objects.equals(quoteChar, that.quoteChar); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), charset, entryDelimiter, kvDelimiter, lineDelimiter, + escapeChar, quoteChar); + } +} diff --git a/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java new file mode 100644 index 00000000000..9964429c2e9 --- /dev/null +++ b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.formats.inlongmsgkv; + +import org.apache.inlong.sort.formats.common.FormatInfo; +import org.apache.inlong.sort.formats.common.RowFormatInfo; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead; + +import org.apache.flink.types.Row; + +import java.nio.charset.Charset; +import java.sql.Timestamp; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TID; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_T; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseAttr; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseDateTime; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.parseEpochTime; +import static org.apache.inlong.sort.formats.util.StringUtils.splitKv; + +/** + * Utilities for {@link InLongMsgKv}. + */ +public class InLongMsgKvUtils { + + public static final String DEFAULT_INLONGMSGKV_CHARSET = "ISO_8859_1"; + + public static InLongMsgHead parseHead(String attr) { + Map attributes = parseAttr(attr); + + String tid; + if (attributes.containsKey(INLONGMSG_ATTR_STREAM_ID)) { + tid = attributes.get(INLONGMSG_ATTR_STREAM_ID); + } else if (attributes.containsKey(INLONGMSG_ATTR_TID)) { + tid = attributes.get(INLONGMSG_ATTR_TID); + } else { + throw new IllegalArgumentException( + "Could not find " + INLONGMSG_ATTR_STREAM_ID + + " or " + INLONGMSG_ATTR_TID + " in attributes!"); + } + + Timestamp time; + if (attributes.containsKey(INLONGMSG_ATTR_TIME_DT)) { + String epoch = attributes.get(INLONGMSG_ATTR_TIME_DT).trim(); + time = parseEpochTime(epoch); + } else if (attributes.containsKey(INLONGMSG_ATTR_TIME_T)) { + String date = attributes.get(INLONGMSG_ATTR_TIME_T).trim(); + time = parseDateTime(date); + } else { + throw new IllegalArgumentException( + "Could not find " + INLONGMSG_ATTR_TIME_T + + " or " + INLONGMSG_ATTR_TIME_T + " in attributes!"); + } + + List predefinedFields = getPredefinedFields(attributes); + + return new InLongMsgHead(attributes, tid, time, predefinedFields); + } + + public static List parseBodyList( + byte[] bytes, + String charset, + char entryDelimiter, + char kvDelimiter, + Character lineDelimiter, + Character escapeChar, + Character quoteChar) { + String text = new String(bytes, Charset.forName(charset)); + + List> list = + splitKv( + text, + entryDelimiter, + kvDelimiter, + escapeChar, + quoteChar, + lineDelimiter); + + return list.stream().map((line) -> { + return new InLongMsgBody( + bytes, + null, + Collections.emptyList(), + line); + }).collect(Collectors.toList()); + } + + /** + * Deserializes the row from the given entries. + * + * @param rowFormatInfo The format of the fields. + * @param nullLiteral The literal for null values. + * @param predefinedFields The predefined fields. + * @param entries The entries. + * @return The row deserialized from the given entries. + */ + public static Row deserializeRow( + RowFormatInfo rowFormatInfo, + String nullLiteral, + List predefinedFields, + Map entries) { + String[] fieldNames = rowFormatInfo.getFieldNames(); + FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos(); + + Row row = new Row(fieldNames.length); + + for (int i = 0; i < predefinedFields.size(); ++i) { + + if (i >= fieldNames.length) { + break; + } + + String fieldName = fieldNames[i]; + FormatInfo fieldFormatInfo = fieldFormatInfos[i]; + + String fieldText = predefinedFields.get(i); + + Object field = + deserializeBasicField( + fieldName, + fieldFormatInfo, + fieldText, + nullLiteral); + row.setField(i, field); + } + + for (int i = predefinedFields.size(); i < fieldNames.length; ++i) { + + String fieldName = fieldNames[i]; + FormatInfo fieldFormatInfo = fieldFormatInfos[i]; + + String fieldText = entries.get(fieldName); + + Object field = + deserializeBasicField( + fieldName, + fieldFormatInfo, + fieldText, + nullLiteral); + row.setField(i, field); + } + + return row; + } +} diff --git a/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvValidator.java b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvValidator.java new file mode 100644 index 00000000000..e290784c099 --- /dev/null +++ b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvValidator.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.formats.inlongmsgkv; + +import org.apache.inlong.sort.formats.base.TextFormatDescriptorValidator; + +import org.apache.flink.table.descriptors.DescriptorProperties; + +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ENTRY_DELIMITER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_KV_DELIMITER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_LINE_DELIMITER; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.validateInLongMsgSchema; + +/** + * The validator for {@link InLongMsgKv}. + */ +public class InLongMsgKvValidator extends TextFormatDescriptorValidator { + + @Override + public void validate(DescriptorProperties properties) { + super.validate(properties); + + properties.validateString(FORMAT_ENTRY_DELIMITER, true, 1, 1); + properties.validateString(FORMAT_KV_DELIMITER, true, 1, 1); + properties.validateString(FORMAT_LINE_DELIMITER, true, 1, 1); + + validateInLongMsgSchema(properties); + } +} diff --git a/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory new file mode 100644 index 00000000000..4b312acf4ad --- /dev/null +++ b/inlong-sort/sort-formats/format-inlongmsg-kv/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.inlong.sort.formats.inlongmsgkv.InLongMsgKvFormatFactory diff --git a/inlong-sort/sort-formats/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java b/inlong-sort/sort-formats/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java new file mode 100644 index 00000000000..911486ed6ac --- /dev/null +++ b/inlong-sort/sort-formats/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializerTest.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.formats.inlongmsgkv; + +import org.apache.inlong.common.msg.InLongMsg; +import org.apache.inlong.sort.formats.common.FormatInfo; +import org.apache.inlong.sort.formats.common.IntFormatInfo; +import org.apache.inlong.sort.formats.common.RowFormatInfo; +import org.apache.inlong.sort.formats.common.StringFormatInfo; +import org.apache.inlong.sort.formats.inlongmsg.FailureHandler; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody; +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead; + +import org.apache.flink.api.common.functions.util.ListCollector; +import org.apache.flink.types.Row; +import org.apache.flink.util.Collector; +import org.junit.Test; + +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_ENTRY_DELIMITER; +import static org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_KV_DELIMITER; +import static org.apache.inlong.sort.formats.inlongmsgkv.InLongMsgKvUtils.DEFAULT_INLONGMSGKV_CHARSET; +import static org.junit.Assert.assertEquals; + +/** + * Unit tests for {@link InLongMsgKvFormatDeserializer}. + */ +public class InLongMsgKvFormatDeserializerTest { + + private static final RowFormatInfo TEST_ROW_INFO = + new RowFormatInfo( + new String[]{"pf1", "pf2", "f1", "f2", "f3", "f4"}, + new FormatInfo[]{ + IntFormatInfo.INSTANCE, + IntFormatInfo.INSTANCE, + IntFormatInfo.INSTANCE, + StringFormatInfo.INSTANCE, + StringFormatInfo.INSTANCE, + StringFormatInfo.INSTANCE + }); + + @Test + public void testExceptionHandler() throws Exception { + TestFailureHandler errorHandler = new TestFailureHandler(); + InLongMsgKvFormatDeserializer deserializer = + new InLongMsgKvFormatDeserializer( + TEST_ROW_INFO, + "inlongmsg_time", + "inlongmsg_attributes", + DEFAULT_INLONGMSGKV_CHARSET, + DEFAULT_ENTRY_DELIMITER, + DEFAULT_KV_DELIMITER, + null, + null, + null, + null, + errorHandler); + + InLongMsg inLongMsg = InLongMsg.newInLongMsg(); + String attrs = "m=0&streamId=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2"; + String body1 = "f1=123&f2=field11&f3=field12&f4=field13"; + String body2 = "f1=errormsg&f2=field21&f3=field22&f4=field23"; + inLongMsg.addMsg(attrs, body1.getBytes()); + inLongMsg.addMsg(attrs, body2.getBytes()); + + List actualRows = new ArrayList<>(); + Collector collector = new ListCollector<>(actualRows); + deserializer.flatMap(inLongMsg.buildArray(), collector); + assertEquals(1, errorHandler.getRowCount()); + + InLongMsg InLongMsgHead = InLongMsg.newInLongMsg(); + String abNormalAttrs = "m=0&streamId=testInterfaceId&__addcol1__=1&__addcol2__=2"; + InLongMsgHead.addMsg(abNormalAttrs, body1.getBytes()); + InLongMsgHead.addMsg(abNormalAttrs, body2.getBytes()); + deserializer.flatMap(InLongMsgHead.buildArray(), collector); + assertEquals(1, errorHandler.getHeadCount()); + } + + @Test + public void testNormal() throws Exception { + + InLongMsg inLongMsg = InLongMsg.newInLongMsg(); + String attrs = "m=0&streamId=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2"; + String body1 = "f1=123&f2=field11&f3=field12&f4=field13"; + String body2 = "f1=123&f2=field21&f3=field22&f4=field23"; + inLongMsg.addMsg(attrs, body1.getBytes()); + inLongMsg.addMsg(attrs, body2.getBytes()); + + Map expectedAttributes = new HashMap<>(); + expectedAttributes.put("m", "0"); + expectedAttributes.put("streamId", "testInterfaceId"); + expectedAttributes.put("t", "20200322"); + expectedAttributes.put("__addcol1__", "1"); + expectedAttributes.put("__addcol2__", "2"); + + Row expectedRow1 = Row.of( + Timestamp.valueOf("2020-03-22 00:00:00"), + expectedAttributes, + 1, + 2, + 123, + "field11", + "field12", + "field13"); + + Row expectedRow2 = Row.of( + Timestamp.valueOf("2020-03-22 00:00:00"), + expectedAttributes, + 1, + 2, + 123, + "field21", + "field22", + "field23"); + + InLongMsgKvFormatDeserializer deserializer = + new InLongMsgKvFormatDeserializer.Builder(TEST_ROW_INFO) + .setTimeFieldName("inlongmsg_time") + .setAttributesFieldName("inlongmsg_attributes") + .build(); + testRowDeserialization( + deserializer, inLongMsg.buildArray(), + Arrays.asList(expectedRow1, expectedRow2)); + } + + @Test + public void testNullField() throws Exception { + + InLongMsg inLongMsg = InLongMsg.newInLongMsg(); + String attrs = "m=0&streamId=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2"; + String body1 = "f1=123&f2=field11&f3=field12"; + String body2 = "f1=123&f2=field21&f4=field23&f5=field24&="; + inLongMsg.addMsg(attrs, body1.getBytes()); + inLongMsg.addMsg(attrs, body2.getBytes()); + + Map expectedAttributes = new HashMap<>(); + expectedAttributes.put("m", "0"); + expectedAttributes.put("streamId", "testInterfaceId"); + expectedAttributes.put("t", "20200322"); + expectedAttributes.put("__addcol1__", "1"); + expectedAttributes.put("__addcol2__", "2"); + + Row expectedRow1 = Row.of( + Timestamp.valueOf("2020-03-22 00:00:00"), + expectedAttributes, + 1, + 2, + 123, + "field11", + "field12", + null); + + Row expectedRow2 = Row.of( + Timestamp.valueOf("2020-03-22 00:00:00"), + expectedAttributes, + 1, + 2, + 123, + "field21", + null, + "field23"); + + InLongMsgKvFormatDeserializer deserializer = + new InLongMsgKvFormatDeserializer.Builder(TEST_ROW_INFO) + .setTimeFieldName("inlongmsg_time") + .setAttributesFieldName("inlongmsg_attributes") + .build(); + testRowDeserialization( + deserializer, inLongMsg.buildArray(), + Arrays.asList(expectedRow1, expectedRow2)); + } + + @Test + public void testNullField1() throws Exception { + + InLongMsg inLongMsg = InLongMsg.newInLongMsg(); + String attrs = "m=0&streamId=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2"; + String body1 = "f1=123&f2=field11&f3=field12"; + String body2 = "f1=123&f2=field21&f4=field23&f5=field24&="; + inLongMsg.addMsg(attrs, body1.getBytes()); + inLongMsg.addMsg(attrs, body2.getBytes()); + + Map expectedAttributes = new HashMap<>(); + expectedAttributes.put("m", "0"); + expectedAttributes.put("streamId", "testInterfaceId"); + expectedAttributes.put("t", "20200322"); + expectedAttributes.put("__addcol1__", "1"); + expectedAttributes.put("__addcol2__", "2"); + + Row expectedRow1 = Row.of( + Timestamp.valueOf("2020-03-22 00:00:00"), + expectedAttributes, + null, + null, + 123, + "field11", + "field12", + null); + + Row expectedRow2 = Row.of( + Timestamp.valueOf("2020-03-22 00:00:00"), + expectedAttributes, + null, + null, + 123, + "field21", + null, + "field23"); + + InLongMsgKvFormatDeserializer deserializer = + new InLongMsgKvFormatDeserializer.Builder(TEST_ROW_INFO) + .setTimeFieldName("inlongmsg_time") + .setAttributesFieldName("inlongmsg_attributes") + .setRetainPredefinedField(false) + .build(); + testRowDeserialization( + deserializer, inLongMsg.buildArray(), + Arrays.asList(expectedRow1, expectedRow2)); + } + + @Test + public void testLineDelimiter() throws Exception { + InLongMsg inLongMsg = InLongMsg.newInLongMsg(); + String attrs = "m=0&streamId=testInterfaceId&t=20200322&__addcol1__=1&__addcol2__=2"; + String body = "f1=123&f2=field11\nf1=1&f2=2"; + inLongMsg.addMsg(attrs, body.getBytes()); + + Map expectedAttributes = new HashMap<>(); + expectedAttributes.put("m", "0"); + expectedAttributes.put("streamId", "testInterfaceId"); + expectedAttributes.put("t", "20200322"); + expectedAttributes.put("__addcol1__", "1"); + expectedAttributes.put("__addcol2__", "2"); + + Row expectedRow1 = Row.of( + Timestamp.valueOf("2020-03-22 00:00:00"), + expectedAttributes, + 1, + 2, + 123, + "field11", + null, + null); + + Row expectedRow2 = Row.of( + Timestamp.valueOf("2020-03-22 00:00:00"), + expectedAttributes, + 1, + 2, + 1, + "2", + null, + null); + + InLongMsgKvFormatDeserializer deserializer = + new InLongMsgKvFormatDeserializer.Builder(TEST_ROW_INFO) + .setTimeFieldName("inloingmsg_time") + .setAttributesFieldName("inlongmsg_attributes") + .setLineDelimiter('\n') + .build(); + + testRowDeserialization( + deserializer, inLongMsg.buildArray(), + Arrays.asList(expectedRow1, expectedRow2)); + } + + private void testRowDeserialization( + InLongMsgKvFormatDeserializer deserializer, + byte[] bytes, + List expectedRows) throws Exception { + List actualRows = new ArrayList<>(); + Collector collector = new ListCollector<>(actualRows); + deserializer.flatMap(bytes, collector); + assertEquals(expectedRows, actualRows); + } + + private static class TestFailureHandler implements FailureHandler { + + private int headCount = 0; + private int bodyCount = 0; + private int rowCount = 0; + + public int getHeadCount() { + return headCount; + } + + public int getBodyCount() { + return bodyCount; + } + + public int getRowCount() { + return rowCount; + } + + @Override + public void onParsingHeadFailure(String attribute, Exception exception) throws Exception { + headCount++; + } + + @Override + public void onParsingBodyFailure(byte[] body, Exception exception) throws Exception { + bodyCount++; + } + + @Override + public void onConvertingRowFailure(InLongMsgHead head, + InLongMsgBody body, + Exception exception) throws Exception { + rowCount++; + } + } +} diff --git a/inlong-sort/sort-formats/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactoryTest.java b/inlong-sort/sort-formats/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactoryTest.java new file mode 100644 index 00000000000..54352acb3cb --- /dev/null +++ b/inlong-sort/sort-formats/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatFactoryTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.formats.inlongmsgkv; + +import org.apache.inlong.sort.formats.base.TableFormatDeserializer; +import org.apache.inlong.sort.formats.base.TableFormatUtils; +import org.apache.inlong.sort.formats.common.DateFormatInfo; +import org.apache.inlong.sort.formats.common.FormatInfo; +import org.apache.inlong.sort.formats.common.IntFormatInfo; +import org.apache.inlong.sort.formats.common.RowFormatInfo; +import org.apache.inlong.sort.formats.common.StringFormatInfo; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.Types; +import org.apache.flink.table.descriptors.Schema; +import org.apache.flink.types.Row; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_ATTRIBUTES_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME; +import static org.apache.inlong.sort.formats.inlongmsgkv.InLongMsgKvUtils.DEFAULT_INLONGMSGKV_CHARSET; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +/** + * Tests for {@link InLongMsgKvFormatFactory}. + */ +public class InLongMsgKvFormatFactoryTest { + + private static final TypeInformation SCHEMA = + Types.ROW( + new String[]{"time", "attributes", "student_name", "score", "date"}, + new TypeInformation[]{ + Types.SQL_TIMESTAMP(), + Types.MAP(Types.STRING(), Types.STRING()), + Types.STRING(), + Types.INT(), + Types.SQL_DATE() + }); + + private static final RowFormatInfo TEST_FORMAT_SCHEMA = + new RowFormatInfo( + new String[]{"student_name", "score", "date"}, + new FormatInfo[]{ + StringFormatInfo.INSTANCE, + IntFormatInfo.INSTANCE, + new DateFormatInfo("yyyy-MM-dd") + }); + + @Test + public void testCreateTableFormatDeserializer() throws Exception { + final Map properties = + new InLongMsgKv() + .schema(TEST_FORMAT_SCHEMA) + .entryDelimiter('&') + .kvDelimiter('=') + .escapeCharacter('\\') + .quoteCharacter('\"') + .nullLiteral("null") + .toProperties(); + assertNotNull(properties); + + final InLongMsgKvFormatDeserializer expectedDeser = + new InLongMsgKvFormatDeserializer( + TEST_FORMAT_SCHEMA, + DEFAULT_TIME_FIELD_NAME, + DEFAULT_ATTRIBUTES_FIELD_NAME, + DEFAULT_INLONGMSGKV_CHARSET, + '&', + '=', + null, + '\\', + '\"', + "null", + false); + + final TableFormatDeserializer actualDeser = + TableFormatUtils.getTableFormatDeserializer( + properties, + getClass().getClassLoader()); + + assertEquals(expectedDeser, actualDeser); + } + + @Test(expected = Exception.class) + public void testCreateTableFormatDeserializerWithDerivation() { + final Map properties = new HashMap<>(); + properties.putAll( + new Schema() + .schema(TableSchema.fromTypeInfo(SCHEMA)) + .toProperties()); + properties.putAll(new InLongMsgKv().deriveSchema().toProperties()); + + final InLongMsgKvFormatDeserializer expectedDeser = + new InLongMsgKvFormatDeserializer.Builder(TEST_FORMAT_SCHEMA).build(); + + final TableFormatDeserializer actualDeser = + TableFormatUtils.getTableFormatDeserializer( + properties, + getClass().getClassLoader()); + + assertEquals(expectedDeser, actualDeser); + } +} diff --git a/inlong-sort/sort-formats/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvTest.java b/inlong-sort/sort-formats/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvTest.java new file mode 100644 index 00000000000..8b84fa125c2 --- /dev/null +++ b/inlong-sort/sort-formats/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.formats.inlongmsgkv; + +import org.apache.inlong.sort.formats.inlongmsg.InLongMsgValidator; + +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.descriptors.Descriptor; +import org.apache.flink.table.descriptors.DescriptorTestBase; +import org.apache.flink.table.descriptors.DescriptorValidator; +import org.junit.Test; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Tests for the {@link InLongMsgKv} descriptor. + */ +public class InLongMsgKvTest extends DescriptorTestBase { + + private static final String TEST_SCHEMA = + "{" + + "\"type\":\"row\"," + + "\"fieldFormats\":[{" + + "\"name\":\"student_name\"," + + "\"format\":{\"type\":\"string\"}" + + "},{" + + "\"name\":\"score\"," + + "\"format\":{\"type\":\"int\"}" + + "},{" + + "\"name\":\"date\"," + + "\"format\":{" + + "\"type\":\"date\"," + + "\"format\":\"yyyy-MM-dd\"" + + "}" + + "}]" + + "}"; + + private static final Descriptor CUSTOM_DESCRIPTOR_WITH_SCHEMA = + new InLongMsgKv() + .schema(TEST_SCHEMA) + .timeFieldName("time") + .attributesFieldName("attributes") + .entryDelimiter('&') + .kvDelimiter('=') + .lineDelimiter('\n') + .charset(StandardCharsets.UTF_8) + .escapeCharacter('\\') + .quoteCharacter('\"') + .nullLiteral("n/a") + .ignoreErrors(); + + @Test(expected = ValidationException.class) + public void testInvalidIgnoreParseErrors() { + addPropertyAndVerify(CUSTOM_DESCRIPTOR_WITH_SCHEMA, "format.escape-character", "DDD"); + } + + @Test(expected = ValidationException.class) + public void testMissingSchema() { + removePropertyAndVerify(CUSTOM_DESCRIPTOR_WITH_SCHEMA, "format.schema"); + } + + // -------------------------------------------------------------------------------------------- + + @Override + public List descriptors() { + return Collections.singletonList(CUSTOM_DESCRIPTOR_WITH_SCHEMA); + } + + @Override + public List> properties() { + final Map props1 = new HashMap<>(); + props1.put("format.type", "InLongMsg-KV"); + props1.put("format.property-version", "1"); + props1.put("format.schema", TEST_SCHEMA); + props1.put("format.time-field-name", "time"); + props1.put("format.attributes-field-name", "attributes"); + props1.put("format.entry-delimiter", "&"); + props1.put("format.kv-delimiter", "="); + props1.put("format.line-delimiter", "\n"); + props1.put("format.charset", "UTF-8"); + props1.put("format.escape-character", "\\"); + props1.put("format.quote-character", "\""); + props1.put("format.null-literal", "n/a"); + props1.put("format.ignore-errors", "true"); + + return Collections.singletonList(props1); + } + + @Override + public DescriptorValidator validator() { + return new InLongMsgValidator(); + } +} diff --git a/inlong-sort/sort-formats/pom.xml b/inlong-sort/sort-formats/pom.xml index 7e6f4927ea5..3f668091db4 100644 --- a/inlong-sort/sort-formats/pom.xml +++ b/inlong-sort/sort-formats/pom.xml @@ -37,6 +37,7 @@ format-kv format-inlongmsg-base format-inlongmsg-csv + format-inlongmsg-kv format-inlongmsg-pb