Skip to content

Commit

Permalink
Support MERGE Phase – I
Browse files Browse the repository at this point in the history
In this phase, all the tables (target, source, or any CTE present) in the SQL statement must be local, i.e. a MERGE statement with any combination of Citus local tables and
non-Citus tables (regular Postgres tables) should work and give the same result as Postgres MERGE on regular tables. For all other scenarios, catch the case during the
Citus planning phase and throw a not-yet-supported exception.
  • Loading branch information
tejeswarm committed Dec 19, 2022
1 parent 5268d0a commit 963c1dd
Show file tree
Hide file tree
Showing 17 changed files with 6,554 additions and 95 deletions.
238 changes: 202 additions & 36 deletions src/backend/distributed/deparser/ruleutils_15.c
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,8 @@ static void get_insert_query_def(Query *query, deparse_context *context,
bool colNamesVisible);
static void get_update_query_def(Query *query, deparse_context *context,
bool colNamesVisible);
static void get_merge_query_def(Query *query, deparse_context *context);

static void get_update_query_targetlist_def(Query *query, List *targetList,
deparse_context *context,
RangeTblEntry *rte);
Expand Down Expand Up @@ -459,6 +461,7 @@ static char *generate_function_name(Oid funcid, int nargs,
List *argnames, Oid *argtypes,
bool has_variadic, bool *use_variadic_p,
ParseExprKind special_exprkind);
static List *get_insert_column_names_list(List *targetList, StringInfo buf, deparse_context *context, RangeTblEntry *rte);

#define only_marker(rte) ((rte)->inh ? "" : "ONLY ")

Expand Down Expand Up @@ -2095,6 +2098,10 @@ get_query_def_extended(Query *query, StringInfo buf, List *parentnamespace,
get_delete_query_def(query, &context, colNamesVisible);
break;

case CMD_MERGE:
get_merge_query_def(query, &context);
break;

case CMD_NOTHING:
appendStringInfoString(buf, "NOTHING");
break;
Expand Down Expand Up @@ -3225,9 +3232,8 @@ get_insert_query_def(Query *query, deparse_context *context,
RangeTblEntry *select_rte = NULL;
RangeTblEntry *values_rte = NULL;
RangeTblEntry *rte;
char *sep;
ListCell *l;
List *strippedexprs;
List *strippedexprs = NIL;

/* Insert the WITH clause if given */
get_with_clause(query, context);
Expand Down Expand Up @@ -3281,43 +3287,11 @@ get_insert_query_def(Query *query, deparse_context *context,
* Add the insert-column-names list. Any indirection decoration needed on
* the column names can be inferred from the top targetlist.
*/
strippedexprs = NIL;
sep = "";
if (query->targetList)
appendStringInfoChar(buf, '(');
foreach(l, query->targetList)
{
TargetEntry *tle = (TargetEntry *) lfirst(l);

if (tle->resjunk)
continue; /* ignore junk entries */

appendStringInfoString(buf, sep);
sep = ", ";

/*
* Put out name of target column; look in the catalogs, not at
* tle->resname, since resname will fail to track RENAME.
*/
appendStringInfoString(buf,
quote_identifier(get_attname(rte->relid,
tle->resno,
false)));

/*
* Print any indirection needed (subfields or subscripts), and strip
* off the top-level nodes representing the indirection assignments.
* Add the stripped expressions to strippedexprs. (If it's a
* single-VALUES statement, the stripped expressions are the VALUES to
* print below. Otherwise they're just Vars and not really
* interesting.)
*/
strippedexprs = lappend(strippedexprs,
processIndirection((Node *) tle->expr,
context));
strippedexprs = get_insert_column_names_list(query->targetList,
buf, context, rte);
}
if (query->targetList)
appendStringInfoString(buf, ") ");

if (query->override)
{
Expand Down Expand Up @@ -3741,6 +3715,148 @@ get_delete_query_def(Query *query, deparse_context *context,
}
}

/* ----------
 * get_merge_query_def			- Parse back a MERGE parsetree
 *
 * Appends to context->buf the text
 *
 *	[WITH ...] MERGE INTO <target> [alias] USING <source> ON <condition>
 *	WHEN [NOT] MATCHED [AND <condition>] THEN <action> ...
 *
 * query	- the MERGE parse tree; query->resultRelation indexes the target
 *			  entry in query->rtable.
 * context	- deparse context; supplies the output buffer as well as the
 *			  distrelid/shardid used when naming a non-shard target.
 *
 * Raises ERROR when a WHEN clause carries an unexpected command type. The
 * complete buffer contents are additionally reported at DEBUG1.
 * ----------
 */
static void
get_merge_query_def(Query *query, deparse_context *context)
{
	StringInfo	buf = context->buf;
	RangeTblEntry *targetRte;
	ListCell   *actionCell = NULL;

	/* Insert the WITH clause if given */
	get_with_clause(query, context);

	/*
	 * Start the query with MERGE INTO <target>
	 */
	targetRte = rt_fetch(query->resultRelation, query->rtable);

	if (PRETTY_INDENT(context))
	{
		appendStringInfoChar(buf, ' ');
		context->indentLevel += PRETTYINDENT_STD;
	}

	if (GetRangeTblKind(targetRte) == CITUS_RTE_SHARD)
	{
		/*
		 * The target is a shard: take the schema and table name from the
		 * extra data attached to the range table entry.
		 */
		char	   *fragmentSchemaName = NULL;
		char	   *fragmentTableName = NULL;

		ExtractRangeTblExtraData(targetRte, NULL, &fragmentSchemaName,
								 &fragmentTableName, NULL);

		/* use schema and table name from the remote alias */
		appendStringInfo(buf, "MERGE INTO %s%s",
						 only_marker(targetRte),
						 generate_fragment_name(fragmentSchemaName,
												fragmentTableName));

		/* note: this branch keys the alias on eref, not on alias */
		if (targetRte->eref != NULL)
		{
			appendStringInfo(buf, " %s",
							 quote_identifier(get_rtable_name(query->resultRelation,
															  context)));
		}
	}
	else
	{
		appendStringInfo(buf, "MERGE INTO %s%s",
						 only_marker(targetRte),
						 generate_relation_or_shard_name(targetRte->relid,
														 context->distrelid,
														 context->shardid, NIL));

		if (targetRte->alias != NULL)
		{
			appendStringInfo(buf, " %s",
							 quote_identifier(get_rtable_name(query->resultRelation,
															  context)));
		}
	}

	/*
	 * Add the MERGE source relation -- USING <source>
	 */
	get_from_clause(query, " USING ", context);

	/*
	 * Add the MERGE ON condition
	 */
	Assert(query->jointree->quals != NULL);
	appendContextKeyword(context, " ON ",
						 -PRETTYINDENT_STD, PRETTYINDENT_STD, 1);
	get_rule_expr(query->jointree->quals, context, false);

	/*
	 * Add one WHEN clause per merge action, in list order.
	 */
	foreach(actionCell, query->mergeActionList)
	{
		MergeAction *action = (MergeAction *) lfirst(actionCell);

		/* Add WHEN [NOT] MATCHED */
		appendContextKeyword(context, " WHEN",
							 -PRETTYINDENT_STD, PRETTYINDENT_STD, 1);
		appendStringInfo(buf, " %s", action->matched ? "MATCHED" : "NOT MATCHED");

		/* Add optional AND <condition> */
		if (action->qual)
		{
			appendContextKeyword(context, " AND ",
								 -PRETTYINDENT_STD, PRETTYINDENT_STD, 1);
			get_rule_expr(action->qual, context, false);
		}

		appendContextKeyword(context, " THEN",
							 -PRETTYINDENT_STD, PRETTYINDENT_STD, 1);

		switch (action->commandType)
		{
			case CMD_INSERT:
			{
				List	   *strippedexprs = NIL;

				appendStringInfoString(buf, " INSERT ");

				/* the helper writes "(col, ...) " including the trailing space */
				if (action->targetList)
				{
					strippedexprs = get_insert_column_names_list(action->targetList,
																 buf, context,
																 targetRte);
				}

				/*
				 * Emit the OVERRIDING clause, if any. It belongs between the
				 * column list and VALUES; omitting it would change which
				 * values end up in identity columns when the deparsed
				 * statement is executed.
				 */
				if (action->override == OVERRIDING_SYSTEM_VALUE)
				{
					appendStringInfoString(buf, "OVERRIDING SYSTEM VALUE ");
				}
				else if (action->override == OVERRIDING_USER_VALUE)
				{
					appendStringInfoString(buf, "OVERRIDING USER VALUE ");
				}

				if (strippedexprs)
				{
					/* Add the single-VALUES expression list */
					appendContextKeyword(context, "VALUES (",
										 -PRETTYINDENT_STD, PRETTYINDENT_STD, 2);
					get_rule_list_toplevel(strippedexprs, context, false);
					appendStringInfoChar(buf, ')');
				}
				else
				{
					/* No expressions, so it must be DEFAULT VALUES */
					appendStringInfoString(buf, "DEFAULT VALUES");
				}
			}
			break;

			case CMD_UPDATE:
				appendStringInfoString(buf, " UPDATE SET ");
				get_update_query_targetlist_def(query, action->targetList,
												context, targetRte);
				break;

			case CMD_DELETE:
				appendStringInfoString(buf, " DELETE");
				break;

			case CMD_NOTHING:
				appendStringInfoString(buf, " DO NOTHING ");
				break;

			default:
				elog(ERROR, "unknown action in MERGE WHEN clause");
		}
	}

	ereport(DEBUG1, (errmsg("<Deparsed MERGE query: %s>", buf->data)));
}


/* ----------
* get_utility_query_def - Parse back a UTILITY parsetree
* ----------
Expand Down Expand Up @@ -8761,4 +8877,54 @@ getOwnedSequences_internal(Oid relid, AttrNumber attnum, char deptype)
return result;
}

/*
 * get_insert_column_names_list
 *		Write the parenthesized insert-column-names list into buf.
 *
 * Each non-junk entry of targetList contributes its column name, looked up
 * in the catalogs for rte->relid, followed by whatever indirection decoration
 * (subfields or subscripts) its expression carries. The opening '(' and the
 * closing ") " (with a trailing space) are written unconditionally, so
 * callers are expected to check for an empty target list themselves.
 *
 * Returns the list of expressions left over once the top-level indirection
 * nodes have been stripped, in target-list order. For a single-VALUES
 * statement these are the values to print; otherwise they are just Vars.
 */
static List *
get_insert_column_names_list(List *targetList, StringInfo buf,
							 deparse_context *context, RangeTblEntry *rte)
{
	List	   *valueExprs = NIL;
	bool		firstColumn = true;
	ListCell   *cell;

	appendStringInfoChar(buf, '(');

	foreach(cell, targetList)
	{
		TargetEntry *entry = (TargetEntry *) lfirst(cell);
		char	   *columnName;
		Node	   *stripped;

		/* junk entries never show up in the column list */
		if (entry->resjunk)
			continue;

		/* separate this column from the previous one */
		if (!firstColumn)
			appendStringInfoString(buf, ", ");
		firstColumn = false;

		/*
		 * Take the name from the catalogs rather than from the entry's
		 * resname, which does not track RENAME.
		 */
		columnName = get_attname(rte->relid, entry->resno, false);
		appendStringInfoString(buf, quote_identifier(columnName));

		/*
		 * Print the indirection, peel the assignment nodes off the
		 * expression, and remember what remains for the caller.
		 */
		stripped = processIndirection((Node *) entry->expr, context);
		valueExprs = lappend(valueExprs, stripped);
	}

	appendStringInfoString(buf, ") ");

	return valueExprs;
}
#endif /* (PG_VERSION_NUM >= PG_VERSION_15) && (PG_VERSION_NUM < PG_VERSION_16) */
2 changes: 1 addition & 1 deletion src/backend/distributed/planner/deparse_shard_query.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ RebuildQueryStrings(Job *workerJob)
query = copyObject(originalQuery);
}

if (UpdateOrDeleteQuery(query))
if (UpdateOrDeleteOrMergeQuery(query))
{
List *relationShardList = task->relationShardList;

Expand Down
Loading

0 comments on commit 963c1dd

Please sign in to comment.