diff --git a/kubernetes/jobmonitor/src/main/kotlin/FailedJobNotifier.kt b/kubernetes/jobmonitor/src/main/kotlin/FailedJobNotifier.kt index 3394b0591..a290bfdbf 100644 --- a/kubernetes/jobmonitor/src/main/kotlin/FailedJobNotifier.kt +++ b/kubernetes/jobmonitor/src/main/kotlin/FailedJobNotifier.kt @@ -23,6 +23,8 @@ import io.kubernetes.client.openapi.models.V1Job import org.eclipse.apoapsis.ortserver.kubernetes.jobmonitor.JobHandler.Companion.ortRunId import org.eclipse.apoapsis.ortserver.kubernetes.jobmonitor.JobHandler.Companion.traceId +import org.eclipse.apoapsis.ortserver.model.ActiveOrtRun +import org.eclipse.apoapsis.ortserver.model.orchestrator.LostSchedule import org.eclipse.apoapsis.ortserver.model.orchestrator.OrchestratorMessage import org.eclipse.apoapsis.ortserver.model.orchestrator.WorkerError import org.eclipse.apoapsis.ortserver.transport.Endpoint @@ -69,6 +71,17 @@ internal class FailedJobNotifier( sendToOrchestrator(message) } + /** + * Send a notification about an ORT run without active schedules for the given [ortRun]. This is used to notify + * the Orchestrator that it has to reschedule jobs for the affected run. + */ + fun sendLostScheduleNotification(ortRun: ActiveOrtRun) { + val header = MessageHeader(ortRun.traceId.orEmpty(), ortRun.runId) + val message = Message(header, LostSchedule(ortRun.runId)) + + sendToOrchestrator(message) + } + /** * Send the given [message] to the Orchestrator via the configured [MessageSender]. */ diff --git a/kubernetes/jobmonitor/src/test/kotlin/FailedJobNotifierTest.kt b/kubernetes/jobmonitor/src/test/kotlin/FailedJobNotifierTest.kt index b027938fa..b5dd3b12c 100644 --- a/kubernetes/jobmonitor/src/test/kotlin/FailedJobNotifierTest.kt +++ b/kubernetes/jobmonitor/src/test/kotlin/FailedJobNotifierTest.kt @@ -32,6 +32,10 @@ import io.mockk.runs import io.mockk.slot import io.mockk.verify +import kotlinx.datetime.Clock + +import org.eclipse.apoapsis.ortserver.model.ActiveOrtRun +import org.eclipse.apoapsis.ortserver.model.orchestrator.LostSchedule import org.eclipse.apoapsis.ortserver.model.orchestrator.OrchestratorMessage import org.eclipse.apoapsis.ortserver.model.orchestrator.WorkerError import org.eclipse.apoapsis.ortserver.transport.Message @@ -153,4 +157,26 @@ class FailedJobNotifierTest : WordSpec({ } } } + + "sendLostScheduleNotification" should { + "send a notification about a lost schedule" { + val ortRun = ActiveOrtRun(20241211084817L, Clock.System.now(), "someTraceId") + val sender = mockk>() + every { sender.send(any()) } just runs + + val notifier = FailedJobNotifier(sender) + notifier.sendLostScheduleNotification(ortRun) + + val slot = slot>() + verify { + sender.send(capture(slot)) + } + + with(slot.captured) { + header.ortRunId shouldBe ortRun.runId + header.traceId shouldBe ortRun.traceId + payload shouldBe LostSchedule(ortRun.runId) + } + } + } })