
Write sharded repodata #161

Draft

dholth wants to merge 28 commits into main from sharded-repodata
Changes from 18 commits

Commits (28)
a2cc811 begin sharded repodata creation (dholth, May 10, 2024)
9001bc8 remove non-overridden method (dholth, May 10, 2024)
ba6759a write to output_root (dholth, May 10, 2024)
11392e5 move save_fs_state out of extract_subdir_to_cache (dholth, May 10, 2024)
fd6c9bc shards cli capable of generating repodata from cache database and no … (dholth, May 10, 2024)
504a61c create output subdir if necessary (dholth, May 10, 2024)
d049dc2 allow expanduser in --patch-generator; use output_path when reading s… (dholth, May 14, 2024)
e979689 continue applying patches to shards (dholth, May 14, 2024)
4f17a15 add --no-current-repodata option (dholth, May 20, 2024)
90c92a2 maintain current_repodata=True as the default (dholth, May 20, 2024)
69a1e6c Merge branch 'main' into sharded-repodata (dholth, May 24, 2024)
de75383 Merge branch 'main' into sharded-repodata (dholth, May 29, 2024)
012f234 skip test_cli; why does it cause test failure? (dholth, May 29, 2024)
9584bb5 add news (dholth, Jun 3, 2024)
3cac14a begin combine-small-shards experiment (dholth, Jun 3, 2024)
45d3a7c include virtual name, sha256 columns in create() as well as migration (dholth, Jun 12, 2024)
06bbd84 Merge branch 'main' into sharded-repodata (dholth, Jul 1, 2024)
4e4b3fd Merge branch 'main' into sharded-repodata (dholth, Jul 22, 2024)
193b6c8 Merge branch 'main' into sharded-repodata (dholth, Aug 22, 2024)
04afa01 combining the sharded CLI into the main CLI (travishathaway, Aug 27, 2024)
c01824b Merge pull request #2 from travishathaway/merge-sharded-repodata-cli (dholth, Aug 30, 2024)
ff2ae94 Merge branch 'main' into sharded-repodata (dholth, Aug 30, 2024)
b4582cb add msgpack dependency (dholth, Aug 30, 2024)
3c295d2 import annotations (dholth, Aug 30, 2024)
707f482 Update conda_index/index/shards.py (jezdez, Sep 19, 2024)
9f599e6 Merge branch 'main' into sharded-repodata (jezdez, Sep 19, 2024)
ab817a9 bump version 0.6.0 (dholth, Nov 4, 2024)
4bc897c Merge branch 'main' into sharded-repodata (dholth, Nov 4, 2024)
39 changes: 39 additions & 0 deletions conda_index/cli/__init__.py
@@ -4,6 +4,7 @@

import logging
import os.path
from pathlib import Path

import click

@@ -91,6 +92,23 @@
repodata_version=2 which is supported in conda 24.5.0 or later.
""",
)
@click.option(
Contributor Author:

We could replace this with an "add-only" or "no-remove" option that would keep packages in the index even if they are not found in the filesystem.

Contributor Author:

These two options are more about testing from a backup of the conda-forge database; they may not survive into the main branch, or we could make them easier to use. (See the sketch after the --upstream-stage option below.)

"--save-fs-state/--no-save-fs-state",
help="""
Skip using listdir() to refresh the set of available packages. Used to
generate complete repodata.json from cache only when packages are not on
disk.
""",
default=False,
show_default=True,
)
@click.option(
"--upstream-stage",
help="""
Set to 'clone' to generate example repodata from conda-forge test database.
""",
default="fs",
)
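
A minimal sketch of exercising these testing flags together, assuming a channel root with a restored cache database; the flags come from this diff, while the path and the use of click's test runner are assumptions:

from click.testing import CliRunner

from conda_index.cli import cli

runner = CliRunner()
result = runner.invoke(
    cli,
    [
        "/path/to/channel",  # channel root containing the restored cache (illustrative)
        "--no-save-fs-state",  # reuse cached filesystem state instead of listdir()
        "--upstream-stage",
        "clone",  # read the 'clone' stage, e.g. from a conda-forge test database
    ],
)
assert result.exit_code == 0, result.output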
@click.option(
"--current-repodata/--no-current-repodata",
help="""
@@ -126,6 +144,8 @@ def cli(
run_exports=False,
compact=True,
base_url=None,
save_fs_state=False,
upstream_stage="fs",
current_repodata=True,
):
logutil.configure()
@@ -146,14 +166,33 @@
write_run_exports=run_exports,
compact_json=compact,
base_url=base_url,
save_fs_state=save_fs_state,
write_current_repodata=current_repodata,
)

if save_fs_state is False:
# We call listdir() in save_fs_state, or its remote fs equivalent; then
# we call changed_packages(); but the changed_packages query against a
# remote filesystem is different than the one we need for a local
# filesystem. How about skipping the extract packages stage entirely by
# returning no changed packages? Might fail if we use
# threads/multiprocessing.
def no_changed_packages(self, *args):
return []

channel_index.cache_class.changed_packages = no_changed_packages

# XXX this patch doesn't stick when using multiprocessing
channel_index.cache_class.upstream_stage = upstream_stage

current_index_versions = None
if current_index_versions_file:
with open(current_index_versions_file) as f:
current_index_versions = yaml.safe_load(f)

if patch_generator:
patch_generator = str(Path(patch_generator).expanduser())

channel_index.index(
patch_generator=patch_generator, # or will use outdated .py patch functions
current_index_versions=current_index_versions,
41 changes: 28 additions & 13 deletions conda_index/index/__init__.py
@@ -209,27 +209,32 @@ def _make_seconds(timestamp):
)


def _apply_instructions(subdir, repodata, instructions):
def _apply_instructions(subdir, repodata, instructions, new_pkg_fixes=None):
repodata.setdefault("removed", [])
# apply to .tar.bz2 packages
utils.merge_or_update_dict(
repodata.get("packages", {}),
instructions.get("packages", {}),
merge=False,
add_missing_keys=False,
)
# we could have totally separate instructions for .conda than .tar.bz2, but it's easier if we assume
# that a similarly-named .tar.bz2 file is the same content as .conda, and shares fixes
new_pkg_fixes = {
k.replace(CONDA_PACKAGE_EXTENSION_V1, CONDA_PACKAGE_EXTENSION_V2): v
for k, v in instructions.get("packages", {}).items()
}

if new_pkg_fixes is None:
Contributor Author:

The new_pkg_fixes argument is part of an effort to make repodata patching faster in the case of shards; the way it works now is not as efficient for shards as patching all the repodata in one go. (See the sketch at the end of this hunk.)

# we could have totally separate instructions for .conda than .tar.bz2, but it's easier if we assume
# that a similarly-named .tar.bz2 file is the same content as .conda, and shares fixes
new_pkg_fixes = {
k.replace(CONDA_PACKAGE_EXTENSION_V1, CONDA_PACKAGE_EXTENSION_V2): v
for k, v in instructions.get("packages", {}).items()
}

# apply .tar.bz2 fixes to packages.conda
utils.merge_or_update_dict(
repodata.get("packages.conda", {}),
new_pkg_fixes,
merge=False,
add_missing_keys=False,
)
# apply .conda-only fixes to packages.conda
utils.merge_or_update_dict(
repodata.get("packages.conda", {}),
instructions.get("packages.conda", {}),
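
Picking up the inline comment above, a minimal sketch (hypothetical helper, not part of this diff) of the intended speedup: build the .tar.bz2 to .conda fix mapping once per subdir, then reuse it for every shard instead of rebuilding it on each _apply_instructions call:

def apply_instructions_to_shards(subdir, shards, instructions):
    # computed once here; _apply_instructions would otherwise rebuild it per shard
    new_pkg_fixes = {
        k.replace(CONDA_PACKAGE_EXTENSION_V1, CONDA_PACKAGE_EXTENSION_V2): v
        for k, v in instructions.get("packages", {}).items()
    }
    for shard in shards.values():  # e.g. one repodata-shaped dict per package name
        _apply_instructions(subdir, shard, instructions, new_pkg_fixes=new_pkg_fixes)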
@@ -481,6 +486,7 @@ class ChannelIndex:
:param channel_url: fsspec URL where package files live. If provided, channel_root will only be used for cache and index output.
:param fs: ``MinimalFS`` instance to be used with channel_url. Wrap fsspec AbstractFileSystem with ``conda_index.index.fs.FsspecFS(fs)``.
:param base_url: Add ``base_url/<subdir>`` to repodata.json to be able to host packages separate from repodata.json
:param save_fs_state: Pass False to use cached filesystem state instead of ``os.listdir(subdir)``
"""

fs: MinimalFS | None = None
@@ -504,6 +510,7 @@ def __init__(
channel_url: str | None = None,
fs: MinimalFS | None = None,
base_url: str | None = None,
save_fs_state=True,
write_current_repodata=True,
):
if threads is None:
@@ -531,6 +538,7 @@
self.write_run_exports = write_run_exports
self.compact_json = compact_json
self.base_url = base_url
self.save_fs_state = save_fs_state
self.write_current_repodata = write_current_repodata

def index(
@@ -571,6 +579,10 @@ def extract_wrapper(args: tuple):
# runs in thread
subdir, verbose, progress, subdir_path = args
cache = self.cache_for_subdir(subdir)
# exactly these packages (unless they are un-indexable) will
# be in the output repodata
if self.save_fs_state:
cache.save_fs_state(subdir_path)
return self.extract_subdir_to_cache(
subdir, verbose, progress, subdir_path, cache
)
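
For context, a sketch of the call order this refactor establishes, using names from this diff (channel_index stands in for a ChannelIndex instance): the caller now decides whether to refresh filesystem state, so extraction can run against previously cached state when packages are not on disk:

cache = channel_index.cache_for_subdir(subdir)
if channel_index.save_fs_state:
    # refresh the set of available packages via listdir() or its remote equivalent
    cache.save_fs_state(subdir_path)
channel_index.extract_subdir_to_cache(subdir, verbose, progress, subdir_path, cache)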
@@ -775,17 +787,18 @@ def cache_for_subdir(self, subdir):
return cache

def extract_subdir_to_cache(
self, subdir, verbose, progress, subdir_path, cache: sqlitecache.CondaIndexCache
):
self,
subdir: str,
verbose,
progress,
subdir_path,
cache: sqlitecache.CondaIndexCache,
) -> str:
"""
Extract all changed packages into the subdir cache.

Return name of subdir.
"""
# exactly these packages (unless they are un-indexable) will be in the
# output repodata
cache.save_fs_state(subdir_path)

log.debug("%s find packages to extract", subdir)

# list so tqdm can show progress
@@ -1242,6 +1255,8 @@ def _maybe_write_output_paths(
newline = b"\n"
newline_option = None

# XXX could we avoid writing output_temp_path in some cases?

# always use \n line separator
with open(
output_temp_path,
38 changes: 30 additions & 8 deletions conda_index/index/convert_cache.py
@@ -21,7 +21,7 @@
log = logging.getLogger(__name__)

# maximum 'PRAGMA user_version' we support
USER_VERSION = 1
USER_VERSION = 2

PATH_INFO = re.compile(
r"""
@@ -73,7 +73,13 @@ def create(conn):
# has md5, shasum. older? packages do not include timestamp?
# SELECT path, datetime(json_extract(index_json, '$.timestamp'), 'unixepoch'), index_json from index_json
conn.execute(
"CREATE TABLE IF NOT EXISTS index_json (path TEXT PRIMARY KEY, index_json BLOB)"
"""
CREATE TABLE IF NOT EXISTS index_json (
path TEXT PRIMARY KEY, index_json BLOB,
name AS (json_extract(index_json, '$.name')),
sha256 AS (json_extract(index_json, '$.sha256'))
Contributor Author:

It might be worthwhile to index name; but it might not, since we will typically fetch everything. (See the sketch after this statement.)

)
"""
)
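
A sketch of the optional index discussed in the comment above; this is an assumption, not part of the diff (SQLite allows indexing a virtual generated column):

conn.execute("CREATE INDEX IF NOT EXISTS idx_index_json_name ON index_json (name)")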
conn.execute(
"CREATE TABLE IF NOT EXISTS recipe (path TEXT PRIMARY KEY, recipe BLOB)"
@@ -127,13 +133,14 @@ def migrate(conn):
"conda-index cache is too new: version {user_version} > {USER_VERSION}"
)

if user_version > 0:
return

remove_prefix(conn)
if user_version < 1:
remove_prefix(conn)
# PRAGMA can't accept ?-substitution
conn.execute("PRAGMA user_version=1")

# PRAGMA can't accept ?-substitution
conn.execute("PRAGMA user_version=1")
if user_version < 2:
add_computed_name(conn)
conn.execute("PRAGMA user_version=2")


def remove_prefix(conn: sqlite3.Connection):
Expand Down Expand Up @@ -161,6 +168,21 @@ def basename(path):
)


def add_computed_name(db: sqlite3.Connection):
"""
Add helpful computed columns to index_json.
"""
columns = set(row[1] for row in db.execute("PRAGMA table_xinfo(index_json)"))
if "name" not in columns:
db.execute(
"ALTER TABLE index_json ADD COLUMN name AS (json_extract(index_json, '$.name'))"
)
if "sha256" not in columns:
db.execute(
"ALTER TABLE index_json ADD COLUMN sha256 AS (json_extract(index_json, '$.sha256'))"
)
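
With the computed columns in place, a package's entries can be selected by name without parsing every index_json blob; a sketch ("numpy" is illustrative):

rows = db.execute(
    "SELECT path, sha256, index_json FROM index_json WHERE name = ?",
    ("numpy",),
).fetchall()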


def extract_cache_filesystem(path):
"""
Yield interesting (match, <bytes>) members of filesystem at path.