Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
b8ff8f6
wip: main.py refactor
nimish-ks Sep 10, 2024
1de043b
wip: main
nimish-ks Sep 11, 2024
5b044ef
fix: change cmd to internals to avoid module naming conflict
nimish-ks Sep 11, 2024
4c0a5b1
feat: fetch kubernetes service account JWT
nimish-ks Sep 11, 2024
f5eaf47
feat: token caching
nimish-ks Sep 11, 2024
2c83148
feat: added local caching on disk
nimish-ks Sep 11, 2024
a80e484
feat: add cache dir
nimish-ks Sep 11, 2024
320da52
feat: added authenticate with service account
nimish-ks Sep 11, 2024
d5ea7ff
chore: init auth module
nimish-ks Sep 11, 2024
1d28592
feat: moved types into secrets module
nimish-ks Sep 11, 2024
7a8ca4f
feat: moved all re-deployment logic to workload module
nimish-ks Sep 11, 2024
b8c8fa5
feat: added utils for writing secrets
nimish-ks Sep 11, 2024
32e1e96
fix: expiry
nimish-ks Sep 11, 2024
79c4207
fix: update cached token
nimish-ks Sep 11, 2024
cebd379
tmp: debug on
nimish-ks Sep 11, 2024
449f20f
feat: logging level set to debug
nimish-ks Sep 11, 2024
533034b
feat: added kubernetes auth config in crd
nimish-ks Sep 11, 2024
ba3fa3a
feat: refactored get_context to support fetching metadata via via app_id
nimish-ks Sep 11, 2024
8341ad7
feat: add app_id support to phase.get
nimish-ks Sep 11, 2024
6562351
feat: added support for app_id
nimish-ks Sep 11, 2024
53752ab
feat: add support for app_id
nimish-ks Sep 11, 2024
c3cda01
fix: import logging
nimish-ks Sep 11, 2024
f82bf20
feat: improve exception handling
nimish-ks Sep 11, 2024
8313615
feat: set log level to WARNING
nimish-ks Sep 11, 2024
c2e2a86
feat: added app ID in crd
nimish-ks Sep 16, 2024
fda988f
chore: remove hard coded host
nimish-ks Sep 16, 2024
117df7c
fix: auth function name and comments
nimish-ks Sep 16, 2024
7341f85
feat: updated deploy-operator to be consistent with the template
nimish-ks Sep 16, 2024
7e048b0
feat: added a service account for the operator
nimish-ks Sep 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions crd-template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ spec:
description: PhaseSecretSpec defines the desired state of PhaseSecret
properties:
phaseApp:
description: The Phase application to fetch secrets from.
description: The Phase application name to fetch secrets from.
type: string
phaseAppId:
description: The Phase application ID to fetch secrets from.
type: string
phaseAppEnv:
description: The environment variable representing the app environment in Phase.
Expand Down Expand Up @@ -63,6 +66,27 @@ spec:
secretNamespace:
description: The namespace where the Kubernetes Secret is located.
type: string
kubernetesAuth:
type: object
required:
- phaseServiceAccountId
- serviceAccountRef
properties:
phaseServiceAccountId:
description: The service account ID in Phase.
type: string
serviceAccountRef:
type: object
required:
- name
- namespace
properties:
name:
description: The name of the Kubernetes ServiceAccount.
type: string
namespace:
description: The namespace of the Kubernetes ServiceAccount.
type: string
phaseHost:
description: Phase host to pull secrets from.
type: string
Expand Down Expand Up @@ -127,7 +151,7 @@ spec:
type: integer
default: 60
required:
- phaseApp
- phaseAppId
- managedSecretReferences
- phaseHost
status:
Expand Down
8 changes: 8 additions & 0 deletions deploy-operator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,18 @@ spec:
labels:
app: phase-kubernetes-operator
spec:
serviceAccountName: phase-kubernetes-operator
securityContext:
runAsNonRoot: true
runAsUser: 1000
containers:
- name: phase-kubernetes-operator
image: phasehq/kubernetes-operator:latest
imagePullPolicy: IfNotPresent
resources:
limits:
cpu: 100m
memory: 128Mi
requests:
cpu: 100m
memory: 128Mi
6 changes: 6 additions & 0 deletions service-account.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: phase-kubernetes-operator
namespace: default
File renamed without changes.
File renamed without changes.
4 changes: 2 additions & 2 deletions src/cmd/secrets/fetch.py → src/internals/secrets/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger(__name__)

def phase_secrets_fetch(phase_service_token=None, phase_service_host=None, env_name=None, phase_app=None, keys=None, tags=None, path='/'):
def phase_secrets_fetch(phase_service_token=None, phase_service_host=None, env_name=None, phase_app=None, phase_app_id=None, keys=None, tags=None, path='/'):
"""
Fetch and return secrets based on the provided environment, keys, and tags.
"""

phase = Phase(init=False, pss=phase_service_token, host=phase_service_host)

try:
all_secrets = phase.get(env_name=env_name, app_name=phase_app, tag=tags, path=path)
all_secrets = phase.get(env_name=env_name, app_name=phase_app, app_id=phase_app_id, tag=tags, path=path)
resolved_secrets = []
for secret in all_secrets:
try:
Expand Down
228 changes: 118 additions & 110 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,129 @@
import kopf
import base64
import datetime
import kubernetes.client
from kubernetes import client
from kubernetes.client.rest import ApiException
from kubernetes.client import AppsV1Api
from kubernetes.client import CoreV1Api
from cmd.secrets.fetch import phase_secrets_fetch
from utils.const import REDEPLOY_ANNOTATION
from utils.misc import transform_name


@kopf.timer('secrets.phase.dev', 'v1alpha1', 'phasesecrets', interval=60)
import base64
from utils.auth.kubernetes import get_service_account_jwt
from utils.network import authenticate_service_account
from internals.secrets.fetch import phase_secrets_fetch
from typing import Dict
from utils.const import PHASE_CLOUD_API_HOST
from utils.cache import get_cached_token, update_cached_token
from utils.secrets.types import process_secrets
from utils.secrets.write import create_secret
from utils.workload.deploy import redeploy_affected_deployments
import base64
from kubernetes import client
from typing import Dict
import json
import logging

logging.basicConfig(level=logging.WARNING)

def get_phase_service_token(auth_config: Dict, phase_host: str, namespace: str, logger) -> str:
logger.debug(f"Entering get_phase_service_token. Auth config: {json.dumps(auth_config)}")
logger.debug(f"Phase host: {phase_host}, Namespace: {namespace}")

if 'serviceToken' in auth_config:
logger.debug("Using serviceToken authentication method")
service_token_secret_name = auth_config['serviceToken']['serviceTokenSecretReference']['secretName']
service_token_secret_namespace = auth_config['serviceToken']['serviceTokenSecretReference'].get('secretNamespace', namespace)
logger.debug(f"Secret name: {service_token_secret_name}, Secret namespace: {service_token_secret_namespace}")

try:
api_instance = client.CoreV1Api()
api_response = api_instance.read_namespaced_secret(service_token_secret_name, service_token_secret_namespace)
logger.debug("Successfully read the service token secret")
token = base64.b64decode(api_response.data['token']).decode('utf-8')
logger.debug("Successfully decoded the service token")
return token
except Exception as e:
logger.error(f"Error reading or decoding service token: {str(e)}")
raise

elif 'kubernetesAuth' in auth_config:
logger.debug("Using kubernetesAuth authentication method")
kubernetes_auth = auth_config['kubernetesAuth']
service_account_id = kubernetes_auth['phaseServiceAccountId']
logger.debug(f"Service account ID: {service_account_id}")

cached_token = get_cached_token(service_account_id)
if cached_token:
logger.debug("Using cached token")
return cached_token['token']

logger.debug("No valid cached token found. Proceeding with authentication.")

try:
jwt_token = get_service_account_jwt()
logger.debug("Successfully retrieved service account JWT")
except Exception as e:
logger.error(f"Error getting service account JWT: {str(e)}")
raise

try:
auth_response = authenticate_service_account(
host=phase_host,
auth_token=jwt_token,
service_account_id=service_account_id,
auth_type="kubernetes"
)
logger.debug(f"Received auth response: {json.dumps(auth_response)}")
except Exception as e:
logger.error(f"Error authenticating with Phase API: {str(e)}")
raise

if not auth_response or 'token' not in auth_response:
logger.error(f"Invalid auth response: {json.dumps(auth_response)}")
raise Exception("Failed to authenticate with Phase API")

try:
update_cached_token(service_account_id, {
'token': auth_response['token'],
'id': auth_response['id'],
'expiry': auth_response['expiry']
})
logger.debug("Successfully cached the new token")
except Exception as e:
logger.error(f"Error caching token: {str(e)}")
# Continue even if caching fails

logger.info(f"Refreshed Phase service token for service account {service_account_id}")
return auth_response['token']
else:
logger.error(f"Invalid auth config: {json.dumps(auth_config)}")
raise Exception("No valid authentication method found in the spec")

@kopf.timer('secrets.phase.dev', 'v1alpha1', 'phasesecrets', interval=10)
def sync_secrets(spec, name, namespace, logger, **kwargs):
try:
api_instance = CoreV1Api()
# Get Config
api_instance = client.CoreV1Api()
managed_secret_references = spec.get('managedSecretReferences', [])
phase_host = spec.get('phaseHost', 'https://console.phase.dev')
phase_host = spec.get('phaseHost', PHASE_CLOUD_API_HOST)
phase_app = spec.get('phaseApp')
phase_app_id = spec.get('phaseAppId')
phase_app_env = spec.get('phaseAppEnv', 'production')
phase_app_env_path = spec.get('phaseAppEnvPath', '/')
phase_app_env_tag = spec.get('phaseAppEnvTag')
service_token_secret_name = spec.get('authentication', {}).get('serviceToken', {}).get('serviceTokenSecretReference', {}).get('secretName', 'phase-service-token')

api_response = api_instance.read_namespaced_secret(service_token_secret_name, namespace)
service_token = base64.b64decode(api_response.data['token']).decode('utf-8')

phase_secrets_dict = phase_secrets_fetch(
phase_service_token=service_token,
phase_service_host=phase_host,
phase_app=phase_app,
env_name=phase_app_env,
tags=phase_app_env_tag,
path=phase_app_env_path
)
# Get Phase service token
auth_config = spec.get('authentication', {})
phase_service_token = get_phase_service_token(auth_config, phase_host, namespace, logger)

# Fetch secrets from Phase
try:
phase_secrets_dict = phase_secrets_fetch(
phase_service_token=phase_service_token,
phase_service_host=phase_host,
phase_app=phase_app,
phase_app_id=phase_app_id,
env_name=phase_app_env,
tags=phase_app_env_tag,
path=phase_app_env_path
)
except ValueError as ve:
logger.warning(f"Failed to fetch secrets for PhaseSecret {name} in namespace {namespace}: {ve}")
return # Exit the function early, but don't crash the operator

secret_changed = False
for secret_reference in managed_secret_references:
Expand Down Expand Up @@ -64,89 +155,6 @@ def sync_secrets(spec, name, namespace, logger, **kwargs):
logger.info(f"Secrets for PhaseSecret {name} have been successfully updated in namespace {namespace}")

except ApiException as e:
logger.error(f"Failed to fetch secrets for PhaseSecret {name} in namespace {namespace}: {e}")
logger.error(f"Kubernetes API error when handling PhaseSecret {name} in namespace {namespace}: {e}")
except Exception as e:
logger.error(f"Unexpected error when handling PhaseSecret {name} in namespace {namespace}: {e}")

def redeploy_affected_deployments(namespace, affected_secrets, logger, api_instance):
try:
apps_v1_api = AppsV1Api(api_instance.api_client)
deployments = apps_v1_api.list_namespaced_deployment(namespace)
for deployment in deployments.items:
if should_redeploy(deployment, affected_secrets):
patch_deployment_for_redeploy(deployment, namespace, apps_v1_api, logger)
except ApiException as e:
logger.error(f"Error listing deployments in namespace {namespace}: {e}")

def should_redeploy(deployment, affected_secrets):
if not (deployment.metadata.annotations and REDEPLOY_ANNOTATION in deployment.metadata.annotations):
return False

deployment_secrets = extract_deployment_secrets(deployment)
return any(secret in affected_secrets for secret in deployment_secrets)

def extract_deployment_secrets(deployment):
secrets = []
for container in deployment.spec.template.spec.containers:
if container.env_from:
for env_from in container.env_from:
if env_from.secret_ref:
secrets.append(env_from.secret_ref.name)
return secrets

def patch_deployment_for_redeploy(deployment, namespace, apps_v1_api, logger):
try:
timestamp = datetime.datetime.utcnow().isoformat()
patch_body = {
"spec": {
"template": {
"metadata": {
"annotations": {
"phase.autoredeploy.timestamp": timestamp
}
}
}
}
}
apps_v1_api.patch_namespaced_deployment(name=deployment.metadata.name, namespace=namespace, body=patch_body)
logger.info(f"Triggered redeployment of {deployment.metadata.name} in namespace {namespace}")
except ApiException as e:
logger.error(f"Failed to patch deployment {deployment.metadata.name} in namespace {namespace}: {e}")

def process_secrets(fetched_secrets, processors, secret_type, name_transformer):
processed_secrets = {}
for key, value in fetched_secrets.items():
processor_info = processors.get(key, {})
processor_type = processor_info.get('type', 'plain')
as_name = processor_info.get('asName', key) # Use asName for mapping

# Check and process the value based on the processor type
if processor_type == 'base64':
# Assume value is already base64 encoded; do not re-encode
processed_value = value
elif processor_type == 'plain':
# Base64 encode the value
processed_value = base64.b64encode(value.encode()).decode()
else:
# Default to plain processing if processor type is not recognized
processed_value = base64.b64encode(value.encode()).decode()

# Map the processed value to the asName
processed_secrets[as_name] = processed_value

return processed_secrets

def create_secret(api_instance, secret_name, secret_namespace, secret_type, secret_data, logger):
try:
response = api_instance.create_namespaced_secret(
namespace=secret_namespace,
body=kubernetes.client.V1Secret(
metadata=kubernetes.client.V1ObjectMeta(name=secret_name),
type=secret_type,
data=secret_data
)
)
if response:
logger.info(f"Created secret {secret_name} in namespace {secret_namespace}")
except ApiException as e:
logger.error(f"Failed to create secret {secret_name} in namespace {secret_namespace}: {e}")
logger.error(f"Unexpected error when handling PhaseSecret {name} in namespace {namespace}: {e}", exc_info=True)
Empty file added src/utils/auth/__init__.py
Empty file.
22 changes: 22 additions & 0 deletions src/utils/auth/kubernetes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import os

def get_service_account_jwt() -> str:
"""
Retrieve the JWT token from the current service account in a Kubernetes environment.

This function reads the JWT token from the filesystem, which is the standard
location in a Kubernetes pod.

Returns:
str: The service account JWT token

Raises:
Exception: If the token cannot be retrieved from the filesystem
"""
token_path = '/var/run/secrets/kubernetes.io/serviceaccount/token'

if os.path.exists(token_path):
with open(token_path, 'r') as token_file:
return token_file.read().strip()

raise Exception("Service account JWT token not found. Are you running inside a Kubernetes pod?")
24 changes: 24 additions & 0 deletions src/utils/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import os
import json
import time
from typing import Dict, Optional

from utils.const import TOKEN_CACHE_DIR

def get_cache_file_path(service_account_id: str) -> str:
return os.path.join(TOKEN_CACHE_DIR, f"{service_account_id}.json")

def get_cached_token(service_account_id: str) -> Optional[Dict]:
cache_file = get_cache_file_path(service_account_id)
if os.path.exists(cache_file):
with open(cache_file, 'r') as f:
cache = json.load(f)
if time.time() < cache.get('expiry', 0):
return cache
return None

def update_cached_token(service_account_id: str, token_data: Dict):
os.makedirs(TOKEN_CACHE_DIR, exist_ok=True)
cache_file = get_cache_file_path(service_account_id)
with open(cache_file, 'w') as f:
json.dump(token_data, f)
3 changes: 3 additions & 0 deletions src/utils/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@

PHASE_CLOUD_API_HOST = "https://console.phase.dev"


TOKEN_CACHE_DIR = "/tmp/phase-token-cache"

pss_user_pattern = re.compile(r"^pss_user:v(\d+):([a-fA-F0-9]{64}):([a-fA-F0-9]{64}):([a-fA-F0-9]{64}):([a-fA-F0-9]{64})$")
pss_service_pattern = re.compile(r"^pss_service:v(\d+):([a-fA-F0-9]{64}):([a-fA-F0-9]{64}):([a-fA-F0-9]{64}):([a-fA-F0-9]{64})$")

Expand Down
Loading
Loading