Skip to content
This repository has been archived by the owner on Sep 30, 2019. It is now read-only.

Event bus #31

Merged
merged 8 commits into from
Aug 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import JsonApiModule from './json-api/json-api-module';
import AuthenticationModule from './authentication/authentication-module';
import DeploymentPlugin from './deployment/deployment-hapi-plugin';

import { default as DeploymentModule, deploymentFolderInjectSymbol } from './deployment/deployment-module';
import { CIProxy, DeploymentModule, deploymentFolderInjectSymbol } from './deployment';

import ProjectPlugin from './project/project-hapi-plugin';
import ProjectModule from './project/project-module';
Expand All @@ -23,8 +23,7 @@ import HelloPlugin from './hello/hello-hapi-plugin';

import UserModule from './user/user-module';

import { EventBus } from './event-bus/event-bus';
import LocalEventBus from './event-bus/local-event-bus';
import { LocalEventBus, injectSymbol as eventBusInjectSymbol } from './event-bus';

import MinardServer, {hostInjectSymbol, portInjectSymbol} from './server/server';

Expand All @@ -43,16 +42,17 @@ const kernel = new Kernel();
// dependencies into EventBus
//
// -- JO 25.6.2016
kernel.bind(EventBus.injectSymbol).toConstantValue(new LocalEventBus());
kernel.bind(eventBusInjectSymbol).toConstantValue(new LocalEventBus());
kernel.bind(loggerInjectSymbol).toConstantValue(Logger(undefined, false, process.env.DEBUG ? true : false));
kernel.bind(DeploymentPlugin.injectSymbol).to(DeploymentPlugin);
kernel.bind(DeploymentModule.injectSymbol).to(DeploymentModule);
kernel.bind(HelloPlugin.injectSymbol).to(HelloPlugin);
kernel.bind(DeploymentModule.injectSymbol).to(DeploymentModule).inSingletonScope();
kernel.bind(HelloPlugin.injectSymbol).to(HelloPlugin).inSingletonScope();
kernel.bind(MinardServer.injectSymbol).to(MinardServer).inSingletonScope();
kernel.bind(UserModule.injectSymbol).to(UserModule);
kernel.bind(CIProxy.injectSymbol).to(CIProxy);

kernel.bind(GitlabClient.injectSymbol).to(GitlabClient).inSingletonScope();
kernel.bind(ProjectModule.injectSymbol).to(ProjectModule);
kernel.bind(ProjectModule.injectSymbol).to(ProjectModule).inSingletonScope();
kernel.bind(ProjectPlugin.injectSymbol).to(ProjectPlugin);
kernel.bind(SystemHookModule.injectSymbol).to(SystemHookModule);
kernel.bind(AuthenticationModule.injectSymbol).to(AuthenticationModule);
Expand Down
23 changes: 5 additions & 18 deletions src/deployment/deployment-hapi-plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ import * as Hapi from 'hapi';
import { inject, injectable } from 'inversify';

import { HapiRegister } from '../server/hapi-register';
import DeploymentModule, { DeploymentKey, getDeploymentKey, isRawDeploymentHostname} from './deployment-module';
import DeploymentModule, { getDeploymentKey, isRawDeploymentHostname} from './deployment-module';

import { gitlabHostInjectSymbol } from '../shared/gitlab-client';

import { proxyCI } from './proxy-ci';
import * as path from 'path';

const directoryHandler = require('inert/lib/directory').handler;
Expand Down Expand Up @@ -53,19 +52,6 @@ class DeploymentHapiPlugin {
},
});

server.route({
method: '*',
path: '/ci/api/v1/{what}/{id}/{action?}',
handler: proxyCI.bind(null, this.gitlabHost,
this.deploymentModule.setDeploymentState.bind(this.deploymentModule)),
config: {
payload: {
output: 'stream',
parse: false,
},
},
});

server.route({
method: 'GET',
path: '/ci/projects/{id}/{ref}/{sha}/{action}',
Expand Down Expand Up @@ -95,17 +81,18 @@ my_job:
};

public async rawDeploymentHandler(request: Hapi.Request, reply: Hapi.IReply) {
const key = getDeploymentKey(request.info.hostname) as DeploymentKey;
const projectId = key.projectId;
const deploymentId = key.deploymentId;
const key = getDeploymentKey(request.info.hostname);

if (!key) {
return reply({
status: 403,
message: `Could not parse deployment URL from hostname '${request.info.hostname}'`});
}

const projectId = key.projectId;
const deploymentId = key.deploymentId;
const isReady = this.deploymentModule.isDeploymentReadyToServe(projectId, deploymentId);

if (!isReady) {
try {
await this.deploymentModule.prepareDeploymentForServing(projectId, deploymentId);
Expand Down
114 changes: 87 additions & 27 deletions src/deployment/deployment-module-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@ import * as path from 'path';

import { expect } from 'chai';

import DeploymentModule, { DeploymentKey, MinardDeployment, getDeploymentKey } from './deployment-module';
import {
DEPLOYMENT_EVENT_TYPE, DeploymentEvent, DeploymentModule,
MinardDeployment, createDeploymentEvent, getDeploymentKey,
} from './';

import Authentication from '../authentication/authentication-module';
import EventBus from '../event-bus/local-event-bus';
import { IFetchStatic } from '../shared/fetch.d.ts';
import { GitlabClient } from '../shared/gitlab-client';
import Logger from '../shared/logger';
import Logger from '../shared/logger';

const fetchMock = require('fetch-mock');
const rimraf = require('rimraf');
Expand All @@ -32,8 +36,14 @@ const getClient = () => {
};

const logger = Logger(undefined, true);
const eventBus = new EventBus();

const getDeploymentModule = (client: GitlabClient, path: string) => new DeploymentModule(client, path, logger);
const getDeploymentModule = (client: GitlabClient, path: string) => new DeploymentModule(
client,
path,
eventBus,
logger
);

const gitLabBuildsResponse = [
{
Expand Down Expand Up @@ -194,16 +204,16 @@ describe('deployment-module', () => {

describe('getDeployments()', () => {
it('it should work with response returning two deployments', async () => {
// Arrange
const gitlabClient = getClient();
fetchMock.restore().mock(`${host}${gitlabClient.apiPrefix}/projects/1/builds`, gitLabBuildsResponse);
const deploymentModule = getDeploymentModule(gitlabClient, '');
// Act
const deployments = await deploymentModule.getProjectDeployments(1) as MinardDeployment[];
// Assert
expect(deployments.length).equals(2);
expect(deployments[0].id).equals(7);
});
// Arrange
const gitlabClient = getClient();
fetchMock.restore().mock(`${host}${gitlabClient.apiPrefix}/projects/1/builds`, gitLabBuildsResponse);
const deploymentModule = getDeploymentModule(gitlabClient, '');
// Act
const deployments = await deploymentModule.getProjectDeployments(1) as MinardDeployment[];
// Assert
expect(deployments.length).equals(2);
expect(deployments[0].id).equals(7);
});
});

it('downloadAndExtractDeployment()', async () => {
Expand Down Expand Up @@ -235,7 +245,7 @@ describe('deployment-module', () => {
});

it('getDeploymentPath()', () => {
const deploymentModule = getDeploymentModule({ } as GitlabClient, 'example');
const deploymentModule = getDeploymentModule({} as GitlabClient, 'example');
const deploymentPath = deploymentModule.getDeploymentPath(1, 4);
expect(deploymentPath).to.equal('example/1/4');
});
Expand Down Expand Up @@ -311,35 +321,85 @@ describe('deployment-module', () => {

describe('getDeploymentKey()', () => {

let ret: (DeploymentKey | null) = null;

it('should match localhost hostname with single-digit ids', () => {
ret = getDeploymentKey('fdlkasjs-4-1.localhost') as DeploymentKey;
const ret = getDeploymentKey('fdlkasjs-4-1.localhost');
if (ret === null) { throw new Error(); }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would not expect(ret).to.exist be cleaner?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but this way TypeScript knows to narrow down DeploymentKey | null without explicit casting. The error message could be a bit more descriptive, though :D .

expect(ret.projectId).to.equal(4);
expect(ret.deploymentId).to.equal(1);
});

it('should match localhost hostname with multi-digit ids', () => {
ret = getDeploymentKey('fdlkasjs-523-2667.localhost') as DeploymentKey;
const ret = getDeploymentKey('fdlkasjs-523-2667.localhost');
if (ret === null) { throw new Error(); }
expect(ret.projectId).to.equal(523);
expect(ret.deploymentId).to.equal(2667);
});

it('should match minard.io hostname with multi-digit ids', () => {
ret = getDeploymentKey('fdlkasjs-145-3.minard.io') as DeploymentKey;
const ret = getDeploymentKey('fdlkasjs-145-3.minard.io');
if (ret === null) { throw new Error(); }
expect(ret.projectId).to.equal(145);
expect(ret.deploymentId).to.equal(3);
});

it('should not match non-matching hostnames', () => {
ret = getDeploymentKey('fdlkasjs-523-2667');
expect(ret).to.equal(null);
ret = getDeploymentKey('fdlkasjs-525.localhost');
expect(ret).to.equal(null);
ret = getDeploymentKey('fdlkasjs525-52.localhost');
expect(ret).to.equal(null);
ret = getDeploymentKey('fdlkasjs525-52.minard.io');
expect(ret).to.equal(null);
const ret1 = getDeploymentKey('fdlkasjs-523-2667');
expect(ret1).to.equal(null);
const ret2 = getDeploymentKey('fdlkasjs-525.localhost');
expect(ret2).to.equal(null);
const ret3 = getDeploymentKey('fdlkasjs525-52.localhost');
expect(ret3).to.equal(null);
const ret4 = getDeploymentKey('fdlkasjs525-52.minard.io');
expect(ret4).to.equal(null);
});

});

describe('deployment events', () => {

it('should post \'extracted\' event', async (done) => {
// Arrange
const bus = new EventBus();
rimraf.sync(path.join(os.tmpdir(), 'minard'));
const thePath = path.join(__dirname, '../../src/deployment/test-artifact.zip');
const stream = fs.createReadStream(thePath);
const opts = {
status: 200,
statusText: 'ok',
};
const response = new Response(stream, opts);
const gitlabClient = getClient();
const mockUrl = `${host}${gitlabClient.apiPrefix}/projects/1/builds/1/artifacts`;
fetchMock.restore().mock(mockUrl, response);
const deploymentsDir = path.join(os.tmpdir(), 'minard', 'deploys');

const deploymentModule = new DeploymentModule( /* ts-lint-disable-line */
gitlabClient,
deploymentsDir,
bus,
logger
);
expect(deploymentModule.getDeploymentPath(1, 1)).to.exist;

try {
const eventPromise = bus
.filterEvents<DeploymentEvent>(DEPLOYMENT_EVENT_TYPE)
.map(e => e.payload)
.filter(e => e.status === 'extracted')
.take(1)
.toPromise();

bus.post(createDeploymentEvent({ status: 'running', id: 1, projectId: 1 }));
bus.post(createDeploymentEvent({ status: 'running', id: 2, projectId: 2 }));
bus.post(createDeploymentEvent({ status: 'success', id: 1 }));

const event = await eventPromise;
expect(event.status).to.eq('extracted');
expect(event.id).to.eq(1);
done();
} catch (err) {
done(err);
}
});

});
Expand Down
85 changes: 49 additions & 36 deletions src/deployment/deployment-module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ import * as Boom from 'boom';

import { inject, injectable } from 'inversify';

import * as rx from '@reactivex/rxjs';

import { EventBus, injectSymbol as eventBusInjectSymbol } from '../event-bus';
import { GitlabClient } from '../shared/gitlab-client';
import { Deployment } from '../shared/gitlab.d.ts';
import * as logger from '../shared/logger';
import * as logger from '../shared/logger';
import { DEPLOYMENT_EVENT_TYPE, Deployment, DeploymentEvent, DeploymentStatus,
MinardDeployment, createDeploymentEvent } from './types';

import * as fs from 'fs';
import * as os from 'os';
Expand All @@ -17,28 +21,6 @@ const deepcopy = require('deepcopy');

export const deploymentFolderInjectSymbol = Symbol('deployment-folder');

export interface DeploymentKey {
projectId: number;
deploymentId: number;
}

export interface MinardDeploymentPlain {
ref: string;
status: string;
url?: string;
screenshot?: string;
finished_at: string;
}

interface CommitRef {
id: string;
}

export interface MinardDeployment extends MinardDeploymentPlain {
id: number;
commitRef: CommitRef;
}

export function isRawDeploymentHostname(hostname: string) {
return getDeploymentKey(hostname) !== null;
}
Expand All @@ -63,15 +45,52 @@ export default class DeploymentModule {
private readonly deploymentFolder: string;
private readonly logger: logger.Logger;

private eventBus: EventBus;
private buildToProject = new Map<number, number>();
private events: rx.Observable<DeploymentEvent>;

public constructor(
@inject(GitlabClient.injectSymbol) gitlab: GitlabClient,
@inject(deploymentFolderInjectSymbol) deploymentFolder: string,
@inject(eventBusInjectSymbol) eventBus: EventBus,
@inject(logger.loggerInjectSymbol) logger: logger.Logger) {
this.gitlab = gitlab;
this.deploymentFolder = deploymentFolder;
this.logger = logger;
this.eventBus = eventBus;

this.events = eventBus
.filterEvents<DeploymentEvent>(DEPLOYMENT_EVENT_TYPE)
.map(e => e.payload);

this.subscribeToEvents();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the DeploymentModule is subscribing to events like this, should we make it a singleton in app.ts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, there absolutely should only exist a single EventBus.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's already bound to a constant value

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EventBus is, but DeploymentModule is not.

Should DeploymentModule not be also a singleton?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, misunderstood the question.

Should DeploymentModule not be also a singleton?

No, not really. It's the EventBus's job to enforce at-most-once delivery semantics for the events.

Copy link
Contributor

@juhoojala juhoojala Aug 10, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, looks like I'm missing something. What is the logic of the "at-most-once delivery semantics"?

This (of course) outputs the event twice:

const bus = new EventBus();
// subscribe twice
bus.subscribe(event => { console.log(event); });
bus.subscribe(event => { console.log(event); });
bus.post(testEventCreator({ status: 'bar', foo: 'foo' }));

With two DeploymentModules there are two similar subscriptions, and both will be getting the event.

So what am I missing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant that when we have a real EventBus, it will make sure that each event is delivered just once. Yes, with current implementation it broadcasts every event to every subscriber and you are right that it will cause problems if DeploymentModule is not a singleton. I didn't think that we are injecting it to multiple places, but indeed we are.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A REALLY good catch actually.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Being a singleton is thus a requirement for every class that uses the EventBus.


}

private subscribeToEvents() {
this.events.subscribe(e => this.setDeploymentState(e.id, e.status, e.projectId));
// On successfully completed deployments, download, extract and post an 'extracted' event
this.completedDeployments()
.filter(event => event.status === 'success')
.flatMap(event => this.downloadAndExtractDeployment(event.projectId, event.id).then(_ => event))
.subscribe(event =>
this.eventBus.post(createDeploymentEvent(Object.assign({}, event, {status: 'extracted'}))));
}

private completedDeployments() {
const events = this.events;
// The initial events for a new deployment have status 'running' and always include the projectId
const started = events.filter(e => e.status === 'running' && e.projectId !== undefined);
// We use a flatMap to return a single event *with* the projectId, when the deployment has finished
return started
.flatMap(initial => events.filter(later => later.id === initial.id && this.isFinished(later.status))
.map(later => ({id: later.id, status: later.status, projectId: initial.projectId as number}))
);

}

private isFinished(status: DeploymentStatus) {
return status === 'success' || status === 'failed' || status === 'canceled';
}

private async getDeployments(url: string): Promise<MinardDeployment[]> {
Expand Down Expand Up @@ -154,21 +173,15 @@ export default class DeploymentModule {
}
}

public setDeploymentState(buildId: number, state: string, projectId?: number) {
public setDeploymentState(deploymentId: number, state: string, projectId?: number) {
if (projectId) {
this.buildToProject.set(buildId, projectId);
this.buildToProject.set(deploymentId, projectId);
}
const _projectId = this.buildToProject.get(buildId);
const _projectId = this.buildToProject.get(deploymentId);
if (!_projectId) {
throw new Error(`Couldn't find projectId for build ${buildId}`);
}
console.log(`Build ${_projectId}/${buildId}: ${state}`);
if (state === 'success') {
this.downloadAndExtractDeployment(_projectId, buildId)
.then(path => {
console.log(`Extracted the artifacts to path ${path}`);
});
throw new Error(`Couldn't find projectId for build ${deploymentId}`);
}
// console.log(`Build ${_projectId}/${deploymentId}: ${state}`);
}

/*
Expand All @@ -182,7 +195,7 @@ export default class DeploymentModule {
const tempDir = path.join(os.tmpdir(), 'minard');
mkpath.sync(tempDir);
let readableStream = (<any> response).body;
const tempFileName = path.join(tempDir, `minard-${projectId}-${deploymentId}.zip`);
const tempFileName = path.join(tempDir, `minard-${projectId}-${deploymentId}.zip`);
const writeStream = fs.createWriteStream(tempFileName);

await new Promise<void>((resolve, reject) => {
Expand Down
Loading