Skip to content

Commit

Permalink
feat: Dns reconciler queue (#447)
Browse files Browse the repository at this point in the history
* feat: Dns Reconciler queue

* fix: init

* fix: comments

* improvements

* small fixes

* fix: error during testing Credit: @humphd
  • Loading branch information
dadolhay authored Mar 27, 2023
1 parent b612baa commit ca739e0
Show file tree
Hide file tree
Showing 12 changed files with 274 additions and 42 deletions.
49 changes: 49 additions & 0 deletions app/models/system-state.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { StateEnumType } from '@prisma/client';

import { prisma } from '~/db.server';

import type { SystemState } from '@prisma/client';

/**
* The SystemState represents global (i.e., cross-instance), shared settings among Starchart instance(s).
* Initially, this includes info on whether or not the DNS Reconciler needs to be run at the next opportunity.
* We use this global SystemState to make sure that we stop hitting the Route53 API when no changes were made
* to our Records table
*/

/**
 * Creates the single SystemState row, flagged so that reconciliation is
 * requested (`reconciliationNeeded: true`).
 *
 * @returns the Prisma promise for the created row
 */
function initialize() {
  const data = {
    unique: StateEnumType.unique,
    reconciliationNeeded: true,
  };

  return prisma.systemState.create({ data });
}

/**
 * Reads the `reconciliationNeeded` flag from the single SystemState row.
 *
 * @returns the stored flag; `true` when the row (or the value) is missing,
 * so an unseeded table results in a reconciliation run rather than a skip
 */
export async function getIsReconciliationNeeded(): Promise<SystemState['reconciliationNeeded']> {
  const state = await prisma.systemState.findUnique({
    select: { reconciliationNeeded: true },
    where: { unique: StateEnumType.unique },
  });

  return state?.reconciliationNeeded ?? true;
}

/**
 * Persists whether the DNS reconciler has to run at the next opportunity.
 *
 * The update is awaited *inside* the `try` block. Returning the promise
 * without awaiting it would let a rejection (e.g. the SystemState row does
 * not exist) bypass the `catch` entirely, because `try/catch` only sees
 * synchronous throws and awaited rejections.
 *
 * @param reconciliationNeeded - the new value of the flag
 * @returns the updated row, or the freshly created row when the update failed
 */
export async function setIsReconciliationNeeded(
  reconciliationNeeded: SystemState['reconciliationNeeded']
) {
  try {
    return await prisma.systemState.update({
      data: { reconciliationNeeded },
      where: { unique: StateEnumType.unique },
    });
  } catch {
    /**
     * This should never happen, as the table should always be seeded.
     * In case it isn't, let's seed it here. The row is created with
     * reconciliationNeeded = true regardless of the requested value;
     * the next queue run will set the correct reconciliationNeeded.
     */
    return initialize();
  }
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { get, set } from 'lodash';
import { getReconciliationData } from '~/models/dns-record.server';
import { buildDomain } from '../utils';
import { buildDomain } from '../../utils';

import type { ReconcilerCompareStructure } from './ReconcilerTypes';

Expand Down
File renamed without changes.
43 changes: 43 additions & 0 deletions app/queues/reconciler/reconciler-queue.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import logger from '~/lib/logger.server';
import { reconcilerQueue } from './reconciler-worker.server';

// Process-wide flag declared on the global object. addReconcilerJob() in this
// file sets it to true and, outside of production, uses it to skip repeated setup.
declare global {
var __reconciler_queue_init__: boolean;
}

// Log errors emitted by the queue itself through the 'error' event.
reconcilerQueue.on('error', (err) => {
logger.error('Reconciler encountered an error', err);
});

/**
 * Registers the repeating reconciler job on the reconciler queue.
 *
 * Outside of production the setup is performed only once per process
 * (tracked through `global.__reconciler_queue_init__`). Failures while
 * talking to the queue are logged and not re-thrown.
 */
export const addReconcilerJob = async () => {
  const isProduction = process.env.NODE_ENV === 'production';

  // Only do this setup once if in dev
  if (!isProduction && global.__reconciler_queue_init__) {
    return;
  }

  global.__reconciler_queue_init__ = true;

  logger.info('Starting DNS reconciler queue');

  const jobName = 'reconciler-scheduler';
  const repeatEveryMs = 2 * 60 * 1000;

  try {
    // Repeatable jobs persist within redis and several of them can exist
    // (even with the same key), so wipe every existing one before adding ours.
    const existingJobs = await reconcilerQueue.getRepeatableJobs();
    const removals = existingJobs.map((job) => reconcilerQueue.removeRepeatableByKey(job.key));
    await Promise.all(removals);

    const jobOptions = {
      repeatJobKey: jobName,
      repeat: { every: repeatEveryMs },
    };
    await reconcilerQueue.add(jobName, {}, jobOptions);
  } catch (err) {
    logger.error(`Failed to start reconciler queue: ${err}`);
  }
};
154 changes: 154 additions & 0 deletions app/queues/reconciler/reconciler-worker.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import { Worker, Queue } from 'bullmq';
import { redis } from '~/lib/redis.server';
import { executeChangeSet } from '~/lib/dns.server';
import logger from '~/lib/logger.server';
import DnsDbCompareStructureGenerator from './DnsDbCompareStructureGenerator.server';
import Route53CompareStructureGenerator from './Route53CompareStructureGenerator.server';
import {
createRemovedChangeSetFromCompareStructures,
createUpsertedChangeSetFromCompareStructures,
} from './createChangeSetFromCompareStructures.server';
import { getIsReconciliationNeeded, setIsReconciliationNeeded } from '~/models/system-state.server';
import type { Change } from '@aws-sdk/client-route-53';

// Maximum number of changes pushed in a single change set. The changes are
// Route53 `Change` objects, so this is a Route53 limit (not an S3 one).
// NOTE(review): presumably mirrors the per-request limit of the Route53
// ChangeResourceRecordSets API - confirm against the AWS documentation.
const CHANGE_SET_MAX_SIZE = 1000;
// Name shared by the Queue below and the Worker that consumes it.
const reconcilerQueueName = 'reconciler';

// Queue initialization
export const reconcilerQueue = new Queue(reconcilerQueueName, {
connection: redis,
});

/**
 * Generates the DB and Route53 compare structures concurrently and derives
 * the changes from them.
 *
 * @returns the "removed" changes followed by the "upserted" changes
 */
const createChangeSet = async (): Promise<Change[]> => {
  // Start both generators before awaiting, so they run concurrently
  const dbStructurePromise = new DnsDbCompareStructureGenerator().generate();
  const route53StructurePromise = new Route53CompareStructureGenerator().generate();

  const [dbStructure, route53Structure] = await Promise.all([
    dbStructurePromise,
    route53StructurePromise,
  ]);

  const removals = createRemovedChangeSetFromCompareStructures({ dbStructure, route53Structure });
  const upserts = createUpsertedChangeSetFromCompareStructures({ dbStructure, route53Structure });

  return [...removals, ...upserts];
};

/**
 * NORMAL MODE
 *
 * Pushes the change set in one call, capped at CHANGE_SET_MAX_SIZE entries.
 *
 * @param changeSet - every change the reconciler wants to apply
 * @returns true when the set was larger than the cap, i.e. additional
 * reconciliation is needed for the remainder
 */
const pushChangesBulk = async (changeSet: Change[]): Promise<boolean> => {
  const recordSetsToPush =
    changeSet.length > CHANGE_SET_MAX_SIZE ? CHANGE_SET_MAX_SIZE : changeSet.length;

  logger.debug(
    `Reconciler NORMAL MODE - Reconciler intends to push the following ${recordSetsToPush} changes`,
    { changeSet }
  );

  // Only the first CHANGE_SET_MAX_SIZE changes go out in this run
  const batch = changeSet.slice(0, CHANGE_SET_MAX_SIZE);
  await executeChangeSet(batch);

  // Anything beyond the cap is left for a later run
  const isAdditionalReconciliationNeeded = changeSet.length > CHANGE_SET_MAX_SIZE;
  return isAdditionalReconciliationNeeded;
};

/**
 * LIMP MODE
 *
 * Pushes the changes strictly one after another, each as its own
 * single-element change set. A failing change is logged and skipped, which
 * isolates the offending entry without stopping the remaining ones.
 *
 * @param changeSet - the changes to push individually
 */
const pushChangesIndividually = async (changeSet: Change[]) => {
  // Push one change; a failure is logged here and never propagated
  const pushSingleChange = async (change: Change): Promise<void> => {
    try {
      logger.debug('Reconciler LIMP MODE - Reconciler intends to push the following change', {
        change,
      });

      await executeChangeSet([change]);
    } catch (error) {
      logger.error('Reconciler LIMP MODE - the following single change failed', {
        change,
        error,
      });
    }
  };

  for (const change of changeSet) {
    await pushSingleChange(change);
  }
};

/**
 * Processor executed by the worker for every job on the reconciler queue.
 *
 * Steps, in order:
 *  1. leave if another reconciler job is already active,
 *  2. leave if the system state says no reconciliation is needed,
 *  3. build the change set; if it is empty, clear the flag and leave,
 *  4. push the changes in bulk, falling back to limp mode on failure,
 *  5. store whether additional reconciliation is needed.
 */
const processReconcilerJob = async (): Promise<void> => {
  /**
   * Concurrency guard.
   *
   * A single BullMQ worker executes one job at a time, but with multiple
   * instances (i.e. our docker swarm) there is one worker per instance,
   * all taking jobs from the same queue.
   *
   * A repeat job is not a regular job: it makes the queue system keep
   * adding delayed `regular` jobs whenever the repeat pattern/integer
   * dictates it. So if one job runs longer than the repeat interval,
   * a second swarm node could pick up the next scheduled job and two
   * reconcilers would run in parallel.
   *
   * To prevent that, we ask BullMQ how many jobs are active. The count
   * includes the job we are in right now, so anything > 1 means a job was
   * already running when we started ==> exit to inhibit concurrency.
   */
  const fetchedActiveJobs = await reconcilerQueue.getJobs('active');
  // BullMQ bug: the array sometimes contains `undefined` elements, drop them
  const activeJobs = fetchedActiveJobs.filter(Boolean);

  if (activeJobs.length > 1) {
    logger.debug('Reconciler - Inhibiting concurrent run');
    return;
  }

  // Only run if reconciler was explicitly requested
  const isReconciliationNeeded = await getIsReconciliationNeeded();
  if (!isReconciliationNeeded) {
    logger.debug('Reconciler - skipping current job, reconciler not needed.');
    return;
  }

  const changeSet = await createChangeSet();

  if (changeSet.length === 0) {
    logger.debug('Reconciler - found no changes to be pushed');
    await setIsReconciliationNeeded(false);
    return;
  }

  // Defaults to true: if everything fails, the queue will retry in 2 mins
  let isAdditionalReconciliationNeeded = true;
  try {
    // First attempt: push all the changes at once
    isAdditionalReconciliationNeeded = await pushChangesBulk(changeSet);
  } catch (error) {
    // Bulk push failed: limp mode pushes the changes one by one,
    // which lets us pinpoint the offending change in the set
    logger.error('Reconciler - Change set failed, switching to limp mode', { error });

    await pushChangesIndividually(changeSet);
  }

  /**
   * Update system state
   *
   * If the change set had fewer than CHANGE_SET_MAX_SIZE elements and the
   * bulk push succeeded, the altered dns data has now been reconciled.
   */
  await setIsReconciliationNeeded(isAdditionalReconciliationNeeded);
  logger.debug('Reconciler - job complete', { isAdditionalReconciliationNeeded });
};

const reconcilerWorker = new Worker(reconcilerQueueName, processReconcilerJob, {
  connection: redis,
});

// Close the worker when the process receives SIGINT
process.on('SIGINT', () => reconcilerWorker.close());
39 changes: 0 additions & 39 deletions app/reconciler/index.ts

This file was deleted.

4 changes: 2 additions & 2 deletions app/routes/dev.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import {
addDeleteDnsRequest,
} from '~/queues/dns/index.server';
import { DnsRecordType } from '@prisma/client';
import { reconcile } from '~/reconciler';
import { setIsReconciliationNeeded } from '~/models/system-state.server';

import type { LoaderArgs, ActionArgs } from '@remix-run/node';

Expand All @@ -42,7 +42,7 @@ export const action = async ({ request }: ActionArgs) => {
message: 'Certificate requested',
});
case 'dns-reconciliation':
await reconcile();
await setIsReconciliationNeeded(true);
return json({
result: 'ok',
message: 'Reconciliation requested',
Expand Down
12 changes: 12 additions & 0 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@ model Challenge {
certificate Certificate @relation(fields: [certificateId], references: [id], onDelete: Cascade)
}

// Global state shared by all instances; holds exactly one row.
model SystemState {
// As the following type is unique, enum, required and only one option
// it enforces that there is only one row in the table
unique StateEnumType @unique @default(unique)
// Whether the DNS reconciler has to run; a row created without an explicit
// value starts as false.
reconciliationNeeded Boolean @default(false)
}

// This is to force only a single row to exist in the SystemState table:
// the enum has a single member, so the unique column can hold a single value.
enum StateEnumType {
unique
}

enum DnsRecordType {
A
AAAA
Expand Down
8 changes: 8 additions & 0 deletions prisma/seed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,19 @@ async function seed() {
temporaryExpDate.setDate(temporaryExpDate.getDate() + 7); // expiration for notifications

// cleanup the existing database; no worries if it doesn't exist yet
await prisma.systemState.deleteMany().catch(() => {});
await prisma.dnsRecord.deleteMany().catch(() => {});
await prisma.challenge.deleteMany().catch(() => {});
await prisma.certificate.deleteMany().catch(() => {});
await prisma.user.deleteMany().catch(() => {});

await prisma.systemState.create({
data: {
unique: 'unique',
reconciliationNeeded: false,
},
});

await prisma.user.createMany({
data: [
{
Expand Down
5 changes: 5 additions & 0 deletions server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import helmet from 'helmet';
import cors from 'cors';

import logger from '~/lib/logger.server';
import { addReconcilerJob } from './app/queues/reconciler/reconciler-queue.server';

import type { Request, Response } from 'express';

Expand Down Expand Up @@ -81,6 +82,10 @@ const port = process.env.PORT || 8080;
// Start listening; the callback runs the startup steps and logs readiness.
const server = app.listen(port, () => {
// require the built app so we're ready when the first request comes in
require(BUILD_DIR);

// start the DNS reconciler
// addReconcilerJob is async and is not awaited here (fire-and-forget);
// it logs queue setup failures itself instead of re-throwing them.
addReconcilerJob();

logger.info(`✅ app ready: http://localhost:${port}`);
});

Expand Down

0 comments on commit ca739e0

Please sign in to comment.