Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for TenableAD AD Object APIs #525

Merged
merged 2 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/api/ad/ad_object.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.. automodule:: tenable.ad.ad_object.api
1 change: 1 addition & 0 deletions tenable/ad/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
:glob:

about
ad_object
alert
api_keys
attacks
Expand Down
Empty file.
282 changes: 282 additions & 0 deletions tenable/ad/ad_object/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
'''
AD Object
=============

Methods described in this section relate to the ad object API.
These methods can be accessed at ``TenableAD.ad_object``.

.. rst-class:: hide-signature
.. autoclass:: ADObjectAPI
:members:
'''
from typing import List, Dict, Mapping
from restfly.utils import dict_clean
from tenable.ad.ad_object.schema import ADObjectSchema, ADObjectChangesSchema
from tenable.ad.base.iterator import ADIterator
from tenable.base.endpoint import APIEndpoint


class ADObjectIterator(ADIterator):
'''
The ad object iterator provides a scalable way to work through alert list
result sets of any size. The iterator will walk through each page of data,
returning one record at a time. If it reaches the end of a page of
records, then it will request the next page of information and then
continue to return records from the next page (and the next, and the next)
until the counter reaches the total number of records that the API has
reported.
'''


class ADObjectAPI(APIEndpoint):
_schema = ADObjectSchema()

def details(self,
directory_id: str,
infrastructure_id: str,
ad_object_id: str
) -> Dict:
'''
Retrieves the details of a specific AD object.

Args:
directory_id (str):
The directory instance identifier.
infrastructure_id (str):
The infrastructure instance identifier.
ad_object_id (str):
The AD Object identifier.

Returns:
dict:
The AD object.

Examples:
>>> tad.ad_object.details(
... directory_id='1',
... infrastructure_id='1',
... ad_object_id='1'
... )
'''
return self._schema.load(
self._api.get(f"infrastructures/{infrastructure_id}/"
f"directories/{directory_id}/"
f"ad-objects/{ad_object_id}"))

def details_by_profile_and_checker(self,
profile_id: str,
checker_id: str,
ad_object_id: str
) -> Dict:
'''
Retrieves an AD object details by id that have deviances for a
specific profile and checker

Args:
profile_id (str):
The profile instance identifier.
checker_id (str):
The checker instance identifier.
ad_object_id (str):
The AD Object identifier.

Returns:
dict:
The AD object.

Examples:
>>> tad.ad_object.details_by_profile_and_checker(
... profile_id='1',
... checker_id='1',
... ad_object_id='1'
... )
'''
return self._schema.load(
self._api.get(f"profiles/{profile_id}/"
f"checkers/{checker_id}/"
f"ad-objects/{ad_object_id}"))

def details_by_event(self,
directory_id: str,
infrastructure_id: str,
ad_object_id: str,
event_id: str
) -> Dict:
'''
Retrieves the details of a specific AD object.

Args:
directory_id (str):
The directory instance identifier.
infrastructure_id (str):
The infrastructure instance identifier.
ad_object_id (str):
The AD Object identifier.
event_id (str):
The event identifier.

Returns:
dict:
The AD object.

Examples:
>>> tad.ad_object.details_by_event(
... directory_id='1',
... infrastructure_id='1',
... ad_object_id='1',
... event_id='1'
... )
'''
return self._schema.load(
self._api.get(f"infrastructures/{infrastructure_id}/"
f"directories/{directory_id}/"
f"events/{event_id}/"
f"ad-objects/{ad_object_id}"))

def get_changes(self,
directory_id: str,
infrastructure_id: str,
ad_object_id: str,
event_id: str,
**kwargs
) -> List[Dict]:
'''
Get the AD object changes between a given event and event which
precedes it.

Args:
directory_id (str):
The directory instance identifier.
infrastructure_id (str):
The infrastructure instance identifier.
ad_object_id (str):
The AD Object identifier.
event_id (str):
The event identifier.
wanted_values (optional, list[str]):
Which values user wants to include. ``before`` to include the
values just before the event, ``after`` to include the values
just after the event or ``current`` to include the current
values.

Returns:
list[dict]:
The list of AD objects.

Examples:
>>> tad.ad_object.get_changes(
... directory_id='1',
... infrastructure_id='1',
... ad_object_id='1',
... event_id='1',
... wanted_values=['current', 'after']
... )
'''
schema = ADObjectChangesSchema()
params = self._schema.dump(self._schema.load(kwargs))
return schema.load(
self._api.get(f"infrastructures/{infrastructure_id}/"
f"directories/{directory_id}/"
f"events/{event_id}/"
f"ad-objects/{ad_object_id}/changes", params=params),
many=True)

def search_all(self,
profile_id: str,
checker_id: str,
expression: Mapping,
directories: List[int],
reasons: List[int],
show_ignored: bool,
**kwargs
) -> ADObjectIterator:
'''
Search all AD objects having deviances by profile by checker

Args:
profile_id (str):
The profile instance identifier.
checker_id (str):
The checker instance identifier.
expression (mapping):
An object describing a filter for searched items.
directories (list[int]):
The list of directory instance identifiers.
reasons (list[int]):
The list of reasons identifiers.
show_ignored (bool):
Whether AD Object that only have ignored deviances should be
included?
date_start (optional, str):
The date after which the AD object deviances should have been
emitted.
date_end (optional, str):
The date before which the AD object deviances should have been
emitted.
page (optional, int):
The page number user wants to retrieve.
per_page (optional, int):
The number of records per page user wants to retrieve.
max_items (optional, int):
The maximum number of records to return before
stopping iteration.
max_pages (optional, int):
The maximum number of pages to request before throwing
stopping iteration.

Returns:
:obj:`ADObjectIterator`:
An iterator that handles the page management of the requested
records.

Examples:
>>> for ado in tad.ad_object.search_all(
... profile_id='1',
... checker_id='1',
... show_ignored=False,
... reasons=[1, 2],
... directories=[1],
... expression={'OR': [{
... 'whencreated': '2021-07-29T12:27:50.0000000Z'
... }]},
... date_end='2022-12-31T18:30:00.000Z',
... date_start='2021-12-31T18:30:00.000Z',
... page=1,
... per_page=20,
... max_pages=10,
... max_items=200
... ):
... pprint(ado)
'''
params = self._schema.dump(self._schema.load({
'page': kwargs.get('page') or 1,
'perPage': kwargs.get('per_page'),
'maxItems': kwargs.get('max_items'),
'maxPages': kwargs.get('max_pages')
}))

payload = self._schema.dump(self._schema.load(
dict_clean({
'expression': expression,
'directories': directories,
'reasons': reasons,
'dateStart': kwargs.get('date_start'),
'dateEnd': kwargs.get('date_end'),
'showIgnored': show_ignored
})
))

return ADObjectIterator(
api=self._api,
_path=f'profiles/{profile_id}/'
f'checkers/{checker_id}/'
f'ad-objects/search',
_method='post',
num_pages=params.get('page'),
_per_page=params.get('perPage'),
_query=params,
_payload=payload,
_schema=self._schema,
max_pages=params.pop('maxPages', None),
max_items=params.pop('maxItems', None)
)
39 changes: 39 additions & 0 deletions tenable/ad/ad_object/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from marshmallow import fields
from tenable.ad.base.schema import CamelCaseSchema


class ADObjectChangeValuesSchema(CamelCaseSchema):
after = fields.Str()
before = fields.Str(allow_none=True)
current = fields.Str()


class ADObjectChangesSchema(CamelCaseSchema):
attribute_name = fields.Str()
values = fields.Nested(ADObjectChangeValuesSchema)
value_type = fields.Str()


class ADObjectAttributesSchema(CamelCaseSchema):
name = fields.Str()
value = fields.Str()
value_type = fields.Str()


class ADObjectSchema(CamelCaseSchema):
id = fields.Int()
directory_id = fields.Int()
object_id = fields.Str()
type = fields.Str()
object_attributes = fields.Nested(ADObjectAttributesSchema, many=True)
reasons = fields.List(fields.Int())
wanted_values = fields.List(fields.Str())
expression = fields.Mapping()
directories = fields.List(fields.Int())
date_start = fields.DateTime()
date_end = fields.DateTime()
show_ignored = fields.Bool()
page = fields.Int(allow_none=True)
per_page = fields.Int(allow_none=True)
max_pages = fields.Int(allow_none=True)
max_items = fields.Int(allow_none=True)
9 changes: 9 additions & 0 deletions tenable/ad/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from tenable.base.platform import APIPlatform

from .about import AboutAPI
from .ad_object.api import ADObjectAPI
from .alert.api import AlertsAPI
from .api_keys import APIKeyAPI
from .attack_type_options.api import AttackTypeOptionsAPI
Expand Down Expand Up @@ -67,6 +68,14 @@ def about(self):
'''
return AboutAPI(self)

@property
def ad_object(self):
'''
The interface object for the
:doc:`Tenable.ad AD Object APIs <ad_object>`.
'''
return ADObjectAPI(self)

@property
def alerts(self):
'''
Expand Down
Loading
Loading