Skip to content

Commit

Permalink
Merge pull request #55 from snok/v2-upgrade
Browse files Browse the repository at this point in the history
refactor: Generate encoded package name instead of storing it
  • Loading branch information
sondrelg authored Feb 12, 2023
2 parents 270d72c + f82e63c commit 3863241
Show file tree
Hide file tree
Showing 2 changed files with 212 additions and 226 deletions.
228 changes: 107 additions & 121 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
from __future__ import annotations

import asyncio
import re
import time
import os
import re
from asyncio import Semaphore
from datetime import datetime, timedelta
from enum import Enum
from fnmatch import fnmatch
from sys import argv
from typing import TYPE_CHECKING, Literal, NamedTuple
from typing import TYPE_CHECKING, Literal
from urllib.parse import quote_from_bytes

from dateparser import parse
Expand All @@ -22,16 +21,8 @@
BASE_URL = 'https://api.github.com'


class ImageName(NamedTuple):
"""
We need to store both the raw image names and url-encoded image names.
The raw images names are used for logging, while the url-encoded
images names are sent in our payloads to the Github API.
"""

value: str
encoded: str
def encode_image_name(name: str) -> str:
return quote_from_bytes(name.strip().encode('utf-8'), safe='')


class TimestampType(str, Enum):
Expand All @@ -55,6 +46,7 @@ class AccountType(str, Enum):
deleted: list[str] = []
failed: list[str] = []
needs_github_assistance: list[str] = []

GITHUB_ASSISTANCE_MSG = (
'Publicly visible package versions with more than '
'5000 downloads cannot be deleted. '
Expand All @@ -69,124 +61,120 @@ class PackageResponse(BaseModel):
updated_at: datetime | None


async def wait_for_ratelimit(*, response: Response, eligible_for_secondary_limit: bool = False) -> None:
ratelimit_remaining = int(response.headers['x-ratelimit-remaining'])
if ratelimit_remaining == 0:
print('ratelimit exceeded')
# This could be made into a setting if needed
MAX_SLEEP = 60 * 10 # 10 minutes


async def wait_for_rate_limit(*, response: Response, eligible_for_secondary_limit: bool = False) -> None:
"""
Sleeps or terminates the workflow if we've hit rate limits.
See docs on rate limits: https://docs.github.com/en/rest/rate-limit?apiVersion=2022-11-28.
"""
if int(response.headers['x-ratelimit-remaining']) == 0:
ratelimit_reset = datetime.fromtimestamp(int(response.headers['x-ratelimit-reset']))
delta = ratelimit_reset - datetime.now()
if delta > timedelta(0):
print(f'sleeping for {delta}s')
time.sleep(delta.total_seconds())
print('done sleeping')

if delta > timedelta(seconds=MAX_SLEEP):
print(
f'Rate limited for {delta} seconds. '
f'Terminating workflow, since that\'s above the maximum allowed sleep time. '
f'Retry the job manually, when the rate limit is refreshed.'
)
exit(1)
elif delta > timedelta(seconds=0):
print(f'Rate limit exceeded. Sleeping for {delta} seconds')
await asyncio.sleep(delta.total_seconds())

elif eligible_for_secondary_limit:
# https://docs.github.com/en/rest/guides/best-practices-for-integrators#dealing-with-secondary-rate-limits
time.sleep(1)
await asyncio.sleep(1)


async def list_org_packages(*, org_name: str, http_client: AsyncClient) -> list[PackageResponse]:
async def get_all_pages(*, url: str, http_client: AsyncClient) -> list[dict]:
"""
List all packages, for an organization.
Accumulate all pages of a paginated API endpoint.
:param org_name: The name of the organization.
:param url: The full API URL
:param http_client: HTTP client.
:return: List of packages.
:return: List of objects.
"""
response = await http_client.get(f'{BASE_URL}/orgs/{org_name}/packages?package_type=container&per_page=100')
response.raise_for_status()
return [PackageResponse(**i) for i in response.json()]

result = []
rel_regex = re.compile(r'<([^<>]*)>; rel="(\w+)"')
rels = {'next': url}

async def list_packages(*, http_client: AsyncClient) -> list[PackageResponse]:
"""
List all packages, for a user.
while 'next' in rels:
response = await http_client.get(rels['next'])
response.raise_for_status()
result.extend(response.json())

:param http_client: HTTP client.
:return: List of packages.
"""
response = await http_client.get(f'{BASE_URL}/user/packages?package_type=container&per_page=100')
response.raise_for_status()
return [PackageResponse(**i) for i in response.json()]
rels = {rel: url for url, rel in rel_regex.findall(response.headers['link'])}

await wait_for_rate_limit(response=response)

class ContainerModel(BaseModel):
tags: list[str]
return result


class MetadataModel(BaseModel):
package_type: Literal['container']
container: ContainerModel
async def list_org_packages(*, org_name: str, http_client: AsyncClient) -> list[PackageResponse]:
"""List all packages for an organization."""
packages = await get_all_pages(
url=f'{BASE_URL}/orgs/{org_name}/packages?package_type=container&per_page=100',
http_client=http_client,
)
return [PackageResponse(**i) for i in packages]


class PackageVersionResponse(BaseModel):
id: int
name: str
metadata: MetadataModel
created_at: datetime | None
updated_at: datetime | None
async def list_packages(*, http_client: AsyncClient) -> list[PackageResponse]:
"""List all packages for a user."""
packages = await get_all_pages(
url=f'{BASE_URL}/user/packages?package_type=container&per_page=100',
http_client=http_client,
)
return [PackageResponse(**i) for i in packages]


async def list_org_package_versions(
*, org_name: str, image_name: ImageName, http_client: AsyncClient
*, org_name: str, image_name: str, http_client: AsyncClient
) -> list[PackageVersionResponse]:
"""
List image versions, for an organization.
:param org_name: The name of the organization.
:param image_name: The name of the container image.
:param http_client: HTTP client.
:return: List of image objects.
"""
"""List image versions, for an organization."""
packages = await get_all_pages(
url=f'{BASE_URL}/orgs/{org_name}/packages/container/{image_name.encoded}/versions?per_page=100',
url=f'{BASE_URL}/orgs/{org_name}/packages/container/{encode_image_name(image_name)}/versions?per_page=100',
http_client=http_client,
)
return [PackageVersionResponse(**i) for i in packages]


async def list_package_versions(*, image_name: ImageName, http_client: AsyncClient) -> list[PackageVersionResponse]:
"""
List image versions, for a personal account.
:param image_name: The name of the container image.
:param http_client: HTTP client.
:return: List of image objects.
"""
async def list_package_versions(*, image_name: str, http_client: AsyncClient) -> list[PackageVersionResponse]:
"""List image versions for a user."""
packages = await get_all_pages(
url=f'{BASE_URL}/user/packages/container/{image_name.encoded}/versions?per_page=100', http_client=http_client
url=f'{BASE_URL}/user/packages/container/{encode_image_name(image_name)}/versions?per_page=100',
http_client=http_client,
)
return [PackageVersionResponse(**i) for i in packages]


async def get_all_pages(*, url: str, http_client: AsyncClient) -> list[dict]:
"""
Accumulate all pages of a paginated API endpoint.
:param url: The full API URL
:param http_client: HTTP client.
:return: List of objects.
"""
result = []
rel_regex = re.compile(r'<([^<>]*)>; rel="(\w+)"')
rels = {'next': url}
class ContainerModel(BaseModel):
tags: list[str]

while 'next' in rels:
response = await http_client.get(rels['next'])
response.raise_for_status()
result.extend(response.json())

rels = {rel: url for url, rel in rel_regex.findall(response.headers['link'])}
class MetadataModel(BaseModel):
package_type: Literal['container']
container: ContainerModel

await wait_for_ratelimit(response=response)

return result
class PackageVersionResponse(BaseModel):
id: int
name: str
metadata: MetadataModel
created_at: datetime | None
updated_at: datetime | None


def post_deletion_output(*, response: Response, image_name: ImageName, version_id: int) -> None:
def post_deletion_output(*, response: Response, image_name: str, version_id: int) -> None:
"""
Output a little info to the user.
"""
image_name_with_tag = f'{image_name.value}:{version_id}'
image_name_with_tag = f'{image_name}:{version_id}'
if response.is_error:
if response.status_code == 400 and response.json()['message'] == GITHUB_ASSISTANCE_MSG:
# Output the names of these images in one block at the end
Expand All @@ -202,8 +190,20 @@ def post_deletion_output(*, response: Response, image_name: ImageName, version_i
print(f'Deleted old image: {image_name_with_tag}')


async def delete_package_version(
url: str, semaphore: Semaphore, http_client: AsyncClient, image_name: str, version_id: int
) -> None:
async with semaphore:
try:
response = await http_client.delete(url)
await wait_for_rate_limit(response=response, eligible_for_secondary_limit=True)
post_deletion_output(response=response, image_name=image_name, version_id=version_id)
except TimeoutException as e:
print(f'Request to delete {image_name} timed out with error `{e}`')


async def delete_org_package_versions(
*, org_name: str, image_name: ImageName, version_id: int, http_client: AsyncClient, semaphore: Semaphore
*, org_name: str, image_name: str, version_id: int, http_client: AsyncClient, semaphore: Semaphore
) -> None:
"""
Delete an image version, for an organization.
Expand All @@ -214,20 +214,14 @@ async def delete_org_package_versions(
:param http_client: HTTP client.
:return: Nothing - the API returns a 204.
"""
url = f'{BASE_URL}/orgs/{org_name}/packages/container/{image_name.encoded}/versions/{version_id}'
await semaphore.acquire()
try:
response = await http_client.delete(url)
await wait_for_ratelimit(response=response, eligible_for_secondary_limit=True)
post_deletion_output(response=response, image_name=image_name, version_id=version_id)
except TimeoutException as e:
print(f'Request to delete {image_name.value} timed out with error `{e}`')
finally:
semaphore.release()
url = f'{BASE_URL}/orgs/{org_name}/packages/container/{encode_image_name(image_name)}/versions/{version_id}'
await delete_package_version(
url=url, semaphore=semaphore, http_client=http_client, image_name=image_name, version_id=version_id
)


async def delete_package_versions(
*, image_name: ImageName, version_id: int, http_client: AsyncClient, semaphore: Semaphore
*, image_name: str, version_id: int, http_client: AsyncClient, semaphore: Semaphore
) -> None:
"""
Delete an image version, for a personal account.
Expand All @@ -237,16 +231,10 @@ async def delete_package_versions(
:param http_client: HTTP client.
:return: Nothing - the API returns a 204.
"""
url = f'{BASE_URL}/user/packages/container/{image_name.encoded}/versions/{version_id}'
await semaphore.acquire()
try:
response = await http_client.delete(url)
await wait_for_ratelimit(response=response, eligible_for_secondary_limit=True)
post_deletion_output(response=response, image_name=image_name, version_id=version_id)
except TimeoutException as e:
print(f'Request to delete {image_name.value} timed out with error `{e}`')
finally:
semaphore.release()
url = f'{BASE_URL}/user/packages/container/{encode_image_name(image_name)}/versions/{version_id}'
await delete_package_version(
url=url, semaphore=semaphore, http_client=http_client, image_name=image_name, version_id=version_id
)


class GithubAPI:
Expand All @@ -265,7 +253,7 @@ async def list_packages(

@staticmethod
async def list_package_versions(
*, account_type: AccountType, org_name: str | None, image_name: ImageName, http_client: AsyncClient
*, account_type: AccountType, org_name: str | None, image_name: str, http_client: AsyncClient
) -> list[PackageVersionResponse]:
if account_type != AccountType.ORG:
return await list_package_versions(image_name=image_name, http_client=http_client)
Expand All @@ -277,7 +265,7 @@ async def delete_package(
*,
account_type: AccountType,
org_name: str | None,
image_name: ImageName,
image_name: str,
version_id: int,
http_client: AsyncClient,
semaphore: Semaphore,
Expand Down Expand Up @@ -330,7 +318,7 @@ def validate_org_name(cls, v: str, values: dict) -> str | None:
return None


async def get_and_delete_old_versions(image_name: ImageName, inputs: Inputs, http_client: AsyncClient) -> None:
async def get_and_delete_old_versions(image_name: str, inputs: Inputs, http_client: AsyncClient) -> None:
"""
Delete old package versions for an image name.
Expand Down Expand Up @@ -413,7 +401,7 @@ async def get_and_delete_old_versions(image_name: ImageName, inputs: Inputs, htt
)

if not tasks:
print(f'No more versions to delete for {image_name.value}')
print(f'No more versions to delete for {image_name}')

results = await asyncio.gather(*tasks, return_exceptions=True)

Expand All @@ -429,7 +417,7 @@ async def get_and_delete_old_versions(image_name: ImageName, inputs: Inputs, htt
)


def filter_image_names(all_packages: list[PackageResponse], image_names: list[str]) -> set[ImageName]:
def filter_image_names(all_packages: list[PackageResponse], image_names: list[str]) -> set[str]:
"""
Filter package names by action input package names.
Expand All @@ -450,9 +438,7 @@ def filter_image_names(all_packages: list[PackageResponse], image_names: list[st
for image_name in image_names:
for package in all_packages:
if fnmatch(package.name, image_name):
packages_to_delete_from.add(
ImageName(package.name.strip(), quote_from_bytes(package.name.strip().encode('utf-8'), safe=''))
)
packages_to_delete_from.add(package.name.strip())

return packages_to_delete_from

Expand Down Expand Up @@ -485,7 +471,7 @@ async def main(
:param image_names: The image names to delete versions for. Can be a single
image name, or multiple comma-separated image names.
:param timestamp_to_use: Which timestamp to base our cut-off on. Can be 'updated_at' or 'created_at'.
:param cut_off: Can be a human readable relative time like '2 days ago UTC', or a timestamp.
:param cut_off: Can be a human-readable relative time like '2 days ago UTC', or a timestamp.
Must contain a reference to the timezone.
:param token: The personal access token to authenticate with.
:param untagged_only: Whether to only delete untagged images.
Expand Down Expand Up @@ -529,7 +515,7 @@ async def main(
await asyncio.gather(*tasks)

if needs_github_assistance:
# Print a human readable list of public images we couldn't handle
# Print a human-readable list of public images we couldn't handle
print('\n')
print('─' * 110)
image_list = '\n\t- ' + '\n\t- '.join(needs_github_assistance)
Expand Down
Loading

0 comments on commit 3863241

Please sign in to comment.