Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 4 additions & 29 deletions be/src/vec/exec/format/json/new_json_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,20 +399,6 @@ Status NewJsonReader::_get_range_params() {
if (_range.table_format_params.table_format_type == "hive") {
_is_hive_table = true;
}
if (_params.file_attributes.__isset.openx_json_ignore_malformed) {
_openx_json_ignore_malformed = _params.file_attributes.openx_json_ignore_malformed;
}
return Status::OK();
}

static Status ignore_malformed_json_append_null(Block& block) {
for (auto& column : block.get_columns()) {
if (!column->is_nullable()) [[unlikely]] {
return Status::DataQualityError("malformed json, but the column `{}` is not nullable.",
column->get_name());
}
static_cast<ColumnNullable*>(column->assume_mutable().get())->insert_default();
}
return Status::OK();
}

Expand Down Expand Up @@ -500,13 +486,8 @@ Status NewJsonReader::_vhandle_simple_json(RuntimeState* /*state*/, Block& block
bool valid = false;
if (_next_row >= _total_rows) { // parse json and generic document
Status st = _parse_json(is_empty_row, eof);
if (st.is<DATA_QUALITY_ERROR>()) {
if (_is_load) {
continue; // continue to read next (for load, after this , already append error to file.)
} else if (_openx_json_ignore_malformed) {
RETURN_IF_ERROR(ignore_malformed_json_append_null(block));
continue;
}
if (_is_load && st.is<DATA_QUALITY_ERROR>()) {
continue; // continue to read next (for load, after this , already append error to file.)
}
RETURN_IF_ERROR(st);
if (*is_empty_row) {
Expand Down Expand Up @@ -1315,15 +1296,9 @@ Status NewJsonReader::_simdjson_handle_simple_json(RuntimeState* /*state*/, Bloc

// step2: get json value by json doc
Status st = _get_json_value(&size, eof, &error, is_empty_row);
if (st.is<DATA_QUALITY_ERROR>()) {
if (_is_load) {
return Status::OK();
} else if (_openx_json_ignore_malformed) {
RETURN_IF_ERROR(ignore_malformed_json_append_null(block));
return Status::OK();
}
if (_is_load && st.is<DATA_QUALITY_ERROR>()) {
return Status::OK();
}

RETURN_IF_ERROR(st);
if (*is_empty_row || *eof) {
return Status::OK();
Expand Down
8 changes: 2 additions & 6 deletions be/src/vec/exec/format/json/new_json_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,22 +293,18 @@ class NewJsonReader : public GenericReader {

int32_t skip_bitmap_col_idx {-1};

bool _is_load = true;
//Used to indicate whether it is a stream load. When loading, only data will be inserted into columnString.
//If an illegal value is encountered during the load process, `_append_error_msg` should be called
//instead of directly returning `Status::DataQualityError`
bool _is_load = true;

bool _is_hive_table = false;
// In hive : create table xxx ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';
// Hive will not allow you to create columns with the same name but different case, including field names inside
// structs, and will automatically convert uppercase names in create sql to lowercase.However, when Hive loads data
// to table, the column names in the data may be uppercase,and there may be multiple columns with
// the same name but different capitalization.We refer to the behavior of hive, convert all column names
// in the data to lowercase,and use the last one as the insertion value
bool _is_hive_table = false;

// hive : org.openx.data.jsonserde.JsonSerDe, `ignore.malformed.json` prop.
// If the variable is true, `null` will be inserted for llegal json format instead of return error.
bool _openx_json_ignore_malformed = false;

DataTypeSerDeSPtrs _serdes;
vectorized::DataTypeSerDe::FormatOptions _serde_options;
Expand Down
Binary file not shown.

This file was deleted.

11 changes: 0 additions & 11 deletions docker/thirdparties/docker-compose/hive/scripts/hive-metastore.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,6 @@ set -e -x

parallel=$(getconf _NPROCESSORS_ONLN)



AUX_LIB="/mnt/scripts/auxlib"
for file in "${AUX_LIB}"/*.tar.gz; do
[ -e "$file" ] || continue
tar -xzvf "$file" -C "$AUX_LIB"
echo "file = ${file}"
done
ls "${AUX_LIB}/"
mv "${AUX_LIB}"/ /opt/hive

nohup /opt/hive/bin/hive --service metastore &

# wait lockfile
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -1084,14 +1084,6 @@ public boolean isPartitionedTable() {
public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
}

public boolean firstColumnIsString() {
List<Column> columns = getColumns();
if (columns == null || columns.isEmpty()) {
return false;
}
return columns.get(0).getType().isScalarType(PrimitiveType.STRING);
}

public HoodieTableMetaClient getHudiClient() {
return Env.getCurrentEnv()
.getExtMetaCacheMgr()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ public class HiveMetaStoreClientHelper {

public static final String HIVE_JSON_SERDE = "org.apache.hive.hcatalog.data.JsonSerDe";
public static final String LEGACY_HIVE_JSON_SERDE = "org.apache.hadoop.hive.serde2.JsonSerDe";
public static final String OPENX_JSON_SERDE = "org.openx.data.jsonserde.JsonSerDe";

public enum HiveFileFormat {
TEXT_FILE(0, "text"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@ public class HiveProperties {
public static final String PROP_ESCAPE_CHAR = OpenCSVSerde.ESCAPECHAR;
public static final String DEFAULT_ESCAPE_CHAR = "\\";

// org.openx.data.jsonserde.JsonSerDe
public static final String PROP_OPENX_IGNORE_MALFORMED_JSON = "ignore.malformed.json";
public static final String DEFAULT_OPENX_IGNORE_MALFORMED_JSON = "false";


public static final Set<String> HIVE_SERDE_PROPERTIES = ImmutableSet.of(
PROP_FIELD_DELIMITER,
PROP_COLLECTION_DELIMITER_HIVE2,
Expand Down Expand Up @@ -136,13 +131,6 @@ public static String getEscapeChar(Table table) {
return HiveMetaStoreClientHelper.firstPresentOrDefault(DEFAULT_ESCAPE_CHAR, escapeChar);
}

public static String getOpenxJsonIgnoreMalformed(Table table) {
Optional<String> escapeChar = HiveMetaStoreClientHelper.getSerdeProperty(table,
PROP_OPENX_IGNORE_MALFORMED_JSON);
return HiveMetaStoreClientHelper.firstPresentOrDefault(DEFAULT_OPENX_IGNORE_MALFORMED_JSON, escapeChar);
}


// Set properties to table
public static void setTableProperties(Table table, Map<String, String> properties) {
HashMap<String, String> serdeProps = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ protected void doInitialize() throws UserException {
this.hiveTransaction = new HiveTransaction(DebugUtil.printId(ConnectContext.get().queryId()),
ConnectContext.get().getQualifiedUser(), hmsTable, hmsTable.isFullAcidTable());
Env.getCurrentHiveTransactionMgr().register(hiveTransaction);
skipCheckingAcidVersionFile = sessionVariable.skipCheckingAcidVersionFile;
skipCheckingAcidVersionFile = ConnectContext.get().getSessionVariable().skipCheckingAcidVersionFile;
}
}

Expand Down Expand Up @@ -413,17 +413,6 @@ public TFileFormatType getFileFormatType() throws UserException {
if (serDeLib.equals(HiveMetaStoreClientHelper.HIVE_JSON_SERDE)
|| serDeLib.equals(HiveMetaStoreClientHelper.LEGACY_HIVE_JSON_SERDE)) {
type = TFileFormatType.FORMAT_JSON;
} else if (serDeLib.equals(HiveMetaStoreClientHelper.OPENX_JSON_SERDE)) {
if (!sessionVariable.isReadHiveJsonInOneColumn()) {
type = TFileFormatType.FORMAT_JSON;
} else if (sessionVariable.isReadHiveJsonInOneColumn()
&& hmsTable.firstColumnIsString()) {
type = TFileFormatType.FORMAT_CSV_PLAIN;
} else {
throw new UserException("You set read_hive_json_in_one_column = true, but the first column of "
+ "table " + hmsTable.getName()
+ " is not a string column.");
}
} else {
type = TFileFormatType.FORMAT_CSV_PLAIN;
}
Expand Down Expand Up @@ -460,7 +449,7 @@ protected TFileAttributes getFileAttributes() throws UserException {
fileAttributes.setTextParams(textParams);
fileAttributes.setHeaderType("");
fileAttributes.setEnableTextValidateUtf8(
sessionVariable.enableTextValidateUtf8);
ConnectContext.get().getSessionVariable().enableTextValidateUtf8);
} else if (serDeLib.equals("org.apache.hadoop.hive.serde2.OpenCSVSerde")) {
TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
// set set properties of OpenCSVSerde
Expand All @@ -478,7 +467,7 @@ protected TFileAttributes getFileAttributes() throws UserException {
fileAttributes.setTrimDoubleQuotes(true);
}
fileAttributes.setEnableTextValidateUtf8(
sessionVariable.enableTextValidateUtf8);
ConnectContext.get().getSessionVariable().enableTextValidateUtf8);
} else if (serDeLib.equals("org.apache.hive.hcatalog.data.JsonSerDe")) {
TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
textParams.setColumnSeparator("\t");
Expand All @@ -492,37 +481,6 @@ protected TFileAttributes getFileAttributes() throws UserException {
fileAttributes.setReadJsonByLine(true);
fileAttributes.setStripOuterArray(false);
fileAttributes.setHeaderType("");
} else if (serDeLib.equals("org.openx.data.jsonserde.JsonSerDe")) {
if (!sessionVariable.isReadHiveJsonInOneColumn()) {
TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
textParams.setColumnSeparator("\t");
textParams.setLineDelimiter("\n");
fileAttributes.setTextParams(textParams);

fileAttributes.setJsonpaths("");
fileAttributes.setJsonRoot("");
fileAttributes.setNumAsString(true);
fileAttributes.setFuzzyParse(false);
fileAttributes.setReadJsonByLine(true);
fileAttributes.setStripOuterArray(false);
fileAttributes.setHeaderType("");

fileAttributes.setOpenxJsonIgnoreMalformed(
Boolean.parseBoolean(HiveProperties.getOpenxJsonIgnoreMalformed(table)));
} else if (sessionVariable.isReadHiveJsonInOneColumn()
&& hmsTable.firstColumnIsString()) {
TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
textParams.setLineDelimiter("\n");
textParams.setColumnSeparator("\n");
//First, perform row splitting according to `\n`. When performing column splitting,
// since there is no `\n`, only one column of data will be generated.
fileAttributes.setTextParams(textParams);
fileAttributes.setHeaderType("");
} else {
throw new UserException("You set read_hive_json_in_one_column = true, but the first column of table "
+ hmsTable.getName()
+ " is not a string column.");
}
} else {
throw new UserException(
"unsupported hive table serde: " + serDeLib);
Expand Down
21 changes: 0 additions & 21 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -708,8 +708,6 @@ public class SessionVariable implements Serializable, Writable {
"enable_cooldown_replica_affinity";
public static final String SKIP_CHECKING_ACID_VERSION_FILE = "skip_checking_acid_version_file";

public static final String READ_HIVE_JSON_IN_ONE_COLUMN = "read_hive_json_in_one_column";

/**
* Inserting overwrite for auto partition table allows creating partition for
* datas which cannot find partition to overwrite.
Expand Down Expand Up @@ -1222,17 +1220,6 @@ public enum IgnoreSplitType {
@VariableMgr.VarAttr(name = PARALLEL_PREPARE_THRESHOLD, fuzzy = true)
public int parallelPrepareThreshold = 32;

@VariableMgr.VarAttr(name = READ_HIVE_JSON_IN_ONE_COLUMN,
description = {"在读取hive json的时候,由于存在一些不支持的json格式,我们默认会报错。为了让用户使用体验更好,"
+ "当该变量为true的时候,将一整行json读取到第一列中,用户可以自行选择对一整行json进行处理,例如JSON_PARSE。"
+ "需要表的第一列的数据类型为string.",
"When reading hive json, we will report an error by default because there are some unsupported "
+ "json formats. In order to provide users with a better experience, when this variable is true,"
+ "a whole line of json is read into the first column. Users can choose to process a whole line"
+ "of json, such as JSON_PARSE. The data type of the first column of the table needs to"
+ "be string."})
private boolean readHiveJsonInOneColumn = false;

@VariableMgr.VarAttr(name = ENABLE_COST_BASED_JOIN_REORDER)
private boolean enableJoinReorderBasedCost = false;

Expand Down Expand Up @@ -3784,14 +3771,6 @@ public void setKeepCarriageReturn(boolean keepCarriageReturn) {
this.keepCarriageReturn = keepCarriageReturn;
}

public boolean isReadHiveJsonInOneColumn() {
return readHiveJsonInOneColumn;
}

public void setReadHiveJsonInOneColumn(boolean readHiveJsonInOneColumn) {
this.readHiveJsonInOneColumn = readHiveJsonInOneColumn;
}

public boolean isDropTableIfCtasFailed() {
return dropTableIfCtasFailed;
}
Expand Down
3 changes: 0 additions & 3 deletions gensrc/thrift/PlanNodes.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,6 @@ struct TFileAttributes {
11: optional i32 skip_lines;
//For text type file reading, whether to enable utf8 encoding check.(Catalog && TVF)
12: optional bool enable_text_validate_utf8 = true;
// org.openx.data.jsonserde.JsonSerDe
13: optional bool openx_json_ignore_malformed = false;

// for cloud copy into
1001: optional bool ignore_csv_redundant_col;
}
Expand Down

This file was deleted.

Loading
Loading