Skip to content

Commit 4b04403

Browse files
authored
Merge branch 'master' into release_102
2 parents 28cccc8 + 2231963 commit 4b04403

File tree

9 files changed

+66
-37
lines changed

9 files changed

+66
-37
lines changed

tests/test_base.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,12 @@ def test_uri_parsing(self):
7070
self.assertEqual(result_uri.hostname, '123.45.67.89')
7171
self.assertEqual(result_uri.port, 1234)
7272
self.assertEqual(result_uri.is_https, False)
73-
73+
7474
result_uri = base.Uri('https://test-domain.com:1234')
7575
self.assertEqual(result_uri.scheme, 'https')
7676
self.assertEqual(result_uri.hostname, 'test-domain.com')
7777
self.assertEqual(result_uri.port, 1234)
7878
self.assertEqual(result_uri.is_https, True)
79-
8079

8180
def get_client(self):
8281
client = base.BaseYarnAPI()

tests/test_hadoop_conf.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
import mock
55
from mock import patch
6+
from requests import RequestException
67
from tests import TestCase
78

89
import requests_mock
@@ -103,7 +104,7 @@ def test_get_resource_endpoint(self):
103104

104105
endpoint = hadoop_conf.get_resource_manager_endpoint()
105106

106-
self.assertEqual('example.com:8022', endpoint)
107+
self.assertEqual('http://example.com:8022', endpoint)
107108
parse_mock.assert_called_with(hadoop_conf_path + 'yarn-site.xml',
108109
'yarn.resourcemanager.webapp.address')
109110

@@ -122,7 +123,7 @@ def test_get_resource_endpoint_with_ha(self, check_is_active_rm_mock, parse_mock
122123
check_is_active_rm_mock.return_value = True
123124
endpoint = hadoop_conf.get_resource_manager_endpoint()
124125

125-
self.assertEqual('example.com:8022', endpoint)
126+
self.assertEqual('http://example.com:8022', endpoint)
126127
parse_mock.assert_called_with(hadoop_conf_path + 'yarn-site.xml',
127128
'yarn.resourcemanager.webapp.address.rm1')
128129

@@ -171,23 +172,21 @@ def test_check_is_active_rm(self, is_https_only_mock):
171172

172173
# Emulate requests library exception (socket timeout, etc)
173174
with requests_mock.mock() as requests_get_mock:
174-
requests_get_mock.side_effect = Exception('error')
175-
# requests_get_mock.get('https://example2:8022/cluster', status_code=200)
176-
requests_get_mock.return_value = None
177-
self.assertFalse(hadoop_conf.check_is_active_rm('https://example2:8022'))
175+
requests_get_mock.get('example2:8022/cluster', exc=RequestException)
176+
self.assertFalse(hadoop_conf.check_is_active_rm('example2:8022'))
178177

179178
def test_get_resource_manager(self):
180179
with patch('yarn_api_client.hadoop_conf.parse') as parse_mock:
181180
parse_mock.return_value = 'example.com:8022'
182181

183182
endpoint = hadoop_conf._get_resource_manager(hadoop_conf.CONF_DIR, None)
184183

185-
self.assertEqual('example.com:8022', endpoint)
184+
self.assertEqual('http://example.com:8022', endpoint)
186185
parse_mock.assert_called_with(hadoop_conf_path + 'yarn-site.xml', 'yarn.resourcemanager.webapp.address')
187186

188187
endpoint = hadoop_conf._get_resource_manager(hadoop_conf.CONF_DIR, 'rm1')
189188

190-
self.assertEqual(('example.com:8022'), endpoint)
189+
self.assertEqual(('http://example.com:8022'), endpoint)
191190
parse_mock.assert_called_with(hadoop_conf_path + 'yarn-site.xml', 'yarn.resourcemanager.webapp.address.rm1')
192191

193192
parse_mock.reset_mock()

yarn_api_client/application_master.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
# -*- coding: utf-8 -*-
22
from __future__ import unicode_literals
3-
from .base import BaseYarnAPI
3+
4+
from .base import BaseYarnAPI, get_logger
45
from .hadoop_conf import get_webproxy_endpoint
56

67

8+
log = get_logger(__name__)
9+
10+
711
class ApplicationMaster(BaseYarnAPI):
812
"""
913
The MapReduce Application Master REST API's allow the user to get status
@@ -24,7 +28,6 @@ class ApplicationMaster(BaseYarnAPI):
2428
"""
2529
def __init__(self, service_endpoint=None, timeout=30, auth=None, verify=True):
2630
if not service_endpoint:
27-
self.logger.debug('Get configuration from hadoop conf dir')
2831
service_endpoint = get_webproxy_endpoint(timeout, auth, verify)
2932

3033
super(ApplicationMaster, self).__init__(service_endpoint, timeout, auth, verify)

yarn_api_client/base.py

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@
22
from __future__ import unicode_literals
33

44
import logging
5+
import os
56
import requests
67

8+
from datetime import datetime
9+
710
from .errors import APIError, ConfigurationError
811

912
try:
@@ -12,6 +15,14 @@
1215
from urllib.parse import urlparse, urlunparse
1316

1417

18+
def get_logger(logger_name):
19+
logger = logging.getLogger(logger_name)
20+
return logger
21+
22+
23+
log = get_logger(__name__)
24+
25+
1526
class Response(object):
1627
"""
1728
Basic container for response dictionary
@@ -49,7 +60,6 @@ def to_url(self, api_path=None):
4960

5061

5162
class BaseYarnAPI(object):
52-
__logger = None
5363
response_class = Response
5464

5565
def __init__(self, service_endpoint=None, timeout=None, auth=None, verify=True):
@@ -72,8 +82,6 @@ def request(self, api_path, method='GET', **kwargs):
7282
self._validate_configuration()
7383
api_endpoint = self.service_uri.to_url(api_path)
7484

75-
self.logger.info('API Endpoint {}'.format(api_endpoint))
76-
7785
if method == 'GET':
7886
headers = {}
7987
else:
@@ -82,20 +90,26 @@ def request(self, api_path, method='GET', **kwargs):
8290
if 'headers' in kwargs and kwargs['headers']:
8391
headers.update(kwargs['headers'])
8492

93+
begin = datetime.now()
8594
response = self.session.request(method=method, url=api_endpoint, headers=headers, timeout=self.timeout, **kwargs)
95+
end = datetime.now()
96+
log.debug(
97+
"'{method}' request against endpoint '{endpoint}' took {duration} ms".format(
98+
method=method,
99+
endpoint=api_endpoint,
100+
duration=round((end-begin).total_seconds()*1000,3)
101+
)
102+
)
86103

87104
if response.status_code in (200, 202):
88105
return self.response_class(response)
89106
else:
90-
msg = 'Response finished with status: %s. Details: %s' % (response.status_code, response.text)
107+
msg = "Response finished with status: {status}. Details: {msg}".format(
108+
status=response.status_code,
109+
msg=response.text
110+
)
91111
raise APIError(msg)
92112

93113
def construct_parameters(self, arguments):
94114
params = dict((key, value) for key, value in arguments if value is not None)
95115
return params
96-
97-
@property
98-
def logger(self):
99-
if self.__logger is None:
100-
self.__logger = logging.getLogger(self.__module__)
101-
return self.__logger

yarn_api_client/hadoop_conf.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33
import xml.etree.ElementTree as ET
44
import requests
55

6+
from .base import get_logger
7+
8+
log = get_logger(__name__)
9+
610
CONF_DIR = os.getenv('HADOOP_CONF_DIR', '/etc/hadoop/conf')
711

812

@@ -14,7 +18,8 @@ def _get_rm_ids(hadoop_conf_path):
1418

1519

1620
def _get_maximum_container_memory(hadoop_conf_path):
17-
container_memory = int(parse(os.path.join(hadoop_conf_path,'yarn-site.xml'), 'yarn.nodemanager.resource.memory-mb'))
21+
container_memory = int(parse(os.path.join(hadoop_conf_path, 'yarn-site.xml'),
22+
'yarn.nodemanager.resource.memory-mb'))
1823
return container_memory
1924

2025

@@ -29,7 +34,9 @@ def _is_https_only():
2934

3035
def _get_resource_manager(hadoop_conf_path, rm_id=None):
3136
# compose property name based on policy (and rm_id)
32-
if _is_https_only():
37+
is_https_only = _is_https_only()
38+
39+
if is_https_only:
3340
prop_name = 'yarn.resourcemanager.webapp.https.address'
3441
else:
3542
prop_name = 'yarn.resourcemanager.webapp.address'
@@ -38,25 +45,27 @@ def _get_resource_manager(hadoop_conf_path, rm_id=None):
3845
if rm_id:
3946
prop_name = "{name}.{rm_id}".format(name=prop_name, rm_id=rm_id)
4047

41-
rm_webapp_address = parse(os.path.join(hadoop_conf_path, 'yarn-site.xml'), prop_name)
48+
rm_address = parse(os.path.join(hadoop_conf_path, 'yarn-site.xml'), prop_name)
4249

43-
return rm_webapp_address or None
50+
return ('https://' if is_https_only else 'http://') + rm_address if rm_address else None
4451

4552

4653
def check_is_active_rm(url, timeout=30, auth=None, verify=True):
4754
try:
4855
response = requests.get(url + "/cluster", timeout=timeout, auth=auth, verify=verify)
49-
except:
56+
except requests.RequestException as e:
57+
log.warning("Exception encountered accessing RM '{url}': '{err}', continuing...".format(url=url, err=e))
5058
return False
5159

5260
if response.status_code != 200:
53-
print("Error to access RM - HTTP Code {}".format(response.status_code))
61+
log.warning("Failed to access RM '{url}' - HTTP Code '{status}', continuing...".format(url=url, status=response.status_code))
5462
return False
5563
else:
5664
return True
5765

5866

5967
def get_resource_manager_endpoint(timeout=30, auth=None, verify=True):
68+
log.info('Getting resource manager endpoint from config: {config_path}'.format(config_path=os.path.join(CONF_DIR, 'yarn-site.xml')))
6069
hadoop_conf_path = CONF_DIR
6170
rm_ids = _get_rm_ids(hadoop_conf_path)
6271
if rm_ids:
@@ -72,18 +81,21 @@ def get_resource_manager_endpoint(timeout=30, auth=None, verify=True):
7281

7382
def get_jobhistory_endpoint():
7483
config_path = os.path.join(CONF_DIR, 'mapred-site.xml')
84+
log.info('Getting jobhistory endpoint from config: {config_path}'.format(config_path=config_path))
7585
prop_name = 'mapreduce.jobhistory.webapp.address'
7686
return parse(config_path, prop_name)
7787

7888

7989
def get_nodemanager_endpoint():
8090
config_path = os.path.join(CONF_DIR, 'yarn-site.xml')
91+
log.info('Getting nodemanager endpoint from config: {config_path}'.format(config_path=config_path))
8192
prop_name = 'yarn.nodemanager.webapp.address'
8293
return parse(config_path, prop_name)
8394

8495

8596
def get_webproxy_endpoint(timeout=30, auth=None, verify=True):
8697
config_path = os.path.join(CONF_DIR, 'yarn-site.xml')
98+
log.info('Getting webproxy endpoint from config: {config_path}'.format(config_path=config_path))
8799
prop_name = 'yarn.web-proxy.address'
88100
value = parse(config_path, prop_name)
89101
return value or get_resource_manager_endpoint(timeout, auth, verify)

yarn_api_client/history_server.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
# -*- coding: utf-8 -*-
22
from __future__ import unicode_literals
3-
from .base import BaseYarnAPI
3+
4+
from .base import BaseYarnAPI, get_logger
45
from .constants import JobStateInternal
56
from .errors import IllegalArgumentError
67
from .hadoop_conf import get_jobhistory_endpoint
78

9+
log = get_logger(__name__)
10+
811

912
class HistoryServer(BaseYarnAPI):
1013
"""
@@ -24,7 +27,6 @@ class HistoryServer(BaseYarnAPI):
2427
"""
2528
def __init__(self, service_endpoint=None, timeout=30, auth=None, verify=True):
2629
if not service_endpoint:
27-
self.logger.debug('Get information from hadoop conf dir')
2830
service_endpoint = get_jobhistory_endpoint()
2931

3032
super(HistoryServer, self).__init__(service_endpoint, timeout, auth, verify)

yarn_api_client/main.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
# -*- coding: utf-8 -*-
22
from __future__ import unicode_literals
33
import argparse
4-
import logging
54
from pprint import pprint
6-
import sys
75

6+
from .base import get_logger
87
from .constants import (YarnApplicationState, FinalApplicationStatus,
98
ApplicationState, JobStateInternal)
109
from . import ResourceManager, NodeManager, HistoryServer, ApplicationMaster
1110

12-
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
11+
log = get_logger(__name__)
1312

1413

1514
def get_parser():

yarn_api_client/node_manager.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
# -*- coding: utf-8 -*-
2-
from .base import BaseYarnAPI
2+
from .base import BaseYarnAPI, get_logger
33
from .constants import ApplicationState
44
from .errors import IllegalArgumentError
55
from .hadoop_conf import get_nodemanager_endpoint
66

7+
log = get_logger(__name__)
8+
79
LEGAL_APPLICATION_STATES = {s for s, _ in ApplicationState}
810

911

@@ -35,7 +37,6 @@ class NodeManager(BaseYarnAPI):
3537
"""
3638
def __init__(self, service_endpoint=None, timeout=30, auth=None, verify=True):
3739
if not service_endpoint:
38-
self.logger.debug('Get configuration from hadoop conf dir')
3940
service_endpoint = get_nodemanager_endpoint()
4041

4142
super(NodeManager, self).__init__(service_endpoint, timeout, auth, verify)

yarn_api_client/resource_manager.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
# -*- coding: utf-8 -*-
22
from __future__ import unicode_literals
3-
from .base import BaseYarnAPI
3+
from .base import BaseYarnAPI, get_logger
44
from .constants import YarnApplicationState, FinalApplicationStatus
55
from .errors import IllegalArgumentError
66
from .hadoop_conf import get_resource_manager_endpoint, check_is_active_rm, CONF_DIR, _get_maximum_container_memory
77
from collections import deque
88

9+
log = get_logger(__name__)
910
LEGAL_STATES = {s for s, _ in YarnApplicationState}
1011
LEGAL_FINAL_STATUSES = {s for s, _ in FinalApplicationStatus}
1112

@@ -73,7 +74,6 @@ class ResourceManager(BaseYarnAPI):
7374
def __init__(self, service_endpoints=None, timeout=30, auth=None, verify=True):
7475
active_service_endpoint = None
7576
if not service_endpoints:
76-
self.logger.debug('Get configuration from hadoop conf dir: {conf_dir}'.format(conf_dir=CONF_DIR))
7777
active_service_endpoint = get_resource_manager_endpoint(timeout, auth, verify)
7878
else:
7979
for endpoint in service_endpoints:

0 commit comments

Comments
 (0)