Skip to content

Commit

Permalink
create topic subscriptions on consuming side
Browse files Browse the repository at this point in the history
To avoid cyclic references [1], we need to create
SNS subscriptions on the conuming stack instead of the
topic's stack.

This change implements a workaround to the issue.

aws/aws-cdk#3064
  • Loading branch information
Elad Ben-Israel committed Jun 25, 2019
1 parent 4329c5a commit 0a5826c
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 13 deletions.
1 change: 1 addition & 0 deletions grapl-cdk/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
cdk.out/
35 changes: 24 additions & 11 deletions grapl-cdk/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ class SysmonSubgraphGenerator extends cdk.Stack {
const service = new Service(this, 'sysmon-subgraph-generator', environment);

service.readsFrom(reads_from);
subscribes_to.addSubscription(new snsSubs.SqsSubscription(service.queues.queue));
addSubscription(this, subscribes_to, new snsSubs.SqsSubscription(service.queues.queue));
service.publishesToBucket(writes_to);
}
}
Expand All @@ -475,11 +475,25 @@ class GenericSubgraphGenerator extends cdk.Stack {
const service = new Service(this, 'generic-subgraph-generator', environment);

service.readsFrom(reads_from);
subscribes_to.addSubscription(new snsSubs.SqsSubscription(service.queues.queue));

addSubscription(this, subscribes_to, new snsSubs.SqsSubscription(service.queues.queue));

service.publishesToBucket(writes_to);
}
}

function addSubscription(scope, topic, subscription) {
const config = subscription.bind(topic);

new sns.Subscription(scope, 'Subscription', {
topic: topic,
endpoint: config.endpoint,
filterPolicy: config.filterPolicy,
protocol: config.protocol,
rawMessageDelivery: config.rawMessageDelivery
});
}


class NodeIdentityMapper extends cdk.Stack {

Expand All @@ -500,7 +514,7 @@ class NodeIdentityMapper extends cdk.Stack {

service.readsFrom(reads_from);

subscribes_to.addSubscription(new snsSubs.SqsSubscription(service.queues.queue));
addSubscription(this, subscribes_to, new snsSubs.SqsSubscription(service.queues.queue));

service.event_handler.connections.allowToAnyIPv4(ec2.Port.tcp(443), 'Allow outbound to S3');
service.event_retry_handler.connections.allowToAnyIPv4(ec2.Port.tcp(443), 'Allow outbound to S3');
Expand Down Expand Up @@ -530,7 +544,7 @@ class NodeIdentifier extends cdk.Stack {

history_db.allowReadWrite(service);
service.publishesToBucket(writes_to);
subscribes_to.addSubscription(new snsSubs.SqsSubscription(service.queues.queue));
addSubscription(this, subscribes_to,, new snsSubs.SqsSubscription(service.queues.queue));
service.event_handler.connections.allowToAnyIPv4(ec2.Port.tcp(443), 'Allow outbound to S3');
service.event_retry_handler.connections.allowToAnyIPv4(ec2.Port.tcp(443), 'Allow outbound to S3');

Expand Down Expand Up @@ -563,7 +577,7 @@ class GraphMerger extends cdk.Stack {
service.readsFrom(reads_from);
service.publishesToTopic(publishes_to);

subscribes_to.addSubscription(new snsSubs.SqsSubscription(service.queues.queue));
addSubscription(this, subscribes_to, new snsSubs.SqsSubscription(service.queues.queue));
//
// service.event_handler.connections
// .allowToAnyIPv4(new ec2.Port({
Expand Down Expand Up @@ -594,12 +608,12 @@ class AnalyzerDispatch extends cdk.Stack {
};

const service = new Service(this, 'analyzer-dispatcher', environment, vpc);
;

service.publishesToBucket(writes_to);
// We need the List capability to find each of the analyzers
service.readsFrom(reads_from, true);

subscribes_to.addSubscription(new snsSubs.SqsSubscription(service.queues.queue));
addSubscription(this, subscribes_to, new snsSubs.SqsSubscription(service.queues.queue));

service.event_handler.connections.allowToAnyIPv4(ec2.Port.allTcp(), 'Allow outbound to S3');
service.event_retry_handler.connections.allowToAnyIPv4(ec2.Port.allTcp(), 'Allow outbound to S3');
Expand Down Expand Up @@ -658,8 +672,7 @@ class AnalyzerExecutor extends cdk.Stack {
service.event_handler.addToRolePolicy(policy);
service.event_retry_handler.addToRolePolicy(policy);


subscribes_to.addSubscription(new snsSubs.SqsSubscription(service.queues.queue));
addSubscription(this, subscribes_to, new snsSubs.SqsSubscription(service.queues.queue));

service.event_handler.connections.allowToAnyIPv4(ec2.Port.allTcp(), 'Allow outbound to S3');
service.event_retry_handler.connections.allowToAnyIPv4(ec2.Port.allTcp(), 'Allow outbound to S3');
Expand Down Expand Up @@ -696,7 +709,7 @@ class EngagementCreator extends cdk.Stack {
service.readsFrom(reads_from);
service.publishesToTopic(publishes_to);

subscribes_to.addSubscription(new snsSubs.SqsSubscription(service.queues.queue));
addSubscription(this, subscribes_to, new snsSubs.SqsSubscription(service.queues.queue));

service.event_handler.connections.allowToAnyIPv4(ec2.Port.allTcp(), 'Allow outbound to S3');
service.event_retry_handler.connections.allowToAnyIPv4(ec2.Port.allTcp(), 'Allow outbound to S3');
Expand Down Expand Up @@ -997,7 +1010,7 @@ class Grapl extends cdk.App {
constructor() {
super();

env(__dirname + '/.env');
// env(__dirname + '/.env');

const mgZeroCount = Number(process.env.MG_ZEROS_COUNT) || 3;
const mgAlphaCount = Number(process.env.MG_ALPHAS_COUNT) || 5;
Expand Down
3 changes: 1 addition & 2 deletions grapl-cdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
"@aws-cdk/core": "^0.36.0",
"@types/node": "^10.14.6",
"aws-cdk": "^0.36.0",
"node-env-file": "^0.1.8",
"@aws-cdk/aws-cloudmap": "latest"
"node-env-file": "^0.1.8"
}
}

0 comments on commit 0a5826c

Please sign in to comment.