From c93b855d274a7b32f481b3efce9c36be380705be Mon Sep 17 00:00:00 2001 From: Gokhan Gulbiz Date: Wed, 29 Mar 2023 15:51:05 +0300 Subject: [PATCH] Parse the annotation string correctly (#6796) This pull request modifies our query annotation and parsing logic by using a JSON-structured annotation string. Basically, it prepends to the query string a multi-line comment holding a JSON object with the tenant ID (`tId`) and the colocation ID (`cId`), so that query statistics can be tracked on the worker nodes. On the worker nodes, it also parses the annotation received in the query string and sets the corresponding tenant ID and colocation ID. --------- Co-authored-by: Jelte Fennema --- src/backend/distributed/utils/attribute.c | 158 +++++++++++++----- src/backend/distributed/utils/jsonbutils.c | 19 +++ src/include/distributed/jsonbutils.h | 1 + src/test/regress/bin/normalize.sed | 4 +- .../regress/expected/citus_stats_tenants.out | 136 ++++++++++++++- src/test/regress/pg_regress_multi.pl | 2 +- src/test/regress/sql/citus_stats_tenants.sql | 29 ++++ 7 files changed, 305 insertions(+), 44 deletions(-) diff --git a/src/backend/distributed/utils/attribute.c b/src/backend/distributed/utils/attribute.c index 5c482bcd982..7b697ae0648 100644 --- a/src/backend/distributed/utils/attribute.c +++ b/src/backend/distributed/utils/attribute.c @@ -15,24 +15,28 @@ #include "distributed/log_utils.h" #include "distributed/listutils.h" #include "distributed/metadata_cache.h" +#include "distributed/jsonbutils.h" +#include "distributed/colocation_utils.h" #include "distributed/tuplestore.h" +#include "distributed/colocation_utils.h" #include "executor/execdesc.h" #include "storage/ipc.h" #include "storage/lwlock.h" #include "storage/shmem.h" #include #include "utils/builtins.h" - +#include "utils/json.h" #include "distributed/utils/attribute.h" + #include static void AttributeMetricsIfApplicable(void); ExecutorEnd_hook_type prev_ExecutorEnd = NULL; -#define ATTRIBUTE_PREFIX "/* attributeTo: " -#define ATTRIBUTE_STRING_FORMAT 
"/* attributeTo: %s,%d */" +#define ATTRIBUTE_PREFIX "/*{\"tId\":" +#define ATTRIBUTE_STRING_FORMAT "/*{\"tId\":%s,\"cId\":%d}*/" #define CITUS_STATS_TENANTS_COLUMNS 7 #define ONE_QUERY_SCORE 1000000000 @@ -61,6 +65,9 @@ static void MultiTenantMonitorSMInit(void); static int CreateTenantStats(MultiTenantMonitor *monitor, time_t queryTime); static int FindTenantStats(MultiTenantMonitor *monitor); static size_t MultiTenantMonitorshmemSize(void); +static char * ExtractTopComment(const char *inputString); +static char * EscapeCommentChars(const char *str); +static char * UnescapeCommentChars(const char *str); int MultiTenantMonitoringLogLevel = CITUS_LOG_LEVEL_OFF; int CitusStatsTenantsPeriod = (time_t) 60; @@ -199,52 +206,27 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType) if (strncmp(ATTRIBUTE_PREFIX, query_string, strlen(ATTRIBUTE_PREFIX)) == 0) { - /* TODO create a function to safely parse the tenant identifier from the query comment */ - /* query is attributed to a tenant */ - char *tenantId = (char *) query_string + strlen(ATTRIBUTE_PREFIX); - char *tenantEnd = tenantId; - while (true && tenantEnd[0] != '\0') + char *annotation = ExtractTopComment(query_string); + if (annotation != NULL) { - if (tenantEnd[0] == ' ' && tenantEnd[1] == '*' && tenantEnd[2] == '/') + Datum jsonbDatum = DirectFunctionCall1(jsonb_in, PointerGetDatum(annotation)); + + text *tenantIdTextP = ExtractFieldTextP(jsonbDatum, "tId"); + if (tenantIdTextP != NULL) { - break; + char *tenantId = UnescapeCommentChars(text_to_cstring(tenantIdTextP)); + strncpy_s(attributeToTenant, MAX_TENANT_ATTRIBUTE_LENGTH, tenantId, + MAX_TENANT_ATTRIBUTE_LENGTH - 1); } - tenantEnd++; - } - tenantEnd--; - - colocationGroupId = 0; - while (*tenantEnd != ',') - { - colocationGroupId *= 10; - colocationGroupId += *tenantEnd - '0'; - tenantEnd--; + colocationGroupId = ExtractFieldInt32(jsonbDatum, "cId", + INVALID_COLOCATION_ID); } - - int t = colocationGroupId; - colocationGroupId = 0; - 
while (t) - { - colocationGroupId *= 10; - colocationGroupId += t % 10; - t /= 10; - } - - /* hack to get a clean copy of the tenant id string */ - char tenantEndTmp = *tenantEnd; - *tenantEnd = '\0'; - tenantId = pstrdup(tenantId); - *tenantEnd = tenantEndTmp; - - strcpy_s(attributeToTenant, sizeof(attributeToTenant), tenantId); } else { strcpy_s(attributeToTenant, sizeof(attributeToTenant), ""); } - - /*DetachSegment(); */ } @@ -258,8 +240,15 @@ AnnotateQuery(char *queryString, char *partitionColumn, int colocationId) { return queryString; } + + char *commentCharsEscaped = EscapeCommentChars(partitionColumn); + StringInfo escapedSourceName = makeStringInfo(); + + escape_json(escapedSourceName, commentCharsEscaped); + StringInfo newQuery = makeStringInfo(); - appendStringInfo(newQuery, ATTRIBUTE_STRING_FORMAT, partitionColumn, colocationId); + appendStringInfo(newQuery, ATTRIBUTE_STRING_FORMAT, escapedSourceName->data, + colocationId); appendStringInfoString(newQuery, queryString); @@ -668,3 +657,92 @@ MultiTenantMonitorshmemSize(void) return size; } + + +/* + * ExtractTopComment extracts the top-level multi-line comment from a given input string. 
+ */ +static char * +ExtractTopComment(const char *inputString) +{ + int commentCharsLength = 2; + int inputStringLen = strlen(inputString); + if (inputStringLen < commentCharsLength) + { + return NULL; + } + + const char *commentStartChars = "/*"; + const char *commentEndChars = "*/"; + + /* If query doesn't start with a comment, return NULL */ + if (strstr(inputString, commentStartChars) != inputString) + { + return NULL; + } + + StringInfo commentData = makeStringInfo(); + + /* Skip the comment start characters */ + const char *commentStart = inputString + commentCharsLength; + + /* Find the first comment end character */ + const char *commentEnd = strstr(commentStart, commentEndChars); + if (commentEnd == NULL) + { + return NULL; + } + + /* Append the comment to the StringInfo buffer */ + int commentLength = commentEnd - commentStart; + appendStringInfo(commentData, "%.*s", commentLength, commentStart); + + /* Return the extracted comment */ + return commentData->data; +} + + +/* EscapeCommentChars adds a backslash before each occurrence of '*' or '/' in the input string */ +static char * +EscapeCommentChars(const char *str) +{ + int originalStringLength = strlen(str); + StringInfo escapedString = makeStringInfo(); + + for (int originalStringIndex = 0; originalStringIndex < originalStringLength; + originalStringIndex++) + { + if (str[originalStringIndex] == '*' || str[originalStringIndex] == '/') + { + appendStringInfoChar(escapedString, '\\'); + } + + appendStringInfoChar(escapedString, str[originalStringIndex]); + } + + return escapedString->data; +} + + +/* UnescapeCommentChars removes the backslash that precedes '*' or '/' in the input string. 
*/ +static char * +UnescapeCommentChars(const char *str) +{ + int originalStringLength = strlen(str); + StringInfo unescapedString = makeStringInfo(); + + for (int originalStringindex = 0; originalStringindex < originalStringLength; + originalStringindex++) + { + if (str[originalStringindex] == '\\' && + originalStringindex < originalStringLength - 1 && + (str[originalStringindex + 1] == '*' || + str[originalStringindex + 1] == '/')) + { + originalStringindex++; + } + appendStringInfoChar(unescapedString, str[originalStringindex]); + } + + return unescapedString->data; +} diff --git a/src/backend/distributed/utils/jsonbutils.c b/src/backend/distributed/utils/jsonbutils.c index 22fa4f568a2..4855ee00465 100644 --- a/src/backend/distributed/utils/jsonbutils.c +++ b/src/backend/distributed/utils/jsonbutils.c @@ -83,6 +83,25 @@ ExtractFieldBoolean(Datum jsonbDoc, const char *fieldName, bool defaultValue) } +/* + * ExtractFieldInt32 gets value of fieldName from jsonbDoc, or returns + * defaultValue if it doesn't exist. + */ +int32 +ExtractFieldInt32(Datum jsonbDoc, const char *fieldName, int32 defaultValue) +{ + Datum jsonbDatum = 0; + bool found = ExtractFieldJsonb(jsonbDoc, fieldName, &jsonbDatum, false); + if (!found) + { + return defaultValue; + } + + Datum int32Datum = DirectFunctionCall1(jsonb_int4, jsonbDatum); + return DatumGetInt32(int32Datum); +} + + /* * ExtractFieldTextP gets value of fieldName as text* from jsonbDoc, or * returns NULL if it doesn't exist. 
diff --git a/src/include/distributed/jsonbutils.h b/src/include/distributed/jsonbutils.h index 3e37fa38eb0..d44044fcbbb 100644 --- a/src/include/distributed/jsonbutils.h +++ b/src/include/distributed/jsonbutils.h @@ -16,5 +16,6 @@ bool ExtractFieldJsonbDatum(Datum jsonbDoc, const char *fieldName, Datum *result); text * ExtractFieldTextP(Datum jsonbDoc, const char *fieldName); bool ExtractFieldBoolean(Datum jsonbDoc, const char *fieldName, bool defaultValue); +int32 ExtractFieldInt32(Datum jsonbDoc, const char *fieldName, int32 defaultValue); #endif /* CITUS_JSONBUTILS_H */ diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index 33a35f28624..65692e1c9e9 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -307,5 +307,5 @@ s/(NOTICE: issuing SET LOCAL application_name TO 'citus_rebalancer gpid=)[0-9]+ # shard_rebalancer output, flaky improvement number s/improvement of 0.1[0-9]* is lower/improvement of 0.1xxxxx is lower/g - -s/\/\* attributeTo.*\*\///g +# normalize tenants statistics annotations +s/\/\*\{"tId":.*\*\///g diff --git a/src/test/regress/expected/citus_stats_tenants.out b/src/test/regress/expected/citus_stats_tenants.out index e4705eb241d..c0a8c896e8e 100644 --- a/src/test/regress/expected/citus_stats_tenants.out +++ b/src/test/regress/expected/citus_stats_tenants.out @@ -220,8 +220,10 @@ SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tena bcde | 3 | 3000000000 2 | 1 | 1000000000 3 | 1 | 1000000000 + 4 | 1 | 1000000000 + cdef | 1 | 1000000000 defg | 1 | 1000000000 -(5 rows) +(7 rows) -- test period passing SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); @@ -262,6 +264,14 @@ SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, q 5 | 0 | 0 | 0 | 1 (2 rows) +\c - - - :worker_2_port +SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC; + 
tenant_attribute | query_count_in_this_period | score +--------------------------------------------------------------------- + 1 | 0 | 500000000 + 5 | 0 | 500000000 +(2 rows) + \c - - - :master_port SET search_path TO citus_stats_tenants; -- test logs @@ -304,5 +314,129 @@ CONTEXT: PL/pgSQL function citus_stats_tenants(boolean) line XX at RAISE t (1 row) +-- test special and multibyte characters in tenant attribute +SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); + result +--------------------------------------------------------------------- + + + +(3 rows) + +TRUNCATE TABLE dist_tbl_text; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/bcde'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/*bcde'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b*cde'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b*c/de'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'b/*//cde'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b/*/cde'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b/**/cde'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*/'; + ?column? 
+--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = U&'\0061\0308bc'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_1_port +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; + tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period +--------------------------------------------------------------------- + /*bcde | 1 | 0 | 1 | 0 + /b*c/de | 1 | 0 | 1 | 0 + /b*cde | 1 | 0 | 1 | 0 + /b/**/cde | 1 | 0 | 1 | 0 + /b/*/cde | 1 | 0 | 1 | 0 + /bcde | 1 | 0 | 1 | 0 + äbc | 1 | 0 | 1 | 0 + b/*//cde | 1 | 0 | 1 | 0 + bcde* | 1 | 0 | 1 | 0 + bcde*/ | 1 | 0 | 1 | 0 +(10 rows) + +\c - - - :worker_2_port +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; + tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period +--------------------------------------------------------------------- + /*bcde | 1 | 0 | 1 | 0 + /b*c/de | 1 | 0 | 1 | 0 + /b*cde | 1 | 0 | 1 | 0 + /b/**/cde | 1 | 0 | 1 | 0 + /b/*/cde | 1 | 0 | 1 | 0 + /bcde | 1 | 0 | 1 | 0 + äbc | 1 | 0 | 1 | 0 + b/*//cde | 1 | 0 | 1 | 0 + bcde* | 1 | 0 | 1 | 0 + bcde*/ | 1 | 0 | 1 | 0 +(10 rows) + +\c - - - :master_port +SET search_path TO citus_stats_tenants; +SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); + result +--------------------------------------------------------------------- + + + +(3 rows) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 
'thisisaveryloooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooongname'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; + tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period +--------------------------------------------------------------------- + thisisaverylooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo | 1 | 0 | 1 | 0 +(1 row) + SET client_min_messages TO ERROR; DROP SCHEMA citus_stats_tenants CASCADE; diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index f4e85ab610d..edee0eef43d 100755 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -487,7 +487,7 @@ sub generate_hba push(@pgOptions, "citus.enable_manual_changes_to_shards=on"); push(@pgOptions, "citus.allow_unsafe_locks_from_workers=on"); push(@pgOptions, "citus.stat_statements_track = 'all'"); -push(@pgOptions, "citus.stats_tenants_limit = 2"); +push(@pgOptions, "citus.stats_tenants_limit = 10"); # Some tests look at shards in pg_class, make sure we can usually see them: push(@pgOptions, "citus.show_shards_for_app_name_prefixes='pg_regress'"); diff --git a/src/test/regress/sql/citus_stats_tenants.sql b/src/test/regress/sql/citus_stats_tenants.sql index b895a842368..daafea712ec 100644 --- a/src/test/regress/sql/citus_stats_tenants.sql +++ b/src/test/regress/sql/citus_stats_tenants.sql @@ -93,6 +93,9 @@ SET citus.stats_tenants_period TO 2; SELECT sleep_until_next_period(); SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, 
query_count_in_last_period FROM citus_stats_tenants_local ORDER BY tenant_attribute; +\c - - - :worker_2_port +SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC; + \c - - - :master_port SET search_path TO citus_stats_tenants; @@ -108,5 +111,31 @@ SELECT count(*)>=0 FROM citus_stats_tenants; SET citus.multi_tenant_monitoring_log_level TO DEBUG; SELECT count(*)>=0 FROM citus_stats_tenants; +-- test special and multibyte characters in tenant attribute +SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); +TRUNCATE TABLE dist_tbl_text; + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/bcde'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/*bcde'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b*cde'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b*c/de'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'b/*//cde'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b/*/cde'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b/**/cde'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*/'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = U&'\0061\0308bc'; + +\c - - - :worker_1_port +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; +\c - - - :worker_2_port +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; +\c - - - :master_port +SET search_path TO citus_stats_tenants; + +SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 
'thisisaveryloooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooongname'; +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; + SET client_min_messages TO ERROR; DROP SCHEMA citus_stats_tenants CASCADE;