Improve test coverage (#123)
* remove 'if verbose' check

* remove unused function

* improve test coverage; add some type annotations

* move indexed packages query into sqlitecache

* test incompatible patch

* additional tests

* improve coverage; remove code related to converting tar archives of legacy cache
dholth authored Oct 11, 2023
1 parent c17dfa4 commit 3fcd25a
Showing 22 changed files with 622 additions and 165 deletions.
161 changes: 65 additions & 96 deletions conda_index/index/__init__.py
@@ -534,69 +534,74 @@ def index(
``index.html`` for each subdir.
"""
-if verbose:
-    level = logging.DEBUG
-else:
-    level = logging.ERROR
-
-logging_context = utils.LoggingContext(level, loggers=[__name__])
-
-with logging_context:
-    subdirs = self.detect_subdirs()
-
-    # Lock local channel.
-    with utils.try_acquire_locks(
-        [utils.get_lock(self.channel_root)], timeout=900
-    ):
-        # begin non-stop "extract packages into cache";
-        # extract_subdir_to_cache manages subprocesses. Keeps cores busy
-        # during write/patch/update channeldata steps.
-        def extract_subdirs_to_cache():
-            executor = ThreadPoolExecutor(max_workers=1)
-
-            def extract_args():
-                for subdir in subdirs:
-                    # .cache is currently in channel_root not output_root
-                    _ensure_valid_channel(self.channel_root, subdir)
-                    subdir_path = join(self.channel_root, subdir)
-                    yield (subdir, verbose, progress, subdir_path)
-
-            def extract_wrapper(args: tuple):
-                # runs in thread
-                subdir, verbose, progress, subdir_path = args
-                cache = self.cache_for_subdir(subdir)
-                return self.extract_subdir_to_cache(
-                    subdir, verbose, progress, subdir_path, cache
-                )
-
-            # map() gives results in order passed, not in order of
-            # completion. If using multiple threads, switch to
-            # submit() / as_completed().
-            return executor.map(extract_wrapper, extract_args())
-
-        # Collect repodata from packages, save to
-        # REPODATA_FROM_PKGS_JSON_FN file
-        with self.thread_executor_factory() as index_process:
-            futures = [
-                index_process.submit(
-                    functools.partial(
-                        self.index_prepared_subdir,
-                        subdir=subdir,
-                        verbose=verbose,
-                        progress=progress,
-                        patch_generator=patch_generator,
-                        current_index_versions=current_index_versions,
-                    )
-                )
-                for subdir in extract_subdirs_to_cache()
-            ]
-            # limited API to support DummyExecutor
-            for future in futures:
-                result = future.result()
-                log.info(f"Completed {result}")
+log.debug(
+    "ChannelIndex.index(verbose=...) is a no-op. Alter log levels for %s to control verbosity.",
+    __name__,
+)
+
+subdirs = self.detect_subdirs()
+
+# Lock local channel.
+with utils.try_acquire_locks([utils.get_lock(self.channel_root)], timeout=900):
+    # begin non-stop "extract packages into cache";
+    # extract_subdir_to_cache manages subprocesses. Keeps cores busy
+    # during write/patch/update channeldata steps.
+    def extract_subdirs_to_cache():
+        executor = ThreadPoolExecutor(max_workers=1)
+
+        def extract_args():
+            for subdir in subdirs:
+                # .cache is currently in channel_root not output_root
+                _ensure_valid_channel(self.channel_root, subdir)
+                subdir_path = join(self.channel_root, subdir)
+                yield (subdir, verbose, progress, subdir_path)
+
+        def extract_wrapper(args: tuple):
+            # runs in thread
+            subdir, verbose, progress, subdir_path = args
+            cache = self.cache_for_subdir(subdir)
+            return self.extract_subdir_to_cache(
+                subdir, verbose, progress, subdir_path, cache
+            )
+
+        # map() gives results in order passed, not in order of
+        # completion. If using multiple threads, switch to
+        # submit() / as_completed().
+        return executor.map(extract_wrapper, extract_args())
+
+    # Collect repodata from packages, save to
+    # REPODATA_FROM_PKGS_JSON_FN file
+    with self.thread_executor_factory() as index_process:
+        futures = [
+            index_process.submit(
+                functools.partial(
+                    self.index_prepared_subdir,
+                    subdir=subdir,
+                    verbose=verbose,
+                    progress=progress,
+                    patch_generator=patch_generator,
+                    current_index_versions=current_index_versions,
+                )
+            )
+            for subdir in extract_subdirs_to_cache()
+        ]
+        # limited API to support DummyExecutor
+        for future in futures:
+            result = future.result()
+            log.info(f"Completed {result}")

def index_prepared_subdir(
-    self, subdir, verbose, progress, patch_generator, current_index_versions
+    self,
+    subdir: str,
+    verbose: bool,
+    progress: bool,
+    patch_generator,
+    current_index_versions,
):
"""
Create repodata_from_packages.json by calling index_subdir, then apply
any patches to create repodata.json.
"""
log.info("Subdir: %s Gathering repodata", subdir)

repodata_from_packages = self.index_subdir(
@@ -727,29 +732,9 @@ def index_subdir(self, subdir, verbose=False, progress=False):

cache = self.cache_for_subdir(subdir)

-if verbose:
-    log.info("Building repodata for %s" % subdir_path)
+log.debug("Building repodata for %s" % subdir_path)

-new_repodata_packages = {}
-new_repodata_conda_packages = {}
-
-# load cached packages
-for row in cache.db.execute(
-    """
-    SELECT path, index_json FROM stat JOIN index_json USING (path)
-    WHERE stat.stage = ?
-    ORDER BY path
-    """,
-    (cache.upstream_stage,),
-):
-    path, index_json = row
-    index_json = json.loads(index_json)
-    if path.endswith(CONDA_PACKAGE_EXTENSION_V1):
-        new_repodata_packages[path] = index_json
-    elif path.endswith(CONDA_PACKAGE_EXTENSION_V2):
-        new_repodata_conda_packages[path] = index_json
-    else:
-        log.warn("%s doesn't look like a conda package", path)
+new_repodata_packages, new_repodata_conda_packages = cache.indexed_packages()

new_repodata = {
"packages": new_repodata_packages,
@@ -824,7 +809,7 @@ def extract_subdir_to_cache(
end_time = time.time()
try:
bytes_sec = size_processed / (end_time - start_time)
-except ZeroDivisionError:
+except ZeroDivisionError:  # pragma: no cover
bytes_sec = 0
log.info(
"%s cached %s from %s packages at %s/second",
@@ -931,21 +916,6 @@ def _update_channeldata(self, channel_data, repodata, subdir):
)
package_data = channel_data.get("packages", {})

def _append_group(groups, candidate):
pkg_dict = candidate[1]
pkg_name = pkg_dict["name"]

run_exports = package_data.get(pkg_name, {}).get("run_exports", {})
if (
pkg_name not in package_data
or subdir not in package_data.get(pkg_name, {}).get("subdirs", [])
or package_data.get(pkg_name, {}).get("timestamp", 0)
< _make_seconds(pkg_dict.get("timestamp", 0))
or run_exports
and pkg_dict["version"] not in run_exports
):
groups.append(candidate)

# Pay special attention to groups that have run_exports - we
# need to process each version group by version; take newest per
# version group. We handle groups that are not in the index at
@@ -1093,8 +1063,7 @@ def build_run_exports_data(self, subdir, verbose=False, progress=False):

cache = self.cache_for_subdir(subdir)

-if verbose:
-    log.info("Building run_exports for %s" % subdir_path)
+log.debug("Building run_exports for %s" % subdir_path)

run_exports_packages = {}
run_exports_conda_packages = {}
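
Note on the verbosity change above: `ChannelIndex.index(verbose=...)` no longer switches log levels itself, so callers that want the old chatty output would raise the level of this module's logger instead. A minimal sketch, assuming the logger name is the module's `__name__` (`conda_index.index`) and that handlers are configured by the caller:

```python
import logging

# Assumed logger name: the __name__ of conda_index/index/__init__.py.
logging.basicConfig(level=logging.INFO)
logging.getLogger("conda_index.index").setLevel(logging.DEBUG)

# channel_index = ChannelIndex(...)   # constructed as before (arguments elided)
# channel_index.index(...)            # the verbose argument is now ignored
```
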
69 changes: 2 additions & 67 deletions conda_index/index/convert_cache.py
@@ -14,7 +14,6 @@
import re
import sqlite3
import tarfile
-from contextlib import closing

from more_itertools import ichunked

@@ -167,29 +166,6 @@ def basename(path):
)


def extract_cache(path):
"""
Yield interesting (match, tar entry) members of tarball at path.
"""
dirnames = set()
tf = tarfile.open(path)
last_len = len(dirnames)
try:
for entry in tf:
match = PATH_INFO.search(entry.name)
if match:
yield match, tf.extractfile(entry)
dirnames.add(os.path.dirname(entry.name))
next_len = len(dirnames)
if last_len < next_len:
log.info(f"CONVERT {os.path.dirname(entry.name)}")
last_len = next_len
except KeyboardInterrupt:
log.warn("Interrupted!")

log.info("%s", dirnames)


def extract_cache_filesystem(path):
"""
Yield interesting (match, <bytes>) members of filesystem at path.
@@ -207,7 +183,7 @@ def extract_cache_filesystem(path):
try:
with open(fullpath, "rb") as entry:
yield path_info, entry
-except PermissionError as e:
+except PermissionError as e:  # pragma: no cover
log.warn("Permission error: %s %s", fullpath, e)


@@ -282,7 +258,7 @@ def convert_cache(conn, cache_generator):
},
)

-else:
+else:  # pragma: no cover
log.warn("Unhandled", match.groupdict())


@@ -327,44 +303,3 @@ def merge_index_cache(channel_root, output_db="merged.db"):
raise

combined_db.execute("DETACH DATABASE subdir")


def test_from_archive(archive_path):
conn = common.connect("linux-64-cache.db")
create(conn)
with closing(conn):
convert_cache(conn, extract_cache(archive_path))
remove_prefix(conn)


def test():
for i, _ in enumerate(
extract_cache_filesystem(os.path.expanduser("~/miniconda3/osx-64/.cache"))
):
if i > 100:
break


if __name__ == "__main__":
from . import logutil

logutil.configure()
test()
# email us if you're thinking about downloading conda-forge to
# regenerate this 264MB file
CACHE_ARCHIVE = os.path.expanduser("~/Downloads/linux-64-cache.tar.bz2")
test_from_archive(CACHE_ARCHIVE)


# typically 600-10,000 MB
MB_PER_DAY = """
select
date(mtime, 'unixepoch') as d,
printf('%0.2f', sum(size) / 1e6) as MB
from
stat
group by
date(mtime, 'unixepoch')
order by
mtime desc
"""
33 changes: 32 additions & 1 deletion conda_index/index/sqlitecache.py
@@ -11,6 +11,10 @@
from os.path import join
from typing import Any
from zipfile import BadZipFile
+from ..utils import (
+    CONDA_PACKAGE_EXTENSION_V1,
+    CONDA_PACKAGE_EXTENSION_V2,
+)

from conda_package_streaming import package_streaming

@@ -108,7 +112,7 @@ def __setstate__(self, d):
self.__dict__ = d

@cacher
-def db(self):
+def db(self) -> sqlite3.Connection:
"""
Connection to our sqlite3 database.
"""
@@ -488,6 +492,33 @@ def changed_packages(self):

return query

def indexed_packages(self):
"""
Return "packages" and "packages.conda" values from the cache.
"""
new_repodata_packages = {}
new_repodata_conda_packages = {}

# load cached packages
for row in self.db.execute(
"""
SELECT path, index_json FROM stat JOIN index_json USING (path)
WHERE stat.stage = ?
ORDER BY path
""",
(self.upstream_stage,),
):
path, index_json = row
index_json = json.loads(index_json)
if path.endswith(CONDA_PACKAGE_EXTENSION_V1):
new_repodata_packages[path] = index_json
elif path.endswith(CONDA_PACKAGE_EXTENSION_V2):
new_repodata_conda_packages[path] = index_json
else:
log.warn("%s doesn't look like a conda package", path)

return new_repodata_packages, new_repodata_conda_packages

def store_index_json_stat(self, database_path, mtime, size, index_json):
self.db.execute(
"""INSERT OR REPLACE INTO stat (stage, path, mtime, size, sha256, md5)
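
The SQL that `index_subdir` used to run inline now lives behind `indexed_packages()` on the cache object, so the caller collapses to a single line. Roughly how the new call site fits together, with the `"packages"` / `"packages.conda"` split taken from the repodata structure above (surrounding fields elided):

```python
# Sketch of the call site in index_subdir(); `cache` comes from
# self.cache_for_subdir(subdir) as shown earlier in this diff.
new_repodata_packages, new_repodata_conda_packages = cache.indexed_packages()

new_repodata = {
    "packages": new_repodata_packages,              # .tar.bz2 entries
    "packages.conda": new_repodata_conda_packages,  # .conda entries (key assumed)
    # ...remaining repodata fields unchanged
}
```
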
17 changes: 17 additions & 0 deletions docs/database.md
@@ -90,3 +90,20 @@ create `current_repodata.json` with only the latest versions of each package,
are similar to pre-sqlite3 conda-index.

The other cached metadata tables are used to create `channeldata.json`.


## Sample queries

Megabytes added per day:

```sql
select
date(mtime, 'unixepoch') as d,
printf('%0.2f', sum(size) / 1e6) as MB
from
stat
group by
date(mtime, 'unixepoch')
order by
mtime desc
```
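
The "megabytes per day" query used to sit in `convert_cache.py` as the unused `MB_PER_DAY` string; it is now documented here instead. One quick way to run it is with Python's built-in `sqlite3` module; the cache path below (`<channel>/<subdir>/.cache/cache.db`) is an assumption, so point it at wherever your cache database actually lives:

```python
import sqlite3

MB_PER_DAY = """
select
  date(mtime, 'unixepoch') as d,
  printf('%0.2f', sum(size) / 1e6) as MB
from stat
group by date(mtime, 'unixepoch')
order by mtime desc
"""

# Assumed location of one subdir's cache database.
with sqlite3.connect("my-channel/linux-64/.cache/cache.db") as conn:
    for day, megabytes in conn.execute(MB_PER_DAY):
        print(day, megabytes)
```
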
10 changes: 10 additions & 0 deletions tests/gen_patch_2.py
@@ -0,0 +1,10 @@
# for tests only
def _patch_repodata(repodata, subdir):
index = repodata["packages"]
instructions = {
"patch_instructions_version": 2,
"packages": {},
"revoke": [],
"remove": [],
}
return instructions
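
`gen_patch_2.py` returns an empty version-2 instruction set; a real patch generator of this shape would fill in the same keys. A hypothetical, slightly fuller example of what such an instruction dict can carry (all filenames and overrides below are made up for illustration):

```python
# Hypothetical version-2 patch instructions: per-filename metadata overrides,
# plus lists of filenames to revoke or remove from the index.
instructions = {
    "patch_instructions_version": 2,
    "packages": {
        # tighten a dependency on an already-published package (made-up name)
        "examplepkg-1.0-py310_0.tar.bz2": {"depends": ["python >=3.10,<3.11"]},
    },
    "revoke": ["brokenpkg-0.1-0.tar.bz2"],  # made-up filename
    "remove": ["oldpkg-0.9-0.tar.bz2"],     # made-up filename
}
```

The commit's "test incompatible patch" bullet presumably covers how conda-index reacts when a patch does not match this expected shape or version.
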