Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.property.fileformat.FileFormatProperties;
import org.apache.doris.load.RoutineLoadDesc;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.load.routineload.AbstractDataSourceProperties;
Expand Down Expand Up @@ -167,22 +168,8 @@ public class CreateRoutineLoadStmt extends DdlStmt implements NotFallbackInParse
private String timezone = TimeUtils.DEFAULT_TIME_ZONE;
private int sendBatchParallelism = 1;
private boolean loadToSingleTablet = false;
/**
* RoutineLoad support json data.
* Require Params:
* 1) dataFormat = "json"
* 2) jsonPaths = "$.XXX.xxx"
*/
private String format = ""; //default is csv.
private String jsonPaths = "";
private String jsonRoot = ""; // MUST be a jsonpath string
private boolean stripOuterArray = false;
private boolean numAsString = false;
private boolean fuzzyParse = false;

private byte enclose;

private byte escape;
private FileFormatProperties fileFormatProperties;

private long workloadGroupId = -1;

Expand Down Expand Up @@ -227,6 +214,8 @@ public CreateRoutineLoadStmt(LabelName labelName, String tableName, List<ParseNo
if (comment != null) {
this.comment = comment;
}
String format = jobProperties.getOrDefault(FileFormatProperties.PROP_FORMAT, "csv");
fileFormatProperties = FileFormatProperties.createFileFormatProperties(format);
}

public String getName() {
Expand Down Expand Up @@ -293,42 +282,14 @@ public String getTimezone() {
return timezone;
}

public String getFormat() {
return format;
}

public boolean isStripOuterArray() {
return stripOuterArray;
}

public boolean isNumAsString() {
return numAsString;
}

public boolean isFuzzyParse() {
return fuzzyParse;
}

public String getJsonPaths() {
return jsonPaths;
}

public byte getEnclose() {
return enclose;
}

public byte getEscape() {
return escape;
}

public String getJsonRoot() {
return jsonRoot;
}

public LoadTask.MergeType getMergeType() {
return mergeType;
}

public FileFormatProperties getFileFormatProperties() {
return fileFormatProperties;
}

public AbstractDataSourceProperties getDataSourceProperties() {
return dataSourceProperties;
}
Expand Down Expand Up @@ -517,23 +478,6 @@ private void checkJobProperties() throws UserException {
RoutineLoadJob.DEFAULT_LOAD_TO_SINGLE_TABLET,
LoadStmt.LOAD_TO_SINGLE_TABLET + " should be a boolean");

String encloseStr = jobProperties.get(LoadStmt.KEY_ENCLOSE);
if (encloseStr != null) {
if (encloseStr.length() != 1) {
throw new AnalysisException("enclose must be single-char");
} else {
enclose = encloseStr.getBytes()[0];
}
}
String escapeStr = jobProperties.get(LoadStmt.KEY_ESCAPE);
if (escapeStr != null) {
if (escapeStr.length() != 1) {
throw new AnalysisException("enclose must be single-char");
} else {
escape = escapeStr.getBytes()[0];
}
}

String inputWorkloadGroupStr = jobProperties.get(WORKLOAD_GROUP);
if (!StringUtils.isEmpty(inputWorkloadGroupStr)) {
this.workloadGroupId = Env.getCurrentEnv().getWorkloadGroupMgr()
Expand All @@ -545,23 +489,7 @@ private void checkJobProperties() throws UserException {
}
timezone = TimeUtils.checkTimeZoneValidAndStandardize(jobProperties.getOrDefault(LoadStmt.TIMEZONE, timezone));

format = jobProperties.get(FORMAT);
if (format != null) {
if (format.equalsIgnoreCase("csv")) {
format = ""; // if it's not json, then it's mean csv and set empty
} else if (format.equalsIgnoreCase("json")) {
format = "json";
jsonPaths = jobProperties.getOrDefault(JSONPATHS, "");
jsonRoot = jobProperties.getOrDefault(JSONROOT, "");
stripOuterArray = Boolean.parseBoolean(jobProperties.getOrDefault(STRIP_OUTER_ARRAY, "false"));
numAsString = Boolean.parseBoolean(jobProperties.getOrDefault(NUM_AS_STRING, "false"));
fuzzyParse = Boolean.parseBoolean(jobProperties.getOrDefault(FUZZY_PARSE, "false"));
} else {
throw new UserException("Format type is invalid. format=`" + format + "`");
}
} else {
format = "csv"; // default csv
}
fileFormatProperties.analyzeFileFormatProperties(jobProperties, false);
}

private void checkDataSourceProperties() throws UserException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ public OutFileClause clone() {
public String toSql() {
StringBuilder sb = new StringBuilder();
sb.append(" INTO OUTFILE '").append(filePath).append(" FORMAT AS ")
.append(fileFormatProperties.getFileFormatType());
.append(fileFormatProperties.getFormatName());
if (properties != null && !properties.isEmpty()) {
sb.append(" PROPERTIES(");
sb.append(new PrintableMap<>(properties, " = ", true, false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@
import java.util.regex.Pattern;

public class FileFormatConstants {
public static final String DEFAULT_COLUMN_SEPARATOR = "\t";
public static final String DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR = "\001";
public static final String DEFAULT_LINE_DELIMITER = "\n";

public static final String FORMAT_CSV = "csv";
public static final String FORMAT_CSV_WITH_NAMES = "csv_with_names";
public static final String FORMAT_CSV_WITH_NAMES_AND_TYPES = "csv_with_names_and_types";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

public class AvroFileFormatProperties extends FileFormatProperties {
public AvroFileFormatProperties() {
super(TFileFormatType.FORMAT_AVRO);
super(TFileFormatType.FORMAT_AVRO, FileFormatProperties.FORMAT_AVRO);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class CsvFileFormatProperties extends FileFormatProperties {
public static final String PROP_TRIM_DOUBLE_QUOTES = "trim_double_quotes";

public static final String PROP_ENCLOSE = "enclose";
public static final String PROP_ESCAPE = "escape";

private String headerType = "";
private TTextSerdeType textSerdeType = TTextSerdeType.JSON_TEXT_SERDE;
Expand All @@ -62,24 +63,26 @@ public class CsvFileFormatProperties extends FileFormatProperties {
private int skipLines;
private byte enclose;

private byte escape;

// used by tvf
// User specified csv columns, it will override columns got from file
private final List<Column> csvSchema = Lists.newArrayList();

String defaultColumnSeparator = DEFAULT_COLUMN_SEPARATOR;

public CsvFileFormatProperties() {
super(TFileFormatType.FORMAT_CSV_PLAIN);
public CsvFileFormatProperties(String formatName) {
super(TFileFormatType.FORMAT_CSV_PLAIN, formatName);
}

public CsvFileFormatProperties(String defaultColumnSeparator, TTextSerdeType textSerdeType) {
super(TFileFormatType.FORMAT_CSV_PLAIN);
public CsvFileFormatProperties(String defaultColumnSeparator, TTextSerdeType textSerdeType, String formatName) {
super(TFileFormatType.FORMAT_CSV_PLAIN, formatName);
this.defaultColumnSeparator = defaultColumnSeparator;
this.textSerdeType = textSerdeType;
}

public CsvFileFormatProperties(String headerType) {
super(TFileFormatType.FORMAT_CSV_PLAIN);
public CsvFileFormatProperties(String headerType, String formatName) {
super(TFileFormatType.FORMAT_CSV_PLAIN, formatName);
this.headerType = headerType;
}

Expand Down Expand Up @@ -115,6 +118,16 @@ public void analyzeFileFormatProperties(Map<String, String> formatProperties, bo
}
}

String escapeStr = getOrDefault(formatProperties, PROP_ESCAPE,
"", isRemoveOriginProperty);
if (!Strings.isNullOrEmpty(escapeStr)) {
if (escapeStr.length() != 1) {
throw new AnalysisException("escape must be single-char");
} else {
escape = escapeStr.getBytes()[0];
}
}

trimDoubleQuotes = Boolean.valueOf(getOrDefault(formatProperties,
PROP_TRIM_DOUBLE_QUOTES, "", isRemoveOriginProperty))
.booleanValue();
Expand Down Expand Up @@ -186,6 +199,10 @@ public byte getEnclose() {
return enclose;
}

public byte getEscape() {
return escape;
}

public List<Column> getCsvSchema() {
return csvSchema;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@ public abstract class FileFormatProperties {
public static final String FORMAT_ARROW = "arrow";
public static final String PROP_COMPRESS_TYPE = "compress_type";

protected String formatName;
protected TFileFormatType fileFormatType;

protected TFileCompressType compressionType;

public FileFormatProperties(TFileFormatType fileFormatType) {
public FileFormatProperties(TFileFormatType fileFormatType, String formatName) {
this.fileFormatType = fileFormatType;
this.formatName = formatName;
}

/**
Expand Down Expand Up @@ -73,16 +75,14 @@ public abstract void analyzeFileFormatProperties(
public static FileFormatProperties createFileFormatProperties(String formatString) {
switch (formatString) {
case FORMAT_CSV:
return new CsvFileFormatProperties();
return new CsvFileFormatProperties(formatString);
case FORMAT_HIVE_TEXT:
return new CsvFileFormatProperties(CsvFileFormatProperties.DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR,
TTextSerdeType.HIVE_TEXT_SERDE);
TTextSerdeType.HIVE_TEXT_SERDE, formatString);
case FORMAT_CSV_WITH_NAMES:
return new CsvFileFormatProperties(
FORMAT_CSV_WITH_NAMES);
return new CsvFileFormatProperties(FORMAT_CSV_WITH_NAMES, formatString);
case FORMAT_CSV_WITH_NAMES_AND_TYPES:
return new CsvFileFormatProperties(
FORMAT_CSV_WITH_NAMES_AND_TYPES);
return new CsvFileFormatProperties(FORMAT_CSV_WITH_NAMES_AND_TYPES, formatString);
case FORMAT_PARQUET:
return new ParquetFileFormatProperties();
case FORMAT_ORC:
Expand Down Expand Up @@ -121,4 +121,8 @@ public TFileFormatType getFileFormatType() {
public TFileCompressType getCompressionType() {
return compressionType;
}

public String getFormatName() {
return formatName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ public class JsonFileFormatProperties extends FileFormatProperties {
// from ExternalFileTableValuedFunction:
private String jsonRoot = "";
private String jsonPaths = "";
private boolean stripOuterArray;
private boolean stripOuterArray = false;
private boolean readJsonByLine;
private boolean numAsString;
private boolean fuzzyParse;
private boolean numAsString = false;
private boolean fuzzyParse = false;


public JsonFileFormatProperties() {
super(TFileFormatType.FORMAT_JSON);
super(TFileFormatType.FORMAT_JSON, FileFormatProperties.FORMAT_JSON);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class OrcFileFormatProperties extends FileFormatProperties {
private TFileCompressType orcCompressionType = TFileCompressType.ZLIB;

public OrcFileFormatProperties() {
super(TFileFormatType.FORMAT_ORC);
super(TFileFormatType.FORMAT_ORC, FileFormatProperties.FORMAT_ORC);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class ParquetFileFormatProperties extends FileFormatProperties {
private TParquetVersion parquetVersion = TParquetVersion.PARQUET_1_0;

public ParquetFileFormatProperties() {
super(TFileFormatType.FORMAT_PARQUET);
super(TFileFormatType.FORMAT_PARQUET, FileFormatProperties.FORMAT_PARQUET);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

public class WalFileFormatProperties extends FileFormatProperties {
public WalFileFormatProperties() {
super(TFileFormatType.FORMAT_WAL);
super(TFileFormatType.FORMAT_WAL, FileFormatProperties.FORMAT_WAL);
}

@Override
Expand Down
Loading