Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Dns reconciler queue #447

Merged
6 commits merged into from Mar 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions app/models/system-state.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { StateEnumType } from '@prisma/client';

import { prisma } from '~/db.server';

import type { SystemState } from '@prisma/client';

This conversation was marked as resolved.
Show resolved Hide resolved
/**
* The SystemState represents global (i.e., cross-instance), shared settings among Starchart instance(s).
* Initially, this includes info on whether or not the DNS Reconciler needs to be run at the next opportunity.
* We use this global SystemState to make sure that we stop hitting the Route53 API when no changes were made
* to our Records table
*/

function initialize() {
return prisma.systemState.create({
data: {
unique: StateEnumType.unique,
reconciliationNeeded: true,
},
});
}

export function getIsReconciliationNeeded(): Promise<SystemState['reconciliationNeeded']> {
return prisma.systemState
.findUnique({
select: { reconciliationNeeded: true },
where: { unique: StateEnumType.unique },
})
.then((data) => data?.reconciliationNeeded ?? true);
}

export function setIsReconciliationNeeded(
reconciliationNeeded: SystemState['reconciliationNeeded']
) {
try {
return prisma.systemState.update({
data: { reconciliationNeeded },
where: { unique: StateEnumType.unique },
});
} catch (error) {
/**
* This should never happen, as the table should always be seeded.
* In case it isn't, let's seed it here Next queue run will set the
* correct reconciliationNeeded
*/

return initialize();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { get, set } from 'lodash';
import { getReconciliationData } from '~/models/dns-record.server';
import { buildDomain } from '../utils';
import { buildDomain } from '../../utils';

import type { ReconcilerCompareStructure } from './ReconcilerTypes';

Expand Down
43 changes: 43 additions & 0 deletions app/queues/reconciler/reconciler-queue.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import logger from '~/lib/logger.server';
import { reconcilerQueue } from './reconciler-worker.server';

declare global {
var __reconciler_queue_init__: boolean;
}

reconcilerQueue.on('error', (err) => {
logger.error('Reconciler encountered an error', err);
});

// function to add jobs
export const addReconcilerJob = async () => {
if (process.env.NODE_ENV !== 'production' && global.__reconciler_queue_init__) {
// Only do this setup once if in dev
This conversation was marked as resolved.
Show resolved Hide resolved
return;
}

global.__reconciler_queue_init__ = true;

logger.info('Starting DNS reconciler queue');

const jobName = `reconciler-scheduler`;

try {
// Remove all previously existing repeatable jobs
// This is important because multiple repeatable jobs can exist and they persist
// within redis (even with the same key)
const repeatableJobs = await reconcilerQueue.getRepeatableJobs();
await Promise.all(repeatableJobs.map(({ key }) => reconcilerQueue.removeRepeatableByKey(key)));

await reconcilerQueue.add(
jobName,
{},
{
repeatJobKey: jobName,
repeat: { every: 2 * 60 * 1000 },
}
);
} catch (err) {
logger.error(`Failed to start reconciler queue: ${err}`);
}
};
154 changes: 154 additions & 0 deletions app/queues/reconciler/reconciler-worker.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import { Worker, Queue } from 'bullmq';
import { redis } from '~/lib/redis.server';
import { executeChangeSet } from '~/lib/dns.server';
import logger from '~/lib/logger.server';
import DnsDbCompareStructureGenerator from './DnsDbCompareStructureGenerator.server';
import Route53CompareStructureGenerator from './Route53CompareStructureGenerator.server';
import {
createRemovedChangeSetFromCompareStructures,
createUpsertedChangeSetFromCompareStructures,
} from './createChangeSetFromCompareStructures.server';
import { getIsReconciliationNeeded, setIsReconciliationNeeded } from '~/models/system-state.server';
import type { Change } from '@aws-sdk/client-route-53';

// S3 limit for a ChangeSet
const CHANGE_SET_MAX_SIZE = 1000;
const reconcilerQueueName = 'reconciler';

// Queue initialization
export const reconcilerQueue = new Queue(reconcilerQueueName, {
connection: redis,
});

const createChangeSet = async (): Promise<Change[]> => {
const [dbStructure, route53Structure] = await Promise.all([
new DnsDbCompareStructureGenerator().generate(),
new Route53CompareStructureGenerator().generate(),
]);

const changeSet = [
...createRemovedChangeSetFromCompareStructures({ dbStructure, route53Structure }),
...createUpsertedChangeSetFromCompareStructures({ dbStructure, route53Structure }),
];

return changeSet;
};

/**
* NORMAL MODE
*
* Execute the complete changeSet at once
*/
const pushChangesBulk = async (changeSet: Change[]): Promise<boolean> => {
This conversation was marked as resolved.
Show resolved Hide resolved
const recordSetsToPush = Math.min(CHANGE_SET_MAX_SIZE, changeSet.length);

logger.debug(
`Reconciler NORMAL MODE - Reconciler intends to push the following ${recordSetsToPush} changes`,
{
changeSet,
}
);

await executeChangeSet(changeSet.slice(0, CHANGE_SET_MAX_SIZE));

// Return boolean => Is additional reconciliation needed
return changeSet.length > CHANGE_SET_MAX_SIZE;
};

/**
* LIMP MODE
This conversation was marked as resolved.
Show resolved Hide resolved
*
* Try each change in the set one by one, isolate the offending one
*/
const pushChangesIndividually = async (changeSet: Change[]) => {
for (const change of changeSet) {
try {
logger.debug(`Reconciler LIMP MODE - Reconciler intends to push the following change`, {
change,
});

await executeChangeSet([change]);
} catch (error) {
logger.error(`Reconciler LIMP MODE - the following single change failed`, {
change,
error,
});
}
}
};

const reconcilerWorker = new Worker(
reconcilerQueueName,
async () => {
/**
* When a BullMQ worker is added, it will behave as single
This conversation was marked as resolved.
Show resolved Hide resolved
* threaded ... will only execute one job at a time.
* But, if we use multiple instances, i.e. our docker swarm,
* we have one worker per instance, taking jobs from the queue.
*
* When you add a repeat job to a queue, it is a special thing,
* not a regular job. It causes the queue system to keep adding
* delayed `regular` jobs when the repeat pattern/integer
* dictates it.
*
* In theory, it would be possible, that a job is running long
* (more than our job repeat time), so the second swarm
* node would pick up the next scheduled job, causing the system
* to run two reconcilers in parallel.
*
* For this reason, I'm asking the BullMQ system ... tell me
* how many active jobs are there (this includes the current
* job too, that we are in right now). If the answer is > 1,
* it means that there was a pre-existing job already running,
* when we were started ==> we must exit to inhibit concurrency
*/

const activeJobs = (await reconcilerQueue.getJobs('active'))
// BullMQ bug, sometimes I get an array element with `undefined`, that should not be possible
This conversation was marked as resolved.
Show resolved Hide resolved
.filter((v) => !!v);

if (activeJobs.length > 1) {
This conversation was marked as resolved.
Show resolved Hide resolved
logger.debug('Reconciler - Inhibiting concurrent run');
return;
}

// Only run if reconciler was explicitly requested
if (!(await getIsReconciliationNeeded())) {
logger.debug('Reconciler - skipping current job, reconciler not needed.');
return;
}

const changeSet = await createChangeSet();

if (!changeSet.length) {
logger.debug('Reconciler - found no changes to be pushed');
await setIsReconciliationNeeded(false);
return;
}

// We are defaulting to true, if everything fails, the queue will retry in 2 mins
let isAdditionalReconciliationNeeded = true;
This conversation was marked as resolved.
Show resolved Hide resolved
try {
// First, we try to bulk push all the cahnges at once.
isAdditionalReconciliationNeeded = await pushChangesBulk(changeSet);
This conversation was marked as resolved.
Show resolved Hide resolved
} catch (error) {
// If that fails, we switch to limp mode, that pushes changes one by one
// This way we can pinpoint the offending change in the set
logger.error('Reconciler - Change set failed, switching to limp mode', { error });

await pushChangesIndividually(changeSet);
}

/**
* Update system state
*
* If changeSet is < CHANGE_SET_MAX_SIZE elements, then dns data that has been altered
* have now been reconciled
*/
await setIsReconciliationNeeded(isAdditionalReconciliationNeeded);
This conversation was marked as resolved.
Show resolved Hide resolved
logger.debug('Reconciler - job complete', { isAdditionalReconciliationNeeded });
},
{ connection: redis }
);

process.on('SIGINT', () => reconcilerWorker.close());
39 changes: 0 additions & 39 deletions app/reconciler/index.ts

This file was deleted.

4 changes: 2 additions & 2 deletions app/routes/dev.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
addDeleteDnsRequest,
} from '~/queues/dns/index.server';
import { DnsRecordType } from '@prisma/client';
import { reconcile } from '~/reconciler';
import { setIsReconciliationNeeded } from '~/models/system-state.server';

import type { LoaderArgs, ActionArgs } from '@remix-run/node';

Expand All @@ -42,7 +42,7 @@ export const action = async ({ request }: ActionArgs) => {
message: 'Certificate requested',
});
case 'dns-reconciliation':
await reconcile();
await setIsReconciliationNeeded(true);
return json({
result: 'ok',
message: 'Reconciliation requested',
Expand Down
12 changes: 12 additions & 0 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@ model Challenge {
certificate Certificate @relation(fields: [certificateId], references: [id], onDelete: Cascade)
}

model SystemState {
// As the following type is unique, enum, required and only one option
// it enforces that there is only one row in the table
unique StateEnumType @unique @default(unique)
reconciliationNeeded Boolean @default(false)
}

// This is to force only a single row to exist in the SystemState table
enum StateEnumType {
unique
}

enum DnsRecordType {
A
AAAA
Expand Down
8 changes: 8 additions & 0 deletions prisma/seed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,19 @@ async function seed() {
temporaryExpDate.setDate(temporaryExpDate.getDate() + 7); // expiration for notifications

// cleanup the existing database; no worries if it doesn't exist yet
await prisma.systemState.deleteMany().catch(() => {});
await prisma.dnsRecord.deleteMany().catch(() => {});
await prisma.challenge.deleteMany().catch(() => {});
await prisma.certificate.deleteMany().catch(() => {});
await prisma.user.deleteMany().catch(() => {});

await prisma.systemState.create({
data: {
unique: 'unique',
reconciliationNeeded: false,
This conversation was marked as resolved.
Show resolved Hide resolved
},
});

await prisma.user.createMany({
data: [
{
Expand Down
5 changes: 5 additions & 0 deletions server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import helmet from 'helmet';
import cors from 'cors';

import logger from '~/lib/logger.server';
import { addReconcilerJob } from './app/queues/reconciler/reconciler-queue.server';

import type { Request, Response } from 'express';

Expand Down Expand Up @@ -81,6 +82,10 @@ const port = process.env.PORT || 8080;
const server = app.listen(port, () => {
// require the built app so we're ready when the first request comes in
require(BUILD_DIR);

// start the DNS reconciler
addReconcilerJob();

logger.info(`✅ app ready: http://localhost:${port}`);
});

Expand Down