Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle container lifecycle events through eventbridge #261

Merged
merged 15 commits into from
Aug 17, 2020
2 changes: 2 additions & 0 deletions .github/workflows/frontend.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,14 @@ jobs:
run: npm run build
env:
REACT_APP_API_URL: https://api.staging.crossfeed.cyber.dhs.gov
REACT_APP_FARGATE_LOG_GROUP: crossfeed-staging-worker

- name: Build Production
if: github.ref == 'refs/heads/production'
run: npm run build
env:
REACT_APP_API_URL: https://api.crossfeed.cyber.dhs.gov
REACT_APP_FARGATE_LOG_GROUP: crossfeed-prod-worker

- name: Deploy Staging
if: github.ref == 'refs/heads/master'
Expand Down
2 changes: 1 addition & 1 deletion backend/Dockerfile.worker
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,4 @@ COPY --from=deps /usr/bin/findomain /usr/bin/amass /go/bin/csv2cpe /go/bin/nvdsy

COPY --from=deps /etc/ssl/certs /etc/ssl/certs

CMD ["node", "worker.bundle.js"]
CMD ["node", "--unhandled-rejections=strict", "worker.bundle.js"]
4 changes: 4 additions & 0 deletions backend/env.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,7 @@ prod-vpc:
- ${ssm:/crossfeed/prod/SG_ID}
subnetIds:
- ${ssm:/crossfeed/prod/SUBNET_ID}

staging-ecs-cluster: ${ssm:/crossfeed/staging/WORKER_CLUSTER_ARN}

prod-ecs-cluster: ${ssm:/crossfeed/prod/WORKER_CLUSTER_ARN}
6 changes: 6 additions & 0 deletions backend/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
"ts-node": "^8.10.2",
"ts-node-dev": "^1.0.0-pre.52",
"typescript": "^3.9.7",
"wait-for-expect": "^3.0.2",
"webpack": "^4.43.0",
"webpack-cli": "^3.3.12"
},
Expand Down
9 changes: 8 additions & 1 deletion backend/src/api/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,21 @@ import * as bodyParser from 'body-parser';
import * as cors from 'cors';
import { handler as healthcheck } from './healthcheck';
import * as auth from './auth';
import { login } from './auth';
import * as domains from './domains';
import * as vulnerabilities from './vulnerabilities';
import * as organizations from './organizations';
import * as scans from './scans';
import * as users from './users';
import * as scanTasks from './scan-tasks';
import * as stats from './stats';
import { listenForDockerEvents } from './docker-events';

if (
(process.env.IS_OFFLINE || process.env.IS_LOCAL) &&
typeof jest === 'undefined'
) {
listenForDockerEvents();
}

const handlerToExpress = (handler) => async (req, res, next) => {
const { statusCode, body } = await handler(
Expand Down
51 changes: 51 additions & 0 deletions backend/src/api/docker-events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import * as Docker from 'dockerode';
import {
handler as updateScanTaskStatus,
EventBridgeEvent
} from '../tasks/updateScanTaskStatus';

/**
* Listens for Docker events and converts start / stop events to corresponding Fargate EventBridge events,
* so that they can be handled by the updateScanTaskStatus lambda function.
*
* This function is only used for runs during local development in order to simulate EventBridge events.
*/
export const listenForDockerEvents = async () => {
const docker = new Docker();
const stream = await docker.getEvents();
stream.on('data', async (chunk: any) => {
const message = JSON.parse(Buffer.from(chunk).toString('utf-8'));
if (message.from !== 'crossfeed-worker') {
return;
}
let payload: EventBridgeEvent;
if (message.status === 'start') {
payload = {
detail: {
stopCode: '',
stoppedReason: '',
taskArn: message.Actor.Attributes.name,
lastStatus: 'RUNNING',
containers: [{}]
}
};
} else if (message.status === 'die') {
payload = {
detail: {
stopCode: 'EssentialContainerExited',
stoppedReason: 'Essential container in task exited',
taskArn: message.Actor.Attributes.name,
lastStatus: 'STOPPED',
containers: [
{
exitCode: Number(message.Actor.Attributes.exitCode)
}
]
}
};
} else {
return;
}
await updateScanTaskStatus(payload, {} as any, () => null);
});
};
9 changes: 9 additions & 0 deletions backend/src/models/scan-task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@ export class ScanTask extends BaseEntity {
@Column('text')
type: 'fargate' | 'lambda';

/**
* ARN of the associated fargate task.
*/
@Column({
type: 'text',
nullable: true
})
fargateTaskArn: string | null;

@Column({
type: 'text',
nullable: true
Expand Down
8 changes: 7 additions & 1 deletion backend/src/tasks/__mocks__/ecs-client.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import { CommandOptions } from '../ecs-client';

export const runCommand = jest.fn(async (commandOptions: CommandOptions) => {
return { tasks: [{}] };
return {
tasks: [
{
taskArn: 'mock_task_arn'
}
]
};
});

export default jest.fn(() => ({
Expand Down
23 changes: 14 additions & 9 deletions backend/src/tasks/ecs-client.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { ECS } from 'aws-sdk';
import { SCAN_SCHEMA } from '../api/scans';
import * as Docker from 'dockerode';
cablej marked this conversation as resolved.
Show resolved Hide resolved

export interface CommandOptions {
organizationId?: string;
Expand All @@ -20,14 +21,13 @@ const toSnakeCase = (input) => input.replace(/ /g, '-');
*/
class ECSClient {
ecs?: ECS;
docker?: any;
docker?: Docker;
isLocal: boolean;

constructor() {
this.isLocal =
process.env.IS_OFFLINE || process.env.IS_LOCAL ? true : false;
if (this.isLocal) {
const Docker = require('dockerode');
this.docker = new Docker();
} else {
this.ecs = new ECS();
Expand All @@ -50,13 +50,14 @@ class ECSClient {
const { cpu, memory, global } = SCAN_SCHEMA[scanName];
if (this.isLocal) {
try {
const containerName = toSnakeCase(
`crossfeed_worker_${
global ? 'global' : organizationName
}_${scanName}_` + Math.floor(Math.random() * 10000000)
);
const container = await this.docker!.createContainer({
// We need to create unique container names to avoid conflicts.
name: toSnakeCase(
`crossfeed_worker_${
global ? 'global' : organizationName
}_${scanName}_` + Math.floor(Math.random() * 10000000)
),
name: containerName,
Image: 'crossfeed-worker',
Env: [
`CROSSFEED_COMMAND_OPTIONS=${JSON.stringify(commandOptions)}`,
Expand All @@ -76,10 +77,14 @@ class ECSClient {
// crossfeed-worker image; instead, we set NetworkMode to "host" and
// connect to "localhost."
NetworkMode: 'host'
});
} as any);
await container.start();
return {
tasks: [{}],
tasks: [
{
taskArn: containerName
}
],
failures: []
};
} catch (e) {
Expand Down
13 changes: 13 additions & 0 deletions backend/src/tasks/functions.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,16 @@ syncdb:

makeGlobalAdmin:
handler: src/tasks/makeGlobalAdmin.handler

updateScanTaskStatus:
handler: src/tasks/updateScanTaskStatus.handler
events:
- eventBridge:
pattern:
source:
- aws.ecs
detail-type:
- ECS Task State Change
detail:
clusterArn:
- ${file(env.yml):${self:provider.stage}-ecs-cluster, ''}
4 changes: 3 additions & 1 deletion backend/src/tasks/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ const launchSingleScanTask = async ({
} failures.`
);
}
const taskArn = result.tasks![0].taskArn;
scanTask.fargateTaskArn = taskArn;
if (typeof jest === 'undefined') {
console.log(
`Successfully invoked ${scan.name} scan with fargate. ` +
`Successfully invoked ${scan.name} scan with fargate, with ECS task ARN ${taskArn}. ` +
(numChunks ? ` Chunk ${chunkNumber}/${numChunks}` : '')
);
}
Expand Down
1 change: 1 addition & 0 deletions backend/src/tasks/test/scheduler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ describe('scheduler', () => {
runCommand.mock.calls[0][0].scanTaskId
);
expect(scanTask?.status).toEqual('requested');
expect(scanTask?.fargateTaskArn).toEqual('mock_task_arn');

scan = (await Scan.findOne(scan.id))!;
expect(scan.lastRun).toBeTruthy();
Expand Down
126 changes: 126 additions & 0 deletions backend/src/tasks/test/updateScanTaskStatus.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import {
handler as updateScanTaskStatus,
EventBridgeEvent
} from '../updateScanTaskStatus';
import { connectToDatabase, Scan, ScanTask } from '../../models';

let scan;
beforeAll(async () => {
await connectToDatabase();
scan = await Scan.create({
name: 'findomain',
arguments: {},
frequency: 999
}).save();
});

const createSampleEvent = ({
lastStatus,
taskArn,
exitCode = 0
}): EventBridgeEvent => ({
detail: {
attachments: [],
availabilityZone: 'us-east-1b',
clusterArn:
'arn:aws:ecs:us-east-1:563873274798:cluster/crossfeed-staging-worker',
containers: [
{
containerArn:
'arn:aws:ecs:us-east-1:563873274798:container/75d926f3-c850-4722-a143-639f02ff4756',
exitCode: exitCode,
lastStatus: lastStatus,
name: 'main',
image:
'563873274798.dkr.ecr.us-east-1.amazonaws.com/crossfeed-staging-worker:latest',
imageDigest:
'sha256:080467614c0d7d5a5b092023b762931095308ae1dec8e54481fbd951f9784391',
runtimeId: 'eeccdb34-d8cf-49e7-b379-1cf4f123c0ee-3935363592',
taskArn:
'arn:aws:ecs:us-east-1:563873274798:task/eeccdb34-d8cf-49e7-b379-1cf4f123c0ee',
networkInterfaces: [
{
attachmentId: '5ff58204-1180-48ab-a62e-0a65a17cc4ef',
privateIpv4Address: '10.0.3.61'
}
],
cpu: '0'
}
],
launchType: 'FARGATE',
cpu: '256',
memory: '512',
desiredStatus: 'STOPPED',
group: 'family:crossfeed-staging-worker',
lastStatus: lastStatus,
overrides: {
containerOverrides: [
{
environment: [],
name: 'main'
}
]
},
connectivity: 'CONNECTED',
stoppedReason: 'Essential container in task exited',
stopCode: 'EssentialContainerExited',
taskArn: taskArn,
taskDefinitionArn:
'arn:aws:ecs:us-east-1:563873274798:task-definition/crossfeed-staging-worker:2',
version: 5,
platformVersion: '1.4.0'
}
});

test('starting event', async () => {
const taskArn = Math.random() + '';
let scanTask = await ScanTask.create({
scan,
type: 'fargate',
status: 'requested',
fargateTaskArn: taskArn
}).save();
await updateScanTaskStatus(
createSampleEvent({ lastStatus: 'RUNNING', taskArn }),
{} as any,
() => null
);
scanTask = (await ScanTask.findOne(scanTask.id))!;
expect(scanTask.status).toEqual('started');
});

test('finished event', async () => {
const taskArn = Math.random() + '';
let scanTask = await ScanTask.create({
scan,
type: 'fargate',
status: 'started',
fargateTaskArn: taskArn
}).save();
await updateScanTaskStatus(
createSampleEvent({ lastStatus: 'STOPPED', taskArn, exitCode: 0 }),
{} as any,
() => null
);
scanTask = (await ScanTask.findOne(scanTask.id))!;
expect(scanTask.status).toEqual('finished');
expect(scanTask.finishedAt).toBeTruthy();
expect(scanTask.output).toContain('EssentialContainerExited');
});

test('failed event', async () => {
const taskArn = Math.random() + '';
let scanTask = await ScanTask.create({
scan,
type: 'fargate',
status: 'started',
fargateTaskArn: taskArn
}).save();
await updateScanTaskStatus(
createSampleEvent({ lastStatus: 'STOPPED', taskArn, exitCode: 1 }),
{} as any,
() => null
);
scanTask = (await ScanTask.findOne(scanTask.id))!;
expect(scanTask.status).toEqual('failed');
});
Loading