Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vectorized aggregation with grouping by one fixed-size column #7341

Open
wants to merge 36 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
b92e622
Vectorized hash grouping on one column
akuzm Oct 2, 2024
4ce0e99
Merge remote-tracking branch 'origin/main' into HEAD
akuzm Oct 2, 2024
74d4419
benchmark vectorized grouping (2024-10-02 no. 6)
akuzm Oct 2, 2024
baedf7f
fixes
akuzm Oct 2, 2024
35dbd36
benchmark vectorized grouping (2024-10-02 no. 7)
akuzm Oct 2, 2024
74fffd3
some ugly stuff
akuzm Oct 2, 2024
f8db454
benchmark vectorized grouping (2024-10-02 no. 9)
akuzm Oct 2, 2024
00a9d11
something
akuzm Oct 4, 2024
339f91a
reduce indirections
akuzm Oct 4, 2024
f075589
skip null bitmap words
akuzm Oct 8, 2024
88f325d
cleanup
akuzm Oct 9, 2024
15ab443
crc32
akuzm Oct 9, 2024
ff16ec8
license
akuzm Oct 9, 2024
4291b17
benchmark vectorized hash grouping (2024-10-09 no. 10)
akuzm Oct 9, 2024
795ef6b
test deltadelta changes
akuzm Oct 11, 2024
1fabb22
some speedups and simplehash simplifications
akuzm Oct 11, 2024
717abc4
Revert "test deltadelta changes"
akuzm Oct 11, 2024
b03bd6b
test deltadelta changes
akuzm Oct 11, 2024
166d0e8
work with signed types
akuzm Oct 14, 2024
7f578b4
Revert "work with signed types"
akuzm Oct 14, 2024
e70cb0b
bulk stuff specialized to element type
akuzm Oct 14, 2024
0040844
roll back the delta delta stuff
akuzm Oct 14, 2024
694faf6
use simplehash
akuzm Oct 14, 2024
3d05674
cleanup
akuzm Oct 14, 2024
d90a90f
benchmark vectorized hash grouping (simple) (2024-10-14 no. 11)
akuzm Oct 14, 2024
4a93549
add more tests
akuzm Oct 15, 2024
3e06b92
remove modified simplehash
akuzm Oct 15, 2024
a7942ed
offsets
akuzm Oct 15, 2024
6fb517f
cleanup
akuzm Oct 15, 2024
ffb28cf
changelog
akuzm Oct 15, 2024
778ca97
cleanup
akuzm Oct 15, 2024
ef3847a
benchmark vectorized hash grouping (simple) (2024-10-15 no. 12)
akuzm Oct 15, 2024
1409c74
32-bit
akuzm Oct 15, 2024
514ae96
some renames
akuzm Oct 15, 2024
22d23b3
cleanup
akuzm Oct 15, 2024
cd7a1dc
spelling
akuzm Oct 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/linux-32bit-build-and-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ jobs:
CC: clang-14
CXX: clang++-14
DEBIAN_FRONTEND: noninteractive
IGNORES: "append-* transparent_decompression-* transparent_decompress_chunk-* pg_dump telemetry bgw_db_scheduler*"
# vectorized_aggregation has different output on i386 because int8 is by
# reference and currently it cannot be used for vectorized hash grouping.
IGNORES: "append-* transparent_decompression-* transparent_decompress_chunk-* pg_dump telemetry bgw_db_scheduler* vectorized_aggregation"
SKIPS: chunk_adaptive histogram_test-*
EXTENSIONS: "postgres_fdw test_decoding"
strategy:
Expand Down
1 change: 1 addition & 0 deletions .unreleased/vectorized-grouping-one-fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #7341 Vectorized aggregation with grouping by one fixed-size by-value compressed column (such as arithmetic types).
12 changes: 6 additions & 6 deletions tsl/src/compression/algorithms/deltadelta_impl.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,20 @@ FUNCTION_NAME(delta_delta_decompress_all, ELEMENT_TYPE)(Datum compressed, Memory
* Pad the number of elements to multiple of 64 bytes if needed, so that we
* can work in 64-byte blocks.
*/
#define INNER_LOOP_SIZE_LOG2 3
#define INNER_LOOP_SIZE (1 << INNER_LOOP_SIZE_LOG2)
const uint32 n_total = has_nulls ? nulls.num_elements : num_deltas;
const uint32 n_total_padded =
((n_total * sizeof(ELEMENT_TYPE) + 63) / 64) * 64 / sizeof(ELEMENT_TYPE);
const uint32 n_total_padded = pad_to_multiple(INNER_LOOP_SIZE, n_total);
const uint32 n_notnull = num_deltas;
const uint32 n_notnull_padded =
((n_notnull * sizeof(ELEMENT_TYPE) + 63) / 64) * 64 / sizeof(ELEMENT_TYPE);
const uint32 n_notnull_padded = pad_to_multiple(INNER_LOOP_SIZE, n_notnull);
Assert(n_total_padded >= n_total);
Assert(n_notnull_padded >= n_notnull);
Assert(n_total >= n_notnull);
Assert(n_total <= GLOBAL_MAX_ROWS_PER_COMPRESSION);

/*
* We need additional padding at the end of buffer, because the code that
* converts the elements to postres Datum always reads in 8 bytes.
* converts the elements to postgres Datum always reads in 8 bytes.
*/
const int buffer_bytes = n_total_padded * sizeof(ELEMENT_TYPE) + 8;
ELEMENT_TYPE *restrict decompressed_values = MemoryContextAlloc(dest_mctx, buffer_bytes);
Expand All @@ -75,7 +75,6 @@ FUNCTION_NAME(delta_delta_decompress_all, ELEMENT_TYPE)(Datum compressed, Memory
* Also tried zig-zag decoding in a separate loop, seems to be slightly
* slower, around the noise threshold.
*/
#define INNER_LOOP_SIZE 8
Assert(n_notnull_padded % INNER_LOOP_SIZE == 0);
for (uint32 outer = 0; outer < n_notnull_padded; outer += INNER_LOOP_SIZE)
{
Expand All @@ -86,6 +85,7 @@ FUNCTION_NAME(delta_delta_decompress_all, ELEMENT_TYPE)(Datum compressed, Memory
decompressed_values[outer + inner] = current_element;
}
}
#undef INNER_LOOP_SIZE_LOG2
#undef INNER_LOOP_SIZE

uint64 *restrict validity_bitmap = NULL;
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/compression/arrow_c_data_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ pad_to_multiple(uint64 pad_to, uint64 source_value)
}

static inline size_t
arrow_num_valid(uint64 *bitmap, size_t total_rows)
arrow_num_valid(const uint64 *bitmap, size_t total_rows)
{
if (bitmap == NULL)
{
Expand Down
1 change: 1 addition & 0 deletions tsl/src/nodes/vector_agg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ add_subdirectory(function)
set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/exec.c
${CMAKE_CURRENT_SOURCE_DIR}/grouping_policy_batch.c
${CMAKE_CURRENT_SOURCE_DIR}/grouping_policy_hash.c
${CMAKE_CURRENT_SOURCE_DIR}/plan.c)
target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES})
34 changes: 28 additions & 6 deletions tsl/src/nodes/vector_agg/exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ vector_agg_begin(CustomScanState *node, EState *estate, int eflags)
Aggref *aggref = castNode(Aggref, tlentry->expr);
VectorAggFunctions *func = get_vector_aggregate(aggref->aggfnoid);
Assert(func != NULL);
def->func = func;
def->func = *func;

if (list_length(aggref->args) > 0)
{
Expand Down Expand Up @@ -122,11 +122,33 @@ vector_agg_begin(CustomScanState *node, EState *estate, int eflags)
}
}

List *grouping_column_offsets = linitial(cscan->custom_private);
vector_agg_state->grouping =
create_grouping_policy_batch(vector_agg_state->agg_defs,
vector_agg_state->output_grouping_columns,
/* partial_per_batch = */ grouping_column_offsets != NIL);
if (list_length(vector_agg_state->output_grouping_columns) == 1)
{
GroupingColumn *col =
(GroupingColumn *) linitial(vector_agg_state->output_grouping_columns);
DecompressContext *dcontext = &decompress_state->decompress_context;
CompressionColumnDescription *desc = &dcontext->compressed_chunk_columns[col->input_offset];
if (desc->type == COMPRESSED_COLUMN && desc->by_value && desc->value_bytes > 0 &&
(size_t) desc->value_bytes <= sizeof(Datum))
{
/*
* Hash grouping by a single fixed-size by-value compressed column.
*/
vector_agg_state->grouping =
create_grouping_policy_hash(vector_agg_state->agg_defs,
vector_agg_state->output_grouping_columns);
}
}

if (vector_agg_state->grouping == NULL)
{
/*
* Per-batch grouping.
*/
vector_agg_state->grouping =
create_grouping_policy_batch(vector_agg_state->agg_defs,
vector_agg_state->output_grouping_columns);
}
}

static void
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/nodes/vector_agg/exec.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

typedef struct
{
VectorAggFunctions *func;
VectorAggFunctions func;
int input_offset;
int output_offset;
} VectorAggDef;
Expand Down
33 changes: 33 additions & 0 deletions tsl/src/nodes/vector_agg/function/agg_many_vector_helper.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/

/*
* A generic implementation of adding the given batch to many aggregate function
* states with given offsets. Used for hash aggregation, and builds on the
* FUNCTION_NAME(one) function, which adds one passing non-null row to the given
* aggregate function state.
*/
/*
 * Add the rows [start_row, end_row) of the arrow vector to the aggregate
 * states addressed by the per-row offsets. Offset 0 marks a row that did
 * not pass the filter. Builds on the FUNCTION_NAME(one) single-row
 * transition function.
 */
static void
FUNCTION_NAME(many_vector)(void *restrict agg_states, uint32 *restrict offsets, int start_row,
			   int end_row, const ArrowArray *vector, MemoryContext agg_extra_mctx)
{
	/*
	 * Any allocations the transition function makes must go to the
	 * aggregate's extra memory context.
	 */
	MemoryContext old_context = MemoryContextSwitchTo(agg_extra_mctx);

	FUNCTION_NAME(state) *states = (FUNCTION_NAME(state) *) agg_states;
	const uint64 *validity = vector->buffers[0];
	const CTYPE *data = vector->buffers[1];

	for (int i = start_row; i < end_row; i++)
	{
		/* Load unconditionally; branch only on the combined predicate. */
		const CTYPE v = data[i];
		const bool passes = (offsets[i] != 0);
		const bool notnull = arrow_row_is_valid(validity, i);

		if (passes && notnull)
		{
			FUNCTION_NAME(one)(&states[offsets[i]], v);
		}
	}

	MemoryContextSwitchTo(old_context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,20 @@
* implementation otherwise.
*/
static void
FUNCTION_NAME(const)(void *agg_state, Datum constvalue, bool constisnull, int n,
MemoryContext agg_extra_mctx)
FUNCTION_NAME(scalar)(void *agg_state, Datum constvalue, bool constisnull, int n,
MemoryContext agg_extra_mctx)
{
const uint64 valid = constisnull ? 0 : 1;
const CTYPE value = valid ? DATUM_TO_CTYPE(constvalue) : 0;
if (constisnull)
{
return;
}

const CTYPE value = DATUM_TO_CTYPE(constvalue);

MemoryContext old = MemoryContextSwitchTo(agg_extra_mctx);
for (int i = 0; i < n; i++)
{
FUNCTION_NAME(vector_impl)(agg_state, 1, &value, &valid, NULL, agg_extra_mctx);
FUNCTION_NAME(one)(agg_state, value);
}
MemoryContextSwitchTo(old);
}
51 changes: 42 additions & 9 deletions tsl/src/nodes/vector_agg/function/float48_accum_single.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,13 @@ typedef struct
} FUNCTION_NAME(state);

static void
FUNCTION_NAME(init)(void *agg_state)
FUNCTION_NAME(init)(void *restrict agg_states, int n)
{
FUNCTION_NAME(state) *state = (FUNCTION_NAME(state) *) agg_state;
*state = (FUNCTION_NAME(state)){ 0 };
FUNCTION_NAME(state) *states = (FUNCTION_NAME(state) *) agg_states;
for (int i = 0; i < n; i++)
{
states[i] = (FUNCTION_NAME(state)){ 0 };
}
}

static void
Expand Down Expand Up @@ -290,14 +293,44 @@ FUNCTION_NAME(vector_impl)(void *agg_state, size_t n, const CTYPE *values, const
COMBINE(&state->N, &state->Sx, &state->Sxx, Narray[0], Sxarray[0], Sxxarray[0]);
}

#include "agg_const_helper.c"
static pg_attribute_always_inline void
FUNCTION_NAME(one)(void *restrict agg_state, const CTYPE value)
{
FUNCTION_NAME(state) *state = (FUNCTION_NAME(state) *) agg_state;
/*
* This code follows the Postgres float8_accum() transition function, see
* the comments there.
*/
const double newN = state->N + 1.0;
const double newSx = state->Sx + value;
#ifdef NEED_SXX
if (state->N > 0.0)
{
const double tmp = value * newN - newSx;
state->Sxx += tmp * tmp / (state->N * newN);
}
else
{
state->Sxx = 0 * value;
}
#endif

state->N = newN;
state->Sx = newSx;
}

#include "agg_many_vector_helper.c"
#include "agg_scalar_helper.c"
#include "agg_vector_validity_helper.c"

VectorAggFunctions FUNCTION_NAME(argdef) = { .state_bytes = sizeof(FUNCTION_NAME(state)),
.agg_init = FUNCTION_NAME(init),
.agg_emit = FUNCTION_NAME(emit),
.agg_const = FUNCTION_NAME(const),
.agg_vector = FUNCTION_NAME(vector) };
VectorAggFunctions FUNCTION_NAME(argdef) = {
.state_bytes = sizeof(FUNCTION_NAME(state)),
.agg_init = FUNCTION_NAME(init),
.agg_emit = FUNCTION_NAME(emit),
.agg_scalar = FUNCTION_NAME(scalar),
.agg_vector = FUNCTION_NAME(vector),
.agg_many_vector = FUNCTION_NAME(many_vector),
};
#undef UPDATE
#undef COMBINE

Expand Down
63 changes: 51 additions & 12 deletions tsl/src/nodes/vector_agg/function/functions.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ typedef struct
} CountState;

static void
count_init(void *agg_state)
count_init(void *restrict agg_states, int n)
{
CountState *state = (CountState *) agg_state;
state->count = 0;
CountState *states = (CountState *) agg_states;
for (int i = 0; i < n; i++)
{
states[i].count = 0;
}
}

static void
Expand All @@ -42,26 +45,44 @@ count_emit(void *agg_state, Datum *out_result, bool *out_isnull)
}

static void
count_star_const(void *agg_state, Datum constvalue, bool constisnull, int n,
MemoryContext agg_extra_mctx)
count_star_scalar(void *agg_state, Datum constvalue, bool constisnull, int n,
MemoryContext agg_extra_mctx)
{
CountState *state = (CountState *) agg_state;
state->count += n;
}

/*
 * count(*) over many aggregate states: the scalar value and its nullness
 * are irrelevant, only the filter encoded in the offsets matters
 * (offset 0 means the row did not pass).
 */
static void
count_star_many_scalar(void *restrict agg_states, uint32 *restrict offsets, int start_row,
		       int end_row, Datum constvalue, bool constisnull,
		       MemoryContext agg_extra_mctx)
{
	CountState *counts = (CountState *) agg_states;
	for (int i = start_row; i < end_row; i++)
	{
		const uint32 off = offsets[i];
		if (off != 0)
		{
			counts[off].count++;
		}
	}
}

VectorAggFunctions count_star_agg = {
.state_bytes = sizeof(CountState),
.agg_init = count_init,
.agg_const = count_star_const,
.agg_scalar = count_star_scalar,
.agg_emit = count_emit,
.agg_many_scalar = count_star_many_scalar,
};

/*
* Aggregate function count(x).
*/
static void
count_any_const(void *agg_state, Datum constvalue, bool constisnull, int n,
MemoryContext agg_extra_mctx)
count_any_scalar(void *agg_state, Datum constvalue, bool constisnull, int n,
MemoryContext agg_extra_mctx)
{
if (constisnull)
{
Expand All @@ -73,8 +94,8 @@ count_any_const(void *agg_state, Datum constvalue, bool constisnull, int n,
}

static void
count_any_vector(void *agg_state, const ArrowArray *vector, const uint64 *filter,
MemoryContext agg_extra_mctx)
count_any_many_vector(void *agg_state, const ArrowArray *vector, const uint64 *filter,
MemoryContext agg_extra_mctx)
{
CountState *state = (CountState *) agg_state;
const int n = vector->length;
Expand Down Expand Up @@ -110,12 +131,30 @@ count_any_vector(void *agg_state, const ArrowArray *vector, const uint64 *filter
}
}

/*
 * count(x) over many aggregate states: a row is counted when it both
 * passes the filter (offset != 0) and has a non-null value according to
 * the Arrow validity bitmap.
 */
static void
count_any_many(void *restrict agg_states, uint32 *restrict offsets, int start_row, int end_row,
	       const ArrowArray *vector, MemoryContext agg_extra_mctx)
{
	CountState *states = (CountState *) agg_states;
	const uint64 *validity = vector->buffers[0];
	for (int i = start_row; i < end_row; i++)
	{
		const bool passes = (offsets[i] != 0);
		const bool notnull = arrow_row_is_valid(validity, i);
		if (passes && notnull)
		{
			states[offsets[i]].count++;
		}
	}
}

VectorAggFunctions count_any_agg = {
.state_bytes = sizeof(CountState),
.agg_init = count_init,
.agg_emit = count_emit,
.agg_const = count_any_const,
.agg_vector = count_any_vector,
.agg_scalar = count_any_scalar,
.agg_vector = count_any_many_vector,
.agg_many_vector = count_any_many,
};

/*
Expand Down
Loading
Loading