Skip to content

Commit

Permalink
Parse the annotation string correctly (#6796)
Browse files Browse the repository at this point in the history
This pull request modifies our query annotation and parsing logic by
using a JSON-structured annotation string. Basically, it prepends a JSON
string in a multiline comment that contains `tenantId` and
`colocationId` to a query string to be able to track query statistics on
the worker nodes. It also parses the received annotation in the query
string and sets the relevant tenantId and colocationId on the worker
nodes.

---------

Co-authored-by: Jelte Fennema <github-tech@jeltef.nl>
  • Loading branch information
gokhangulbiz and JelteF authored Mar 29, 2023
1 parent 4d49149 commit c93b855
Show file tree
Hide file tree
Showing 7 changed files with 305 additions and 44 deletions.
158 changes: 118 additions & 40 deletions src/backend/distributed/utils/attribute.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,28 @@
#include "distributed/log_utils.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/jsonbutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/tuplestore.h"
#include "distributed/colocation_utils.h"
#include "executor/execdesc.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "storage/shmem.h"
#include <sys/time.h>
#include "utils/builtins.h"

#include "utils/json.h"
#include "distributed/utils/attribute.h"


#include <time.h>

static void AttributeMetricsIfApplicable(void);

ExecutorEnd_hook_type prev_ExecutorEnd = NULL;

#define ATTRIBUTE_PREFIX "/* attributeTo: "
#define ATTRIBUTE_STRING_FORMAT "/* attributeTo: %s,%d */"
#define ATTRIBUTE_PREFIX "/*{\"tId\":"
#define ATTRIBUTE_STRING_FORMAT "/*{\"tId\":%s,\"cId\":%d}*/"
#define CITUS_STATS_TENANTS_COLUMNS 7
#define ONE_QUERY_SCORE 1000000000

Expand Down Expand Up @@ -61,6 +65,9 @@ static void MultiTenantMonitorSMInit(void);
static int CreateTenantStats(MultiTenantMonitor *monitor, time_t queryTime);
static int FindTenantStats(MultiTenantMonitor *monitor);
static size_t MultiTenantMonitorshmemSize(void);
static char * ExtractTopComment(const char *inputString);
static char * EscapeCommentChars(const char *str);
static char * UnescapeCommentChars(const char *str);

int MultiTenantMonitoringLogLevel = CITUS_LOG_LEVEL_OFF;
int CitusStatsTenantsPeriod = (time_t) 60;
Expand Down Expand Up @@ -199,52 +206,27 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType)

if (strncmp(ATTRIBUTE_PREFIX, query_string, strlen(ATTRIBUTE_PREFIX)) == 0)
{
/* TODO create a function to safely parse the tenant identifier from the query comment */
/* query is attributed to a tenant */
char *tenantId = (char *) query_string + strlen(ATTRIBUTE_PREFIX);
char *tenantEnd = tenantId;
while (true && tenantEnd[0] != '\0')
char *annotation = ExtractTopComment(query_string);
if (annotation != NULL)
{
if (tenantEnd[0] == ' ' && tenantEnd[1] == '*' && tenantEnd[2] == '/')
Datum jsonbDatum = DirectFunctionCall1(jsonb_in, PointerGetDatum(annotation));

text *tenantIdTextP = ExtractFieldTextP(jsonbDatum, "tId");
if (tenantIdTextP != NULL)
{
break;
char *tenantId = UnescapeCommentChars(text_to_cstring(tenantIdTextP));
strncpy_s(attributeToTenant, MAX_TENANT_ATTRIBUTE_LENGTH, tenantId,
MAX_TENANT_ATTRIBUTE_LENGTH - 1);
}

tenantEnd++;
}
tenantEnd--;

colocationGroupId = 0;
while (*tenantEnd != ',')
{
colocationGroupId *= 10;
colocationGroupId += *tenantEnd - '0';
tenantEnd--;
colocationGroupId = ExtractFieldInt32(jsonbDatum, "cId",
INVALID_COLOCATION_ID);
}

int t = colocationGroupId;
colocationGroupId = 0;
while (t)
{
colocationGroupId *= 10;
colocationGroupId += t % 10;
t /= 10;
}

/* hack to get a clean copy of the tenant id string */
char tenantEndTmp = *tenantEnd;
*tenantEnd = '\0';
tenantId = pstrdup(tenantId);
*tenantEnd = tenantEndTmp;

strcpy_s(attributeToTenant, sizeof(attributeToTenant), tenantId);
}
else
{
strcpy_s(attributeToTenant, sizeof(attributeToTenant), "");
}

/*DetachSegment(); */
}


Expand All @@ -258,8 +240,15 @@ AnnotateQuery(char *queryString, char *partitionColumn, int colocationId)
{
return queryString;
}

char *commentCharsEscaped = EscapeCommentChars(partitionColumn);
StringInfo escapedSourceName = makeStringInfo();

escape_json(escapedSourceName, commentCharsEscaped);

StringInfo newQuery = makeStringInfo();
appendStringInfo(newQuery, ATTRIBUTE_STRING_FORMAT, partitionColumn, colocationId);
appendStringInfo(newQuery, ATTRIBUTE_STRING_FORMAT, escapedSourceName->data,
colocationId);

appendStringInfoString(newQuery, queryString);

Expand Down Expand Up @@ -668,3 +657,92 @@ MultiTenantMonitorshmemSize(void)

return size;
}


/*
* ExtractTopComment extracts the top-level multi-line comment from a given input string.
*/
static char *
ExtractTopComment(const char *inputString)
{
int commentCharsLength = 2;
int inputStringLen = strlen(inputString);
if (inputStringLen < commentCharsLength)
{
return NULL;
}

const char *commentStartChars = "/*";
const char *commentEndChars = "*/";

/* If query doesn't start with a comment, return NULL */
if (strstr(inputString, commentStartChars) != inputString)
{
return NULL;
}

StringInfo commentData = makeStringInfo();

/* Skip the comment start characters */
const char *commentStart = inputString + commentCharsLength;

/* Find the first comment end character */
const char *commentEnd = strstr(commentStart, commentEndChars);
if (commentEnd == NULL)
{
return NULL;
}

/* Append the comment to the StringInfo buffer */
int commentLength = commentEnd - commentStart;
appendStringInfo(commentData, "%.*s", commentLength, commentStart);

/* Return the extracted comment */
return commentData->data;
}


/* EscapeCommentChars adds a backslash before each occurrence of '*' or '/' in the input string */
static char *
EscapeCommentChars(const char *str)
{
int originalStringLength = strlen(str);
StringInfo escapedString = makeStringInfo();

for (int originalStringIndex = 0; originalStringIndex < originalStringLength;
originalStringIndex++)
{
if (str[originalStringIndex] == '*' || str[originalStringIndex] == '/')
{
appendStringInfoChar(escapedString, '\\');
}

appendStringInfoChar(escapedString, str[originalStringIndex]);
}

return escapedString->data;
}


/* UnescapeCommentChars removes the backslash that precedes '*' or '/' in the input string. */
static char *
UnescapeCommentChars(const char *str)
{
int originalStringLength = strlen(str);
StringInfo unescapedString = makeStringInfo();

for (int originalStringindex = 0; originalStringindex < originalStringLength;
originalStringindex++)
{
if (str[originalStringindex] == '\\' &&
originalStringindex < originalStringLength - 1 &&
(str[originalStringindex + 1] == '*' ||
str[originalStringindex + 1] == '/'))
{
originalStringindex++;
}
appendStringInfoChar(unescapedString, str[originalStringindex]);
}

return unescapedString->data;
}
19 changes: 19 additions & 0 deletions src/backend/distributed/utils/jsonbutils.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,25 @@ ExtractFieldBoolean(Datum jsonbDoc, const char *fieldName, bool defaultValue)
}


/*
* ExtractFieldInt32 gets value of fieldName from jsonbDoc, or returns
* defaultValue if it doesn't exist.
*/
int32
ExtractFieldInt32(Datum jsonbDoc, const char *fieldName, int32 defaultValue)
{
Datum jsonbDatum = 0;
bool found = ExtractFieldJsonb(jsonbDoc, fieldName, &jsonbDatum, false);
if (!found)
{
return defaultValue;
}

Datum int32Datum = DirectFunctionCall1(jsonb_int4, jsonbDatum);
return DatumGetInt32(int32Datum);
}


/*
* ExtractFieldTextP gets value of fieldName as text* from jsonbDoc, or
* returns NULL if it doesn't exist.
Expand Down
1 change: 1 addition & 0 deletions src/include/distributed/jsonbutils.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@
bool ExtractFieldJsonbDatum(Datum jsonbDoc, const char *fieldName, Datum *result);
text * ExtractFieldTextP(Datum jsonbDoc, const char *fieldName);
bool ExtractFieldBoolean(Datum jsonbDoc, const char *fieldName, bool defaultValue);
int32 ExtractFieldInt32(Datum jsonbDoc, const char *fieldName, int32 defaultValue);

#endif /* CITUS_JSONBUTILS_H */
4 changes: 2 additions & 2 deletions src/test/regress/bin/normalize.sed
Original file line number Diff line number Diff line change
Expand Up @@ -307,5 +307,5 @@ s/(NOTICE: issuing SET LOCAL application_name TO 'citus_rebalancer gpid=)[0-9]+

# shard_rebalancer output, flaky improvement number
s/improvement of 0.1[0-9]* is lower/improvement of 0.1xxxxx is lower/g

s/\/\* attributeTo.*\*\///g
# normalize tenants statistics annotations
s/\/\*\{"tId":.*\*\///g
Loading

0 comments on commit c93b855

Please sign in to comment.