Skip to content

Commit

Permalink
Fixed bad code; added generic implementation for "arrayReverse" funct…
Browse files Browse the repository at this point in the history
…ion; added test [#CLICKHOUSE-3]
  • Loading branch information
alexey-milovidov committed Nov 24, 2018
1 parent 7d6ffff commit 73e3a7b
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 181 deletions.
250 changes: 69 additions & 181 deletions dbms/src/Functions/arrayReverse.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int LOGICAL_ERROR;
}


/// TODO Add generic implementation.

class FunctionArrayReverse : public IFunction
{
public:
Expand All @@ -42,38 +41,20 @@ class FunctionArrayReverse : public IFunction
return arguments[0];
}

void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override;
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t) override;

private:
bool executeConst(Block & block, const ColumnNumbers & arguments, size_t result,
size_t input_rows_count);

template <typename T>
bool executeNumber(
const IColumn & src_data, const ColumnArray::Offsets & src_offsets,
IColumn & res_data_col,
const ColumnNullable * nullable_col,
ColumnNullable * nullable_res_col);

bool executeFixedString(
const IColumn & src_data, const ColumnArray::Offsets & src_offsets,
IColumn & res_data_col,
const ColumnNullable * nullable_col,
ColumnNullable * nullable_res_col);

bool executeString(
const IColumn & src_data, const ColumnArray::Offsets & src_array_offsets,
IColumn & res_data_col,
const ColumnNullable * nullable_col,
ColumnNullable * nullable_res_col);
bool executeNumber(const IColumn & src_data, const ColumnArray::Offsets & src_offsets, IColumn & res_data);

bool executeFixedString(const IColumn & src_data, const ColumnArray::Offsets & src_offsets, IColumn & res_data);
bool executeString(const IColumn & src_data, const ColumnArray::Offsets & src_array_offsets, IColumn & res_data);
bool executeGeneric(const IColumn & src_data, const ColumnArray::Offsets & src_array_offsets, IColumn & res_data);
};


void FunctionArrayReverse::executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count)
void FunctionArrayReverse::executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t)
{
if (executeConst(block, arguments, result, input_rows_count))
return;

const ColumnArray * array = checkAndGetColumn<ColumnArray>(block.getByPosition(arguments[0]).column.get());
if (!array)
throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName() + " of first argument of function " + getName(),
Expand All @@ -85,89 +66,83 @@ void FunctionArrayReverse::executeImpl(Block & block, const ColumnNumbers & argu

const IColumn & src_data = array->getData();
const ColumnArray::Offsets & offsets = array->getOffsets();
IColumn & res_data = res.getData();

const ColumnNullable * nullable_col = nullptr;
ColumnNullable * nullable_res_col = nullptr;

const IColumn * inner_col;
IColumn * inner_res_col;

if (src_data.isColumnNullable())
{
nullable_col = static_cast<const ColumnNullable *>(&src_data);
inner_col = &nullable_col->getNestedColumn();

nullable_res_col = static_cast<ColumnNullable *>(&res_data);
inner_res_col = &nullable_res_col->getNestedColumn();
}
else
{
inner_col = &src_data;
inner_res_col = &res_data;
}
IColumn & res_data = res.getData();

if (!(executeNumber<UInt8>(*inner_col, offsets, *inner_res_col, nullable_col, nullable_res_col)
|| executeNumber<UInt16>(*inner_col, offsets, *inner_res_col, nullable_col, nullable_res_col)
|| executeNumber<UInt32>(*inner_col, offsets, *inner_res_col, nullable_col, nullable_res_col)
|| executeNumber<UInt64>(*inner_col, offsets, *inner_res_col, nullable_col, nullable_res_col)
|| executeNumber<Int8>(*inner_col, offsets, *inner_res_col, nullable_col, nullable_res_col)
|| executeNumber<Int16>(*inner_col, offsets, *inner_res_col, nullable_col, nullable_res_col)
|| executeNumber<Int32>(*inner_col, offsets, *inner_res_col, nullable_col, nullable_res_col)
|| executeNumber<Int64>(*inner_col, offsets, *inner_res_col, nullable_col, nullable_res_col)
|| executeNumber<Float32>(*inner_col, offsets, *inner_res_col, nullable_col, nullable_res_col)
|| executeNumber<Float64>(*inner_col, offsets, *inner_res_col, nullable_col, nullable_res_col)
|| executeString(*inner_col, offsets, *inner_res_col, nullable_col, nullable_res_col)
|| executeFixedString(*inner_col, offsets, *inner_res_col, nullable_col, nullable_res_col)))
throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName()
+ " of first argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
const ColumnNullable * src_nullable_col = typeid_cast<const ColumnNullable *>(&src_data);
ColumnNullable * res_nullable_col = typeid_cast<ColumnNullable *>(&res_data);

const IColumn * src_inner_col = src_nullable_col ? &src_nullable_col->getNestedColumn() : &src_data;
IColumn * res_inner_col = res_nullable_col ? &res_nullable_col->getNestedColumn() : &res_data;

false
|| executeNumber<UInt8>(*src_inner_col, offsets, *res_inner_col)
|| executeNumber<UInt16>(*src_inner_col, offsets, *res_inner_col)
|| executeNumber<UInt32>(*src_inner_col, offsets, *res_inner_col)
|| executeNumber<UInt64>(*src_inner_col, offsets, *res_inner_col)
|| executeNumber<Int8>(*src_inner_col, offsets, *res_inner_col)
|| executeNumber<Int16>(*src_inner_col, offsets, *res_inner_col)
|| executeNumber<Int32>(*src_inner_col, offsets, *res_inner_col)
|| executeNumber<Int64>(*src_inner_col, offsets, *res_inner_col)
|| executeNumber<Float32>(*src_inner_col, offsets, *res_inner_col)
|| executeNumber<Float64>(*src_inner_col, offsets, *res_inner_col)
|| executeString(*src_inner_col, offsets, *res_inner_col)
|| executeFixedString(*src_inner_col, offsets, *res_inner_col)
|| executeGeneric(*src_inner_col, offsets, *res_inner_col);

if (src_nullable_col)
if (!executeNumber<UInt8>(src_nullable_col->getNullMapColumn(), offsets, res_nullable_col->getNullMapColumn()))
throw Exception("Illegal column " + src_nullable_col->getNullMapColumn().getName()
+ " of null map of the first argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);

block.getByPosition(result).column = std::move(res_ptr);
}

bool FunctionArrayReverse::executeConst(Block & block, const ColumnNumbers & arguments, size_t result,
size_t input_rows_count)
{
if (const ColumnConst * const_array = checkAndGetColumnConst<ColumnArray>(block.getByPosition(arguments[0]).column.get()))
{
Array arr = const_array->getValue<Array>();

size_t size = arr.size();
Array res(size);
bool FunctionArrayReverse::executeGeneric(const IColumn & src_data, const ColumnArray::Offsets & src_offsets, IColumn & res_data)
{
size_t size = src_offsets.size();
res_data.reserve(size);

for (size_t i = 0; i < size; ++i)
res[i] = arr[size - i - 1];
ColumnArray::Offset src_prev_offset = 0;
for (size_t i = 0; i < size; ++i)
{
ssize_t src_index = src_offsets[i] - 1;

block.getByPosition(result).column = block.getByPosition(result).type->createColumnConst(input_rows_count, res);
while (src_index >= ssize_t(src_prev_offset))
{
res_data.insertFrom(src_data, src_index);
--src_index;
}

return true;
src_prev_offset = src_offsets[i];
}
else
return false;

return true;
}

template <typename T>
bool FunctionArrayReverse::executeNumber(
const IColumn & src_data, const ColumnArray::Offsets & src_offsets,
IColumn & res_data_col,
const ColumnNullable * nullable_col,
ColumnNullable * nullable_res_col)
bool FunctionArrayReverse::executeNumber(const IColumn & src_data, const ColumnArray::Offsets & src_offsets, IColumn & res_data)
{
auto do_reverse = [](const auto & src_data, const auto & src_offsets, auto & res_data)
if (const ColumnVector<T> * src_data_concrete = checkAndGetColumn<ColumnVector<T>>(&src_data))
{
const PaddedPODArray<T> & src_vec = src_data_concrete->getData();
PaddedPODArray<T> & res_vec = typeid_cast<ColumnVector<T> &>(res_data).getData();
res_vec.resize(src_data.size());

size_t size = src_offsets.size();
ColumnArray::Offset src_prev_offset = 0;

for (size_t i = 0; i < size; ++i)
{
const auto * src = &src_data[src_prev_offset];
const auto * src_end = &src_data[src_offsets[i]];
const auto * src = &src_vec[src_prev_offset];
const auto * src_end = &src_vec[src_offsets[i]];

if (src == src_end)
continue;

auto dst = &res_data[src_offsets[i] - 1];
auto dst = &res_vec[src_offsets[i] - 1];

while (src < src_end)
{
Expand All @@ -178,43 +153,22 @@ bool FunctionArrayReverse::executeNumber(

src_prev_offset = src_offsets[i];
}
};

if (const ColumnVector<T> * src_data_concrete = checkAndGetColumn<ColumnVector<T>>(&src_data))
{
const PaddedPODArray<T> & src_data = src_data_concrete->getData();
PaddedPODArray<T> & res_data = typeid_cast<ColumnVector<T> &>(res_data_col).getData();
res_data.resize(src_data.size());
do_reverse(src_data, src_offsets, res_data);

if ((nullable_col) && (nullable_res_col))
{
/// Make a reverted null map.
const auto & src_null_map = nullable_col->getNullMapData();
auto & res_null_map = nullable_res_col->getNullMapData();
res_null_map.resize(src_data.size());
do_reverse(src_null_map, src_offsets, res_null_map);
}

return true;
}
else
return false;
}

bool FunctionArrayReverse::executeFixedString(
const IColumn & src_data, const ColumnArray::Offsets & src_offsets,
IColumn & res_data_col,
const ColumnNullable * nullable_col,
ColumnNullable * nullable_res_col)
bool FunctionArrayReverse::executeFixedString(const IColumn & src_data, const ColumnArray::Offsets & src_offsets, IColumn & res_data)
{
if (const ColumnFixedString * src_data_concrete = checkAndGetColumn<ColumnFixedString>(&src_data))
{
const size_t n = src_data_concrete->getN();
const ColumnFixedString::Chars_t & src_data = src_data_concrete->getChars();
ColumnFixedString::Chars_t & res_data = typeid_cast<ColumnFixedString &>(res_data_col).getChars();
ColumnFixedString::Chars_t & res_chars = typeid_cast<ColumnFixedString &>(res_data).getChars();
size_t size = src_offsets.size();
res_data.resize(src_data.size());
res_chars.resize(src_data.size());

ColumnArray::Offset src_prev_offset = 0;

Expand All @@ -226,7 +180,7 @@ bool FunctionArrayReverse::executeFixedString(
if (src == src_end)
continue;

UInt8 * dst = &res_data[src_offsets[i] * n - n];
UInt8 * dst = &res_chars[src_offsets[i] * n - n];

while (src < src_end)
{
Expand All @@ -238,60 +192,25 @@ bool FunctionArrayReverse::executeFixedString(

src_prev_offset = src_offsets[i];
}

if ((nullable_col) && (nullable_res_col))
{
/// Make a reverted null map.
const auto & src_null_map = nullable_col->getNullMapData();
auto & res_null_map = nullable_res_col->getNullMapData();
res_null_map.resize(src_null_map.size());

ColumnArray::Offset src_prev_offset = 0;

for (size_t i = 0; i < size; ++i)
{
const UInt8 * src = &src_null_map[src_prev_offset];
const UInt8 * src_end = &src_null_map[src_offsets[i]];

if (src == src_end)
continue;

UInt8 * dst = &res_null_map[src_offsets[i] - 1];

while (src < src_end)
{
*dst = *src;
++src;
--dst;
}

src_prev_offset = src_offsets[i];
}
}

return true;
}
else
return false;
}

bool FunctionArrayReverse::executeString(
const IColumn & src_data, const ColumnArray::Offsets & src_array_offsets,
IColumn & res_data_col,
const ColumnNullable * nullable_col,
ColumnNullable * nullable_res_col)
bool FunctionArrayReverse::executeString(const IColumn & src_data, const ColumnArray::Offsets & src_array_offsets, IColumn & res_data)
{
if (const ColumnString * src_data_concrete = checkAndGetColumn<ColumnString>(&src_data))
{
const ColumnString::Offsets & src_string_offsets = src_data_concrete->getOffsets();
ColumnString::Offsets & res_string_offsets = typeid_cast<ColumnString &>(res_data_col).getOffsets();
ColumnString::Offsets & res_string_offsets = typeid_cast<ColumnString &>(res_data).getOffsets();

const ColumnString::Chars_t & src_data = src_data_concrete->getChars();
ColumnString::Chars_t & res_data = typeid_cast<ColumnString &>(res_data_col).getChars();
ColumnString::Chars_t & res_chars = typeid_cast<ColumnString &>(res_data).getChars();

size_t size = src_array_offsets.size();
res_string_offsets.resize(src_string_offsets.size());
res_data.resize(src_data.size());
res_chars.resize(src_data.size());

ColumnArray::Offset src_array_prev_offset = 0;
ColumnString::Offset res_string_prev_offset = 0;
Expand All @@ -309,7 +228,7 @@ bool FunctionArrayReverse::executeString(
auto src_pos = src_array_prev_offset + j_reversed == 0 ? 0 : src_string_offsets[src_array_prev_offset + j_reversed - 1];
size_t string_size = src_string_offsets[src_array_prev_offset + j_reversed] - src_pos;

memcpySmallAllowReadWriteOverflow15(&res_data[res_string_prev_offset], &src_data[src_pos], string_size);
memcpySmallAllowReadWriteOverflow15(&res_chars[res_string_prev_offset], &src_data[src_pos], string_size);

res_string_prev_offset += string_size;
res_string_offsets[src_array_prev_offset + j] = res_string_prev_offset;
Expand All @@ -319,37 +238,6 @@ bool FunctionArrayReverse::executeString(
src_array_prev_offset = src_array_offsets[i];
}

if (nullable_col && nullable_res_col)
{
/// Make a reverted null map.
const auto & src_null_map = nullable_col->getNullMapData();
auto & res_null_map = nullable_res_col->getNullMapData();
res_null_map.resize(src_string_offsets.size());

size_t size = src_string_offsets.size();
ColumnArray::Offset src_prev_offset = 0;

for (size_t i = 0; i < size; ++i)
{
const auto * src = &src_null_map[src_prev_offset];
const auto * src_end = &src_null_map[src_array_offsets[i]];

if (src == src_end)
continue;

auto dst = &res_null_map[src_array_offsets[i] - 1];

while (src < src_end)
{
*dst = *src;
++src;
--dst;
}

src_prev_offset = src_array_offsets[i];
}
}

return true;
}
else
Expand Down
12 changes: 12 additions & 0 deletions dbms/tests/queries/0_stateless/00758_array_reverse.reference
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
['\0',NULL]
[NULL,123,NULL]
[NULL,'Hello']
['world','Hello']
['world',NULL,'Hello']
[NULL,NULL,NULL]
[[' '],[''],[]]
[[NULL],[''],[]]
[(nan,'World',[NULL]),(1,'Hello',[])]
\N
[]
[[[[]]]]
12 changes: 12 additions & 0 deletions dbms/tests/queries/0_stateless/00758_array_reverse.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
SELECT reverse([NULL, '\0']);
SELECT reverse([NULL, 123, NULL]);
SELECT reverse([toFixedString('Hello', 5), NULL]);
SELECT reverse(['Hello', 'world']);
SELECT reverse(['Hello', NULL, 'world']);
SELECT reverse([NULL, NULL, NULL]);
SELECT reverse([[], [''], [' ']]);
SELECT reverse([[], [''], [NULL]]);
SELECT reverse([(1, 'Hello', []), (nan, 'World', [NULL])]);
SELECT reverse(NULL);
SELECT reverse([]);
SELECT reverse([[[[]]]]);

0 comments on commit 73e3a7b

Please sign in to comment.