Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for Tenable AD Event APIs #528

Merged
merged 2 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/api/ad/event.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.. automodule:: tenable.ad.event.api
1 change: 1 addition & 0 deletions tenable/ad/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
checker_option
dashboard
directories
event
infrastructure
ldap_configuration
lockout_policy
Expand Down
Empty file added tenable/ad/event/__init__.py
Empty file.
106 changes: 106 additions & 0 deletions tenable/ad/event/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
'''
Event
=====
Methods described in this section relate to the the event API.
These methods can be accessed at ``TenableAD.event``.

.. rst-class:: hide-signature
.. autoclass:: EventAPI
:members:
'''
from typing import Dict, List, Mapping
from restfly.utils import dict_clean
from tenable.ad.event.schema import EventSchema
from tenable.base.endpoint import APIEndpoint


class EventAPI(APIEndpoint):
_schema = EventSchema()
_path = 'events'

def details(self,
event_id: str,
infrastructure_id: str,
directory_id: str
) -> Dict:
'''
Retrieves the details of specific event instance.

Args:
event_id (str):
The event instance identifier.
infrastructure_id (str):
The infrastructure instance identifier.
directory_id (str):
The directory instance identifier.

Returns:
dict:
Details of the event object.

Examples:
>>> tad.event.details(
... event_id='1',
... infrastructure_id='1',
... directory_id='1'
... )

'''
return self._schema.load(
self._api.get(f'infrastructures/{infrastructure_id}/directories'
f'/{directory_id}/events/{event_id}'))

def search_events(self,
expression: Mapping,
profile_id: int,
date_start: str,
date_end: str,
directory_ids: List[int],
**kwargs
) -> List[Dict]:
'''
Searches the events.

Args:
expression (mapping):
An object describing a filter for searched items.
profile_id (int):
The profile instance identifier.
date_start (str):
The starting date from where the events are expected.
date_end (str):
The date till which the events are expected.
directory_ids (List[int]):
List of directory instance identifiers.
order (optional, str):
The desired sorting order of the event identifier. Default
is ``desc``

Returns:
list[dict]:
The search result object.

Examples:

>>> tad.event.search_events(
... expression={'AND': [{'systemOnly': 'True'}]},
... profile_id=5,
... date_start='2022-01-05T00:00:00.000Z',
... date_end='2022-01-12T23:59:59.999Z',
... directory_ids=[1,2,3],
... order='asc'
... )

'''
payload = self._schema.dump(self._schema.load(
dict_clean({
'expression': expression,
'profileId': profile_id,
'dateStart': date_start,
'dateEnd': date_end,
'directoryIds': directory_ids,
'order': dict(column='id', direction=kwargs.get('order'))
})
))
return self._schema.load(self._post('search', json=payload),
many=True)
22 changes: 22 additions & 0 deletions tenable/ad/event/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from marshmallow import fields, validate as v
from tenable.ad.base.schema import CamelCaseSchema


class SearchDirectionSchema(CamelCaseSchema):
column = fields.Str()
direction = fields.Str(validate=v.OneOf(['asc', 'desc']),
dump_default='desc')


class EventSchema(CamelCaseSchema):
directory_id = fields.Int()
id = fields.Int()
expression = fields.Mapping()
order = fields.Nested(SearchDirectionSchema)
directory_ids = fields.List(fields.Int())
profile_id = fields.Int()
date_start = fields.DateTime()
date_end = fields.DateTime()
ad_object_id = fields.Int()
type = fields.Str()
date = fields.DateTime()
9 changes: 9 additions & 0 deletions tenable/ad/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from .checker_option.api import CheckerOptionAPI
from .dashboard.api import DashboardAPI
from .directories.api import DirectoriesAPI
from .event.api import EventAPI
from .infrastructure.api import InfrastructureAPI
from .ldap_configuration.api import LDAPConfigurationAPI
from .lockout_policy.api import LockoutPolicyAPI
Expand Down Expand Up @@ -126,6 +127,14 @@ def directories(self):
'''
return DirectoriesAPI(self)

@property
def event(self):
'''
The interface object for the
:doc:`Tenable.ad Event APIs <event>`.
'''
return EventAPI(self)

@property
def infrastructure(self):
'''
Expand Down
74 changes: 74 additions & 0 deletions tests/ad/event/test_event_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
'''API tests for Event APIs'''
import datetime
import responses
from tests.ad.conftest import RE_BASE


@responses.activate
def test_event_details(api):
'''test for details method response'''
responses.add(responses.GET,
SteveMcGrath marked this conversation as resolved.
Show resolved Hide resolved
f'{RE_BASE}/infrastructures/1/directories/1/events/1',
json={
'ad_object_id': 1,
'date': '2022-01-10T13:57:47.340Z',
'directory_id': 1,
'id': 1,
'type': 'some_type'
}
)
resp = api.event.details(event_id='1',
infrastructure_id='1',
directory_id='1')
assert isinstance(resp, dict)
assert resp['ad_object_id'] == 1
assert resp['date'] == datetime.datetime(2022, 1, 10, 13, 57, 47, 340000,
tzinfo=datetime.timezone.utc)
assert resp['directory_id'] == 1
assert resp['id'] == 1
assert resp['type'] == 'some_type'


@responses.activate
def test_event_search_events(api):
'''test for search event method response'''
responses.add(responses.POST,
f'{RE_BASE}/events/search',
json=[{
'ad_object_id': 1,
'date': '2022-01-12T09:24:11.000Z',
'directory_id': 1,
'id': 1,
'type': 'object_type'
}, {
'ad_object_id': 2,
'date': '2022-01-13T09:24:11.000Z',
'directory_id': 2,
'id': 2,
'type': 'object_type'
}]
)
resp = api.event.search_events(
expression={'OR': [{'systemOnly': 'True'}]},
profile_id=1,
date_start='2022-01-05T00:00:00.000Z',
date_end='2022-01-12T23:59:59.999Z',
directory_ids=[2],
order='desc'
)

assert isinstance(resp, list)
assert len(resp) == 2
assert resp[0]['ad_object_id'] == 1
assert resp[0]['date'] == datetime.datetime(2022, 1, 12, 9, 24, 11,
tzinfo=datetime.timezone.utc)
assert resp[0]['directory_id'] == 1
assert resp[0]['id'] == 1
assert resp[0]['type'] == 'object_type'

assert resp[1]['ad_object_id'] == 2
assert resp[1]['date'] == datetime.datetime(2022, 1, 13, 9, 24, 11,
tzinfo=datetime.timezone.utc)
assert resp[1]['directory_id'] == 2
assert resp[1]['id'] == 2
assert resp[1]['type'] == 'object_type'
41 changes: 41 additions & 0 deletions tests/ad/event/test_event_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
'''Schema test for Event API schemas'''
import datetime
import pytest
from marshmallow import ValidationError
from tenable.ad.event.schema import EventSchema


@pytest.fixture
def event_schema():
return {
'expression': {'OR': [{'systemOnly': 'True'}]},
'profileId': 1,
'dateStart': '2022-11-12T23:59:10.214Z',
'dateEnd': '2022-01-13T23:59:59.999Z',
'directoryIds': [1],
'order': {'column': 'id', 'direction': 'desc'}
}


def test_event_schema_search(event_schema):
'''test to check the Event schema'''
test_resp = {
'adObjectId': 1,
'date': '2022-11-12T23:59:10.214Z',
'directoryId': 1,
'id': 1,
'type': 'type'
}
schema = EventSchema()
req = schema.load(event_schema)
assert req['date_start'] == datetime.datetime(2022, 11, 12, 23, 59,
10, 214000,
tzinfo=datetime.
timezone.utc)
assert test_resp['adObjectId'] == 1
assert test_resp['directoryId'] == 1
assert test_resp['id'] == 1
assert test_resp['type'] == 'type'
with pytest.raises(ValidationError):
event_schema['new_val'] = 'something'
schema.load(event_schema)
Loading