Add events bus when inserting or updating new object in arangodb #1144

Merged
merged 58 commits into from
Oct 19, 2024

Conversation

udgover
Collaborator

@udgover udgover commented Oct 9, 2024

Overview

This PR adds an event bus that publishes a message whenever an object is inserted or updated in ArangoDB.

The system relies on the Kombu messaging library, which is the underlying system Celery uses for message passing.

The design is deliberately simple: an event producer publishes messages to the events or logs queue, and consumers call the relevant tasks of type EventTask or LogTask accordingly.

Implementation details

Messages

Message formats are validated with pydantic models defined in core/events/message.py:

class MessageType(str, Enum):
    log = "log"
    event = "event"

class EventType(str, Enum):
    new = "new"
    update = "update"
    delete = "delete"

class AbstractEvent(BaseModel, abc.ABC):
    def match(self, acts_on: Pattern) -> bool:
        raise NotImplementedError

class ObjectEvent(AbstractEvent):
    type: EventType
    yeti_object: YetiObjectTypes
    [...]

class LinkEvent(AbstractEvent):
    type: EventType
    source_object: YetiObjectTypes
    target_object: YetiObjectTypes
    relationship: "graph.Relationship"
    [...]
    
class TagEvent(AbstractEvent):
    type: EventType
    tagged_object: YetiObjectTypes
    tag_object: "tag.Tag"
    [...]

class AbstractMessage(BaseModel, abc.ABC):
    type: MessageType
    timestamp: datetime.datetime = Field(default_factory=now)

class LogMessage(AbstractMessage):
    type: MessageType = MessageType.log
    log: str | dict

EventTypes = Union[ObjectEvent, LinkEvent, TagEvent]

class EventMessage(AbstractMessage):
    type: MessageType = MessageType.event
    event: EventTypes

YetiObjectTypes covers every schema defined in Yeti (even private ones). For example, in an ObjectEvent, yeti_object could be a campaign (entity), an ipv4 (observable), a regex (indicator), and so on.

Producer

To publish an event message, call the producer's publish_event method with an event of type ObjectEvent, LinkEvent or TagEvent:

from core.events.producer import producer

producer.publish_event(event)  # event: ObjectEvent, LinkEvent or TagEvent

To publish a log message, call the producer's publish_log method, which accepts either a str or a dict:

from core.events.producer import producer

producer.publish_log(log)  # log: str or dict

Both publish methods are non-blocking: they only send the message to the configured exchange and queue (events for publish_event, logs for publish_log).
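To illustrate the fire-and-forget shape of these two calls without a broker, here is a minimal sketch that swaps Kombu for the standard library's queue.Queue. The class name InMemoryProducer and the JSON payload layout are assumptions made for illustration only; the real producer in core.events.producer publishes through Kombu.

```python
import datetime
import json
import queue
from typing import Union


class InMemoryProducer:
    """Hypothetical stand-in for the Kombu-backed producer (not Yeti's code)."""

    def __init__(self) -> None:
        # One queue per message family, mirroring the "events" and "logs" queues.
        self.queues = {"events": queue.Queue(), "logs": queue.Queue()}

    def _publish(self, queue_name: str, payload: dict) -> None:
        now = datetime.datetime.now(datetime.timezone.utc)
        payload["timestamp"] = now.isoformat()
        # put_nowait returns immediately: the caller never waits for a consumer.
        self.queues[queue_name].put_nowait(json.dumps(payload))

    def publish_event(self, event: dict) -> None:
        self._publish("events", {"type": "event", "event": event})

    def publish_log(self, log: Union[str, dict]) -> None:
        self._publish("logs", {"type": "log", "log": log})


producer = InMemoryProducer()
producer.publish_event({"type": "new", "yeti_object": {"value": "http://example.com"}})
producer.publish_log("observable saved")
```

Each call returns as soon as the serialized message is queued; nothing in the caller depends on whether a consumer is running.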

Consumers

To receive and process messages, start a consumer and specify which queue it reads from.

To handle event messages:

python -m core.events.consumers events

To handle log messages:

python -m core.events.consumers logs

By default, the consumer spawns one multiprocessing.Process per available CPU, as reported by multiprocessing.cpu_count().

To change the number of spawned processes, pass the --concurrency argument followed by the desired number of processes.
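The default and the --concurrency override can be sketched with argparse and multiprocessing. This is only an illustration of the behaviour described above: the function names parse_args, work and spawn_workers are hypothetical, and the actual argument handling in core/events/consumers.py may differ.

```python
import argparse
import multiprocessing


def parse_args(argv: list) -> argparse.Namespace:
    parser = argparse.ArgumentParser(prog="consumers")
    # Which queue to consume from.
    parser.add_argument("type", choices=["events", "logs"])
    # Defaults to one worker process per available CPU.
    parser.add_argument(
        "--concurrency", type=int, default=multiprocessing.cpu_count()
    )
    return parser.parse_args(argv)


def work(worker_id: int, queue_name: str) -> None:
    # Placeholder for the real consume loop.
    print(f"worker {worker_id} consuming from {queue_name}")


def spawn_workers(queue_name: str, concurrency: int) -> list:
    processes = [
        multiprocessing.Process(target=work, args=(i, queue_name))
        for i in range(concurrency)
    ]
    for process in processes:
        process.start()
    return processes


if __name__ == "__main__":
    args = parse_args(["events", "--concurrency", "2"])
    for process in spawn_workers(args.type, args.concurrency):
        process.join()
```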

Event message routing

Plugins of type EventTask can define an acts_on attribute, a regex that selects the events the task handles. For every event message, EventWorker calls the match method implemented by each of the EventTypes classes, passing acts_on.

For example, the following acts_on values precisely define when a task is called on event messages:

  • Global events -> ObjectEvent, LinkEvent, TagEvent

    • called on all events: ""
    • called on all new events: "new"
    • called on all new or update events: "(new|update)"
  • Yeti objects specialisation -> ObjectEvent

    • called on all events related to observables or entity objects: "(new|update|delete):(observable|entity)"
    • called on all events related to observables: "(new|update|delete):observable"
    • called on all events related to url: "(new|update|delete):observable:url"
    • called on all events related to ipv4 and ipv6: "(new|update|delete):observable:(ipv4|ipv6)"
    • called on all new campaign or vulnerability: "new:entity:(campaign|vulnerability)"
    • called on tag creation, update or deletion: "(new|update|delete):tag"
  • Link events specialisation -> LinkEvent

    • called on all events related to links: "(new|update|delete):link"
    • called on all events related to links having an observable as source: "(new|update|delete):link:source:observable"
    • called on all events related to links having an observable as target: "(new|update|delete):link:target:observable"
  • Tagged event specialisation -> TagEvent

    • called on all events related to tag links: "(new|update|delete):tagged"
    • called on all events where an object is tagged with malware or c2: "(new|update|delete):tagged:(malware|c2)"
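The patterns above can be tried out with a small sketch. It assumes that each event is rendered as a colon-separated string such as "new:observable:url" and that acts_on is applied with re.match, which anchors at the start of the string only. Both points are assumptions inferred from the examples; the helper name matches is hypothetical and this is not the match implementation from core/events/message.py.

```python
import re


def matches(acts_on: str, event_key: str) -> bool:
    """Return True if the acts_on regex matches the start of event_key."""
    return re.compile(acts_on).match(event_key) is not None


# (acts_on, event key, description)
checks = [
    ("", "delete:entity:campaign", "empty pattern, any event"),
    ("new", "new:observable:url", "all new events"),
    ("(new|update|delete):observable:(ipv4|ipv6)", "new:observable:ipv4", "ip events"),
    ("new:entity:(campaign|vulnerability)", "new:entity:campaign", "new campaign"),
    ("(new|update|delete):link:source:observable", "delete:link:source:observable", "links"),
    ("(new|update|delete):tagged:(malware|c2)", "new:tagged:c2", "tagged with c2"),
]

for acts_on, key, description in checks:
    print(f"{description}: {matches(acts_on, key)}")
```

Under this assumption the match is a prefix match, so "(new|update|delete):tag" would also match the key "new:tagged:malware". Appending "$" to the pattern restricts it to tag objects only.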

EventTask implementation

The following example shows how to create a task that is called on events:

from urllib.parse import urlparse

from core import taskmanager
from core.events.message import EventMessage
from core.schemas import observable, task

class HostnameExtract(task.EventTask):
    _defaults = {
        "name": "HostnameExtract",
        "description": "Extract hostname (domain or ip) from new URL observable.",
        # Only called for new or updated Url observables.
        "acts_on": "(new|update):observable:url",
    }

    def run(self, message: EventMessage) -> None:
        # acts_on restricts this task to ObjectEvent, so yeti_object is a Url.
        url = message.event.yeti_object
        self.logger.info(f"Extracting hostname from: {url.value}")
        o = urlparse(url.value)
        # urlparse returns None as hostname when the value has no network location.
        if o.hostname is None:
            return
        if observable.IPv4.is_valid(o.hostname):
            extracted_obs = observable.IPv4(value=o.hostname).save()
        else:
            extracted_obs = observable.Hostname(value=o.hostname).save()
        # Link the Url observable to the extracted hostname or IPv4 observable.
        url.link_to(extracted_obs, "hostname", "Extracted hostname from URL")

taskmanager.TaskManager.register_task(HostnameExtract)

The task must inherit from task.EventTask and define a _defaults dictionary that sets its name, description and the events to act on.

When a consumer matches a task based on its acts_on, the task's run method is called with the event message as argument.

In the example, this task will always receive an EventMessage with event of type ObjectEvent because the consumer will precisely match on acts_on which is based on ObjectEvent.

When implementing a task with a more generic acts_on, the task is responsible for handling the different event types it can receive.
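One way a task with a generic acts_on could handle the different event types is to branch on the event class with isinstance. The sketch below uses plain dataclasses as stand-ins for the pydantic models, with string fields instead of Yeti objects; the describe function is hypothetical and only shows the dispatch pattern.

```python
from dataclasses import dataclass


# Simplified stand-ins for the pydantic event models (assumption: only the
# fields needed for the example, with strings instead of Yeti objects).
@dataclass
class ObjectEvent:
    type: str
    yeti_object: str


@dataclass
class LinkEvent:
    type: str
    source_object: str
    target_object: str


@dataclass
class TagEvent:
    type: str
    tagged_object: str
    tag_object: str


def describe(event) -> str:
    """Branch on the concrete event class, as a generic task's run could."""
    if isinstance(event, ObjectEvent):
        return f"{event.type} object: {event.yeti_object}"
    if isinstance(event, LinkEvent):
        return f"{event.type} link: {event.source_object} -> {event.target_object}"
    if isinstance(event, TagEvent):
        return f"{event.type} tag: {event.tagged_object} tagged {event.tag_object}"
    raise TypeError(f"unsupported event type: {type(event).__name__}")


print(describe(ObjectEvent("new", "http://example.com")))
print(describe(LinkEvent("update", "http://example.com", "example.com")))
print(describe(TagEvent("new", "example.com", "c2")))
```

Raising on an unknown class makes a missing branch visible as soon as a new event type is added to EventTypes.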

Testing

Producer / Consumer

For now, the consumer has to be started manually in its own shell:

poetry run python -m core.events.consumers events --debug

Then, in another shell, trigger the producer by saving an observable:

>>> from core.schemas import observable
>>> observable.Url(value="http://example.com").save()

New plugins

This PR also adds two new plugins:

  • DomainExtract, an inline task executed for every event matching new.observables.url. It extracts the hostname or IPv4 address from a newly inserted Url observable.
  • LoggerExample, which calls logging.info for every received event.
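The hostname-or-IPv4 decision can be reproduced outside Yeti with the standard library alone. This sketch replaces observable.IPv4.is_valid with ipaddress.IPv4Address, which is an assumption about equivalent behaviour, and the function name extract_host is hypothetical.

```python
import ipaddress
from typing import Optional, Tuple
from urllib.parse import urlparse


def extract_host(url: str) -> Optional[Tuple[str, str]]:
    """Return ("ipv4", value) or ("hostname", value), or None if no host is found."""
    hostname = urlparse(url).hostname
    if hostname is None:
        # No network location, for example a value without a scheme.
        return None
    try:
        ipaddress.IPv4Address(hostname)
    except ValueError:
        return ("hostname", hostname)
    return ("ipv4", hostname)


print(extract_host("http://example.com/index.html"))
print(extract_host("http://192.0.2.1:8080/payload"))
print(extract_host("not a url"))
```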

udgover and others added 27 commits October 9, 2024 18:54
@udgover udgover marked this pull request as ready for review October 17, 2024 13:27
@udgover udgover added enhancement dependencies Pull requests that update a dependency file code health Changes about syntax, code health, etc. noteworthy PRs that are noteworthy / introduce new features core labels Oct 17, 2024
@udgover udgover linked an issue Oct 17, 2024 that may be closed by this pull request
@tomchop tomchop merged commit 46ab0f9 into main Oct 19, 2024
3 checks passed
@tomchop tomchop deleted the events_system branch October 19, 2024 11:11
@hilt86

hilt86 commented Nov 1, 2024

will this eventually enable us to check if an observable overlaps with different feeds or analytics?

@udgover
Collaborator Author

udgover commented Nov 4, 2024

Hello, if you create an EventTask that acts on update.observables then you will be able to check if there's a new context added by a feed or analytic.

There's no acts_on defined for new.observables.context as of now.

Labels
code health Changes about syntax, code health, etc. core dependencies Pull requests that update a dependency file enhancement noteworthy PRs that are noteworthy / introduce new features
Development

Successfully merging this pull request may close these issues.

Events publisher
3 participants