Skip to content

Commit

Permalink
CLI: proper batch processing
Browse files Browse the repository at this point in the history
  • Loading branch information
adbar committed Jun 26, 2023
1 parent 53fca91 commit d305524
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 63 deletions.
142 changes: 82 additions & 60 deletions courlan/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@
import argparse
import sys

from concurrent.futures import ProcessPoolExecutor
from functools import partial
from concurrent.futures import ProcessPoolExecutor, as_completed
from itertools import islice
from typing import Any, Iterator, List, Optional, Tuple
from typing import Any, List, Optional, Tuple

from .core import check_url
from .sampling import sample_urls
Expand Down Expand Up @@ -79,75 +78,98 @@ def parse_args(args: Any) -> Any:
return argsparser.parse_args()


def _cli_check_urls(
    urls: List[str],
    strict: bool = False,
    with_redirects: bool = False,
    language: Optional[str] = None,
    with_nav: bool = False,
) -> List[Tuple[bool, str]]:
    """Internal function to be used with CLI multiprocessing.

    Validate a batch of URLs with ``check_url`` and report one result
    per input URL, preserving input order.

    Args:
        urls: Batch of URL strings to validate.
        strict: Apply strict filtering rules.
        with_redirects: Resolve redirects while checking.
        language: Optional target language filter.
        with_nav: Keep navigation pages.

    Returns:
        A list of ``(valid, url)`` tuples: ``(True, cleaned_url)`` when
        ``check_url`` accepts the URL (the cleaned form is returned),
        ``(False, original_url)`` when it is rejected.
    """
    results = []
    for url in urls:
        result = check_url(
            url,
            strict=strict,
            with_redirects=with_redirects,
            language=language,
            with_nav=with_nav,
        )
        # check_url returns None on rejection, otherwise a tuple whose
        # first element is the cleaned URL
        if result is not None:
            results.append((True, result[0]))
        else:
            results.append((False, url))
    return results


def _cli_sample(args: Any) -> None:
    """Sample URLs on the CLI.

    Reads all URLs from ``args.inputfile``, draws a sample via
    ``sample_urls`` and writes the selected URLs to ``args.outputfile``,
    one per line.
    """
    with open(args.inputfile, "r", encoding="utf-8", errors="ignore") as inputfh:
        candidates: List[str] = [line.strip() for line in inputfh]

    sampled = sample_urls(
        candidates,
        args.samplesize,
        exclude_min=args.exclude_min,
        exclude_max=args.exclude_max,
        strict=args.strict,
        verbose=args.verbose,
    )

    with open(args.outputfile, "w", encoding="utf-8") as outputfh:
        outputfh.writelines(url + "\n" for url in sampled)


def _cli_process(args: Any) -> None:
    """Read input file bit by bit and process URLs in batches.

    Streams ``args.inputfile`` in batches of 1000 lines, grouped into
    super-batches of up to 1000 batches, and submits each batch to a
    process pool running ``_cli_check_urls``. Valid URLs are written to
    ``args.outputfile`` (one per line); rejected URLs are appended to
    ``args.discardedfile`` when that option is set.
    """
    with ProcessPoolExecutor(max_workers=args.parallel) as executor, open(
        args.outputfile, "w", encoding="utf-8"
    ) as outputfh, open(
        args.inputfile, "r", encoding="utf-8", errors="ignore"
    ) as inputfh:
        while True:
            # gather up to 1000 batches of up to 1000 lines each before
            # dispatching, to bound memory while keeping workers busy
            batches = []  # type: List[List[str]]
            while len(batches) < 1000:
                line_batch = list(islice(inputfh, 1000))
                if not line_batch:
                    break
                batches.append(line_batch)

            if batches:
                # NOTE(review): lines are passed unstripped (trailing
                # newline included) — presumably check_url tolerates
                # this; confirm against courlan.core
                futures = (
                    executor.submit(
                        _cli_check_urls,
                        batch,
                        strict=args.strict,
                        with_redirects=args.redirects,
                        language=args.language,
                    )
                    for batch in batches
                )

                # consume results as workers finish rather than in
                # submission order
                for future in as_completed(futures):
                    for valid, url in future.result():
                        if valid:
                            outputfh.write(url + "\n")
                        # proceed with discarded URLs. to be rewritten
                        elif args.discardedfile is not None:
                            with open(
                                args.discardedfile, "a", encoding="utf-8"
                            ) as discardfh:
                                discardfh.write(url)

                batches = []
            else:
                # input exhausted
                break


def process_args(args: Any) -> None:
    """Start processing according to the arguments.

    Dispatches to URL sampling (``--sample``) or to batched URL
    filtering, based on the parsed CLI arguments.
    """
    if args.sample:
        _cli_sample(args)
    else:
        _cli_process(args)


def main() -> None:
Expand Down
4 changes: 1 addition & 3 deletions tests/unit_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -920,9 +920,7 @@ def test_cli():
assert os.system("courlan --help") == 0 # exit status

# _cli_check_urls

assert cli._cli_check_url("123") == (False, "123")
assert cli._cli_check_url("https://example.org") == (True, "https://example.org")
assert cli._cli_check_urls(["123", "https://example.org"]) == [(False, "123"), (True, "https://example.org")]

# testfile
inputfile = os.path.join(RESOURCES_DIR, "input.txt")
Expand Down

0 comments on commit d305524

Please sign in to comment.