Skip to content

Commit

Permalink
Address reviews on the PR
Browse files Browse the repository at this point in the history
  • Loading branch information
halilozanakgul committed Apr 3, 2023
1 parent 5dfdcc8 commit 3e2cf73
Show file tree
Hide file tree
Showing 17 changed files with 214 additions and 189 deletions.
10 changes: 9 additions & 1 deletion src/backend/distributed/executor/local_executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,15 @@ LocallyExecuteTaskPlan(PlannedStmt *taskPlan, char *queryString,
LocalExecutorShardId = task->anchorShardId;
}

AttributeTask(task->partitionColumn, task->colocationId, taskPlan->commandType);

char *partitionKeyValueString = NULL;
if (task->partitionKeyValue != NULL)
{
partitionKeyValueString = DatumToString(task->partitionKeyValue->constvalue,
task->partitionKeyValue->consttype);
}

AttributeTask(partitionKeyValueString, task->colocationId, taskPlan->commandType);

PG_TRY();
{
Expand Down
15 changes: 2 additions & 13 deletions src/backend/distributed/planner/deparse_shard_query.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,18 +142,7 @@ RebuildQueryStrings(Job *workerJob)
? "(null)"
: TaskQueryString(task))));

Datum partitionColumnValue;
Oid partitionColumnType = 0;
char *partitionColumnString = NULL;
if (workerJob->partitionKeyValue != NULL)
{
partitionColumnValue = workerJob->partitionKeyValue->constvalue;
partitionColumnType = workerJob->partitionKeyValue->consttype;
partitionColumnString = DatumToString(partitionColumnValue,
partitionColumnType);
}

task->partitionColumn = partitionColumnString;
task->partitionKeyValue = workerJob->partitionKeyValue;
SetJobColocationId(workerJob);
task->colocationId = workerJob->colocationId;

Expand Down Expand Up @@ -404,7 +393,7 @@ SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query)
}

SetTaskQueryString(task, AnnotateQuery(DeparseTaskQuery(task, query),
task->partitionColumn, task->colocationId));
task->partitionKeyValue, task->colocationId));
}


Expand Down
7 changes: 5 additions & 2 deletions src/backend/distributed/planner/distributed_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,6 @@ distributed_planner(Query *parse,
bool fastPathRouterQuery = false;
Node *distributionKeyValue = NULL;

AttributeQueryIfAnnotated(query_string, parse->commandType);

List *rangeTableList = ExtractRangeTableEntryList(parse);

if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED)
Expand Down Expand Up @@ -309,6 +307,11 @@ distributed_planner(Query *parse,
errhint("Consider using PL/pgSQL functions instead.")));
}

/*
* We annotate the query for tenant statisisics.
*/
AttributeQueryIfAnnotated(query_string, parse->commandType);

return result;
}

Expand Down
32 changes: 5 additions & 27 deletions src/backend/distributed/planner/multi_router_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ static int CompareInsertValuesByShardId(const void *leftElement,
static List * SingleShardTaskList(Query *query, uint64 jobId,
List *relationShardList, List *placementList,
uint64 shardId, bool parametersInQueryResolved,
bool isLocalTableModification, char *partitionColumn,
bool isLocalTableModification, Const *partitionKeyValue,
int colocationId);
static bool RowLocksOnRelations(Node *node, List **rtiLockList);
static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
Expand Down Expand Up @@ -1952,25 +1952,14 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,

if (originalQuery->commandType == CMD_SELECT)
{
Datum partitionColumnValue;
Oid partitionColumnType = 0;
char *partitionColumnString = NULL;
if (job->partitionKeyValue != NULL)
{
partitionColumnValue = job->partitionKeyValue->constvalue;
partitionColumnType = job->partitionKeyValue->consttype;
partitionColumnString = DatumToString(partitionColumnValue,
partitionColumnType);
}

SetJobColocationId(job);

job->taskList = SingleShardTaskList(originalQuery, job->jobId,
relationShardList, placementList,
shardId,
job->parametersInJobQueryResolved,
isLocalTableModification,
partitionColumnString, job->colocationId);
job->partitionKeyValue, job->colocationId);

/*
* Queries to reference tables, or distributed tables with multiple replica's have
Expand All @@ -1994,25 +1983,14 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
}
else
{
Datum partitionColumnValue;
Oid partitionColumnType = 0;
char *partitionColumnString = NULL;
if (job->partitionKeyValue != NULL)
{
partitionColumnValue = job->partitionKeyValue->constvalue;
partitionColumnType = job->partitionKeyValue->consttype;
partitionColumnString = DatumToString(partitionColumnValue,
partitionColumnType);
}

SetJobColocationId(job);

job->taskList = SingleShardTaskList(originalQuery, job->jobId,
relationShardList, placementList,
shardId,
job->parametersInJobQueryResolved,
isLocalTableModification,
partitionColumnString, job->colocationId);
job->partitionKeyValue, job->colocationId);
}
}

Expand Down Expand Up @@ -2106,7 +2084,7 @@ static List *
SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
List *placementList, uint64 shardId,
bool parametersInQueryResolved,
bool isLocalTableModification, char *partitionColumn,
bool isLocalTableModification, Const *partitionKeyValue,
int colocationId)
{
TaskType taskType = READ_TASK;
Expand Down Expand Up @@ -2177,7 +2155,7 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
* that the query cannot be executed locally.
*/
task->taskPlacementList = placementList;
task->partitionColumn = partitionColumn;
task->partitionKeyValue = partitionKeyValue;
task->colocationId = colocationId;
SetTaskQueryIfShouldLazyDeparse(task, query);
task->anchorShardId = shardId;
Expand Down
11 changes: 7 additions & 4 deletions src/backend/distributed/shared_library_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -2306,8 +2306,10 @@ RegisterCitusConfigVariables(void)

DefineCustomEnumVariable(
"citus.stat_tenants_track",
gettext_noop("enable disable"),
NULL,
gettext_noop("Enables/Disables the stats collection for citus_stat_tenants."),
gettext_noop("Enables the stats collection when set to 'all'. "
"Disables when set to 'none'. Disabling can be useful for "
"avoiding extra CPU cycles needed for the calculations."),
&StatTenantsTrack,
STAT_TENANTS_TRACK_ALL,
stat_tenants_track_options,
Expand All @@ -2317,7 +2319,7 @@ RegisterCitusConfigVariables(void)

DefineCustomIntVariable(
"citus.stats_tenants_limit",
gettext_noop("monitor limit"),
gettext_noop("Number of tenants to be shown in citus_stat_tenants."),
NULL,
&CitusStatsTenantsLimit,
10, 1, 100,
Expand All @@ -2327,7 +2329,8 @@ RegisterCitusConfigVariables(void)

DefineCustomIntVariable(
"citus.stats_tenants_period",
gettext_noop("monitor period"),
gettext_noop("Period in seconds to be used for calculating the tenant "
"statistics in citus_stat_tenants."),
NULL,
&CitusStatsTenantsPeriod,
60, 1, 1000000000,
Expand Down
3 changes: 3 additions & 0 deletions src/backend/distributed/sql/citus--11.2-1--11.3-1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@

#include "udfs/citus_stats_tenants_local/11.3-1.sql"
#include "udfs/citus_stats_tenants/11.3-1.sql"

#include "udfs/citus_stats_tenants_local_reset/11.3-1.sql"
#include "udfs/citus_stats_tenants_reset/11.3-1.sql"
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@ DROP FUNCTION pg_catalog.citus_stats_tenants_local(boolean);

DROP VIEW pg_catalog.citus_stats_tenants;
DROP FUNCTION pg_catalog.citus_stats_tenants(boolean);

DROP FUNCTION pg_catalog.citus_stats_tenants_local_reset();
DROP FUNCTION pg_catalog.citus_stats_tenants_reset();

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_stats_tenants_local_reset()
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_stats_tenants_local_reset$$;

COMMENT ON FUNCTION pg_catalog.citus_stats_tenants_local_reset()
IS 'resets the local tenant statistics';

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_stats_tenants_reset()
RETURNS VOID
LANGUAGE plpgsql
AS $function$
BEGIN
PERFORM run_command_on_all_nodes($$SELECT citus_stats_tenants_local_reset()$$);
END;
$function$;
38 changes: 38 additions & 0 deletions src/backend/distributed/test/citus_stat_tenants.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*-------------------------------------------------------------------------
*
* citus_stat_tenants.c
*
* This file contains functions to test citus_stats_tenants.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/

#include "postgres.h"
#include "fmgr.h"

#include "distributed/utils/attribute.h"
#include "sys/time.h"

PG_FUNCTION_INFO_V1(sleep_until_next_period);

/*
* sleep_until_next_period sleeps until the next monitoring period starts.
*/
Datum
sleep_until_next_period(PG_FUNCTION_ARGS)
{
struct timeval currentTime;
gettimeofday(&currentTime, NULL);

long int nextPeriodStart = currentTime.tv_sec -
(currentTime.tv_sec % CitusStatsTenantsPeriod) +
CitusStatsTenantsPeriod;

long int sleepTime = (nextPeriodStart - currentTime.tv_sec) * 1000000 -
currentTime.tv_usec + 100000;
pg_usleep(sleepTime);

PG_RETURN_VOID();
}
Loading

0 comments on commit 3e2cf73

Please sign in to comment.