Skip to content

Commit

Permalink
chore(search): Added Cert Observation Endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
thehappydinoa committed Oct 17, 2024
1 parent d03e58b commit 31d6daa
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 0 deletions.
36 changes: 36 additions & 0 deletions censys/search/v2/certs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
"""Interact with the Censys Search Cert API."""

import warnings
from typing import List, Optional, Union

from ...common.types import Datetime
from ...common.utils import format_rfc3339
from .api import CensysSearchAPIv2


Expand Down Expand Up @@ -306,6 +310,11 @@ def get_hosts_by_cert(self, fingerprint: str, cursor: Optional[str] = None) -> d
Returns:
dict: A list of hosts which contain services presenting this certificate.
"""
warnings.warn(
"This API endpoint is deprecated and scheduled for removal during a future release. Users should migrate to using the search endpoint on the Host index using the 'services.certificate: {fingerprint}' query to find any hosts currently presenting a certificate.",
category=DeprecationWarning,
stacklevel=2,
)
args = {"cursor": cursor}
return self._get(self.view_path + fingerprint + "/hosts", args)["result"]

Expand All @@ -319,3 +328,30 @@ def list_certs_with_tag(self, tag_id: str) -> List[dict]:
List[dict]: A list of certs which are tagged with the specified tag.
"""
return self._list_documents_with_tag(tag_id, "certificates", "certs")

def get_observations(
self,
fingerprint: str,
per_page: int = 50,
start_time: Optional[Datetime] = None,
end_time: Optional[Datetime] = None,
cursor: Optional[str] = None,
) -> dict:
"""Returns a list of observations for the specified certificate.
Args:
fingerprint (str): The SHA-256 fingerprint of the requested certificate.
per_page (int): The number of results to return per page. Defaults to 50.
start_time (str): The start time of the observations to return.
end_time (str): The end time of the observations to return.
cursor (str): Cursor token from the API response, which fetches the next page of observations when added to the endpoint URL.
Returns:
dict: A list of observations for the specified certificate.
"""
args = {"per_page": per_page, "cursor": cursor}
if start_time:
args["start_time"] = format_rfc3339(start_time)
if end_time:
args["end_time"] = format_rfc3339(end_time)
return self._get(self.view_path + fingerprint + "/observations", args)["result"]
56 changes: 56 additions & 0 deletions tests/search/v2/test_certs.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
from typing import Any, Dict, List, Optional

import responses
Expand Down Expand Up @@ -178,6 +179,27 @@
"links": {"next": "nextCursorToken"},
},
}
OBSERVATIONS_CERT_JSON = {
"code": 200,
"status": "OK",
"result": {
"fingerprint": "string",
"observations": [
{
"ip": "string",
"name": "string",
"port": 0,
"service_name": "string",
"transport_protocol": "TCP",
"first_observed_at": "2024-10-17T18:13:23.554Z",
"last_observed_at": "2024-10-17T18:13:23.554Z",
"first_updated_at": "2024-10-17T18:13:23.554Z",
"last_updated_at": "2024-10-17T18:13:23.554Z",
}
],
"links": {"next": "nextCursorToken"},
},
}


class TestCerts(CensysTestCase):
Expand Down Expand Up @@ -385,3 +407,37 @@ def test_get_hosts_by_cert_with_cursor(self):
)
results = self.api.get_hosts_by_cert(TEST_CERT, cursor="nextCursorToken")
assert results == VIEW_HOSTS_BY_CERT_JSON["result"]

def test_get_observations_by_cert(self):
self.responses.add(
responses.GET,
f"{V2_URL}/certificates/{TEST_CERT}/observations",
status=200,
json=OBSERVATIONS_CERT_JSON,
)
result = self.api.get_observations(TEST_CERT)
assert result == OBSERVATIONS_CERT_JSON["result"]

def test_get_observations_by_cert_with_cursor(self):
self.responses.add(
responses.GET,
f"{V2_URL}/certificates/{TEST_CERT}/observations?per_page=50&cursor=nextCursorToken",
status=200,
json=OBSERVATIONS_CERT_JSON,
)
results = self.api.get_observations(TEST_CERT, cursor="nextCursorToken")
assert results == OBSERVATIONS_CERT_JSON["result"]

def test_get_observations_with_rfc3339_timestampts(self):
self.responses.add(
responses.GET,
f"{V2_URL}/certificates/{TEST_CERT}/observations?per_page=50&start_time=2024-10-14T18%3A13%3A23.554000Z&end_time=2024-10-17T18%3A13%3A23.554000Z",
status=200,
json=OBSERVATIONS_CERT_JSON,
)
result = self.api.get_observations(
TEST_CERT,
start_time=datetime(2024, 10, 14, 18, 13, 23, 554000),
end_time=datetime(2024, 10, 17, 18, 13, 23, 554000),
)
assert result == OBSERVATIONS_CERT_JSON["result"]

0 comments on commit 31d6daa

Please sign in to comment.