Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ecs): Ensure public IP addresses are not assigned automatically #5128

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 122 additions & 30 deletions prowler/providers/aws/services/ecs/ecs_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,15 @@
# Call AWSService's __init__
super().__init__(__class__.__name__, provider)
self.task_definitions = {}
self.services = {}
self.clusters = {}
self.__threading_call__(self._list_task_definitions)
self._describe_task_definition()
self.__threading_call__(
self._describe_task_definition, self.task_definitions.values()
)
self.__threading_call__(self._list_clusters)
self.__threading_call__(self._describe_clusters, self.clusters.values())
self.__threading_call__(self._describe_services, self.clusters.values())

def _list_task_definitions(self, regional_client):
logger.info("ECS - Listing Task Definitions...")
Expand All @@ -39,41 +46,110 @@
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

def _describe_task_definition(self):
def _describe_task_definition(self, task_definition):
logger.info("ECS - Describing Task Definitions...")
try:
for task_definition in self.task_definitions.values():
client = self.regional_clients[task_definition.region]
response = client.describe_task_definition(
taskDefinition=task_definition.arn,
include=[
"TAGS",
],
)
container_definitions = response["taskDefinition"][
"containerDefinitions"
]
for container in container_definitions:
environment = []
if "environment" in container:
for env_var in container["environment"]:
environment.append(
ContainerEnvVariable(
name=env_var["name"], value=env_var["value"]
)
client = self.regional_clients[task_definition.region]
response = client.describe_task_definition(
taskDefinition=task_definition.arn,
include=[
"TAGS",
],
)
container_definitions = response["taskDefinition"]["containerDefinitions"]
for container in container_definitions:
environment = []
if "environment" in container:
for env_var in container["environment"]:
environment.append(
ContainerEnvVariable(
name=env_var["name"], value=env_var["value"]
)
task_definition.container_definitions.append(
ContainerDefinition(
name=container["name"],
privileged=container.get("privileged", False),
user=container.get("user", ""),
environment=environment,
)
task_definition.container_definitions.append(
ContainerDefinition(
name=container["name"],
privileged=container.get("privileged", False),
user=container.get("user", ""),
environment=environment,
)
task_definition.tags = response.get("tags")
task_definition.network_mode = response["taskDefinition"].get(
"networkMode"
)
task_definition.tags = response.get("tags")
task_definition.network_mode = response["taskDefinition"].get("networkMode")
except Exception as error:
logger.error(

Check warning on line 80 in prowler/providers/aws/services/ecs/ecs_service.py

View check run for this annotation

Codecov / codecov/patch

prowler/providers/aws/services/ecs/ecs_service.py#L79-L80

Added lines #L79 - L80 were not covered by tests
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

def _describe_services(self, cluster):
logger.info("ECS - Describing Services for each Cluster...")
try:
client = self.regional_clients[cluster.region]

list_ecs_paginator = client.get_paginator("list_services")
service_arns = []
for page in list_ecs_paginator.paginate(cluster=cluster.arn):
service_arns.extend(page["serviceArns"])

if service_arns:
for service_arn in service_arns:
describe_response = client.describe_services(
cluster=cluster.arn,
services=[service_arn],
include=["TAGS"],
)

service_desc = describe_response["services"][0]
service_arn = service_desc["serviceArn"]
service_obj = Service(
name=sub(":.*", "", service_arn.split("/")[2]),
arn=service_arn,
region=cluster.region,
assign_public_ip=(
service_desc.get("networkConfiguration", {})
.get("awsvpcConfiguration", {})
.get("assignPublicIp", "DISABLED")
== "ENABLED"
),
tags=service_desc.get("tags", []),
)
cluster.services[service_arn] = service_obj
self.services[service_arn] = service_obj
except Exception as error:
logger.error(

Check warning on line 119 in prowler/providers/aws/services/ecs/ecs_service.py

View check run for this annotation

Codecov / codecov/patch

prowler/providers/aws/services/ecs/ecs_service.py#L118-L119

Added lines #L118 - L119 were not covered by tests
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

def _list_clusters(self, regional_client):
logger.info("ECS - Listing Clusters...")
try:
list_ecs_paginator = regional_client.get_paginator("list_clusters")
for page in list_ecs_paginator.paginate():
for cluster in page["clusterArns"]:
if not self.audit_resources or (
is_resource_filtered(cluster, self.audit_resources)
):
self.clusters[cluster] = Cluster(
name=sub(":.*", "", cluster.split("/")[1]),
arn=cluster,
region=regional_client.region,
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

def _describe_clusters(self, cluster):
logger.info("ECS - Describing Clusters...")
try:
client = self.regional_clients[cluster.region]
response = client.describe_clusters(
clusters=[cluster.arn],
include=[
"TAGS",
],
)
cluster.tags = response["clusters"][0].get("tags", [])
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
Expand All @@ -100,3 +176,19 @@
container_definitions: list[ContainerDefinition] = []
tags: Optional[list] = []
network_mode: Optional[str]


class Service(BaseModel):
name: str
arn: str
region: str
assign_public_ip: Optional[bool]
tags: Optional[list] = []


class Cluster(BaseModel):
name: str
arn: str
region: str
services: dict = {}
tags: Optional[list] = []
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{
"Provider": "aws",
"CheckID": "ecs_service_no_assign_public_ip",
"CheckTitle": "ECS services should not assign public IPs automatically",
"CheckType": [
"Software and Configuration Checks/AWS Security Best Practices"
],
"ServiceName": "ecs",
"SubServiceName": "",
"ResourceIdTemplate": "arn:aws:ecs:{region}:{account-id}:service/{service-name}",
"Severity": "high",
"ResourceType": "AwsEcsService",
"Description": "This control checks whether Amazon ECS services are configured to automatically assign public IP addresses. The control fails if AssignPublicIP is ENABLED and passes if it is DISABLED.",
"Risk": "Having public IP addresses assigned to ECS services automatically can expose services to the internet, increasing the risk of unauthorized access, data breaches, and cyberattacks.",
"RelatedUrl": "https://docs.aws.amazon.com/AmazonECS/latest/developerguide/Welcome.html",
"Remediation": {
"Code": {
"CLI": "aws ecs update-service --cluster <cluster-name> --service <service-name> --network-configuration 'awsvpcConfiguration={assignPublicIp=DISABLED}'",
"NativeIaC": "",
"Other": "https://docs.aws.amazon.com/securityhub/latest/userguide/ecs-controls.html#ecs-2",
"Terraform": ""
},
"Recommendation": {
"Text": "Disable automatic public IP address assignment for your ECS services to ensure they are not publicly accessible.",
"Url": "https://docs.aws.amazon.com/AmazonECS/latest/developerguide/security.html"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.ecs.ecs_client import ecs_client


class ecs_service_no_assign_public_ip(Check):
def execute(self):
findings = []
for service in ecs_client.services.values():
report = Check_Report_AWS(self.metadata())
report.region = service.region
report.resource_id = service.name
report.resource_arn = service.arn
report.resource_tags = service.tags
report.status = "PASS"
report.status_extended = f"ECS Service {service.name} does not have automatic public IP assignment."

if service.assign_public_ip:
report.status = "FAIL"
report.status_extended = (
f"ECS Service {service.name} has automatic public IP assignment."
)

findings.append(report)
return findings
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def execute(self):
report.resource_arn = task_definition.arn
report.resource_tags = task_definition.tags
report.status = "PASS"
report.status_extended = f"ECS task definition '{task_definition.name}' does not have host network mode."
report.status_extended = f"ECS task definition {task_definition.name} does not have host network mode."
failed_containers = []
if task_definition.network_mode == "host":
for container in task_definition.container_definitions:
Expand All @@ -23,8 +23,8 @@ def execute(self):
failed_containers.append(container.name)

if failed_containers:
report.status_extended = f"ECS task definition '{task_definition.name}' has containers with host network mode and non-privileged containers running as root or with no user specified: {', '.join(failed_containers)}"
report.status_extended = f"ECS task definition {task_definition.name} has containers with host network mode and non-privileged containers running as root or with no user specified: {', '.join(failed_containers)}"
else:
report.status_extended = f"ECS task definition '{task_definition.name}' has host network mode but no containers running as root or with no user specified."
report.status_extended = f"ECS task definition {task_definition.name} has host network mode but no containers running as root or with no user specified."
findings.append(report)
return findings
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from unittest import mock

from prowler.providers.aws.services.ecs.ecs_service import Service
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_US_EAST_1

SERVICE_ARN = (
f"arn:aws:ecs:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:service/sample-service"
)
SERVICE_NAME = "sample-service"


class Test_ecs_service_no_assign_public_ip:
def test_no_services(self):
ecs_client = mock.MagicMock
ecs_client.services = {}

with mock.patch(
"prowler.providers.aws.services.ecs.ecs_service.ECS",
ecs_client,
):
from prowler.providers.aws.services.ecs.ecs_service_no_assign_public_ip.ecs_service_no_assign_public_ip import (
ecs_service_no_assign_public_ip,
)

check = ecs_service_no_assign_public_ip()
result = check.execute()
assert len(result) == 0

def test_service_with_no_public_ip(self):
ecs_client = mock.MagicMock
ecs_client.services = {}
ecs_client.services[SERVICE_ARN] = Service(
name=SERVICE_NAME,
arn=SERVICE_ARN,
region=AWS_REGION_US_EAST_1,
assign_public_ip=False,
tags=[],
)

with mock.patch(
"prowler.providers.aws.services.ecs.ecs_service.ECS",
ecs_client,
):
from prowler.providers.aws.services.ecs.ecs_service_no_assign_public_ip.ecs_service_no_assign_public_ip import (
ecs_service_no_assign_public_ip,
)

check = ecs_service_no_assign_public_ip()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"ECS Service {SERVICE_NAME} does not have automatic public IP assignment."
)
assert result[0].resource_id == SERVICE_NAME
assert result[0].resource_arn == SERVICE_ARN

def test_task_definition_no_host_network_mode(self):
ecs_client = mock.MagicMock
ecs_client.services = {}
ecs_client.services[SERVICE_ARN] = Service(
name=SERVICE_NAME,
arn=SERVICE_ARN,
region=AWS_REGION_US_EAST_1,
assign_public_ip=True,
tags=[],
)

with mock.patch(
"prowler.providers.aws.services.ecs.ecs_service.ECS",
ecs_client,
):
from prowler.providers.aws.services.ecs.ecs_service_no_assign_public_ip.ecs_service_no_assign_public_ip import (
ecs_service_no_assign_public_ip,
)

check = ecs_service_no_assign_public_ip()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"ECS Service {SERVICE_NAME} has automatic public IP assignment."
)
assert result[0].resource_id == SERVICE_NAME
assert result[0].resource_arn == SERVICE_ARN
Loading
Loading