Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport identity column improvements to v11.2 #6811

Merged
merged 10 commits into from
Apr 6, 2023
5 changes: 2 additions & 3 deletions .github/workflows/packaging-test-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ jobs:

apt-get update -y
## Install required packages to execute packaging tools for deb based distros
apt install python3-dev python3-pip -y
sudo apt-get purge -y python3-yaml
python3 -m pip install --upgrade pip setuptools==57.5.0
apt-get install python3-dev python3-pip -y
apt-get purge -y python3-yaml
./.github/packaging/validate_build_output.sh "deb"
4 changes: 2 additions & 2 deletions configure
Original file line number Diff line number Diff line change
Expand Up @@ -579,8 +579,8 @@ MAKEFLAGS=
# Identity of this package.
PACKAGE_NAME='Citus'
PACKAGE_TARNAME='citus'
PACKAGE_VERSION='11.2.0'
PACKAGE_STRING='Citus 11.2.0'
PACKAGE_VERSION='11.2.1'
PACKAGE_STRING='Citus 11.2.1'
PACKAGE_BUGREPORT=''
PACKAGE_URL=''

Expand Down
2 changes: 1 addition & 1 deletion configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# everyone needing autoconf installed, the resulting files are checked
# into the SCM.

AC_INIT([Citus], [11.2.0])
AC_INIT([Citus], [11.2.1])
AC_COPYRIGHT([Copyright (c) Citus Data, Inc.])

# we'll need sed and awk for some of the version commands
Expand Down
2 changes: 1 addition & 1 deletion src/backend/distributed/citus.control
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '11.2-1'
default_version = '11.2-2'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog
153 changes: 7 additions & 146 deletions src/backend/distributed/commands/alter_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,13 @@ CopyTableConversionReturnIntoCurrentContext(TableConversionReturn *tableConversi
static TableConversionReturn *
ConvertTable(TableConversionState *con)
{
/*
* We do not allow alter_distributed_table and undistribute_table operations
* for tables with identity columns. This is because we do not have a proper way
* of keeping sequence states consistent across the cluster.
*/
ErrorIfTableHasIdentityColumn(con->relationId);

/*
* when there are many partitions or colocated tables, memory usage is
* accumulated. Free context for each call to ConvertTable.
Expand Down Expand Up @@ -1588,96 +1595,6 @@ CreateMaterializedViewDDLCommand(Oid matViewOid)
}


/*
* This function marks all the identity sequences as distributed on the given table.
*/
static void
MarkIdentitiesAsDistributed(Oid targetRelationId)
{
Relation relation = relation_open(targetRelationId, AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(relation);
relation_close(relation, NoLock);

bool missingSequenceOk = false;

for (int attributeIndex = 0; attributeIndex < tupleDescriptor->natts;
attributeIndex++)
{
Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, attributeIndex);

if (attributeForm->attidentity)
{
Oid seqOid = getIdentitySequence(targetRelationId, attributeForm->attnum,
missingSequenceOk);

ObjectAddress seqAddress = { 0 };
ObjectAddressSet(seqAddress, RelationRelationId, seqOid);
MarkObjectDistributed(&seqAddress);
}
}
}


/*
* This function returns sql statements to rename identites on the given table
*/
static void
PrepareRenameIdentitiesCommands(Oid sourceRelationId, Oid targetRelationId,
List **outCoordinatorCommands, List **outWorkerCommands)
{
Relation targetRelation = relation_open(targetRelationId, AccessShareLock);
TupleDesc targetTupleDescriptor = RelationGetDescr(targetRelation);
relation_close(targetRelation, NoLock);

bool missingSequenceOk = false;

for (int attributeIndex = 0; attributeIndex < targetTupleDescriptor->natts;
attributeIndex++)
{
Form_pg_attribute attributeForm = TupleDescAttr(targetTupleDescriptor,
attributeIndex);

if (attributeForm->attidentity)
{
char *columnName = NameStr(attributeForm->attname);

Oid targetSequenceOid = getIdentitySequence(targetRelationId,
attributeForm->attnum,
missingSequenceOk);
char *targetSequenceName = generate_relation_name(targetSequenceOid, NIL);

Oid sourceSequenceOid = getIdentitySequence(sourceRelationId,
attributeForm->attnum,
missingSequenceOk);
char *sourceSequenceName = generate_relation_name(sourceSequenceOid, NIL);

/* to rename sequence on the coordinator */
*outCoordinatorCommands = lappend(*outCoordinatorCommands, psprintf(
"SET citus.enable_ddl_propagation TO OFF; ALTER SEQUENCE %s RENAME TO %s; RESET citus.enable_ddl_propagation;",
quote_identifier(
targetSequenceName),
quote_identifier(
sourceSequenceName)));

/* update workers to use existing sequence and drop the new one generated by PG */
bool missingTableOk = true;
*outWorkerCommands = lappend(*outWorkerCommands,
GetAlterColumnWithNextvalDefaultCmd(
sourceSequenceOid, sourceRelationId,
columnName,
missingTableOk));


/* drop the sequence generated by identity column */
*outWorkerCommands = lappend(*outWorkerCommands, psprintf(
"DROP SEQUENCE IF EXISTS %s",
quote_identifier(
targetSequenceName)));
}
}
}


/*
* ReplaceTable replaces the source table with the target table.
* It moves all the rows of the source table to target table with INSERT SELECT.
Expand Down Expand Up @@ -1736,24 +1653,6 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
ExecuteQueryViaSPI(query->data, SPI_OK_INSERT);
}

/*
* Drop identity dependencies (sequences marked as DEPENDENCY_INTERNAL) on the workers
* to keep their states after the source table is dropped.
*/
List *ownedIdentitySequences = getOwnedSequences_internal(sourceId, 0,
DEPENDENCY_INTERNAL);
if (ownedIdentitySequences != NIL && ShouldSyncTableMetadata(sourceId))
{
char *qualifiedTableName = quote_qualified_identifier(schemaName, sourceName);
StringInfo command = makeStringInfo();

appendStringInfo(command,
"SELECT pg_catalog.worker_drop_sequence_dependency(%s);",
quote_literal_cstr(qualifiedTableName));

SendCommandToWorkersWithMetadata(command->data);
}

/*
* Modify regular sequence dependencies (sequences marked as DEPENDENCY_AUTO)
*/
Expand Down Expand Up @@ -1813,23 +1712,6 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
quote_qualified_identifier(schemaName, sourceName))));
}

/*
* We need to prepare rename identities commands before dropping the original table,
* otherwise we can't find the original names of the identity sequences.
* We prepare separate commands for the coordinator and the workers because:
* In the coordinator, we simply need to rename the identity sequences
* to their names on the old table, because right now the identity
* sequences have default names generated by Postgres with the creation of the new table
* In the workers, we have not dropped the original identity sequences,
* so what we do is we alter the columns and set their default to the
* original identity sequences, and after that we drop the new sequences.
*/
List *coordinatorCommandsToRenameIdentites = NIL;
List *workerCommandsToRenameIdentites = NIL;
PrepareRenameIdentitiesCommands(sourceId, targetId,
&coordinatorCommandsToRenameIdentites,
&workerCommandsToRenameIdentites);

resetStringInfo(query);
appendStringInfo(query, "DROP %sTABLE %s CASCADE",
IsForeignTable(sourceId) ? "FOREIGN " : "",
Expand All @@ -1847,27 +1729,6 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
quote_qualified_identifier(schemaName, targetName),
quote_identifier(sourceName));
ExecuteQueryViaSPI(query->data, SPI_OK_UTILITY);

char *coordinatorCommand = NULL;
foreach_ptr(coordinatorCommand, coordinatorCommandsToRenameIdentites)
{
ExecuteQueryViaSPI(coordinatorCommand, SPI_OK_UTILITY);
}

char *workerCommand = NULL;
foreach_ptr(workerCommand, workerCommandsToRenameIdentites)
{
SendCommandToWorkersWithMetadata(workerCommand);
}

/*
* To preserve identity sequences states in case of redistributing the table again,
* we don't drop them when we undistribute a table. To maintain consistency and
* avoid future problems if we redistribute the table, we want to apply all changes happening to
* the identity sequence in the coordinator to their corresponding sequences in the workers as well.
* That's why we have to mark identity sequences as distributed
*/
MarkIdentitiesAsDistributed(targetId);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1131,7 +1131,7 @@ DropIdentitiesOnTable(Oid relationId)
{
Relation relation = relation_open(relationId, AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(relation);
relation_close(relation, NoLock);
List *dropCommandList = NIL;

for (int attributeIndex = 0; attributeIndex < tupleDescriptor->natts;
attributeIndex++)
Expand All @@ -1151,15 +1151,23 @@ DropIdentitiesOnTable(Oid relationId)
qualifiedTableName,
columnName);

/*
* We need to disable/enable ddl propagation for this command, to prevent
* sending unnecessary ALTER COLUMN commands for partitions, to MX workers.
*/
ExecuteAndLogUtilityCommandList(list_make3(DISABLE_DDL_PROPAGATION,
dropCommand->data,
ENABLE_DDL_PROPAGATION));
dropCommandList = lappend(dropCommandList, dropCommand->data);
}
}

relation_close(relation, NoLock);

char *dropCommand = NULL;
foreach_ptr(dropCommand, dropCommandList)
{
/*
* We need to disable/enable ddl propagation for this command, to prevent
* sending unnecessary ALTER COLUMN commands for partitions, to MX workers.
*/
ExecuteAndLogUtilityCommandList(list_make3(DISABLE_DDL_PROPAGATION,
dropCommand,
ENABLE_DDL_PROPAGATION));
}
}


Expand Down
6 changes: 4 additions & 2 deletions src/backend/distributed/commands/create_distributed_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -1190,7 +1190,7 @@ EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId, Oid ownerRelationId
foreach_oid(citusTableId, citusTableIdList)
{
List *seqInfoList = NIL;
GetDependentSequencesWithRelation(citusTableId, &seqInfoList, 0);
GetDependentSequencesWithRelation(citusTableId, &seqInfoList, 0, DEPENDENCY_AUTO);

SequenceInfo *seqInfo = NULL;
foreach_ptr(seqInfo, seqInfoList)
Expand Down Expand Up @@ -1267,7 +1267,7 @@ EnsureRelationHasCompatibleSequenceTypes(Oid relationId)
{
List *seqInfoList = NIL;

GetDependentSequencesWithRelation(relationId, &seqInfoList, 0);
GetDependentSequencesWithRelation(relationId, &seqInfoList, 0, DEPENDENCY_AUTO);
EnsureDistributedSequencesHaveOneType(relationId, seqInfoList);
}

Expand Down Expand Up @@ -1608,6 +1608,8 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
{
Oid parentRelationId = InvalidOid;

ErrorIfTableHasUnsupportedIdentityColumn(relationId);

EnsureLocalTableEmptyIfNecessary(relationId, distributionMethod);

/* user really wants triggers? */
Expand Down
2 changes: 1 addition & 1 deletion src/backend/distributed/commands/dependencies.c
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
bool creatingShellTableOnRemoteNode = true;
List *tableDDLCommands = GetFullTableCreationCommands(relationId,
WORKER_NEXTVAL_SEQUENCE_DEFAULTS,
INCLUDE_IDENTITY_AS_SEQUENCE_DEFAULTS,
INCLUDE_IDENTITY,
creatingShellTableOnRemoteNode);
TableDDLCommand *tableDDLCommand = NULL;
foreach_ptr(tableDDLCommand, tableDDLCommands)
Expand Down
23 changes: 14 additions & 9 deletions src/backend/distributed/commands/sequence.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@

/* Local functions forward declarations for helper functions */
static bool OptionsSpecifyOwnedBy(List *optionList, Oid *ownedByTableId);
static Oid SequenceUsedInDistributedTable(const ObjectAddress *sequenceAddress);
static Oid SequenceUsedInDistributedTable(const ObjectAddress *sequenceAddress, char
depType);
static List * FilterDistributedSequences(GrantStmt *stmt);


Expand Down Expand Up @@ -183,7 +184,7 @@ ExtractDefaultColumnsAndOwnedSequences(Oid relationId, List **columnNameList,

char *columnName = NameStr(attributeForm->attname);
List *columnOwnedSequences =
getOwnedSequences_internal(relationId, attributeIndex + 1, 0);
getOwnedSequences_internal(relationId, attributeIndex + 1, DEPENDENCY_AUTO);

if (attributeForm->atthasdef && list_length(columnOwnedSequences) == 0)
{
Expand Down Expand Up @@ -453,21 +454,22 @@ PreprocessAlterSequenceStmt(Node *node, const char *queryString,
/* the code-path only supports a single object */
Assert(list_length(addresses) == 1);

/* We have already asserted that we have exactly 1 address in the addresses. */
ObjectAddress *address = linitial(addresses);

/* error out if the sequence is distributed */
if (IsAnyObjectDistributed(addresses))
if (IsAnyObjectDistributed(addresses) || SequenceUsedInDistributedTable(address,
DEPENDENCY_INTERNAL))
{
ereport(ERROR, (errmsg(
"Altering a distributed sequence is currently not supported.")));
}

/* We have already asserted that we have exactly 1 address in the addresses. */
ObjectAddress *address = linitial(addresses);

/*
* error out if the sequence is used in a distributed table
* and this is an ALTER SEQUENCE .. AS .. statement
*/
Oid citusTableId = SequenceUsedInDistributedTable(address);
Oid citusTableId = SequenceUsedInDistributedTable(address, DEPENDENCY_AUTO);
if (citusTableId != InvalidOid)
{
List *options = stmt->options;
Expand Down Expand Up @@ -497,16 +499,19 @@ PreprocessAlterSequenceStmt(Node *node, const char *queryString,
* SequenceUsedInDistributedTable returns true if the argument sequence
* is used as the default value of a column in a distributed table.
* Returns false otherwise
* See DependencyType for the possible values of depType.
* We use DEPENDENCY_INTERNAL for sequences created by identity column.
* DEPENDENCY_AUTO for regular sequences.
*/
static Oid
SequenceUsedInDistributedTable(const ObjectAddress *sequenceAddress)
SequenceUsedInDistributedTable(const ObjectAddress *sequenceAddress, char depType)
{
List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE);
Oid citusTableId = InvalidOid;
foreach_oid(citusTableId, citusTableIdList)
{
List *seqInfoList = NIL;
GetDependentSequencesWithRelation(citusTableId, &seqInfoList, 0);
GetDependentSequencesWithRelation(citusTableId, &seqInfoList, 0, depType);
SequenceInfo *seqInfo = NULL;
foreach_ptr(seqInfo, seqInfoList)
{
Expand Down
Loading