Skip to content

Commit

Permalink
feat: implement SANs for X.509 certificates
Browse files Browse the repository at this point in the history
  • Loading branch information
rpoisel committed Jan 17, 2025
1 parent 21241ec commit 50b630c
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 20 deletions.
4 changes: 2 additions & 2 deletions tests/test_openvpn.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import openwrt
import pytest
from docker import ComposeEnv, ComposeEnvFactory
from docker import ComposeAdapter, ComposeEnv, ComposeEnvFactory
from labgrid.driver import SSHDriver
from process import run
from ssh import put_file
Expand All @@ -14,7 +14,7 @@

@pytest.fixture(scope="module")
def pki() -> PKI:
return create_pki()
return create_pki(ComposeAdapter.map_service("openvpn-server"))


@pytest.fixture(scope="module")
Expand Down
15 changes: 5 additions & 10 deletions util/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ def rendered(self) -> str:
def port_mappings(self) -> PortMappings:
return self._port_mappings

def map_service(self, hostname: str) -> str | IPv4Address:
del hostname # unused
raise NotImplementedError()
@staticmethod
def map_service(hostname: str) -> str | IPv4Address:
if in_docker_container():
return hostname
return primary_host_ip()


class LocalComposeAdapter(ComposeAdapter):
Expand Down Expand Up @@ -70,10 +72,6 @@ def __init__(self, compose_template: str) -> None:
if "shared_network" in networks:
del networks["shared_network"]

def map_service(self, hostname: str) -> str | IPv4Address:
del hostname # unused
return primary_host_ip()


DOCKER_PORT_REGEX: re.Pattern = re.compile(
r"\{([\w-]+)\}:(\d+)(/((tcp)|(udp)))?",
Expand Down Expand Up @@ -106,9 +104,6 @@ def __init__(self, compose_template: str) -> None:
if ports:
del service_data["ports"]

def map_service(self, hostname: str) -> str | IPv4Address:
return hostname


def create_compose_adapter(compose_template: str) -> ComposeAdapter:
if in_docker_container():
Expand Down
32 changes: 24 additions & 8 deletions util/x509.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import datetime
from dataclasses import dataclass
from ipaddress import IPv4Address
from pathlib import Path

from cryptography import x509
Expand All @@ -22,7 +23,12 @@ def create_certificate(
is_ca: bool = False,
is_server_cert: bool = False,
is_client_cert: bool = False,
subject_alternative_names: str | IPv4Address | list[str | IPv4Address] | None = None,
) -> Certificate:
if subject_alternative_names is None:
subject_alternative_names = []
elif not isinstance(subject_alternative_names, list):
subject_alternative_names = [subject_alternative_names]
subject = x509.Name(
[
x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
Expand Down Expand Up @@ -56,18 +62,25 @@ def create_certificate(

cert_builder = cert_builder.add_extension(x509.BasicConstraints(ca=is_ca, path_length=None), critical=True)

ekus = []
if is_server_cert:
ekus.append(ExtendedKeyUsageOID.SERVER_AUTH)
if is_client_cert:
ekus.append(ExtendedKeyUsageOID.CLIENT_AUTH)
cert_builder = cert_builder.add_extension(
x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]), critical=False
)

if ekus:
if is_client_cert:
cert_builder = cert_builder.add_extension(
x509.ExtendedKeyUsage(ekus),
critical=False,
x509.ExtendedKeyUsage([ExtendedKeyUsageOID.CLIENT_AUTH]), critical=False
)

if subject_alternative_names:
san_list = []
for subject_alt_name in subject_alternative_names:
if isinstance(subject_alt_name, str):
san_list.append(x509.DNSName(subject_alt_name))
if type(subject_alt_name) is IPv4Address:
san_list.append(x509.IPAddress(subject_alt_name))
cert_builder = cert_builder.add_extension(x509.SubjectAlternativeName(san_list), critical=False)

certificate = cert_builder.sign(
private_key=issuer_private_key,
algorithm=hashes.SHA256(),
Expand Down Expand Up @@ -109,7 +122,9 @@ class PKI:
client_cert: bytes


def create_pki() -> PKI:
def create_pki(
subject_alternative_names: str | IPv4Address | list[str | IPv4Address] | None = None,
) -> PKI:
ca_key = generate_private_key()
ca_cert = create_certificate(
subject_name="Root CA",
Expand All @@ -127,6 +142,7 @@ def create_pki() -> PKI:
issuer_private_key=ca_key,
is_ca=False,
is_server_cert=True,
subject_alternative_names=subject_alternative_names,
)

client_key = generate_private_key()
Expand Down

0 comments on commit 50b630c

Please sign in to comment.