Skip to content

Commit

Permalink
[WiP] A step towards getting cancel working on Kubernetes
Browse files Browse the repository at this point in the history
  • Loading branch information
wvengen committed Feb 15, 2024
1 parent 13c1198 commit 2e84b2c
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 19 deletions.
40 changes: 22 additions & 18 deletions scrapyd_k8s/launcher/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class K8s:
'Pending': 'pending',
'Waiting': 'pending',
'Running': 'running',
'Terminating': 'running',
'Succeeded': 'finished',
'Completed': 'finished',
'Terminated': 'finished',
Expand Down Expand Up @@ -104,25 +105,27 @@ def cancel(self, project, job_id, signal):

prevstate = self._k8s_job_to_scrapyd_status(job)
if prevstate == 'finished':
pass # nothing to do
return prevstate # nothing to do
elif prevstate == 'running':
# kill pod (retry is disabled, so there should be only one pod)
pod = self._get_pod(project, job_id)
if not pod:
# job apparently just ended, fine
return None
self._k8s_kill(pod.metadata.name, signal)
else:
# not started yet, delete job
self._k8s_batch.delete_namespaced_job(
namespace=self._namespace,
name=job.metadata.name,
body=kubernetes.client.V1DeleteOptions(
propagation_policy='Foreground',
grace_period_seconds=0
)
return None # job apparently just ended, fine
# make sure container is running - https://stackoverflow.com/a/74833788
#if all([c.state.running for c in pod.status.container_statuses]):
if True:
self._k8s_kill(pod.metadata.name, signal)
return prevstate

# not started yet, delete job
self._k8s_batch.delete_namespaced_job(
namespace=self._namespace,
name=job.metadata.name,
body=kubernetes.client.V1DeleteOptions(
propagation_policy='Foreground',
grace_period_seconds=0
)
return prevstate
)

def _parse_job(self, job):
return {
Expand Down Expand Up @@ -173,13 +176,14 @@ def _k8s_job_name(self, project, job_id):
return '-'.join(('scrapyd', project, job_id))

def _k8s_kill(self, pod_name, signal):
# exec needs stream, which modified client, so use separate instance
# exec needs stream, which modifies client, so use separate instance
k8s = kubernetes.client.CoreV1Api()
resp = kubernetes.stream(
resp = kubernetes.stream.stream(
k8s.connect_get_namespaced_pod_exec,
pod_name,
'default',
namespace=self._namespace,
# this is a bit blunt, but it works and is usually available
command=['/usr/sbin/killall5', '-' + signal]
command=['/usr/sbin/killall5', '-' + signal],
stderr=True
)
# TODO figure out how to get return value
2 changes: 1 addition & 1 deletion test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def test_scenario_cancel_running_ok():
# schedule a new job and wait until it is running
response = requests.post(BASE_URL + '/schedule.json', data={
'project': RUN_PROJECT, '_version': RUN_VERSION, 'spider': RUN_SPIDER,
'setting': 'STATIC_SLEEP=%d' % STATIC_SLEEP
'setting': 'STATIC_SLEEP=%d' % (STATIC_SLEEP * 5)
})
assert_response_ok(response)
jobid = response.json()['jobid']
Expand Down

0 comments on commit 2e84b2c

Please sign in to comment.