From 9bcecadc7570c0571a3fde027175128311bfc80d Mon Sep 17 00:00:00 2001 From: Romain Beaumont Date: Fri, 6 Jan 2023 03:09:05 +0100 Subject: [PATCH 1/5] add video platform does not actually work well --- cc2dataset/main.py | 18 ++++++++++++++++++ examples/single_warc_example.py | 2 +- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/cc2dataset/main.py b/cc2dataset/main.py index efbd6bb..0c2ad53 100644 --- a/cc2dataset/main.py +++ b/cc2dataset/main.py @@ -17,7 +17,21 @@ import time from .spark_session_builder import build_spark_session from io import BytesIO +from yt_dlp.extractor import gen_extractor_classes, GenericIE + +def valid_video_platform_link(link): + if "amazon" in link.get("url", "") or "drive" in link.get("url", "") or "twitter" in link.get("url", "") or "pinterest" in link.get("url", "") or "youtube" in link.get("url", "") or "instagram" in link.get("url", ""): + return False + for ie in gen_extractor_classes(): + if ie != GenericIE and ie.suitable(link.get("url", "")): + return True + return False + +def extract_video_platform_from_links(links): + #links = links[:100] + filtered_links = [{"url": link["url"], "alt": link.get("text", "")} for link in links if valid_video_platform_link(link)] + return filtered_links def valid_video_link(link): valid_http = link.get("url", "").startswith("http") @@ -105,6 +119,8 @@ def extract_documents_from_links(links, document_type): return extract_text_from_links(links) elif document_type == "video": return extract_video_from_links(links) + elif document_type == "video_platform": + return extract_video_platform_from_links(links) else: raise ValueError(f"Unknown document type {document_type}") @@ -136,6 +152,8 @@ def extract_documents_from_wat(stream, document_type): for link in filtered_links: link["uid"] = str(hashlib.md5((link["alt"] + link["url"]).encode()).hexdigest()) all_links.extend(filtered_links) + if len(all_links) > 100: + return all_links except Exception as e: # pylint: disable=broad-except logger.info(e) logger.info("A shard failed to parse") diff --git a/examples/single_warc_example.py b/examples/single_warc_example.py index 3d2cffb..4771d2a 100644 --- a/examples/single_warc_example.py +++ b/examples/single_warc_example.py @@ -10,7 +10,7 @@ else: url = "https://data.commoncrawl.org/" + wat - results = process_wat(url, "image") + results = process_wat(url, "video_platform") df = pd.DataFrame(results, columns=["uid", "url", "alt"]) df.to_parquet(os.getcwd() + "/output.parquet") print(df) From 7af4f7dc49efe0d44bc0a0afe32c9f5a2fc6a786 Mon Sep 17 00:00:00 2001 From: Marianna Date: Tue, 26 Sep 2023 13:16:02 +0200 Subject: [PATCH 2/5] add WARC iframe support --- cc2dataset/main.py | 223 ++++++++++++++++++++-------- cc2dataset/spark_session_builder.py | 3 +- 2 files changed, 161 insertions(+), 65 deletions(-) diff --git a/cc2dataset/main.py b/cc2dataset/main.py index 0c2ad53..f166007 100644 --- a/cc2dataset/main.py +++ b/cc2dataset/main.py @@ -2,6 +2,8 @@ from fastwarc.warc import ArchiveIterator, WarcRecordType +from resiliparse.parse.html import HTMLTree +from resiliparse.extract.html2text import extract_plain_text import simdjson import fsspec from timeit import default_timer as timer @@ -17,32 +19,20 @@ import time from .spark_session_builder import build_spark_session from io import BytesIO -from yt_dlp.extractor import gen_extractor_classes, GenericIE - +from urllib.parse import urljoin +import fire -def valid_video_platform_link(link): - if "amazon" in link.get("url", "") or "drive" in link.get("url", "") or "twitter" 
in link.get("url", "") or "pinterest" in link.get("url", "") or "youtube" in link.get("url", "") or "instagram" in link.get("url", ""): - return False - for ie in gen_extractor_classes(): - if ie != GenericIE and ie.suitable(link.get("url", "")): - return True - return False - -def extract_video_platform_from_links(links): - #links = links[:100] - filtered_links = [{"url": link["url"], "alt": link.get("text", "")} for link in links if valid_video_platform_link(link)] - return filtered_links def valid_video_link(link): - valid_http = link.get("url", "").startswith("http") valid_video = any( link.get("url", "").endswith(ext) for ext in [".avi", ".mp4", ".mkv", ".webm", ".mov", ".mpg", ".mpeg", ".m4v"] ) - return valid_http and valid_video + return valid_video def extract_video_from_links(links): - filtered_links = [{"url": link["url"], "alt": link.get("text", "")} for link in links if valid_video_link(link)] + filtered_links = [{"url": link["url"], "alt": link.get( + "text", "")} for link in links if valid_video_link(link)] return filtered_links @@ -68,8 +58,6 @@ def extract_video_from_links(links): def valid_text_link(link): - if not link.get("url", "").startswith("http"): - return False splits = link.get("url", "").split(".") if len(splits) < 2: return False @@ -79,40 +67,69 @@ def valid_text_link(link): def extract_text_from_links(links): - filtered_links = [{"url": link["url"], "alt": link.get("text", "")} for link in links if valid_text_link(link)] + filtered_links = [{"url": link["url"], "alt": link.get( + "text", "")} for link in links if valid_text_link(link)] return filtered_links def valid_audio_link(link): - valid_http = link.get("url", "").startswith("http") - valid_audio = any(link.get("url", "").endswith(ext) for ext in [".ogg", ".wav", ".mp3", ".flac", ".m4a"]) - return valid_http and valid_audio + valid_audio = any(link.get("url", "").endswith(ext) + for ext in [".ogg", ".wav", ".mp3", ".flac", ".m4a"]) + return valid_audio def extract_audio_from_links(links): """Extract image from links""" - filtered_links = [{"url": link["url"], "alt": link.get("text", "")} for link in links if valid_audio_link(link)] + filtered_links = [{"url": link["url"], "alt": link.get( + "text", "")} for link in links if valid_audio_link(link)] return filtered_links def valid_image_link(link): valid_path = link.get("path", "") == "IMG@/src" valid_alt = len(link.get("alt", "")) > 0 - valid_http = link.get("url", "").startswith("http") - return valid_path and valid_http and valid_alt + return valid_path and valid_alt def extract_image_from_links(links): """Extract image from links""" - filtered_links = [{"url": link["url"], "alt": link["alt"]} for link in links if valid_image_link(link)] + filtered_links = [{"url": link["url"], "alt": link["alt"]} + for link in links if valid_image_link(link)] + return filtered_links + + +def valid_image_only_link(link): + valid_path = link.get("path", "") == "IMG@/src" + return valid_path + + +def extract_image_only_from_links(links): + """Extract image from links even when no caption is present""" + filtered_links = [{"url": link["url"], "alt": link.get( + "alt", "")} for link in links if valid_image_only_link(link)] return filtered_links +def make_link_absolute(url, base_url): + if url.startswith("http://") or url.startswith("https://"): + return url + try: + return urljoin(base_url, url) + except ValueError: + return url + + +def make_links_absolute(links, base_url): + return [{"url": make_link_absolute(link["url"], base_url), "alt": link["alt"]} for link in links] 
+ + def extract_documents_from_links(links, document_type): """Extract documents from links ; this function returns a list of dict {"alt": ..., "url": ...}""" if document_type == "image": return extract_image_from_links(links) + elif document_type == "image_only": + return extract_image_only_from_links(links) elif document_type == "audio": return extract_audio_from_links(links) elif document_type == "text": @@ -125,6 +142,30 @@ def extract_documents_from_links(links, document_type): raise ValueError(f"Unknown document type {document_type}") +def extract_documents_from_warc(path): + """Extract documents from WARC""" + with fsspec.open(path, mode="rb", compression="gzip") as f: + try: + for record in ArchiveIterator(f): + try: + page_url = str(record.headers["WARC-Target-URI"]) + tree = HTMLTree.parse_from_bytes(record.reader.read()) + + for ele in tree.body.get_elements_by_tag_name("iframe"): + alt = extract_plain_text(str(ele.parent)) + url = urljoin(page_url, ele.getattr("src")) + if url not in [None, "anout:blank"]: + + yield (str(hashlib.md5((alt + url).encode()).hexdigest()), url, alt, path, page_url) + + except: # pylint: disable=bare-except + continue + + except Exception as e: # pylint: disable=broad-except + logger.info(e) + logger.info("A shard failed to parse") + + def extract_documents_from_wat(stream, document_type): """Extract document from stream""" all_links = [] @@ -147,13 +188,31 @@ def extract_documents_from_wat(stream, document_type): continue links = metadata["Links"] + cc_filename = record_data["Container"]["Filename"] + page_url = envelope["WARC-Header-Metadata"]["WARC-Target-URI"] + # extract base URL to resolve relative URLs + base_url = envelope["WARC-Header-Metadata"]["WARC-Target-URI"] + if "Head" in metadata and "Base" in metadata["Head"]: + try: + base_url = urljoin(base_url, metadata["Head"]["Base"]) + except ValueError: + pass filtered_links = extract_documents_from_links(links, document_type) + filtered_links = make_links_absolute(filtered_links, base_url) + filtered_links = [ + link + for link in filtered_links + if link["url"].startswith("http://") or link["url"].startswith("https://") + ] for link in filtered_links: - link["uid"] = str(hashlib.md5((link["alt"] + link["url"]).encode()).hexdigest()) + link["uid"] = str(hashlib.md5( + (link["alt"] + link["url"]).encode()).hexdigest()) + link["cc_filename"] = cc_filename + link["page_url"] = page_url all_links.extend(filtered_links) - if len(all_links) > 100: - return all_links + # if len(all_links) > 100: + # return all_links except Exception as e: # pylint: disable=broad-except logger.info(e) logger.info("A shard failed to parse") @@ -164,44 +223,66 @@ def extract_documents_from_wat(stream, document_type): def process_wat(path, document_type): """Process a single wat file""" + + ext = path.replace(".gz", "").split(".")[-1].replace("/", "").lower() + if ext not in ["wat", "warc"]: + raise ValueError( + f"Extension can only be either 'wat' or 'warc', you provied {ext}") + begin_read = timer() - with fsspec.open(path, "rb") as f: - for i in range(10): - try: - tf = BytesIO(f.read()) - break - except Exception as ex: # pylint: disable=broad-except - if i == 9: - logger.info("failed 10 times, skipping ", path) - return - logger.info(ex) - logger.info(f"retrying reading {i}/10") - time.sleep(1) - - for e in extract_documents_from_wat(tf, document_type): - yield (e["uid"], e["url"], e["alt"]) + if ext == "warc" and document_type == "iframe": + for e in extract_documents_from_warc(path): + yield e + else: + with 
fsspec.open(path, mode="rb", compression="gzip") as f: + for i in range(10): + try: + tf = BytesIO(f.read()) + break + except Exception as ex: # pylint: disable=broad-except + if i == 9: + + logger.info("failed 10 times, skipping ", path) + return + logger.info(ex) + logger.info(f"retrying reading {i}/10") + time.sleep(1) + + if ext == "wat" and document_type != "iframe": + for e in extract_documents_from_wat(tf, document_type): + yield (e["uid"], e["url"], e["alt"], e["cc_filename"], e["page_url"]) + elif ext == "wat" and document_type == "iframe": + raise ValueError( + f"Document type {document_type} is not suppeorted by file type {ext}") + else: + raise ValueError( + f"Unknown document type {document_type} and file type {ext}") end_read = timer() tot_read_time = end_read - begin_read logger.info(f"Took {tot_read_time} to parse") -def get_cc_wat_links(source_cc_protocol): +def get_cc_wat_links(source_cc_protocol, ext): """Get cc wat links""" if source_cc_protocol == "s3": fs, p = fsspec.core.url_to_fs("s3://commoncrawl/crawl-data/") - links = ["s3://" + e for e in fs.glob(p + "/*/wat.paths.gz")] + links = ["s3://" + e for e in fs.glob(p + f"/*/{ext}.paths.gz")] return links elif source_cc_protocol == "http": - fs, p = fsspec.core.url_to_fs("https://commoncrawl.org/the-data/get-started/") + fs, p = fsspec.core.url_to_fs( + "https://commoncrawl.org/the-data/get-started/") a = fs.open(p).read() l = a.splitlines() l = [e.decode("utf8").replace("[WARC] ", "") for e in l] l = [e for e in l if "
  • s3://commoncrawl/crawl-data/" in e]
         l = [
-            e.split(" ")[0].replace("  • s3://commoncrawl/", "https://data.commoncrawl.org/").replace("", "")
+            e.split(" ")[0].replace("
  • s3://commoncrawl/", + "https://data.commoncrawl.org/").replace("", "") for e in l ] - l = [(e + "/wat.paths.gz").replace("//wat", "/wat") for e in l] + l = [(e + f"/{ext}.paths.gz").replace(f"//{ext}", f"/{ext}") + for e in l] + return l else: raise ValueError(f"Unknown protocol {source_cc_protocol}") @@ -213,11 +294,12 @@ def read_wat_index_file(wat_index): return wats -def read_wat_index_files(shard_count, wat_count, source_cc_protocol): +def read_wat_index_files(shard_count, wat_count, source_cc_protocol, ext): """Read all wat index files""" - cc_wat_links = get_cc_wat_links(source_cc_protocol) + cc_wat_links = get_cc_wat_links(source_cc_protocol, ext) if shard_count is not None: - cc_wat_links = cc_wat_links[-shard_count:] # pylint: disable=invalid-unary-operand-type + cc_wat_links = cc_wat_links[-shard_count: + ] # pylint: disable=invalid-unary-operand-type all_wats = [] with ThreadPool(16) as pool: for wats in pool.imap_unordered(read_wat_index_file, cc_wat_links): @@ -261,7 +343,8 @@ def extract(x): yield from process_wat(prefix + x[0], document_type) output = wat_rdd.mapPartitions(extract) - df = output.toDF(["uid", "url", "alt"]) + df = output.toDF(["uid", "url", "alt", "cc_filename", "page_url"]) + df = df.na.drop(subset=["url"]).filter(df.url != "about:blank") deduplicate_repartition_count(df, output_path, wat_count, spark, shuffle) @@ -271,7 +354,8 @@ def get_last_successful_part(output_path): output_path = output_path.replace("s3a", "s3") fs, _ = fsspec.core.url_to_fs(output_path) successful_parts = fs.glob(output_path + "/*/_SUCCESS") - last_part = sorted([int(e.split("/")[-2].split("_")[-1]) for e in successful_parts if "merged" not in e])[-1] + last_part = sorted([int(e.split("/")[-2].split("_")[-1]) + for e in successful_parts if "merged" not in e])[-1] return last_part @@ -292,8 +376,10 @@ def process_multi_part( end = (i + 1) * wat_per_part part_path = f"{output_path}/part_{i}" part_paths.append(part_path) - logger.info(f"Processing part {i} from {start} to {end} into {part_path}") - process_one_part(part_path, wat_index_files[start:end], build_spark, False, document_type, source_cc_protocol) + logger.info( + f"Processing part {i} from {start} to {end} into {part_path}") + process_one_part( + part_path, wat_index_files[start:end], build_spark, False, document_type, source_cc_protocol) spark = build_spark() logger.info("Merging parts") @@ -305,7 +391,8 @@ def process_multi_part( else: df = df.union(spark.read.parquet(part_path)) - deduplicate_repartition_count(df, output_path + "/merged", wat_count, spark, shuffle) + deduplicate_repartition_count( + df, output_path + "/merged", wat_count, spark, shuffle) def get_date_str(): @@ -325,9 +412,15 @@ def cc2dataset( spark_builder=None, document_type="image", source_cc_protocol="s3", + file_type="wat", ): """Convert common crawl to image caption set""" + file_type = file_type.lower() + + if file_type not in ["wat", "warc"]: + raise ValueError("File type can only be either 'wat' or 'warc'") + if resume is not None and multipart is None: raise ValueError("Cannot resume without multipart") @@ -341,7 +434,9 @@ def cc2dataset( logger.info(f"Writing in: {output_path}") if spark_builder is None: - spark_builder = lambda: build_spark_session(master, num_cores, mem_gb) + + def spark_builder(): + return build_spark_session(master, num_cores, mem_gb) def build_spark(): spark = SparkSession.getActiveSession() @@ -350,16 +445,18 @@ def build_spark(): return spark_builder() if resume is None: - wat_index_files = 
read_wat_index_files(wat_index_count, wat_count, source_cc_protocol) + wat_index_files = read_wat_index_files( + wat_index_count, wat_count, source_cc_protocol, file_type) # write wat index files to disk in output_path with fsspec - with fsspec.open(f"{output_path}/wat_index_files.txt", "w", encoding="utf8") as f: + with fsspec.open(f"{output_path}/{file_type}_index_files.txt", "w", encoding="utf8") as f: f.write("\n".join(wat_index_files)) else: - with fsspec.open(f"{output_path}/wat_index_files.txt", "r", encoding="utf8") as f: + with fsspec.open(f"{output_path}/{file_type}_index_files.txt", "r", encoding="utf8") as f: wat_index_files = f.read().splitlines() if multipart is None: - process_one_part(output_path, wat_index_files, build_spark, shuffle, document_type, source_cc_protocol) + process_one_part(output_path, wat_index_files, build_spark, + shuffle, document_type, source_cc_protocol) else: process_multi_part( output_path, wat_index_files, build_spark, multipart, shuffle, resume, document_type, source_cc_protocol diff --git a/cc2dataset/spark_session_builder.py b/cc2dataset/spark_session_builder.py index 4d1ddf8..427d05b 100644 --- a/cc2dataset/spark_session_builder.py +++ b/cc2dataset/spark_session_builder.py @@ -46,7 +46,7 @@ def aws_ec2_s3_spark_session(master, num_cores=128, mem_gb=256): "spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.1,org.apache.spark:spark-hadoop-cloud_2.13:3.3.1" ) # change to the appropriate auth method, see https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html - .config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.InstanceProfileCredentialsProvider") + # .config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.InstanceProfileCredentialsProvider") # ton of options to try and make s3a run faster .config("spark.hadoop.fs.s3a.threads.max", "512") .config("spark.hadoop.fs.s3a.connection.maximum", "2048") @@ -65,7 +65,6 @@ def aws_ec2_s3_spark_session(master, num_cores=128, mem_gb=256): .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.hadoop.fs.s3a.experimental.input.fadvise", "random") .config("spark.hadoop.fs.s3a.block.size", "2M") - .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") .config("spark.hadoop.fs.s3a.fast.buffer.size", "100M") .config("spark.hadoop.fs.s3a.fast.upload.buffer", "array") .config("spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled", "true") From d35fd42715a6fa245f00bcdb95b10a377aa2358e Mon Sep 17 00:00:00 2001 From: Marianna Date: Tue, 26 Sep 2023 13:16:25 +0200 Subject: [PATCH 3/5] add WARC iframe test --- tests/test_single_warc.py | 39 ++++++++++++++++++++++++++++----------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/tests/test_single_warc.py b/tests/test_single_warc.py index 4c4ca81..d1c6cf9 100644 --- a/tests/test_single_warc.py +++ b/tests/test_single_warc.py @@ -3,6 +3,7 @@ import pandas as pd test_url = "https://data.commoncrawl.org/crawl-data/CC-MAIN-2022-40/segments/1664030331677.90/wat/CC-MAIN-20220924151538-20220924181538-00000.warc.wat.gz" +test_warc_url = "https://data.commoncrawl.org/crawl-data/CC-MAIN-2022-40/segments/1664030331677.90/warc/CC-MAIN-20220924151538-20220924181538-00000.warc.gz" def retry(f, n=3): @@ -16,29 +17,45 @@ def retry(f, n=3): return retry(f, n - 1) +cols = ["uid", "url", "alt", "cc_filename", "page_url"] + + def test_single_warc_image(): - results = retry(lambda: process_wat(test_url, "image")) - df = pd.DataFrame(results, columns=["uid", "url", 
"alt"]) - assert len(df) == 240033 - assert df["uid"][3] == "ee8ab8628552d88a099129cf1a452745" + results = retry(lambda: process_wat(test_url, document_type="image")) + df = pd.DataFrame(results, columns=cols) + assert len(df) == 405232 + assert df["uid"][3] == "2a64f921d7ced2fed91e82eeb56338cd" + + +def test_single_warc_image_only(): + results = retry(lambda: process_wat(test_url, "image_only")) + df = pd.DataFrame(results, columns=cols) + assert len(df) == 733994 + assert df["uid"][3] == "331e6480d20ea2f51a5557ca7e20909a" def test_single_warc_audio(): results = retry(lambda: process_wat(test_url, "audio")) - df = pd.DataFrame(results, columns=["uid", "url", "alt"]) - assert len(df) == 721 - assert df["uid"][3] == "fd38d5c43140dfda889566eddd8755c0" + df = pd.DataFrame(results, columns=cols) + assert len(df) == 927 + assert df["uid"][3] == "5c835ccd44d718e0a95d74b4a2902dfe" def test_single_warc_text(): results = retry(lambda: process_wat(test_url, "text")) - df = pd.DataFrame(results, columns=["uid", "url", "alt"]) - assert len(df) == 6375 + df = pd.DataFrame(results, columns=cols) + assert len(df) == 10552 assert df["uid"][3] == "b485d42a0fad04a4e7e2fdb114e341c8" def test_single_warc_video(): results = retry(lambda: process_wat(test_url, "video")) - df = pd.DataFrame(results, columns=["uid", "url", "alt"]) - assert len(df) == 508 + df = pd.DataFrame(results, columns=cols) + assert len(df) == 676 assert df["uid"][3] == "a8f5837e354808f319d2a4899089090c" + + +def test_single_warc_iframe(): + results = retry(lambda: process_wat(test_warc_url, "iframe")) + df = pd.DataFrame(results, columns=cols) + assert len(df) == 16407 From 8b044468936222b22e308f71174905b6966a5b29 Mon Sep 17 00:00:00 2001 From: Marianna Date: Tue, 26 Sep 2023 13:17:51 +0200 Subject: [PATCH 4/5] add WARC iframe example --- examples/single_warc_iframe_example.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 examples/single_warc_iframe_example.py diff --git a/examples/single_warc_iframe_example.py b/examples/single_warc_iframe_example.py new file mode 100644 index 0000000..122ada6 --- /dev/null +++ b/examples/single_warc_iframe_example.py @@ -0,0 +1,18 @@ +from cc2dataset import process_wat +import os +import pandas as pd + +if __name__ == "__main__": + from_s3 = False + wat = ( + "crawl-data/CC-MAIN-2023-06/segments/1674764494974.98/warc/CC-MAIN-20230127065356-20230127095356-00752.warc.gz" + ) + if from_s3: + url = "s3://commoncrawl/" + wat + else: + url = "https://data.commoncrawl.org/" + wat + + results = process_wat(url, "iframe") + df = pd.DataFrame(results, columns=["uid", "url", "alt", "cc_filename", "page_url"]) + df.to_parquet(os.getcwd() + "/output.parquet") + print(df) From 8e422b19eb92d440d814a67f52dd639e0dd657dc Mon Sep 17 00:00:00 2001 From: Marianna Date: Tue, 26 Sep 2023 13:23:45 +0200 Subject: [PATCH 5/5] update main --- cc2dataset/main.py | 60 ++++++++++++++++------------------------------ 1 file changed, 20 insertions(+), 40 deletions(-) diff --git a/cc2dataset/main.py b/cc2dataset/main.py index f166007..7d08d9d 100644 --- a/cc2dataset/main.py +++ b/cc2dataset/main.py @@ -31,8 +31,7 @@ def valid_video_link(link): def extract_video_from_links(links): - filtered_links = [{"url": link["url"], "alt": link.get( - "text", "")} for link in links if valid_video_link(link)] + filtered_links = [{"url": link["url"], "alt": link.get("text", "")} for link in links if valid_video_link(link)] return filtered_links @@ -67,21 +66,18 @@ def valid_text_link(link): def 
extract_text_from_links(links): - filtered_links = [{"url": link["url"], "alt": link.get( - "text", "")} for link in links if valid_text_link(link)] + filtered_links = [{"url": link["url"], "alt": link.get("text", "")} for link in links if valid_text_link(link)] return filtered_links def valid_audio_link(link): - valid_audio = any(link.get("url", "").endswith(ext) - for ext in [".ogg", ".wav", ".mp3", ".flac", ".m4a"]) + valid_audio = any(link.get("url", "").endswith(ext) for ext in [".ogg", ".wav", ".mp3", ".flac", ".m4a"]) return valid_audio def extract_audio_from_links(links): """Extract image from links""" - filtered_links = [{"url": link["url"], "alt": link.get( - "text", "")} for link in links if valid_audio_link(link)] + filtered_links = [{"url": link["url"], "alt": link.get("text", "")} for link in links if valid_audio_link(link)] return filtered_links @@ -93,8 +89,7 @@ def valid_image_link(link): def extract_image_from_links(links): """Extract image from links""" - filtered_links = [{"url": link["url"], "alt": link["alt"]} - for link in links if valid_image_link(link)] + filtered_links = [{"url": link["url"], "alt": link["alt"]} for link in links if valid_image_link(link)] return filtered_links @@ -105,8 +100,7 @@ def valid_image_only_link(link): def extract_image_only_from_links(links): """Extract image from links even when no caption is present""" - filtered_links = [{"url": link["url"], "alt": link.get( - "alt", "")} for link in links if valid_image_only_link(link)] + filtered_links = [{"url": link["url"], "alt": link.get("alt", "")} for link in links if valid_image_only_link(link)] return filtered_links @@ -206,8 +200,7 @@ def extract_documents_from_wat(stream, document_type): if link["url"].startswith("http://") or link["url"].startswith("https://") ] for link in filtered_links: - link["uid"] = str(hashlib.md5( - (link["alt"] + link["url"]).encode()).hexdigest()) + link["uid"] = str(hashlib.md5((link["alt"] + link["url"]).encode()).hexdigest()) link["cc_filename"] = cc_filename link["page_url"] = page_url all_links.extend(filtered_links) @@ -226,8 +219,7 @@ def process_wat(path, document_type): ext = path.replace(".gz", "").split(".")[-1].replace("/", "").lower() if ext not in ["wat", "warc"]: - raise ValueError( - f"Extension can only be either 'wat' or 'warc', you provied {ext}") + raise ValueError(f"Extension can only be either 'wat' or 'warc', you provied {ext}") begin_read = timer() if ext == "warc" and document_type == "iframe": @@ -252,11 +244,9 @@ def process_wat(path, document_type): for e in extract_documents_from_wat(tf, document_type): yield (e["uid"], e["url"], e["alt"], e["cc_filename"], e["page_url"]) elif ext == "wat" and document_type == "iframe": - raise ValueError( - f"Document type {document_type} is not suppeorted by file type {ext}") + raise ValueError(f"Document type {document_type} is not suppeorted by file type {ext}") else: - raise ValueError( - f"Unknown document type {document_type} and file type {ext}") + raise ValueError(f"Unknown document type {document_type} and file type {ext}") end_read = timer() tot_read_time = end_read - begin_read logger.info(f"Took {tot_read_time} to parse") @@ -269,19 +259,16 @@ def get_cc_wat_links(source_cc_protocol, ext): links = ["s3://" + e for e in fs.glob(p + f"/*/{ext}.paths.gz")] return links elif source_cc_protocol == "http": - fs, p = fsspec.core.url_to_fs( - "https://commoncrawl.org/the-data/get-started/") + fs, p = fsspec.core.url_to_fs("https://commoncrawl.org/the-data/get-started/") a = fs.open(p).read() l 
= a.splitlines()
         l = [e.decode("utf8").replace("[WARC] ", "") for e in l]
         l = [e for e in l if "  • s3://commoncrawl/crawl-data/" in e]
         l = [
-            e.split(" ")[0].replace("  • s3://commoncrawl/",
-                                     "https://data.commoncrawl.org/").replace("", "")
+            e.split(" ")[0].replace("
  • s3://commoncrawl/", "https://data.commoncrawl.org/").replace("", "") for e in l ] - l = [(e + f"/{ext}.paths.gz").replace(f"//{ext}", f"/{ext}") - for e in l] + l = [(e + f"/{ext}.paths.gz").replace(f"//{ext}", f"/{ext}") for e in l] return l else: @@ -298,8 +285,7 @@ def read_wat_index_files(shard_count, wat_count, source_cc_protocol, ext): """Read all wat index files""" cc_wat_links = get_cc_wat_links(source_cc_protocol, ext) if shard_count is not None: - cc_wat_links = cc_wat_links[-shard_count: - ] # pylint: disable=invalid-unary-operand-type + cc_wat_links = cc_wat_links[-shard_count:] # pylint: disable=invalid-unary-operand-type all_wats = [] with ThreadPool(16) as pool: for wats in pool.imap_unordered(read_wat_index_file, cc_wat_links): @@ -354,8 +340,7 @@ def get_last_successful_part(output_path): output_path = output_path.replace("s3a", "s3") fs, _ = fsspec.core.url_to_fs(output_path) successful_parts = fs.glob(output_path + "/*/_SUCCESS") - last_part = sorted([int(e.split("/")[-2].split("_")[-1]) - for e in successful_parts if "merged" not in e])[-1] + last_part = sorted([int(e.split("/")[-2].split("_")[-1]) for e in successful_parts if "merged" not in e])[-1] return last_part @@ -376,10 +361,8 @@ def process_multi_part( end = (i + 1) * wat_per_part part_path = f"{output_path}/part_{i}" part_paths.append(part_path) - logger.info( - f"Processing part {i} from {start} to {end} into {part_path}") - process_one_part( - part_path, wat_index_files[start:end], build_spark, False, document_type, source_cc_protocol) + logger.info(f"Processing part {i} from {start} to {end} into {part_path}") + process_one_part(part_path, wat_index_files[start:end], build_spark, False, document_type, source_cc_protocol) spark = build_spark() logger.info("Merging parts") @@ -391,8 +374,7 @@ def process_multi_part( else: df = df.union(spark.read.parquet(part_path)) - deduplicate_repartition_count( - df, output_path + "/merged", wat_count, spark, shuffle) + deduplicate_repartition_count(df, output_path + "/merged", wat_count, spark, shuffle) def get_date_str(): @@ -445,8 +427,7 @@ def build_spark(): return spark_builder() if resume is None: - wat_index_files = read_wat_index_files( - wat_index_count, wat_count, source_cc_protocol, file_type) + wat_index_files = read_wat_index_files(wat_index_count, wat_count, source_cc_protocol, file_type) # write wat index files to disk in output_path with fsspec with fsspec.open(f"{output_path}/{file_type}_index_files.txt", "w", encoding="utf8") as f: f.write("\n".join(wat_index_files)) @@ -455,8 +436,7 @@ def build_spark(): wat_index_files = f.read().splitlines() if multipart is None: - process_one_part(output_path, wat_index_files, build_spark, - shuffle, document_type, source_cc_protocol) + process_one_part(output_path, wat_index_files, build_spark, shuffle, document_type, source_cc_protocol) else: process_multi_part( output_path, wat_index_files, build_spark, multipart, shuffle, resume, document_type, source_cc_protocol