Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start Maintenance Daemon for Main DB at the server start. #7254

Merged
merged 29 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/backend/distributed/shared_library_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ _PG_init(void)
#endif

InitializeMaintenanceDaemon();
InitializeMaintenanceDaemonForMainDb();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you treat DB as a word? Should it be ...ForMainDb() or...ForMainDB()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


/* initialize coordinated transaction management */
InitializeTransactionManagement();
Expand Down Expand Up @@ -1820,6 +1821,16 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE | GUC_UNIT_MS,
NULL, NULL, NULL);

DefineCustomStringVariable(
"citus.main_db",
gettext_noop("Which database is designated as the main_db"),
NULL,
&MainDb,
"",
PGC_POSTMASTER,
GUC_STANDARD,
NULL, NULL, NULL);

DefineCustomIntVariable(
"citus.max_adaptive_executor_pool_size",
gettext_noop("Sets the maximum number of connections per worker node used by "
Expand Down
271 changes: 200 additions & 71 deletions src/backend/distributed/utils/maintenanced.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ int Recover2PCInterval = 60000;
int DeferShardDeleteInterval = 15000;
int BackgroundTaskQueueCheckInterval = 5000;
int MaxBackgroundTaskExecutors = 4;
char *MainDb = "";

/* config variables for metadata sync timeout */
int MetadataSyncInterval = 60000;
Expand All @@ -112,7 +113,7 @@ static MaintenanceDaemonControlData *MaintenanceDaemonControl = NULL;
* activated.
*/
static HTAB *MaintenanceDaemonDBHash;

static ErrorContextCallback errorCallback = { 0 };
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t got_SIGTERM = false;

Expand All @@ -125,6 +126,8 @@ static void MaintenanceDaemonShmemExit(int code, Datum arg);
static void MaintenanceDaemonErrorContext(void *arg);
static bool MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData);
static void WarnMaintenanceDaemonNotStarted(void);
static MaintenanceDaemonDBData * GetMaintenanceDaemonDBHashEntry(Oid databaseId,
bool *found);

/*
* InitializeMaintenanceDaemon, called at server start, is responsible for
Expand All @@ -139,6 +142,82 @@ InitializeMaintenanceDaemon(void)
}


/*
* GetMaintenanceDaemonDBHashEntry searches the MaintenanceDaemonDBHash for the
* databaseId. It returns the entry if found or creates a new entry and initializes
* the value with zeroes.
*/
MaintenanceDaemonDBData *
GetMaintenanceDaemonDBHashEntry(Oid databaseId, bool *found)
{
MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search(
MaintenanceDaemonDBHash,
&MyDatabaseId,
HASH_ENTER_NULL,
found);

if (!dbData)
{
elog(LOG,
"cannot create or find the maintenance deamon hash entry for database %u",
databaseId);
return NULL;
}

if (!*found)
{
/* ensure the values in MaintenanceDaemonDBData are zero */
memset(((char *) dbData) + sizeof(Oid), 0,
sizeof(MaintenanceDaemonDBData) - sizeof(Oid));
}

return dbData;
}


/*
* InitializeMaintenanceDaemonForMainDb is called in _PG_Init
* at which stage we are not in a transaction or have databaseOid
*/
void
InitializeMaintenanceDaemonForMainDb(void)
{
if (strcmp(MainDb, "") == 0)
{
elog(LOG, "There is no designated Main database.");
return;
}

BackgroundWorker worker;

memset(&worker, 0, sizeof(worker));


strcpy_s(worker.bgw_name, sizeof(worker.bgw_name),
"Citus Maintenance Daemon for Main DB");

/* request ability to connect to target database */
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;

/*
* No point in getting started before able to run query, but we do
* want to get started on Hot-Standby.
*/
worker.bgw_start_time = BgWorkerStart_ConsistentState;

/* Restart after a bit after errors, but don't bog the system. */
worker.bgw_restart_time = 5;
strcpy_s(worker.bgw_library_name,
sizeof(worker.bgw_library_name), "citus");
strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
"CitusMaintenanceDaemonMain");

worker.bgw_main_arg = (Datum) 0;

RegisterBackgroundWorker(&worker);
}


/*
* InitializeMaintenanceDaemonBackend, called at backend start and
* configuration changes, is responsible for starting a per-database
Expand All @@ -148,31 +227,20 @@ void
InitializeMaintenanceDaemonBackend(void)
{
Oid extensionOwner = CitusExtensionOwner();
bool found;
bool found = false;

LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);

MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search(
MaintenanceDaemonDBHash,
&MyDatabaseId,
HASH_ENTER_NULL,
&found);
MaintenanceDaemonDBData *dbData = GetMaintenanceDaemonDBHashEntry(MyDatabaseId,
&found);

if (dbData == NULL)
{
WarnMaintenanceDaemonNotStarted();
LWLockRelease(&MaintenanceDaemonControl->lock);

return;
}

if (!found)
{
/* ensure the values in MaintenanceDaemonDBData are zero */
memset(((char *) dbData) + sizeof(Oid), 0,
sizeof(MaintenanceDaemonDBData) - sizeof(Oid));
}

if (IsMaintenanceDaemon)
{
/*
Expand Down Expand Up @@ -271,66 +339,97 @@ WarnMaintenanceDaemonNotStarted(void)


/*
* CitusMaintenanceDaemonMain is the maintenance daemon's main routine, it'll
* be started by the background worker infrastructure. If it errors out,
* it'll be restarted after a few seconds.
* ConnectToDatabase connects to the database for the given databaseOid.
* if databaseOid is 0, connects to MainDb and then creates a hash entry.
* If a hash entry cannot be created for MainDb it exits the process requesting a restart.
* However for regular databases, it exits without requesting a restart since another
* subsequent backend is expected to start the Maintenance Daemon.
* If the found hash entry has a valid workerPid, it exits
* without requesting a restart since there is already a daemon running.
*/
void
CitusMaintenanceDaemonMain(Datum main_arg)
static MaintenanceDaemonDBData *
ConnectToDatabase(Oid databaseOid)
{
Oid databaseOid = DatumGetObjectId(main_arg);
TimestampTz nextStatsCollectionTime USED_WITH_LIBCURL_ONLY =
TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000);
bool retryStatsCollection USED_WITH_LIBCURL_ONLY = false;
TimestampTz lastRecoveryTime = 0;
TimestampTz lastShardCleanTime = 0;
TimestampTz lastStatStatementsPurgeTime = 0;
TimestampTz nextMetadataSyncTime = 0;
MaintenanceDaemonDBData *myDbData = NULL;

/* state kept for the background tasks queue monitor */
TimestampTz lastBackgroundTaskQueueCheck = GetCurrentTimestamp();
BackgroundWorkerHandle *backgroundTasksQueueBgwHandle = NULL;
bool backgroundTasksQueueWarnedForLock = false;

/*
* We do metadata sync in a separate background worker. We need its
* handle to be able to check its status.
*/
BackgroundWorkerHandle *metadataSyncBgwHandle = NULL;
bool isMainDb = false;

/*
* Look up this worker's configuration.
*/
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);

MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *)
hash_search(MaintenanceDaemonDBHash, &databaseOid,
HASH_FIND, NULL);
if (!myDbData)

if (databaseOid == 0)
{
char *databaseName = MainDb;

/*
* When the database crashes, background workers are restarted, but
* the state in shared memory is lost. In that case, we exit and
* wait for a session to call InitializeMaintenanceDaemonBackend
* to properly add it to the hash.
* Since we cannot query databaseOid without initializing Postgres
* first, connect to the database by name.
*/
BackgroundWorkerInitializeConnection(databaseName, NULL, 0);

proc_exit(0);
}

if (myDbData->workerPid != 0)
{
/*
* Another maintenance daemon is running. This usually happens because
emelsimsek marked this conversation as resolved.
Show resolved Hide resolved
* postgres restarts the daemon after an non-zero exit, and
* InitializeMaintenanceDaemonBackend started one before postgres did.
* In that case, the first one stays and the last one exits.
* Now we have a valid MyDatabaseId.
* Insert the hash entry for the database to the Maintenance Deamon Hash.
*/
bool found = false;

myDbData = GetMaintenanceDaemonDBHashEntry(MyDatabaseId, &found);

if (!myDbData)
{
/*
* If an entry cannot be created,
* return code of 1 requests worker restart
* Since BackgroundWorker for the MainDb is only registered
* once during server startup, we need to retry.
*/
proc_exit(1);
}

if (found && myDbData->workerPid != 0)
{
/* Another maintenance daemon is running.*/

proc_exit(0);
}

proc_exit(0);
databaseOid = MyDatabaseId;
myDbData->userOid = GetSessionUserId();
isMainDb = true;
}
else
{
myDbData = (MaintenanceDaemonDBData *)
hash_search(MaintenanceDaemonDBHash, &databaseOid,
HASH_FIND, NULL);

before_shmem_exit(MaintenanceDaemonShmemExit, main_arg);
if (!myDbData)
{
/*
* When the database crashes, background workers are restarted, but
* the state in shared memory is lost. In that case, we exit and
* wait for a session to call InitializeMaintenanceDaemonBackend
* to properly add it to the hash.
*/

proc_exit(0);
}

if (myDbData->workerPid != 0)
{
/*
* Another maintenance daemon is running. This usually happens because
* postgres restarts the daemon after an non-zero exit, and
* InitializeMaintenanceDaemonBackend started one before postgres did.
* In that case, the first one stays and the last one exits.
*/

proc_exit(0);
}
}

before_shmem_exit(MaintenanceDaemonShmemExit, ObjectIdGetDatum(databaseOid));

/*
* Signal that I am the maintenance daemon now.
Expand All @@ -356,25 +455,55 @@ CitusMaintenanceDaemonMain(Datum main_arg)

LWLockRelease(&MaintenanceDaemonControl->lock);

/*
* Setup error context so log messages can be properly attributed. Some of
* them otherwise sound like they might be from a normal user connection.
* Do so before setting up signals etc, so we never exit without the
* context setup.
*/
ErrorContextCallback errorCallback = { 0 };
memset(&errorCallback, 0, sizeof(errorCallback));
errorCallback.callback = MaintenanceDaemonErrorContext;
errorCallback.arg = (void *) myDbData;
errorCallback.previous = error_context_stack;
error_context_stack = &errorCallback;


elog(LOG, "starting maintenance daemon on database %u user %u",
databaseOid, myDbData->userOid);

/* connect to database, after that we can actually access catalogs */
BackgroundWorkerInitializeConnectionByOid(databaseOid, myDbData->userOid, 0);
if (!isMainDb)
{
/* connect to database, after that we can actually access catalogs */
BackgroundWorkerInitializeConnectionByOid(databaseOid, myDbData->userOid, 0);
}

return myDbData;
}


/*
* CitusMaintenanceDaemonMain is the maintenance daemon's main routine, it'll
* be started by the background worker infrastructure. If it errors out,
* it'll be restarted after a few seconds.
*/
void
CitusMaintenanceDaemonMain(Datum main_arg)
{
Oid databaseOid = DatumGetObjectId(main_arg);
TimestampTz nextStatsCollectionTime USED_WITH_LIBCURL_ONLY =
TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000);
bool retryStatsCollection USED_WITH_LIBCURL_ONLY = false;
TimestampTz lastRecoveryTime = 0;
TimestampTz lastShardCleanTime = 0;
TimestampTz lastStatStatementsPurgeTime = 0;
TimestampTz nextMetadataSyncTime = 0;

/* state kept for the background tasks queue monitor */
TimestampTz lastBackgroundTaskQueueCheck = GetCurrentTimestamp();
BackgroundWorkerHandle *backgroundTasksQueueBgwHandle = NULL;
bool backgroundTasksQueueWarnedForLock = false;


/*
* We do metadata sync in a separate background worker. We need its
* handle to be able to check its status.
*/
BackgroundWorkerHandle *metadataSyncBgwHandle = NULL;

MaintenanceDaemonDBData *myDbData = ConnectToDatabase(databaseOid);

/* make worker recognizable in pg_stat_activity */
pgstat_report_appname("Citus Maintenance Daemon");
Expand All @@ -383,7 +512,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
* Terminate orphaned metadata sync daemons spawned from previously terminated
* or crashed maintenanced instances.
*/
SignalMetadataSyncDaemon(databaseOid, SIGTERM);
SignalMetadataSyncDaemon(MyDatabaseId, SIGTERM);

/* enter main loop */
while (!got_SIGTERM)
Expand Down Expand Up @@ -945,7 +1074,7 @@ MaintenanceDaemonShmemExit(int code, Datum arg)
}


/* MaintenanceDaemonSigTermHandler calls proc_exit(0) */
/* MaintenanceDaemonSigTermHandler sets the got_SIGTERM flag.*/
static void
MaintenanceDaemonSigTermHandler(SIGNAL_ARGS)
{
Expand Down
Loading
Loading