Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

More efficient CLI multiprocessing #47

Merged
merged 3 commits into from
Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 62 additions & 44 deletions courlan/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from concurrent.futures import ProcessPoolExecutor, as_completed
from itertools import islice
from typing import Any, Iterator, List, Optional, Tuple
from typing import Any, List, Optional, Tuple

from .core import check_url
from .sampling import sample_urls
Expand Down Expand Up @@ -102,56 +102,74 @@ def _cli_check_urls(
return results


def _get_line_batches(filename: str, size: int = 1000) -> Iterator[List[str]]:
"Iterate over a file and returns series of line batches."
with open(filename, "r", encoding="utf-8", errors="ignore") as inputfh:
def _cli_sample(args: Any) -> None:
"Sample URLs on the CLI."
urllist: List[str] = []

with open(args.inputfile, "r", encoding="utf-8", errors="ignore") as inputfh:
urllist.extend(line.strip() for line in inputfh)

with open(args.outputfile, "w", encoding="utf-8") as outputfh:
for url in sample_urls(
urllist,
args.samplesize,
exclude_min=args.exclude_min,
exclude_max=args.exclude_max,
strict=args.strict,
verbose=args.verbose,
):
outputfh.write(url + "\n")


def _cli_process(args: Any) -> None:
"Read input file bit by bit and process URLs in batches."
with ProcessPoolExecutor(max_workers=args.parallel) as executor, open(
args.outputfile, "w", encoding="utf-8"
) as outputfh, open(
args.inputfile, "r", encoding="utf-8", errors="ignore"
) as inputfh:
while True:
line_batch = list(islice(inputfh, size))
if not line_batch:
batches = [] # type: List[List[str]]
while len(batches) < 1000:
line_batch = list(islice(inputfh, 1000))
if not line_batch:
break
batches.append(line_batch)

if batches:
futures = (
executor.submit(
_cli_check_urls,
batch,
strict=args.strict,
with_redirects=args.redirects,
language=args.language,
)
for batch in batches
)

for future in as_completed(futures):
for valid, url in future.result():
if valid:
outputfh.write(url + "\n")
# proceed with discarded URLs. to be rewritten
elif args.discardedfile is not None:
with open(
args.discardedfile, "a", encoding="utf-8"
) as discardfh:
discardfh.write(url)

batches = []
else:
break
yield line_batch


def process_args(args: Any) -> None:
"""Start processing according to the arguments"""
if not args.sample:
with ProcessPoolExecutor(max_workers=args.parallel) as executor, open(
args.outputfile, "w", encoding="utf-8"
) as outputfh:
futures = (
executor.submit(
_cli_check_urls,
batch,
strict=args.strict,
with_redirects=args.redirects,
language=args.language,
)
for batch in _get_line_batches(args.inputfile)
)
for future in as_completed(futures):
for valid, url in future.result():
if valid:
outputfh.write(url + "\n")
# proceed with discarded URLs. to be rewritten
elif args.discardedfile is not None:
with open(
args.discardedfile, "a", encoding="utf-8"
) as discardfh:
discardfh.write(url)
if args.sample:
_cli_sample(args)
else:
urllist: List[str] = []
with open(args.inputfile, "r", encoding="utf-8", errors="ignore") as inputfh:
urllist.extend(line.strip() for line in inputfh)
with open(args.outputfile, "w", encoding="utf-8") as outputfh:
for url in sample_urls(
urllist,
args.samplesize,
exclude_min=args.exclude_min,
exclude_max=args.exclude_max,
strict=args.strict,
verbose=args.verbose,
):
outputfh.write(url + "\n")
_cli_process(args)


def main() -> None:
Expand Down
5 changes: 3 additions & 2 deletions courlan/urlstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ def dump_urls(self) -> List[str]:
def print_unvisited_urls(self) -> None:
"Print all unvisited URLs in store."
for domain in self.urldict:
print("\n".join(self.find_unvisited_urls(domain)))
print("\n".join(self.find_unvisited_urls(domain)), flush=True)

def print_urls(self) -> None:
"Print all URLs in store (URL + TAB + visited or not)."
Expand All @@ -485,5 +485,6 @@ def print_urls(self) -> None:
domain + u.urlpath + "\t" + str(u.visited)
for u in self._load_urls(domain)
]
)
),
flush=True,
)
14 changes: 5 additions & 9 deletions tests/unit_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,12 @@ def test_fix_relative():
== "https://www.example.org/dir/subdir/this:that"
)
assert (
fix_relative_urls("https://www.example.org/test.html?q=test#frag", "foo.html?q=bar#baz")
fix_relative_urls(
"https://www.example.org/test.html?q=test#frag", "foo.html?q=bar#baz"
)
== "https://www.example.org/foo.html?q=bar#baz"
)
assert (
fix_relative_urls("https://www.example.org", "{privacy}")
== "{privacy}"
)
assert fix_relative_urls("https://www.example.org", "{privacy}") == "{privacy}"


def test_scrub():
Expand Down Expand Up @@ -921,10 +920,7 @@ def test_cli():
assert os.system("courlan --help") == 0 # exit status

# _cli_check_urls
assert cli._cli_check_urls(["123", "https://example.org"]) == [
(False, "123"),
(True, "https://example.org"),
]
assert cli._cli_check_urls(["123", "https://example.org"]) == [(False, "123"), (True, "https://example.org")]

# testfile
inputfile = os.path.join(RESOURCES_DIR, "input.txt")
Expand Down