Skip to content

Prepare Tuency bot for new version #2561

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -45,6 +45,11 @@
- `intelmq.bots.experts.securitytxt`:
- Added new bot (PR#2538 by Frank Westers and Sebastian Wagner)
- `intelmq.bots.experts.misp`: Use `PyMISP` class instead of deprecated `ExpandedPyMISP` (PR#2532 by Radek Vyhnal)
- `intelmq.bots.experts.tuency`: (PR#2561 by Kamil Mańkowski)
- Support for querying using `feed.code` and `classification.identifier` (requires Tuency 2.6+),
- Support for customizing fields and the TTL value for suspended sending.
- Support selecting if IP and/or FQDN should be used for querying Tuency.
- Various fixes.
- `intelmq.bots.experts.fake.expert`: New expert to fake data (PR#2567 by Sebastian Wagner).

#### Outputs
53 changes: 47 additions & 6 deletions docs/user/bots.md
Original file line number Diff line number Diff line change
@@ -4054,8 +4054,9 @@ addresses and delivery settings for IP objects (addresses, netblocks), Autonomou

- `classification.taxonomy`
- `classification.type`
- `classification.identifier`
- `feed.provider`
- `feed.name`
- `feed.name` or `feed.code`

These fields therefore need to exist, otherwise the message is skipped.

@@ -4064,17 +4065,20 @@ The API parameter "feed_status" is currently set to "production" constantly, unt
The API answer is processed as following. For the notification interval:

- If *suppress* is true, then `extra.notify` is set to false.
If explicitly configured, a special TTL value can be set.
- Otherwise:
- If the interval is *immediate*, then `extra.ttl` is set to 0.
- Otherwise the interval is converted into seconds and saved in
`extra.ttl`.
- If the interval is *immediate*, then `extra.ttl` is set to 0.
- Otherwise the interval is converted into seconds and saved in
`extra.ttl`.

For the contact lookup: For both fields *ip* and *domain*, the
*destinations* objects are iterated and its *email* fields concatenated to a comma-separated list
in `source.abuse_contact`.

The IntelMQ fields used by this bot may change in the next IntelMQ release, as soon as better suited fields are
available.
For constituency: if provided from Tuency, the list of relvant consitituencies will
be saved comma-separated in the `extra.constituency` field.

The IntelMQ fields used by this bot may be customized by the parameters.

**Module:** `intelmq.bots.experts.tuency.expert`

@@ -4092,6 +4096,43 @@ available.

(optional, boolean) Whether the existing data in `source.abuse_contact` should be overwritten. Defaults to true.

**`notify_field`**

(optional, string) Name of the field to save information if the message should not be send
(suspension in Tuency). By default `extra.notify`

**`ttl_field`**

(optional, string) Name of the field to save the TTL value (in seconds). By default `extra.ttl`.

**`constituency_field`**

(optional, string) Name of the gield to save information about the consitutuency. By default
`extra.constituency`. If set to empty value, this information won't be saved.

**`query_ip`**

(optional, boolean) Whether the bot should query Tuency based on `source.ip`. By default `true`.

**`query_domain`**

(optional, boolean) Whether the bot should query Tuency based on `source.fqdn`. By default `true`.

**`ttl_on_suspended`**

(optional, integer) Custom value to set as TTL when the sending is suspended. By default
not set - no value will be set at all.

**`query_classification_identifier`**

(optional, boolean) Whether to add `classification.identifier` to the query. Requires
at least Tuency 2.6. By default `False`.

**`query_feed_code`**

(optional, boolean) Whether to query using `feed.code` instead of `feed.name`. Requires
at least Tuency 2.6. By default `False`.

---

### Truncate By Delimiter <div id="intelmq.bots.experts.truncate_by_delimiter.expert" />
135 changes: 108 additions & 27 deletions intelmq/bots/experts/tuency/expert.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""
© 2021 Sebastian Wagner <wagner@cert.at>

SPDX-FileCopyrightText: 2021 Sebastian Wagner <wagner@cert.at>
SPDX-FileCopyrightText: 2025 CERT.at GmbH <https://cert.at/>
SPDX-License-Identifier: AGPL-3.0-or-later

https://gitlab.com/intevation/tuency/tuency/-/blob/master/backend/docs/IntelMQ-API.md
@@ -26,60 +26,141 @@ class TuencyExpertBot(ExpertBot):
authentication_token: str
overwrite: bool = True

notify_field = "extra.notify"
ttl_field = "extra.ttl"
constituency_field = "extra.constituency"

query_ip = True
query_domain = True

# Allows setting custom TTL for suspended sending
ttl_on_suspended = None

# Non-default values require Tuency v2.6+
query_classification_identifier = False
query_feed_code = False

def init(self):
self.set_request_parameters()
self.session = create_request_session(self)
self.session.headers["Authorization"] = f"Bearer {self.authentication_token}"
self.url = f"{self.url}intelmq/lookup"

if not self.query_ip and not self.query_domain:
self.logger.warning(
"Neither query_ip nor query_domain is set. "
"Bot won't do anything, please ensure it's intended."
)

@staticmethod
def check(parameters):
results = []
if not parameters.get("query_ip", True) and not parameters.get(
"query_domain", True
):
results.append(
[
"warning",
"Neither query_ip nor query_domain is set. "
"Bot won't do anything, please ensure it's intended.",
]
)

return results or None

def process(self):
event = self.receive_message()
if not ("source.ip" in event or "source.fqdn" in event):
self.send_message(event)
self.acknowledge_message()
return

try:
params = {
"classification_taxonomy": event["classification.taxonomy"],
"classification_type": event["classification.type"],
"feed_provider": event["feed.provider"],
"feed_name": event["feed.name"],
"feed_status": "production",
}
if self.query_feed_code:
params["feed_code"] = event["feed.code"]
else:
params["feed_name"] = event["feed.name"]

if self.query_classification_identifier:
params["classification_identifier"] = event["classification.identifier"]
except KeyError as exc:
self.logger.debug('Skipping event because of missing field: %s.', exc)
self.logger.debug("Skipping event because of missing field: %s.", exc)
self.send_message(event)
self.acknowledge_message()
return
try:
params["ip"] = event["source.ip"]
except KeyError:
pass
try:
params["domain"] = event["source.fqdn"]
except KeyError:
pass

response = self.session.get(self.url, params=params).json()
self.logger.debug('Received response %r.', response)
if self.query_ip:
try:
params["ip"] = event["source.ip"]
except KeyError:
pass

if self.query_domain:
try:
params["domain"] = event["source.fqdn"]
except KeyError:
pass

if "ip" not in params and "domain" not in params:
# Nothing to query - skip
self.send_message(event)
self.acknowledge_message()
return

response = self.session.get(self.url, params=params)
self.logger.debug("Received response %r.", response.text)
response = response.json()

destinations = (
response.get("ip", {"destinations": []})["destinations"]
+ response.get("domain", {"destinations": []})["destinations"]
)

if response.get("suppress", False):
event["extra.notify"] = False
event.add(self.notify_field, False, overwrite=self.overwrite)
if self.ttl_on_suspended:
event.add(
self.ttl_field,
self.ttl_on_suspended,
overwrite=self.overwrite,
)
else:
if 'interval' not in response:
if not destinations:
# empty response
self.send_message(event)
self.acknowledge_message()
return
elif response['interval']['unit'] == 'immediate':
event["extra.ttl"] = 0
else:
event["extra.ttl"] = parse_relative(f"{response['interval']['length']} {response['interval']['unit']}") * 60

if "interval" in response:
if response["interval"]["unit"] == "immediate":
event.add(self.ttl_field, 0, overwrite=self.overwrite)
else:
event.add(
self.ttl_field,
(
parse_relative(
f"{response['interval']['length']} {response['interval']['unit']}"
)
* 60
),
overwrite=self.overwrite,
)

contacts = []
for destination in response.get('ip', {'destinations': []})['destinations'] + response.get('domain', {'destinations': []})['destinations']:
contacts.extend(contact['email'] for contact in destination["contacts"])
event.add('source.abuse_contact', ','.join(contacts), overwrite=self.overwrite)
for destination in destinations:
contacts.extend(contact["email"] for contact in destination["contacts"])
event.add("source.abuse_contact", ",".join(contacts), overwrite=self.overwrite)

if self.constituency_field and (
constituencies := response.get("constituencies", [])
):
event.add(
self.constituency_field,
",".join(constituencies),
overwrite=self.overwrite,
)

self.send_message(event)
self.acknowledge_message()
342 changes: 301 additions & 41 deletions intelmq/tests/bots/experts/tuency/test_expert.py
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@
This unittest can test the bot against a read tuency instance as well as using requests mock.
The latter is the default while the first is only in use if a tunency instance URL and authentication token is given a environment variable.
"""

import os
import unittest

@@ -15,74 +16,200 @@
import requests_mock


INPUT = {'__type': 'Event',
'classification.taxonomy': 'availability',
'classification.type': 'system-compromise',
'feed.provider': 'Some Provider',
'feed.name': 'FTP',
'source.ip': '123.123.123.23',
'source.fqdn': 'www.example.at'
}
INPUT = {
"__type": "Event",
"classification.taxonomy": "availability",
"classification.type": "system-compromise",
"classification.identifier": "hacked-server",
"feed.provider": "Some Provider",
"feed.name": "FTP",
"feed.code": "ftp",
"source.ip": "123.123.123.23",
"source.fqdn": "www.example.at",
}
INPUT_IP = INPUT.copy()
del INPUT_IP['source.fqdn']
INPUT_IP['source.abuse_contact'] = 'abuse@example.com'
del INPUT_IP["source.fqdn"]
INPUT_IP["source.abuse_contact"] = "abuse@example.com"
INPUT_DOMAIN = INPUT.copy()
del INPUT_DOMAIN['source.ip']
del INPUT_DOMAIN["source.ip"]
OUTPUT = INPUT.copy()
OUTPUT_IP = INPUT_IP.copy()
OUTPUT_IP['extra.notify'] = False
OUTPUT_IP['source.abuse_contact'] = 'test@ntvtn.de'
OUTPUT_IP["extra.notify"] = False
OUTPUT_IP["source.abuse_contact"] = "test@ntvtn.de"
OUTPUT_IP["extra.constituency"] = "Tenant1,Tenant2"
OUTPUT_IP_NO_OVERWRITE = OUTPUT_IP.copy()
OUTPUT_IP_NO_OVERWRITE['source.abuse_contact'] = 'abuse@example.com'
OUTPUT_IP_NO_OVERWRITE["source.abuse_contact"] = "abuse@example.com"
OUTPUT_DOMAIN = INPUT_DOMAIN.copy()
OUTPUT_DOMAIN['extra.ttl'] = 24*60*60 # 1 day
OUTPUT_DOMAIN['source.abuse_contact'] = 'abuse+www@example.at'
OUTPUT_DOMAIN["extra.ttl"] = 24 * 60 * 60 # 1 day
OUTPUT_DOMAIN["source.abuse_contact"] = "abuse+www@example.at"
OUTPUT_BOTH = OUTPUT.copy()
OUTPUT_BOTH['extra.ttl'] = 24*60*60 # 1 day
OUTPUT_BOTH['source.abuse_contact'] = 'test@ntvtn.de,abuse+www@example.at'
EMPTY = {'__type': 'Event', 'comment': 'foobar'}
OUTPUT_BOTH["extra.ttl"] = 24 * 60 * 60 # 1 day
OUTPUT_BOTH["source.abuse_contact"] = "test@ntvtn.de,abuse+www@example.at"
EMPTY = {"__type": "Event", "comment": "foobar"}
UNKNOWN_IP = INPUT_IP.copy()
UNKNOWN_IP['source.ip'] = '10.0.0.1'
UNKNOWN_IP["source.ip"] = "10.0.0.1"


PREFIX = 'http://localhost/intelmq/lookup?classification_taxonomy=availability&classification_type=system-compromise&feed_provider=Some+Provider&feed_name=FTP&feed_status=production'
PREFIX = (
"http://localhost/intelmq/lookup?classification_taxonomy=availability"
"&classification_type=system-compromise&feed_provider=Some+Provider"
"&feed_status=production"
)


def prepare_mocker(mocker):
# IP address
mocker.get(f'{PREFIX}&ip=123.123.123.23',
request_headers={'Authorization': 'Bearer Lorem ipsum'},
json={"ip":{"destinations":[{"source":"portal","name":"Thurner","contacts":[{"email":"test@ntvtn.de"}]}]},"suppress":True,"interval":{"unit":"days","length":1}})
mocker.get(
f"{PREFIX}&ip=123.123.123.23&feed_name=FTP",
request_headers={"Authorization": "Bearer Lorem ipsum"},
json={
"ip": {
"destinations": [
{
"source": "portal",
"name": "Thurner",
"contacts": [{"email": "test@ntvtn.de"}],
}
]
},
"suppress": True,
"interval": {"unit": "days", "length": 1},
"constituencies": ["Tenant1", "Tenant2"],
},
)

# Domain:
mocker.get(f'{PREFIX}&domain=www.example.at',
request_headers={'Authorization': 'Bearer Lorem ipsum'},
json={"domain":{"destinations":[{"source":"portal","name":"EineOrganisation","contacts":[{"email":"abuse+www@example.at"}]}]},"suppress":False,"interval":{"unit":"days","length":1}})
mocker.get(
f"{PREFIX}&domain=www.example.at&feed_name=FTP",
request_headers={"Authorization": "Bearer Lorem ipsum"},
json={
"domain": {
"destinations": [
{
"source": "portal",
"name": "EineOrganisation",
"contacts": [{"email": "abuse+www@example.at"}],
}
]
},
"suppress": False,
"interval": {"unit": "days", "length": 1},
},
)
# Both
mocker.get(f'{PREFIX}&ip=123.123.123.23&domain=www.example.at',
request_headers={'Authorization': 'Bearer Lorem ipsum'},
json={"ip":{"destinations":[{"source":"portal","name":"Thurner","contacts":[{"email":"test@ntvtn.de"}]}]},"domain":{"destinations":[{"source":"portal","name":"EineOrganisation","contacts":[{"email":"abuse+www@example.at"}]}]},"suppress":False,"interval":{"unit":"day","length":1}})
mocker.get(
f"{PREFIX}&ip=123.123.123.23&domain=www.example.at&feed_name=FTP",
request_headers={"Authorization": "Bearer Lorem ipsum"},
json={
"ip": {
"destinations": [
{
"source": "portal",
"name": "Thurner",
"contacts": [{"email": "test@ntvtn.de"}],
}
]
},
"domain": {
"destinations": [
{
"source": "portal",
"name": "EineOrganisation",
"contacts": [{"email": "abuse+www@example.at"}],
}
]
},
"suppress": False,
"interval": {"unit": "day", "length": 1},
},
)

# Unknown IP address
mocker.get(f'{PREFIX}&ip=10.0.0.1',
request_headers={'Authorization': 'Bearer Lorem ipsum'},
json={'ip': {'destinations': [], 'netobject': None}})
mocker.get(
f"{PREFIX}&ip=10.0.0.1&feed_name=FTP",
request_headers={"Authorization": "Bearer Lorem ipsum"},
json={"ip": {"destinations": [], "netobject": None}},
)

# feed_code
mocker.get(
f"{PREFIX}&ip=123.123.123.23&feed_code=ftp",
request_headers={"Authorization": "Bearer Lorem ipsum"},
json={
"ip": {
"destinations": [
{
"source": "portal",
"name": "Thurner",
"contacts": [{"email": "test+code@ntvtn.de"}],
}
]
},
"suppress": False,
"interval": {"unit": "days", "length": 1},
"constituencies": ["Tenant1", "Tenant2"],
},
)

# classification identifier
mocker.get(
f"{PREFIX}&ip=123.123.123.23&feed_name=FTP&classification_identifier=hacked-server",
request_headers={"Authorization": "Bearer Lorem ipsum"},
json={
"ip": {
"destinations": [
{
"source": "portal",
"name": "Thurner",
"contacts": [{"email": "test+identifier@ntvtn.de"}],
}
]
},
"suppress": True,
"interval": {"unit": "days", "length": 1},
"constituencies": ["Tenant1", "Tenant2"],
},
)

# IP address directly from RIPE
mocker.get(
f"{PREFIX}&ip=123.123.123.123&feed_name=FTP",
request_headers={"Authorization": "Bearer Lorem ipsum"},
json={
"ip": {
"destinations": [
{
"source": "ripe",
"name": "Thurner",
"contacts": [{"email": "test@ntvtn.de"}],
}
]
},
"suppress": True,
"constituencies": ["Tenant1", "Tenant2"],
},
)


@requests_mock.Mocker()
class TestTuencyExpertBot(BotTestCase, unittest.TestCase):
@classmethod
def set_bot(cls):
cls.bot_reference = TuencyExpertBot
if not os.environ.get("INTELMQ_TEST_TUNECY_URL") or not os.environ.get("INTELMQ_TEST_TUNECY_TOKEN"):
if not os.environ.get("INTELMQ_TEST_TUNECY_URL") or not os.environ.get(
"INTELMQ_TEST_TUNECY_TOKEN"
):
cls.mock = True
cls.sysconfig = {"url": 'http://localhost/',
"authentication_token": 'Lorem ipsum',
}
cls.sysconfig = {
"url": "http://localhost/",
"authentication_token": "Lorem ipsum",
}
else:
cls.mock = False
cls.sysconfig = {"url": os.environ["INTELMQ_TEST_TUNECY_URL"],
"authentication_token": os.environ["INTELMQ_TEST_TUNECY_TOKEN"],
}
cls.sysconfig = {
"url": os.environ["INTELMQ_TEST_TUNECY_URL"],
"authentication_token": os.environ["INTELMQ_TEST_TUNECY_TOKEN"],
}
cls.default_input_message = INPUT

def test_both(self, mocker):
@@ -114,7 +241,7 @@ def test_ip_no_overwrite(self, mocker):
else:
mocker.real_http = True
self.input_message = INPUT_IP
self.run_bot(parameters={'overwrite': False})
self.run_bot(parameters={"overwrite": False})
self.assertMessageEqual(0, OUTPUT_IP_NO_OVERWRITE)

def test_domain(self, mocker):
@@ -126,6 +253,112 @@ def test_domain(self, mocker):
self.run_bot()
self.assertMessageEqual(0, OUTPUT_DOMAIN)

def test_feed_code(self, mocker):
"""Using feed.code to identify feeds"""
if self.mock:
prepare_mocker(mocker)
else:
mocker.real_http = True

self.input_message = INPUT_IP
self.run_bot(parameters={"query_feed_code": True})
expected = {
**OUTPUT_IP,
"source.abuse_contact": "test+code@ntvtn.de",
"extra.ttl": 86400,
"extra.notify": None,
}
del expected["extra.notify"]
self.assertMessageEqual(
0,
expected,
)

def test_classification_identifier(self, mocker):
"""Using classification.identifier to filter events"""
if self.mock:
prepare_mocker(mocker)
else:
mocker.real_http = True

self.input_message = INPUT_IP
self.run_bot(parameters={"query_classification_identifier": True})
self.assertMessageEqual(
0,
{
**OUTPUT_IP,
"source.abuse_contact": "test+identifier@ntvtn.de",
},
)

def test_custom_fields(self, mocker):
"""Allow customize fields that bot sets"""
if self.mock:
prepare_mocker(mocker)
else:
mocker.real_http = True

self.input_message = INPUT_IP
self.run_bot(
parameters={
"notify_field": "extra.my_notify",
"constituency_field": "extra.my_constituency",
# Response for feed_code is not suspended - allows testing TTL
# "query_feed_code": True,
}
)

output = OUTPUT_IP.copy()
output["extra.my_notify"] = output["extra.notify"]
del output["extra.notify"]
output["extra.my_constituency"] = output["extra.constituency"]
del output["extra.constituency"]
self.assertMessageEqual(0, output)

def test_custom_fields_ttl(self, mocker):
"""Allow customize fields that bot sets"""
if self.mock:
prepare_mocker(mocker)
else:
mocker.real_http = True

self.input_message = INPUT_IP
self.run_bot(
parameters={
"ttl_field": "extra.my_ttl",
# Response for feed_code is not suspended - allows testing TTL
"query_feed_code": True,
}
)

output = OUTPUT_IP.copy()
del output["extra.notify"]
output["extra.my_ttl"] = 86400
output["source.abuse_contact"] = "test+code@ntvtn.de"
self.assertMessageEqual(0, output)

def test_ttl_on_suspended(self, mocker):
"""Allow setting custom TTL when Tuency decides on suspending sending"""
if self.mock:
prepare_mocker(mocker)
else:
mocker.real_http = True

self.input_message = INPUT_IP
self.run_bot(
parameters={
"ttl_on_suspended": -1,
}
)

self.assertMessageEqual(
0,
{
**OUTPUT_IP,
"extra.ttl": -1,
},
)

def test_empty(self, mocker):
"""
A message with neither an IP address nor a domain, should be ignored and just passed on.
@@ -134,10 +367,18 @@ def test_empty(self, mocker):
prepare_mocker(mocker)
else:
mocker.real_http = True

self.input_message = EMPTY
self.run_bot()
self.assertMessageEqual(0, EMPTY)

self.input_message = INPUT
self.run_bot(
parameters={"query_ip": False, "query_domain": False},
allowed_warning_count=1,
)
self.assertMessageEqual(0, INPUT)

def test_no_result(self, mocker):
"""
This IP address is not in the database
@@ -149,3 +390,22 @@ def test_no_result(self, mocker):
self.input_message = UNKNOWN_IP
self.run_bot()
self.assertMessageEqual(0, UNKNOWN_IP)

def test_data_from_ripe(self, mocker):
"""
Data sourced from ripe don't get interval
"""
if self.mock:
prepare_mocker(mocker)
else:
mocker.real_http = True

input_msq = INPUT_IP.copy()
input_msq["source.ip"] = "123.123.123.123"

self.input_message = input_msq
self.run_bot()

output_msg = OUTPUT_IP.copy()
output_msg["source.ip"] = "123.123.123.123"
self.assertMessageEqual(0, output_msg)