49 changes: 49 additions & 0 deletions be/src/vec/exec/format/parquet/parquet_column_convert.cpp
@@ -30,6 +30,55 @@ namespace doris::vectorized::parquet {
#include "common/compile_check_begin.h"
const cctz::time_zone ConvertParams::utc0 = cctz::utc_time_zone();

void ConvertParams::init(const FieldSchema* field_schema_, const cctz::time_zone* ctz_) {
field_schema = field_schema_;
if (ctz_ != nullptr) {
ctz = ctz_;
}
const auto& schema = field_schema->parquet_schema;
if (field_schema->physical_type == tparquet::Type::INT96) {
ctz = &utc0;
} else if (schema.__isset.logicalType && schema.logicalType.__isset.TIMESTAMP) {
const auto& timestamp_info = schema.logicalType.TIMESTAMP;
if (!timestamp_info.isAdjustedToUTC) {
// should set timezone to utc+0
// Reference: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#instant-semantics-timestamps-normalized-to-utc
// If isAdjustedToUTC = false, the reader should display the same value no matter what the local time zone is. For example:
// when a timestamp is stored as `1970-01-03 12:00:00`,
// if isAdjustedToUTC = true, UTC+8 should read it as `1970-01-03 20:00:00` and UTC+6 as `1970-01-03 18:00:00`;
// if isAdjustedToUTC = false, both UTC+8 and UTC+6 should read it as `1970-01-03 12:00:00`, the same as `1970-01-03 12:00:00` in UTC+0.
ctz = &utc0;
}
const auto& time_unit = timestamp_info.unit;
if (time_unit.__isset.MILLIS) {
second_mask = 1000;
scale_to_nano_factor = 1000000;
} else if (time_unit.__isset.MICROS) {
second_mask = 1000000;
scale_to_nano_factor = 1000;
} else if (time_unit.__isset.NANOS) {
second_mask = 1000000000;
scale_to_nano_factor = 1;
}
} else if (schema.__isset.converted_type) {
const auto& converted_type = schema.converted_type;
if (converted_type == tparquet::ConvertedType::TIMESTAMP_MILLIS) {
second_mask = 1000;
scale_to_nano_factor = 1000000;
} else if (converted_type == tparquet::ConvertedType::TIMESTAMP_MICROS) {
second_mask = 1000000;
scale_to_nano_factor = 1000;
}
}

if (ctz) {
VecDateTimeValue t;
t.from_unixtime(0, *ctz);
offset_days = t.day() == 31 ? -1 : 0;
}
is_type_compatibility = field_schema_->is_type_compatibility;
}
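The two factors initialized above define the whole conversion contract: a raw INT64 timestamp divided by `second_mask` gives whole seconds, and the remainder multiplied by `scale_to_nano_factor` gives the sub-second part normalized to nanoseconds. A minimal standalone sketch of that decomposition (the helper and the demo values are ours, not part of this patch):

```cpp
#include <cstdint>
#include <iostream>

// Hypothetical helper mirroring the second_mask / scale_to_nano_factor
// contract set up in ConvertParams::init above.
struct DecomposedTimestamp {
    int64_t seconds; // whole seconds since the epoch
    int64_t nanos;   // sub-second remainder, scaled to nanoseconds
};

DecomposedTimestamp decompose(int64_t raw, int64_t second_mask,
                              int64_t scale_to_nano_factor) {
    // Pre-epoch (negative) values would need floor semantics; this sketch
    // only illustrates the non-negative case.
    return {raw / second_mask, (raw % second_mask) * scale_to_nano_factor};
}

int main() {
    // A MICROS-unit column: second_mask = 1'000'000, factor = 1'000.
    auto ts = decompose(1'700'000'123'456LL, 1'000'000, 1'000);
    std::cout << ts.seconds << " s + " << ts.nanos << " ns\n"; // 1700000 s + 123456000 ns
}
```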

#define FOR_LOGICAL_DECIMAL_TYPES(M) \
M(TYPE_DECIMAL32) \
M(TYPE_DECIMAL64) \
47 changes: 1 addition & 46 deletions be/src/vec/exec/format/parquet/parquet_column_convert.h
@@ -71,52 +71,7 @@ struct ConvertParams {
}
}

void init(const FieldSchema* field_schema_, const cctz::time_zone* ctz_) {
field_schema = field_schema_;
if (ctz_ != nullptr) {
ctz = ctz_;
}
const auto& schema = field_schema->parquet_schema;
if (schema.__isset.logicalType && schema.logicalType.__isset.TIMESTAMP) {
const auto& timestamp_info = schema.logicalType.TIMESTAMP;
if (!timestamp_info.isAdjustedToUTC) {
// should set timezone to utc+0
// Reference: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#instant-semantics-timestamps-normalized-to-utc
// If isAdjustedToUTC = false, the reader should display the same value no mater what local time zone is. For example:
// When a timestamp is stored as `1970-01-03 12:00:00`,
// if isAdjustedToUTC = true, UTC8 should read as `1970-01-03 20:00:00`, UTC6 should read as `1970-01-03 18:00:00`
// if isAdjustedToUTC = false, UTC8 and UTC6 should read as `1970-01-03 12:00:00`, which is the same as `1970-01-03 12:00:00` in UTC0
ctz = &utc0;
}
const auto& time_unit = timestamp_info.unit;
if (time_unit.__isset.MILLIS) {
second_mask = 1000;
scale_to_nano_factor = 1000000;
} else if (time_unit.__isset.MICROS) {
second_mask = 1000000;
scale_to_nano_factor = 1000;
} else if (time_unit.__isset.NANOS) {
second_mask = 1000000000;
scale_to_nano_factor = 1;
}
} else if (schema.__isset.converted_type) {
const auto& converted_type = schema.converted_type;
if (converted_type == tparquet::ConvertedType::TIMESTAMP_MILLIS) {
second_mask = 1000;
scale_to_nano_factor = 1000000;
} else if (converted_type == tparquet::ConvertedType::TIMESTAMP_MICROS) {
second_mask = 1000000;
scale_to_nano_factor = 1000;
}
}

if (ctz) {
VecDateTimeValue t;
t.from_unixtime(0, *ctz);
offset_days = t.day() == 31 ? -1 : 0;
}
is_type_compatibility = field_schema_->is_type_compatibility;
}
void init(const FieldSchema* field_schema_, const cctz::time_zone* ctz_);
};

/**
13 changes: 11 additions & 2 deletions be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -51,7 +51,14 @@ VHivePartitionWriter::VHivePartitionWriter(const TDataSink& t_sink, std::string
_file_format_type(file_format_type),
_hive_compress_type(hive_compress_type),
_hive_serde_properties(hive_serde_properties),
_hadoop_conf(hadoop_conf) {}
_hadoop_conf(hadoop_conf) {
if (t_sink.__isset.hive_table_sink) [[likely]] {
const auto& hive_table_sink = t_sink.hive_table_sink;
if (hive_table_sink.__isset.enable_int96_timestamps) [[likely]] {
_enable_int96_timestamps = hive_table_sink.enable_int96_timestamps;
}
}
}

Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* operator_profile) {
_state = state;
@@ -89,8 +96,10 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* operator_
to_string(_hive_compress_type));
}
}

ParquetFileOptions parquet_options = {parquet_compression_type,
TParquetVersion::PARQUET_1_0, false, true};
TParquetVersion::PARQUET_1_0, false,
_enable_int96_timestamps};
_file_format_transformer = std::make_unique<VParquetTransformer>(
state, _file_writer.get(), _write_output_expr_ctxs, _write_column_names, false,
parquet_options);
2 changes: 2 additions & 0 deletions be/src/vec/sink/writer/vhive_partition_writer.h
@@ -111,6 +111,8 @@ class VHivePartitionWriter {
std::unique_ptr<VFileFormatTransformer> _file_format_transformer = nullptr;

RuntimeState* _state;
// Whether to write timestamps with the INT96 physical type in Parquet files. Configured via `fe.conf: parquet_default_timestamps_physical_type`.
bool _enable_int96_timestamps = true;
};
} // namespace vectorized
} // namespace doris
10 changes: 10 additions & 0 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -852,6 +852,16 @@ public class Config extends ConfigBase {
+ "`exec_mem_limit / min (query_colocate_join_memory_limit_penalty_factor, instance_num)`"})
public static int query_colocate_join_memory_limit_penalty_factor = 1;


@ConfField(mutable = true, masterOnly = true, description = {
"export 和 hive 写入 timestamp 类型到 parquet 文件时的默认物理类型。"
+ "默认为int64。 可以通过设置为 int96 来保持新老版本的一致性。",
"The default physical type used when export and when Hive sinks timestamp types to Parquet files."
+ "Default is 'int64'."
+ "It can be configured to 'int96' to maintain compatibility between old and old versions."
})
public static String parquet_default_timestamps_physical_type = "int64";
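Deployments that depend on the pre-change INT96 behavior can flip the new knob; a hypothetical fe.conf snippet (only the setting name comes from this patch):

```
# fe.conf -- illustrative example: restore the legacy INT96 physical type
# for timestamps written by the Hive sink and by EXPORT.
parquet_default_timestamps_physical_type = int96
```

Because the field is declared `mutable = true, masterOnly = true`, it should also be adjustable at runtime on the master FE via `ADMIN SET FRONTEND CONFIG`, without a restart.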

/**
* This config can be set to true to disable the automatic relocation and balancing of colocate tables.
* If 'disable_colocate_balance' is set to true,
@@ -17,6 +17,7 @@

package org.apache.doris.datasource.property.fileformat;

import org.apache.doris.common.Config;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileFormatType;
@@ -61,7 +62,7 @@ public class ParquetFileFormatProperties extends FileFormatProperties {
private TParquetCompressionType parquetCompressionType = TParquetCompressionType.SNAPPY;
private boolean parquetDisableDictionary = false;
private TParquetVersion parquetVersion = TParquetVersion.PARQUET_1_0;
private boolean enableInt96Timestamps = true;
private boolean enableInt96Timestamps = Config.parquet_default_timestamps_physical_type.equalsIgnoreCase("int96");

public ParquetFileFormatProperties() {
super(TFileFormatType.FORMAT_PARQUET, FileFormatProperties.FORMAT_PARQUET);
@@ -23,6 +23,7 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
@@ -94,6 +95,8 @@ public void bindDataSink(Optional<InsertCommandContext> insertCtx)
THiveTableSink tSink = new THiveTableSink();
tSink.setDbName(targetTable.getDbName());
tSink.setTableName(targetTable.getName());
tSink.setEnableInt96Timestamps(
Config.parquet_default_timestamps_physical_type.equalsIgnoreCase("int96"));
Set<String> partNames = new HashSet<>(targetTable.getPartitionColumnNames());
List<Column> allColumns = targetTable.getColumns();
Set<String> colNames = allColumns.stream().map(Column::getName).collect(Collectors.toSet());
3 changes: 2 additions & 1 deletion gensrc/thrift/DataSinks.thrift
@@ -144,7 +144,7 @@ struct TResultFileSinkOptions {
20: optional i64 orc_writer_version;

//iceberg write sink use int64
//hive write sink use int96
//hive write sink uses `fe.conf: parquet_default_timestamps_physical_type`, see `THiveTableSink`
//export data to file uses user-defined properties
21: optional bool enable_int96_timestamps
// currently only for csv
@@ -365,6 +365,7 @@ struct THiveTableSink {
10: optional bool overwrite
11: optional THiveSerDeProperties serde_properties
12: optional list<Types.TNetworkAddress> broker_addresses;
13: optional bool enable_int96_timestamps // fe.conf: parquet_default_timestamps_physical_type = int64/int96
}

enum TUpdateMode {
@@ -90,9 +90,9 @@
[{"latitude":0, "longitude":180}, {"latitude":0, "longitude":0}]

-- !test_17 --
1 1880-01-01T15:58:41
2 1884-01-01T16:05:43
3 1990-01-01T16:00
1 1880-01-01T07:52:58
2 1884-01-01T08:00
3 1990-01-01T08:00

-- !test_18 --
[{"latitude":0, "longitude":0}, null, {"latitude":0, "longitude":180}]
@@ -122,14 +122,14 @@ apple_banana_mango9
3 Cecilia 2022-11-16T02:32:09

-- !test_17 --
1001-01-07T17:07:47.172032 1001-01-07T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-08T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-09T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-10T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-11T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-12T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-13T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-14T17:07:47.172032
1001-01-07T09:02:04.172032 1001-01-07T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-08T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-09T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-10T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-11T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-12T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-13T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-14T09:02:04.172032

-- !test_18 --
0.00
@@ -151,14 +151,14 @@
2

-- !test_20 --
1001-01-07T17:07:47.172032 1001-01-07T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-08T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-09T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-10T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-11T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-12T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-13T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-14T17:07:47.172032
1001-01-07T09:02:04.172032 1001-01-07T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-08T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-09T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-10T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-11T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-12T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-13T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-14T09:02:04.172032

-- !test_21 --
1001-01-07 1001-01-07
Expand All @@ -181,14 +181,14 @@ apple_banana_mango9
1001-01-07T17:07:47.171 1001-01-14T17:07:47.171

-- !test_23 --
1001-01-07T17:07:47.172032 1001-01-07T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-08T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-09T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-10T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-11T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-12T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-13T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-14T17:07:47.172032
1001-01-07T09:02:04.172032 1001-01-07T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-08T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-09T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-10T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-11T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-12T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-13T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-14T09:02:04.172032

-- !test_24 --
false 1 2 3 10 1.2 val_1 val_1 HEARTS false 1 2 3 10 1.2 val_1 val_1 HEARTS ["arr_1", "arr_2", "arr_3"] [1] {1:"val_1", 2:"val_2", 3:"val_3"} {1:[{"nestedintscolumn":[1, 2, 3], "nestedstringcolumn":"val_1"}, {"nestedintscolumn":[2, 3, 4], "nestedstringcolumn":"val_2"}, {"nestedintscolumn":[3, 4, 5], "nestedstringcolumn":"val_3"}], 2:[{"nestedintscolumn":[1, 2, 3], "nestedstringcolumn":"val_1"}, {"nestedintscolumn":[2, 3, 4], "nestedstringcolumn":"val_2"}, {"nestedintscolumn":[3, 4, 5], "nestedstringcolumn":"val_3"}], 3:[{"nestedintscolumn":[1, 2, 3], "nestedstringcolumn":"val_1"}, {"nestedintscolumn":[2, 3, 4], "nestedstringcolumn":"val_2"}, {"nestedintscolumn":[3, 4, 5], "nestedstringcolumn":"val_3"}]}
@@ -238,14 +238,14 @@ true 8 9 10 80 8.2 val_8 val_8 SPADES true 8 9 10 80 8.2 val_8 val_8 SPADES ["ar
9.00

-- !test_29 --
1001-01-07T17:07:47.172032 1001-01-07T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-08T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-09T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-10T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-11T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-12T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-13T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-14T17:07:47.172032
1001-01-07T09:02:04.172032 1001-01-07T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-08T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-09T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-10T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-11T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-12T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-13T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-14T09:02:04.172032

-- !test_30 --
\N
@@ -268,14 +268,14 @@ true 8 9 10 80 8.2 val_8 val_8 SPADES true 8 9 10 80 8.2 val_8 val_8 SPADES ["ar
1970-01-01T08:00:00.010

-- !test_33 --
1001-01-07T17:07:47.172032 1001-01-07T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-08T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-09T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-10T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-11T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-12T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-13T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-14T17:07:47.172032
1001-01-07T09:02:04.172032 1001-01-07T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-08T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-09T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-10T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-11T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-12T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-13T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-14T09:02:04.172032

-- !test_34 --
1001-01-07 1001-01-07
Expand All @@ -298,12 +298,12 @@ true 8 9 10 80 8.2 val_8 val_8 SPADES true 8 9 10 80 8.2 val_8 val_8 SPADES ["ar
1001-01-07T17:07:47.171 1001-01-14T17:07:47.171

-- !test_36 --
1001-01-07T17:07:47.172032 1001-01-07T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-08T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-09T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-10T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-11T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-12T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-13T17:07:47.172032
1001-01-07T17:07:47.172032 1001-01-14T17:07:47.172032
1001-01-07T09:02:04.172032 1001-01-07T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-08T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-09T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-10T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-11T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-12T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-13T09:02:04.172032
1001-01-07T09:02:04.172032 1001-01-14T09:02:04.172032
