Skip to content

Commit

Permalink
Add multi tenant statistics monitoring
Browse files Browse the repository at this point in the history
  • Loading branch information
halilozanakgul committed Apr 4, 2023
1 parent dcee370 commit 46e0089
Show file tree
Hide file tree
Showing 32 changed files with 1,414 additions and 55 deletions.
3 changes: 1 addition & 2 deletions src/backend/distributed/executor/citus_custom_scan.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ static DistributedPlan * CopyDistributedPlanWithoutCache(
DistributedPlan *originalDistributedPlan);
static void CitusEndScan(CustomScanState *node);
static void CitusReScan(CustomScanState *node);
static void SetJobColocationId(Job *job);
static void EnsureForceDelegationDistributionKey(Job *job);
static void EnsureAnchorShardsInJobExist(Job *job);
static bool AnchorShardsInTaskListExist(List *taskList);
Expand Down Expand Up @@ -892,7 +891,7 @@ IsCitusCustomScan(Plan *plan)
* colocation group, the Job's colocation ID is set to the group ID, else,
* it will be set to INVALID_COLOCATION_ID.
*/
static void
void
SetJobColocationId(Job *job)
{
uint32 jobColocationId = INVALID_COLOCATION_ID;
Expand Down
19 changes: 18 additions & 1 deletion src/backend/distributed/planner/deparse_shard_query.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_router_planner.h"
#include "distributed/shard_utils.h"
#include "distributed/utils/attribute.h"
#include "distributed/version_compat.h"
#include "lib/stringinfo.h"
#include "nodes/makefuncs.h"
Expand Down Expand Up @@ -141,6 +142,21 @@ RebuildQueryStrings(Job *workerJob)
? "(null)"
: TaskQueryString(task))));

Datum partitionColumnValue;
Oid partitionColumnType = 0;
char *partitionColumnString = NULL;
if (workerJob->partitionKeyValue != NULL)
{
partitionColumnValue = workerJob->partitionKeyValue->constvalue;
partitionColumnType = workerJob->partitionKeyValue->consttype;
partitionColumnString = DatumToString(partitionColumnValue,
partitionColumnType);
}

task->partitionColumn = partitionColumnString;
SetJobColocationId(workerJob);
task->colocationId = workerJob->colocationId;

UpdateTaskQueryString(query, task);

/*
Expand Down Expand Up @@ -387,7 +403,8 @@ SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query)
return;
}

SetTaskQueryString(task, DeparseTaskQuery(task, query));
SetTaskQueryString(task, AnnotateQuery(DeparseTaskQuery(task, query),
task->partitionColumn, task->colocationId));
}


Expand Down
3 changes: 3 additions & 0 deletions src/backend/distributed/planner/distributed_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include "distributed/recursive_planning.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/shard_utils.h"
#include "distributed/utils/attribute.h"
#include "distributed/version_compat.h"
#include "distributed/worker_shard_visibility.h"
#include "executor/executor.h"
Expand Down Expand Up @@ -156,6 +157,8 @@ distributed_planner(Query *parse,
bool fastPathRouterQuery = false;
Node *distributionKeyValue = NULL;

AttributeQueryIfAnnotated(query_string, parse->commandType);

List *rangeTableList = ExtractRangeTableEntryList(parse);

if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED)
Expand Down
40 changes: 36 additions & 4 deletions src/backend/distributed/planner/multi_router_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ static int CompareInsertValuesByShardId(const void *leftElement,
static List * SingleShardTaskList(Query *query, uint64 jobId,
List *relationShardList, List *placementList,
uint64 shardId, bool parametersInQueryResolved,
bool isLocalTableModification);
bool isLocalTableModification, char *partitionColumn,
int colocationId);
static bool RowLocksOnRelations(Node *node, List **rtiLockList);
static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
TaskAssignmentPolicyType
Expand Down Expand Up @@ -1939,11 +1940,25 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,

if (originalQuery->commandType == CMD_SELECT)
{
Datum partitionColumnValue;
Oid partitionColumnType = 0;
char *partitionColumnString = NULL;
if (job->partitionKeyValue != NULL)
{
partitionColumnValue = job->partitionKeyValue->constvalue;
partitionColumnType = job->partitionKeyValue->consttype;
partitionColumnString = DatumToString(partitionColumnValue,
partitionColumnType);
}

SetJobColocationId(job);

job->taskList = SingleShardTaskList(originalQuery, job->jobId,
relationShardList, placementList,
shardId,
job->parametersInJobQueryResolved,
isLocalTableModification);
isLocalTableModification,
partitionColumnString, job->colocationId);

/*
* Queries to reference tables, or distributed tables with multiple replica's have
Expand All @@ -1967,11 +1982,25 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
}
else
{
Datum partitionColumnValue;
Oid partitionColumnType = 0;
char *partitionColumnString = NULL;
if (job->partitionKeyValue != NULL)
{
partitionColumnValue = job->partitionKeyValue->constvalue;
partitionColumnType = job->partitionKeyValue->consttype;
partitionColumnString = DatumToString(partitionColumnValue,
partitionColumnType);
}

SetJobColocationId(job);

job->taskList = SingleShardTaskList(originalQuery, job->jobId,
relationShardList, placementList,
shardId,
job->parametersInJobQueryResolved,
isLocalTableModification);
isLocalTableModification,
partitionColumnString, job->colocationId);
}
}

Expand Down Expand Up @@ -2065,7 +2094,8 @@ static List *
SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
List *placementList, uint64 shardId,
bool parametersInQueryResolved,
bool isLocalTableModification)
bool isLocalTableModification, char *partitionColumn,
int colocationId)
{
TaskType taskType = READ_TASK;
char replicationModel = 0;
Expand Down Expand Up @@ -2135,6 +2165,8 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
* that the query cannot be executed locally.
*/
task->taskPlacementList = placementList;
task->partitionColumn = partitionColumn;
task->colocationId = colocationId;
SetTaskQueryIfShouldLazyDeparse(task, query);
task->anchorShardId = shardId;
task->jobId = jobId;
Expand Down
35 changes: 35 additions & 0 deletions src/backend/distributed/shared_library_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
#include "distributed/resource_lock.h"
#include "distributed/transaction_management.h"
#include "distributed/transaction_recovery.h"
#include "distributed/utils/attribute.h"
#include "distributed/utils/directory.h"
#include "distributed/worker_log_messages.h"
#include "distributed/worker_manager.h"
Expand Down Expand Up @@ -447,6 +448,8 @@ _PG_init(void)
ExecutorStart_hook = CitusExecutorStart;
ExecutorRun_hook = CitusExecutorRun;
ExplainOneQuery_hook = CitusExplainOneQuery;
prev_ExecutorEnd = ExecutorEnd_hook;
ExecutorEnd_hook = CitusAttributeToEnd;

/* register hook for error messages */
emit_log_hook = multi_log_hook;
Expand Down Expand Up @@ -491,6 +494,8 @@ _PG_init(void)
/* initialize shard split shared memory handle management */
InitializeShardSplitSMHandleManagement();

InitializeMultiTenantMonitorSMHandleManagement();

/* enable modification of pg_catalog tables during pg_upgrade */
if (IsBinaryUpgrade)
{
Expand Down Expand Up @@ -1979,6 +1984,16 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);

DefineCustomEnumVariable(
"citus.multi_tenant_monitoring_log_level",
gettext_noop("Sets the level of multi tenant monitoring log messages"),
NULL,
&MultiTenantMonitoringLogLevel,
CITUS_LOG_LEVEL_OFF, log_level_options,
PGC_USERSET,
GUC_STANDARD,
NULL, NULL, NULL);

DefineCustomIntVariable(
"citus.next_cleanup_record_id",
gettext_noop("Set the next cleanup record ID to use in operation creation."),
Expand Down Expand Up @@ -2363,6 +2378,26 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);

DefineCustomIntVariable(
"citus.stats_tenants_limit",
gettext_noop("monitor limit"),
NULL,
&CitusStatsTenantsLimit,
10, 1, 100,
PGC_POSTMASTER,
GUC_STANDARD,
NULL, NULL, NULL);

DefineCustomIntVariable(
"citus.stats_tenants_period",
gettext_noop("monitor period"),
NULL,
&CitusStatsTenantsPeriod,
60, 1, 1000000000,
PGC_USERSET,
GUC_STANDARD,
NULL, NULL, NULL);

DefineCustomBoolVariable(
"citus.subquery_pushdown",
gettext_noop("Usage of this GUC is highly discouraged, please read the long "
Expand Down
1 change: 1 addition & 0 deletions src/backend/distributed/sql/citus--11.2-1--11.3-1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ ALTER TABLE pg_catalog.pg_dist_transaction REPLICA IDENTITY USING INDEX pg_dist_

#include "udfs/worker_drop_all_shell_tables/11.3-1.sql"
#include "udfs/citus_internal_mark_node_not_synced/11.3-1.sql"
#include "udfs/citus_stats_tenants/11.3-1.sql"
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ ALTER TABLE pg_catalog.pg_dist_transaction REPLICA IDENTITY NOTHING;

DROP PROCEDURE pg_catalog.worker_drop_all_shell_tables(bool);
DROP FUNCTION pg_catalog.citus_internal_mark_node_not_synced(int, int);

DROP VIEW pg_catalog.citus_stats_tenants;
DROP FUNCTION pg_catalog.citus_stats_tenants(boolean);
27 changes: 27 additions & 0 deletions src/backend/distributed/sql/udfs/citus_stats_tenants/11.3-1.sql

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions src/backend/distributed/sql/udfs/citus_stats_tenants/latest.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_stats_tenants(
return_all_tenants BOOLEAN DEFAULT FALSE,
OUT colocation_id INT,
OUT tenant_attribute TEXT,
OUT read_count_in_this_period INT,
OUT read_count_in_last_period INT,
OUT query_count_in_this_period INT,
OUT query_count_in_last_period INT,
OUT score BIGINT)
RETURNS SETOF RECORD
LANGUAGE C
AS 'citus', $$citus_stats_tenants$$;


CREATE OR REPLACE VIEW citus.citus_stats_tenants AS
SELECT
colocation_id,
tenant_attribute,
read_count_in_this_period,
read_count_in_last_period,
query_count_in_this_period,
query_count_in_last_period
FROM pg_catalog.citus_stats_tenants()
ORDER BY score DESC;

ALTER VIEW citus.citus_stats_tenants SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_stats_tenants TO PUBLIC;
Loading

0 comments on commit 46e0089

Please sign in to comment.