Skip to content

Commit 998bb22

Browse files
susanxhuynh and Arthur Rand
authored and committed
[INFINITY-2622] Added integration test that reads / writes to a Kerberized HDFS (apache#205)
* [INFINITY-2622] Initial teragen|sort|validate test with kerberos. * Added kdc and kerberized HDFS, copying over the entire dcos-commons/testing/ directory. All of the HDFS testing is now in a separate test_hdfs module. Unpinned the HDFS version. Temporarily added an HDFS stub universe with Kerberos support. * In require_spark(), avoid mutable default param value, use `None` instead. * Moved hdfs stub universe and hdfs constants into test_hdfs.py. Replaced for-loop with itertools.product. * Select itertool.product into a tuple * Updated soak test for kerberos. * Removed extra kdc.json files.
1 parent b6b48d0 commit 998bb22

24 files changed

+2373
-140
lines changed

testing/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# Testing utils
2+
3+
Common python code used in the [shakedown](https://github.com/mesosphere/shakedown)-based integration tests for services in this repository.

testing/sdk_api.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
'''
2+
************************************************************************
3+
FOR THE TIME BEING WHATEVER MODIFICATIONS ARE APPLIED TO THIS FILE
4+
SHOULD ALSO BE APPLIED TO sdk_api IN ANY OTHER PARTNER REPOS
5+
************************************************************************
6+
'''
7+
import logging
8+
9+
import dcos
10+
import shakedown
11+
12+
log = logging.getLogger(__name__)
13+
14+
15+
def get(service_name, endpoint):
    '''
    Issues a GET request against a service scheduler's HTTP API.

    :param service_name: name of the service whose scheduler URL is looked up
    :type service_name: str
    :param endpoint: endpoint of the form /v1/...
    :type endpoint: str
    :returns: response from the provided scheduler API endpoint
    :rtype: Response
    :raises: whatever ``raise_for_status()`` raises on an unsuccessful response
    '''
    service_url = shakedown.dcos_service_url(service_name)
    # The endpoint is appended verbatim, so it must carry its own leading slash.
    request_url = "{}{}".format(service_url, endpoint)

    response = dcos.http.get(request_url)
    response.raise_for_status()
    return response
27+
28+
29+
def is_suppressed(service_name):
    '''
    Reports whether the scheduler's "suppressed" state property is set.

    :param service_name: name of the service to query
    :type service_name: str
    :returns: True when the property endpoint's raw body is exactly ``true``
    :rtype: bool
    '''
    suppressed_raw = get(service_name, "/v1/state/properties/suppressed").content
    log.info("{} suppressed={}".format(service_name, suppressed_raw))
    # The body is compared as bytes; anything other than b"true" counts as not suppressed.
    return suppressed_raw == b"true"

testing/sdk_auth.py

Lines changed: 328 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,328 @@
1+
'''
2+
Utilities related to setting up a Kerberos environment for services to test authentication
3+
and authorization functionality.
4+
5+
Note: This module assumes any package it's being tested with includes the relevant
6+
krb5.conf and/or JAAS file(s) as artifacts, specified as per the YAML service spec.
7+
8+
************************************************************************
9+
FOR THE TIME BEING WHATEVER MODIFICATIONS ARE APPLIED TO THIS FILE
10+
SHOULD ALSO BE APPLIED TO sdk_auth IN ANY OTHER PARTNER REPOS
11+
************************************************************************
12+
'''
13+
from retrying import retry
14+
from subprocess import run
15+
from tempfile import TemporaryDirectory
16+
17+
import dcos
18+
import json
19+
import logging
20+
import os
21+
import shakedown
22+
23+
import sdk_cmd
24+
import sdk_hosts
25+
import sdk_marathon
26+
import sdk_tasks
27+
import sdk_security
28+
29+
30+
log = logging.getLogger(__name__)
31+
32+
KERBEROS_APP_ID = "kdc"
33+
KERBEROS_IMAGE_NAME = "mesosphere/kdc"
34+
KERBEROS_KEYTAB_FILE_NAME = "keytab"
35+
BASE64_ENCODED_KEYTAB_FILE_NAME = "{keytab_name}.base64"
36+
DCOS_BASE64_PREFIX = "__dcos_base64__"
37+
LINUX_USER = "core"
38+
KERBEROS_CONF = "krb5.conf"
39+
REALM = "LOCAL"
40+
41+
# Note: Some of the helper functions in this module are wrapped in basic retry logic to provide some
42+
# resiliency towards possible intermittent network failures.
43+
44+
45+
@retry(stop_max_attempt_number=3, wait_fixed=2000)
def _get_kdc_task() -> dict:
    """
    Looks up the task whose name equals KERBEROS_APP_ID in the output of `task --json`.

    :return (dict): The task object of the KDC app with desired properties to be retrieved by other methods.
    :raises RuntimeError: If the CLI returned no output or no task carries the KDC app's name.
    """
    log.info("Getting KDC task")
    raw_tasks = sdk_cmd.run_cli("task --json")

    # An empty CLI result is treated the same as "no tasks".
    listed_tasks = json.loads(raw_tasks) if raw_tasks else []

    # First match wins; the generator stops scanning as soon as the KDC task is found.
    kdc_task = next((entry for entry in listed_tasks if entry["name"] == KERBEROS_APP_ID), None)
    if kdc_task is not None:
        return kdc_task

    raise RuntimeError("Expecting marathon KDC task but no such task found. Running tasks: {tasks}".format(
        tasks=raw_tasks))
60+
61+
62+
@retry(stop_max_attempt_number=2, wait_fixed=1000)
def _get_host_name(host_id: str) -> str:
    """
    Fetches the host name for the host running the KDC app.

    :param host_id (str): The ID of the host, used to look up the appropriate node.
    :return (str): Name of the host running the KDC app.
    :raises RuntimeError: If `node --json` returned nothing or no node has the given ID.
    """
    log.info("Getting host name")
    raw_nodes = sdk_cmd.run_cli("node --json")
    if raw_nodes:
        for node in json.loads(raw_nodes):
            if node["id"] == host_id:
                log.info("Host name is {host_name}".format(host_name=node["hostname"]))
                return node["hostname"]

    # Include the raw CLI output so the failure shows which nodes were actually listed.
    raise RuntimeError("Failed to get name of host running the KDC app: {nodes}".format(nodes=raw_nodes))
79+
80+
81+
@retry(stop_max_attempt_number=2, wait_fixed=1000)
def _get_master_public_ip() -> str:
    """
    Reads the `PUBLIC_IPV4` field from the cluster's `/metadata` endpoint.

    :return (str): The public IP of the master node in the DC/OS cluster.
    :raises RuntimeError: If the metadata request does not come back OK.
    :raises KeyError: If the metadata payload has no `PUBLIC_IPV4` entry.
    """
    # NOTE(review): the credential headers returned here are never passed to the request below;
    # presumably sdk_cmd.request handles authentication itself -- confirm.
    dcos_url, _unused_headers = sdk_security.get_dcos_credentials()
    cluster_metadata_url = "{cluster_url}/metadata".format(cluster_url=dcos_url)

    response = sdk_cmd.request("GET", cluster_metadata_url, verify=False)
    if not response.ok:
        raise RuntimeError("Unable to get the master node's public IP address: {err}".format(err=repr(response)))

    metadata = response.json()
    if "PUBLIC_IPV4" not in metadata:
        raise KeyError("Cluster metadata does not include master's public ip: {response}".format(
            response=repr(metadata)))

    master_ip = metadata["PUBLIC_IPV4"]
    log.info("Master public ip is {public_ip}".format(public_ip=master_ip))
    return master_ip
100+
101+
102+
def _create_temp_working_dir() -> TemporaryDirectory:
    """
    Allocates a fresh temporary directory used as scratch space while setting up the Kerberos environment.

    The caller owns the returned object and is responsible for calling its `cleanup()`.

    :return (TemporaryDirectory): The TemporaryDirectory object holding the context of the temp dir.
    """
    working_dir = TemporaryDirectory()
    log.info("Created temp working directory {}".format(working_dir.name))
    return working_dir
110+
111+
112+
#TODO: make this generic and put in sdk_utils.py
113+
def _copy_file_to_localhost(self):
    """
    Copies the keytab that was generated inside the container running the KDC server to the localhost
    so it can be uploaded to the secret store later.

    The keytab will end up in path: <temp_working_dir>/<keytab_file>

    :param self: The environment object; its `keytab_file_name`, `kdc_host_id`, `framework_id`, `task_id`
                 and `temp_working_dir` attributes are read.
    :raises RuntimeError: If curl cannot be started or exits with a non-zero status.
    """
    log.info("Copying {} to the temp working directory".format(self.keytab_file_name))

    keytab_absolute_path = "{mesos_agents_path}/{host_id}/frameworks/{framework_id}/executors/{task_id}/runs/latest/{keytab_file}".format(
        mesos_agents_path="/var/lib/mesos/slave/slaves",
        host_id=self.kdc_host_id,
        framework_id=self.framework_id,
        task_id=self.task_id,
        keytab_file=self.keytab_file_name
    )
    keytab_url = "{cluster_url}/slave/{agent_id}/files/download?path={path}".format(
        cluster_url=shakedown.dcos_url(),
        agent_id=self.kdc_host_id,
        path=keytab_absolute_path
    )
    dest = "{temp_working_dir}/{keytab_file}".format(
        temp_working_dir=self.temp_working_dir.name, keytab_file=self.keytab_file_name)

    auth_header = "Authorization: token={token}".format(token=shakedown.dcos_acs_token())
    # An argv list (no shell) keeps the token and the '?' in the URL away from shell parsing.
    # --fail makes curl exit non-zero on an HTTP error instead of saving the error page as the keytab.
    curl_cmd = ["curl", "-k", "--fail", "--header", auth_header, "--output", dest, keytab_url]
    try:
        result = run(curl_cmd)
    except Exception as e:
        raise RuntimeError("Failed to download the keytab file: {}".format(repr(e))) from e

    # The exit status is checked by hand (rather than check=True) so the raised message
    # does not echo the command line, which contains the auth token.
    if result.returncode != 0:
        raise RuntimeError("Failed to download the keytab file: curl exited with code {}".format(
            result.returncode))
146+
147+
148+
def kinit(task_id: str, keytab: str, principal: str):
    """
    Runs `kinit -k -t <keytab> <principal>` inside the given task to authenticate the principal.

    :param task_id: The task in whose environment the kinit will run.
    :param keytab: The keytab used by kinit to authenticate.
    :param principal: The name of the principal the user wants to authenticate as.
    """
    sdk_tasks.task_exec(
        task_id,
        "kinit -k -t {keytab} {principal}".format(keytab=keytab, principal=principal))
157+
158+
159+
class KerberosEnvironment:
    """
    Manages a KDC marathon app plus the keytab secret derived from it.

    Typical flow: construct (installs the KDC app), `add_principals(...)`, `finalize()` (creates the keytab
    and uploads it as a secret), and finally `cleanup()`.
    """

    def __init__(self):
        """
        Installs the Kerberos Domain Controller (KDC) as the initial step in creating a kerberized cluster.
        This just passes a dictionary to be rendered as a JSON app definition to marathon.

        :raises RuntimeError: If the dcos-enterprise-cli cannot be installed.
        """
        # Assigned up front so get_keytab_path() and cleanup() work even if finalize() never ran.
        self.keytab_secret_path = None

        self.temp_working_dir = _create_temp_working_dir()
        kdc_app_def_path = "{current_file_dir}/../tools/kdc.json".format(
            current_file_dir=os.path.dirname(os.path.realpath(__file__)))
        with open(kdc_app_def_path) as f:
            kdc_app_def = json.load(f)

        kdc_app_def["id"] = KERBEROS_APP_ID
        sdk_marathon.install_app(kdc_app_def)
        self.kdc_port = int(kdc_app_def["portDefinitions"][0]["port"])
        self.kdc_host = "{app_name}.{service_name}.{autoip_host_suffix}".format(
            app_name=KERBEROS_APP_ID, service_name="marathon", autoip_host_suffix=sdk_hosts.AUTOIP_HOST_SUFFIX)
        self.kdc_realm = REALM
        self.kdc_task = _get_kdc_task()
        self.framework_id = self.kdc_task["framework_id"]
        self.task_id = self.kdc_task["id"]
        self.kdc_host_id = self.kdc_task["slave_id"]
        self.kdc_host_name = _get_host_name(self.kdc_host_id)
        self.master_public_ip = _get_master_public_ip()
        self.principals = []
        self.keytab_file_name = KERBEROS_KEYTAB_FILE_NAME
        self.base64_encoded_keytab_file_name = BASE64_ENCODED_KEYTAB_FILE_NAME.format(keytab_name=self.keytab_file_name)

        # For secret creation/deletion
        cmd = "package install --yes --cli dcos-enterprise-cli"
        try:
            sdk_cmd.run_cli(cmd)
        except dcos.errors.DCOSException as e:
            raise RuntimeError("Failed to install the dcos-enterprise-cli: {}".format(repr(e))) from e

    def __run_kadmin(self, options: list, cmd: str, args: list):
        """
        Invokes Kerberos' kadmin binary inside the container to run some command.

        :param options (list): A list of options given to kadmin.
        :param cmd (str): The name of the sub command to run.
        :param args (list): A list of arguments passed to the sub command. This should also include any flags
                            needed to be set for the sub command.
        :raises: Whatever exception the task exec raised; it is logged and re-raised unchanged.
        """
        kadmin_cmd = "/usr/sbin/kadmin {options} {cmd} {args}".format(
            options=' '.join(options),
            cmd=cmd,
            args=' '.join(args)
        )
        log.info("Running kadmin: {}".format(kadmin_cmd))
        try:
            sdk_tasks.task_exec(self.task_id, kadmin_cmd)
        except Exception as e:
            log.error("Failed to run kadmin: {}".format(repr(e)))
            # Bare raise keeps the original traceback intact.
            raise

    def add_principals(self, principals: list):
        """
        Adds a list of principals to the KDC. A principal is defined as a concatenation of 3 parts
        in the following order:
        - primary: first part of the principal. In the case of a user, it's the same as your username.
                   For a host, the primary is the word host.
        - instance: The instance is a string that qualifies the primary.
                    In the case of a user, the instance is usually null, but a user might also have an additional
                    principal, with an instance called admin. In the case of a host, the instance is the fully
                    qualified hostname, e.g., daffodil.mit.edu.
        - realm: your Kerberos realm. In most cases, your Kerberos realm is your domain name, in upper-case letters.

        More info on principal definition:
        https://web.mit.edu/kerberos/krb5-1.5/krb5-1.5.4/doc/krb5-user/What-is-a-Kerberos-Principal_003f.html

        A principal is formatted as: <primary>/instance@realm
        Eg. hdfs/name-0-node.hdfs.autoip.dcos.thisdcos.directory@LOCAL

        Note: each call replaces the stored principal list, and only the stored list is exported to the
        keytab by `finalize()`.

        :param principals: The list of principals to be added to KDC.
        :raises RuntimeError: If kadmin fails to add the principals.
        """
        # TODO: Perform sanitation check against validity of format for all given principals and raise an
        #       exception when the format of a principal is invalid.
        # Copy so later changes to the caller's sequence cannot alter what goes into the keytab,
        # and so any iterable (not just a list) can be concatenated when the keytab is created.
        self.principals = list(principals)

        log.info("Adding the following list of principals to the KDC: {principals}".format(principals=principals))
        kadmin_options = ["-l"]
        kadmin_cmd = "add"
        kadmin_args = ["--use-defaults", "--random-password"] + self.principals

        try:
            self.__run_kadmin(kadmin_options, kadmin_cmd, kadmin_args)
        except Exception as e:
            raise RuntimeError("Failed to add principals {principals}: {err_msg}".format(
                principals=principals, err_msg=repr(e))) from e

        log.info("Principals successfully added to KDC")

    def __create_and_fetch_keytab(self):
        """
        Creates the keytab file that holds the info about all the principals that have been
        added to the KDC. It also fetches it locally so that later the keytab can be uploaded to the secret store.
        """
        log.info("Creating the keytab")
        kadmin_options = ["-l"]
        kadmin_cmd = "ext"
        kadmin_args = ["-k", self.keytab_file_name] + self.principals
        self.__run_kadmin(kadmin_options, kadmin_cmd, kadmin_args)

        _copy_file_to_localhost(self)

    def _base64_encode_keytab(self) -> str:
        """
        Base64-encodes the locally fetched keytab into a sibling file in the temp working directory.

        The output is a single unwrapped line with no trailing newline, i.e. what `base64 -w 0` produces.

        :return (str): Path of the file containing the encoded keytab.
        :raises RuntimeError: If the keytab cannot be read or the encoded file cannot be written.
        """
        # Local import: base64 is not part of this module's import block.
        import base64

        source = os.path.join(self.temp_working_dir.name, self.keytab_file_name)
        destination = os.path.join(self.temp_working_dir.name, self.base64_encoded_keytab_file_name)
        try:
            with open(source, "rb") as keytab:
                encoded = base64.b64encode(keytab.read())
            with open(destination, "wb") as encoded_keytab:
                encoded_keytab.write(encoded)
        except OSError as e:
            raise RuntimeError("Failed to base64-encode the keytab file: {}".format(repr(e))) from e
        return destination

    def __create_and_upload_secret(self):
        """
        This method base64 encodes the keytab file and creates a secret with this encoded content so the
        tasks can fetch it.

        :raises RuntimeError: If encoding the keytab or creating the secret fails.
        """
        log.info("Creating and uploading the keytab file to the secret store")

        encoded_keytab_path = self._base64_encode_keytab()

        # Recorded before the create call so cleanup() still attempts to delete the secret
        # even when creation fails part-way.
        self.keytab_secret_path = "{}_keytab".format(DCOS_BASE64_PREFIX)

        # TODO: check if a keytab secret of same name already exists
        create_secret_cmd = "security secrets create {keytab_secret_path} --value-file {encoded_keytab_path}".format(
            keytab_secret_path=self.keytab_secret_path,
            encoded_keytab_path=encoded_keytab_path
        )
        try:
            sdk_cmd.run_cli(create_secret_cmd)
        except RuntimeError as e:
            raise RuntimeError("Failed to create secret for the base64-encoded keytab file: {}".format(repr(e))) from e

        log.info("Successfully uploaded a base64-encoded keytab file to the secret store")

    def finalize(self):
        """
        Once the principals have been added, the rest of the environment setup does not ask for more info and can be
        automated, hence this method.
        """
        self.__create_and_fetch_keytab()
        self.__create_and_upload_secret()

    def get_host(self):
        """:return (str): The host name under which the KDC app is addressed."""
        return self.kdc_host

    def get_port(self):
        """:return (str): The KDC port, rendered as a string."""
        return str(self.kdc_port)

    def get_keytab_path(self):
        """:return: The secret path of the uploaded keytab, or None if `finalize()` has not run yet."""
        return self.keytab_secret_path

    def get_realm(self):
        """:return (str): The Kerberos realm of the KDC."""
        return self.kdc_realm

    def get_kdc_address(self):
        """:return (str): The KDC address formatted as <host>:<port>."""
        return "{host}:{port}".format(host=self.kdc_host, port=self.kdc_port)

    def cleanup(self):
        """
        Tears the environment down: removes the KDC marathon app, deletes the temp working directory and,
        if a keytab secret path was recorded, deletes that secret.
        """
        log.info("Removing the marathon KDC app")
        sdk_marathon.destroy_app(KERBEROS_APP_ID)

        log.info("Deleting temporary working directory")
        self.temp_working_dir.cleanup()

        #TODO: separate secrets handling into another module
        if self.keytab_secret_path is None:
            # finalize() never got as far as naming the secret, so there is nothing to delete.
            log.info("No keytab secret was created, skipping secret deletion")
            return

        log.info("Deleting keytab secret")
        sdk_security.delete_secret(self.keytab_secret_path)

0 commit comments

Comments
 (0)