Skip to content

Commit

Permalink
[INLONG-9622][Sort] Update all deserializationInfo in sort-common mod…
Browse files Browse the repository at this point in the history
…ule (#9623)
  • Loading branch information
baomingyu authored Jan 29, 2024
1 parent 1b80093 commit d9df926
Show file tree
Hide file tree
Showing 20 changed files with 367 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,82 @@

package org.apache.inlong.sort.protocol.deserialization;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import javax.annotation.Nullable;

import java.util.Objects;

/**
* Csv deserialization info
*/
public class CsvDeserializationInfo implements DeserializationInfo {
public class CsvDeserializationInfo extends InLongMsgDeserializationInfo {

private static final long serialVersionUID = -5035426390567887081L;
private static final long serialVersionUID = 7424482369272150638L;

private final char splitter;

@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
private final Character escapeChar;

private final String tid;

// TODO: support mapping index to field
public CsvDeserializationInfo(
@JsonProperty("splitter") char splitter) {
this(TID_DEFAULT_VALUE, splitter, null);
}

public CsvDeserializationInfo(
@JsonProperty("splitter") char splitter,
@JsonProperty("escape_char") @Nullable Character escapeChar) {
this(TID_DEFAULT_VALUE, splitter, escapeChar);
}

@JsonCreator
public CsvDeserializationInfo(
@JsonProperty("splitter") char splitter) {
@JsonProperty("tid") String tid,
@JsonProperty("splitter") char splitter,
@JsonProperty("escape_char") @Nullable Character escapeChar) {
super(tid);
this.tid = (StringUtils.isEmpty(tid) ? TID_DEFAULT_VALUE : tid);
this.splitter = splitter;
this.escapeChar = escapeChar;
}

@JsonProperty("splitter")
public char getSplitter() {
return splitter;
}

@JsonProperty("escape_char")
@Nullable
public Character getEscapeChar() {
return escapeChar;
}

@JsonProperty("tid")
public String getTid() {
return tid;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}

if (o == null || getClass() != o.getClass()) {
return false;
}

CsvDeserializationInfo other = (CsvDeserializationInfo) o;
return Objects.equals(tid, other.getTid()) && splitter == other.splitter
&& Objects.equals(escapeChar, other.escapeChar);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,5 @@
})
public interface DeserializationInfo extends Serializable {

String TID_DEFAULT_VALUE = "-";
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@
package org.apache.inlong.sort.protocol.deserialization;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import javax.annotation.Nullable;

import java.util.Objects;

/**
* It represents CSV2 format of InLongMsg(m=9).
*/
Expand All @@ -29,16 +34,51 @@ public class InLongMsgCsv2DeserializationInfo extends InLongMsgDeserializationIn

private final char delimiter;

@JsonCreator
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
private final Character escapeChar;

public InLongMsgCsv2DeserializationInfo(
@JsonProperty("tid") String tid,
@JsonProperty("delimiter") char delimiter) {
this(tid, delimiter, null);
}

@JsonCreator
public InLongMsgCsv2DeserializationInfo(
@JsonProperty("tid") String tid,
@JsonProperty("delimiter") char delimiter,
@JsonProperty("escape_char") @Nullable Character escapeChar) {
super(tid);
this.delimiter = delimiter;
this.escapeChar = escapeChar;
}

@JsonProperty("delimiter")
public char getDelimiter() {
return delimiter;
}
}

@JsonProperty("escape_char")
@Nullable
public Character getEscapeChar() {
return escapeChar;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}

if (o == null || getClass() != o.getClass()) {
return false;
}

InLongMsgCsv2DeserializationInfo other = (InLongMsgCsv2DeserializationInfo) o;
return super.equals(other)
&& delimiter == other.delimiter
&& Objects.equals(escapeChar, other.escapeChar);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import javax.annotation.Nullable;

import java.util.Objects;

/**
* It represents CSV format of InLongMsg(m=0).
*/
Expand All @@ -31,22 +35,35 @@ public class InLongMsgCsvDeserializationInfo extends InLongMsgDeserializationInf

private final char delimiter;

@JsonInclude(Include.NON_NULL)
@Nullable
private final Character escapeChar;

@JsonInclude(Include.NON_NULL)
private final boolean deleteHeadDelimiter;

public InLongMsgCsvDeserializationInfo(
@JsonProperty("tid") String tid,
@JsonProperty("delimiter") char delimiter) {
this(tid, delimiter, true);
this(tid, delimiter, null, false);
}

public InLongMsgCsvDeserializationInfo(
@JsonProperty("tid") String tid,
@JsonProperty("delimiter") char delimiter,
@JsonProperty("delete_head_delimiter") boolean deleteHeadDelimiter) {
this(tid, delimiter, null, deleteHeadDelimiter);
}

@JsonCreator
public InLongMsgCsvDeserializationInfo(
@JsonProperty("tid") String tid,
@JsonProperty("delimiter") char delimiter,
@JsonProperty("escape_char") @Nullable Character escapeChar,
@JsonProperty("delete_head_delimiter") boolean deleteHeadDelimiter) {
super(tid);
this.delimiter = delimiter;
this.escapeChar = escapeChar;
this.deleteHeadDelimiter = deleteHeadDelimiter;
}

Expand All @@ -55,8 +72,31 @@ public char getDelimiter() {
return delimiter;
}

@JsonProperty("escape_char")
@Nullable
public Character getEscapeChar() {
return escapeChar;
}

@JsonProperty("delete_head_delimiter")
public boolean isDeleteHeadDelimiter() {
return deleteHeadDelimiter;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}

if (o == null || getClass() != o.getClass()) {
return false;
}

InLongMsgCsvDeserializationInfo other = (InLongMsgCsvDeserializationInfo) o;
return super.equals(other)
&& delimiter == other.delimiter
&& Objects.equals(escapeChar, other.escapeChar)
&& deleteHeadDelimiter == other.deleteHeadDelimiter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,15 @@

package org.apache.inlong.sort.protocol.deserialization;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import javax.annotation.Nullable;

import java.util.Objects;

/**
* It represents KV format of InLongMsg(m=5).
*/
Expand All @@ -30,13 +37,33 @@ public class InLongMsgKvDeserializationInfo extends InLongMsgDeserializationInfo

private final char kvDelimiter;

@JsonInclude(Include.NON_NULL)
@Nullable
private final Character escapeChar;

@JsonInclude(Include.NON_NULL)
@Nullable
private final Character lineDelimiter;

public InLongMsgKvDeserializationInfo(
@JsonProperty("tid") String tid,
@JsonProperty("entry_delimiter") char entryDelimiter,
@JsonProperty("kv_delimiter") char kvDelimiter) {
this(tid, entryDelimiter, kvDelimiter, null, null);
}

@JsonCreator
public InLongMsgKvDeserializationInfo(
@JsonProperty("tid") String tid,
@JsonProperty("entry_delimiter") char entryDelimiter,
@JsonProperty("kv_delimiter") char kvDelimiter,
@JsonProperty("escape_char") @Nullable Character escapeChar,
@JsonProperty("line_delimiter") @Nullable Character lineDelimiter) {
super(tid);
this.entryDelimiter = entryDelimiter;
this.kvDelimiter = kvDelimiter;
this.escapeChar = escapeChar;
this.lineDelimiter = lineDelimiter == null ? '\n' : lineDelimiter;
}

@JsonProperty("entry_delimiter")
Expand All @@ -48,4 +75,34 @@ public char getEntryDelimiter() {
public char getKvDelimiter() {
return kvDelimiter;
}
}

@JsonProperty("escape_char")
@Nullable
public Character getEscapeChar() {
return escapeChar;
}

@JsonProperty("line_delimiter")
@Nullable
public Character getLineDelimiter() {
return lineDelimiter;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}

if (o == null || getClass() != o.getClass()) {
return false;
}

InLongMsgKvDeserializationInfo other = (InLongMsgKvDeserializationInfo) o;
return super.equals(other)
&& entryDelimiter == other.entryDelimiter
&& kvDelimiter == other.kvDelimiter
&& Objects.equals(escapeChar, other.escapeChar)
&& Objects.equals(lineDelimiter, other.lineDelimiter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@
package org.apache.inlong.sort.protocol.deserialization;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import javax.annotation.Nullable;

import java.util.Objects;

/**
* It represents TLog CSV format of InLongMsg(m=10).
*/
Expand All @@ -29,16 +35,51 @@ public class InLongMsgTlogCsvDeserializationInfo extends InLongMsgDeserializatio

private final char delimiter;

@JsonCreator
@JsonInclude(Include.NON_NULL)
@Nullable
private final Character escapeChar;

public InLongMsgTlogCsvDeserializationInfo(
@JsonProperty("tid") String tid,
@JsonProperty("delimiter") char delimiter) {
this(tid, delimiter, null);
}

@JsonCreator
public InLongMsgTlogCsvDeserializationInfo(
@JsonProperty("tid") String tid,
@JsonProperty("delimiter") char delimiter,
@JsonProperty("escape_char") @Nullable Character escapeChar) {
super(tid);
this.delimiter = delimiter;
this.escapeChar = escapeChar;
}

@JsonProperty("delimiter")
public char getDelimiter() {
return delimiter;
}
}

@JsonProperty("escape_char")
@Nullable
public Character getEscapeChar() {
return escapeChar;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}

if (o == null || getClass() != o.getClass()) {
return false;
}

InLongMsgTlogCsvDeserializationInfo other = (InLongMsgTlogCsvDeserializationInfo) o;
return super.equals(other)
&& delimiter == other.delimiter
&& Objects.equals(escapeChar, other.escapeChar);
}

}
Loading

0 comments on commit d9df926

Please sign in to comment.