-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Write sharded repodata #161
base: main
Are you sure you want to change the base?
Changes from all commits
a2cc811
9001bc8
ba6759a
11392e5
fd6c9bc
504a61c
d049dc2
e979689
4f17a15
90c92a2
69a1e6c
de75383
012f234
9584bb5
3cac14a
45d3a7c
06bbd84
4e4b3fd
193b6c8
04afa01
c01824b
ff2ae94
b4582cb
3c295d2
707f482
9f599e6
ab817a9
4bc897c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -4,12 +4,15 @@ | |||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
import logging | ||||||||||||||||||||||||||||||||||
import os.path | ||||||||||||||||||||||||||||||||||
from pathlib import Path | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
import click | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
from conda_index.index import MAX_THREADS_DEFAULT, ChannelIndex, logutil | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
from .. import yaml | ||||||||||||||||||||||||||||||||||
from ..index.shards import ChannelIndexShards, ShardedIndexCache | ||||||||||||||||||||||||||||||||||
from ..index.sqlitecache import CondaIndexCache | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
@click.command(context_settings={"help_option_names": ["-h", "--help"]}) | ||||||||||||||||||||||||||||||||||
|
@@ -91,6 +94,23 @@ | |||||||||||||||||||||||||||||||||
repodata_version=2 which is supported in conda 24.5.0 or later. | ||||||||||||||||||||||||||||||||||
""", | ||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||
@click.option( | ||||||||||||||||||||||||||||||||||
"--save-fs-state/--no-save-fs-state", | ||||||||||||||||||||||||||||||||||
help=""" | ||||||||||||||||||||||||||||||||||
Skip using listdir() to refresh the set of available packages. Used to | ||||||||||||||||||||||||||||||||||
generate complete repodata.json from cache only when packages are not on | ||||||||||||||||||||||||||||||||||
disk. | ||||||||||||||||||||||||||||||||||
""", | ||||||||||||||||||||||||||||||||||
default=False, | ||||||||||||||||||||||||||||||||||
show_default=True, | ||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||
@click.option( | ||||||||||||||||||||||||||||||||||
"--upstream-stage", | ||||||||||||||||||||||||||||||||||
help=""" | ||||||||||||||||||||||||||||||||||
Set to 'clone' to generate example repodata from conda-forge test database. | ||||||||||||||||||||||||||||||||||
""", | ||||||||||||||||||||||||||||||||||
default="fs", | ||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||
@click.option( | ||||||||||||||||||||||||||||||||||
"--current-repodata/--no-current-repodata", | ||||||||||||||||||||||||||||||||||
help=""" | ||||||||||||||||||||||||||||||||||
|
@@ -110,6 +130,14 @@ | |||||||||||||||||||||||||||||||||
default=False, | ||||||||||||||||||||||||||||||||||
is_flag=True, | ||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||
@click.option( | ||||||||||||||||||||||||||||||||||
"--sharded", | ||||||||||||||||||||||||||||||||||
help=""" | ||||||||||||||||||||||||||||||||||
Write index using shards | ||||||||||||||||||||||||||||||||||
""", | ||||||||||||||||||||||||||||||||||
default=False, | ||||||||||||||||||||||||||||||||||
is_flag=True, | ||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||
Comment on lines
+133
to
+140
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||||||||||
def cli( | ||||||||||||||||||||||||||||||||||
dir, | ||||||||||||||||||||||||||||||||||
patch_generator=None, | ||||||||||||||||||||||||||||||||||
|
@@ -126,7 +154,10 @@ def cli( | |||||||||||||||||||||||||||||||||
run_exports=False, | ||||||||||||||||||||||||||||||||||
compact=True, | ||||||||||||||||||||||||||||||||||
base_url=None, | ||||||||||||||||||||||||||||||||||
save_fs_state=False, | ||||||||||||||||||||||||||||||||||
upstream_stage="fs", | ||||||||||||||||||||||||||||||||||
current_repodata=True, | ||||||||||||||||||||||||||||||||||
sharded=False, | ||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||||||||||
): | ||||||||||||||||||||||||||||||||||
logutil.configure() | ||||||||||||||||||||||||||||||||||
if verbose: | ||||||||||||||||||||||||||||||||||
|
@@ -135,7 +166,10 @@ def cli( | |||||||||||||||||||||||||||||||||
if output: | ||||||||||||||||||||||||||||||||||
output = os.path.expanduser(output) | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
channel_index = ChannelIndex( | ||||||||||||||||||||||||||||||||||
channel_index_class = ChannelIndexShards if sharded else ChannelIndex | ||||||||||||||||||||||||||||||||||
cache_class = ShardedIndexCache if sharded else CondaIndexCache | ||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should be able to write both with a single CLI invocation. Possibly the grouped query (additional method in ShardedIndexCache) becomes the only query we use from now on. We could maintain the subclass to distinguish when conda-index is being extended by non-shard-aware embedders or we could merge them into a single class. |
||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
channel_index = channel_index_class( | ||||||||||||||||||||||||||||||||||
os.path.expanduser(dir), | ||||||||||||||||||||||||||||||||||
channel_name=channel_name, | ||||||||||||||||||||||||||||||||||
output_root=output, | ||||||||||||||||||||||||||||||||||
|
@@ -146,14 +180,32 @@ def cli( | |||||||||||||||||||||||||||||||||
write_run_exports=run_exports, | ||||||||||||||||||||||||||||||||||
compact_json=compact, | ||||||||||||||||||||||||||||||||||
base_url=base_url, | ||||||||||||||||||||||||||||||||||
save_fs_state=save_fs_state, | ||||||||||||||||||||||||||||||||||
write_current_repodata=current_repodata, | ||||||||||||||||||||||||||||||||||
cache_class=cache_class, | ||||||||||||||||||||||||||||||||||
upstream_stage=upstream_stage, | ||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
if save_fs_state is False: | ||||||||||||||||||||||||||||||||||
# We call listdir() in save_fs_state, or its remote fs equivalent; then | ||||||||||||||||||||||||||||||||||
# we call changed_packages(); but the changed_packages query against a | ||||||||||||||||||||||||||||||||||
# remote filesystem is different than the one we need for a local | ||||||||||||||||||||||||||||||||||
# filesystem. How about skipping the extract packages stage entirely by | ||||||||||||||||||||||||||||||||||
# returning no changed packages? Might fail if we use | ||||||||||||||||||||||||||||||||||
# threads/multiprocessing. | ||||||||||||||||||||||||||||||||||
def no_changed_packages(self, *args): | ||||||||||||||||||||||||||||||||||
return [] | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
channel_index.cache_class.changed_packages = no_changed_packages | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
current_index_versions = None | ||||||||||||||||||||||||||||||||||
if current_index_versions_file: | ||||||||||||||||||||||||||||||||||
with open(current_index_versions_file) as f: | ||||||||||||||||||||||||||||||||||
current_index_versions = yaml.safe_load(f) | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
if patch_generator: | ||||||||||||||||||||||||||||||||||
patch_generator = str(Path(patch_generator).expanduser()) | ||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||
channel_index.index( | ||||||||||||||||||||||||||||||||||
patch_generator=patch_generator, # or will use outdated .py patch functions | ||||||||||||||||||||||||||||||||||
current_index_versions=current_index_versions, | ||||||||||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -209,27 +209,32 @@ def _make_seconds(timestamp): | |
) | ||
|
||
|
||
def _apply_instructions(subdir, repodata, instructions): | ||
def _apply_instructions(subdir, repodata, instructions, new_pkg_fixes=None): | ||
repodata.setdefault("removed", []) | ||
# apply to .tar.bz2 packages | ||
utils.merge_or_update_dict( | ||
repodata.get("packages", {}), | ||
instructions.get("packages", {}), | ||
merge=False, | ||
add_missing_keys=False, | ||
) | ||
# we could have totally separate instructions for .conda than .tar.bz2, but it's easier if we assume | ||
# that a similarly-named .tar.bz2 file is the same content as .conda, and shares fixes | ||
new_pkg_fixes = { | ||
k.replace(CONDA_PACKAGE_EXTENSION_V1, CONDA_PACKAGE_EXTENSION_V2): v | ||
for k, v in instructions.get("packages", {}).items() | ||
} | ||
|
||
if new_pkg_fixes is None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The new_pkg_fixes argument is part of an effort to make repodata patching faster in the case of shards; the way it works now is not as efficient for shards compared to patching all repodata in one go. |
||
# we could have totally separate instructions for .conda than .tar.bz2, but it's easier if we assume | ||
# that a similarly-named .tar.bz2 file is the same content as .conda, and shares fixes | ||
new_pkg_fixes = { | ||
k.replace(CONDA_PACKAGE_EXTENSION_V1, CONDA_PACKAGE_EXTENSION_V2): v | ||
for k, v in instructions.get("packages", {}).items() | ||
} | ||
|
||
# apply .tar.bz2 fixes to packages.conda | ||
utils.merge_or_update_dict( | ||
repodata.get("packages.conda", {}), | ||
new_pkg_fixes, | ||
merge=False, | ||
add_missing_keys=False, | ||
) | ||
# apply .conda-only fixes to packages.conda | ||
utils.merge_or_update_dict( | ||
repodata.get("packages.conda", {}), | ||
instructions.get("packages.conda", {}), | ||
|
@@ -481,6 +486,7 @@ class ChannelIndex: | |
:param channel_url: fsspec URL where package files live. If provided, channel_root will only be used for cache and index output. | ||
:param fs: ``MinimalFS`` instance to be used with channel_url. Wrap fsspec AbstractFileSystem with ``conda_index.index.fs.FsspecFS(fs)``. | ||
:param base_url: Add ``base_url/<subdir>`` to repodata.json to be able to host packages separate from repodata.json | ||
:param save_fs_state: Pass False to use cached filesystem state instead of ``os.listdir(subdir)`` | ||
""" | ||
|
||
fs: MinimalFS | None = None | ||
|
@@ -504,7 +510,9 @@ def __init__( | |
channel_url: str | None = None, | ||
fs: MinimalFS | None = None, | ||
base_url: str | None = None, | ||
save_fs_state=True, | ||
write_current_repodata=True, | ||
upstream_stage: str = "fs", | ||
): | ||
if threads is None: | ||
threads = MAX_THREADS_DEFAULT | ||
|
@@ -531,7 +539,9 @@ def __init__( | |
self.write_run_exports = write_run_exports | ||
self.compact_json = compact_json | ||
self.base_url = base_url | ||
self.save_fs_state = save_fs_state | ||
self.write_current_repodata = write_current_repodata | ||
self.upstream_stage = upstream_stage | ||
|
||
def index( | ||
self, | ||
|
@@ -571,6 +581,10 @@ def extract_wrapper(args: tuple): | |
# runs in thread | ||
subdir, verbose, progress, subdir_path = args | ||
cache = self.cache_for_subdir(subdir) | ||
# exactly these packages (unless they are un-indexable) will | ||
# be in the output repodata | ||
if self.save_fs_state: | ||
cache.save_fs_state(subdir_path) | ||
return self.extract_subdir_to_cache( | ||
subdir, verbose, progress, subdir_path, cache | ||
) | ||
|
@@ -768,24 +782,26 @@ def cache_for_subdir(self, subdir): | |
subdir=subdir, | ||
fs=self.fs, | ||
channel_url=self.channel_url, | ||
upstream_stage=self.upstream_stage, | ||
) | ||
if cache.cache_is_brand_new: | ||
# guaranteed to be only thread doing this? | ||
cache.convert() | ||
return cache | ||
|
||
def extract_subdir_to_cache( | ||
self, subdir, verbose, progress, subdir_path, cache: sqlitecache.CondaIndexCache | ||
): | ||
self, | ||
subdir: str, | ||
verbose, | ||
progress, | ||
subdir_path, | ||
cache: sqlitecache.CondaIndexCache, | ||
) -> str: | ||
""" | ||
Extract all changed packages into the subdir cache. | ||
|
||
Return name of subdir. | ||
""" | ||
# exactly these packages (unless they are un-indexable) will be in the | ||
# output repodata | ||
cache.save_fs_state(subdir_path) | ||
|
||
log.debug("%s find packages to extract", subdir) | ||
|
||
# list so tqdm can show progress | ||
|
@@ -1242,6 +1258,8 @@ def _maybe_write_output_paths( | |
newline = b"\n" | ||
newline_option = None | ||
|
||
# XXX could we avoid writing output_temp_path in some cases? | ||
|
||
# always use \n line separator | ||
with open( | ||
output_temp_path, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,7 @@ | |
log = logging.getLogger(__name__) | ||
|
||
# maximum 'PRAGMA user_version' we support | ||
USER_VERSION = 1 | ||
USER_VERSION = 2 | ||
|
||
PATH_INFO = re.compile( | ||
r""" | ||
|
@@ -72,7 +72,13 @@ def create(conn): | |
# has md5, shasum. older? packages do not include timestamp? | ||
# SELECT path, datetime(json_extract(index_json, '$.timestamp'), 'unixepoch'), index_json from index_json | ||
conn.execute( | ||
"CREATE TABLE IF NOT EXISTS index_json (path TEXT PRIMARY KEY, index_json BLOB)" | ||
""" | ||
CREATE TABLE IF NOT EXISTS index_json ( | ||
path TEXT PRIMARY KEY, index_json BLOB, | ||
name AS (json_extract(index_json, '$.name')), | ||
sha256 AS (json_extract(index_json, '$.sha256')) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It might be worthwhile to index name; but it might not since we will typically fetch everything. |
||
) | ||
""" | ||
) | ||
conn.execute( | ||
"CREATE TABLE IF NOT EXISTS recipe (path TEXT PRIMARY KEY, recipe BLOB)" | ||
|
@@ -126,13 +132,14 @@ def migrate(conn): | |
"conda-index cache is too new: version {user_version} > {USER_VERSION}" | ||
) | ||
|
||
if user_version > 0: | ||
return | ||
|
||
remove_prefix(conn) | ||
if user_version < 1: | ||
remove_prefix(conn) | ||
# PRAGMA can't accept ?-substitution | ||
conn.execute("PRAGMA user_version=1") | ||
|
||
# PRAGMA can't accept ?-substitution | ||
conn.execute("PRAGMA user_version=1") | ||
if user_version < 2: | ||
add_computed_name(conn) | ||
conn.execute("PRAGMA user_version=2") | ||
|
||
|
||
def remove_prefix(conn: sqlite3.Connection): | ||
|
@@ -160,6 +167,21 @@ def basename(path): | |
) | ||
|
||
|
||
def add_computed_name(db: sqlite3.Connection): | ||
""" | ||
Add helpful computed columns to index_json. | ||
""" | ||
columns = set(row[1] for row in db.execute("PRAGMA table_xinfo(index_json)")) | ||
if "name" not in columns: | ||
db.execute( | ||
"ALTER TABLE index_json ADD COLUMN name AS (json_extract(index_json, '$.name'))" | ||
) | ||
if "sha256" not in columns: | ||
db.execute( | ||
"ALTER TABLE index_json ADD COLUMN sha256 AS (json_extract(index_json, '$.sha256'))" | ||
) | ||
|
||
|
||
def extract_cache_filesystem(path): | ||
""" | ||
Yield interesting (match, <bytes>) members of filesystem at path. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could replace this with an "add-only" or "no-remove" option, that would keep packages in the index even if they are not found in the filesystem.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These two options are more about testing from a backup of the conda-forge database, they may not survive into the main branch or we could make them easier to use.