Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use arbitrary str instead of enum for querytype #1016

Merged
merged 3 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions core/schemas/indicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,22 +118,13 @@ def match(self, value: str) -> IndicatorMatch | None:
return None


class QueryType(str, Enum):
opensearch = "opensearch"
osquery = "osquery"
sql = "sql"
splunk = "splunk"
censys = "censys"
shodan = "shodan"


class Query(Indicator):
"""Represents a query that can be sent to another system."""

_type_filter: ClassVar[str] = IndicatorType.query
type: Literal["query"] = IndicatorType.query

query_type: QueryType
query_type: str
target_systems: list[str] = []

def match(self, value: str) -> IndicatorMatch | None:
Expand Down
4 changes: 1 addition & 3 deletions plugins/analytics/public/censys.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ def run(self):
api_secret=api_secret,
)

censys_queries, _ = indicator.Query.filter(
{"query_type": indicator.QueryType.censys}
)
censys_queries, _ = indicator.Query.filter({"query_type": "censys"})

for query in censys_queries:
ip_addresses = query_censys(hosts_api, query.pattern)
Expand Down
4 changes: 1 addition & 3 deletions plugins/analytics/public/shodan.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ def run(self):

shodan_api = Shodan(api_key)

shodan_queries, _ = indicator.Query.filter(
{"query_type": indicator.QueryType.shodan}
)
shodan_queries, _ = indicator.Query.filter({"query_type": "shodan"})

for query in shodan_queries:
ip_addresses = query_shodan(shodan_api, query.pattern, result_limit)
Expand Down
2 changes: 1 addition & 1 deletion plugins/feeds/public/dfiq.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def _process_approach(yaml_string: str) -> None:
name=step.description,
pattern=step.value,
relevant_tags=approach.dfiq_tags or [],
query_type=indicator.QueryType.opensearch,
query_type="opensearch",
location=processor.name,
diamond=indicator.DiamondModel.victim,
).save()
Expand Down
6 changes: 3 additions & 3 deletions tests/analytics_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def test_censys_query(self, mock_censys_hosts):
location="censys",
diamond=DiamondModel.infrastructure,
relevant_tags=["censys_query_tag"],
query_type=indicator.QueryType.censys,
query_type="censys",
).save()

mock_search_result = [
Expand Down Expand Up @@ -144,7 +144,7 @@ def test_shodan_query_with_various_limits(self, limit, expected_count, mock_shod
location="shodan",
diamond=DiamondModel.infrastructure,
relevant_tags=["shodan_query_tag"],
query_type=indicator.QueryType.shodan,
query_type="shodan",
).save()

def mock_search_cursor(query):
Expand Down Expand Up @@ -190,7 +190,7 @@ def test_shodan_observables_and_neighbors(self, mock_shodan):
location="shodan",
diamond=DiamondModel.infrastructure,
relevant_tags=["shodan_query_tag"],
query_type=indicator.QueryType.shodan,
query_type="shodan",
).save()

def mock_search_cursor(query):
Expand Down
4 changes: 2 additions & 2 deletions tests/schemas/fixture.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from core import database_arango
from core.schemas.entity import Investigation, Malware, ThreatActor
from core.schemas.indicator import DiamondModel, Query, QueryType, Regex
from core.schemas.indicator import DiamondModel, Query, Regex
from core.schemas.observables import (
bic,
generic_observable,
Expand Down Expand Up @@ -79,7 +79,7 @@ def test_something(self):
location="syslogs",
diamond=DiamondModel.capability,
pattern='(reporter:"sshd" AND Accepted)',
query_type=QueryType.opensearch,
query_type="opensearch",
target_systems=["timesketch", "plaso"],
relevant_tags=["ssh", "login"],
).save()
Expand Down
Loading