Skip to content

Commit

Permalink
[INLONG-9783][Sort] Add compatibility processing of tid to streamId c…
Browse files Browse the repository at this point in the history
…hanges in the message deserialization base class (#9785)
  • Loading branch information
baomingyu authored Mar 7, 2024
1 parent 7c4adbf commit 03933e4
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.inlong.sort.protocol.deserialization;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAlias;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

Expand All @@ -28,6 +30,7 @@
/**
* It represents CSV2 format of InLongMsg(m=9).
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class InLongMsgCsv2DeserializationInfo extends InLongMsgDeserializationInfo {

private static final long serialVersionUID = 2188769102604850019L;
Expand All @@ -39,14 +42,14 @@ public class InLongMsgCsv2DeserializationInfo extends InLongMsgDeserializationIn
private final Character escapeChar;

public InLongMsgCsv2DeserializationInfo(
@JsonProperty("streamId") String streamId,
@JsonProperty("streamId") @JsonAlias(value = {"tid"}) String streamId,
@JsonProperty("delimiter") char delimiter) {
this(streamId, delimiter, null);
}

@JsonCreator
public InLongMsgCsv2DeserializationInfo(
@JsonProperty("streamId") String streamId,
@JsonProperty("streamId") @JsonAlias(value = {"tid"}) String streamId,
@JsonProperty("delimiter") char delimiter,
@JsonProperty("escape_char") @Nullable Character escapeChar) {
super(streamId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.inlong.sort.protocol.deserialization;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAlias;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
Expand All @@ -29,6 +31,7 @@
/**
* It represents CSV format of InLongMsg(m=0).
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class InLongMsgCsvDeserializationInfo extends InLongMsgDeserializationInfo {

private static final long serialVersionUID = 1499370571949888870L;
Expand All @@ -43,21 +46,21 @@ public class InLongMsgCsvDeserializationInfo extends InLongMsgDeserializationInf
private final boolean deleteHeadDelimiter;

public InLongMsgCsvDeserializationInfo(
@JsonProperty("streamId") String streamId,
@JsonProperty("streamId") @JsonAlias(value = {"tid"}) String streamId,
@JsonProperty("delimiter") char delimiter) {
this(streamId, delimiter, null, false);
}

public InLongMsgCsvDeserializationInfo(
@JsonProperty("streamId") String streamId,
@JsonProperty("streamId") @JsonAlias(value = {"tid"}) String streamId,
@JsonProperty("delimiter") char delimiter,
@JsonProperty("delete_head_delimiter") boolean deleteHeadDelimiter) {
this(streamId, delimiter, null, deleteHeadDelimiter);
}

@JsonCreator
public InLongMsgCsvDeserializationInfo(
@JsonProperty("streamId") String streamId,
@JsonProperty("streamId") @JsonAlias(value = {"tid"}) String streamId,
@JsonProperty("delimiter") char delimiter,
@JsonProperty("escape_char") @Nullable Character escapeChar,
@JsonProperty("delete_head_delimiter") boolean deleteHeadDelimiter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,33 @@

package org.apache.inlong.sort.protocol.deserialization;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAlias;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import static com.google.common.base.Preconditions.checkNotNull;

/**
* InLongMsgDeserializationInfo.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public abstract class InLongMsgDeserializationInfo implements DeserializationInfo {

private static final long serialVersionUID = 3707412713264864315L;

private final String streamId;

public InLongMsgDeserializationInfo(@JsonProperty("streamId") String streamId) {
public InLongMsgDeserializationInfo(@JsonProperty("streamId") @JsonAlias(value = {"tid"}) String streamId) {
this.streamId = checkNotNull(streamId);
}

@JsonProperty("streamId")
@JsonAlias(value = {"tid"})
public String getStreamId() {
return streamId;
}

public String getTid() {
return streamId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.inlong.sort.protocol.deserialization;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAlias;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
Expand All @@ -29,6 +31,7 @@
/**
* It represents KV format of InLongMsg(m=5).
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class InLongMsgKvDeserializationInfo extends InLongMsgDeserializationInfo {

private static final long serialVersionUID = 8431516458466278968L;
Expand All @@ -46,15 +49,15 @@ public class InLongMsgKvDeserializationInfo extends InLongMsgDeserializationInfo
private final Character lineDelimiter;

public InLongMsgKvDeserializationInfo(
@JsonProperty("streamId") String streamId,
@JsonProperty("streamId") @JsonAlias(value = {"tid"}) String streamId,
@JsonProperty("entry_delimiter") char entryDelimiter,
@JsonProperty("kv_delimiter") char kvDelimiter) {
this(streamId, entryDelimiter, kvDelimiter, null, null);
}

@JsonCreator
public InLongMsgKvDeserializationInfo(
@JsonProperty("streamId") String streamId,
@JsonProperty("streamId") @JsonAlias(value = {"tid"}) String streamId,
@JsonProperty("entry_delimiter") char entryDelimiter,
@JsonProperty("kv_delimiter") char kvDelimiter,
@JsonProperty("escape_char") @Nullable Character escapeChar,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.inlong.sort.protocol.deserialization;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAlias;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
Expand All @@ -29,6 +31,7 @@
/**
* It represents TLog CSV format of InLongMsg(m=10).
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class InLongMsgTlogCsvDeserializationInfo extends InLongMsgDeserializationInfo {

private static final long serialVersionUID = -6585242216925992303L;
Expand All @@ -40,14 +43,14 @@ public class InLongMsgTlogCsvDeserializationInfo extends InLongMsgDeserializatio
private final Character escapeChar;

public InLongMsgTlogCsvDeserializationInfo(
@JsonProperty("streamId") String streamId,
@JsonProperty("streamId") @JsonAlias(value = {"tid"}) String streamId,
@JsonProperty("delimiter") char delimiter) {
this(streamId, delimiter, null);
}

@JsonCreator
public InLongMsgTlogCsvDeserializationInfo(
@JsonProperty("streamId") String streamId,
@JsonProperty("streamId") @JsonAlias(value = {"tid"}) String streamId,
@JsonProperty("delimiter") char delimiter,
@JsonProperty("escape_char") @Nullable Character escapeChar) {
super(streamId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.inlong.sort.protocol.deserialization;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAlias;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
Expand All @@ -29,6 +31,7 @@
/**
* It represents TLog KV format of InLongMsg(m=15).
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class InLongMsgTlogKvDeserializationInfo extends InLongMsgDeserializationInfo {

private static final long serialVersionUID = 3299931901024581425L;
Expand All @@ -44,7 +47,7 @@ public class InLongMsgTlogKvDeserializationInfo extends InLongMsgDeserialization
private final Character escapeChar;

public InLongMsgTlogKvDeserializationInfo(
@JsonProperty("streamId") String streamId,
@JsonProperty("streamId") @JsonAlias(value = {"tid"}) String streamId,
@JsonProperty("delimiter") char delimiter,
@JsonProperty("entry_delimiter") char entryDelimiter,
@JsonProperty("kv_delimiter") char kvDelimiter) {
Expand All @@ -53,7 +56,7 @@ public InLongMsgTlogKvDeserializationInfo(

@JsonCreator
public InLongMsgTlogKvDeserializationInfo(
@JsonProperty("streamId") String streamId,
@JsonProperty("streamId") @JsonAlias(value = {"tid"}) String streamId,
@JsonProperty("delimiter") char delimiter,
@JsonProperty("entry_delimiter") char entryDelimiter,
@JsonProperty("kv_delimiter") char kvDelimiter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

package org.apache.inlong.sort.protocol.deserialization;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;

/**
* Json deserialization info
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class JsonDeserializationInfo implements DeserializationInfo {

private static final long serialVersionUID = -5344203248610337314L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package org.apache.inlong.sort.protocol.deserialization;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonAlias;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

Expand All @@ -29,6 +31,7 @@
/**
* Kv deserialization info
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class KvDeserializationInfo extends InLongMsgDeserializationInfo {

private static final long serialVersionUID = -3182881360079888043L;
Expand Down Expand Up @@ -58,7 +61,7 @@ public KvDeserializationInfo(

@JsonCreator
public KvDeserializationInfo(
@JsonProperty("streamId") String streamId,
@JsonProperty("streamId") @JsonAlias(value = {"tid"}) String streamId,
@JsonProperty("entry_splitter") char entrySplitter,
@JsonProperty("kv_splitter") char kvSplitter,
@JsonProperty("escape_char") @Nullable Character escapeChar) {
Expand Down

0 comments on commit 03933e4

Please sign in to comment.