forked from pgastinger/McAfeeEpoReputationImporter
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmcafee_epo.py
128 lines (98 loc) · 4.46 KB
/
mcafee_epo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# -*- coding: utf-8 -*-
"""
based on https://pypi.python.org/pypi/mcafee-epo/
"""
__author__ = "Peter Gastinger"
__copyright__ = "Copyright 2016, Raiffeisen Informatik GmbH"
__credits__ = [""]
__license__ = "GPL"
__version__ = "0.1"
__maintainer__ = "Peter Gastinger"
__email__ = "peter.gastinger@r-it.at"
__status__ = "Development"
import json
import requests
requests.packages.urllib3.disable_warnings()
try:
from urlparse import urljoin
except ImportError:
from urllib.parse import urljoin
TIMEOUT = 2
class APIError(Exception):
"""Represents an error with the data received within a valid HTTP response."""
class Client:
"""Communicate with an ePO server.
Instances are callable, pass a command name and parameters to make API calls.
"""
def __init__(self, url, username, password, session=None, verify=True, timeout=5):
"""Create a client for the given ePO server.
:param url: location of ePO server
:param username: username to authenticate
:param password: password to authenticate
:param session: custom instance of :class:`requests.Session`, optional
:param verify: if False, ignore SSL cert error messages
:param timeout: Timeout for requests
"""
self.url = url
self.username = username
self.password = password
self.verify = verify
self.timeout = timeout
if session is None:
session = requests.Session()
self._session = session
self._token = None
def _get_token(self, _skip=False):
"""Get the security token if it's not already cached.
:param bool _skip: used internally when making the initial request to get the token
"""
if self._token is None and not _skip:
self._token = self._request('core.getSecurityToken')
return self._token
def _request(self, name, **kwargs):
"""Format the request and interpret the response.
Usually you want to use :meth:`__call__` instead.
:param name: command name to call
:param kwargs: arguments passed to :meth:`requests.request`
:return: deserialized JSON data
"""
kwargs.setdefault('auth', (self.username, self.password))
params = kwargs.setdefault('params', {})
# check whether the response will be json (default)
is_json = params.setdefault(':output', 'json') == 'json'
# add the security token, unless this is the request to get the token
params.setdefault('orion.user.security.token', self._get_token(_skip=name == 'core.getSecurityToken'))
url = urljoin(self.url, 'remote/{}'.format(name))
if kwargs.get('data') or kwargs.get('json') or kwargs.get('files'):
# use post method if there is post data
request_result = self._session.post(url, **kwargs, verify=self.verify, timeout=self.timeout)
else:
request_result = self._session.get(url, **kwargs, verify=self.verify, timeout=self.timeout)
# check that there was a valid http response
request_result.raise_for_status()
text = request_result.text
if not text.startswith('OK:'):
# response body contains an error
raise APIError(text)
return json.loads(text[3:]) if is_json else text[3:]
def __call__(self, name, *args, **kwargs):
"""Make an API call by calling this instance.
Collects arguments and calls :meth:`_request`.
ePO commands take positional and named arguments. Positional arguments are internally numbered "param#" and
passed as named arguments.
Files can be passed to some commands. Pass a dictionary of ``'filename': file-like objects``, or other formats
accepted by :meth:`requests.request`. This command will not open files, as it is better to manage that in a
``with`` block in calling code.
:param str name: command name to call
:param args: positional arguments to command
:param kwargs: named arguments to command
:param dict params: named arguments that are not valid Python names can be provided here
:param dict files: files to upload to command
:return: deserialized JSON data
"""
params = kwargs.pop('params', {})
files = kwargs.pop('files', {})
for i, item in enumerate(args, start=1):
params['param{}'.format(i)] = item
params.update(kwargs)
return self._request(name, params=params, files=files)