Skip to content

Commit

Permalink
Added feature - TenableAD AD Object APIs (#525)
Browse files Browse the repository at this point in the history
updated schema file

removed unused imports

implemented AD object iterator for search all method

fixed broken test

Co-authored-by: Steven McGrath <smcgrath@tenable.com>
  • Loading branch information
tushar-balwani and SteveMcGrath authored May 21, 2024
1 parent 5014645 commit 321eed7
Show file tree
Hide file tree
Showing 8 changed files with 581 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/api/ad/ad_object.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.. automodule:: tenable.ad.ad_object.api
1 change: 1 addition & 0 deletions tenable/ad/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
:glob:
about
ad_object
alert
api_keys
attacks
Expand Down
Empty file.
282 changes: 282 additions & 0 deletions tenable/ad/ad_object/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
'''
AD Object
=============
Methods described in this section relate to the ad object API.
These methods can be accessed at ``TenableAD.ad_object``.
.. rst-class:: hide-signature
.. autoclass:: ADObjectAPI
:members:
'''
from typing import List, Dict, Mapping
from restfly.utils import dict_clean
from tenable.ad.ad_object.schema import ADObjectSchema, ADObjectChangesSchema
from tenable.ad.base.iterator import ADIterator
from tenable.base.endpoint import APIEndpoint


class ADObjectIterator(ADIterator):
'''
The ad object iterator provides a scalable way to work through alert list
result sets of any size. The iterator will walk through each page of data,
returning one record at a time. If it reaches the end of a page of
records, then it will request the next page of information and then
continue to return records from the next page (and the next, and the next)
until the counter reaches the total number of records that the API has
reported.
'''


class ADObjectAPI(APIEndpoint):
_schema = ADObjectSchema()

def details(self,
directory_id: str,
infrastructure_id: str,
ad_object_id: str
) -> Dict:
'''
Retrieves the details of a specific AD object.
Args:
directory_id (str):
The directory instance identifier.
infrastructure_id (str):
The infrastructure instance identifier.
ad_object_id (str):
The AD Object identifier.
Returns:
dict:
The AD object.
Examples:
>>> tad.ad_object.details(
... directory_id='1',
... infrastructure_id='1',
... ad_object_id='1'
... )
'''
return self._schema.load(
self._api.get(f"infrastructures/{infrastructure_id}/"
f"directories/{directory_id}/"
f"ad-objects/{ad_object_id}"))

def details_by_profile_and_checker(self,
profile_id: str,
checker_id: str,
ad_object_id: str
) -> Dict:
'''
Retrieves an AD object details by id that have deviances for a
specific profile and checker
Args:
profile_id (str):
The profile instance identifier.
checker_id (str):
The checker instance identifier.
ad_object_id (str):
The AD Object identifier.
Returns:
dict:
The AD object.
Examples:
>>> tad.ad_object.details_by_profile_and_checker(
... profile_id='1',
... checker_id='1',
... ad_object_id='1'
... )
'''
return self._schema.load(
self._api.get(f"profiles/{profile_id}/"
f"checkers/{checker_id}/"
f"ad-objects/{ad_object_id}"))

def details_by_event(self,
directory_id: str,
infrastructure_id: str,
ad_object_id: str,
event_id: str
) -> Dict:
'''
Retrieves the details of a specific AD object.
Args:
directory_id (str):
The directory instance identifier.
infrastructure_id (str):
The infrastructure instance identifier.
ad_object_id (str):
The AD Object identifier.
event_id (str):
The event identifier.
Returns:
dict:
The AD object.
Examples:
>>> tad.ad_object.details_by_event(
... directory_id='1',
... infrastructure_id='1',
... ad_object_id='1',
... event_id='1'
... )
'''
return self._schema.load(
self._api.get(f"infrastructures/{infrastructure_id}/"
f"directories/{directory_id}/"
f"events/{event_id}/"
f"ad-objects/{ad_object_id}"))

def get_changes(self,
directory_id: str,
infrastructure_id: str,
ad_object_id: str,
event_id: str,
**kwargs
) -> List[Dict]:
'''
Get the AD object changes between a given event and event which
precedes it.
Args:
directory_id (str):
The directory instance identifier.
infrastructure_id (str):
The infrastructure instance identifier.
ad_object_id (str):
The AD Object identifier.
event_id (str):
The event identifier.
wanted_values (optional, list[str]):
Which values user wants to include. ``before`` to include the
values just before the event, ``after`` to include the values
just after the event or ``current`` to include the current
values.
Returns:
list[dict]:
The list of AD objects.
Examples:
>>> tad.ad_object.get_changes(
... directory_id='1',
... infrastructure_id='1',
... ad_object_id='1',
... event_id='1',
... wanted_values=['current', 'after']
... )
'''
schema = ADObjectChangesSchema()
params = self._schema.dump(self._schema.load(kwargs))
return schema.load(
self._api.get(f"infrastructures/{infrastructure_id}/"
f"directories/{directory_id}/"
f"events/{event_id}/"
f"ad-objects/{ad_object_id}/changes", params=params),
many=True)

def search_all(self,
profile_id: str,
checker_id: str,
expression: Mapping,
directories: List[int],
reasons: List[int],
show_ignored: bool,
**kwargs
) -> ADObjectIterator:
'''
Search all AD objects having deviances by profile by checker
Args:
profile_id (str):
The profile instance identifier.
checker_id (str):
The checker instance identifier.
expression (mapping):
An object describing a filter for searched items.
directories (list[int]):
The list of directory instance identifiers.
reasons (list[int]):
The list of reasons identifiers.
show_ignored (bool):
Whether AD Object that only have ignored deviances should be
included?
date_start (optional, str):
The date after which the AD object deviances should have been
emitted.
date_end (optional, str):
The date before which the AD object deviances should have been
emitted.
page (optional, int):
The page number user wants to retrieve.
per_page (optional, int):
The number of records per page user wants to retrieve.
max_items (optional, int):
The maximum number of records to return before
stopping iteration.
max_pages (optional, int):
The maximum number of pages to request before throwing
stopping iteration.
Returns:
:obj:`ADObjectIterator`:
An iterator that handles the page management of the requested
records.
Examples:
>>> for ado in tad.ad_object.search_all(
... profile_id='1',
... checker_id='1',
... show_ignored=False,
... reasons=[1, 2],
... directories=[1],
... expression={'OR': [{
... 'whencreated': '2021-07-29T12:27:50.0000000Z'
... }]},
... date_end='2022-12-31T18:30:00.000Z',
... date_start='2021-12-31T18:30:00.000Z',
... page=1,
... per_page=20,
... max_pages=10,
... max_items=200
... ):
... pprint(ado)
'''
params = self._schema.dump(self._schema.load({
'page': kwargs.get('page') or 1,
'perPage': kwargs.get('per_page'),
'maxItems': kwargs.get('max_items'),
'maxPages': kwargs.get('max_pages')
}))

payload = self._schema.dump(self._schema.load(
dict_clean({
'expression': expression,
'directories': directories,
'reasons': reasons,
'dateStart': kwargs.get('date_start'),
'dateEnd': kwargs.get('date_end'),
'showIgnored': show_ignored
})
))

return ADObjectIterator(
api=self._api,
_path=f'profiles/{profile_id}/'
f'checkers/{checker_id}/'
f'ad-objects/search',
_method='post',
num_pages=params.get('page'),
_per_page=params.get('perPage'),
_query=params,
_payload=payload,
_schema=self._schema,
max_pages=params.pop('maxPages', None),
max_items=params.pop('maxItems', None)
)
39 changes: 39 additions & 0 deletions tenable/ad/ad_object/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from marshmallow import fields
from tenable.ad.base.schema import CamelCaseSchema


class ADObjectChangeValuesSchema(CamelCaseSchema):
after = fields.Str()
before = fields.Str(allow_none=True)
current = fields.Str()


class ADObjectChangesSchema(CamelCaseSchema):
attribute_name = fields.Str()
values = fields.Nested(ADObjectChangeValuesSchema)
value_type = fields.Str()


class ADObjectAttributesSchema(CamelCaseSchema):
name = fields.Str()
value = fields.Str()
value_type = fields.Str()


class ADObjectSchema(CamelCaseSchema):
id = fields.Int()
directory_id = fields.Int()
object_id = fields.Str()
type = fields.Str()
object_attributes = fields.Nested(ADObjectAttributesSchema, many=True)
reasons = fields.List(fields.Int())
wanted_values = fields.List(fields.Str())
expression = fields.Mapping()
directories = fields.List(fields.Int())
date_start = fields.DateTime()
date_end = fields.DateTime()
show_ignored = fields.Bool()
page = fields.Int(allow_none=True)
per_page = fields.Int(allow_none=True)
max_pages = fields.Int(allow_none=True)
max_items = fields.Int(allow_none=True)
9 changes: 9 additions & 0 deletions tenable/ad/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from tenable.base.platform import APIPlatform

from .about import AboutAPI
from .ad_object.api import ADObjectAPI
from .alert.api import AlertsAPI
from .api_keys import APIKeyAPI
from .attack_type_options.api import AttackTypeOptionsAPI
Expand Down Expand Up @@ -67,6 +68,14 @@ def about(self):
'''
return AboutAPI(self)

@property
def ad_object(self):
'''
The interface object for the
:doc:`Tenable.ad AD Object APIs <ad_object>`.
'''
return ADObjectAPI(self)

@property
def alerts(self):
'''
Expand Down
Loading

0 comments on commit 321eed7

Please sign in to comment.