Skip to content

Commit

Permalink
avoid rebuilding MetadataCache for each placement insertion
Browse files Browse the repository at this point in the history
for reference table and distributed table without colocation field,
each time we insert a shard placement, it invalidate the metadata
cache with the relation oid, and build it by a following
`LoadShardPlacement`, this was partly fixed by citusdata#6722 for
CreateColocatedShards, problem still exists for other cases, this
patch fix them all.

Signed-off-by: Zhao Junwang <zhjwpku@gmail.com>
  • Loading branch information
zhjwpku committed Aug 30, 2023
1 parent f03291a commit d5c192f
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 47 deletions.
6 changes: 4 additions & 2 deletions src/backend/distributed/commands/create_distributed_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -1262,19 +1262,21 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
CreateTruncateTrigger(relationId);
}

/* create shards for hash distributed and reference tables */
if (tableType == HASH_DISTRIBUTED)
{
/* create shards for hash distributed table */
CreateHashDistributedTableShards(relationId, distributedTableParams->shardCount,
colocatedTableId,
localTableEmpty);
}
else if (tableType == REFERENCE_TABLE)
{
/* create shards for reference table */
CreateReferenceTableShard(relationId);
}
else if (tableType == SINGLE_SHARD_DISTRIBUTED)
{
/* create the shard of given single-shard distributed table */
CreateSingleShardTableShard(relationId, colocatedTableId,
colocationId);
}
Expand Down Expand Up @@ -1900,7 +1902,7 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount,


/*
* CreateHashDistributedTableShards creates the shard of given single-shard
* CreateSingleShardTableShard creates the shard of given single-shard
* distributed table.
*/
static void
Expand Down
4 changes: 1 addition & 3 deletions src/backend/distributed/metadata/metadata_utility.c
Original file line number Diff line number Diff line change
Expand Up @@ -1798,7 +1798,7 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType,
* InsertShardPlacementRowGlobally inserts shard placement that has given
* parameters into pg_dist_placement globally.
*/
ShardPlacement *
void
InsertShardPlacementRowGlobally(uint64 shardId, uint64 placementId,
uint64 shardLength, int32 groupId)
{
Expand All @@ -1807,8 +1807,6 @@ InsertShardPlacementRowGlobally(uint64 shardId, uint64 placementId,
char *insertPlacementCommand =
AddPlacementMetadataCommand(shardId, placementId, shardLength, groupId);
SendCommandToWorkersWithMetadata(insertPlacementCommand);

return LoadShardPlacement(shardId, placementId);
}


Expand Down
66 changes: 47 additions & 19 deletions src/backend/distributed/operations/create_shards.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
{
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId);
List *insertedShardPlacements = NIL;
List *insertedShardIds = NIL;

/* make sure table is hash partitioned */
CheckHashPartitionedTable(distributedTableId);
Expand Down Expand Up @@ -174,7 +175,9 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
/* initialize the hash token space for this shard */
int32 shardMinHashToken = PG_INT32_MIN + (shardIndex * hashTokenIncrement);
int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1);
uint64 shardId = GetNextShardId();
uint64 *shardIdPtr = (uint64 *) palloc0(sizeof(uint64));
*shardIdPtr = GetNextShardId();
insertedShardIds = lappend(insertedShardIds, shardIdPtr);

/* if we are at the last shard, make sure the max token value is INT_MAX */
if (shardIndex == (shardCount - 1))
Expand All @@ -186,17 +189,27 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
text *minHashTokenText = IntegerToText(shardMinHashToken);
text *maxHashTokenText = IntegerToText(shardMaxHashToken);

InsertShardRow(distributedTableId, shardId, shardStorageType,
InsertShardRow(distributedTableId, *shardIdPtr, shardStorageType,
minHashTokenText, maxHashTokenText);

List *currentInsertedShardPlacements = InsertShardPlacementRows(
distributedTableId,
shardId,
workerNodeList,
roundRobinNodeIndex,
replicationFactor);
InsertShardPlacementRows(distributedTableId,
*shardIdPtr,
workerNodeList,
roundRobinNodeIndex,
replicationFactor);
}

/*
* load shard placements for the shard at once after all placement insertions
* finished. This prevents MetadataCache from rebuilding unnecessarily after
* each placement insertion.
*/
uint64 *shardIdPtr;
foreach_ptr(shardIdPtr, insertedShardIds)
{
List *placementsForShard = ShardPlacementList(*shardIdPtr);
insertedShardPlacements = list_concat(insertedShardPlacements,
currentInsertedShardPlacements);
placementsForShard);
}

CreateShardsOnWorkers(distributedTableId, insertedShardPlacements,
Expand Down Expand Up @@ -292,7 +305,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool

/*
* load shard placements for the shard at once after all placement insertions
* finished. That prevents MetadataCache from rebuilding unnecessarily after
* finished. This prevents MetadataCache from rebuilding unnecessarily after
* each placement insertion.
*/
uint64 *shardIdPtr;
Expand Down Expand Up @@ -360,9 +373,18 @@ CreateReferenceTableShard(Oid distributedTableId)
InsertShardRow(distributedTableId, shardId, shardStorageType, shardMinValue,
shardMaxValue);

List *insertedShardPlacements = InsertShardPlacementRows(distributedTableId, shardId,
nodeList, workerStartIndex,
replicationFactor);
InsertShardPlacementRows(distributedTableId,
shardId,
nodeList,
workerStartIndex,
replicationFactor);

/*
* load shard placements for the shard at once after all placement insertions
* finished. This prevents MetadataCache from rebuilding unnecessarily after
* each placement insertion.
*/
List *insertedShardPlacements = ShardPlacementList(shardId);

CreateShardsOnWorkers(distributedTableId, insertedShardPlacements,
useExclusiveConnection);
Expand Down Expand Up @@ -408,12 +430,18 @@ CreateSingleShardTableShardWithRoundRobinPolicy(Oid relationId, uint32 colocatio
minHashTokenText, maxHashTokenText);

int replicationFactor = 1;
List *insertedShardPlacements = InsertShardPlacementRows(
relationId,
shardId,
workerNodeList,
roundRobinNodeIdx,
replicationFactor);
InsertShardPlacementRows(relationId,
shardId,
workerNodeList,
roundRobinNodeIdx,
replicationFactor);

/*
* load shard placements for the shard at once after all placement insertions
* finished. This prevents MetadataCache from rebuilding unnecessarily after
* each placement insertion.
*/
List *insertedShardPlacements = ShardPlacementList(shardId);

/*
* We don't need to force using exclusive connections because we're anyway
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,20 @@ NoneDistTableReplicateCoordinatorPlacement(Oid noneDistTableId,
uint64 shardLength = ShardLength(shardId);

/* insert new placements to pg_dist_placement */
List *insertedPlacementList = NIL;
WorkerNode *targetNode = NULL;
foreach_ptr(targetNode, targetNodeList)
{
ShardPlacement *shardPlacement =
InsertShardPlacementRowGlobally(shardId, GetNextPlacementId(),
shardLength, targetNode->groupId);

/* and save the placement for shard creation on workers */
insertedPlacementList = lappend(insertedPlacementList, shardPlacement);
InsertShardPlacementRowGlobally(shardId, GetNextPlacementId(),
shardLength, targetNode->groupId);
}

/*
* load shard placements for the shard at once after all placement insertions
* finished. This prevents MetadataCache from rebuilding unnecessarily after
* each placement insertion.
*/
List *insertedPlacementList = ShardPlacementList(shardId);

/* create new placements */
bool useExclusiveConnection = false;
CreateShardsOnWorkers(noneDistTableId, insertedPlacementList,
Expand Down
15 changes: 6 additions & 9 deletions src/backend/distributed/operations/stage_protocol.c
Original file line number Diff line number Diff line change
Expand Up @@ -383,14 +383,13 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,

/*
* InsertShardPlacementRows inserts shard placements to the metadata table on
* the coordinator node. Then, returns the list of added shard placements.
* the coordinator node.
*/
List *
void
InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList,
int workerStartIndex, int replicationFactor)
{
int workerNodeCount = list_length(workerNodeList);
List *insertedShardPlacements = NIL;

for (int placementIndex = 0; placementIndex < replicationFactor; placementIndex++)
{
Expand All @@ -399,13 +398,11 @@ InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList,
uint32 nodeGroupId = workerNode->groupId;
const uint64 shardSize = 0;

uint64 shardPlacementId = InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID,
shardSize, nodeGroupId);
ShardPlacement *shardPlacement = LoadShardPlacement(shardId, shardPlacementId);
insertedShardPlacements = lappend(insertedShardPlacements, shardPlacement);
InsertShardPlacementRow(shardId,
INVALID_PLACEMENT_ID,
shardSize,
nodeGroupId);
}

return insertedShardPlacements;
}


Expand Down
6 changes: 3 additions & 3 deletions src/include/distributed/coordinator_protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,9 @@ extern void CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId
replicationFactor);
extern void CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
bool useExclusiveConnection);
extern List * InsertShardPlacementRows(Oid relationId, int64 shardId,
List *workerNodeList, int workerStartIndex,
int replicationFactor);
extern void InsertShardPlacementRows(Oid relationId, int64 shardId,
List *workerNodeList, int workerStartIndex,
int replicationFactor);
extern uint64 UpdateShardStatistics(int64 shardId);
extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
int32 replicationFactor,
Expand Down
8 changes: 4 additions & 4 deletions src/include/distributed/metadata_utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -350,10 +350,10 @@ extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList);
extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
text *shardMinValue, text *shardMaxValue);
extern void DeleteShardRow(uint64 shardId);
extern ShardPlacement * InsertShardPlacementRowGlobally(uint64 shardId,
uint64 placementId,
uint64 shardLength,
int32 groupId);
extern void InsertShardPlacementRowGlobally(uint64 shardId,
uint64 placementId,
uint64 shardLength,
int32 groupId);
extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId,
uint64 shardLength, int32 groupId);
extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
Expand Down

0 comments on commit d5c192f

Please sign in to comment.