-
Notifications
You must be signed in to change notification settings - Fork 424
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Comlaude] improvements #3055
base: master
Are you sure you want to change the base?
[Comlaude] improvements #3055
Conversation
Hello @yassine-ouaamou, and @MohamedMerimi, Suggestions for Code ImprovementDuring this PR, I took the time to examine the code as a whole, and here are a few points I think would be useful with best practices : 1. README
2. Docker Compose
3. config.yml.sample
4. Import management
import datetime
import json
import os
import sys
import threading
import time
import comlaude
import stix2
import yaml
from pycti import (
Identity,
Indicator,
OpenCTIConnectorHelper,
StixCoreRelationship,
get_config_variable,
)
from stix2 import TLP_AMBER, Bundle, DomainName 5. requirements.txt
6. Code cleaning
def _get_interval(self):
"""
Get the interval of execution in seconds.
:return: Interval in seconds.
"""
return int(self.config_interval) * 60 * 60
def run(self):
self.helper.schedule_iso(
message_callback=self.your_old_process_run,
duration_period=self.duration_period
)
self.update_existing_data = get_config_variable(
"CONFIG_UPDATE_EXISTING_DATA",
["comlaude", "update_existing_data"],
self.config,
isNumber=True,
)
self.helper.send_stix2_bundle(
bundle.serialize(),
update=self.update_existing_data, # <<<<<< to be deleted
work_id=self.work_id,
)
self.helper.send_stix2_bundle(
bundle.serialize(),
cleanup_inconsistent_bundle=True,
work_id=self.work_id,
)
finally:
if self.work_id is not None:
self.helper.api.work.to_processed(self.work_id, "Finished")
if __name__ == "__main__":
"""
Entry point of the script.
"""
import traceback
try:
connector = ComlaudeConnector()
connector.run()
except Exception:
traceback.print_exc()
sys.exit(1) |
if field not in domain_object or _is_empty(domain_object[field]) | ||
] | ||
if missing_fields: | ||
print(f"Skipping domain due to missing fields: {missing_fields}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please use the logger rather than a print please ? (and preferably when using the method rather than directly in it : this is not this method that skips but the code that uses its result)
@@ -84,13 +120,24 @@ def _generate_dynamic_custom_properties(helper, domain_object, score, author_ide | |||
custom_properties = { | |||
"x_opencti_score": score, | |||
"x_opencti_description": "This domain is known infrastructure managed by Comlaude.", | |||
"created_by_ref": author_identity.id, # Add the created_by_ref to custom properties |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for cleaning the code base 👍
@@ -255,7 +302,7 @@ def _load_config(self) -> dict: | |||
) | |||
return config | |||
except Exception as e: | |||
print(f"Error loading configuration: {str(e)}") | |||
self.helper.log_error(f"Error loading configuration: {str(e)}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
This PR contains the code commited by @MohamedMerimi:
Proposed changes
Related issues