# Unmanaged pods should fail fast when evicted/preempted/deleted #353
**kubectl.rb**

```diff
@@ -3,6 +3,9 @@
 module KubernetesDeploy
   class Kubectl
     DEFAULT_TIMEOUT = 30
+    NOT_FOUND_ERROR_TEXT = 'NotFound'
+
+    class ResourceNotFoundError < StandardError; end

     def initialize(namespace:, context:, logger:, log_failure_by_default:, default_timeout: DEFAULT_TIMEOUT,
       output_is_sensitive: false)
@@ -17,7 +20,7 @@ def initialize(namespace:, context:, logger:, log_failure_by_default:, default_t
       raise ArgumentError, "context is required" if context.blank?
     end

-    def run(*args, log_failure: nil, use_context: true, use_namespace: true)
+    def run(*args, log_failure: nil, use_context: true, use_namespace: true, raise_on_404: false)
       log_failure = @log_failure_by_default if log_failure.nil?

       args = args.unshift("kubectl")
@@ -29,10 +32,17 @@ def run(*args, log_failure: nil, use_context: true, use_namespace: true)
       out, err, st = Open3.capture3(*args)
       @logger.debug(out.shellescape) unless output_is_sensitive?

-      if !st.success? && log_failure
-        @logger.warn("The following command failed: #{Shellwords.join(args)}")
-        @logger.warn(err) unless output_is_sensitive?
+      unless st.success?
+        if log_failure
+          @logger.warn("The following command failed: #{Shellwords.join(args)}")
+          @logger.warn(err) unless output_is_sensitive?
+        end
+
+        if raise_on_404 && err.match(NOT_FOUND_ERROR_TEXT)
+          raise ResourceNotFoundError, err
+        end
       end

       [out.chomp, err.chomp, st]
     end
```
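A minimal sketch of how a caller might use the new `raise_on_404:` flag. The constructor values and the rescue-to-empty-hash fallback are illustrative, not part of this diff:

```ruby
require 'json'
require 'logger'

# Hypothetical setup; real callers receive a configured Kubectl
# instance from the deploy task rather than building one inline.
kubectl = KubernetesDeploy::Kubectl.new(
  namespace: "testn",
  context: "testc",
  logger: Logger.new($stderr),
  log_failure_by_default: true
)

begin
  # With raise_on_404: true, a failed command whose stderr contains
  # "NotFound" raises ResourceNotFoundError instead of only returning
  # a non-zero status for the caller to inspect.
  out, _err, _st = kubectl.run("get", "pod", "my-pod", "--output=json", raise_on_404: true)
  pod = JSON.parse(out)
rescue KubernetesDeploy::Kubectl::ResourceNotFoundError
  pod = {} # the resource is gone; callers can record that fact
end
```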
**kubernetes_resource.rb**

```diff
@@ -85,6 +85,7 @@ def initialize(namespace:, context:, definition:, logger:, statsd_tags: [])
       @logger = logger
       @definition = definition
       @statsd_report_done = false
+      @disappeared = false
       @validation_errors = []
       @instance_data = {}
     end
@@ -121,12 +122,23 @@ def file_path
     end

     def sync(mediator)
-      @instance_data = mediator.get_instance(kubectl_resource_type, name)
+      @instance_data = mediator.get_instance(kubectl_resource_type, name, raise_on_404: true)
+    rescue KubernetesDeploy::Kubectl::ResourceNotFoundError
+      @disappeared = true if deploy_started?
+      @instance_data = {}
     end

     def after_sync
     end

+    def deleted?
+      @instance_data.dig('metadata', 'deletionTimestamp').present?
+    end
+
+    def disappeared?
+      @disappeared
+    end
+
     def deploy_failed?
       false
     end
```

Review thread on the `@disappeared` flag:

- **Reviewer:** I think we need to prioritize the state tracking refactor. We're effectively adding a new state here.
- **Reviewer:** Exactly.
- **Author:** Are we? This is of course new data about the state of the resource in a general sense, but it isn't a new end state for the resource, which is what that refactor was about (the new state that triggered it was "ignored"). In other words, our end states are still succeeded, failed, and timed out, and this is just a new way that resources can fail.
- **Reviewer:** I still disagree. The refactor is about mutually exclusive states, not just terminal ones.

Review thread on `deleted?`:

- **Reviewer:** So this basically means that k8s has asked the pod to "please go away", but in theory the pod might still exist and the process might still be running, right? If so, then I find `deleted?` slightly misleading, since the pod might still exist and might even terminate successfully, i.e. with a zero exit status. What's the reasoning for not checking whether the pod has actually already been deleted? Because this will catch deletion requests earlier?
- **Author:** Yes, the pod may still exist at that point; I should probably call this something like `terminating?` instead. And yes, this catches deletion requests earlier. We've also seen a k8s bug in the past where resources would get stuck in the terminating state indefinitely, even after the underlying container was gone (though it hasn't been reported in recent versions afaik). Note that we are also checking that they have actually been deleted--if so, `disappeared?` returns true.
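To make the distinction concrete, here is a hedged sketch of how the two predicates react to the two observable situations. The pod manifest and timestamp are hand-written examples, not output from this PR:

```ruby
require 'active_support/core_ext/object/blank' # provides present?

# A pod that k8s has been asked to delete but that still exists:
# its metadata carries a deletionTimestamp while it terminates,
# so deleted? is true even though the process may still be running.
terminating_pod = {
  'metadata' => {
    'name' => 'task-runner-abc123',
    'deletionTimestamp' => '2018-01-01T00:00:00Z'
  }
}
terminating_pod.dig('metadata', 'deletionTimestamp').present? # => true

# Once the pod is fully gone, `kubectl get` reports NotFound, sync
# rescues ResourceNotFoundError and sets @disappeared (provided the
# deploy had already started), so disappeared? becomes true instead.
```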
**pod.rb**

```diff
@@ -69,9 +69,18 @@ def timeout_message
       header + probe_failure_msgs.join("\n") + "\n"
     end

+    def permanent_failed_phase?
+      return false unless phase == FAILED_PHASE_NAME
+      unmanaged? || !TRANSIENT_FAILURE_REASONS.include?(reason)
+    end
+
     def failure_message
-      if phase == FAILED_PHASE_NAME && !TRANSIENT_FAILURE_REASONS.include?(reason)
-        phase_problem = "Pod status: #{status}. "
+      phase_problem = if permanent_failed_phase?
+        "Pod status: #{status}. "
+      elsif unmanaged? && deleted?
+        "Pod status: Terminating. "
+      elsif unmanaged? && disappeared?
+        "Pod status: Disappeared. "
+      end

       doomed_containers = @containers.select(&:doomed?)
```

Review thread on `permanent_failed_phase?`:

- **Reviewer:** "permanently"? dunno, ESL.
- **Reviewer:** nitpick: maybe something like this would be easier to follow:

  ```ruby
  def permanently_failed?
    failed_phase? && (unmanaged? || non_transient_error?)
  end
  ```

Review comment on `failure_message`:

- **Reviewer:** Should this entire if block be wrapped in
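A hedged walk-through of the new predicate for an evicted pod, as a self-contained re-implementation for illustration only. The real method lives on `KubernetesDeploy::Pod`, and the exact contents of `TRANSIENT_FAILURE_REASONS` here are an assumption based on the PR title (evicted/preempted):

```ruby
FAILED_PHASE_NAME = "Failed"
TRANSIENT_FAILURE_REASONS = %w(Evicted Preempting) # assumed membership

def permanent_failed_phase?(phase:, reason:, unmanaged:)
  return false unless phase == FAILED_PHASE_NAME
  unmanaged || !TRANSIENT_FAILURE_REASONS.include?(reason)
end

# A controller-managed pod that was evicted will be replaced by its
# controller, so the eviction is treated as transient:
permanent_failed_phase?(phase: "Failed", reason: "Evicted", unmanaged: false) # => false

# The same eviction on an unmanaged (bare) pod is permanent, because
# nothing will recreate the pod; this is the fail-fast case.
permanent_failed_phase?(phase: "Failed", reason: "Evicted", unmanaged: true)  # => true
```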
**sync_mediator.rb**

```diff
@@ -10,12 +10,16 @@ def initialize(namespace:, context:, logger:)
       clear_cache
     end

-    def get_instance(kind, resource_name)
-      if @cache.key?(kind)
-        @cache.dig(kind, resource_name) || {}
-      else
-        request_instance(kind, resource_name)
+    def get_instance(kind, resource_name, raise_on_404: false)
+      unless @cache.key?(kind)
+        return request_instance(kind, resource_name, raise_on_404: raise_on_404)
       end

+      cached_instance = @cache[kind].fetch(resource_name, {})
+      if cached_instance.blank? && raise_on_404
+        raise KubernetesDeploy::Kubectl::ResourceNotFoundError, "Resource does not exist (used cache for kind #{kind})"
+      end
+      cached_instance
     end

     def get_all(kind, selector = nil)
@@ -55,8 +59,8 @@ def clear_cache
       @cache = {}
     end

-    def request_instance(kind, iname)
-      raw_json, _, st = kubectl.run("get", kind, iname, "-a", "--output=json")
+    def request_instance(kind, iname, raise_on_404:)
+      raw_json, _err, st = kubectl.run("get", kind, iname, "-a", "--output=json", raise_on_404: raise_on_404)
       st.success? ? JSON.parse(raw_json) : {}
     end
```

Review thread on the cache lookup:

- **Reviewer:** Previously this used `@cache.dig(kind, resource_name) || {}`.
- **Author:** The only place that can set it is this:

  ```ruby
  @cache[kind] = JSON.parse(raw_json)["items"].each_with_object({}) do |r, instances|
    instances[r.dig("metadata", "name")] = r
  end
  ```

  That's way too densely written. If I rewrite it like this, it is clearer that it shouldn't be possible to have a `nil` value:

  ```ruby
  instances = {}
  JSON.parse(raw_json)["items"].each do |resource|
    resource_name = resource.dig("metadata", "name")
    instances[resource_name] = resource
  end
  @cache[kind] = instances
  ```
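A small sketch of the semantic difference this thread is poking at, runnable in plain Ruby (the sample hash is made up):

```ruby
cache = { "Pod" => { "task-runner" => { "kind" => "Pod" }, "broken" => nil } }

# Hash#dig returns nil both for a missing key and for a key stored
# with a nil value; `|| {}` then maps both cases to an empty hash.
cache.dig("Pod", "missing")       # => nil
cache["Pod"].fetch("missing", {}) # => {}  (default used: key absent)

# Hash#fetch only falls back to the default when the key is absent,
# so a nil stored under an existing key would leak through:
cache["Pod"].fetch("broken", {})  # => nil

# The thread's conclusion: the cache is populated only from parsed
# JSON items keyed by name, so nil values can't actually occur there.
```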
**Task runner integration test**

```diff
@@ -18,7 +18,7 @@ def test_run_without_verify_result_succeeds_as_soon_as_pod_is_successfully_creat
       "Result: SUCCESS",
       "Result verification is disabled for this task",
       "The following status was observed immediately after pod creation:",
-      %r{Pod/task-runner-\w+\s+Pending},
+      %r{Pod/task-runner-\w+\s+(Pending|Running)},
     ], in_order: true)

     pods = kubeclient.get_pods(namespace: @namespace)
@@ -37,7 +37,7 @@ def test_run_global_timeout_with_max_watch_seconds
       "Result: TIMED OUT",
       "Timed out waiting for 1 resource to run",
       %r{Pod/task-runner-\w+: GLOBAL WATCH TIMEOUT \(5 seconds\)},
-      "Final status: Running"
+      /Final status\: (Pending|Running)/
     ], in_order: true)
   end
```

- **Author:** This change and the one below are to decrease test flakiness. It's not important to these two test cases whether the pod manages to start running in the run window.

```diff
@@ -88,6 +88,40 @@ def test_run_with_verify_result_success
     assert_equal task_runner.pod_name, pods.first.metadata.name, "Pod name should be available after run"
   end

+  def test_run_with_verify_result_fails_quickly_if_the_pod_is_deleted_out_of_band
+    deploy_task_template
+
+    task_runner = build_task_runner
+    deleter_thread = Thread.new do
+      loop do
+        if task_runner.pod_name.present?
+          begin
+            kubeclient.delete_pod(task_runner.pod_name, @namespace)
+            break
+          rescue Kubeclient::ResourceNotFoundError
+            sleep 0.1
+            retry
+          end
+        end
+        sleep 0.1
+      end
+    end
+
+    result = task_runner.run(run_params(log_lines: 20, log_interval: 1))
+    assert_task_run_failure(result)
+
+    assert_logs_match_all([
+      "Pod creation succeeded",
+      "Result: FAILURE",
+      "Pod status: Terminating",
+    ])
+  ensure
+    if deleter_thread
+      deleter_thread.join
+      deleter_thread.kill
+    end
+  end
+
   def test_run_with_verify_result_neither_misses_nor_duplicates_logs_across_pollings
     deploy_task_template
     task_runner = build_task_runner
```

Review threads on the new test:

- **Reviewer (on the deleter thread):** This is really scrappy. Good job! 😆
- **Reviewer (on the trailing `sleep 0.1`):** What is this sleep for?
- **Author:** It's a tiny throttle that takes effect between when we start the thread and when the pod name is generated. The one on L102 takes effect between when the pod name is generated and when the pod has been created.
- **Reviewer:** Should this be in an else block?
- **Reviewer (on the `ensure` block):** Why not just `kill` the thread?
- **Author:** Good question. This is for the case where the test is failing ultimately because of a problem in the thread (which I originally had when I wrote this). If the thread raises and you never join it, you'll never see the error and the test will suck to debug.
- **Reviewer:** Can you leave a comment explaining that?
- **Author:** Added the comment. Thinking about this more, I think the better thing to do is to set `Thread#abort_on_exception` instead.
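The join-before-kill reasoning above can be demonstrated in a few lines of plain Ruby; this is a generic illustration, not code from the PR:

```ruby
worker = Thread.new do
  raise "boom" # simulate a bug in the background thread
end

# kill alone would discard the failure silently; join re-raises the
# thread's exception in the caller, so the test failure is visible.
begin
  worker.join
rescue RuntimeError => e
  puts "surfaced from thread: #{e.message}"
end

# The alternative mentioned in the thread: make exceptions propagate
# as soon as they happen, without waiting for an explicit join.
Thread.abort_on_exception = true
```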
**Kubectl unit test**

```diff
@@ -137,6 +137,22 @@ def test_version_info_raises_if_command_fails
     end
   end

+  def test_run_with_raise_err_on_404_raises_the_correct_thing
+    err = 'Error from server (NotFound): pods "foobar" not found'
+    stub_open3(%w(kubectl get pod foobar --namespace=testn --context=testc --request-timeout=30),
+      resp: "", err: err, success: false)
+    assert_raises_message(KubernetesDeploy::Kubectl::ResourceNotFoundError, err) do
+      build_kubectl.run("get", "pod", "foobar", raise_on_404: true)
+    end
+  end
+
+  def test_run_with_raise_err_on_404_does_not_raise_on_other_errors
+    err = 'Error from server (TooManyRequests): Please try again later'
+    stub_open3(%w(kubectl get pod foobar --namespace=testn --context=testc --request-timeout=30),
+      resp: "", err: err, success: false)
+    build_kubectl.run("get", "pod", "foobar", raise_on_404: true)
+  end
+
   private

   def stub_version_request(client:, server:)
```

- **Reviewer:** Forgot to rename the tests: the keyword ended up as `raise_on_404`, but the test names still say `raise_err_on_404`.
- **Author:** I didn't refactor this to use the new `raise_on_404` because I have a WIP branch that moves this check to use Kubeclient, which would be better.