Skip to content

Commit

Permalink
fire event at job start
Browse files Browse the repository at this point in the history
  • Loading branch information
Legrems committed Oct 16, 2023
1 parent eedc281 commit d0310d3
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 0 deletions.
3 changes: 3 additions & 0 deletions salt/metaproxy/deltaproxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
SaltInvocationError,
SaltSystemExit,
)
from salt.utils.decorators.start_event import fire_started_event_job_wrapper
from salt.minion import ProxyMinion
from salt.utils.event import tagify
from salt.utils.process import SignalHandlingProcess, default_signals
Expand Down Expand Up @@ -609,6 +610,7 @@ def target(cls, minion_instance, opts, data, connected, creds_map):
ProxyMinion._thread_return(minion_instance, opts, data)


@fire_started_event_job_wrapper
def thread_return(cls, minion_instance, opts, data):
"""
This method should be used as a threading target, start the actual
Expand Down Expand Up @@ -858,6 +860,7 @@ def thread_return(cls, minion_instance, opts, data):
log.exception("The return failed for job %s: %s", data["jid"], exc)


@fire_started_event_job_wrapper
def thread_multi_return(cls, minion_instance, opts, data):
"""
This method should be used as a threading target, start the actual
Expand Down
3 changes: 3 additions & 0 deletions salt/metaproxy/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
SaltSystemExit,
)
from salt.minion import ProxyMinion
from salt.utils.decorators.start_event import fire_started_event_job_wrapper
from salt.utils.event import tagify
from salt.utils.process import SignalHandlingProcess, default_signals

Expand Down Expand Up @@ -393,6 +394,7 @@ def target(cls, minion_instance, opts, data, connected, creds_map):
ProxyMinion._thread_return(minion_instance, opts, data)


@fire_started_event_job_wrapper
def thread_return(cls, minion_instance, opts, data):
"""
This method should be used as a threading target, start the actual
Expand Down Expand Up @@ -632,6 +634,7 @@ def thread_return(cls, minion_instance, opts, data):
log.exception("The return failed for job %s: %s", data["jid"], exc)


@fire_started_event_job_wrapper
def thread_multi_return(cls, minion_instance, opts, data):
"""
This method should be used as a threading target, start the actual
Expand Down
3 changes: 3 additions & 0 deletions salt/minion.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
from salt.template import SLS_ENCODING
from salt.utils.ctx import RequestContext
from salt.utils.debug import enable_sigusr1_handler
from salt.utils.decorators.start_event import fire_started_event_job_wrapper
from salt.utils.event import tagify
from salt.utils.network import parse_host_port
from salt.utils.odict import OrderedDict
Expand Down Expand Up @@ -1906,6 +1907,7 @@ def _execute_job_function(
return None

@classmethod
@fire_started_event_job_wrapper
def _thread_return(cls, minion_instance, opts, data):
"""
This method should be used as a threading target, start the actual
Expand Down Expand Up @@ -2107,6 +2109,7 @@ def _thread_return(cls, minion_instance, opts, data):
log.exception("The return failed for job %s: %s", data["jid"], exc)

@classmethod
@fire_started_event_job_wrapper
def _thread_multi_return(cls, minion_instance, opts, data):
"""
This method should be used as a threading target, start the actual
Expand Down
31 changes: 31 additions & 0 deletions salt/utils/decorators/start_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import os
import logging
from functools import wraps

log = logging.getLogger(__name__)


def fire_started_event_job_wrapper(function):
"""
Fire a custom/job/<jid>/started/<minion_id> event when starting a job
"""

@wraps(function)
def wrapped(*args, **kwargs):
"""We wraps for *args and **kwargs, to be safe, but we expect: (cls, minion_instance, opts, data) as args"""

try:
sdata = {"pid": os.getpid()}
sdata.update(args[3])

jid = sdata["jid"]
minion_id = args[1].opts.get("id", "undefined")

args[1]._fire_master(sdata, f"custom/job/{jid}/started/{minion_id}")

except Exception as exc:
log.warning(f"Exception raised during started job event wrapper: {exc}")

return function(*args, **kwargs)

return wrapped

0 comments on commit d0310d3

Please sign in to comment.