Skip to content

Commit

Permalink
Revert "Fixes flakiness in multi_metadata_sync test (#6824)"
Browse files Browse the repository at this point in the history
This reverts commit 6cfcc37.
  • Loading branch information
emelsimsek committed Apr 11, 2023
1 parent b49cd14 commit 1aea5b1
Show file tree
Hide file tree
Showing 42 changed files with 266 additions and 1,785 deletions.
6 changes: 5 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ orbs:
parameters:
image_suffix:
type: string
default: '-v3417e8d'
default: '-v087ecd7'
pg13_version:
type: string
default: '13.10'
Expand Down Expand Up @@ -490,6 +490,10 @@ jobs:
pg_major: << parameters.pg_major >>
- configure
- enable_core
- run:
name: 'Install DBI.pm'
command: |
apt-get update && apt-get install libdbi-perl && apt-get install libdbd-pg-perl
- run:
name: 'Run Test'
command: |
Expand Down
10 changes: 5 additions & 5 deletions src/backend/distributed/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ OBJS += \
all: cdc

cdc:
$(MAKE) -C cdc all
echo "running cdc make"
$(MAKE) DECODER=pgoutput -C cdc all

NO_PGXS = 1

Expand Down Expand Up @@ -84,13 +85,12 @@ ifneq (,$(SQL_Po_files))
include $(SQL_Po_files)
endif


.PHONY: clean-full install install-downgrades install-all install-cdc clean-cdc
.PHONY: clean-full install install-downgrades install-all

clean: clean-cdc

clean-cdc:
$(MAKE) -C cdc clean
$(MAKE) DECODER=pgoutput -C cdc clean

cleanup-before-install:
rm -f $(DESTDIR)$(datadir)/$(datamoduledir)/citus.control
Expand All @@ -99,7 +99,7 @@ cleanup-before-install:
install: cleanup-before-install install-cdc

install-cdc:
$(MAKE) -C cdc install
$(MAKE) DECODER=pgoutput -C cdc install

# install and install-downgrades should be run sequentially
install-all: install
Expand Down
53 changes: 17 additions & 36 deletions src/backend/distributed/cdc/Makefile
Original file line number Diff line number Diff line change
@@ -1,45 +1,26 @@
citus_top_builddir = ../../../..
include $(citus_top_builddir)/Makefile.global
ifndef DECODER
DECODER = pgoutput
endif

MODULE_big = citus_$(DECODER)
citus_subdir = src/backend/distributed/cdc
SRC_DIR = $(citus_abs_top_srcdir)/$(citus_subdir)

#List of supported based decoders. Add new decoders here.
cdc_base_decoders :=pgoutput wal2json

all: build-cdc-decoders

copy-decoder-files-to-build-dir:
$(eval DECODER_BUILD_DIR=build-cdc-$(DECODER))
mkdir -p $(DECODER_BUILD_DIR)
@for file in $(SRC_DIR)/*.c $(SRC_DIR)/*.h; do \
if [ -f $$file ]; then \
if [ -f $(DECODER_BUILD_DIR)/$$(basename $$file) ]; then \
if ! diff -q $$file $(DECODER_BUILD_DIR)/$$(basename $$file); then \
cp $$file $(DECODER_BUILD_DIR)/$$(basename $$file); \
fi \
else \
cp $$file $(DECODER_BUILD_DIR)/$$(basename $$file); \
fi \
fi \
done
cp $(SRC_DIR)/Makefile.decoder $(DECODER_BUILD_DIR)/Makefile

build-cdc-decoders:
$(foreach base_decoder,$(cdc_base_decoders),$(MAKE) DECODER=$(base_decoder) build-cdc-decoder;)
citus_top_builddir = ../../../..
citus_decoders_dir = $(DESTDIR)$(pkglibdir)/citus_decoders

install-cdc-decoders:
$(foreach base_decoder,$(cdc_base_decoders),$(MAKE) DECODER=$(base_decoder) -C build-cdc-$(base_decoder) install;)
OBJS += cdc_decoder.o cdc_decoder_utils.o

clean-cdc-decoders:
$(foreach base_decoder,$(cdc_base_decoders),rm -rf build-cdc-$(base_decoder);)
include $(citus_top_builddir)/Makefile.global

override CFLAGS += -DDECODER=\"$(DECODER)\" -I$(citus_abs_top_srcdir)/include
override CPPFLAGS += -DDECODER=\"$(DECODER)\" -I$(citus_abs_top_srcdir)/include

build-cdc-decoder:
$(MAKE) DECODER=$(DECODER) copy-decoder-files-to-build-dir
$(MAKE) DECODER=$(DECODER) -C build-cdc-$(DECODER)
install: install-cdc

install: install-cdc-decoders
clean: clean-cdc

clean: clean-cdc-decoders
install-cdc:
mkdir -p '$(citus_decoders_dir)'
$(INSTALL_SHLIB) citus_$(DECODER).so '$(citus_decoders_dir)/$(DECODER).so'

clean-cdc:
rm -f '$(DESTDIR)$(datadir)/$(datamoduledir)/citus_decoders/$(DECODER).so'
24 changes: 0 additions & 24 deletions src/backend/distributed/cdc/Makefile.decoder

This file was deleted.

22 changes: 3 additions & 19 deletions src/backend/distributed/metadata/metadata_utility.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
#include "catalog/pg_type.h"
#include "commands/extension.h"
#include "commands/sequence.h"
#include "distributed/background_jobs.h"
#include "distributed/colocation_utils.h"
#include "distributed/connection_management.h"
#include "distributed/citus_nodes.h"
Expand All @@ -58,9 +57,7 @@
#include "distributed/relay_utility.h"
#include "distributed/resource_lock.h"
#include "distributed/remote_commands.h"
#include "distributed/shard_rebalancer.h"
#include "distributed/tuplestore.h"
#include "distributed/utils/array_type.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"
#include "distributed/version_compat.h"
Expand Down Expand Up @@ -780,6 +777,7 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
{
partitionedShardNames = lappend(partitionedShardNames, quotedShardName);
}

/* for non-partitioned tables, we will use Postgres' size functions */
else
{
Expand Down Expand Up @@ -2818,8 +2816,7 @@ CreateBackgroundJob(const char *jobType, const char *description)
*/
BackgroundTask *
ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskCount,
int64 dependingTaskIds[], int nodesInvolvedCount, int32
nodesInvolved[])
int64 dependingTaskIds[])
{
BackgroundTask *task = NULL;

Expand Down Expand Up @@ -2893,11 +2890,6 @@ ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskC
values[Anum_pg_dist_background_task_message - 1] = CStringGetTextDatum("");
nulls[Anum_pg_dist_background_task_message - 1] = false;

values[Anum_pg_dist_background_task_nodes_involved - 1] =
IntArrayToDatum(nodesInvolvedCount, nodesInvolved);
nulls[Anum_pg_dist_background_task_nodes_involved - 1] = (nodesInvolvedCount ==
0);

HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistBackgroundTask),
values, nulls);
CatalogTupleInsert(pgDistBackgroundTask, newTuple);
Expand Down Expand Up @@ -3209,13 +3201,6 @@ DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, HeapTuple taskTuple)
TextDatumGetCString(values[Anum_pg_dist_background_task_message - 1]);
}

if (!nulls[Anum_pg_dist_background_task_nodes_involved - 1])
{
ArrayType *nodesInvolvedArrayObject =
DatumGetArrayTypeP(values[Anum_pg_dist_background_task_nodes_involved - 1]);
task->nodesInvolved = IntegerArrayTypeToList(nodesInvolvedArrayObject);
}

return task;
}

Expand Down Expand Up @@ -3348,8 +3333,7 @@ GetRunnableBackgroundTask(void)
while (HeapTupleIsValid(taskTuple = systable_getnext(scanDescriptor)))
{
task = DeformBackgroundTaskHeapTuple(tupleDescriptor, taskTuple);
if (BackgroundTaskReadyToRun(task) &&
IncrementParallelTaskCountForNodesInvolved(task))
if (BackgroundTaskReadyToRun(task))
{
/* found task, close table and return */
break;
Expand Down
101 changes: 33 additions & 68 deletions src/backend/distributed/operations/shard_rebalancer.c
Original file line number Diff line number Diff line change
Expand Up @@ -190,32 +190,13 @@ typedef struct WorkerShardStatistics
HTAB *statistics;
} WorkerShardStatistics;

/*
* ShardMoveDependencyHashEntry contains the taskId which any new shard
* move task within the corresponding colocation group
* must take a dependency on
*/
/* ShardMoveDependencyHashEntry contains the taskId which any new shard move task within the corresponding colocation group must take a dependency on */
typedef struct ShardMoveDependencyInfo
{
int64 key;
int64 taskId;
} ShardMoveDependencyInfo;

/*
* ShardMoveSourceNodeHashEntry keeps track of the source nodes
* of the moves.
*/
typedef struct ShardMoveSourceNodeHashEntry
{
/* this is the key */
int32 node_id;
List *taskIds;
} ShardMoveSourceNodeHashEntry;

/*
* ShardMoveDependencies keeps track of all needed dependencies
* between shard moves.
*/
typedef struct ShardMoveDependencies
{
HTAB *colocationDependencies;
Expand Down Expand Up @@ -293,15 +274,6 @@ static void AddToWorkerShardIdSet(HTAB *shardsByWorker, char *workerName, int wo
static HTAB * BuildShardSizesHash(ProgressMonitorData *monitor, HTAB *shardStatistics);
static void ErrorOnConcurrentRebalance(RebalanceOptions *);
static List * GetSetCommandListForNewConnections(void);
static int64 GetColocationId(PlacementUpdateEvent *move);
static ShardMoveDependencies InitializeShardMoveDependencies();
static int64 * GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64
colocationId,
ShardMoveDependencies shardMoveDependencies,
int *nDepends);
static void UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId,
int64 taskId,
ShardMoveDependencies shardMoveDependencies);

/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(rebalance_table_shards);
Expand Down Expand Up @@ -1958,7 +1930,8 @@ GetColocationId(PlacementUpdateEvent *move)
* InitializeShardMoveDependencies function creates the hash maps that we use to track
* the latest moves so that subsequent moves with the same properties must take a dependency
* on them. There are two hash maps. One is for tracking the latest move scheduled in a
* given colocation group and the other one is for tracking source nodes of all moves.
* given colocation group and the other one is for tracking the latest move which involves
* a given node either as its source node or its target node.
*/
static ShardMoveDependencies
InitializeShardMoveDependencies()
Expand All @@ -1968,17 +1941,18 @@ InitializeShardMoveDependencies()
ShardMoveDependencyInfo,
"colocationDependencyHashMap",
6);
shardMoveDependencies.nodeDependencies = CreateSimpleHashWithNameAndSize(int32,
ShardMoveSourceNodeHashEntry,
shardMoveDependencies.nodeDependencies = CreateSimpleHashWithNameAndSize(int64,
ShardMoveDependencyInfo,
"nodeDependencyHashMap",
6);

return shardMoveDependencies;
}


/*
* GenerateTaskMoveDependencyList creates and returns a List of taskIds that
* the move must take a dependency on, given the shard move dependencies as input.
* the move must take a dependency on.
*/
static int64 *
GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId,
Expand All @@ -1998,24 +1972,27 @@ GenerateTaskMoveDependencyList(PlacementUpdateEvent *move, int64 colocationId,
hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL);
}

/*
* Check if there exists moves scheduled earlier whose source node
* overlaps with the current move's target node.
* The earlier/first move might make space for the later/second move.
* So we could run out of disk space (or at least overload the node)
* if we move the second shard to it before the first one is moved away. 
*/
ShardMoveSourceNodeHashEntry *shardMoveSourceNodeHashEntry = hash_search(
shardMoveDependencies.nodeDependencies, &move->targetNode->nodeId, HASH_FIND,
/* Check if there exists a move scheduled earlier whose source or target node
* overlaps with the current move's source node. */
shardMoveDependencyInfo = hash_search(
shardMoveDependencies.nodeDependencies, &move->sourceNode->nodeId, HASH_ENTER,
&found);

if (found)
{
int64 *taskId = NULL;
foreach_ptr(taskId, shardMoveSourceNodeHashEntry->taskIds)
{
hash_search(dependsList, taskId, HASH_ENTER, NULL);
}
hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL);
}

/* Check if there exists a move scheduled earlier whose source or target node
* overlaps with the current move's target node. */
shardMoveDependencyInfo = hash_search(
shardMoveDependencies.nodeDependencies, &move->targetNode->nodeId, HASH_ENTER,
&found);


if (found)
{
hash_search(dependsList, &shardMoveDependencyInfo->taskId, HASH_ENTER, NULL);
}

*nDepends = hash_get_num_entries(dependsList);
Expand Down Expand Up @@ -2053,20 +2030,15 @@ UpdateShardMoveDependencies(PlacementUpdateEvent *move, uint64 colocationId, int
shardMoveDependencies.colocationDependencies, &colocationId, HASH_ENTER, NULL);
shardMoveDependencyInfo->taskId = taskId;

bool found;
ShardMoveSourceNodeHashEntry *shardMoveSourceNodeHashEntry = hash_search(
shardMoveDependencies.nodeDependencies, &move->sourceNode->nodeId, HASH_ENTER,
&found);
shardMoveDependencyInfo = hash_search(shardMoveDependencies.nodeDependencies,
&move->sourceNode->nodeId, HASH_ENTER, NULL);

if (!found)
{
shardMoveSourceNodeHashEntry->taskIds = NIL;
}
shardMoveDependencyInfo->taskId = taskId;

int64 *newTaskId = palloc0(sizeof(int64));
*newTaskId = taskId;
shardMoveSourceNodeHashEntry->taskIds = lappend(
shardMoveSourceNodeHashEntry->taskIds, newTaskId);
shardMoveDependencyInfo = hash_search(shardMoveDependencies.nodeDependencies,
&move->targetNode->nodeId, HASH_ENTER, NULL);

shardMoveDependencyInfo->taskId = taskId;
}


Expand Down Expand Up @@ -2163,10 +2135,8 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
appendStringInfo(&buf,
"SELECT pg_catalog.replicate_reference_tables(%s)",
quote_literal_cstr(shardTranferModeLabel));

int32 nodesInvolved[] = { 0 };
BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data, 0,
NULL, 0, nodesInvolved);
NULL);
replicateRefTablesTaskId = task->taskid;
}

Expand Down Expand Up @@ -2200,14 +2170,9 @@ RebalanceTableShardsBackground(RebalanceOptions *options, Oid shardReplicationMo
dependsArray[0] = replicateRefTablesTaskId;
}

int32 nodesInvolved[2] = { 0 };
nodesInvolved[0] = move->sourceNode->nodeId;
nodesInvolved[1] = move->targetNode->nodeId;

BackgroundTask *task = ScheduleBackgroundTask(jobId, GetUserId(), buf.data,
nDepends,
dependsArray, 2,
nodesInvolved);
dependsArray);

UpdateShardMoveDependencies(move, colocationId, task->taskid,
shardMoveDependencies);
Expand Down
Loading

0 comments on commit 1aea5b1

Please sign in to comment.