22.8.20 Pre-release (2) #291

Merged: 98 commits (Aug 2, 2023)

Commits
8597c79
Merge pull request #230 from Altinity/customizations/22.8.13
Enmk Feb 10, 2023
2ecca4e
Scheduled CI/CD run 00:00 every Sunday
Enmk Feb 15, 2023
2d733a0
Merge pull request #237 from Altinity/customizations/22.8.13
Enmk Mar 4, 2023
e91634c
Merge remote-tracking branch 'altinity/customizations/22.8.13' into c…
Enmk Mar 16, 2023
edd1b03
Merge pull request #42469 from amosbird/issue_42456
alexey-milovidov Oct 28, 2022
0c3acbb
Merge pull request #41483 from amosbird/jbod-fix1
alexey-milovidov Sep 20, 2022
6507a34
Merge pull request #240 from Altinity/backports/22.8_fix_key_analysis…
Enmk Mar 21, 2023
0b958ed
Merge pull request #241 from Altinity/backports/22.8_fix_nullptr_acce…
Enmk Mar 21, 2023
6b004a6
Fix aborts in arrow lib
Avogar Jan 20, 2023
2a6d875
Better comment
Avogar Jan 20, 2023
a02b8ab
Better comment
Avogar Jan 23, 2023
c28462f
Fix typo
Avogar Jan 23, 2023
217b88e
Removed scheduled run
Enmk Mar 23, 2023
94ae067
Merge pull request #242 from Altinity/backports/22.8.15/optimization_…
arthurpassos Mar 24, 2023
a7a8033
backport upstream 47958
arthurpassos Mar 24, 2023
62fbe6c
Merge pull request #243 from Altinity/backport_arrow_logging_abort_ut
Enmk Mar 27, 2023
729969c
Fix reading columns that are not presented in input data in Parquet/O…
Avogar Dec 19, 2022
0e73a84
Fix style
Avogar Dec 20, 2022
ae51a14
Skip fasttest
Avogar Dec 20, 2022
124d25d
optimize parquet reader
liuneng1994 Feb 1, 2023
f3c6e96
add parquet max_block_size setting
liuneng1994 Feb 1, 2023
96a87dc
Update ParquetBlockInputFormat.cpp
alexey-milovidov Feb 1, 2023
252fa66
Under multi-threading, different order result sets may be generated
liuneng1994 Feb 2, 2023
eada70e
partially backport upstream #47538
arthurpassos Mar 28, 2023
b970425
Merge pull request #246 from Altinity/backports/22.8.15/45878_47538_4…
arthurpassos Mar 29, 2023
4842ef9
Update release_branches.yml
MyroTk Apr 5, 2023
f7a62a9
Update release_branches.yml
MyroTk Apr 5, 2023
84bc4c2
Update release_branches.yml
MyroTk Apr 5, 2023
a0a080e
Update release_branches.yml
MyroTk Apr 5, 2023
225c4b0
Update release_branches.yml
MyroTk Apr 6, 2023
ca7321c
Merge pull request #249 from Altinity/regression-ci
Enmk Apr 6, 2023
90f6912
Bumped Go version to get some CVE fixes
Enmk Apr 11, 2023
fb8124d
Do not install clickhouse-diagnostics
Enmk Apr 12, 2023
5b5c0fd
Merge pull request #256 from Altinity/go_version_up
Enmk Apr 12, 2023
fda518a
Merge pull request #257 from Altinity/not_installing_clickhouse-diagn…
Enmk Apr 13, 2023
ab55b6a
Updated version to v22.8.15.25.altinitystable
Enmk Apr 13, 2023
3485b16
Updated CH db name for flaky test
Enmk Apr 19, 2023
94667d6
Starting regression tests after stateless, stateful and integration
Enmk Apr 19, 2023
0b01a6d
Collapsing regression tests
MyroTk Apr 20, 2023
84098f5
output nice -> classic
MyroTk Apr 21, 2023
59f78b6
Update release_branches.yml
MyroTk Apr 27, 2023
d92aede
Attempt to fix integration tests
Enmk May 22, 2023
2021b75
Fixed typo
Enmk May 22, 2023
81d302f
Merge pull request #262 from Altinity/build_system_update
Enmk May 23, 2023
774588a
Trigger regression_start only if BuilderDebRelease passed
Enmk May 23, 2023
dfdf795
Simplified dependencies of regression_start
Enmk May 24, 2023
b194fcc
Merge pull request #264 from Altinity/Enmk-fix-builds_regression_start
Enmk May 24, 2023
e074fef
first test
arthurpassos Dec 16, 2022
7b48c67
.
arthurpassos Dec 16, 2022
ea12951
.
arthurpassos Dec 16, 2022
a57f27c
aa
arthurpassos Dec 16, 2022
f3dc5c8
cancel dhpa64 for now
arthurpassos Dec 16, 2022
a28f44e
.
arthurpassos Dec 16, 2022
031988e
,
arthurpassos Dec 16, 2022
a27efb8
,
arthurpassos Dec 16, 2022
1228d1a
,
arthurpassos Dec 16, 2022
f7932ac
,
arthurpassos Dec 16, 2022
e32addb
,
arthurpassos Dec 16, 2022
8ac8587
a
arthurpassos Dec 18, 2022
530ccbb
a
arthurpassos Dec 18, 2022
d7e2171
a
arthurpassos Dec 18, 2022
e34475e
a
arthurpassos Dec 18, 2022
ac5f1a0
a
arthurpassos Dec 19, 2022
05d7de2
a
arthurpassos Dec 19, 2022
e6f24ec
a
arthurpassos Dec 19, 2022
8955cd8
a
arthurpassos Dec 19, 2022
04ca68f
a
arthurpassos Jan 3, 2023
13c071e
a
arthurpassos Jan 3, 2023
a5334ac
a
arthurpassos Jan 3, 2023
7cb68a8
a
arthurpassos Jan 3, 2023
bef2fd2
a
arthurpassos Jan 4, 2023
24d919e
a
arthurpassos Jan 4, 2023
cd8af6a
a
arthurpassos Jan 4, 2023
df3523c
a
arthurpassos Jan 5, 2023
40c209b
a
arthurpassos Jan 5, 2023
075c70e
a
arthurpassos Jan 5, 2023
3d6553a
a
arthurpassos Jan 5, 2023
2c038f1
a
arthurpassos Jan 6, 2023
605d0e1
a
arthurpassos Jan 6, 2023
6a0e2e0
a
arthurpassos Jan 6, 2023
452e642
a
arthurpassos Jan 6, 2023
d6de686
a
arthurpassos Jan 6, 2023
9b4a342
working as expected, just a cleanup
arthurpassos Jan 6, 2023
64d5b55
sha256 instead of 512
arthurpassos Jan 11, 2023
42d46b7
remove comment
arthurpassos Jan 12, 2023
bbf76c9
Add comment specifying files
arthurpassos Jan 19, 2023
ae171d4
changing 512 to 256
MyroTk May 31, 2023
10c29f1
merge fix
MyroTk May 31, 2023
dba9adf
Update env_helper.py
MyroTk Jun 2, 2023
cf062bd
Update env_helper.py
MyroTk Jun 5, 2023
b860787
Update sign_release.py
MyroTk Jun 5, 2023
625d735
Merge pull request #266 from Altinity/22.8.15_sha256
MyroTk Jun 6, 2023
06271fb
Update release_branches.yml
MyroTk Jun 6, 2023
fac3bc6
Update build_check.py
MyroTk Jun 8, 2023
f12dca2
Merge pull request #269 from Altinity/22.8.15_source_upload
MyroTk Jun 9, 2023
a0e86f4
Merge remote-tracking branch 'altinity/customizations/22.8.15' into HEAD
Enmk Jul 31, 2023
7edcb6b
Fixed test 02681_final_excessive_reading_bug.sh
Enmk Aug 1, 2023
84104d5
Merge pull request #292 from Altinity/fix_02681_final_excessive_readi…
Enmk Aug 1, 2023

Files changed

467 changes: 465 additions & 2 deletions .github/workflows/release_branches.yml

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion contrib/arrow
2 changes: 1 addition & 1 deletion docker/packager/binary/Dockerfile
@@ -55,7 +55,7 @@ RUN arch=${TARGETARCH:-amd64} \
&& dpkg -i /tmp/nfpm.deb \
&& rm /tmp/nfpm.deb

ARG GO_VERSION=1.18.3
ARG GO_VERSION=1.19.8
# We need go for clickhouse-diagnostics
RUN arch=${TARGETARCH:-amd64} \
&& curl -Lo /tmp/go.tgz "https://go.dev/dl/go${GO_VERSION}.linux-${arch}.tar.gz" \
1 change: 1 addition & 0 deletions docker/test/integration/runner/dockerd-entrypoint.sh
@@ -27,6 +27,7 @@ while true; do
reties=$((reties+1))
if [[ $reties -ge 100 ]]; then # 10 sec max
echo "Can't start docker daemon, timeout exceeded." >&2
cat /ClickHouse/tests/integration/dockerd.log >&2
exit 1;
fi
sleep 0.1
5 changes: 3 additions & 2 deletions packages/clickhouse-common-static.yaml
@@ -33,8 +33,9 @@ deb:
contents:
- src: root/usr/bin/clickhouse
dst: /usr/bin/clickhouse
- src: root/usr/bin/clickhouse-diagnostics
dst: /usr/bin/clickhouse-diagnostics
# Excluded due to CVEs in go runtime that popup constantly
# - src: root/usr/bin/clickhouse-diagnostics
# dst: /usr/bin/clickhouse-diagnostics
- src: root/usr/bin/clickhouse-extract-from-config
dst: /usr/bin/clickhouse-extract-from-config
- src: root/usr/bin/clickhouse-library-bridge
1 change: 1 addition & 0 deletions src/CMakeLists.txt
@@ -602,6 +602,7 @@ if (ENABLE_TESTS)
dbms
clickhouse_common_config
clickhouse_common_zookeeper
ch_contrib::parquet
string_utils)

if (TARGET ch_contrib::simdjson)
1 change: 1 addition & 0 deletions src/Core/Settings.h
@@ -712,6 +712,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, input_format_csv_use_best_effort_in_schema_inference, true, "Use some tweaks and heuristics to infer schema in CSV format", 0) \
M(Bool, input_format_tsv_use_best_effort_in_schema_inference, true, "Use some tweaks and heuristics to infer schema in TSV format", 0) \
M(Bool, input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format Parquet", 0) \
M(UInt64, input_format_parquet_max_block_size, 8192, "Max block size for parquet reader.", 0) \
M(Bool, input_format_protobuf_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip fields with unsupported types while schema inference for format Protobuf", 0) \
M(Bool, input_format_capn_proto_skip_fields_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format CapnProto", 0) \
M(Bool, input_format_orc_skip_columns_with_unsupported_types_in_schema_inference, false, "Skip columns with unsupported types while schema inference for format ORC", 0) \
1 change: 1 addition & 0 deletions src/Disks/IDisk.h
@@ -71,6 +71,7 @@ class Space : public std::enable_shared_from_this<Space>
virtual const String & getName() const = 0;

/// Reserve the specified number of bytes.
/// Returns valid reservation or nullptr when failure.
virtual ReservationPtr reserve(UInt64 bytes) = 0;

virtual ~Space() = default;
3 changes: 3 additions & 0 deletions src/Disks/VolumeJBOD.h
@@ -82,6 +82,9 @@ class VolumeJBOD : public IVolume
ReservationPtr reserve(uint64_t bytes)
{
ReservationPtr reservation = disk->reserve(bytes);
if (!reservation)
return {};

/// Not just subtract bytes, but update the value,
/// since some reservations may be done directly via IDisk, or not by ClickHouse.
free_size = reservation->getUnreservedSpace();
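Note: the reserve() hunk above adds a null check before the reservation is dereferenced, since IDisk::reserve() returns nullptr on failure. A minimal self-contained sketch of the same guard pattern, using toy Disk/Reservation types (hypothetical, not the ClickHouse classes):

#include <cstdint>
#include <iostream>
#include <memory>

// Toy stand-ins for the IDisk/IVolume reservation API (illustration only).
struct Reservation
{
    uint64_t unreserved_bytes;
    uint64_t getUnreservedSpace() const { return unreserved_bytes; }
};
using ReservationPtr = std::unique_ptr<Reservation>;

struct Disk
{
    uint64_t free_bytes = 1000;
    /// Returns a valid reservation or nullptr on failure, mirroring the IDisk::reserve() contract.
    ReservationPtr reserve(uint64_t bytes)
    {
        if (bytes > free_bytes)
            return nullptr;
        free_bytes -= bytes;
        return std::make_unique<Reservation>(Reservation{free_bytes});
    }
};

ReservationPtr reserveOnVolume(Disk & disk, uint64_t bytes, uint64_t & free_size)
{
    ReservationPtr reservation = disk.reserve(bytes);
    if (!reservation)
        return {};                                  // the added guard: never dereference a null reservation
    free_size = reservation->getUnreservedSpace();  // safe: reservation is known to be valid here
    return reservation;
}

int main()
{
    Disk disk;
    uint64_t free_size = 0;
    if (auto ok = reserveOnVolume(disk, 100, free_size))
        std::cout << "reserved, free now " << free_size << "\n";
    if (auto too_big = reserveOnVolume(disk, 1000000, free_size); !too_big)
        std::cout << "reservation failed, handled without crashing\n";
    return 0;
}

Without the early return, getUnreservedSpace() would be called on a null pointer whenever the disk cannot satisfy the request.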
1 change: 1 addition & 0 deletions src/Formats/FormatFactory.cpp
@@ -104,6 +104,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.parquet.allow_missing_columns = settings.input_format_parquet_allow_missing_columns;
format_settings.parquet.skip_columns_with_unsupported_types_in_schema_inference = settings.input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference;
format_settings.parquet.output_string_as_string = settings.output_format_parquet_string_as_string;
format_settings.parquet.max_block_size = settings.input_format_parquet_max_block_size;
format_settings.pretty.charset = settings.output_format_pretty_grid_charset.toString() == "ASCII" ? FormatSettings::Pretty::Charset::ASCII : FormatSettings::Pretty::Charset::UTF8;
format_settings.pretty.color = settings.output_format_pretty_color;
format_settings.pretty.max_column_pad_width = settings.output_format_pretty_max_column_pad_width;
1 change: 1 addition & 0 deletions src/Formats/FormatSettings.h
@@ -160,6 +160,7 @@ struct FormatSettings
bool case_insensitive_column_matching = false;
std::unordered_set<int> skip_row_groups = {};
bool output_string_as_string = false;
UInt64 max_block_size = 8192;
} parquet;

struct Pretty
5 changes: 3 additions & 2 deletions src/Interpreters/convertFieldToType.cpp
@@ -234,10 +234,11 @@ Field convertFieldToTypeImpl(const Field & src, const IDataType & type, const ID
}

if (which_type.isDateTime64()
&& (which_from_type.isNativeInt() || which_from_type.isNativeUInt() || which_from_type.isDate() || which_from_type.isDate32() || which_from_type.isDateTime() || which_from_type.isDateTime64()))
&& (src.getType() == Field::Types::UInt64 || src.getType() == Field::Types::Int64 || src.getType() == Field::Types::Decimal64))
{
const auto scale = static_cast<const DataTypeDateTime64 &>(type).getScale();
const auto decimal_value = DecimalUtils::decimalFromComponents<DateTime64>(applyVisitor(FieldVisitorConvertToNumber<Int64>(), src), 0, scale);
const auto decimal_value
= DecimalUtils::decimalFromComponents<DateTime64>(applyVisitor(FieldVisitorConvertToNumber<Int64>(), src), 0, scale);
return Field(DecimalField<DateTime64>(decimal_value, scale));
}
}
2 changes: 1 addition & 1 deletion src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp
@@ -71,7 +71,7 @@ Chunk ArrowBlockInputFormat::generate()

++record_batch_current;

arrow_column_to_ch_column->arrowTableToCHChunk(res, *table_result);
arrow_column_to_ch_column->arrowTableToCHChunk(res, *table_result, (*table_result)->num_rows());

/// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
/// Otherwise fill the missing columns with zero values of its type.
7 changes: 7 additions & 0 deletions src/Processors/Formats/Impl/ArrowBufferedStreams.cpp
@@ -11,6 +11,7 @@
#include <IO/copyData.h>
#include <IO/PeekableReadBuffer.h>
#include <arrow/buffer.h>
#include <arrow/util/future.h>
#include <arrow/io/memory.h>
#include <arrow/result.h>
#include <Core/Settings.h>
@@ -95,6 +96,12 @@ arrow::Result<std::shared_ptr<arrow::Buffer>> RandomAccessFileFromSeekableReadBu
return buffer;
}

arrow::Future<std::shared_ptr<arrow::Buffer>> RandomAccessFileFromSeekableReadBuffer::ReadAsync(const arrow::io::IOContext &, int64_t position, int64_t nbytes)
{
/// Just a stub to avoid using internal arrow thread pool
return arrow::Future<std::shared_ptr<arrow::Buffer>>::MakeFinished(ReadAt(position, nbytes));
}

arrow::Status RandomAccessFileFromSeekableReadBuffer::Seek(int64_t position)
{
seekable_in.seek(position, SEEK_SET);
5 changes: 5 additions & 0 deletions src/Processors/Formats/Impl/ArrowBufferedStreams.h
@@ -62,6 +62,11 @@ class RandomAccessFileFromSeekableReadBuffer : public arrow::io::RandomAccessFil

arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override;

/// Override async reading to avoid using internal arrow thread pool.
/// In our code we don't use async reading, so implementation is sync,
/// we just call ReadAt and return future with ready value.
arrow::Future<std::shared_ptr<arrow::Buffer>> ReadAsync(const arrow::io::IOContext&, int64_t position, int64_t nbytes) override;

arrow::Status Seek(int64_t position) override;

private:
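Note: the ReadAsync override declared above keeps the asynchronous signature but does the read synchronously and returns an already-finished arrow::Future, so Arrow's internal thread pool is never used. A rough standalone analogue of that "synchronous body behind a future-shaped interface" idea using std::future (illustrative only; readAt/readAsync are made-up names, not the Arrow API):

#include <future>
#include <iostream>
#include <string>
#include <vector>

// Hypothetical synchronous read; stands in for ReadAt(position, nbytes).
static std::vector<char> readAt(const std::string & data, size_t position, size_t nbytes)
{
    std::string piece = data.substr(position, nbytes);
    return {piece.begin(), piece.end()};
}

// Async-looking wrapper that does all the work inline and hands back a ready future,
// mirroring how the override wraps ReadAt() in arrow::Future<...>::MakeFinished(...).
static std::future<std::vector<char>> readAsync(const std::string & data, size_t position, size_t nbytes)
{
    std::promise<std::vector<char>> promise;
    promise.set_value(readAt(data, position, nbytes));  // completed before the caller ever waits
    return promise.get_future();
}

int main()
{
    std::string data = "hello parquet";
    auto fut = readAsync(data, 6, 7);
    auto chunk = fut.get();                             // returns immediately, no thread pool involved
    std::cout << std::string(chunk.begin(), chunk.end()) << "\n";
    return 0;
}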
15 changes: 5 additions & 10 deletions src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp
@@ -69,7 +69,6 @@ namespace ErrorCodes
extern const int DUPLICATE_COLUMN;
extern const int THERE_IS_NO_COLUMN;
extern const int UNKNOWN_EXCEPTION;
extern const int INCORRECT_NUMBER_OF_COLUMNS;
extern const int INCORRECT_DATA;
}

@@ -90,7 +89,7 @@ static ColumnWithTypeAndName readColumnWithNumericData(std::shared_ptr<arrow::Ch

/// buffers[0] is a null bitmap and buffers[1] are actual values
std::shared_ptr<arrow::Buffer> buffer = chunk->data()->buffers[1];
const auto * raw_data = reinterpret_cast<const NumericType *>(buffer->data());
const auto * raw_data = reinterpret_cast<const NumericType *>(buffer->data()) + chunk->offset();
column_data.insert_assume_reserved(raw_data, raw_data + chunk->length());
}
return {std::move(internal_column), std::move(internal_type), column_name};
@@ -347,7 +346,7 @@ static ColumnWithTypeAndName readColumnWithIndexesDataImpl(std::shared_ptr<arrow

/// buffers[0] is a null bitmap and buffers[1] are actual values
std::shared_ptr<arrow::Buffer> buffer = chunk->data()->buffers[1];
const auto * data = reinterpret_cast<const NumericType *>(buffer->data());
const auto * data = reinterpret_cast<const NumericType *>(buffer->data()) + chunk->offset();

/// Check that indexes are correct (protection against corrupted files)
for (int64_t i = 0; i != chunk->length(); ++i)
@@ -767,7 +766,7 @@ ArrowColumnToCHColumn::ArrowColumnToCHColumn(
{
}

void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table)
void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table, size_t num_rows)
{
NameToColumnPtr name_to_column_ptr;
for (auto column_name : table->ColumnNames())
@@ -781,16 +780,12 @@ void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arr
name_to_column_ptr[std::move(column_name)] = arrow_column;
}

arrowColumnsToCHChunk(res, name_to_column_ptr);
arrowColumnsToCHChunk(res, name_to_column_ptr, num_rows);
}

void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr & name_to_column_ptr)
void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr & name_to_column_ptr, size_t num_rows)
{
if (unlikely(name_to_column_ptr.empty()))
throw Exception(ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS, "Columns is empty");

Columns columns_list;
UInt64 num_rows = name_to_column_ptr.begin()->second->length();
columns_list.reserve(header.columns());
std::unordered_map<String, std::pair<BlockPtr, std::shared_ptr<NestedColumnExtractHelper>>> nested_tables;
bool skipped = false;
4 changes: 2 additions & 2 deletions src/Processors/Formats/Impl/ArrowColumnToCHColumn.h
@@ -28,9 +28,9 @@ class ArrowColumnToCHColumn
bool allow_missing_columns_,
bool case_insensitive_matching_ = false);

void arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table);
void arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table, size_t num_rows);

void arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr & name_to_column_ptr);
void arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr & name_to_column_ptr, size_t num_rows);

/// Get missing columns that exists in header but not in arrow::Schema
std::vector<size_t> getMissingColumns(const arrow::Schema & schema) const;
9 changes: 7 additions & 2 deletions src/Processors/Formats/Impl/ORCBlockInputFormat.cpp
@@ -54,14 +54,19 @@ Chunk ORCBlockInputFormat::generate()
throw ParsingException(
ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading batch of ORC data: {}", table_result.status().ToString());

/// We should extract the number of rows directly from the stripe, because in case when
/// record batch contains 0 columns (for example if we requested only columns that
/// are not presented in data) the number of rows in record batch will be 0.
size_t num_rows = file_reader->GetRawORCReader()->getStripe(stripe_current)->getNumberOfRows();

auto table = table_result.ValueOrDie();
if (!table || !table->num_rows())
if (!table || !num_rows)
return {};

++stripe_current;

Chunk res;
arrow_column_to_ch_column->arrowTableToCHChunk(res, table);
arrow_column_to_ch_column->arrowTableToCHChunk(res, table, num_rows);
/// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
/// Otherwise fill the missing columns with zero values of its type.
if (format_settings.defaults_for_omitted_fields)
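Note: the comment in this hunk explains why the row count is now taken from the ORC stripe and passed down explicitly: when none of the requested columns are present in the file, the record batch has zero columns and therefore reports zero rows, so the old approach of deriving the count from the materialised columns produces nothing to fill with defaults. A small illustrative sketch of that failure mode with toy types (not the Arrow/ClickHouse classes):

#include <cstddef>
#include <iostream>
#include <map>
#include <string>
#include <vector>

using Column = std::vector<int>;
using NameToColumn = std::map<std::string, Column>;

// Old-style: infer the row count from whatever columns were materialised.
// With zero materialised columns this yields 0 even though the file has rows.
static size_t rowsFromColumns(const NameToColumn & columns)
{
    return columns.empty() ? 0 : columns.begin()->second.size();
}

// New-style: the caller passes the row count taken from the file metadata
// (the ORC stripe / Parquet row group), so an "all requested columns missing"
// chunk still produces the right number of default-filled rows.
static size_t rowsFromMetadata(const NameToColumn & /*columns*/, size_t num_rows_from_stripe)
{
    return num_rows_from_stripe;
}

int main()
{
    NameToColumn no_requested_columns_present;  // e.g. a query touching only columns absent from the file
    size_t stripe_rows = 8192;                  // hypothetical count read from the stripe metadata

    std::cout << "derived from columns: " << rowsFromColumns(no_requested_columns_present) << "\n";               // 0 (wrong)
    std::cout << "derived from stripe:  " << rowsFromMetadata(no_requested_columns_present, stripe_rows) << "\n"; // 8192
    return 0;
}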
52 changes: 29 additions & 23 deletions src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
@@ -16,6 +16,7 @@
#include "ArrowColumnToCHColumn.h"
#include <DataTypes/NestedUtils.h>


namespace DB
{

@@ -43,34 +44,38 @@ Chunk ParquetBlockInputFormat::generate()
block_missing_values.clear();

if (!file_reader)
{
prepareReader();
file_reader->set_batch_size(format_settings.parquet.max_block_size);
std::vector<int> row_group_indices;
for (int i = 0; i < row_group_total; ++i)
{
if (!skip_row_groups.contains(i))
row_group_indices.emplace_back(i);
}
auto read_status = file_reader->GetRecordBatchReader(row_group_indices, column_indices, &current_record_batch_reader);
if (!read_status.ok())
throw DB::ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}", read_status.ToString());
}

if (is_stopped)
return {};

for (; row_group_current < row_group_total && skip_row_groups.contains(row_group_current); ++row_group_current)
;

if (row_group_current >= row_group_total)
return res;

std::shared_ptr<arrow::Table> table;

std::unique_ptr<::arrow::RecordBatchReader> rbr;
std::vector<int> row_group_indices { row_group_current };
arrow::Status get_batch_reader_status = file_reader->GetRecordBatchReader(row_group_indices, column_indices, &rbr);

if (!get_batch_reader_status.ok())
throw ParsingException{"Error while reading Parquet data: " + get_batch_reader_status.ToString(), ErrorCodes::CANNOT_READ_ALL_DATA};

arrow::Status read_status = rbr->ReadAll(&table);

if (!read_status.ok())
throw ParsingException{"Error while reading Parquet data: " + read_status.ToString(), ErrorCodes::CANNOT_READ_ALL_DATA};

++row_group_current;

arrow_column_to_ch_column->arrowTableToCHChunk(res, table);
auto batch = current_record_batch_reader->Next();
if (!batch.ok())
{
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}",
batch.status().ToString());
}
if (*batch)
{
auto tmp_table = arrow::Table::FromRecordBatches({*batch});
arrow_column_to_ch_column->arrowTableToCHChunk(res, *tmp_table, (*tmp_table)->num_rows());
}
else
{
return {};
}

/// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
/// Otherwise fill the missing columns with zero values of its type.
@@ -85,6 +90,7 @@ void ParquetBlockInputFormat::resetParser()
IInputFormat::resetParser();

file_reader.reset();
current_record_batch_reader.reset();
column_indices.clear();
row_group_current = 0;
block_missing_values.clear();
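Note: this rewrite stops materialising an entire row group with ReadAll and instead pulls record batches whose size is capped by the new input_format_parquet_max_block_size setting (default 8192). A minimal sketch of the resulting streaming loop over a toy batch reader (illustrative only; ToyBatchReader and the row counts are made up, not the Arrow API):

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <optional>
#include <vector>

// Toy stand-in for a record-batch reader with a configurable batch size.
class ToyBatchReader
{
public:
    ToyBatchReader(size_t total_rows_, size_t batch_size_) : total_rows(total_rows_), batch_size(batch_size_) {}

    // Returns the next batch of row indices, or nothing when the row group is exhausted,
    // mirroring a record batch reader that yields a null batch at the end.
    std::optional<std::vector<size_t>> next()
    {
        if (position >= total_rows)
            return std::nullopt;
        size_t count = std::min(batch_size, total_rows - position);
        std::vector<size_t> rows(count);
        for (size_t i = 0; i < count; ++i)
            rows[i] = position + i;
        position += count;
        return rows;
    }

private:
    size_t total_rows;
    size_t batch_size;
    size_t position = 0;
};

int main()
{
    const size_t rows_in_row_group = 20000;  // hypothetical row group size
    const size_t max_block_size = 8192;      // default of input_format_parquet_max_block_size

    ToyBatchReader reader(rows_in_row_group, max_block_size);
    size_t chunks = 0;
    while (auto batch = reader.next())       // each iteration yields at most max_block_size rows
        std::cout << "chunk " << ++chunks << ": " << batch->size() << " rows\n";
    // Prints three chunks (8192, 8192, 3616) instead of one 20000-row block.
    return 0;
}

Bounding each emitted chunk keeps peak memory proportional to the block size rather than to the row group size.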
3 changes: 2 additions & 1 deletion src/Processors/Formats/Impl/ParquetBlockInputFormat.h
@@ -8,7 +8,7 @@

namespace parquet::arrow { class FileReader; }

namespace arrow { class Buffer; }
namespace arrow { class Buffer; class RecordBatchReader;}

namespace DB
{
@@ -46,6 +46,7 @@ class ParquetBlockInputFormat : public IInputFormat
BlockMissingValues block_missing_values;
const FormatSettings format_settings;
const std::unordered_set<int> & skip_row_groups;
std::shared_ptr<arrow::RecordBatchReader> current_record_batch_reader;

std::atomic<int> is_stopped{0};
};
18 changes: 18 additions & 0 deletions src/Processors/tests/gtest_assert_arrow_log_does_not_abort.cpp
@@ -0,0 +1,18 @@
#include <gtest/gtest.h>
#include <arrow/chunked_array.h>
#include <vector>
#include <arrow/util/logging.h>

using namespace DB;

TEST(ChunkedArray, ChunkedArrayWithZeroChunksShouldNotAbort)
{
std::vector<std::shared_ptr<::arrow::Array>> empty_chunks_vector;

EXPECT_ANY_THROW(::arrow::ChunkedArray{empty_chunks_vector});
}

TEST(ArrowLog, FatalLogShouldThrow)
{
EXPECT_ANY_THROW(::arrow::util::ArrowLog(__FILE__, __LINE__, ::arrow::util::ArrowLogLevel::ARROW_FATAL));
}
3 changes: 2 additions & 1 deletion src/Storages/MergeTree/KeyCondition.cpp
@@ -1172,7 +1172,8 @@ bool KeyCondition::transformConstantWithValidFunctions(

if (is_valid_chain)
{
auto const_type = cur_node->result_type;
out_type = removeLowCardinality(out_type);
auto const_type = removeLowCardinality(cur_node->result_type);
auto const_column = out_type->createColumnConst(1, out_value);
auto const_value = (*castColumnAccurateOrNull({const_column, out_type, ""}, const_type))[0];

12 changes: 12 additions & 0 deletions tests/ci/build_check.py
@@ -394,6 +394,18 @@ def main():

print(f"::notice ::Log URL: {log_url}")

src_path = os.path.join(TEMP_PATH, "build_source.src.tar.gz")

if os.path.exists(src_path):
src_url = s3_helper.upload_build_file_to_s3(
src_path, s3_path_prefix + "/clickhouse-" + version.string + ".src.tar.gz"
)
logging.info("Source tar %s", src_url)
else:
logging.info("Source tar doesn't exist")

print(f"::notice ::Source tar URL: {src_url}")

create_json_artifact(
TEMP_PATH, build_name, log_url, build_urls, build_config, elapsed, success
)
3 changes: 3 additions & 0 deletions tests/ci/ci_config.py
@@ -335,6 +335,9 @@
"required_build": "package_aarch64",
"test_grep_exclude_filter": "constant_column_search",
},
"Sign release (actions)": {
"required_build": "package_release"
}
},
} # type: dict

2 changes: 1 addition & 1 deletion tests/ci/clickhouse_helper.py
@@ -197,7 +197,7 @@ def mark_flaky_tests(clickhouse_helper, check_name, test_results):
AND pull_request_number = 0
"""

tests_data = clickhouse_helper.select_json_each_row("default", query)
tests_data = clickhouse_helper.select_json_each_row("gh-data", query)
master_failed_tests = {row["test_name"] for row in tests_data}
logging.info("Found flaky tests: %s", ", ".join(master_failed_tests))
