Skip to content

Commit

Permalink
[INLONG-10241][Sort] TLog format requires the ability to parse and de…
Browse files Browse the repository at this point in the history
…termine if the first segment is present
  • Loading branch information
baomingyu committed May 20, 2024
1 parent e4e24de commit 8790ee4
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ public final class InLongMsgTlogCsvFormatDeserializer extends AbstractInLongMsgF
@Nullable
private final String nullLiteral;

@Nonnull
private Boolean isIncludeFirstSegment = false;

public InLongMsgTlogCsvFormatDeserializer(
@Nonnull RowFormatInfo rowFormatInfo,
@Nullable String timeFieldName,
Expand All @@ -116,6 +119,7 @@ public InLongMsgTlogCsvFormatDeserializer(
escapeChar,
quoteChar,
nullLiteral,
false,
InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
}

Expand All @@ -128,6 +132,7 @@ public InLongMsgTlogCsvFormatDeserializer(
@Nullable Character escapeChar,
@Nullable Character quoteChar,
@Nullable String nullLiteral,
@Nullable boolean isIncludeFirstSegment,
@Nonnull FailureHandler failureHandler) {
super(failureHandler);

Expand All @@ -139,6 +144,7 @@ public InLongMsgTlogCsvFormatDeserializer(
this.escapeChar = escapeChar;
this.quoteChar = quoteChar;
this.nullLiteral = nullLiteral;
this.isIncludeFirstSegment = isIncludeFirstSegment;
}

@Override
Expand All @@ -154,7 +160,8 @@ protected InLongMsgHead parseHead(String attr) throws Exception {
@Override
protected List<InLongMsgBody> parseBodyList(byte[] bytes) throws Exception {
return Collections.singletonList(
InLongMsgTlogCsvUtils.parseBody(bytes, charset, delimiter, escapeChar, quoteChar));
InLongMsgTlogCsvUtils.parseBody(bytes, charset, delimiter, escapeChar,
quoteChar, isIncludeFirstSegment));
}

@Override
Expand Down Expand Up @@ -183,7 +190,7 @@ public static class Builder extends TextFormatBuilder<Builder> {
private String timeFieldName = DEFAULT_TIME_FIELD_NAME;
private String attributesFieldName = DEFAULT_ATTRIBUTES_FIELD_NAME;
private Character delimiter = DEFAULT_DELIMITER;

private boolean isIncludeFirstSegment = true;
public Builder(RowFormatInfo rowFormatInfo) {
super(rowFormatInfo);
}
Expand Down Expand Up @@ -226,7 +233,8 @@ public InLongMsgTlogCsvFormatDeserializer build() {
escapeChar,
quoteChar,
nullLiteral,
ignoreErrors);
isIncludeFirstSegment,
InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
}
}

Expand All @@ -252,13 +260,14 @@ public boolean equals(Object o) {
delimiter.equals(that.delimiter) &&
Objects.equals(escapeChar, that.escapeChar) &&
Objects.equals(quoteChar, that.quoteChar) &&
Objects.equals(nullLiteral, that.nullLiteral);
Objects.equals(nullLiteral, that.nullLiteral) &&
Objects.equals(isIncludeFirstSegment, that.isIncludeFirstSegment);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), rowFormatInfo, timeFieldName,
attributesFieldName, charset, delimiter, escapeChar, quoteChar,
nullLiteral);
nullLiteral, isIncludeFirstSegment);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,27 +69,33 @@ public final class InLongMsgTlogCsvMixedFormatDeserializer
@Nullable
private final Character quoteChar;

@Nonnull
private Boolean isIncludeFirstSegment = false;

public InLongMsgTlogCsvMixedFormatDeserializer(
@Nonnull String charset,
@Nonnull Character delimiter,
@Nullable Character escapeChar,
@Nullable Character quoteChar,
@Nonnull Boolean ignoreErrors) {
this(charset, delimiter, escapeChar, quoteChar, InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
this(charset, delimiter, escapeChar, quoteChar, false,
InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
}

public InLongMsgTlogCsvMixedFormatDeserializer(
@Nonnull String charset,
@Nonnull Character delimiter,
@Nullable Character escapeChar,
@Nullable Character quoteChar,
@Nonnull Boolean isIncludeFirstSegment,
@Nonnull FailureHandler failureHandler) {
super(failureHandler);

this.delimiter = delimiter;
this.charset = charset;
this.escapeChar = escapeChar;
this.quoteChar = quoteChar;
this.isIncludeFirstSegment = isIncludeFirstSegment;
}

@Override
Expand All @@ -105,7 +111,8 @@ protected InLongMsgHead parseHead(String attr) throws Exception {
@Override
protected List<InLongMsgBody> parseBodyList(byte[] bytes) throws Exception {
return Collections.singletonList(
InLongMsgTlogCsvUtils.parseBody(bytes, charset, delimiter, escapeChar, quoteChar));
InLongMsgTlogCsvUtils.parseBody(bytes, charset, delimiter, escapeChar,
quoteChar, isIncludeFirstSegment));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public static InLongMsgBody parseBody(
String charset,
char delimiter,
Character escapeChar,
Character quoteChar) {
Character quoteChar,
boolean isIncludeFirstSegment) {
String text;
if (bytes[0] == delimiter) {
text = new String(bytes, 1, bytes.length - 1, Charset.forName(charset));
Expand All @@ -91,7 +92,7 @@ public static InLongMsgBody parseBody(

String streamId = segments[0];
List<String> fields =
Arrays.stream(segments, 1, segments.length).collect(Collectors.toList());
Arrays.stream(segments, (isIncludeFirstSegment ? 0 : 1), segments.length).collect(Collectors.toList());

return new InLongMsgBody(bytes, streamId, fields, Collections.emptyMap());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ public class InLongMsgTlogCsvFormatDeserializerTest {

private static final RowFormatInfo TEST_ROW_INFO =
new RowFormatInfo(
new String[]{"f1", "f2", "f3", "f4", "f5"},
new String[]{"__addcol1_", "__addcol2_", "f1", "f2", "f3", "f4"},
new FormatInfo[]{
IntFormatInfo.INSTANCE,
IntFormatInfo.INSTANCE,
StringFormatInfo.INSTANCE,
StringFormatInfo.INSTANCE,
StringFormatInfo.INSTANCE,
StringFormatInfo.INSTANCE,
StringFormatInfo.INSTANCE
Expand All @@ -74,6 +75,7 @@ public void testExceptionHandler() throws Exception {
null,
null,
null,
true,
errorHandler);

InLongMsg inLongMsg1 = InLongMsg.newInLongMsg(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ public class InLongMsgUtils {
public static final String INLONGMSG_ATTR_TIME_DT = "dt";
public static final String INLONGMSG_ATTR_ADD_COLUMN_PREFIX = "__addcol";

public static final String DEFAULT_TIME_FIELD_NAME = null;
public static final String DEFAULT_ATTRIBUTES_FIELD_NAME = null;
public static final String DEFAULT_TIME_FIELD_NAME = "inlongmsg_time";
public static final String DEFAULT_ATTRIBUTES_FIELD_NAME = "inlongmsg_attributes";

private static final FieldToRowDataConverters.FieldToRowDataConverter TIME_FIELD_CONVERTER =
FieldToRowDataConverters.createConverter(new TimestampType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public final class InLongMsgTlogCsvFormatDeserializer extends AbstractInLongMsgF
@Nullable
private final Character quoteChar;

@Nonnull
private Boolean isIncludeFirstSegment = false;
/**
* The literal represented null values, default "".
*/
Expand Down Expand Up @@ -123,6 +125,7 @@ public InLongMsgTlogCsvFormatDeserializer(
quoteChar,
nullLiteral,
metadataKeys,
false,
InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
}

Expand All @@ -136,6 +139,7 @@ public InLongMsgTlogCsvFormatDeserializer(
@Nullable Character quoteChar,
@Nullable String nullLiteral,
List<String> metadataKeys,
@Nonnull Boolean isIncludeFirstSegment,
@Nonnull FailureHandler failureHandler) {
super(failureHandler);

Expand All @@ -148,7 +152,7 @@ public InLongMsgTlogCsvFormatDeserializer(
this.quoteChar = quoteChar;
this.nullLiteral = nullLiteral;
this.metadataKeys = metadataKeys;

this.isIncludeFirstSegment = isIncludeFirstSegment;
converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
.map(formatInfo -> FieldToRowDataConverters.createConverter(
TableFormatUtils.deriveLogicalType(formatInfo)))
Expand All @@ -172,7 +176,8 @@ protected InLongMsgHead parseHead(String attr) throws Exception {
@Override
protected List<InLongMsgBody> parseBodyList(byte[] bytes) throws Exception {
return Collections.singletonList(
InLongMsgTlogCsvUtils.parseBody(bytes, charset, delimiter, escapeChar, quoteChar));
InLongMsgTlogCsvUtils.parseBody(bytes, charset, delimiter, escapeChar,
quoteChar, isIncludeFirstSegment));
}

@Override
Expand Down Expand Up @@ -204,6 +209,7 @@ public static class Builder extends TextFormatBuilder<Builder> {
private String attributesFieldName = DEFAULT_ATTRIBUTES_FIELD_NAME;
private Character delimiter = DEFAULT_DELIMITER;
private List<String> metadataKeys = Collections.emptyList();
private boolean isIncludeFirstSegment = false;

public Builder(RowFormatInfo rowFormatInfo) {
super(rowFormatInfo);
Expand All @@ -229,6 +235,11 @@ public Builder setMetadataKeys(List<String> metadataKeys) {
return this;
}

public Builder setIncludeFirstSegment(boolean isIncludeFirstSegment) {
this.isIncludeFirstSegment = isIncludeFirstSegment;
return this;
}

public InLongMsgTlogCsvFormatDeserializer build() {
return new InLongMsgTlogCsvFormatDeserializer(
rowFormatInfo,
Expand All @@ -240,7 +251,8 @@ public InLongMsgTlogCsvFormatDeserializer build() {
quoteChar,
nullLiteral,
metadataKeys,
ignoreErrors);
isIncludeFirstSegment,
InLongMsgUtils.getDefaultExceptionHandler(ignoreErrors));
}
}

Expand All @@ -267,13 +279,14 @@ public boolean equals(Object o) {
Objects.equals(escapeChar, that.escapeChar) &&
Objects.equals(quoteChar, that.quoteChar) &&
Objects.equals(nullLiteral, that.nullLiteral) &&
Objects.equals(metadataKeys, that.metadataKeys);
Objects.equals(metadataKeys, that.metadataKeys) &&
Objects.equals(isIncludeFirstSegment, that.isIncludeFirstSegment);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), rowFormatInfo, timeFieldName,
attributesFieldName, charset, delimiter, escapeChar, quoteChar,
nullLiteral, metadataKeys);
nullLiteral, metadataKeys, isIncludeFirstSegment);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public static InLongMsgBody parseBody(
String charset,
char delimiter,
Character escapeChar,
Character quoteChar) {
Character quoteChar,
boolean isIncludeFirstSegment) {
String text;
if (bytes[0] == delimiter) {
text = new String(bytes, 1, bytes.length - 1, Charset.forName(charset));
Expand All @@ -92,7 +93,7 @@ public static InLongMsgBody parseBody(

String tid = segments[0];
List<String> fields =
Arrays.stream(segments, 1, segments.length).collect(Collectors.toList());
Arrays.stream(segments, (isIncludeFirstSegment ? 0 : 1), segments.length).collect(Collectors.toList());

return new InLongMsgBody(bytes, tid, fields, Collections.emptyMap());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,11 @@ public class InLongMsgTlogCsvFormatDeserializerTest {

private static final RowFormatInfo TEST_ROW_INFO =
new RowFormatInfo(
new String[]{"f1", "f2", "f3", "f4", "f5"},
new String[]{"__addcol1_", "__addcol2_", "f1", "f2", "f3", "f4"},
new FormatInfo[]{
IntFormatInfo.INSTANCE,
IntFormatInfo.INSTANCE,
StringFormatInfo.INSTANCE,
StringFormatInfo.INSTANCE,
StringFormatInfo.INSTANCE,
StringFormatInfo.INSTANCE,
StringFormatInfo.INSTANCE
Expand All @@ -91,6 +92,7 @@ public void testExceptionHandler() throws Exception {
null,
null,
Collections.emptyList(),
false,
errorHandler);

InLongMsg inLongMsg1 = InLongMsg.newInLongMsg(true);
Expand Down

0 comments on commit 8790ee4

Please sign in to comment.