Skip to content

Commit

Permalink
Merge pull request #140 from Shopify/statefulset
Browse files Browse the repository at this point in the history
Add Stateful Set Resource
  • Loading branch information
karanthukral authored Nov 9, 2017
2 parents 0be8870 + e4c6185 commit 4705b25
Show file tree
Hide file tree
Showing 11 changed files with 252 additions and 114 deletions.
8 changes: 8 additions & 0 deletions lib/kubernetes-deploy/kubeclient_builder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ def build_policy_v1beta1_kubeclient(context)
)
end

# Builds a kubeclient scoped to the apps/v1beta1 API group
# (the group serving StatefulSets on the clusters targeted here).
def build_apps_v1beta1_kubeclient(context)
  client_options = {
    api_version: "v1beta1",
    context: context,
    endpoint_path: "/apis/apps",
  }
  _build_kubeclient(**client_options)
end

def _build_kubeclient(api_version:, context:, endpoint_path: nil)
config = GoogleFriendlyConfig.read(ENV.fetch("KUBECONFIG"))
unless config.contexts.include?(context)
Expand Down
62 changes: 11 additions & 51 deletions lib/kubernetes-deploy/kubernetes_resource/daemon_set.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# frozen_string_literal: true
require 'kubernetes-deploy/kubernetes_resource/pod_set_base'
module KubernetesDeploy
class DaemonSet < KubernetesResource
class DaemonSet < PodSetBase
TIMEOUT = 5.minutes
attr_reader :pods

def sync
raw_json, _err, st = kubectl.run("get", type, @name, "--output=json")
Expand Down Expand Up @@ -31,66 +33,24 @@ def deploy_succeeded?
end

def deploy_failed?
@pods.present? && @pods.any?(&:deploy_failed?)
pods.present? && pods.any?(&:deploy_failed?)
end

def failure_message
@pods.map(&:failure_message).compact.uniq.join("\n")
end

def timeout_message
@pods.map(&:timeout_message).compact.uniq.join("\n")
end

def deploy_timed_out?
super || @pods.present? && @pods.any?(&:deploy_timed_out?)
def fetch_logs
most_useful_pod = @pods.find(&:deploy_failed?) || @pods.find(&:deploy_timed_out?) || @pods.first
most_useful_pod.fetch_logs
end

def exists?
@found
end

def fetch_events
own_events = super
return own_events unless @pods.present?
most_useful_pod = @pods.find(&:deploy_failed?) || @pods.find(&:deploy_timed_out?) || @pods.first
own_events.merge(most_useful_pod.fetch_events)
end

def fetch_logs
most_useful_pod = @pods.find(&:deploy_failed?) || @pods.find(&:deploy_timed_out?) || @pods.first
most_useful_pod.fetch_logs
end

private

def find_pods(ds_data)
label_string = ds_data["spec"]["selector"]["matchLabels"].map { |k, v| "#{k}=#{v}" }.join(",")
raw_json, _err, st = kubectl.run("get", "pods", "-a", "--output=json", "--selector=#{label_string}")
return [] unless st.success?

all_pods = JSON.parse(raw_json)["items"]
template_generation = ds_data["spec"]["templateGeneration"]

latest_pods = all_pods.find_all do |pod|
next unless owners = pod.dig("metadata", "ownerReferences")
owners.any? { |ref| ref["uid"] == ds_data["metadata"]["uid"] } &&
pod["metadata"]["labels"]["pod-template-generation"].to_i == template_generation.to_i
end
return [] unless latest_pods.present?

latest_pods.each_with_object([]) do |pod_data, relevant_pods|
pod = Pod.new(
namespace: namespace,
context: context,
definition: pod_data,
logger: @logger,
parent: "#{@name.capitalize} daemon set",
deploy_started: @deploy_started
)
pod.sync(pod_data)
relevant_pods << pod
end
def parent_of_pod?(set_data, pod_data)
return false unless pod_data.dig("metadata", "ownerReferences")
pod_data["metadata"]["ownerReferences"].any? { |ref| ref["uid"] == set_data["metadata"]["uid"] } &&
pod_data["metadata"]["labels"]["pod-template-generation"].to_i == set_data["spec"]["templateGeneration"].to_i
end
end
end
70 changes: 70 additions & 0 deletions lib/kubernetes-deploy/kubernetes_resource/pod_set_base.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# frozen_string_literal: true
module KubernetesDeploy
  # Shared behaviour for resources that control sets of pods (e.g. DaemonSet,
  # ReplicaSet, StatefulSet). Subclasses must provide a `pods` accessor
  # returning synced Pod instances and a `parent_of_pod?(set_data, pod_data)`
  # predicate used by `find_pods` to filter ownership.
  class PodSetBase < KubernetesResource
    # Distinct failure messages collected from all pods in the set.
    def failure_message
      pods.map(&:failure_message).compact.uniq.join("\n")
    end

    # Distinct timeout messages collected from all pods in the set.
    def timeout_message
      pods.map(&:timeout_message).compact.uniq.join("\n")
    end

    # This resource's own events merged with those of the single most
    # diagnostically useful pod (failed > timed out > first).
    def fetch_events
      own_events = super
      return own_events unless pods.present?
      most_useful_pod = pods.find(&:deploy_failed?) || pods.find(&:deploy_timed_out?) || pods.first
      own_events.merge(most_useful_pod.fetch_events)
    end

    # Fetches recent logs for every container declared in the pod template
    # (init containers included). Returns a hash of container name => lines.
    def fetch_logs
      return {} unless pods.present? # the kubectl command times out if no pods exist
      container_names.each_with_object({}) do |container_name, container_logs|
        out, _err, _st = kubectl.run(
          "logs",
          id,
          "--container=#{container_name}",
          "--since-time=#{@deploy_started.to_datetime.rfc3339}",
          "--tail=#{LOG_LINE_COUNT}"
        )
        container_logs[container_name] = out.split("\n")
      end
    end

    private

    def pods
      raise NotImplementedError, "Subclasses must define a `pods` accessor"
    end

    def parent_of_pod?(_, _)
      raise NotImplementedError, "Subclasses must define a `parent_of_pod?` method"
    end

    # Names of all containers in the pod template, init containers included.
    def container_names
      regular_containers = @definition["spec"]["template"]["spec"]["containers"].map { |c| c["name"] }
      # Default to an empty Array (not Hash) when initContainers is absent, so the
      # fallback has the same type as the "containers" list (matches replica_set.rb).
      init_containers = @definition["spec"]["template"]["spec"].fetch("initContainers", []).map { |c| c["name"] }
      regular_containers + init_containers
    end

    # Lists pods matching the controller's label selector via kubectl, keeps
    # only those the controller actually owns (per `parent_of_pod?`), and
    # returns them as synced Pod instances.
    def find_pods(pod_controller_data)
      label_string = pod_controller_data["spec"]["selector"]["matchLabels"].map { |k, v| "#{k}=#{v}" }.join(",")
      raw_json, _err, st = kubectl.run("get", "pods", "-a", "--output=json", "--selector=#{label_string}")
      return [] unless st.success?

      all_pods = JSON.parse(raw_json)["items"]
      all_pods.each_with_object([]) do |pod_data, relevant_pods|
        next unless parent_of_pod?(pod_controller_data, pod_data)
        pod = Pod.new(
          namespace: namespace,
          context: context,
          definition: pod_data,
          logger: @logger,
          parent: "#{name.capitalize} #{self.class.name}",
          deploy_started: @deploy_started
        )
        pod.sync(pod_data)
        relevant_pods << pod
      end
    end
  end
end
67 changes: 8 additions & 59 deletions lib/kubernetes-deploy/kubernetes_resource/replica_set.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# frozen_string_literal: true
require 'kubernetes-deploy/kubernetes_resource/pod_set_base'
module KubernetesDeploy
class ReplicaSet < KubernetesResource
class ReplicaSet < PodSetBase
TIMEOUT = 5.minutes
attr_reader :desired_replicas
attr_reader :desired_replicas, :pods

def initialize(namespace:, context:, definition:, logger:, parent: nil, deploy_started: nil)
@parent = parent
Expand Down Expand Up @@ -39,74 +40,22 @@ def deploy_succeeded?
end

def deploy_failed?
@pods.present? && @pods.all?(&:deploy_failed?)
end

def failure_message
@pods.map(&:failure_message).compact.uniq.join("\n")
end

def timeout_message
@pods.map(&:timeout_message).compact.uniq.join("\n")
pods.present? && pods.all?(&:deploy_failed?)
end

def exists?
@found
end

def fetch_events
own_events = super
return own_events unless @pods.present?
most_useful_pod = @pods.find(&:deploy_failed?) || @pods.find(&:deploy_timed_out?) || @pods.first
own_events.merge(most_useful_pod.fetch_events)
end
private

def fetch_logs
return {} unless @pods.present? # the kubectl command times out if no pods exist
container_names.each_with_object({}) do |container_name, container_logs|
out, _err, _st = kubectl.run(
"logs",
id,
"--container=#{container_name}",
"--since-time=#{@deploy_started.to_datetime.rfc3339}",
"--tail=#{LOG_LINE_COUNT}"
)
container_logs[container_name] = out.split("\n")
end
def parent_of_pod?(set_data, pod_data)
return false unless pod_data.dig("metadata", "ownerReferences")
pod_data["metadata"]["ownerReferences"].any? { |ref| ref["uid"] == set_data["metadata"]["uid"] }
end

private

def unmanaged?
@parent.blank?
end

def container_names
regular_containers = @definition["spec"]["template"]["spec"]["containers"].map { |c| c["name"] }
init_containers = @definition["spec"]["template"]["spec"].fetch("initContainers", []).map { |c| c["name"] }
regular_containers + init_containers
end

def find_pods(rs_data)
label_string = rs_data["spec"]["selector"]["matchLabels"].map { |k, v| "#{k}=#{v}" }.join(",")
raw_json, _err, st = kubectl.run("get", "pods", "-a", "--output=json", "--selector=#{label_string}")
return [] unless st.success?

all_pods = JSON.parse(raw_json)["items"]
all_pods.each_with_object([]) do |pod_data, relevant_pods|
next unless owners = pod_data.dig("metadata", "ownerReferences")
next unless owners.any? { |ref| ref["uid"] == rs_data["metadata"]["uid"] }
pod = Pod.new(
namespace: namespace,
context: context,
definition: pod_data,
logger: @logger,
parent: "#{@name.capitalize} replica set",
deploy_started: @deploy_started
)
pod.sync(pod_data)
relevant_pods << pod
end
end
end
end
66 changes: 66 additions & 0 deletions lib/kubernetes-deploy/kubernetes_resource/stateful_set.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# frozen_string_literal: true
require 'kubernetes-deploy/kubernetes_resource/pod_set_base'
module KubernetesDeploy
  # Deploy tracking for StatefulSet resources. Inherits pod discovery,
  # event/log aggregation and message helpers from PodSetBase.
  class StatefulSet < PodSetBase
    TIMEOUT = 10.minutes
    ONDELETE = 'OnDelete'
    attr_reader :pods

    # Refreshes cached state from the cluster via `kubectl get <type> <name>`.
    def sync
      raw_json, _err, st = kubectl.run("get", type, @name, "--output=json")
      @found = st.success?

      if @found
        stateful_data = JSON.parse(raw_json)
        @desired_replicas = stateful_data["spec"]["replicas"].to_i
        @status_data = stateful_data["status"]
        rollout_data = stateful_data["status"].slice("replicas", "readyReplicas", "currentReplicas")
        # spec.updateStrategy only exists on k8s 1.7+; older servers behave
        # as if the strategy were OnDelete, so assume it for them.
        @update_strategy = if kubectl.server_version < Gem::Version.new("1.7.0")
          ONDELETE
        else
          stateful_data['spec']['updateStrategy']['type']
        end
        # e.g. "2 replicas, 2 readyReplicas" — key is singularized by chopping the trailing 's'
        @status = rollout_data.map { |state_replicas, num| "#{num} #{state_replicas.chop.pluralize(num)}" }.join(", ")
        @pods = find_pods(stateful_data)
      else # reset
        # Sentinel values: presumably chosen so ready/current never both match
        # @desired_replicas while the resource is absent — TODO confirm
        @status_data = { 'readyReplicas' => '-1', 'currentReplicas' => '-2' }
        @status = nil
        @pods = []
      end
    end

    # With OnDelete the rollout only happens when pods are manually deleted,
    # so success cannot be monitored; warn once and assume success.
    # Otherwise require the revision to be current and all replicas ready.
    def deploy_succeeded?
      if @update_strategy == ONDELETE
        # Gem cannot monitor update since it doesn't occur until delete
        unless @success_assumption_warning_shown
          @logger.warn("WARNING: Your StatefulSet's updateStrategy is set to OnDelete, "\
            "which means updates will not be applied until its pods are deleted. "\
            "If you are using k8s 1.7+, consider switching to rollingUpdate.")
          @success_assumption_warning_shown = true
        end
        true
      else
        @status_data['currentRevision'] == @status_data['updateRevision'] &&
        @desired_replicas == @status_data['readyReplicas'].to_i &&
        @desired_replicas == @status_data['currentReplicas'].to_i
      end
    end

    # Never failed under OnDelete (nothing is being rolled out to fail);
    # otherwise failed as soon as any owned pod has failed.
    def deploy_failed?
      return false if @update_strategy == ONDELETE
      pods.present? && pods.any?(&:deploy_failed?)
    end

    # True when the last sync found the resource in the cluster.
    def exists?
      @found
    end

    private

    # A pod belongs to this set when an ownerReference UID matches and its
    # controller-revision-hash label matches the set's current revision.
    def parent_of_pod?(set_data, pod_data)
      return false unless pod_data.dig("metadata", "ownerReferences")
      pod_data["metadata"]["ownerReferences"].any? { |ref| ref["uid"] == set_data["metadata"]["uid"] } &&
      set_data["status"]["currentRevision"] == pod_data["metadata"]["labels"]["controller-revision-hash"]
    end
  end
end
1 change: 1 addition & 0 deletions lib/kubernetes-deploy/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
statefulservice
topic
bucket
stateful_set
).each do |subresource|
require "kubernetes-deploy/kubernetes_resource/#{subresource}"
end
Expand Down
19 changes: 19 additions & 0 deletions test/fixtures/hello-cloud/stateful_set.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Test fixture: minimal StatefulSet for the hello-cloud integration suite.
# The container simply sleeps so its pods stay up long enough for assertions.
apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: nginx-ss
spec:
  serviceName: "nginx-ss"
  replicas: 2
  template:
    metadata:
      labels:
        app: hello-cloud
        name: nginx-ss
    spec:
      containers:
      - name: nginx
        image: gcr.io/google_containers/nginx-slim:0.8
        command: ["sleep", "40"]
        ports:
        - containerPort: 80
7 changes: 7 additions & 0 deletions test/helpers/fixture_set.rb
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,13 @@ def assert_daemon_set_present(name)
desired = daemon_sets.find { |ds| ds.metadata.name == name }
assert desired.present?, "Daemon set #{name} does not exist"
end

# Asserts that a StatefulSet called `name`, labelled for this fixture app,
# exists in the test namespace.
def assert_stateful_set_present(name)
  selector = "name=#{name},app=#{app_name}"
  matching_sets = apps_v1beta1_kubeclient.get_stateful_sets(namespace: namespace, label_selector: selector)
  found = matching_sets.detect { |set| set.metadata.name == name }
  assert found.present?, "Stateful set #{name} does not exist"
end
end
end

Expand Down
5 changes: 5 additions & 0 deletions test/helpers/fixture_sets/hello_cloud.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def assert_all_up
assert_bare_replicaset_up
assert_all_service_accounts_up
assert_daemon_set_up
assert_stateful_set_up
end

def assert_unmanaged_pod_statuses(status, count = 1)
Expand Down Expand Up @@ -89,5 +90,9 @@ def assert_all_service_accounts_up
# Verifies the "nginx" DaemonSet from the hello-cloud fixtures was deployed.
def assert_daemon_set_up
  assert_daemon_set_present("nginx")
end

# Verifies the "nginx-ss" StatefulSet from the hello-cloud fixtures was deployed.
def assert_stateful_set_up
  assert_stateful_set_present("nginx-ss")
end
end
end
4 changes: 4 additions & 0 deletions test/helpers/kubeclient_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,8 @@ def v1beta1_kubeclient
def policy_v1beta1_kubeclient
@policy_v1beta1_kubeclient ||= build_policy_v1beta1_kubeclient(MINIKUBE_CONTEXT)
end

# Memoized kubeclient for the apps/v1beta1 API group, bound to the minikube context.
def apps_v1beta1_kubeclient
  @apps_v1beta1_kubeclient ||= build_apps_v1beta1_kubeclient(MINIKUBE_CONTEXT)
end
end
Loading

0 comments on commit 4705b25

Please sign in to comment.