[action] [PR:14367] Refactor the logic of tagging kube container as local latest #14764

Merged 1 commit on Apr 22, 2023
93 changes: 34 additions & 59 deletions src/sonic-ctrmgrd/ctrmgr/ctrmgrd.py
@@ -54,6 +54,16 @@
KUBE_LABEL_TABLE = "KUBE_LABELS"
KUBE_LABEL_SET_KEY = "SET"

MODE_KUBE = "kube"
MODE_LOCAL = "local"
OWNER_KUBE = "kube"
OWNER_LOCAL = "local"
OWNER_NONE = "none"
REMOTE_READY = "ready"
REMOTE_PENDING = "pending"
REMOTE_STOPPED = "stopped"
REMOTE_NONE = "none"

remote_connected = False

dflt_cfg_ser = {
@@ -89,13 +99,15 @@
JOIN_RETRY = "retry_join_interval_seconds"
LABEL_RETRY = "retry_labels_update_seconds"
TAG_IMAGE_LATEST = "tag_latest_image_on_wait_seconds"
TAG_RETRY = "retry_tag_latest_seconds"
USE_K8S_PROXY = "use_k8s_as_http_proxy"

remote_ctr_config = {
JOIN_LATENCY: 10,
JOIN_RETRY: 10,
LABEL_RETRY: 2,
TAG_IMAGE_LATEST: 30,
TAG_RETRY: 5,
USE_K8S_PROXY: ""
}
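
For context: these in-code values are only fallbacks. init() (next hunk) overlays them with remote_ctr.config.json, and the copy of that file changed later in this PR ships tag_latest_image_on_wait_seconds=600 and the new retry_tag_latest_seconds=30, so the effective wait before the first tag attempt and between retries is normally longer than the 30/5 seconds above. A small sketch of that relationship, for illustration only:

    # Sketch only (not part of the PR): how the two timing knobs combine.
    # Values mirror the shipped remote_ctr.config.json from this PR; the in-code
    # defaults above apply only when the file omits these keys.
    import datetime

    TAG_IMAGE_LATEST = "tag_latest_image_on_wait_seconds"
    TAG_RETRY = "retry_tag_latest_seconds"
    shipped = {TAG_IMAGE_LATEST: 600, TAG_RETRY: 30}

    def next_tag_attempt(now, first_attempt, cfg=shipped):
        # The first attempt waits for the container to settle; later attempts are retries.
        wait = cfg[TAG_IMAGE_LATEST] if first_attempt else cfg[TAG_RETRY]
        return now + datetime.timedelta(seconds=wait)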

@@ -151,9 +163,6 @@ def init():
with open(SONIC_CTR_CONFIG, "r") as s:
d = json.load(s)
remote_ctr_config.update(d)
if UNIT_TESTING:
remote_ctr_config[TAG_IMAGE_LATEST] = 0


class MainServer:
""" Implements main io-loop of the application
@@ -437,55 +446,6 @@ def do_join(self, ip, port, insecure):
log_debug("kube_join_master failed retry after {} seconds @{}".
format(remote_ctr_config[JOIN_RETRY], self.start_time))


def tag_latest_image(server, feat, docker_id, image_ver):
res = 1
if not UNIT_TESTING:
status = os.system("docker ps |grep {} >/dev/null".format(docker_id))
if status:
syslog.syslog(syslog.LOG_ERR,
"Feature {}:{} is not stable".format(feat, image_ver))
else:
image_item = os.popen("docker inspect {} |jq -r .[].Image".format(docker_id)).read().strip()
if image_item:
image_id = image_item.split(":")[1][:12]
image_info = os.popen("docker images |grep {}".format(image_id)).read().split()
if image_info:
image_rep = image_info[0]
res = os.system("docker tag {} {}:latest".format(image_id, image_rep))
if res != 0:
syslog.syslog(syslog.LOG_ERR,
"Failed to tag {}:{} to latest".format(image_rep, image_ver))
else:
syslog.syslog(syslog.LOG_INFO,
"Successfully tag {}:{} to latest".format(image_rep, image_ver))
feat_status = os.popen("docker inspect {} |jq -r .[].State.Running".format(feat)).read().strip()
if feat_status:
if feat_status == 'true':
os.system("docker stop {}".format(feat))
syslog.syslog(syslog.LOG_ERR,
"{} should not run, stop it".format(feat))
os.system("docker rm {}".format(feat))
syslog.syslog(syslog.LOG_INFO,
"Delete previous {} container".format(feat))
else:
syslog.syslog(syslog.LOG_ERR,
"Failed to docker images |grep {} to get image repo".format(image_id))
else:
syslog.syslog(syslog.LOG_ERR,
"Failed to inspect container:{} to get image id".format(docker_id))
else:
server.mod_db_entry(STATE_DB_NAME,
FEATURE_TABLE, feat, {"tag_latest": "true"})
res = 0
if res:
log_debug("failed to tag {}:{} to latest".format(feat, image_ver))
else:
log_debug("successfully tag {}:{} to latest".format(feat, image_ver))

return res


#
# Feature changes
#
@@ -512,7 +472,9 @@ def handle_update(self, feat, set_owner, ct_owner, remote_state):
# There after only called upon changes in either that requires action
#
if not is_systemd_active(feat):
# Nothing todo, if system state is down
# Restart the service manually when kube upgrade happens to decrease the down time
if set_owner == MODE_KUBE and ct_owner == OWNER_NONE and remote_state == REMOTE_STOPPED:
restart_systemd_service(self.server, feat, OWNER_KUBE)
return

label_add = set_owner == "kube"
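
For illustration only, the combination the new branch above reacts to can be read as a single predicate (constant values as defined earlier in this file); this restates the check in the diff, it is not additional PR code:

    # Sketch: restart the systemd unit ourselves only when the feature is
    # configured for kube ownership, no container currently owns it, and the
    # kube-managed instance has been stopped (the kube-upgrade window).
    MODE_KUBE = "kube"
    OWNER_NONE = "none"
    REMOTE_STOPPED = "stopped"

    def should_restart_for_kube_upgrade(set_owner, ct_owner, remote_state):
        return (set_owner == MODE_KUBE and
                ct_owner == OWNER_NONE and
                remote_state == REMOTE_STOPPED)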
@@ -587,27 +549,40 @@ def on_state_update(self, key, op, data):
# Tag latest
start_time = datetime.datetime.now() + datetime.timedelta(
seconds=remote_ctr_config[TAG_IMAGE_LATEST])
self.server.register_timer(start_time, tag_latest_image, (
self.server,
self.server.register_timer(start_time, self.do_tag_latest, (
key,
self.st_data[key][ST_FEAT_CTR_ID],
self.st_data[key][ST_FEAT_CTR_VER]))

log_debug("try to tag latest label after {} seconds @{}".format(
remote_ctr_config[TAG_IMAGE_LATEST], start_time))

if (not init) and (
(old_remote_state == remote_state) or (remote_state != "pending")):
# no change or nothing to do.
return
if (not init):
if (old_remote_state == remote_state):
# if no remote state change, do nothing.
return
if (remote_state not in (REMOTE_PENDING, REMOTE_STOPPED)):
# if remote state not in pending or stopped, do nothing.
return

if key in self.cfg_data:
log_debug("{} init={} old_remote_state={} remote_state={}".format(key, init, old_remote_state, remote_state))
self.handle_update(key, self.cfg_data[key][CFG_FEAT_OWNER],
self.st_data[key][ST_FEAT_OWNER],
remote_state)
return

def do_tag_latest(self, feat, docker_id, image_ver):
ret = kube_commands.tag_latest(feat, docker_id, image_ver)
if ret != 0:
# Tag latest failed. Retry after an interval
self.start_time = datetime.datetime.now()
self.start_time += datetime.timedelta(
seconds=remote_ctr_config[TAG_RETRY])
self.server.register_timer(self.start_time, self.do_tag_latest, (feat, docker_id, image_ver))

log_debug("Tag latest as local failed retry after {} seconds @{}".
format(remote_ctr_config[TAG_RETRY], self.start_time))

#
# Label re-sync
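
Taken together, the handler above now only schedules and retries, while the actual work lives in kube_commands.tag_latest (added below). A minimal sketch of the resulting control flow, written as a plain loop instead of MainServer.register_timer callbacks; the sleep-based scheduling and the helper name are illustrative only:

    # Sketch only: equivalent control flow to on_state_update() + do_tag_latest(),
    # without the timer machinery. kube_commands.tag_latest and the two config keys
    # come from this diff; everything else here is hypothetical.
    import time
    import kube_commands  # assumed importable, as the do_tag_latest call above implies

    def tag_latest_with_retry(feat, docker_id, image_ver, cfg):
        time.sleep(cfg["tag_latest_image_on_wait_seconds"])   # let the kube container settle
        while kube_commands.tag_latest(feat, docker_id, image_ver) != 0:
            time.sleep(cfg["retry_tag_latest_seconds"])       # retry until tagging succeeds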
69 changes: 69 additions & 0 deletions src/sonic-ctrmgrd/ctrmgr/kube_commands.py
@@ -412,7 +412,76 @@ def kube_reset_master(force):

return (ret, err)

def _do_tag(docker_id, image_ver):
err = ""
out = ""
ret = 1
status, _, err = _run_command("docker ps |grep {}".format(docker_id))
if status == 0:
_, image_item, err = _run_command("docker inspect {} |jq -r .[].Image".format(docker_id))
if image_item:
image_id = image_item.split(":")[1][:12]
_, image_info, err = _run_command("docker images |grep {}".format(image_id))
if image_info:
# Only need the docker repo name without acr domain
image_rep = image_info.split()[0].split("/")[-1]
tag_res, _, err = _run_command("docker tag {} {}:latest".format(image_id, image_rep))
if tag_res == 0:
out = "docker tag {} {}:latest successfully".format(image_id, image_rep)
ret = 0
else:
err = "Failed to tag {}:{} to latest. Err: {}".format(image_rep, image_ver, err)
else:
err = "Failed to docker images |grep {} to get image repo. Err: {}".format(image_id, err)
else:
err = "Failed to inspect container:{} to get image id. Err: {}".format(docker_id, err)
elif err:
err = "Error happens when execute docker ps |grep {}. Err: {}".format(docker_id, err)
else:
out = "New version {} is not running.".format(image_ver)
ret = -1

return (ret, out, err)

def _remove_container(feat):
err = ""
out = ""
ret = 0
_, feat_status, err = _run_command("docker inspect {} |jq -r .[].State.Running".format(feat))
if feat_status:
if feat_status == 'true':
err = "Feature {} container is running, it's unexpected".format(feat)
ret = 1
else:
rm_res, _, err = _run_command("docker rm {}".format(feat))
if rm_res == 0:
out = "Remove origin local {} container successfully".format(feat)
else:
err = "Failed to docker rm {}. Err: {}".format(feat, err)
ret = 1
elif err.startswith("Error: No such object"):
out = "Origin local {} container has been removed before".format(feat)
err = ""
else:
err = "Failed to docker inspect {} |jq -r .[].State.Running. Err: {}".format(feat, err)
ret = 1

return (ret, out, err)

def tag_latest(feat, docker_id, image_ver):
ret, out, err = _do_tag(docker_id, image_ver)
if ret == 0:
log_debug(out)
ret, out, err = _remove_container(feat)
if ret == 0:
log_debug(out)
else:
log_error(err)
elif ret == -1:
ret = 0
else:
log_error(err)
return ret

def main():
syslog.openlog("kube_commands")
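
As an aside (not part of this PR), the same container and image details can be collected without the grep/jq shell pipelines used in _do_tag() by asking docker to format its own output; a hedged sketch with hypothetical helper names:

    # Illustration only: an alternative to the shell pipelines in _do_tag(),
    # using docker's --format templates. Not the PR's implementation.
    import subprocess

    def _docker(*args):
        return subprocess.check_output(["docker", *args], text=True).strip()

    def tag_container_image_as_latest(docker_id):
        # Image sha the container runs, e.g. "sha256:0123abcd..." -> short 12-char id
        image_id = _docker("inspect", "--format", "{{.Image}}", docker_id).split(":")[1][:12]
        # Reference the container was created from, e.g. "some.acr.io/docker-snmp:1.2"
        ref = _docker("inspect", "--format", "{{.Config.Image}}", docker_id)
        repo = ref.rsplit(":", 1)[0].split("/")[-1]  # assumes an explicit tag is present
        _docker("tag", image_id, "{}:latest".format(repo))
        return repo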
1 change: 1 addition & 0 deletions src/sonic-ctrmgrd/ctrmgr/remote_ctr.config.json
@@ -4,6 +4,7 @@
"retry_labels_update_seconds": 5,
"revert_to_local_on_wait_seconds": 60,
"tag_latest_image_on_wait_seconds": 600,
"retry_tag_latest_seconds": 30,
"use_k8s_as_http_proxy": "n"
}

11 changes: 9 additions & 2 deletions src/sonic-ctrmgrd/tests/common_test.py
@@ -58,6 +58,7 @@
PROC_THROW = "proc_throw"
PROC_OUT = "subproc_output"
PROC_ERR = "subproc_error"
PROC_CODE = "subproc_code"
PROC_KILLED = "procs_killed"

# container_start test cases
@@ -605,6 +606,7 @@ def communicate(self, timeout):

out_lst = current_test_data.get(PROC_OUT, None)
err_lst = current_test_data.get(PROC_ERR, None)
code_lst = current_test_data.get(PROC_CODE, None)
if out_lst:
assert (len(out_lst) > self.index)
out = out_lst[self.index]
@@ -615,7 +617,11 @@
err = err_lst[self.index]
else:
err = ""
self.returncode = 0 if not err else -1
if code_lst:
assert (len(code_lst) > self.index)
self.returncode = code_lst[self.index]
else:
self.returncode = 0 if not err else -1
return (out, err)

def kill(self):
@@ -673,7 +679,8 @@ def create_remote_ctr_config_json():
"join_latency_on_boot_seconds": 2,\n\
"retry_join_interval_seconds": 0,\n\
"retry_labels_update_seconds": 0,\n\
"revert_to_local_on_wait_seconds": 5\n\
"revert_to_local_on_wait_seconds": 5,\n\
"tag_latest_image_on_wait_seconds": 0\n\
}\n'

fname = "/tmp/remote_ctr.config.json"
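
For illustration only (this fragment is not part of the PR), a test case can now pin the mocked exit code explicitly, for example to drive the `docker ps |grep` miss path in _do_tag where the command prints nothing, emits no error, and still exits non-zero:

    # Hypothetical test data: each list is consumed one entry per mocked subprocess
    # call; "subproc_code" (PROC_CODE) now sets Proc.returncode directly instead of
    # it being inferred from whether stderr text was supplied.
    sample_case = {
        "subproc_output": [""],   # PROC_OUT: docker ps |grep matched nothing
        "subproc_error":  [""],   # PROC_ERR: no stderr either
        "subproc_code":   [1],    # PROC_CODE: grep exits 1 -> "not running" branch
    }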
9 changes: 0 additions & 9 deletions src/sonic-ctrmgrd/tests/ctrmgrd_test.py
@@ -311,15 +311,6 @@
}
}
}
},
common_test.POST: {
common_test.STATE_DB_NO: {
common_test.FEATURE_TABLE: {
"snmp": {
"tag_latest": "true"
}
}
}
}
}
}