Skip to content
This repository has been archived by the owner on Jun 2, 2023. It is now read-only.

feat: Add a script to see if the CLA check is required. #39

Merged
merged 5 commits into from
Nov 1, 2022
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
374 changes: 374 additions & 0 deletions migrate/repo_checks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,374 @@
"""
Run checks Against Repos and correct them if they're missing something.

"""
from itertools import chain
from pprint import pformat

import click
import requests
from fastcore.net import HTTP4xxClientError, HTTP5xxServerError, HTTP404NotFoundError
from ghapi.all import GhApi, paged


class Check:
Copy link
Contributor

@ormsbee ormsbee Oct 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For all of these comments I'm making, I'm assuming that this is something we're going to be building on in the future. If it's likely to only be used once or twice, it's probably not worth the effort to refactor around.

def __init__(self, api, org, repo):
self.api = api
self.org_name = org
self.repo_name = repo

def check(self):
"""
Verify whether or not the check is failing.

This should not change anything and should not have a side-effect.
"""

raise NotImplementedError

def fix(self):
"""
Make an idempotent change to resolve the issue.
"""

raise NotImplementedError

def dry_run(self):
"""
See what will happen without making any changes.
"""
raise NotImplementedError


class RequiredCLACheck(Check):
"""
This class validates the following:

* Branch Protection is enabled on the default branch.
* The CLA Check is a required check.

If the check fails, the fix function can update the repo
so that it has branch protection enabled with the "openedx/cla"
check as a required check.
"""

def __init__(self, api, org, repo):
super().__init__(api, org, repo)

self.cla_check = {"context": "openedx/cla", "app_id": -1}
self.cla_team = "cla-checker"

self.has_a_branch_protection_rule = False
self.branch_protection_has_required_checks = False
self.required_checks_has_cla_required = False
self.team_setup_correctly = False

def check(self):
is_required_check = self._check_cla_is_required_check()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the longer term, it might be worthwhile to think about decoupling the logic of "how does this check run" from "should this check run on this repo", since the latter is a policy decision that may be very separate from the logic of how to actually execute those checks.

Copy link
Contributor

@ormsbee ormsbee Oct 27, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One of the things that putting it in the same place makes me wonder off the top of my head is, "What is the expected result of check() if the repo doesn't pass the check, but it's also not a required check for this repo."

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, not addressing this in this round but I'm thinking more about this and will probably add a separate function to see if it should run separate from whether or not the check would succeed. I'll probably try to make that change as I add more checks.

repo_on_required_team = self._check_cla_team_has_write_access()

value = is_required_check[0] and repo_on_required_team[0]
reason = f"{is_required_check[1]} {repo_on_required_team[1]}"
return (value, reason)

def _check_cla_is_required_check(self):
repo = self.api.repos.get(self.org_name, self.repo_name)
default_branch = repo.default_branch
# Branch protection rule might not exist.
try:
branch_protection = self.api.repos.get_branch_protection(
self.org_name, self.repo_name, default_branch
)
self.has_a_branch_protection_rule = True
except HTTP404NotFoundError as e:
return (False, "No branch protection rule.")

if "required_status_checks" not in branch_protection:
return (False, "No required status checks in place.")
self.branch_protection_has_required_checks = True

# We don't need to check the `contexts` list because, github mirrors
# all existing checks in `contexts` into the `checks` data. The `contexts`
# data is deprecated and will not be available in the future.
contexts = [
check["context"]
for check in branch_protection.required_status_checks.checks
]
if "openedx/cla" not in contexts:
return (False, "CLA Check is not a required check.")
self.required_checks_has_cla_required = True

return (True, "Branch Protection with CLA Check is in Place.")

def _check_cla_team_has_write_access(self):
teams = chain.from_iterable(
paged(
self.api.repos.list_teams,
self.org_name,
self.repo_name,
per_page=100,
)
)

team_permissions = {team.slug: team.permission for team in teams}
if self.cla_team not in team_permissions:
return (False, f"'{self.cla_team}' team not listed on the repo.")
# CLA Checker needs write access to push status but it doesn't need anything
# higher than that.
elif team_permissions[self.cla_team] != "push":
return (
False,
f"'{self.cla_team}' team does not have the correct access. "
f"Has {team_permissions[self.cla_team]} instead of push.",
)
else:
self.team_setup_correctly = True
return (True, f"'{self.cla_team}' team has 'push' access.")

def dry_run(self):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I think it would make sense to make dry_run a member of the class instead of passing it around since it doesn't look like you're calling the fix method more than one way in the same run. I can see an argument for leaving it as is for manual testing, it really depends on how you expect it to be used.

"""
Provide info on what would be done to make this check pass.
"""
return self.fix(dry_run=True)

def fix(self, dry_run=False):
steps = []
if not self.required_checks_has_cla_required:
steps += self._fix_branch_protection(dry_run)

if not self.team_setup_correctly:
steps += self._fix_team_setup(dry_run)

return steps

def _fix_branch_protection(self, dry_run=False):
try:
steps = []

# Short Circuit if there is nothing to do.
if self.required_checks_has_cla_required:
return steps

repo = self.api.repos.get(self.org_name, self.repo_name)
default_branch = repo.default_branch

# While the API docs claim that "contexts" is a required part
# of the put body, it is only required if "checks" is not supplied.
required_status_checks = {
"strict": False,
"checks": [
self.cla_check,
],
}

if not self.has_a_branch_protection_rule:
# The easy case where we don't already have branch protection setup.
# Might not work actually because of the bug we found below. We'll need
# to test against github to verify.
params = {
"owner": self.org_name,
"repo": self.repo_name,
"branch": default_branch,
"required_status_checks": required_status_checks,
"endorce_admins": None,
"required_pull_request_reviews": None,
"restrictions": None,
}
if not dry_run:
self._update_branch_protection(params)

steps.append(
f"Added new branch protection with `openedx/cla` as a required check."
)
return steps

# There's already a branch protection rule, so we need to make sure
# not to clobber the existing checks or settings.
params = self._get_update_params_from_get_branch_protection()
steps.append(f"State Before Update: {pformat(dict(params))}")

if not self.branch_protection_has_required_checks:
# We need to add a check object to the params we get
# since this branch protection rule has no required checks.
steps.append(f"Adding a new required check.\n{required_status_checks}")
params["required_status_checks"] = required_status_checks
else:
# There is already a set of required checks, we just need to
# add our new check to the existing list.
steps.append(
f"Adding `openedx/cla` as a new required check to existing branch protection."
)
params["required_status_checks"]["checks"].append(self.cla_check)

if not self.required_checks_has_cla_required:
# Have to do this because of a bug in GhAPI see
# _update_branch_protection docstring for more details.
steps.append(f"Update we're requesting: {pformat(dict(params))}")
if not dry_run:
self._update_branch_protection(params)
# self.api.repos.update_branch_protection(**params)
except HTTP4xxClientError as e:
# Print the steps before raising the existing exception so we have
# some more context on what might have happened.
click.echo("\n".join(steps))
click.echo(e.fp.read().decode("utf-8"))
raise
except requests.HTTPError as e:
# Print the steps before raising the existing exception so we have
# some more context on what might have happened.
click.echo("\n".join(steps))
click.echo(pformat(e.response.json()))
raise

return steps

def _fix_team_setup(self, dry_run=False):
try:
if not dry_run:
self.api.teams.add_or_update_repo_permissions_in_org(
self.org_name,
self.cla_team,
self.org_name,
self.repo_name,
"push",
)
return ["Added push access for {self.cla_team} to {self.repo_name}."]
except HTTP4xxClientError as e:
click.echo(e.fp.read().decode("utf-8"))
raise

def _update_branch_protection(self, params):
"""
Need to do this ourselves because of a bug in GhAPI that ignores
`None` parameters and doesn't pass them through to the API.

- https://github.com/fastai/ghapi/issues/81
- https://github.com/fastai/ghapi/pull/91
"""
params = dict(params)
headers = self.api.headers
url = (
"https://api.github.com"
+ self.api.repos.update_branch_protection.path.format(**params)
)
resp = requests.put(url, headers=headers, json=params)

resp.raise_for_status()

def _get_update_params_from_get_branch_protection(self):
"""
Get the params needed to do an update operation that would produce
the same branch protection as doing a get on this repo.

We'll need this in cases where there are already some branch protection
rules on the default branch and we want to update only some it without
resetting the rest of it.
"""

# TODO: Could use Glom here in the future, but didn't need it.
repo = self.api.repos.get(self.org_name, self.repo_name)
default_branch = repo.default_branch
bp = self.api.repos.get_branch_protection(
self.org_name, self.repo_name, default_branch
)

required_checks = None
if "required_status_checks" in bp:
# While the API docs claim that "contexts" is a required part
# of the put body, it is only required if "checks" is not supplied.
# The GET endpoint provides the curent set of required checks in both
# format. So we only use the new "checks" format in our PUT params.
required_checks = {
"strict": bp.required_status_checks.strict,
"checks": list(bp.required_status_checks.checks),
}

required_pr_reviews = None
if "required_pull_request_reviews" in bp:
required_pr_reviews = {
"dismiss_stale_reviews": bp.required_pull_request_reviews.dismiss_stale_reviews,
"require_code_owner_reviews": bp.required_pull_request_reviews.require_code_owner_reviews,
"required_approving_review_count": bp.required_pull_request_reviews.required_approving_review_count,
}

restrictions = None
if "restrictions" in bp:
restrictions = {
"users": bp.restrictions.users,
"teams": bp.restrictions.teams,
"apps": bp.restrictions.apps,
}

params = {
"owner": self.org_name,
"repo": self.repo_name,
"branch": default_branch,
"required_status_checks": required_checks,
"enforce_admins": True if bp.enforce_admins.enabled else None,
"required_pull_request_reviews": required_pr_reviews,
"restrictions": restrictions,
}

return params


CHECKS = [RequiredCLACheck]


@click.command()
@click.option(
"--github-token",
envvar="GITHUB_TOKEN",
required=True,
help="A github personal access token.",
)
@click.option(
"--org",
default="openedx",
help="The github org that you wish check.",
)
@click.option(
"--dry-run/--no-dry-run",
"-n",
default=True,
is_flag=True,
help="Show what changes would be made without making them.",
)
def main(org, dry_run, github_token):
api = GhApi()
repos = [
repo.name
for repo in chain.from_iterable(
paged(api.repos.list_for_org, org, per_page=100)
)
]

for repo in repos:
click.secho(f"{repo}: ")
for CheckType in CHECKS:
check = CheckType(api, org, repo)

result = check.check()
if result[0]:
color = "green"
else:
color = "red"

click.secho(f"\t{result[1]}", fg=color)

if dry_run:
steps = check.dry_run()
steps_color = "yellow"
else:
steps = check.fix()
steps_color = "green"

if steps:
click.secho("\tSteps:\n\t\t", fg=steps_color, nl=False)
click.secho(
"\n\t\t".join([step.replace("\n", "\n\t\t") for step in steps])
)


if __name__ == "__main__":
main()