Skip to content

Commit

Permalink
Merge pull request #332 from red-kite-solutions/wrap-job-to-end-them-all
Browse files Browse the repository at this point in the history
Wrap jobs to prevent forever-hanging jobs
  • Loading branch information
Aboisier authored Jan 25, 2025
2 parents 40cd99c + 9a341b4 commit 6dde05f
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 63 deletions.
1 change: 1 addition & 0 deletions jobs/job-base-images/python/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ RUN python -m pip install "poetry==1.3.2"
RUN python -m pip install pillow

COPY stalker_job_sdk /usr/src/stalker_job_sdk
COPY main.py /usr/src

RUN python -m pip install -e /usr/src/stalker_job_sdk
RUN apt-get update && apt-get install -y nmap libpcap0.8 wget gnupg libc6
Expand Down
15 changes: 15 additions & 0 deletions jobs/job-base-images/python/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import sys
from stalker_job_sdk import JobStatus, log_status, log_error, log_debug, _log_done

# Wrapper around a job's Python code: runs the code handed over on the command
# line and reports the end of the job to the orchestrator whatever the outcome.
log_debug('Job execution started.')

try:
    # The job's source code is received as the first command-line argument.
    # NOTE(review): exec() runs that text verbatim in this module's namespace;
    # presumably it only ever comes from the orchestrator's job template — confirm.
    command = sys.argv[1]
    exec(command)

except Exception as exception:
    log_error(exception)
    log_status(JobStatus.FAILED)

finally:
    # In a finally clause so the "ended" notification is still sent when the job
    # code raises something that is not an Exception subclass (SystemExit from
    # sys.exit(), KeyboardInterrupt) or when reporting the failure above raises.
    _log_done()
    log_debug('Job execution ended.')
6 changes: 4 additions & 2 deletions jobs/job-base-images/python/nuclei/nuclei_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
PortFinding, TextField, WebsiteFinding, build_url,
is_valid_ip, is_valid_port, log_debug, log_error,
log_finding, log_info, log_status, log_warning,
to_boolean)
to_boolean, _log_done)


def handle_port_finding(finding: NucleiFinding, all_fields: 'list[Field]', output_finding_name: str):
Expand Down Expand Up @@ -295,4 +295,6 @@ def main():
log_status(JobStatus.SUCCESS)
except Exception as err:
log_error(err)
log_status(JobStatus.FAILED)
log_status(JobStatus.FAILED)
finally:
_log_done()
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import httpx
import json
import sys
from abc import ABC
from ipaddress import ip_address
from os import getenv
from functools import lru_cache

import httpx


@lru_cache
def get_http_client():
    """Return the HTTP client used to talk to the orchestrator.

    The function takes no arguments and is wrapped in ``lru_cache``, so the
    client is built on the first call and the same instance is handed back on
    every later call.
    """
    # verify=False and http2=True are passed straight to httpx.Client.
    shared_client = httpx.Client(verify=False, http2=True)
    return shared_client

class Field(ABC):
def __init__(self, key: str, type: str):
self.key = key
Expand Down Expand Up @@ -160,9 +165,9 @@ def _log(prefix: str, message: str):
print(output)
sys.stdout.flush()
return
with httpx.Client(verify=False, http2=True) as client:
client.post(f"{orchestratorUrl}/Jobs/{jobId}/Finding", json={ "Finding": output})

client = get_http_client()
client.post(f"{orchestratorUrl}/Jobs/{jobId}/Finding", json={ "Finding": output})

def log_status(status: str):
"""Reports the status to the orchestrator. Status can be Success of Failed."""
Expand All @@ -177,9 +182,22 @@ def log_status(status: str):
sys.stdout.flush()
return

with httpx.Client(verify=False, http2=True) as client:
client.post(f"{orchestratorUrl}/Jobs/{jobId}/Status", json={ "Status": status})
client = get_http_client()
client.post(f"{orchestratorUrl}/Jobs/{jobId}/Status", json={ "Status": status})


def _log_done():
"""Reports the job has ended."""
jobId = getenv('RedKiteJobId')
orchestratorUrl = getenv('RedKiteOrchestratorUrl') or 'http://orchestrator.stalker.svc.cluster.local.'

if(not jobId):
print(f"Status: Ended")
sys.stdout.flush()
return

client = get_http_client()
client.post(f"{orchestratorUrl}/Jobs/{jobId}/Status", json={ "Status": "Ended"})

def is_valid_ip(ip: str):
"""Validates an IP address. Returns false if the IP is invalid, true otherwise."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ export class JobExecutionsService {
const select = { _id: { $eq: new Types.ObjectId(jobId) } };
switch (status.toLowerCase()) {
case 'started':
await this.addJobOutputLine(jobId, timestamp, 'Job started.', 'debug');
return await this.jobModel.updateOne(select, { startTime: timestamp });

case 'success':
await this.addJobOutputLine(
jobId,
Expand All @@ -187,9 +187,14 @@ export class JobExecutionsService {
'debug',
);
return await this.jobModel.updateOne(select, { endTime: timestamp });

case 'failed':
await this.addJobOutputLine(jobId, timestamp, 'Job failed.', 'debug');
return await this.jobModel.updateOne(select, { endTime: timestamp });

case 'ended':
await this.addJobOutputLine(jobId, timestamp, 'Job ended.', 'debug');
return await this.jobModel.updateOne(select, { endTime: timestamp });
default:
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ public class JobsController : Controller
private IMessagesProducer<JobLogMessage> LogsProducer { get; set; }
private IFindingsParser Parser { get; set; }

// Current time expressed as milliseconds elapsed since the Unix epoch.
// Read directly from the UTC clock instead of converting the local time to UTC.
private static long CurrentTimeMs => DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();

/// <summary>
Expand All @@ -28,12 +28,12 @@ private static long CurrentTimeMs
/// <returns></returns>
private static bool IsValidJobId(string jobId)
{
    // Null and empty ids are rejected before the pattern check.
    if (string.IsNullOrEmpty(jobId)) return false;

    // Exactly 24 lowercase hexadecimal characters and nothing else.
    // \z anchors at the very end of the input; `$` would also match just
    // before a trailing newline and accept "<24 hex characters>\n".
    return Regex.IsMatch(jobId, @"^[a-f0-9]{24}\z");
}

public JobsController(IMessagesProducer<JobEventMessage> eventsProducer, IMessagesProducer<JobLogMessage> jobLogsProducer, IFindingsParser parser)
public JobsController(IMessagesProducer<JobEventMessage> eventsProducer, IMessagesProducer<JobLogMessage> jobLogsProducer, IFindingsParser parser)
{
EventsProducer = eventsProducer;
LogsProducer = jobLogsProducer;
Expand Down Expand Up @@ -66,7 +66,7 @@ await EventsProducer.Produce(new JobEventMessage

// POST /Jobs/Finding
[HttpPost]
public async Task<ActionResult> Finding([FromBody]JobFindingDto dto, string id = "")
public async Task<ActionResult> Finding([FromBody] JobFindingDto dto, string id = "")
{
if (!IsValidJobId(id)) return BadRequest("Job id is invalid");
try
Expand All @@ -86,16 +86,17 @@ public async Task<ActionResult> Finding([FromBody]JobFindingDto dto, string id =

// POST /Jobs/Status
[HttpPost]
public async Task<ActionResult> Status([FromBody]StatusUpdateDto dto, string id = "")
public async Task<ActionResult> Status([FromBody] StatusUpdateDto dto, string id = "")
{
if (dto.Status != "Success" && dto.Status != "Failed")
var acceptableStatuses = new HashSet<string>() { "Success", "Failed", "Ended" };
if (!acceptableStatuses.Contains(dto.Status))
{
Console.WriteLine("bad status");
return BadRequest("Status should be Success or Failed");
}

if (!IsValidJobId(id)) return BadRequest("Job id is invalid");

await EventsProducer.Produce(new JobEventMessage
{
JobId = id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace Orchestrator.Jobs.JobTemplates;

public abstract class PythonJobTemplate : KubernetesJobTemplate
{
public override string[] Command => new[] { "python", "-c", PythonCommand };
public override string[] Command => new[] { "python", "/usr/src/main.py", PythonCommand };

protected virtual string PythonCommand { get; set; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ protected KubernetesCommand(T request, IKubernetesFacade kubernetes, IMessagesPr
Logger = logger;
}


public override async Task Execute()
{
Logger.LogInformation(Request.JobId, "Creating job.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,7 @@ public interface IKubernetesFacade
Task<KubernetesJob> CreateJob(KubernetesJobTemplate jobTemplate);

/// <summary>
/// Gets the stream of logs for the given jobTemplate's pod.
/// </summary>
Task<Stream> GetJobLogs(string jobName, string jobNamespace = "default");

/// <summary>
/// Deletes a jobTemplate.
/// </summary>
Task DeleteJob(string jobName, string jobNamespace = "default");

/// <summary>
/// True if the pod is in the status "Failed" or "Succeeded", false otherwise
/// True if the pod is in the status "Failed" or "Succeeded", false otherwise
/// </summary>
Task<bool> IsJobPodFinished(string jobName, string jobNamespace = "default");
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,31 +53,42 @@ public async Task<KubernetesJob> CreateJob(KubernetesJobTemplate jobTemplate)
}
}


var kubernetesJob = new V1Job("batch/v1", "Job",
new V1ObjectMeta
{
Name = jobName,
Labels = new Dictionary<string, string>()
{
["red-kite.io/component"] = "job"
}
},
new V1JobSpec
{
Template = new V1PodTemplateSpec
{
Metadata = new V1ObjectMeta
{
Labels = new Dictionary<string, string>()
{
["red-kite.io/component"] = "job"
}
},
Spec = new V1PodSpec
{
Containers = new List<V1Container>
{
new()
new ()
{
Name = "jobtemplate",
Image = jobTemplate.Image,
Command = jobTemplate.Command,
Env = jobTemplate.EnvironmentVariable.Select(x => new V1EnvVar(x.Key, x.Value)).ToList(),
Resources = resources,
}
},
}
},
NodeSelector = nodeSelector,
RestartPolicy = "Never",
TerminationGracePeriodSeconds = 100
},
},
BackoffLimit = jobTemplate.MaxRetries,
Expand All @@ -97,32 +108,6 @@ public async Task<KubernetesJob> CreateJob(KubernetesJobTemplate jobTemplate)
});
}

/// <summary>
/// Gets the stream of logs for the given jobTemplate's pod.
/// Polls the pods labelled <c>job-name={jobName}</c> every 100 ms until one exists
/// and is no longer in the "Pending" phase, then opens its log stream in follow mode.
/// </summary>
/// <param name="jobName">Name of the job whose pod logs are requested.</param>
/// <param name="jobNamespace">Namespace the job lives in.</param>
/// <returns>The log stream of the first pod listed for the job.</returns>
public async Task<Stream> GetJobLogs(string jobName, string jobNamespace = "default")
{
    // NOTE(review): the client is disposed when this method returns while the
    // returned stream is still being followed — confirm the stream outlives it.
    using var client = new Kubernetes(KubernetesConfiguration);
    V1PodList pods;
    do
    {
        // Asynchronous wait: Thread.Sleep would block the calling thread inside an async method.
        await Task.Delay(100);
        pods = await RetryableCall(() => client.ListNamespacedPodAsync(labelSelector: $"job-name={jobName}", limit: 1, namespaceParameter: jobNamespace));

    } while (pods?.Items == null || pods.Items.Count < 1 || pods.Items.FirstOrDefault()?.Status?.Phase == "Pending");

    // The loop only exits once at least one pod is listed, so the first item exists.
    var podName = pods.Items.First().Metadata.Name;
    return await RetryableCall(() => client.ReadNamespacedPodLogAsync(podName, jobNamespace, follow: true));
}

/// <summary>
/// Deletes the job named <paramref name="jobName"/> from <paramref name="jobNamespace"/>.
/// The job is selected through a <c>metadata.name</c> field selector and removed
/// with the "Foreground" propagation policy.
/// </summary>
public async Task DeleteJob(string jobName, string jobNamespace = "default")
{
    using var client = new Kubernetes(KubernetesConfiguration);

    // Restrict the collection delete to the single job carrying this name.
    var nameSelector = $"metadata.name={jobName}";
    await RetryableCall(() => client.DeleteCollectionNamespacedJobAsync(
        jobNamespace,
        fieldSelector: nameSelector,
        propagationPolicy: "Foreground"));
}

/// <summary>
/// True if the pod is in the status "Failed" or "Succeeded", false otherwise
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ public class KubernetesJobTemplate
/// </summary>
public virtual string[] Command { get; init; }

/// <summary>
/// Command executed before the container stops.
/// </summary>
public virtual IList<string> PreStopCommand { get; init; } = Array.Empty<string>();
/// <summary>
/// Gets the environment variables for this job.
/// </summary>
Expand Down

0 comments on commit 6dde05f

Please sign in to comment.