Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CDC implementation for Citus using Logical Replication #6623

Merged
merged 57 commits into from
Mar 28, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
2aa5877
CDC changes with DonotReplicateId
rajeshkt78 Jan 20, 2023
a9b2a59
fixed style.
rajeshkt78 Jan 20, 2023
23c7ede
Merge branch 'main' into cdc_with_logical_replication_new
rajeshkt78 Jan 20, 2023
2831d7b
Added CDC basic test case for distributed tables event publication.
rajeshkt78 Jan 27, 2023
ee23f30
Fixed style.
rajeshkt78 Jan 27, 2023
85cde60
Fixed the test cases to avoid output non-producibile data like LSN.
rajeshkt78 Jan 27, 2023
4ff6f32
Added check-cdc test schedule to config.yml
rajeshkt78 Jan 27, 2023
88aa202
Added test target for check-cdc in config.yml
rajeshkt78 Jan 27, 2023
2024e92
added consts for const values and improved the CDC basic tests
rajeshkt78 Jan 31, 2023
5df5820
fixed style
rajeshkt78 Jan 31, 2023
34e762f
Merge branch 'main' into cdc_with_logical_replication_new
rajeshkt78 Jan 31, 2023
558d64b
Merge branch 'main' into cdc_with_logical_replication_new
rajeshkt78 Jan 31, 2023
960cb6b
Added update and delete oprations to basic CDC test
rajeshkt78 Jan 31, 2023
7787e29
Fixed the CDC basic test cases and added cocurrent test case
rajeshkt78 Feb 2, 2023
54dcf88
Added TAP tests for CDC and handled schema change after move
rajeshkt78 Feb 16, 2023
efa19a1
Fixed a warning and moved the config options to lib
rajeshkt78 Feb 16, 2023
9e04cae
Fixed style for cdctestlib.pm
rajeshkt78 Feb 16, 2023
55a88e1
Resolved conflicts with main branch
rajeshkt78 Feb 17, 2023
9c668a3
Adds all branches trigger to trigger pipeline exec
gurkanindibay Feb 17, 2023
df81910
Rollbacks trigger code
gurkanindibay Feb 17, 2023
e1ee5d9
Fix regression in allowed foreign keys on distributed tables (#6550)
JelteF Jan 24, 2023
bca915d
Merge branch 'main' into cdc_with_logical_replication_new
rajeshkt78 Feb 17, 2023
eaf7a6c
Added check-cdc-custom-schedule
rajeshkt78 Feb 17, 2023
382369e
Merge branch 'main' into cdc_with_logical_replication_new
rajeshkt78 Feb 17, 2023
0536d9e
work around for cache lookup issue in CDC decoder and review comments…
rajeshkt78 Mar 15, 2023
02ab0b2
Removed a log message since some tests failed due to that.
rajeshkt78 Mar 15, 2023
3fa9753
Fixed changes in upgrade_list_citus_objects.out
rajeshkt78 Mar 15, 2023
10e9bac
Merge branch 'main' into cdc_with_logical_replication_new
rajeshkt78 Mar 15, 2023
0ce1812
Fixed chages in multi_extension.out
rajeshkt78 Mar 15, 2023
d5bc002
Fixed style.
rajeshkt78 Mar 15, 2023
a471b84
Fixed style for run_test.py
rajeshkt78 Mar 15, 2023
a823a8c
Fixed style and GUC alphabetical order.
rajeshkt78 Mar 15, 2023
4b1cfd4
Added downgrade script from 11.3.1 to 11.2.1
rajeshkt78 Mar 16, 2023
c5e3089
Update multi_extension.out for latest changes.
rajeshkt78 Mar 16, 2023
9f01399
Fixed upgrade_list_citus_objects
rajeshkt78 Mar 16, 2023
fb0e7fb
Updated upgrade_list_citus_objects.out
rajeshkt78 Mar 16, 2023
2a7cd35
Removed the CDC SQL tests and schedule and keep CDC TAP tests only.
rajeshkt78 Mar 16, 2023
61fdea5
Added a CDC block for running CDC tests.
rajeshkt78 Mar 16, 2023
2424620
Removed CDC changes from run_test.py
rajeshkt78 Mar 16, 2023
aa964b9
Added CDC TAP test in CircleCI
rajeshkt78 Mar 16, 2023
56a4126
Updated .circleci/config.yml
rajeshkt78 Mar 16, 2023
b20f923
Added run command to CDC Tap test.
rajeshkt78 Mar 16, 2023
3cd206d
Updated Run command for installing DBI package.
rajeshkt78 Mar 16, 2023
00f8578
Updated Run command to install DBI module in CircleCI
rajeshkt78 Mar 16, 2023
0f460d5
Added sudo for installing packages in CircleCI
rajeshkt78 Mar 16, 2023
0fd9e9b
Removed sudo command from CircleCI config
rajeshkt78 Mar 16, 2023
dace06e
Added apt-get update
rajeshkt78 Mar 16, 2023
35c6a99
Install libdbd-pg-perl for CDC in CircleCI config.yml
rajeshkt78 Mar 17, 2023
520e038
Updated the pg_regress_multi.pl to enable CDC for tests.
rajeshkt78 Mar 17, 2023
b1d7dcf
Refactored the schema change handling logic in CDC decoder and
rajeshkt78 Mar 20, 2023
e8fe49f
Merge branch 'main' into cdc_with_logical_replication_new
rajeshkt78 Mar 21, 2023
f2adbc6
Added test cases for update/delete after schema change and fixed bugs.
rajeshkt78 Mar 21, 2023
2976988
Incorporated review comments.
rajeshkt78 Mar 27, 2023
30525c1
Updated formatting.
rajeshkt78 Mar 27, 2023
4f2fede
moved CDC to seperate dir and test fixes.
rajeshkt78 Mar 28, 2023
55abb26
Merge branch 'main' into cdc_with_logical_replication_new
rajeshkt78 Mar 28, 2023
b666991
Merge branch 'main' into cdc_with_logical_replication_new
rajeshkt78 Mar 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,11 @@ workflows:
name: 'test-15_check-failure'
image: citus/failtester
make: check-failure
- test-citus:
<<: *test-citus-15
name: 'test-15_check-cdc'
make: check-cdc


- tap-test-citus: &tap-test-citus-13
name: 'test-13_tap-recovery'
Expand Down Expand Up @@ -978,6 +983,7 @@ workflows:
- test-13-14_check-pg-upgrade
- test-14-15_check-pg-upgrade
- test-13_check-citus-upgrade
- test-15_check-cdc

- ch_benchmark:
requires: [build-13]
Expand Down
7 changes: 6 additions & 1 deletion src/backend/distributed/commands/alter_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
#include "distributed/multi_partitioning_utils.h"
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/replication_origin_session_utils.h"
#include "distributed/shared_library_init.h"
#include "distributed/shard_utils.h"
#include "distributed/worker_protocol.h"
Expand Down Expand Up @@ -402,7 +403,11 @@ UndistributeTable(TableConversionParameters *params)
params->conversionType = UNDISTRIBUTE_TABLE;
params->shardCountIsNull = true;
TableConversionState *con = CreateTableConversion(params);
return ConvertTable(con);

SetupReplicationOriginLocalSession();
TableConversionReturn *conv = ConvertTable(con);
ResetReplicationOriginLocalSession();
return conv;
}


Expand Down
4 changes: 2 additions & 2 deletions src/backend/distributed/commands/create_distributed_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -2219,12 +2219,12 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
EState *estate = CreateExecutorState();
ExprContext *econtext = GetPerTupleExprContext(estate);
econtext->ecxt_scantuple = slot;

const bool nonPublishableData = false;
DestReceiver *copyDest =
(DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId,
columnNameList,
partitionColumnIndex,
estate, NULL);
estate, NULL, nonPublishableData);

/* initialise state for writing to shards, we'll open connections on demand */
copyDest->rStartup(copyDest, 0, tupleDescriptor);
Expand Down
17 changes: 13 additions & 4 deletions src/backend/distributed/commands/local_multi_copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "distributed/local_multi_copy.h"
#include "distributed/shard_utils.h"
#include "distributed/version_compat.h"
#include "distributed/replication_origin_session_utils.h"

/* managed via GUC, default is 512 kB */
int LocalCopyFlushThresholdByte = 512 * 1024;
Expand All @@ -46,7 +47,7 @@ static void AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDes
static bool ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary);
static bool ShouldSendCopyNow(StringInfo buffer);
static void DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId,
CopyStmt *copyStatement, bool isEndOfCopy);
CopyStmt *copyStatement, bool isEndOfCopy, bool isPublishable);
static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead);


Expand Down Expand Up @@ -94,7 +95,7 @@ WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, in
bool isEndOfCopy = false;
DoLocalCopy(localCopyOutState->fe_msgbuf, copyDest->distributedRelationId,
shardId,
copyDest->copyStatement, isEndOfCopy);
copyDest->copyStatement, isEndOfCopy, copyDest->isPublishable);
resetStringInfo(localCopyOutState->fe_msgbuf);
}
}
Expand Down Expand Up @@ -133,7 +134,7 @@ FinishLocalCopyToShard(CitusCopyDestReceiver *copyDest, int64 shardId,
}
bool isEndOfCopy = true;
DoLocalCopy(localCopyOutState->fe_msgbuf, copyDest->distributedRelationId, shardId,
copyDest->copyStatement, isEndOfCopy);
copyDest->copyStatement, isEndOfCopy, copyDest->isPublishable);
}


Expand Down Expand Up @@ -197,14 +198,18 @@ ShouldSendCopyNow(StringInfo buffer)
*/
static void
DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStatement,
bool isEndOfCopy)
bool isEndOfCopy, bool isPublishable)
{
/*
* Set the buffer as a global variable to allow ReadFromLocalBufferCallback
* to read from it. We cannot pass additional arguments to
* ReadFromLocalBufferCallback.
*/
LocalCopyBuffer = buffer;
if (!isPublishable)
{
SetupReplicationOriginLocalSession();
}

Oid shardOid = GetTableLocalShardOid(relationId, shardId);
Relation shard = table_open(shardOid, RowExclusiveLock);
Expand All @@ -219,6 +224,10 @@ DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStat
EndCopyFrom(cstate);

table_close(shard, NoLock);
if (!isPublishable)
{
ResetReplicationOriginLocalSession();
}
free_parsestate(pState);
}

Expand Down
41 changes: 33 additions & 8 deletions src/backend/distributed/commands/multi_copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/remote_transaction.h"
#include "distributed/replication_origin_session_utils.h"
#include "distributed/resource_lock.h"
#include "distributed/shard_pruning.h"
#include "distributed/shared_connection_stats.h"
Expand Down Expand Up @@ -270,7 +271,8 @@ static CopyConnectionState * GetConnectionState(HTAB *connectionStateHash,
static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash,
HTAB *connectionStateHash,
bool *found, bool shouldUseLocalCopy, CopyOutState
copyOutState, bool isColocatedIntermediateResult);
copyOutState, bool isColocatedIntermediateResult,
bool isPublishable);
static MultiConnection * CopyGetPlacementConnection(HTAB *connectionStateHash,
ShardPlacement *placement,
bool colocatedIntermediateResult);
Expand All @@ -285,7 +287,8 @@ static void InitializeCopyShardState(CopyShardState *shardState,
uint64 shardId,
bool canUseLocalCopy,
CopyOutState copyOutState,
bool colocatedIntermediateResult);
bool colocatedIntermediateResult, bool
isPublishable);
static void StartPlacementStateCopyCommand(CopyPlacementState *placementState,
CopyStmt *copyStatement,
CopyOutState copyOutState);
Expand Down Expand Up @@ -492,9 +495,11 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletion *completionTag)
ExprContext *executorExpressionContext = GetPerTupleExprContext(executorState);

/* set up the destination for the COPY */
const bool publishableData = true;
CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList,
partitionColumnIndex,
executorState, NULL);
executorState, NULL,
publishableData);

/* if the user specified an explicit append-to_shard option, write to it */
uint64 appendShardId = ProcessAppendToShardOption(tableId, copyStatement);
Expand Down Expand Up @@ -1934,7 +1939,7 @@ CopyFlushOutput(CopyOutState cstate, char *start, char *pointer)
CitusCopyDestReceiver *
CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColumnIndex,
EState *executorState,
char *intermediateResultIdPrefix)
char *intermediateResultIdPrefix, bool isPublishable)
{
CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) palloc0(
sizeof(CitusCopyDestReceiver));
Expand All @@ -1953,6 +1958,7 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu
copyDest->executorState = executorState;
copyDest->colocatedIntermediateResultIdPrefix = intermediateResultIdPrefix;
copyDest->memoryContext = CurrentMemoryContext;
copyDest->isPublishable = isPublishable;

return copyDest;
}
Expand Down Expand Up @@ -2318,7 +2324,9 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest
&cachedShardStateFound,
copyDest->shouldUseLocalCopy,
copyDest->copyOutState,
isColocatedIntermediateResult);
isColocatedIntermediateResult,
copyDest->isPublishable);

if (!cachedShardStateFound)
{
firstTupleInShard = true;
Expand Down Expand Up @@ -2751,6 +2759,11 @@ ShutdownCopyConnectionState(CopyConnectionState *connectionState,
if (activePlacementState != NULL)
{
EndPlacementStateCopyCommand(activePlacementState, copyOutState);
if (!copyDest->isPublishable)
{
ResetReplicationOriginRemoteSession(
activePlacementState->connectionState->connection);
}
}

dlist_foreach(iter, &connectionState->bufferedPlacementList)
Expand All @@ -2764,6 +2777,10 @@ ShutdownCopyConnectionState(CopyConnectionState *connectionState,
SendCopyDataToPlacement(placementState->data, shardId,
connectionState->connection);
EndPlacementStateCopyCommand(placementState, copyOutState);
if (!copyDest->isPublishable)
{
ResetReplicationOriginRemoteSession(connectionState->connection);
}
}
}

Expand Down Expand Up @@ -3436,15 +3453,16 @@ static CopyShardState *
GetShardState(uint64 shardId, HTAB *shardStateHash,
HTAB *connectionStateHash, bool *found, bool
shouldUseLocalCopy, CopyOutState copyOutState,
bool isColocatedIntermediateResult)
bool isColocatedIntermediateResult, bool isPublishable)
{
CopyShardState *shardState = (CopyShardState *) hash_search(shardStateHash, &shardId,
HASH_ENTER, found);
if (!*found)
{
InitializeCopyShardState(shardState, connectionStateHash,
shardId, shouldUseLocalCopy,
copyOutState, isColocatedIntermediateResult);
copyOutState, isColocatedIntermediateResult,
isPublishable);
}

return shardState;
Expand All @@ -3461,7 +3479,8 @@ InitializeCopyShardState(CopyShardState *shardState,
HTAB *connectionStateHash, uint64 shardId,
bool shouldUseLocalCopy,
CopyOutState copyOutState,
bool colocatedIntermediateResult)
bool colocatedIntermediateResult,
bool isPublishable)
{
ListCell *placementCell = NULL;
int failedPlacementCount = 0;
Expand Down Expand Up @@ -3532,6 +3551,12 @@ InitializeCopyShardState(CopyShardState *shardState,
RemoteTransactionBeginIfNecessary(connection);
}

if (!isPublishable)
{
/*elog(LOG,"InitializeCopyShardState: calling SetupReplicationOriginRemoteSession conn id: %lu", connection->connectionId); */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be good to remove this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

SetupReplicationOriginRemoteSession(connection);
}

CopyPlacementState *placementState = palloc0(sizeof(CopyPlacementState));
placementState->shardState = shardState;
placementState->data = makeStringInfo();
Expand Down
2 changes: 2 additions & 0 deletions src/backend/distributed/connection/connection_management.c
Original file line number Diff line number Diff line change
Expand Up @@ -1481,6 +1481,7 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
* - Current cached connections is already at MaxCachedConnectionsPerWorker
* - Connection is forced to close at the end of transaction
* - Connection is not in OK state
* - Connection has a replication origin setup
* - A transaction is still in progress (usually because we are cancelling a distributed transaction)
* - A connection reached its maximum lifetime
*/
Expand All @@ -1500,6 +1501,7 @@ ShouldShutdownConnection(MultiConnection *connection, const int cachedConnection
PQstatus(connection->pgConn) != CONNECTION_OK ||
!RemoteTransactionIdle(connection) ||
connection->requiresReplication ||
connection->isReplicationOriginSessionSetup ||
(MaxCachedConnectionLifetime >= 0 &&
MillisecondsToTimeout(connection->connectionEstablishmentStart,
MaxCachedConnectionLifetime) <= 0);
Expand Down
8 changes: 6 additions & 2 deletions src/backend/distributed/executor/insert_select_executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -409,11 +409,13 @@ ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId,
columnNameList);

/* set up a DestReceiver that copies into the intermediate table */
const bool publishableData = true;
CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId,
columnNameList,
partitionColumnIndex,
executorState,
intermediateResultIdPrefix);
intermediateResultIdPrefix,
publishableData);

ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest);

Expand Down Expand Up @@ -443,10 +445,12 @@ ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList,
columnNameList);

/* set up a DestReceiver that copies into the distributed table */
const bool publishableData = true;
CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId,
columnNameList,
partitionColumnIndex,
executorState, NULL);
executorState, NULL,
publishableData);

ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest);

Expand Down
22 changes: 22 additions & 0 deletions src/backend/distributed/operations/worker_shard_copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "distributed/relation_utils.h"
#include "distributed/version_compat.h"
#include "distributed/local_executor.h"
#include "distributed/replication_origin_session_utils.h"

/*
* LocalCopyBuffer is used in copy callback to return the copied rows.
Expand Down Expand Up @@ -80,6 +81,7 @@ static void LocalCopyToShard(ShardCopyDestReceiver *copyDest, CopyOutState
localCopyOutState);
static void ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest);


static bool
CanUseLocalCopy(uint32_t destinationNodeId)
{
Expand All @@ -103,6 +105,12 @@ ConnectToRemoteAndStartCopy(ShardCopyDestReceiver *copyDest)
NULL /* database (current) */);
ClaimConnectionExclusively(copyDest->connection);


RemoteTransactionBeginIfNecessary(copyDest->connection);

SetupReplicationOriginRemoteSession(copyDest->connection);


StringInfo copyStatement = ConstructShardCopyStatement(
copyDest->destinationShardFullyQualifiedName,
copyDest->copyOutState->binary);
Expand Down Expand Up @@ -184,6 +192,8 @@ ShardCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
CopyOutState copyOutState = copyDest->copyOutState;
if (copyDest->useLocalCopy)
{
/* Setup replication origin session for local copy*/

WriteLocalTuple(slot, copyDest);
if (copyOutState->fe_msgbuf->len > LocalCopyFlushThresholdByte)
{
Expand Down Expand Up @@ -259,6 +269,11 @@ ShardCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc
copyDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor,
copyOutState->binary);
copyDest->copyOutState = copyOutState;
if (copyDest->useLocalCopy)
{
/* Setup replication origin session for local copy*/
SetupReplicationOriginLocalSession();
}
}


Expand Down Expand Up @@ -317,6 +332,9 @@ ShardCopyDestReceiverShutdown(DestReceiver *dest)

PQclear(result);
ForgetResults(copyDest->connection);

ResetReplicationOriginRemoteSession(copyDest->connection);

CloseConnection(copyDest->connection);
}
}
Expand All @@ -329,6 +347,10 @@ static void
ShardCopyDestReceiverDestroy(DestReceiver *dest)
{
ShardCopyDestReceiver *copyDest = (ShardCopyDestReceiver *) dest;
if (copyDest->useLocalCopy)
{
ResetReplicationOriginLocalSession();
}

if (copyDest->copyOutState)
{
Expand Down
Loading