Skip to content

Commit

Permalink
Mark objects as distributed even when pg_dist_node is empty (#6900)
Browse files Browse the repository at this point in the history
We mark objects as distributed objects in Citus metadata only if we need
to propagate given the command that creates it to worker nodes. For this
reason, we were not doing this for the objects that are created while
pg_dist_node is empty.

One implication of doing so is that we defer the schema propagation to
the time when user creates the first distributed table in the schema.
However, this doesn't help for schema-based sharding (#6866) because we
want to sync pg_dist_tenant_schema to the worker nodes even for empty
schemas too.

* Support test dependencies for isolation tests without a schedule

* Comment out a test due to a known issue (#6901)

* Also, reduce the verbosity for some log messages and make some
   tests compatible with run_test.py.
  • Loading branch information
onurctirtir authored May 16, 2023
1 parent e7abde7 commit 56d217b
Show file tree
Hide file tree
Showing 21 changed files with 278 additions and 240 deletions.
44 changes: 22 additions & 22 deletions src/backend/distributed/commands/utility_hook.c
Original file line number Diff line number Diff line change
Expand Up @@ -1591,37 +1591,37 @@ DDLTaskList(Oid relationId, const char *commandString)
List *
NodeDDLTaskList(TargetWorkerSet targets, List *commands)
{
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetObjectAddress = InvalidObjectAddress;
ddlJob->metadataSyncCommand = NULL;

/* don't allow concurrent node list changes that require an exclusive lock */
List *workerNodes = TargetWorkerSetNodeList(targets, RowShareLock);

if (list_length(workerNodes) <= 0)
/*
* if there are no nodes we don't have to plan any ddl tasks. Planning them would
* cause the executor to stop responding.
*/
if (list_length(workerNodes) > 0)
{
/*
* if there are no nodes we don't have to plan any ddl tasks. Planning them would
* cause the executor to stop responding.
*/
return NIL;
}
Task *task = CitusMakeNode(Task);
task->taskType = DDL_TASK;
SetTaskQueryStringList(task, commands);

Task *task = CitusMakeNode(Task);
task->taskType = DDL_TASK;
SetTaskQueryStringList(task, commands);
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodes)
{
ShardPlacement *targetPlacement = CitusMakeNode(ShardPlacement);
targetPlacement->nodeName = workerNode->workerName;
targetPlacement->nodePort = workerNode->workerPort;
targetPlacement->groupId = workerNode->groupId;

WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodes)
{
ShardPlacement *targetPlacement = CitusMakeNode(ShardPlacement);
targetPlacement->nodeName = workerNode->workerName;
targetPlacement->nodePort = workerNode->workerPort;
targetPlacement->groupId = workerNode->groupId;
task->taskPlacementList = lappend(task->taskPlacementList, targetPlacement);
}

task->taskPlacementList = lappend(task->taskPlacementList, targetPlacement);
ddlJob->taskList = list_make1(task);
}

DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetObjectAddress = InvalidObjectAddress;
ddlJob->metadataSyncCommand = NULL;
ddlJob->taskList = list_make1(task);
return list_make1(ddlJob);
}

Expand Down
6 changes: 6 additions & 0 deletions src/backend/distributed/executor/adaptive_executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1040,6 +1040,12 @@ ExecuteTaskListIntoTupleDest(RowModifyLevel modLevel, List *taskList,
uint64
ExecuteTaskListExtended(ExecutionParams *executionParams)
{
/* if there are no tasks to execute, we can return early */
if (list_length(executionParams->taskList) == 0)
{
return 0;
}

ParamListInfo paramListInfo = NULL;
uint64 locallyProcessedRows = 0;

Expand Down
4 changes: 2 additions & 2 deletions src/backend/distributed/planner/function_call_delegation.c
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
if (!procedure->forceDelegation)
{
/* cannot delegate function calls in a multi-statement transaction */
ereport(DEBUG1, (errmsg("not pushing down function calls in "
ereport(DEBUG4, (errmsg("not pushing down function calls in "
"a multi-statement transaction")));
return NULL;
}
Expand Down Expand Up @@ -388,7 +388,7 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
Oid colocatedRelationId = ColocatedTableId(procedure->colocationId);
if (colocatedRelationId == InvalidOid)
{
ereport(DEBUG1, (errmsg("function does not have co-located tables")));
ereport(DEBUG4, (errmsg("function does not have co-located tables")));
return NULL;
}

Expand Down
7 changes: 5 additions & 2 deletions src/test/regress/citus_tests/run_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ def extra_tests(self):
"multi_mx_schema_support": TestDeps(None, ["multi_mx_copy_data"]),
"multi_simple_queries": TestDeps("base_schedule"),
"create_single_shard_table": TestDeps("minimal_schedule"),
"isolation_extension_commands": TestDeps(
None, ["isolation_setup", "isolation_add_remove_node"]
),
}


Expand Down Expand Up @@ -195,9 +198,9 @@ def run_schedule_with_multiregress(test_name, schedule, dependencies, args):
worker_count = needed_worker_count(test_name, dependencies)

# find suitable make recipe
if dependencies.schedule == "base_isolation_schedule":
if dependencies.schedule == "base_isolation_schedule" or "isolation" in test_name:
make_recipe = "check-isolation-custom-schedule"
elif dependencies.schedule == "failure_base_schedule":
elif dependencies.schedule == "failure_base_schedule" or "failure" in test_name:
make_recipe = "check-failure-custom-schedule"
else:
make_recipe = "check-custom-schedule"
Expand Down
7 changes: 0 additions & 7 deletions src/test/regress/expected/fast_path_router_modify.out
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ $$ LANGUAGE plpgsql;
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
SELECT modify_fast_path_plpsql(1,1);
DEBUG: function does not have co-located tables
DEBUG: Deferred pruning for a fast-path router query
CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2"
PL/pgSQL function modify_fast_path_plpsql(integer,integer) line XX at SQL statement
Expand All @@ -244,7 +243,6 @@ PL/pgSQL function modify_fast_path_plpsql(integer,integer) line XX at SQL statem
(1 row)

SELECT modify_fast_path_plpsql(2,2);
DEBUG: function does not have co-located tables
DEBUG: Deferred pruning for a fast-path router query
CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2"
PL/pgSQL function modify_fast_path_plpsql(integer,integer) line XX at SQL statement
Expand All @@ -257,7 +255,6 @@ PL/pgSQL function modify_fast_path_plpsql(integer,integer) line XX at SQL statem
(1 row)

SELECT modify_fast_path_plpsql(3,3);
DEBUG: function does not have co-located tables
DEBUG: Deferred pruning for a fast-path router query
CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2"
PL/pgSQL function modify_fast_path_plpsql(integer,integer) line XX at SQL statement
Expand All @@ -270,7 +267,6 @@ PL/pgSQL function modify_fast_path_plpsql(integer,integer) line XX at SQL statem
(1 row)

SELECT modify_fast_path_plpsql(4,4);
DEBUG: function does not have co-located tables
DEBUG: Deferred pruning for a fast-path router query
CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2"
PL/pgSQL function modify_fast_path_plpsql(integer,integer) line XX at SQL statement
Expand All @@ -283,7 +279,6 @@ PL/pgSQL function modify_fast_path_plpsql(integer,integer) line XX at SQL statem
(1 row)

SELECT modify_fast_path_plpsql(5,5);
DEBUG: function does not have co-located tables
DEBUG: Deferred pruning for a fast-path router query
CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2"
PL/pgSQL function modify_fast_path_plpsql(integer,integer) line XX at SQL statement
Expand All @@ -296,7 +291,6 @@ PL/pgSQL function modify_fast_path_plpsql(integer,integer) line XX at SQL statem
(1 row)

SELECT modify_fast_path_plpsql(6,6);
DEBUG: function does not have co-located tables
DEBUG: Deferred pruning for a fast-path router query
CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2"
PL/pgSQL function modify_fast_path_plpsql(integer,integer) line XX at SQL statement
Expand All @@ -309,7 +303,6 @@ PL/pgSQL function modify_fast_path_plpsql(integer,integer) line XX at SQL statem
(1 row)

SELECT modify_fast_path_plpsql(6,6);
DEBUG: function does not have co-located tables
modify_fast_path_plpsql
---------------------------------------------------------------------

Expand Down
17 changes: 0 additions & 17 deletions src/test/regress/expected/forcedelegation_functions.out
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,6 @@ SELECT public.wait_until_metadata_sync(30000);

BEGIN;
SELECT func_calls_forcepush_func();
DEBUG: not pushing down function calls in a multi-statement transaction
DEBUG: pushing down function call in a multi-statement transaction
CONTEXT: SQL statement "SELECT inner_force_delegation_function FROM inner_force_delegation_function(100)"
PL/pgSQL function func_calls_forcepush_func() line XX at SQL statement
Expand All @@ -340,7 +339,6 @@ PL/pgSQL function func_calls_forcepush_func() line XX at SQL statement

COMMIT;
SELECT func_calls_forcepush_func();
DEBUG: function does not have co-located tables
NOTICE: inner_force_delegation_function():101
DETAIL: from localhost:xxxxx
CONTEXT: SQL statement "SELECT inner_force_delegation_function FROM inner_force_delegation_function(100)"
Expand Down Expand Up @@ -380,7 +378,6 @@ $$ LANGUAGE plpgsql;
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
SELECT func_calls_forcepush_func_infrom();
DEBUG: function does not have co-located tables
DEBUG: pushing down function call in a multi-statement transaction
CONTEXT: SQL statement "SELECT inner_force_delegation_function FROM inner_force_delegation_function(add_val + 100)"
PL/pgSQL function func_calls_forcepush_func_infrom() line XX at SQL statement
Expand All @@ -398,7 +395,6 @@ PL/pgSQL function func_calls_forcepush_func_infrom() line XX at SQL statement

BEGIN;
SELECT func_calls_forcepush_func_infrom();
DEBUG: not pushing down function calls in a multi-statement transaction
DEBUG: pushing down function call in a multi-statement transaction
CONTEXT: SQL statement "SELECT inner_force_delegation_function FROM inner_force_delegation_function(add_val + 100)"
PL/pgSQL function func_calls_forcepush_func_infrom() line XX at SQL statement
Expand Down Expand Up @@ -435,7 +431,6 @@ $$ LANGUAGE plpgsql;
DEBUG: switching to sequential query execution mode
DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
SELECT func_calls_forcepush_func_intarget();
DEBUG: function does not have co-located tables
DEBUG: pushing down function call in a multi-statement transaction
CONTEXT: SQL statement "SELECT inner_force_delegation_function(100 + 100) OFFSET 0"
PL/pgSQL function func_calls_forcepush_func_intarget() line XX at SQL statement
Expand All @@ -453,7 +448,6 @@ PL/pgSQL function func_calls_forcepush_func_intarget() line XX at SQL statement

BEGIN;
SELECT func_calls_forcepush_func_intarget();
DEBUG: not pushing down function calls in a multi-statement transaction
NOTICE: inner_force_delegation_function():201
DETAIL: from localhost:xxxxx
CONTEXT: SQL statement "SELECT inner_force_delegation_function(100 + 100) OFFSET 0"
Expand Down Expand Up @@ -648,7 +642,6 @@ DETAIL: A command for a distributed function is run. To make sure subsequent co
(1 row)

SELECT outer_emp();
DEBUG: function does not have co-located tables
DEBUG: Skipping pushdown of function from a PL/PgSQL simple expression
CONTEXT: SQL statement "SELECT inner_emp('hello')"
PL/pgSQL function outer_emp() line XX at PERFORM
Expand Down Expand Up @@ -1282,7 +1275,6 @@ DETAIL: A command for a distributed function is run. To make sure subsequent co
-- First 5 get delegated and succeeds
BEGIN;
SELECT outer_test_prepare(1,1);
DEBUG: not pushing down function calls in a multi-statement transaction
DEBUG: pushing down function call in a multi-statement transaction
CONTEXT: SQL statement "SELECT FROM test_prepare(x, y)"
PL/pgSQL function outer_test_prepare(integer,integer) line XX at PERFORM
Expand All @@ -1301,7 +1293,6 @@ PL/pgSQL function outer_test_prepare(integer,integer) line XX at PERFORM
(1 row)

SELECT outer_test_prepare(1,1);
DEBUG: not pushing down function calls in a multi-statement transaction
DEBUG: pushing down function call in a multi-statement transaction
CONTEXT: SQL statement "SELECT FROM test_prepare(x, y)"
PL/pgSQL function outer_test_prepare(integer,integer) line XX at PERFORM
Expand All @@ -1320,7 +1311,6 @@ PL/pgSQL function outer_test_prepare(integer,integer) line XX at PERFORM
(1 row)

SELECT outer_test_prepare(1,1);
DEBUG: not pushing down function calls in a multi-statement transaction
DEBUG: pushing down function call in a multi-statement transaction
CONTEXT: SQL statement "SELECT FROM test_prepare(x, y)"
PL/pgSQL function outer_test_prepare(integer,integer) line XX at PERFORM
Expand All @@ -1339,7 +1329,6 @@ PL/pgSQL function outer_test_prepare(integer,integer) line XX at PERFORM
(1 row)

SELECT outer_test_prepare(1,1);
DEBUG: not pushing down function calls in a multi-statement transaction
DEBUG: pushing down function call in a multi-statement transaction
CONTEXT: SQL statement "SELECT FROM test_prepare(x, y)"
PL/pgSQL function outer_test_prepare(integer,integer) line XX at PERFORM
Expand All @@ -1358,7 +1347,6 @@ PL/pgSQL function outer_test_prepare(integer,integer) line XX at PERFORM
(1 row)

SELECT outer_test_prepare(1,1);
DEBUG: not pushing down function calls in a multi-statement transaction
DEBUG: pushing down function call in a multi-statement transaction
CONTEXT: SQL statement "SELECT FROM test_prepare(x, y)"
PL/pgSQL function outer_test_prepare(integer,integer) line XX at PERFORM
Expand All @@ -1385,7 +1373,6 @@ SELECT COUNT(*) FROM table_test_prepare;

-- 6th execution will be generic plan and should get delegated
SELECT outer_test_prepare(1,1);
DEBUG: not pushing down function calls in a multi-statement transaction
DEBUG: pushing down function call in a multi-statement transaction
CONTEXT: SQL statement "SELECT FROM test_prepare(x, y)"
PL/pgSQL function outer_test_prepare(integer,integer) line XX at PERFORM
Expand All @@ -1404,7 +1391,6 @@ PL/pgSQL function outer_test_prepare(integer,integer) line XX at PERFORM
(1 row)

SELECT outer_test_prepare(1,1);
DEBUG: not pushing down function calls in a multi-statement transaction
DEBUG: pushing down function call in a multi-statement transaction
CONTEXT: SQL statement "SELECT FROM test_prepare(x, y)"
PL/pgSQL function outer_test_prepare(integer,integer) line XX at PERFORM
Expand All @@ -1425,7 +1411,6 @@ PL/pgSQL function outer_test_prepare(integer,integer) line XX at PERFORM
END;
-- Fails as expected
SELECT outer_test_prepare(1,2);
DEBUG: function does not have co-located tables
DEBUG: pushing down function call in a multi-statement transaction
CONTEXT: SQL statement "SELECT FROM test_prepare(x, y)"
PL/pgSQL function outer_test_prepare(integer,integer) line XX at PERFORM
Expand Down Expand Up @@ -1525,7 +1510,6 @@ DETAIL: A command for a distributed function is run. To make sure subsequent co
(1 row)

SELECT outer_local_fn();
DEBUG: function does not have co-located tables
DEBUG: pushing down function call in a multi-statement transaction
CONTEXT: SQL statement "SELECT 1 FROM inner_fn(1)"
PL/pgSQL function outer_local_fn() line XX at PERFORM
Expand Down Expand Up @@ -1555,7 +1539,6 @@ SELECT * FROM testnested_table ORDER BY 1;

BEGIN;
SELECT outer_local_fn();
DEBUG: not pushing down function calls in a multi-statement transaction
outer_local_fn
---------------------------------------------------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,6 @@ INSERT INTO upsert_test_1 (unique_col, other_col) SELECT unique_col, other_col F
DO UPDATE SET other_col = 5 WHERE upsert_test_1.other_col = random()::int;
ERROR: functions used in the WHERE clause of the ON CONFLICT clause of INSERTs on distributed tables must be marked IMMUTABLE
SELECT reload_tables();
DEBUG: function does not have co-located tables
reload_tables
---------------------------------------------------------------------

Expand Down
Loading

0 comments on commit 56d217b

Please sign in to comment.