Skip to content

Commit

Permalink
Add support for publicly readable taxii 2 api roots
Browse files Browse the repository at this point in the history
  • Loading branch information
erwin-eiq committed May 25, 2022
1 parent 01e751a commit cb1bd99
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Changelog
0.6.0 (2022-??-??)
------------------
* Add `public_discovery` option to taxii2 config
* Add support for publicly readable taxii 2 api roots

0.5.0 (2022-05-24)
------------------
Expand Down
7 changes: 6 additions & 1 deletion opentaxii/persistence/sqldb/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,7 @@ def get_api_roots(self) -> List[entities.ApiRoot]:
default=obj.default,
title=obj.title,
description=obj.description,
is_public=obj.is_public,
)
for obj in query.all()
]
Expand All @@ -536,6 +537,7 @@ def get_api_root(self, api_root_id: str) -> Optional[entities.ApiRoot]:
default=api_root.default,
title=api_root.title,
description=api_root.description,
is_public=api_root.is_public,
)
else:
return None
Expand All @@ -545,18 +547,20 @@ def add_api_root(
title: str,
description: Optional[str] = None,
default: Optional[bool] = False,
is_public: bool = False,
) -> entities.ApiRoot:
"""
Add a new api root.
:param str title: Title of the new api root
:param str description: [Optional] Description of the new api root
:param bool default: [Optional, False] If the new api should be the default
:param bool is_public: whether this is a publicly readable API root
:return: The added ApiRoot entity.
"""
api_root = taxii2models.ApiRoot(
title=title, description=description, default=False
title=title, description=description, default=default, is_public=is_public
)
self.db.session.add(api_root)
self.db.session.commit()
Expand All @@ -567,6 +571,7 @@ def add_api_root(
default=api_root.default,
title=api_root.title,
description=api_root.description,
is_public=is_public,
)

def get_job_and_details(
Expand Down
1 change: 1 addition & 0 deletions opentaxii/persistence/sqldb/taxii2models.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class ApiRoot(Base):
default = sqlalchemy.Column(sqlalchemy.Boolean, nullable=False)
title = sqlalchemy.Column(sqlalchemy.String(100), nullable=False, index=True)
description = sqlalchemy.Column(sqlalchemy.Text)
is_public = sqlalchemy.Column(sqlalchemy.Boolean, nullable=False)

collections = relationship("Collection", back_populates="api_root")

Expand Down
14 changes: 11 additions & 3 deletions opentaxii/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,12 +484,16 @@ def discovery_handler(self):
response["api_roots"] = [f"/{api_root.id}/" for api_root in api_roots]
return make_taxii2_response(response)

@register_handler(r"^/(?P<api_root_id>[^/]+)/$")
@register_handler(r"^/(?P<api_root_id>[^/]+)/$", handles_own_auth=True)
def api_root_handler(self, api_root_id):
try:
api_root = self.persistence.get_api_root(api_root_id=api_root_id)
except DoesNotExistError:
if context.account is None:
raise Unauthorized()
raise NotFound()
if context.account is None and not api_root.is_public:
raise Unauthorized()
response = {
"title": api_root.title,
"versions": ["application/taxii+json;version=2.1"],
Expand Down Expand Up @@ -527,12 +531,16 @@ def job_handler(self, api_root_id, job_id):
}
return make_taxii2_response(response)

@register_handler(r"^/(?P<api_root_id>[^/]+)/collections/$")
@register_handler(r"^/(?P<api_root_id>[^/]+)/collections/$", handles_own_auth=True)
def collections_handler(self, api_root_id):
try:
self.persistence.get_api_root(api_root_id=api_root_id)
api_root = self.persistence.get_api_root(api_root_id=api_root_id)
except DoesNotExistError:
if context.account is None:
raise Unauthorized()
raise NotFound()
if context.account is None and not api_root.is_public:
raise Unauthorized()
collections = self.persistence.get_collections(api_root_id=api_root_id)
response = {}
if collections:
Expand Down
6 changes: 5 additions & 1 deletion opentaxii/taxii2/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@ class ApiRoot(Entity):
:param bool default: indicator of default api root, should only be True once
:param str title: human readable plain text name used to identify this API Root
:param str description: human readable plain text description for this API Root
:param bool is_public: whether this is a publicly readable API root
"""

def __init__(self, id: str, default: bool, title: str, description: str):
def __init__(
self, id: str, default: bool, title: str, description: str, is_public: bool
):
"""Initialize ApiRoot."""
self.id = id
self.default = default
self.title = title
self.description = description
self.is_public = is_public


class Collection(Entity):
Expand Down
40 changes: 34 additions & 6 deletions tests/taxii2/test_taxii2_api_root.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,58 +198,85 @@ def test_api_root(
assert content == expected_content


@pytest.mark.parametrize("is_public", [True, False])
@pytest.mark.parametrize("method", ["get", "post", "delete"])
def test_api_root_unauthenticated(
client,
method,
is_public,
):
func = getattr(client, method)
response = func(f"/{API_ROOTS[0].id}/")
assert response.status_code == 401
if is_public:
api_root_id = API_ROOTS[1].id
if method == "get":
expected_status_code = 200
else:
expected_status_code = 405
else:
api_root_id = API_ROOTS[0].id
if method == "get":
expected_status_code = 401
else:
expected_status_code = 405
with patch.object(
client.application.taxii_server.servers.taxii2.persistence.api,
"get_api_root",
side_effect=GET_API_ROOT_MOCK,
):
func = getattr(client, method)
response = func(
f"/{api_root_id}/",
headers={"Accept": "application/taxii+json;version=2.1"},
)
assert response.status_code == expected_status_code


@pytest.mark.parametrize(
["title", "description", "default", "db_api_roots"],
["title", "description", "default", "is_public", "db_api_roots"],
[
pytest.param(
"my new api root", # title
None, # description
False, # default
False, # is_public
[], # db_api_roots
id="title only",
),
pytest.param(
"my new api root", # title
"my description", # description
False, # default
True, # is_public
[], # db_api_roots
id="title, description",
),
pytest.param(
"my new api root", # title
None, # description
True, # default
False, # is_public
[], # db_api_roots
id="title, default",
),
pytest.param(
"my new api root", # title
"my description", # description
True, # default
True, # is_public
API_ROOTS_WITH_DEFAULT, # db_api_roots
id="title, description, default, existing",
),
],
indirect=["db_api_roots"],
)
def test_add_api_root(app, title, description, default, db_api_roots):
def test_add_api_root(app, title, description, default, is_public, db_api_roots):
api_root = app.taxii_server.servers.taxii2.persistence.api.add_api_root(
title, description, default
title, description, default, is_public
)
assert api_root.id is not None
assert api_root.title == title
assert api_root.description == description
assert api_root.default == default
assert api_root.is_public == is_public
db_api_root = (
app.taxii_server.servers.taxii2.persistence.api.db.session.query(
taxii2models.ApiRoot
Expand All @@ -260,6 +287,7 @@ def test_add_api_root(app, title, description, default, db_api_roots):
assert db_api_root.title == title
assert db_api_root.description == description
assert db_api_root.default == default
assert db_api_root.is_public == is_public
if default:
assert (
app.taxii_server.servers.taxii2.persistence.api.db.session.query(
Expand Down
32 changes: 29 additions & 3 deletions tests/taxii2/test_taxii2_collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,37 @@ def test_collections(
assert content == expected_content


@pytest.mark.parametrize("is_public", [True, False])
@pytest.mark.parametrize("method", ["get", "post", "delete"])
def test_collections_unauthenticated(
client,
method,
is_public,
):
func = getattr(client, method)
response = func(f"/{API_ROOTS[0].id}/collections/")
assert response.status_code == 401
if is_public:
api_root_id = API_ROOTS[1].id
if method == "get":
expected_status_code = 200
else:
expected_status_code = 405
else:
api_root_id = API_ROOTS[0].id
if method == "get":
expected_status_code = 401
else:
expected_status_code = 405
with patch.object(
client.application.taxii_server.servers.taxii2.persistence.api,
"get_api_root",
side_effect=GET_API_ROOT_MOCK,
), patch.object(
client.application.taxii_server.servers.taxii2.persistence.api,
"get_collections",
side_effect=GET_COLLECTIONS_MOCK,
):
func = getattr(client, method)
response = func(
f"/{api_root_id}/collections/",
headers={"Accept": "application/taxii+json;version=2.1"},
)
assert response.status_code == expected_status_code
10 changes: 5 additions & 5 deletions tests/taxii2/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
from opentaxii.taxii2.utils import DATETIMEFORMAT, taxii2_datetimeformat

API_ROOTS_WITH_DEFAULT = (
ApiRoot(str(uuid4()), True, "first title", "first description"),
ApiRoot(str(uuid4()), False, "second title", "second description"),
ApiRoot(str(uuid4()), True, "first title", "first description", False),
ApiRoot(str(uuid4()), False, "second title", "second description", True),
)
API_ROOTS_WITHOUT_DEFAULT = (
ApiRoot(str(uuid4()), False, "first title", "first description"),
ApiRoot(str(uuid4()), False, "second title", "second description"),
ApiRoot(str(uuid4()), False, "third title", None),
ApiRoot(str(uuid4()), False, "first title", "first description", False),
ApiRoot(str(uuid4()), False, "second title", "second description", True),
ApiRoot(str(uuid4()), False, "third title", None, False),
)
API_ROOTS = API_ROOTS_WITHOUT_DEFAULT
NOW = datetime.datetime.now(datetime.timezone.utc)
Expand Down

0 comments on commit cb1bd99

Please sign in to comment.