Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mirroring auto-created/destroyed for EC2 instances #45

Merged
merged 1 commit into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions cdk-lib/mirror-stacks/vpc-mirror-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,23 @@ export class VpcMirrorStack extends Stack {
]
})
);
listenerLambda.addToRolePolicy(
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
'ec2:DescribeInstances'
],
resources: ["*"]
})
);

// Make a human-readable log of the raw AWS Service events we're proccessing
const vpcLogGroup = new logs.LogGroup(this, 'LogGroup', {
logGroupName: `ArkimeInputEvents-${props.vpcId}`,
removalPolicy: RemovalPolicy.DESTROY // This is intended for debugging
});

// Capture Fargate stop/start events for processing
const fargateEventsRule = new events.Rule(this, 'RuleFargateEvents', {
eventBus: undefined, // We want to listen to the Account/Region's default bus
eventPattern: {
Expand All @@ -190,6 +201,27 @@ export class VpcMirrorStack extends Stack {
]
});

// Capture EC2 instance start/stop events. This should cover one-off instance creation, EC2 Autoscaling
// activities, and ECS-on-EC2. All three of those situations map to an ENI being created or destroyed when
// a concrete instance starts/stops, regardless of how many other steps/events are involved in the process.
//
// Unfortunately, this event does not give us the information we need to pre-screen it at the Rule level so
// we have to check if it applies to our VPC in our Lambda code.
const ec2EventsRule = new events.Rule(this, 'RuleEc2Events', {
eventBus: undefined, // We want to listen to the Account/Region's default bus
eventPattern: {
source: ["aws.ec2"],
detailType: ["EC2 Instance State-change Notification"],
detail: {
state: ["running", "shutting-down"]
}
},
targets: [
new targets.CloudWatchLogGroup(vpcLogGroup),
new targets.LambdaFunction(listenerLambda)
]
});

/**
* Configure the resources required for event-based mirroring configuration
*/
Expand Down
107 changes: 85 additions & 22 deletions cdk-lib/traffic-gen-sample/traffic-gen-stack.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import * as cdk from 'aws-cdk-lib';
import * as autoscaling from 'aws-cdk-lib/aws-autoscaling';
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import * as ecs from 'aws-cdk-lib/aws-ecs';
import * as iam from 'aws-cdk-lib/aws-iam';
Expand All @@ -7,61 +8,123 @@ import * as logs from 'aws-cdk-lib/aws-logs';
import * as path from 'path'
import { Construct } from 'constructs';


export class TrafficGenStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);

// Stock VPC w/ a Public/Private subnet pair in 1 AZ along with NATGateways providing internet access to the
// private VPCs.
/**
* Set up our demo Traffic Generator's networking
*/
// This is a Stock VPC w/ a Public/Private subnet pair in 1 AZ along with NATGateways providing internet access
// to the private subnet.
const vpc = new ec2.Vpc(this, 'VPC', {maxAzs: 1});

// Set up VPC Flow Logs to enable visibility of the traffic mirroring on the user-side
const flowLogsGroup = new logs.LogGroup(this, 'FlowLogsLogGroup', {
logGroupName: `FlowLogs-${id}`,
removalPolicy: cdk.RemovalPolicy.DESTROY,
retention: logs.RetentionDays.TEN_YEARS,
});

new ec2.FlowLog(this, 'FlowLogs', {
resourceType: ec2.FlowLogResourceType.fromVpc(vpc),
destination: ec2.FlowLogDestination.toCloudWatchLogs(flowLogsGroup),
});

/**
* Set up some shared components.
*/
// Key to encrypt SSM traffic when using ECS Exec to shell into the container
const ksmEncryptionKey = new kms.Key(this, 'ECSClusterKey', {
const ssmKey = new kms.Key(this, 'SsmKey', {
enableKeyRotation: true,
});

// Create a Fargate service that runs a single instance of our traffic generation image
const cluster = new ecs.Cluster(this, 'Cluster', {
/**
* Create a Fargate service that runs our traffic generation image
*/
const fargateCluster = new ecs.Cluster(this, 'FargateCluster', {
vpc,
executeCommandConfiguration: { kmsKey: ksmEncryptionKey }
executeCommandConfiguration: { kmsKey: ssmKey }
});

const taskDefinition = new ecs.FargateTaskDefinition(this, 'TaskDef', {
const fargateTaskDef = new ecs.FargateTaskDefinition(this, 'TaskDef', {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I always wonder does the id passed to these constructors matter? In the variable renaming I see you updated some (SsmKey), but there are some that you didn't. (TaskDef, Service)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ID ("TaskDef") is used in the name of the AWS Resource(s) associated with the CDK Construct you're instantiating. I can think of three considerations:

  • Making sure your name isn't too long that it overflows the available character limit
  • Making sure your name is sufficiently descriptive
  • Making sure your name doesn't collide with any other name

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That said - you're really free to pass in whatever you like. Another considerations is that, typically, changing the ID will require the AWS Resource(s) associated with the Construct to be torn down and recreated in order for the name switch to take effect.

memoryLimitMiB: 512,
cpu: 256,
});
taskDefinition.addToTaskRolePolicy(
fargateTaskDef.addToTaskRolePolicy(
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: ['kms:Decrypt'], // Required for ECS Exec & shelling into the container
resources: [ksmEncryptionKey.keyArn]
resources: [ssmKey.keyArn]
}),
);

const container = taskDefinition.addContainer('FargateContainer', {
const fargateContainer = fargateTaskDef.addContainer('FargateContainer', {
image: ecs.ContainerImage.fromAsset(path.resolve(__dirname, '..', '..', 'docker-traffic-gen')),
memoryLimitMiB: 512,
logging: new ecs.AwsLogDriver({ streamPrefix: 'DemoTrafficGen', mode: ecs.AwsLogDriverMode.NON_BLOCKING })
logging: new ecs.AwsLogDriver({ streamPrefix: 'DemoTrafficGenFargate', mode: ecs.AwsLogDriverMode.NON_BLOCKING })
});

const service = new ecs.FargateService(this, 'Service', {
cluster,
taskDefinition,
const fargateService = new ecs.FargateService(this, 'Service', {
cluster: fargateCluster,
taskDefinition: fargateTaskDef,
desiredCount: 1,
enableExecuteCommand: true
});

// Set up VPC Flow Logs to enable visibility of the traffic mirroring on the user-side
const flowLogsGroup = new logs.LogGroup(this, 'FlowLogsLogGroup', {
logGroupName: `FlowLogs-${id}`,
removalPolicy: cdk.RemovalPolicy.DESTROY,
retention: logs.RetentionDays.TEN_YEARS,
/**
* Create an ECS-on-EC2 Cluster that runs our traffic generation image
*/

//
const ecsAsg = new autoscaling.AutoScalingGroup(this, 'EcsASG', {
vpc: vpc,
instanceType: new ec2.InstanceType('t3.micro'), // Arbitrarily chosen
machineImage: ecs.EcsOptimizedImage.amazonLinux2(),
desiredCapacity: 3,
minCapacity: 3,
maxCapacity: 10 // Arbitrarily chosen
});

new ec2.FlowLog(this, 'FlowLogs', {
resourceType: ec2.FlowLogResourceType.fromVpc(vpc),
destination: ec2.FlowLogDestination.toCloudWatchLogs(flowLogsGroup),
const ecsCluster = new ecs.Cluster(this, 'EcsCluster', {
vpc: vpc,
executeCommandConfiguration: { kmsKey: ssmKey }
});

const ecsCapacityProvider = new ecs.AsgCapacityProvider(this, 'EcsCapacityProvider', {
autoScalingGroup: ecsAsg,
});
ecsCluster.addAsgCapacityProvider(ecsCapacityProvider);

const ecsTaskDef = new ecs.Ec2TaskDefinition(this, 'EcsTaskDef', {
networkMode: ecs.NetworkMode.BRIDGE,
});
ecsTaskDef.addToTaskRolePolicy(
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: ['kms:Decrypt'], // Required for ECS Exec & shelling into the container
resources: [ssmKey.keyArn]
}),
);

const ecsContainer = ecsTaskDef.addContainer('EcsContainer', {
image: ecs.ContainerImage.fromAsset(path.resolve(__dirname, '..', '..', 'docker-traffic-gen')),
logging: new ecs.AwsLogDriver({ streamPrefix: 'DemoTrafficGenEcs', mode: ecs.AwsLogDriverMode.NON_BLOCKING }),

// Because we're using the BRIDGE network type for our ECS Tasks, we can only place a single container
// on each of our t3.micro instances. We can't ask for all of their resources because ECS placement will
// fail, so we ask for a bit less than that.
cpu: 1536, // 1.5 vCPUs
memoryLimitMiB: 768, // 0.75 GiB
});

const ecsService = new ecs.Ec2Service(this, 'EcsService', {
cluster: ecsCluster,
taskDefinition: ecsTaskDef,
desiredCount: 1,
minHealthyPercent: 0,
enableExecuteCommand: true
});
}
}
53 changes: 43 additions & 10 deletions manage_arkime/aws_interactions/ec2_interactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,46 +38,79 @@ def get_subnets_of_vpc(vpc_id: str, aws_provider: AwsClientProvider) -> List[str

@dataclass
class NetworkInterface:
id: str
type: str
vpc_id: str
subnet_id: str
eni_id: str
eni_type: str

def to_dict(self):
return {
'vpc_id': self.vpc_id,
'subnet_id': self.subnet_id,
'eni_id': self.eni_id,
'eni_type': self.eni_type,
}

def get_enis_of_instance(instance_id: str, aws_provider: AwsClientProvider) -> List[NetworkInterface]:
ec2_client = aws_provider.get_ec2()
describe_instance_response = ec2_client.describe_instances(
InstanceIds=[instance_id]
)
instance_details = describe_instance_response["Reservations"][0]["Instances"][0]

network_interfaces = []
for eni in instance_details.get("NetworkInterfaces", []):
network_interfaces.append(
NetworkInterface(eni["VpcId"], eni["SubnetId"], eni["NetworkInterfaceId"], eni["InterfaceType"])
)

return network_interfaces

def get_enis_of_subnet(subnet_id: str, aws_provider: AwsClientProvider) -> List[NetworkInterface]:
ec2_client = aws_provider.get_ec2()
describe_eni_response = ec2_client.describe_network_interfaces(
Filters=[{"Name": "subnet-id", "Values": [subnet_id]}]
)
network_inferfaces = [NetworkInterface(eni["NetworkInterfaceId"], eni["InterfaceType"]) for eni in describe_eni_response.get("NetworkInterfaces", [])]
network_interfaces = []
for eni in describe_eni_response.get("NetworkInterfaces", []):
network_interfaces.append(
NetworkInterface(eni["VpcId"], eni["SubnetId"], eni["NetworkInterfaceId"], eni["InterfaceType"])
)

next_token = describe_eni_response.get("NextToken")
while next_token:
describe_eni_response = ec2_client.describe_network_interfaces(
Filters=[{"Name": "subnet-id", "Values": [subnet_id]}],
NextToken=next_token
)
next_interfaces = [NetworkInterface(eni["NetworkInterfaceId"], eni["InterfaceType"]) for eni in describe_eni_response.get("NetworkInterfaces", [])]
network_inferfaces.extend(next_interfaces)
next_interfaces = []
for eni in describe_eni_response.get("NetworkInterfaces", []):
next_interfaces.append(
NetworkInterface(eni["VpcId"], eni["SubnetId"], eni["NetworkInterfaceId"], eni["InterfaceType"])
)
network_interfaces.extend(next_interfaces)
next_token = describe_eni_response.get("NextToken")

return network_inferfaces
return network_interfaces

NON_MIRRORABLE_ENI_TYPES = ["gateway_load_balancer_endpoint", "nat_gateway"]

class NonMirrorableEniType(Exception):
def __init__(self, eni: NetworkInterface):
self.eni = eni
super().__init__(f"The ENI {eni.id} is of type {eni.type}, which is not mirrorable")
super().__init__(f"The ENI {eni.eni_id} is of type {eni.eni_type}, which is not mirrorable")

"""
Sets up a VPC Traffic Mirroring Session on a given ENI towards the specified Traffic Target using the specified
Traffic Filter and returns the Traffic Session ID.
"""
def mirror_eni(eni: NetworkInterface, traffic_target: str, traffic_filter: str, vpc_id: str, aws_provider: AwsClientProvider, virtual_network: int = 123) -> str:
if eni.type in NON_MIRRORABLE_ENI_TYPES:
if eni.eni_type in NON_MIRRORABLE_ENI_TYPES:
raise NonMirrorableEniType(eni)

ec2_client = aws_provider.get_ec2()
create_session_response = ec2_client.create_traffic_mirror_session(
NetworkInterfaceId=eni.id,
NetworkInterfaceId=eni.eni_id,
TrafficMirrorTargetId=traffic_target,
TrafficMirrorFilterId=traffic_filter,
SessionNumber=1,
Expand All @@ -88,7 +121,7 @@ def mirror_eni(eni: NetworkInterface, traffic_target: str, traffic_filter: str,
"Tags": [
{
"Key": "Name",
"Value": f"{vpc_id}-{eni.id}"
"Value": f"{vpc_id}-{eni.eni_id}"
},
]
},
Expand Down
4 changes: 2 additions & 2 deletions manage_arkime/commands/add_vpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ def _mirror_enis_in_subnet(event_bus_arn: str, cluster_name: str, vpc_id: str, s
# actually create the mirroring configuration, we should pre-screen (hasn't already been mirrored; right eni
# type).

logger.info(f"Initiating creation of mirroring session for ENI {eni.id}")
logger.info(f"Initiating creation of mirroring session for ENI {eni.eni_id}")

events.put_events(
[events.CreateEniMirrorEvent(cluster_name, vpc_id, subnet_id, eni.id, eni.type, traffic_filter_id, vni)],
[events.CreateEniMirrorEvent(cluster_name, vpc_id, subnet_id, eni.eni_id, eni.eni_type, traffic_filter_id, vni)],
event_bus_arn,
aws_provider
)
Loading