Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Comlaude] improvements #3055

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 109 additions & 26 deletions external-import/comlaude/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,22 @@
"""

import datetime
import json
import os
import sys
import threading
import time

import stix2
import yaml
from pycti import (
OpenCTIConnectorHelper,
get_config_variable,
Identity,
Indicator,
OpenCTIConnectorHelper,
StixCoreRelationship,
get_config_variable,
)
from stix2 import Bundle, DomainName, TLP_AMBER
from stix2 import TLP_AMBER, Bundle, DomainName

import comlaude

Expand Down Expand Up @@ -71,6 +73,40 @@ def _is_empty(value):
return False


def _deserialize_json_string(value):
"""
Attempt to deserialize a JSON string into a Python object.

:param value: Potentially serialized JSON string.
:return: Deserialized Python object or the original value if deserialization fails.
"""
if isinstance(value, str):
try:
return json.loads(value)
except json.JSONDecodeError:
return value
return value


def _validate_required_fields(domain_object, required_fields):
"""
Validate that all required fields are present and not empty.

:param domain_object: Dictionary representing the domain object.
:param required_fields: List of required fields to validate.
:return: Boolean indicating whether all required fields are present and non-empty.
"""
missing_fields = [
field
for field in required_fields
if field not in domain_object or _is_empty(domain_object[field])
]
if missing_fields:
print(f"Skipping domain due to missing fields: {missing_fields}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please use the logger rather than a print please ? (and preferably when using the method rather than directly in it : this is not this method that skips but the code that uses its result)

return False
return True


def _generate_dynamic_custom_properties(helper, domain_object, score, author_identity):
"""
Generate custom properties for domain objects dynamically with a specific prefix.
Expand All @@ -84,13 +120,24 @@ def _generate_dynamic_custom_properties(helper, domain_object, score, author_ide
custom_properties = {
"x_opencti_score": score,
"x_opencti_description": "This domain is known infrastructure managed by Comlaude.",
"created_by_ref": author_identity.id, # Add the created_by_ref to custom properties
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for cleaning the code base 👍

"created_by_ref": author_identity.id,
}
required_fields = ["id", "name", "created_at", "updated_at"]
for key, value in domain_object.items():
if not _is_empty(value):
custom_key = X_OPENCTI_PREFIX + key
custom_properties[custom_key] = value
helper.log_debug(f"Custom Key: {custom_key}")
if not _is_empty(value) and key in required_fields:
# Deserialize JSON values if necessary
value = _deserialize_json_string(value)

# Serialize complex values as JSON
if isinstance(value, (dict, list)):
try:
custom_properties[X_OPENCTI_PREFIX + key] = json.dumps(value)
helper.log_debug(f"Serialized Custom Key: {X_OPENCTI_PREFIX + key}")
except Exception as e:
helper.log_error(f"Error serializing value for key {key}: {str(e)}")
else:
custom_properties[X_OPENCTI_PREFIX + key] = value
helper.log_debug(f"Custom Key: {X_OPENCTI_PREFIX + key}")
domain_name = custom_properties.pop(f"{X_OPENCTI_PREFIX}name", None)
helper.log_debug(f"Pop domain_name: {domain_name}")
return domain_name, custom_properties
Expand Down Expand Up @@ -156,7 +203,7 @@ def _create_stix_create_bundle(helper, domain_object, labels, score, author_iden
source_ref=sdo_indicator.id,
target_ref=sco_domain_name.id,
start_time=start_time,
created_by_ref=author_identity.id, # Remplace author_identity.id par self.identity.id
created_by_ref=author_identity.id,
)

helper.log_debug(f"Create relationships: {domain_name}")
Expand Down Expand Up @@ -255,7 +302,7 @@ def _load_config(self) -> dict:
)
return config
except Exception as e:
print(f"Error loading configuration: {str(e)}")
self.helper.log_error(f"Error loading configuration: {str(e)}")
raise

def _get_interval(self):
Expand Down Expand Up @@ -284,23 +331,58 @@ def _iterate_events(self):
"""
Iterate through events from Comlaude, generate STIX bundles, and send them to OpenCTI.
"""
required_fields = ["name", "created_at", "updated_at"]
self.helper.log_info(
"Process ({}) events.".format(len(self.comlaude_search.results["data"]))
)
if len(self.comlaude_search.results["data"]) > 0:
self._refresh_work_id()
stix_objects = [self.identity]
last_event_time = None
for event in self.comlaude_search.results["data"]:
# Deserialize JSON fields before validating
for key in event.keys():
event[key] = _deserialize_json_string(event[key])
if not _validate_required_fields(event, required_fields):
continue
domain_name, objects = _create_stix_create_bundle(
self.helper, event, self.labels, self.score, self.identity
)
stix_objects.extend(objects)
bundle = Bundle(objects=stix_objects, allow_custom=True)
self.helper.send_stix2_bundle(
bundle.serialize(),
update=self.update_existing_data,
work_id=self.work_id,
)
last_event_time = event.get("updated_at", None)
bundle = Bundle(objects=stix_objects, allow_custom=True)
try:
self.helper.send_stix2_bundle(
bundle.serialize(),
update=self.update_existing_data,
work_id=self.work_id,
)
except Exception as e:
self.helper.log_error(f"Error sending STIX bundle: {str(e)}")
stix_objects = [self.identity]

if last_event_time:
try:
# Update the state with the current timestamp
current_timestamp = _format_time(datetime.datetime.utcnow())
self.helper.set_state({"last_run": current_timestamp})
self.helper.log_info(
f"State updated with last_run: {current_timestamp}"
)
except Exception as e:
self.helper.log_error(f"Error updating last_run state: {str(e)}")

def _ping_connector(self):
"""
Continuously ping OpenCTI to keep the connector alive.
"""
while True:
try:
self.helper.force_ping()
self.helper.log_info("Connector ping successful.")
except Exception as e:
self.helper.log_error(f"Error during connector ping: {str(e)}")
time.sleep(300)

def run(self):
"""
Expand All @@ -312,16 +394,17 @@ def run(self):
)
)

# Start the ping thread to keep the connector alive
ping_thread = threading.Thread(target=self._ping_connector)
ping_thread.daemon = True
ping_thread.start()

while True:
if self._process_events():
self.helper.log_info(
"Connector stop: ({})".format(
_format_time(datetime.datetime.utcnow())
)
)
self.helper.force_ping()
# Sleep for interval specified in Hours.
time.sleep(self._get_interval())
try:
self._process_events()
except Exception as e:
self.helper.log_error(f"Error during event processing: {str(e)}")
time.sleep(300)

def _process_events(self):
"""
Expand All @@ -348,6 +431,6 @@ def _process_events(self):
connector = ComlaudeConnector()
connector.run()
except Exception as e:
print(e)
OpenCTIConnectorHelper.log_error(f"Fatal error: {str(e)}")
time.sleep(10)
sys.exit(0)