Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combining the sharded CLI into the main CLI #2

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions conda_index/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from conda_index.index import MAX_THREADS_DEFAULT, ChannelIndex, logutil

from .. import yaml
from ..index.shards import ChannelIndexShards, ShardedIndexCache
from ..index.sqlitecache import CondaIndexCache


@click.command(context_settings={"help_option_names": ["-h", "--help"]})
Expand Down Expand Up @@ -128,6 +130,14 @@
default=False,
is_flag=True,
)
@click.option(
"--sharded",
help="""
Write index using shards
""",
default=False,
is_flag=True,
)
def cli(
dir,
patch_generator=None,
Expand All @@ -147,6 +157,7 @@ def cli(
save_fs_state=False,
upstream_stage="fs",
current_repodata=True,
sharded=False,
):
logutil.configure()
if verbose:
Expand All @@ -155,7 +166,10 @@ def cli(
if output:
output = os.path.expanduser(output)

channel_index = ChannelIndex(
channel_index_class = ChannelIndexShards if sharded else ChannelIndex
cache_class = ShardedIndexCache if sharded else CondaIndexCache

channel_index = channel_index_class(
os.path.expanduser(dir),
channel_name=channel_name,
output_root=output,
Expand All @@ -168,6 +182,8 @@ def cli(
base_url=base_url,
save_fs_state=save_fs_state,
write_current_repodata=current_repodata,
cache_class=cache_class,
upstream_stage=upstream_stage
)

if save_fs_state is False:
Expand All @@ -182,9 +198,6 @@ def no_changed_packages(self, *args):

channel_index.cache_class.changed_packages = no_changed_packages

# XXX this patch doesn't stick when using multiprocessing
channel_index.cache_class.upstream_stage = upstream_stage

current_index_versions = None
if current_index_versions_file:
with open(current_index_versions_file) as f:
Expand Down
3 changes: 3 additions & 0 deletions conda_index/index/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,7 @@ def __init__(
base_url: str | None = None,
save_fs_state=True,
write_current_repodata=True,
upstream_stage: str = "fs",
):
if threads is None:
threads = MAX_THREADS_DEFAULT
Expand Down Expand Up @@ -540,6 +541,7 @@ def __init__(
self.base_url = base_url
self.save_fs_state = save_fs_state
self.write_current_repodata = write_current_repodata
self.upstream_stage = upstream_stage

def index(
self,
Expand Down Expand Up @@ -780,6 +782,7 @@ def cache_for_subdir(self, subdir):
subdir=subdir,
fs=self.fs,
channel_url=self.channel_url,
upstream_stage=self.upstream_stage,
)
if cache.cache_is_brand_new:
# guaranteed to be only thread doing this?
Expand Down
187 changes: 2 additions & 185 deletions conda_index/index/shards.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,18 @@
Sharded repodata.
"""

import click
import hashlib
import itertools
import json
import logging
from pathlib import Path
from typing import Any
from . import MAX_THREADS_DEFAULT, logutil

import msgpack
import zstandard

from conda_index.index.sqlitecache import CondaIndexCache

from .. import yaml
from . import (
CONDA_PACKAGE_EXTENSIONS,
REPODATA_VERSION,
Expand Down Expand Up @@ -113,6 +110,8 @@ def index_prepared_subdir(
subdir, verbose=verbose, progress=progress
)

print(len(shards_from_packages["shards"]))

log.info("%s Writing pre-patch shards", subdir)
unpatched_path = self.channel_root / subdir / "repodata_shards.msgpack.zst"
self._maybe_write(
Expand Down Expand Up @@ -286,185 +285,3 @@ def per_shard_apply_instructions():
)

return dict(per_shard_apply_instructions()), instructions


# Command-line entry point that builds a sharded index for the channel
# directory ``dir`` using ChannelIndexShards.
#
# The description is kept in comments rather than a docstring on purpose:
# click uses a command function's docstring as its ``--help`` text, so adding
# a docstring here would change the program's output.
@click.command(context_settings={"help_option_names": ["-h", "--help"]})
@click.argument("dir")
@click.option("--output", help="Output repodata to given directory.")
@click.option(
    "--subdir",
    multiple=True,
    default=None,
    help="Subdir to index. Accepts multiple.",
)
@click.option(
    "-n",
    "--channel-name",
    help="Customize the channel name listed in each channel's index.html.",
)
@click.option(
    "--patch-generator",
    required=False,
    help="Path to Python file that outputs metadata patch instructions from its "
    "_patch_repodata function or a .tar.bz2/.conda file which contains a "
    "patch_instructions.json file for each subdir",
)
@click.option(
    "--channeldata/--no-channeldata",
    help="Generate channeldata.json.",
    default=False,
    show_default=True,
)
@click.option(
    "--rss/--no-rss",
    help="Write rss.xml (Only if --channeldata is enabled).",
    default=True,
    show_default=True,
)
@click.option(
    "--bz2/--no-bz2",
    help="Write repodata.json.bz2.",
    default=False,
    show_default=True,
)
@click.option(
    "--zst/--no-zst",
    help="Write repodata.json.zst.",
    default=False,
    show_default=True,
)
@click.option(
    "--run-exports/--no-run-exports",
    help="Write run_exports.json.",
    default=False,
    show_default=True,
)
@click.option(
    "--compact/--no-compact",
    help="Output JSON as one line, or pretty-printed.",
    default=True,
    show_default=True,
)
@click.option(
    "--current-index-versions-file",
    "-m",
    help="""
YAML file containing name of package as key, and list of versions as values. The current_index.json
will contain the newest from this series of versions. For example:

python:
- 3.8
- 3.9

will keep python 3.8.X and 3.9.Y in the current_index.json, instead of only the very latest python version.
""",
)
@click.option(
    "--base-url",
    help="""
If packages should be served separately from repodata.json, URL of the
directory tree holding packages. Generates repodata.json with
repodata_version=2 which is supported in conda 24.5.0 or later.
""",
)
@click.option(
    "--save-fs-state/--no-save-fs-state",
    help="""
Skip using listdir() to refresh the set of available packages. Used to
generate complete repodata.json from cache only when packages are not on
disk.
""",
    default=False,
    show_default=True,
)
@click.option(
    "--upstream-stage",
    help="""
Set to 'clone' to generate example repodata from conda-forge test database.
""",
    default="fs",
)
@click.option("--threads", default=MAX_THREADS_DEFAULT, show_default=True)
@click.option(
    "--verbose",
    help="""
Enable debug logging.
""",
    default=False,
    is_flag=True,
)
def cli(
    dir,
    patch_generator=None,
    subdir=None,
    output=None,
    channeldata=False,
    verbose=False,
    threads=None,
    current_index_versions_file=None,
    channel_name=None,
    bz2=False,
    zst=False,
    rss=False,
    run_exports=False,
    compact=True,
    base_url=None,
    save_fs_state=False,
    upstream_stage="fs",
):
    # Each parameter corresponds to one click argument/option declared above.
    # NOTE: the Python-level default for ``rss`` (False) differs from the
    # ``--rss/--no-rss`` option default (True); the option default is the one
    # that applies when the command is invoked through click.
    logutil.configure()
    if verbose:
        logging.getLogger("conda_index.index").setLevel(logging.DEBUG)

    # Expand a leading "~" in the optional output directory.
    if output:
        output = Path(output).expanduser()

    # NOTE: ``threads`` is accepted as an option but is not forwarded; the
    # index is always constructed with threads=1 here.
    channel_index = ChannelIndexShards(
        Path(dir).expanduser(),
        channel_name=channel_name,
        output_root=output,
        subdirs=subdir,
        write_bz2=bz2,
        write_zst=zst,
        threads=1,
        write_run_exports=run_exports,
        compact_json=compact,
        base_url=base_url,
        save_fs_state=save_fs_state,
    )

    if save_fs_state is False:
        # We call listdir() in save_fs_state, or its remote fs equivalent; then
        # we call changed_packages(); but the changed_packages query against a
        # remote filesystem is different than the one we need for a local
        # filesystem. How about skipping the extract packages stage entirely by
        # returning no changed packages? Might fail if we use
        # threads/multiprocessing.
        def no_changed_packages(self, *args):
            return []

        # Replaced on the class (not on an instance), before index() runs.
        ShardedIndexCache.changed_packages = no_changed_packages

    # Also set on the class rather than passed to a constructor.
    ShardedIndexCache.upstream_stage = upstream_stage

    # Optional YAML mapping of package name -> list of versions, handed to
    # index() below as current_index_versions.
    current_index_versions = None
    if current_index_versions_file:
        with open(current_index_versions_file) as f:
            current_index_versions = yaml.safe_load(f)

    # index() receives the patch generator as a "~"-expanded string path.
    if patch_generator:
        patch_generator = str(Path(patch_generator).expanduser())

    channel_index.index(
        patch_generator=patch_generator,  # or will use outdated .py patch functions
        current_index_versions=current_index_versions,
        progress=False,  # clone is a batch job
    )

    # channeldata (and rss, which is only used here) is generated as a
    # separate step after indexing.
    if channeldata:  # about 2 1/2 minutes for conda-forge
        # XXX wants to read repodata.json, not shards
        channel_index.update_channeldata(rss=rss)

# Invoke the click command when this module is executed as a script.
if __name__ == "__main__":
    cli()
5 changes: 3 additions & 2 deletions conda_index/index/sqlitecache.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,21 +85,21 @@ def __get__(self, inst, objtype=None) -> Any:


class CondaIndexCache:
upstream_stage = "fs"

def __init__(
self,
channel_root: Path | str,
subdir: str,
*,
fs: MinimalFS | None = None,
channel_url: str | None = None,
upstream_stage: str = "fs",
):
"""
channel_root: directory containing platform subdir's, e.g. /clones/conda-forge
subdir: platform subdir, e.g. 'linux-64'
fs: MinimalFS (designed to wrap fsspec.spec.AbstractFileSystem); optional.
channel_url: base url if fs is used; optional.
upstream_stage: type of index record it is; defaults to "fs"
"""

self.subdir = subdir
Expand All @@ -108,6 +108,7 @@ def __init__(
self.cache_dir = Path(channel_root, subdir, ".cache")
self.db_filename = Path(self.cache_dir, "cache.db")
self.cache_is_brand_new = not self.db_filename.exists()
self.upstream_stage = upstream_stage

self.fs = fs or MinimalFS()
self.channel_url = channel_url or str(channel_root)
Expand Down
Loading