diff --git a/scripts/k8s-setup/populate_minio.sh b/scripts/k8s-setup/populate_minio.sh index 90865b8e5..8c5bd2268 100755 --- a/scripts/k8s-setup/populate_minio.sh +++ b/scripts/k8s-setup/populate_minio.sh @@ -38,7 +38,7 @@ mc cp --recursive ${REPOROOT}/transforms/language/doc_chunk/test-data/input/ kfp mc cp --recursive ${REPOROOT}/transforms/language/html2parquet/test-data/input/test1.html kfp/test/html2parquet/input # universal mc cp --recursive ${REPOROOT}/transforms/universal/doc_id/test-data/input/ kfp/test/doc_id/input -mc cp --recursive ${REPOROOT}/transforms/universal/ededup/ray/test-data/input/ kfp/test/ededup/input +mc cp --recursive ${REPOROOT}/transforms/universal/ededup/test-data/input/ kfp/test/ededup/input mc cp --recursive ${REPOROOT}/transforms/universal/fdedup/ray/test-data/input/ kfp/test/fdedup/input mc cp --recursive ${REPOROOT}/transforms/universal/filter/ray/test-data/input/ kfp/test/filter/input mc cp --recursive ${REPOROOT}/transforms/universal/noop/ray/test-data/input/ kfp/test/noop/input diff --git a/transforms/code/code_profiler/python/requirements.txt b/transforms/code/code_profiler/python/requirements.txt index ee5c4e7f2..663b4a234 100644 --- a/transforms/code/code_profiler/python/requirements.txt +++ b/transforms/code/code_profiler/python/requirements.txt @@ -52,7 +52,7 @@ packaging==24.0 pandas==2.2.2 parso==0.8.4 pexpect==4.9.0 -pillow==10.3.0 +pillow>=10.3.0 platformdirs==4.2.2 prompt_toolkit==3.0.45 protobuf==5.27.2 diff --git a/transforms/code/code_quality/python/requirements.txt b/transforms/code/code_quality/python/requirements.txt index 4226758bd..498f103de 100644 --- a/transforms/code/code_quality/python/requirements.txt +++ b/transforms/code/code_quality/python/requirements.txt @@ -1,3 +1,3 @@ data-prep-toolkit>=0.2.3 bs4==0.0.2 -transformers==4.38.2 +transformers>=4.38.2 diff --git a/transforms/language/lang_id/requirements.txt b/transforms/language/lang_id/requirements.txt index 6d0647329..dc7cebe90 100644 --- 
a/transforms/language/lang_id/requirements.txt +++ b/transforms/language/lang_id/requirements.txt @@ -1,4 +1,4 @@ fasttext==0.9.2 -langcodes==3.3.0 +langcodes>=3.3.0 huggingface-hub >= 0.21.4, <1.0.0 numpy==1.26.4 diff --git a/transforms/language/text_encoder/requirements.txt b/transforms/language/text_encoder/requirements.txt index 286f87d76..527a286c2 100644 --- a/transforms/language/text_encoder/requirements.txt +++ b/transforms/language/text_encoder/requirements.txt @@ -1 +1 @@ -sentence-transformers==3.0.1 +sentence-transformers>=3.0.1 diff --git a/transforms/pyproject.toml b/transforms/pyproject.toml index 34e24acef..4813460b7 100644 --- a/transforms/pyproject.toml +++ b/transforms/pyproject.toml @@ -62,7 +62,7 @@ language = { file = [ "universal/hap/python/requirements.txt", "universal/tokenization/python/requirements.txt", -"universal/ededup/python/requirements.txt", +"universal/ededup/requirements.txt", "universal/fdedup/python/requirements.txt", "language/doc_quality/requirements.txt", @@ -93,7 +93,6 @@ code_profiler = { file = ["code/code_profiler/python/requirements.txt"]} pii_redactor = { file = ["language/pii_redactor/python/requirements.txt"]} -ededup = { file = ["universal/ededup/python/requirements.txt"]} fdedup = { file = ["universal/fdedup/python/requirements.txt"]} profiler = { file = ["universal/profiler/python/requirements.txt"]} filter = { file = ["universal/filter/python/requirements.txt"]} @@ -110,6 +109,8 @@ text_encoder = { file = ["language/text_encoder/requirements.txt"]} doc_id = { file = ["universal/doc_id/requirements.txt"]} hap = { file = ["universal/hap/requirements.txt"]} +ededup = { file = ["universal/ededup/requirements.txt"]} + web2parquet = { file = ["universal/web2parquet/requirements.txt"]} # Does not seem to work for our custom layout @@ -128,6 +129,7 @@ dpk_pdf2parquet = "language/pdf2parquet/dpk_pdf2parquet" dpk_text_encoder = "language/text_encoder/dpk_text_encoder" dpk_doc_id = "universal/doc_id/dpk_doc_id" dpk_hap = 
"universal/hap/dpk_hap" +dpk_ededup = "universal/ededup/dpk_ededup" #[tool.setuptools.package-data] #"*" = ["*.txt"] diff --git a/transforms/transforms-1.0-lang.ipynb b/transforms/transforms-1.0-lang.ipynb index 38e684415..32f8f1c7c 100644 --- a/transforms/transforms-1.0-lang.ipynb +++ b/transforms/transforms-1.0-lang.ipynb @@ -10,7 +10,7 @@ }, { "cell_type": "code", - "execution_count": 1, + "execution_count": 38, "id": "4c45c3c6-e4d7-4e61-8de6-32d61f2ce695", "metadata": {}, "outputs": [], @@ -21,9 +21,17 @@ "import pandas as pd" ] }, + { + "cell_type": "markdown", + "id": "c276c60e", + "metadata": {}, + "source": [ + "Configure and run web2parquet" + ] + }, { "cell_type": "code", - "execution_count": 2, + "execution_count": 39, "id": "b6c89ac7-6824-4d99-8120-7d5b150bd683", "metadata": {}, "outputs": [], @@ -35,7 +43,7 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": 40, "id": "c2a12abc-9460-4e45-8961-873b48a9ab19", "metadata": {}, "outputs": [], @@ -50,7 +58,7 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": 25, "id": "c3df5adf-4717-4a03-864d-9151cd3f134b", "metadata": {}, "outputs": [], @@ -60,43 +68,20 @@ "#glob.glob(\"downloads/*\") " ] }, + { + "cell_type": "markdown", + "id": "bd71fe8a", + "metadata": {}, + "source": [ + "Configure and run Pdf2Parquet" + ] + }, { "cell_type": "code", - "execution_count": 5, + "execution_count": null, "id": "7276fe84-6512-4605-ab65-747351e13a7c", "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "10:55:10 INFO - pdf2parquet parameters are : {'batch_size': -1, 'artifacts_path': None, 'contents_type': , 'do_table_structure': True, 'do_ocr': True, 'ocr_engine': , 'bitmap_area_threshold': 0.05, 'pdf_backend': , 'double_precision': 8}\n", - "10:55:10 INFO - pipeline id pipeline_id\n", - "10:55:10 INFO - code location None\n", - "10:55:10 INFO - data factory data_ is using local data access: input_folder - downloads output_folder - 
pdf2parquet-files\n", - "10:55:10 INFO - data factory data_ max_files -1, n_sample -1\n", - "10:55:10 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.pdf'], files to checkpoint ['.parquet']\n", - "10:55:10 INFO - orchestrator pdf2parquet started at 2024-12-14 10:55:10\n", - "10:55:10 INFO - Number of files is 1, source profile {'max_file_size': 5.308699607849121, 'min_file_size': 5.308699607849121, 'total_file_size': 5.308699607849121}\n", - "10:55:10 INFO - Initializing models\n", - "Fetching 9 files: 100%|██████████| 9/9 [00:00<00:00, 20015.24it/s]\n", - "10:56:06 INFO - Completed 1 files (100.0%) in 0.847 min\n", - "10:56:06 INFO - Done processing 1 files, waiting for flush() completion.\n", - "10:56:06 INFO - done flushing in 0.0 sec\n", - "10:56:07 INFO - Completed execution in 0.941 min, execution result 0\n" - ] - }, - { - "data": { - "text/plain": [ - "0" - ] - }, - "execution_count": 5, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ "from dpk_pdf2parquet.transform_python import Pdf2Parquet\n", "Pdf2Parquet(input_folder= \"downloads\", \n", @@ -107,7 +92,7 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 29, "id": "fef6667e-71ed-4054-9382-55c6bb3fda70", "metadata": {}, "outputs": [], @@ -117,30 +102,20 @@ "#table.to_pandas()" ] }, + { + "cell_type": "markdown", + "id": "54cba5c4", + "metadata": {}, + "source": [ + "Configure and Run DocChunk" + ] + }, { "cell_type": "code", - "execution_count": 7, + "execution_count": null, "id": "fe8bf1bc", "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "10:56:09 INFO - pipeline id pipeline_id\n", - "10:56:09 INFO - code location None\n", - "10:56:09 INFO - data factory data_ is using local data access: input_folder - pdf2parquet-files output_folder - doc-chunk-files\n", - "10:56:09 INFO - data factory data_ max_files -1, n_sample -1\n", - 
"10:56:09 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']\n", - "10:56:09 INFO - orchestrator doc_chunk started at 2024-12-14 10:56:09\n", - "10:56:09 INFO - Number of files is 1, source profile {'max_file_size': 0.023062705993652344, 'min_file_size': 0.023062705993652344, 'total_file_size': 0.023062705993652344}\n", - "10:56:09 INFO - Completed 1 files (100.0%) in 0.001 min\n", - "10:56:09 INFO - Done processing 1 files, waiting for flush() completion.\n", - "10:56:09 INFO - done flushing in 0.0 sec\n", - "10:56:09 INFO - Completed execution in 0.001 min, execution result 0\n" - ] - } - ], + "outputs": [], "source": [ "%%capture\n", "from dpk_doc_chunk.transform_python import DocChunk\n", @@ -151,7 +126,7 @@ }, { "cell_type": "code", - "execution_count": 9, + "execution_count": 31, "id": "9d4f7bfc", "metadata": {}, "outputs": [], @@ -161,45 +136,57 @@ "#table.to_pandas()" ] }, + { + "cell_type": "markdown", + "id": "349cf6ff", + "metadata": {}, + "source": [ + "Configure and Run Exact dedup" + ] + }, { "cell_type": "code", - "execution_count": 12, + "execution_count": null, + "id": "38480cd5", + "metadata": {}, + "outputs": [], + "source": [ + "from dpk_ededup.transform_python import Ededup\n", + "Ededup(input_folder=\"doc-chunk-files\",\n", + " output_folder=\"dedup-files\",\n", + " ededup_doc_column=\"contents\",\n", + " ededup_doc_id_column=\"document_id\").transform()" + ] + }, + { + "cell_type": "code", + "execution_count": 33, + "id": "27e36a8e", + "metadata": {}, + "outputs": [], + "source": [ + "##### **** To explore the output from eDedup, run the code below\n", + "#table = pq.read_table('dedup-files/arxiv_org_2408.09869v5.pdf_application.parquet')\n", + "#table.to_pandas()" + ] + }, + { + "cell_type": "markdown", + "id": "318bc520", + "metadata": {}, + "source": [ + "Configure and run Lang Id" + ] + }, + { + "cell_type": "code", 
"execution_count": null, "id": "ad27a462", "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "10:57:06 INFO - lang_id parameters are : {'model_credential': 'PUT YOUR OWN HUGGINGFACE CREDENTIAL', 'model_kind': 'fasttext', 'model_url': 'facebook/fasttext-language-identification', 'content_column_name': 'contents', 'output_lang_column_name': 'lang', 'output_score_column_name': 'score'}\n", - "10:57:06 INFO - pipeline id pipeline_id\n", - "10:57:06 INFO - code location None\n", - "10:57:06 INFO - data factory data_ is using local data access: input_folder - dedup-files output_folder - langId-files\n", - "10:57:06 INFO - data factory data_ max_files -1, n_sample -1\n", - "10:57:06 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']\n", - "10:57:06 INFO - orchestrator lang_id started at 2024-12-14 10:57:06\n", - "10:57:06 INFO - Number of files is 1, source profile {'max_file_size': 0.031200408935546875, 'min_file_size': 0.031200408935546875, 'total_file_size': 0.031200408935546875}\n", - "Warning : `load_model` does not return WordVectorModel or SupervisedModel any more, but a `FastText` object which is very similar.\n", - "10:57:08 INFO - Completed 1 files (100.0%) in 0.001 min\n", - "10:57:08 INFO - Done processing 1 files, waiting for flush() completion.\n", - "10:57:08 INFO - done flushing in 0.0 sec\n", - "10:57:08 INFO - Completed execution in 0.036 min, execution result 0\n" - ] - }, - { - "data": { - "text/plain": [ - "0" - ] - }, - "execution_count": 12, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ "from dpk_lang_id.transform_python import LangId\n", - "LangId(input_folder= \"doc-chunk-files\",\n", + "LangId(input_folder= \"dedup-files\",\n", " output_folder= \"langId-files\",\n", " lang_id_model_credential= \"PUT YOUR OWN HUGGINGFACE CREDENTIAL\",\n", " 
lang_id_model_kind= \"fasttext\",\n", @@ -209,7 +196,7 @@ }, { "cell_type": "code", - "execution_count": 13, + "execution_count": 35, "id": "c35cab2e", "metadata": {}, "outputs": [], @@ -219,34 +206,24 @@ "#table.to_pandas()" ] }, + { + "cell_type": "markdown", + "id": "a968dbb4", + "metadata": {}, + "source": [ + "Configure and run Doc Quality" + ] + }, { "cell_type": "code", - "execution_count": 14, + "execution_count": null, "id": "4e84ce78", "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "10:57:13 INFO - pipeline id pipeline_id\n", - "10:57:13 INFO - code location None\n", - "10:57:13 INFO - data factory data_ is using local data access: input_folder - dedup-files output_folder - doc-quality-files\n", - "10:57:13 INFO - data factory data_ max_files -1, n_sample -1\n", - "10:57:13 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']\n", - "10:57:13 INFO - orchestrator docq started at 2024-12-14 10:57:13\n", - "10:57:13 INFO - Number of files is 1, source profile {'max_file_size': 0.031200408935546875, 'min_file_size': 0.031200408935546875, 'total_file_size': 0.031200408935546875}\n", - "10:57:13 INFO - Completed 1 files (100.0%) in 0.003 min\n", - "10:57:13 INFO - Done processing 1 files, waiting for flush() completion.\n", - "10:57:13 INFO - done flushing in 0.0 sec\n", - "10:57:13 INFO - Completed execution in 0.003 min, execution result 0\n" - ] - } - ], + "outputs": [], "source": [ "%%capture\n", "from dpk_doc_quality.transform_python import DocQuality\n", - "DocQuality(input_folder='doc-chunk-files',\n", + "DocQuality(input_folder='dedup-files',\n", " output_folder= 'doc-quality-files',\n", " docq_text_lang = \"en\",\n", " docq_doc_content_column =\"contents\").transform()" @@ -254,7 +231,7 @@ }, { "cell_type": "code", - "execution_count": 15, + "execution_count": 37, "id": "d98b854f", "metadata": {}, 
"outputs": [], @@ -267,7 +244,7 @@ ], "metadata": { "kernelspec": { - "display_name": "venv", + "display_name": "Python 3", "language": "python", "name": "python3" }, diff --git a/transforms/universal/ededup/python/Dockerfile b/transforms/universal/ededup/Dockerfile.python similarity index 75% rename from transforms/universal/ededup/python/Dockerfile rename to transforms/universal/ededup/Dockerfile.python index c6b0807bb..c37a0accc 100644 --- a/transforms/universal/ededup/python/Dockerfile +++ b/transforms/universal/ededup/Dockerfile.python @@ -16,20 +16,10 @@ ARG DPK_WHEEL_FILE_NAME COPY --chown=dpk:root data-processing-dist data-processing-dist RUN pip install data-processing-dist/${DPK_WHEEL_FILE_NAME} -COPY --chown=dpk:root src/ src/ -COPY --chown=dpk:root pyproject.toml pyproject.toml -COPY --chown=dpk:root README.md README.md +COPY --chown=dpk:root dpk_ededup/ dpk_ededup/ COPY --chown=dpk:root requirements.txt requirements.txt - -RUN pip install --no-cache-dir -e . - -# copy source data -COPY ./src/ededup_transform_python.py . -COPY ./src/ededup_local.py local/ - -# copy test -COPY test/ test/ -COPY test-data/ test-data/ +COPY --chown=dpk:root README.md README.md +RUN pip install --no-cache-dir -r requirements.txt # Set environment ENV PYTHONPATH /home/dpk diff --git a/transforms/universal/ededup/ray/Dockerfile b/transforms/universal/ededup/Dockerfile.ray similarity index 63% rename from transforms/universal/ededup/ray/Dockerfile rename to transforms/universal/ededup/Dockerfile.ray index c38d0072d..378cb7227 100644 --- a/transforms/universal/ededup/ray/Dockerfile +++ b/transforms/universal/ededup/Dockerfile.ray @@ -13,25 +13,12 @@ ARG DPK_WHEEL_FILE_NAME COPY --chown=ray:users data-processing-dist data-processing-dist RUN pip install data-processing-dist/${DPK_WHEEL_FILE_NAME}[ray] -## Copy the python version of the tansform -COPY --chown=ray:users python-transform/ python-transform/ -RUN cd python-transform && pip install --no-cache-dir -e . 
# Install ray project source -COPY --chown=ray:users src/ src/ -COPY --chown=ray:users pyproject.toml pyproject.toml +COPY --chown=ray:users dpk_ededup/ dpk_ededup/ +COPY --chown=ray:users requirements.txt requirements.txt COPY --chown=ray:users README.md README.md -RUN pip install --no-cache-dir -e . - -# copy the main() entry point to the image -COPY ./src/ededup_transform_ray.py . - -# copy some of the samples in -COPY src/ededup_local_ray.py local/ - -# copy test -COPY test/ test/ -COPY test-data/ test-data/ +RUN pip install --no-cache-dir -r requirements.txt # Grant non-root users the necessary permissions to the ray directory RUN chmod 755 /home/ray diff --git a/transforms/universal/ededup/Makefile b/transforms/universal/ededup/Makefile index bca6f7e85..41b1fa070 100644 --- a/transforms/universal/ededup/Makefile +++ b/transforms/universal/ededup/Makefile @@ -1,79 +1,21 @@ REPOROOT=../../.. # Use make help, to see the available rules -include $(REPOROOT)/.make.defaults - -setup:: - @# Help: Recursively make $@ all subdirs - $(MAKE) RULE=$@ .recurse - -clean:: - @# Help: Recursively make $@ all subdirs - $(MAKE) RULE=$@ .recurse - -build:: - @# Help: Recursively make $@ in subdirs - $(MAKE) RULE=$@ .recurse -venv:: - @# Help: Recursively make $@ in subdirs - $(MAKE) RULE=$@ .recurse - -image:: - @# Help: Recursively make $@ in all subdirs - @$(MAKE) RULE=$@ .recurse - -set-versions: - @# Help: Recursively $@ in all subdirs - @$(MAKE) RULE=$@ .recurse - -publish:: - @# Help: Recursively make $@ in all subdirs - @$(MAKE) RULE=$@ .recurse - -test-image:: - @# Help: Recursively make $@ in all subdirs - @$(MAKE) RULE=$@ .recurse - -test:: - @# Help: Recursively make $@ in all subdirs - @$(MAKE) RULE=$@ .recurse - -test-src:: - @# Help: Recursively make $@ in all subdirs - $(MAKE) RULE=$@ .recurse - -kind-load-image:: - @# Help: Recursively make $@ in all subdirs - $(MAKE) RULE=$@ .recurse - -docker-load-image:: - @# Help: Recursively make $@ in all subdirs - $(MAKE) 
RULE=$@ .recurse - -docker-save-image:: - @# Help: Recursively make $@ in all subdirs - $(MAKE) RULE=$@ .recurse - -.PHONY: workflow-venv -workflow-venv: - if [ -e kfp_ray ]; then \ - $(MAKE) -C kfp_ray workflow-venv; \ - fi - -.PHONY: workflow-test -workflow-test: - if [ -e kfp_ray ]; then \ - $(MAKE) -C kfp_ray workflow-test; \ - fi - -.PHONY: workflow-upload -workflow-upload: - if [ -e kfp_ray ]; then \ - $(MAKE) -C kfp_ray workflow-upload; \ - fi - -.PHONY: workflow-build -workflow-build: - if [ -e kfp_ray ]; then \ - $(MAKE) -C kfp_ray workflow-build; \ - fi - +include $(REPOROOT)/transforms/.make.cicd.targets + +# +# This is intended to be included across the Makefiles provided within +# a given transform's directory tree, so must use compatible syntax. +# +################################################################################ +# This defines the name of the transform and is used to match against +# expected files and is used to define the transform's image name. +TRANSFORM_NAME=$(shell basename `pwd`) + +################################################################################ + +run-cli-sample: + make venv + source venv/bin/activate && \ + python -m dpk_$(TRANSFORM_NAME).ray.transform \ + --run_locally True --data_local_config "{ 'input_folder' : 'test-data/input', 'output_folder' : 'output'}" \ + --ededup_num_hashes 2 diff --git a/transforms/universal/ededup/README.md b/transforms/universal/ededup/README.md index 0a2f58af6..b28b06fc1 100644 --- a/transforms/universal/ededup/README.md +++ b/transforms/universal/ededup/README.md @@ -1,11 +1,141 @@ # Exact Deduplication Transform Exact deduplication transform identifies and removes identical documents in a dataset by comparing them hash-for-hash -to ensure exact matching. Per the set of [transform project conventions](../../README.md#transform-project-conventions) -the following runtimes are available: +to ensure exact matching. 
Please see the set of [transform project conventions](../../README.md#transform-project-conventions) for details on +general project conventions, transform configuration, testing and IDE set up. -* [python](python/README.md) - enables running of the base python transformation in a Python runtime -* [ray](ray/README.md) - enables running of the base python transformation in a Ray runtime -* [kfp](kfp_ray/README.md) - enables running the ray docker image in a kubernetes cluster using a generated `yaml` file. +## Contributors +- Boris Lublinsky (blublinsk@ibm.com) -Please see [here](python/README.md) a more detailed description of this transform. +## Description +This Python implementation of the exact deduplication transform uses "streaming" deduplication based on a central hash. +As shown below, it relies on a distributed hash cache and data processors that read documents, generate hashes, +coordinate with the cache to remove duplicates, and store unique documents in the data plane. + +![](images/exactdedup.png) + +Mapping this model to the transform model is complicated by the need for a hash cache, which the transform model does +not recognize. The solution is to have the transform runtime create the hash cache and pass it as a parameter to the +transforms. The transform runtime handles hash cache creation and enhances statistics with details about cache size and +utilization. + +### Incremental Execution and Snapshotting + +The current implementation includes snapshotting, where the hash cache is saved to storage (local disk or S3) at the +end of execution. This enables incremental deduplication: you can run deduplication on existing files, save the hash +cache, and later load the snapshot to deduplicate only new files, avoiding reprocessing the entire dataset. 
+ +## Input Columns Used by This Transform + +| Input Column Name | Data Type | Description | +|---------------------------------------------------------------------|-----------|----------------------------------| +| Column specified by the _contents_column_ configuration argument | str | Column that stores document text | +| Column specified by the _document_id_column_ configuration argument | int64 | Column that stores document ID | + +## Output Columns Annotated by This Transform +This transform does not perform any annotations; it only filters out the documents that are marked as duplicates. + +## Configuration + +The set of dictionary keys holding [EdedupTransform](dpk_ededup/transform_base.py) +configuration for values (common for Python and Ray) are as follows: + +* _doc_column_ - specifies name of the column containing documents +* _doc_id_column_ - specifies the name of the column containing a document id +* _use_snapshot_ - specifies that ededup execution starts with a set of pre-existing hashes, enabling incremental +execution +* _snapshot_directory_ - specifies the directory for reading snapshots. If not provided, the default is +`output_folder/snapshot` + +## Usage + +The following command line arguments (corresponding to the configuration keys described above) are available in addition +to the options provided by the [python launcher](../../../data-processing-lib/doc/python-launcher-options.md). 
+```text + --ededup_doc_column EDEDUP_DOC_COLUMN + name of the column containing document + --ededup_doc_id_column EDEDUP_DOC_ID_COLUMN + name of the column containing document id + --ededup_use_snapshot EDEDUP_USE_SNAPSHOT + flag to continue from snapshot + --ededup_snapshot_directory EDEDUP_SNAPSHOT_DIRECTORY + location of snapshot files +``` + +### Code example + +[notebook](ededup-python.ipynb) + +### Transforming data using the transform image + +To use the transform image to transform your data, please refer to the +[running images quickstart](../../../doc/quick-start/run-transform-image.md), +substituting the name of this transform image and runtime as appropriate. + +## Testing + +Following [the testing strategy of data-processing-lib](../../../data-processing-lib/doc/transform-testing.md) + +Currently we have: +- [Unit test](test/test_ededup_python.py) +- [Integration test](test/test_ededup.py) + +# Exact Dedup Ray Annotator + +Please see the set of [transform project conventions](../../README.md#transform-project-conventions) for details on general project conventions, +transform configuration, testing and IDE set up. + +## Additional parameters + +In addition to common ededup parameters, Ray implementation provides two additional ones + +* _hash_cpu_ - specifies amount of CPU per hash actor +* _num_hashes_ - specifies number of hash actors + +## Additional support + +We also provide an [estimate](dpk_ededup/ray/cluster_estimator.py) to roughly determine cluster size for running transformer. + +### Running the samples +To run the samples, use the following `make` target + +* `run-cli-sample` - runs dpk_ededup/ray/transform.py using command line args + +This target will activate the virtual environment and set up any configuration needed. +Use the `-n` option of `make` to see the detail of what is done to run the sample. + +For example, +```shell +make run-cli-sample +... +``` +Then +```shell +ls output +``` +To see results of the transform. 
+ +### Code example + +[notebook](ededup-ray.ipynb) + +### Launched Command Line Options +When running the transform with the Ray launcher (i.e., RayTransformLauncher), these additional command line arguments are available +[the options provided by the launcher](../../../data-processing-lib/doc/ray-launcher-options.md). + +``` + --ededup_hash_cpu EDEDUP_HASH_CPU + number of CPUs per hash + --ededup_num_hashes EDEDUP_NUM_HASHES + number of hash actors to use + --ededup_doc_column EDEDUP_DOC_COLUMN + name of the column containing document + --ededup_doc_id_column EDEDUP_DOC_ID_COLUMN + name of the column containing document id + --ededup_use_snapshot EDEDUP_USE_SNAPSHOT + flag to continue from snapshot + --ededup_snapshot_directory EDEDUP_SNAPSHOT_DIRECTORY + location of snapshot files + ``` + +These correspond to the configuration keys described above. diff --git a/transforms/universal/ededup/dpk_ededup/__init__.py b/transforms/universal/ededup/dpk_ededup/__init__.py new file mode 100644 index 000000000..c6c228e20 --- /dev/null +++ b/transforms/universal/ededup/dpk_ededup/__init__.py @@ -0,0 +1 @@ +from .transform_base import * diff --git a/transforms/universal/ededup/python/src/ededup_local.py b/transforms/universal/ededup/dpk_ededup/local.py similarity index 91% rename from transforms/universal/ededup/python/src/ededup_local.py rename to transforms/universal/ededup/dpk_ededup/local.py index 86431e190..f56ba2a1a 100644 --- a/transforms/universal/ededup/python/src/ededup_local.py +++ b/transforms/universal/ededup/dpk_ededup/local.py @@ -13,9 +13,12 @@ import os from data_processing.data_access import DataAccessLocal -from ededup_transform_base import HashFilter -from ededup_transform_python import EdedupTransform -from ededup_transform_base import doc_column_name_key, int_column_name_key +from dpk_ededup.transform_base import ( + HashFilter, + doc_column_name_key, + int_column_name_key, +) +from dpk_ededup.transform_python import EdedupTransform # create parameters 
diff --git a/transforms/universal/ededup/python/src/ededup_local_python.py b/transforms/universal/ededup/dpk_ededup/local_python.py similarity index 90% rename from transforms/universal/ededup/python/src/ededup_local_python.py rename to transforms/universal/ededup/dpk_ededup/local_python.py index f768e7675..8da2bf8f8 100644 --- a/transforms/universal/ededup/python/src/ededup_local_python.py +++ b/transforms/universal/ededup/dpk_ededup/local_python.py @@ -15,8 +15,11 @@ from data_processing.runtime.pure_python import PythonTransformLauncher from data_processing.utils import ParamsUtils -from ededup_transform_python import EdedupPythonTransformRuntimeConfiguration -from ededup_transform_base import doc_column_name_cli_param, int_column_name_cli_param +from dpk_ededup.transform_base import ( + doc_column_name_cli_param, + int_column_name_cli_param, +) +from dpk_ededup.transform_python import EdedupPythonTransformRuntimeConfiguration # create launcher diff --git a/transforms/universal/ededup/python/src/ededup_local_python_incremental.py b/transforms/universal/ededup/dpk_ededup/local_python_incremental.py similarity index 92% rename from transforms/universal/ededup/python/src/ededup_local_python_incremental.py rename to transforms/universal/ededup/dpk_ededup/local_python_incremental.py index b43047a86..6195c7f2d 100644 --- a/transforms/universal/ededup/python/src/ededup_local_python_incremental.py +++ b/transforms/universal/ededup/dpk_ededup/local_python_incremental.py @@ -15,13 +15,13 @@ from data_processing.runtime.pure_python import PythonTransformLauncher from data_processing.utils import ParamsUtils -from ededup_transform_python import EdedupPythonTransformRuntimeConfiguration -from ededup_transform_base import ( +from dpk_ededup.transform_base import ( doc_column_name_cli_param, int_column_name_cli_param, + snapshot_directory_cli_param, use_snapshot_cli_param, - snapshot_directory_cli_param ) +from dpk_ededup.transform_python import 
EdedupPythonTransformRuntimeConfiguration # create launcher diff --git a/transforms/universal/ededup/dpk_ededup/ray/__init__.py b/transforms/universal/ededup/dpk_ededup/ray/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/transforms/universal/ededup/ray/src/cluster_estimator.py b/transforms/universal/ededup/dpk_ededup/ray/cluster_estimator.py similarity index 100% rename from transforms/universal/ededup/ray/src/cluster_estimator.py rename to transforms/universal/ededup/dpk_ededup/ray/cluster_estimator.py diff --git a/transforms/universal/ededup/ray/src/ededup_local_ray.py b/transforms/universal/ededup/dpk_ededup/ray/local.py similarity index 89% rename from transforms/universal/ededup/ray/src/ededup_local_ray.py rename to transforms/universal/ededup/dpk_ededup/ray/local.py index 4d33ce226..b5cd73f30 100644 --- a/transforms/universal/ededup/ray/src/ededup_local_ray.py +++ b/transforms/universal/ededup/dpk_ededup/ray/local.py @@ -15,9 +15,15 @@ from data_processing.utils import ParamsUtils from data_processing_ray.runtime.ray import RayTransformLauncher -from ededup_transform_ray import EdedupRayTransformRuntimeConfiguration -from ededup_transform_base import doc_column_name_cli_param, int_column_name_cli_param -from ededup_transform_ray import hash_cpu_cli_params, num_hashes_cli_params +from dpk_ededup.ray.transform import ( + EdedupRayTransformRuntimeConfiguration, + hash_cpu_cli_params, + num_hashes_cli_params, +) +from dpk_ededup.transform_base import ( + doc_column_name_cli_param, + int_column_name_cli_param, +) # create launcher diff --git a/transforms/universal/ededup/ray/src/ededup_local_ray_incremental.py b/transforms/universal/ededup/dpk_ededup/ray/local_incremental.py similarity index 92% rename from transforms/universal/ededup/ray/src/ededup_local_ray_incremental.py rename to transforms/universal/ededup/dpk_ededup/ray/local_incremental.py index b2e9be5fd..f2be3dbb4 100644 --- 
a/transforms/universal/ededup/ray/src/ededup_local_ray_incremental.py +++ b/transforms/universal/ededup/dpk_ededup/ray/local_incremental.py @@ -15,14 +15,17 @@ from data_processing.utils import ParamsUtils from data_processing_ray.runtime.ray import RayTransformLauncher -from ededup_transform_ray import EdedupRayTransformRuntimeConfiguration -from ededup_transform_base import ( +from dpk_ededup.ray.transform import ( + EdedupRayTransformRuntimeConfiguration, + hash_cpu_cli_params, + num_hashes_cli_params, +) +from dpk_ededup.transform_base import ( doc_column_name_cli_param, int_column_name_cli_param, - use_snapshot_cli_param, snapshot_directory_cli_param, + use_snapshot_cli_param, ) -from ededup_transform_ray import hash_cpu_cli_params, num_hashes_cli_params # create launcher diff --git a/transforms/universal/ededup/ray/src/ededup_s3_ray.py b/transforms/universal/ededup/dpk_ededup/ray/s3.py similarity index 89% rename from transforms/universal/ededup/ray/src/ededup_s3_ray.py rename to transforms/universal/ededup/dpk_ededup/ray/s3.py index 29c8f3b07..3b961f896 100644 --- a/transforms/universal/ededup/ray/src/ededup_s3_ray.py +++ b/transforms/universal/ededup/dpk_ededup/ray/s3.py @@ -14,9 +14,15 @@ from data_processing.utils import ParamsUtils from data_processing_ray.runtime.ray import RayTransformLauncher -from ededup_transform_ray import EdedupRayTransformRuntimeConfiguration -from ededup_transform_base import doc_column_name_cli_param, int_column_name_cli_param -from ededup_transform_ray import hash_cpu_cli_params, num_hashes_cli_params +from dpk_ededup.ray.transform import ( + EdedupRayTransformRuntimeConfiguration, + hash_cpu_cli_params, + num_hashes_cli_params, +) +from dpk_ededup.transform_base import ( + doc_column_name_cli_param, + int_column_name_cli_param, +) # create launcher diff --git a/transforms/universal/ededup/ray/src/ededup_transform_ray.py b/transforms/universal/ededup/dpk_ededup/ray/transform.py similarity index 87% rename from 
transforms/universal/ededup/ray/src/ededup_transform_ray.py rename to transforms/universal/ededup/dpk_ededup/ray/transform.py index d90dfa780..ab12034d7 100644 --- a/transforms/universal/ededup/ray/src/ededup_transform_ray.py +++ b/transforms/universal/ededup/dpk_ededup/ray/transform.py @@ -11,12 +11,13 @@ ################################################################################ import pickle +import sys from argparse import ArgumentParser, Namespace from typing import Any import ray from data_processing.data_access import DataAccessFactoryBase, SnapshotUtils -from data_processing.utils import TransformUtils, UnrecoverableException +from data_processing.utils import ParamsUtils, TransformUtils, UnrecoverableException from data_processing_ray.runtime.ray import ( DefaultRayTransformRuntime, RayTransformLauncher, @@ -24,14 +25,14 @@ from data_processing_ray.runtime.ray.runtime_configuration import ( RayTransformRuntimeConfiguration, ) -from ededup_transform_base import ( +from dpk_ededup.transform_base import ( EdedupTransformBase, EdedupTransformConfigurationBase, HashFilter, cli_prefix, + use_snapshot_key, ) from ray.actor import ActorHandle -from ededup_transform_base import use_snapshot_key hash_cpu_key = "hash_cpu" @@ -236,6 +237,37 @@ def __init__(self): super().__init__(transform_config=EdedupRayTransformConfiguration(), runtime_class=EdedupRayRuntime) +# Class used by the notebooks to ingest binary files and create parquet files +class Ededup: + def __init__(self, **kwargs): + self.params = {} + for key in kwargs: + self.params[key] = kwargs[key] + # if input_folder and output_folder are specified, then assume it is represent data_local_config + try: + local_conf = {k: self.params[k] for k in ("input_folder", "output_folder")} + self.params["data_local_config"] = ParamsUtils.convert_to_ast(local_conf) + del self.params["input_folder"] + del self.params["output_folder"] + except: + pass + try: + worker_options = {k: self.params[k] for k in ("num_cpus", 
"memory")} + self.params["runtime_worker_options"] = ParamsUtils.convert_to_ast(worker_options) + del self.params["num_cpus"] + del self.params["memory"] + except: + pass + + def transform(self): + sys.argv = ParamsUtils.dict_to_req(d=(self.params)) + # create launcher + launcher = RayTransformLauncher(EdedupRayTransformRuntimeConfiguration()) + # launch + return_code = launcher.launch() + return return_code + + if __name__ == "__main__": launcher = RayTransformLauncher(EdedupRayTransformRuntimeConfiguration()) launcher.launch() diff --git a/transforms/universal/ededup/python/src/ededup_transform_base.py b/transforms/universal/ededup/dpk_ededup/transform_base.py similarity index 97% rename from transforms/universal/ededup/python/src/ededup_transform_base.py rename to transforms/universal/ededup/dpk_ededup/transform_base.py index 4437148ac..6243adeef 100644 --- a/transforms/universal/ededup/python/src/ededup_transform_base.py +++ b/transforms/universal/ededup/dpk_ededup/transform_base.py @@ -16,10 +16,7 @@ import pyarrow as pa from data_processing.data_access import SnapshotUtils -from data_processing.transform import ( - AbstractTableTransform, - TransformConfiguration, -) +from data_processing.transform import AbstractTableTransform, TransformConfiguration from data_processing.utils import ( GB, CLIArgumentProvider, @@ -42,6 +39,7 @@ use_snapshot_cli_param = f"{cli_prefix}{use_snapshot_key}" snapshot_directory_cli_param = f"{cli_prefix}{snapshot_directory_key}" + class HashFilter: """ Implements hash @@ -218,12 +216,13 @@ def add_input_params(self, parser: ArgumentParser) -> None: f"--{doc_column_name_cli_param}", type=str, default="contents", - help="name of the column containing document") + help="name of the column containing document", + ) parser.add_argument( f"--{int_column_name_cli_param}", type=str, default="document_id", - help="name of the column containing document id" + help="name of the column containing document id", ) parser.add_argument( 
f"--{use_snapshot_cli_param}", diff --git a/transforms/universal/ededup/python/src/ededup_transform_python.py b/transforms/universal/ededup/dpk_ededup/transform_python.py similarity index 84% rename from transforms/universal/ededup/python/src/ededup_transform_python.py rename to transforms/universal/ededup/dpk_ededup/transform_python.py index 0135d11bc..d412153ac 100644 --- a/transforms/universal/ededup/python/src/ededup_transform_python.py +++ b/transforms/universal/ededup/dpk_ededup/transform_python.py @@ -9,6 +9,7 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ +import sys from argparse import Namespace from typing import Any @@ -19,12 +20,14 @@ PythonTransformRuntimeConfiguration, ) from data_processing.transform import TransformStatistics -from ededup_transform_base import ( +from data_processing.utils import ParamsUtils +from dpk_ededup.transform_base import ( EdedupTransformBase, EdedupTransformConfigurationBase, HashFilter, + snapshot_directory_key, + use_snapshot_key, ) -from ededup_transform_base import use_snapshot_key, snapshot_directory_key class EdedupTransform(EdedupTransformBase): @@ -61,12 +64,11 @@ class EdedupRuntime(DefaultPythonTransformRuntime): def __init__(self, params: dict[str, Any]): from data_processing.utils import get_logger + super().__init__(params=params) self.filter = None self.logger = get_logger(__name__) - - def get_transform_config( self, data_access_factory: DataAccessFactoryBase, statistics: TransformStatistics, files: list[str] ) -> dict[str, Any]: @@ -140,6 +142,30 @@ def __init__(self): ) +# Class used by the notebooks to ingest binary files and create parquet files +class Ededup: + def __init__(self, **kwargs): + self.params = {} + for key in kwargs: + self.params[key] = kwargs[key] + # if input_folder and output_folder are specified, then assume it is represent data_local_config + 
try: + local_conf = {k: self.params[k] for k in ("input_folder", "output_folder")} + self.params["data_local_config"] = ParamsUtils.convert_to_ast(local_conf) + del self.params["input_folder"] + del self.params["output_folder"] + except: + pass + + def transform(self): + sys.argv = ParamsUtils.dict_to_req(d=(self.params)) + # create launcher + launcher = PythonTransformLauncher(EdedupPythonTransformRuntimeConfiguration()) + # launch + return_code = launcher.launch() + return return_code + + if __name__ == "__main__": launcher = PythonTransformLauncher(EdedupPythonTransformRuntimeConfiguration()) launcher.launch() diff --git a/transforms/universal/ededup/ededup.ipynb b/transforms/universal/ededup/ededup-python.ipynb similarity index 56% rename from transforms/universal/ededup/ededup.ipynb rename to transforms/universal/ededup/ededup-python.ipynb index 9a84d4c51..07fd64dec 100644 --- a/transforms/universal/ededup/ededup.ipynb +++ b/transforms/universal/ededup/ededup-python.ipynb @@ -24,7 +24,7 @@ "## This is here as a reference only\n", "# Users and application developers must use the right tag for the latest from pypi\n", "%pip install data-prep-toolkit\n", - "%pip install data-prep-toolkit-transforms==0.2.2.dev3" + "%pip install data-prep-toolkit-transforms[ededup]" ] }, { @@ -58,13 +58,7 @@ "metadata": {}, "outputs": [], "source": [ - "import os\n", - "import sys\n", - "\n", - "from data_processing.runtime.pure_python import PythonTransformLauncher\n", - "from data_processing.utils import ParamsUtils\n", - "from ededup_transform_python import EdedupPythonTransformRuntimeConfiguration\n", - "from ededup_transform_base import doc_column_name_cli_param, int_column_name_cli_param" + "from dpk_ededup.transform_python import Ededup" ] }, { @@ -72,57 +66,50 @@ "id": "7234563c-2924-4150-8a31-4aec98c1bf33", "metadata": {}, "source": [ - "##### ***** Setup runtime parameters for this transform" + "##### ***** Setup runtime parameters for this transform and invoke the 
transform" ] }, { "cell_type": "code", - "execution_count": null, - "id": "e90a853e-412f-45d7-af3d-959e755aeebb", + "execution_count": 1, + "id": "1e4f64dc", "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "11:26:05 INFO - exact dedup params are {'doc_column': 'contents', 'doc_id_column': 'document_id', 'use_snapshot': False, 'snapshot_directory': None}\n", + "11:26:05 INFO - pipeline id pipeline_id\n", + "11:26:05 INFO - code location None\n", + "11:26:05 INFO - data factory data_ is using local data access: input_folder - test-data/input output_folder - output\n", + "11:26:05 INFO - data factory data_ max_files -1, n_sample -1\n", + "11:26:05 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']\n", + "11:26:05 INFO - orchestrator ededup started at 2024-12-12 11:26:05\n", + "11:26:05 INFO - Number of files is 1, source profile {'max_file_size': 0.034458160400390625, 'min_file_size': 0.034458160400390625, 'total_file_size': 0.034458160400390625}\n", + "11:26:05 INFO - Starting from the beginning\n", + "11:26:08 INFO - Completed 1 files (100.0%) in 0.058 min\n", + "11:26:08 INFO - Done processing 1 files, waiting for flush() completion.\n", + "11:26:08 INFO - done flushing in 0.0 sec\n", + "11:26:08 INFO - Completed execution in 0.058 min, execution result 0\n" + ] + }, + { + "data": { + "text/plain": [ + "0" + ] + }, + "execution_count": 1, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ - "\n", - "# create parameters\n", - "input_folder = os.path.join(\"python\", \"test-data\", \"input\")\n", - "output_folder = os.path.join( \"python\", \"output\")\n", - "local_conf = {\n", - " \"input_folder\": input_folder,\n", - " \"output_folder\": output_folder,\n", - "}\n", - "code_location = {\"github\": \"github\", \"commit_hash\": \"12345\", \"path\": \"path\"}\n", - "params = 
{\n", - " # Data access. Only required parameters are specified\n", - " \"data_local_config\": ParamsUtils.convert_to_ast(local_conf),\n", - " # orchestrator\n", - " \"runtime_pipeline_id\": \"pipeline_id\",\n", - " \"runtime_job_id\": \"job_id\",\n", - " \"runtime_code_location\": ParamsUtils.convert_to_ast(code_location),\n", - " # ededup parameters\n", - " doc_column_name_cli_param: \"contents\",\n", - " int_column_name_cli_param: \"document_id\",\n", - "}" - ] - }, - { - "cell_type": "markdown", - "id": "7949f66a-d207-45ef-9ad7-ad9406f8d42a", - "metadata": {}, - "source": [ - "##### ***** Use python runtime to invoke the transform" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "0775e400-7469-49a6-8998-bd4772931459", - "metadata": {}, - "outputs": [], - "source": [ - "%%capture\n", - "sys.argv = ParamsUtils.dict_to_req(d=params)\n", - "launcher = PythonTransformLauncher(EdedupPythonTransformRuntimeConfiguration())\n", - "launcher.launch()" + "Ededup(input_folder=\"test-data/input\",\n", + " output_folder=\"output\",\n", + " ededup_doc_column=\"contents\",\n", + " ededup_doc_id_column=\"document_id\").transform()" ] }, { @@ -135,13 +122,24 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "id": "7276fe84-6512-4605-ab65-747351e13a7c", "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/plain": [ + "['output/snapshot', 'output/sample1.parquet', 'output/metadata.json']" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "import glob\n", - "glob.glob(\"python/output/*\")" + "glob.glob(\"output/*\")" ] }, { @@ -155,9 +153,9 @@ ], "metadata": { "kernelspec": { - "display_name": "ededup_ray", + "display_name": "Python 3 (ipykernel)", "language": "python", - "name": "ededup_ray" + "name": "python3" }, "language_info": { "codemirror_mode": { @@ -169,7 +167,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - 
"version": "3.11.9" + "version": "3.11.10" } }, "nbformat": 4, diff --git a/transforms/universal/ededup/ededup-ray.ipynb b/transforms/universal/ededup/ededup-ray.ipynb new file mode 100644 index 000000000..b083c8bbb --- /dev/null +++ b/transforms/universal/ededup/ededup-ray.ipynb @@ -0,0 +1,189 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "afd55886-5f5b-4794-838e-ef8179fb0394", + "metadata": {}, + "source": [ + "##### **** These pip installs need to be adapted to use the appropriate release level. Alternatively, The venv running the jupyter lab could be pre-configured with a requirement file that includes the right release. Example for transform developers working from git clone:\n", + "```\n", + "make venv \n", + "source venv/bin/activate \n", + "pip install jupyterlab\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4c45c3c6-e4d7-4e61-8de6-32d61f2ce695", + "metadata": {}, + "outputs": [], + "source": [ + "%%capture\n", + "## This is here as a reference only\n", + "# Users and application developers must use the right tag for the latest from pypi\n", + "%pip install data-prep-toolkit[ray]\n", + "%pip install data-prep-toolkit-transforms[ededup]" + ] + }, + { + "cell_type": "markdown", + "id": "407fd4e4-265d-4ec7-bbc9-b43158f5f1f3", + "metadata": { + "jp-MarkdownHeadingCollapsed": true + }, + "source": [ + "##### **** Configure the transform parameters. The set of dictionary keys holding EdedupTransform configuration for values are as follows: \n", + "* doc_column - specifies name of the column containing documents\n", + "* doc_id_column - specifies the name of the column containing a document id\n", + "* use_snapshot - specifies that ededup execution starts with a set of pre-existing hashes, enabling incremental\n", + "execution\n", + "* snapshot_directory - specifies the directory for reading snapshots. 
If not provided, the default is\n", + "`output_folder/snapshot`" + ] + }, + { + "cell_type": "markdown", + "id": "ebf1f782-0e61-485c-8670-81066beb734c", + "metadata": {}, + "source": [ + "##### ***** Import required classes and modules" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "c2a12abc-9460-4e45-8961-873b48a9ab19", + "metadata": {}, + "outputs": [], + "source": [ + "from dpk_ededup.ray.transform import Ededup" + ] + }, + { + "cell_type": "markdown", + "id": "7234563c-2924-4150-8a31-4aec98c1bf33", + "metadata": {}, + "source": [ + "##### ***** Setup runtime parameters for this transform and invoke the transform" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "1e4f64dc", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "11:35:26 INFO - exact dedup params are {'doc_column': 'contents', 'doc_id_column': 'document_id', 'use_snapshot': False, 'snapshot_directory': None, 'hash_cpu': 0.5, 'num_hashes': 2}\n", + "11:35:26 INFO - pipeline id pipeline_id\n", + "11:35:26 INFO - code location None\n", + "11:35:26 INFO - number of workers 1 worker options {'num_cpus': 0.8, 'max_restarts': -1}\n", + "11:35:26 INFO - actor creation delay 0\n", + "11:35:26 INFO - job details {'job category': 'preprocessing', 'job name': 'ededup', 'job type': 'ray', 'job id': 'job_id'}\n", + "11:35:26 INFO - data factory data_ is using local data access: input_folder - test-data/input output_folder - output\n", + "11:35:26 INFO - data factory data_ max_files -1, n_sample -1\n", + "11:35:26 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']\n", + "11:35:26 INFO - Running locally\n", + "WARNING: All log messages before absl::InitializeLog() is called are written to STDERR\n", + "I0000 00:00:1733999728.143368 3403365 fork_posix.cc:75] Other threads are currently calling into gRPC, skipping fork() 
handlers\n", + "I0000 00:00:1733999732.775454 3403365 fork_posix.cc:75] Other threads are currently calling into gRPC, skipping fork() handlers\n", + "I0000 00:00:1733999733.942773 3403365 fork_posix.cc:75] Other threads are currently calling into gRPC, skipping fork() handlers\n", + "I0000 00:00:1733999733.978314 3403365 fork_posix.cc:75] Other threads are currently calling into gRPC, skipping fork() handlers\n", + "2024-12-12 11:35:36,072\tINFO worker.py:1777 -- Started a local Ray instance. View the dashboard at \u001b[1m\u001b[32mhttp://127.0.0.1:8265 \u001b[39m\u001b[22m\n", + "\u001b[36m(orchestrate pid=38973)\u001b[0m 11:35:38 INFO - orchestrator started at 2024-12-12 11:35:38\n", + "\u001b[36m(orchestrate pid=38973)\u001b[0m 11:35:38 INFO - Number of files is 1, source profile {'max_file_size': 0.034458160400390625, 'min_file_size': 0.034458160400390625, 'total_file_size': 0.034458160400390625}\n", + "\u001b[36m(orchestrate pid=38973)\u001b[0m 11:35:38 INFO - Cluster resources: {'cpus': 12, 'gpus': 0, 'memory': 11.283737564459443, 'object_store': 2.0}\n", + "\u001b[36m(orchestrate pid=38973)\u001b[0m 11:35:38 INFO - Number of workers - 1 with {'num_cpus': 0.8, 'max_restarts': -1} each\n", + "\u001b[36m(orchestrate pid=38973)\u001b[0m 11:35:40 INFO - Completed 0 files (0.0%) in 0.0 min. 
Waiting for completion\n", + "\u001b[36m(orchestrate pid=38973)\u001b[0m 11:35:40 INFO - Completed processing 1 files in 0.002 min\n", + "\u001b[36m(orchestrate pid=38973)\u001b[0m 11:35:40 INFO - done flushing in 0.002 sec\n", + "11:35:50 INFO - Completed execution in 0.396 min, execution result 0\n" + ] + }, + { + "data": { + "text/plain": [ + "0" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "Ededup(input_folder=\"test-data/input\",\n", + " output_folder=\"output\",\n", + " run_locally= True,\n", + " ededup_hash_cpu= 0.5,\n", + " ededup_num_hashes= 2,\n", + " ededup_doc_column=\"contents\",\n", + " ededup_doc_id_column=\"document_id\").transform()" + ] + }, + { + "cell_type": "markdown", + "id": "c3df5adf-4717-4a03-864d-9151cd3f134b", + "metadata": {}, + "source": [ + "##### **** The specified folder will include the transformed parquet files." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "7276fe84-6512-4605-ab65-747351e13a7c", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "['output/snapshot', 'output/sample1.parquet', 'output/metadata.json']" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import glob\n", + "glob.glob(\"output/*\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "845a75cf-f4a9-467d-87fa-ccbac1c9beb8", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.10" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/transforms/universal/ededup/kfp_ray/Makefile 
b/transforms/universal/ededup/kfp_ray/Makefile index f0c5cc217..858db1b0a 100644 --- a/transforms/universal/ededup/kfp_ray/Makefile +++ b/transforms/universal/ededup/kfp_ray/Makefile @@ -2,10 +2,15 @@ REPOROOT=${CURDIR}/../../../../ WORKFLOW_VENV_ACTIVATE=${REPOROOT}/transforms/venv/bin/activate include $(REPOROOT)/transforms/.make.workflows -# Include the common configuration for this transform -include ../transform.config +SRC_DIR=${CURDIR}/../ +# Use the docker image that is built for ray runtime +TRANSFORM_RUNTIME=ray +## override settings in .make.default as they assume old structure with ray being the current folder +DOCKER_IMAGE_NAME=$(TRANSFORM_NAME)-$(TRANSFORM_RUNTIME) +DOCKER_LOCAL_IMAGE=$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_VERSION) -SRC_DIR=${CURDIR}/../ray/ +# Only build the image with -f Dockerfile.ray +BUILD_SPECIFIC_RUNTIME=ray PYTHON_WF := $(shell find ./ -name '*_wf.py') YAML_WF := $(patsubst %.py, %.yaml, ${PYTHON_WF}) @@ -15,28 +20,8 @@ workflow-venv: .check_python_version ${WORKFLOW_VENV_ACTIVATE} .PHONY: clean clean: @# Help: Clean up the virtual environment. 
- rm -rf ${REPOROOT}/transforms/venv -venv:: + rm -rf ${REPOROOT}/transforms/venv -build:: - -setup:: - -test:: - -test-src:: - -test-image:: - -publish:: - -image:: - -kind-load-image:: - -docker-load-image:: - -docker-save-image:: .PHONY: workflow-build workflow-build: workflow-venv @@ -44,10 +29,19 @@ workflow-build: workflow-venv .PHONY: workflow-test workflow-test: workflow-build - $(MAKE) .workflows.test-pipeline TRANSFORM_SRC=${SRC_DIR} PIPELINE_FILE=ededup_wf.yaml + $(MAKE) TRANSFORM_SRC=${SRC_DIR} \ + TRANSFORM_RUNTIME=$(TRANSFORM_RUNTIME) \ + TRANSFORM_NAME=$(TRANSFORM_NAME) \ + BUILD_SPECIFIC_RUNTIME=$(BUILD_SPECIFIC_RUNTIME) \ + DOCKER_REMOTE_IMAGE=$(DOCKER_REGISTRY_ENDPOINT)/$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_VERSION) \ + PIPELINE_FILE=$(TRANSFORM_NAME)_wf.yaml .workflows.test-pipeline .PHONY: workflow-upload workflow-upload: workflow-build @for file in $(YAML_WF); do \ $(MAKE) .workflows.upload-pipeline PIPELINE_FILE=$$file; \ done + + + + diff --git a/transforms/universal/ededup/kfp_ray/ededup_wf.py b/transforms/universal/ededup/kfp_ray/ededup_wf.py index ab46daadb..29adf5c18 100644 --- a/transforms/universal/ededup/kfp_ray/ededup_wf.py +++ b/transforms/universal/ededup/kfp_ray/ededup_wf.py @@ -21,7 +21,7 @@ task_image = "quay.io/dataprep1/data-prep-kit/ededup-ray:latest" # the name of the job script -EXEC_SCRIPT_NAME: str = "ededup_transform_ray.py" +EXEC_SCRIPT_NAME: str = "-m dpk_ededup.ray.transform" # components base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest" @@ -74,7 +74,14 @@ def ededup( ray_name: str = "ededup-kfp-ray", # name of Ray cluster # Add image_pull_secret and image_pull_policy to ray workers if needed ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image}, - ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image}, + ray_worker_options: dict = { + "replicas": 2, + "max_replicas": 2, + "min_replicas": 2, + "cpu": 2, + 
"memory": 4, + "image": task_image, + }, server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", # data access. checkpointing is not supported by dedup data_s3_config: str = "{'input_folder': 'test/ededup/input/', 'output_folder': 'test/ededup/output'}", @@ -84,7 +91,7 @@ def ededup( # orchestrator runtime_actor_options: dict = {"num_cpus": 0.8}, runtime_pipeline_id: str = "pipeline_id", - runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'}, + runtime_code_location: dict = {"github": "github", "commit_hash": "12345", "path": "path"}, # ededup ededup_hash_cpu: float = 0.5, ededup_doc_column: str = "contents", @@ -136,7 +143,9 @@ def ededup( :return: None """ # create clean_up task - clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params) + clean_up_task = cleanup_ray_op( + ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params + ) ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2) # pipeline definition with dsl.ExitHandler(clean_up_task): diff --git a/transforms/universal/ededup/python/.dockerignore b/transforms/universal/ededup/python/.dockerignore deleted file mode 100644 index f7275bbbd..000000000 --- a/transforms/universal/ededup/python/.dockerignore +++ /dev/null @@ -1 +0,0 @@ -venv/ diff --git a/transforms/universal/ededup/python/Makefile b/transforms/universal/ededup/python/Makefile deleted file mode 100644 index 348edc74d..000000000 --- a/transforms/universal/ededup/python/Makefile +++ /dev/null @@ -1,64 +0,0 @@ -# Define the root of the local git clone for the common rules to be able -# know where they are running from. -REPOROOT=../../../.. - -# Set this, before including .make.defaults, to -# 1 if requirements reference the latest code in the data processing library -# in this repo (that is not yet published to pypi). This is the default setting. 
-# 0 if the transforms DPK dependencies are on wheels published to -# pypi (e.g. data-prep-toolkit=0.2.1) -#USE_REPO_LIB_SRC=1 - -# Include a library of common .transform.* targets which most -# transforms should be able to reuse. However, feel free -# to override/redefine the rules below. -include $(REPOROOT)/transforms/.make.transforms - -# Include the common configuration for this transform -include ../transform.config - -venv:: .transforms.python-venv - -test:: .transforms.python-test - -clean:: .transforms.clean - -image:: .transforms.python-image - -test-src:: .transforms.test-src - -setup:: .transforms.setup - -build:: build-dist image - -publish: publish-image - -publish-image:: .transforms.publish-image-python - -setup:: .transforms.setup - -# distribution versions is the same as image version. -set-versions: - $(MAKE) TRANSFORM_PYTHON_VERSION=$(EDEDUP_PYTHON_VERSION) TOML_VERSION=$(EDEDUP_PYTHON_VERSION) .transforms.set-versions - -build-dist:: .defaults.build-dist - -publish-dist:: .defaults.publish-dist - -test-image:: .transforms.python-test-image - -run-cli-sample: .transforms.run-cli-python-sample - -run-local-sample: .transforms.run-local-sample - -run-local-python-sample: .transforms.run-local-python-sample - -#run-s3-ray-sample: .transforms.run-s3-ray-sample - -minio-start: .minio-start - -kind-load-image:: .transforms.kind-load-image - -docker-load-image: .defaults.docker-load-image - -docker-save-image: .defaults.docker-save-image diff --git a/transforms/universal/ededup/python/README.md b/transforms/universal/ededup/python/README.md deleted file mode 100644 index ac15e64d3..000000000 --- a/transforms/universal/ededup/python/README.md +++ /dev/null @@ -1,104 +0,0 @@ -# Ededup Python Transform - -Please see the set of [transform project conventions](../../../README.md#transform-project-conventions) for details on -general project conventions, transform configuration, testing and IDE set up. 
- -## Contributors -- Boris Lublinsky (blublinsk@ibm.com) - -## Description -This Python implementation of the exact deduplication transform uses "streaming" deduplication based on a central hash. -As shown below, it relies on a distributed hash cache and data processors that read documents, generate hashes, -coordinate with the cache to remove duplicates, and store unique documents in the data plane. - -![](../images/exactdedup.png) - -Mapping this model to the transform model is complicated by the need for a hash cache, which the transform model does -not recognize. The solution is to have the transform runtime create the hash cache and pass it as a parameter to the -transforms. The transform runtime handles hash cache creation and enhances statistics with details about cache size and -utilization. - -### Incremental Execution and Snapshotting - -The current implementation includes snapshotting, where the hash cache is saved to storage (local disk or S3) at the -end of execution. This enables incremental deduplication: you can run deduplication on existing files, save the hash -cache, and later load the snapshot to deduplicate only new files, avoiding reprocessing the entire dataset. - -## Input Columns Used by This Transform - -| Input Column Name | Data Type | Description | -|---------------------------------------------------------------------|-----------|----------------------------------| -| Column specified by the _contents_column_ configuration argument | str | Column that stores document text | -| Column specified by the _document_id_column_ configuration argument | int64 | Column that stores document ID | - -## Output Columns Annotated by This Transform -This transform does not perform any annotations; it only filters out the documents that are marked as duplicates. 
- -## Configuration - -The set of dictionary keys holding [EdedupTransform](src/ededup_transform_python.py) -configuration for values (common for Python and Ray) are as follows: - -* _doc_column_ - specifies name of the column containing documents -* _doc_id_column_ - specifies the name of the column containing a document id -* _use_snapshot_ - specifies that ededup execution starts with a set of pre-existing hashes, enabling incremental -execution -* _snapshot_directory_ - specifies the directory for reading snapshots. If not provided, the default is -`output_folder/snapshot` - -## Usage - -The following command line arguments (corresponding to the configuration keys described above) are available in addition -to the options provided by the [python launcher](../../../../data-processing-lib/doc/python-launcher-options.md). -```text - --ededup_doc_column EDEDUP_DOC_COLUMN - name of the column containing document - --ededup_doc_id_column EDEDUP_DOC_ID_COLUMN - name of the column containing document id - --ededup_use_snapshot EDEDUP_USE_SNAPSHOT - flag to continue from snapshot - --ededup_snapshot_directory EDEDUP_SNAPSHOT_DIRECTORY - location of snapshot files -``` - -### Running the samples -To run the samples, use the following `make` targets - -* `run-cli-sample` - runs src/ededup_transform_python.py using command line args -* `run-local-sample` - runs src/ededup_local.py - -These targets will activate the virtual environment and set up any configuration needed. -Use the `-n` option of `make` to see the detail of what is done to run the sample. - -For example, -```shell -make run-cli-sample -... -``` -Then -```shell -ls output -``` -To see results of the transform. 
- -### Code example - -[notebook](../ededup.ipynb) - -### Transforming data using the transform image - -To use the transform image to transform your data, please refer to the -[running images quickstart](../../../../doc/quick-start/run-transform-image.md), -substituting the name of this transform image and runtime as appropriate. - -## Testing - -Following [the testing strategy of data-processing-lib](../../../../data-processing-lib/doc/transform-testing.md) - -Currently we have: -- [Unit test](test/test_ededup_python.py) -- [Integration test](test/test_ededup.py) - -To use the transform image to transform your data, please refer to the -[running images quickstart](../../../../doc/quick-start/run-transform-image.md), -substituting the name of this transform image and runtime as appropriate. diff --git a/transforms/universal/ededup/python/pyproject.toml b/transforms/universal/ededup/python/pyproject.toml deleted file mode 100644 index cb3784b49..000000000 --- a/transforms/universal/ededup/python/pyproject.toml +++ /dev/null @@ -1,47 +0,0 @@ -[project] -name = "dpk_ededup_transform_python" -version = "0.2.4.dev0" -requires-python = ">=3.10,<3.13" -description = "ededup Python Transform" -license = {text = "Apache-2.0"} -readme = {file = "README.md", content-type = "text/markdown"} -authors = [ - { name = "David Wood", email = "dawood@us.ibm.com" }, - { name = "Boris Lublinsky", email = "blublinsky@ibm.com" }, -] - -dynamic = ["dependencies"] - -[build-system] -requires = ["setuptools>=68.0.0", "wheel", "setuptools_scm[toml]>=7.1.0"] -build-backend = "setuptools.build_meta" - -[tool.setuptools.dynamic] -dependencies = {file = ["requirements.txt"]} - -[project.optional-dependencies] -dev = [ - "twine", - "pytest>=7.3.2", - "pytest-dotenv>=0.5.2", - "pytest-env>=1.0.0", - "pre-commit>=3.3.2", - "pytest-cov>=4.1.0", - "pytest-mock>=3.10.0", - "moto==5.0.5", - "markupsafe==2.0.1", -] - -[options] -package_dir = ["src","test"] - -[options.packages.find] -where = ["src/"] 
- -[tool.pytest.ini_options] -# Currently we use low coverage since we have to run tests separately (see makefile) -#addopts = "--cov --cov-report term-missing --cov-fail-under 25" -markers = ["unit: unit tests", "integration: integration tests"] - -[tool.coverage.run] -include = ["src/*"] diff --git a/transforms/universal/ededup/ray/.dockerignore b/transforms/universal/ededup/ray/.dockerignore deleted file mode 100644 index f7275bbbd..000000000 --- a/transforms/universal/ededup/ray/.dockerignore +++ /dev/null @@ -1 +0,0 @@ -venv/ diff --git a/transforms/universal/ededup/ray/.gitignore b/transforms/universal/ededup/ray/.gitignore deleted file mode 100644 index 3ea7fd4ab..000000000 --- a/transforms/universal/ededup/ray/.gitignore +++ /dev/null @@ -1,38 +0,0 @@ -test-data/output -output/* -/output/ -data-processing-lib/ - - -# Byte-compiled / optimized / DLL files -__pycache__/ -*.py[cod] -*$py.class - - -# Distribution / packaging -bin/ -build/ -develop-eggs/ -dist/ -eggs/ -lib/ -lib64/ -parts/ -sdist/ -var/ -*.egg-info/ -.installed.cfg -*.egg - -# Installer logs -pip-log.txt -pip-delete-this-directory.txt - -# Unit test / coverage reports -.tox/ -htmlcov -.coverage -.cache -nosetests.xml -coverage.xml \ No newline at end of file diff --git a/transforms/universal/ededup/ray/Makefile b/transforms/universal/ededup/ray/Makefile deleted file mode 100644 index 1ff055e29..000000000 --- a/transforms/universal/ededup/ray/Makefile +++ /dev/null @@ -1,67 +0,0 @@ -# Define the root of the local git clone for the common rules to be able -# know where they are running from. -REPOROOT=../../../.. - -# Set this, before including .make.defaults, to -# 1 if requirements reference the latest code in the data processing library -# in this repo (that is not yet published to pypi). This is the default setting. -# 0 if the transforms DPK dependencies are on wheels published to -# pypi (e.g. 
data-prep-toolkit=0.2.1) -#USE_REPO_LIB_SRC=1 - -# Include a library of common .transform.* targets which most -# transforms should be able to reuse. However, feel free -# to override/redefine the rules below. -include $(REPOROOT)/transforms/.make.transforms - -# Include the common configuration for this transform -include ../transform.config - -BASE_IMAGE=${RAY_BASE_IMAGE} -venv:: .transforms.ray-venv - -test:: .transforms.ray-test - -clean:: .transforms.clean - -image:: .transforms.ray-image - -test-src:: .transforms.test-src - -setup:: .transforms.setup - -test-image:: .transforms.ray-test-image - -build:: build-dist image - -publish: publish-image - -publish-image:: .transforms.publish-image-ray - -setup:: .transforms.setup - -# TRANSFORM_PYTHON_VERSION has no effect since requirements do not specify a python transform implementation -set-versions: - $(MAKE) TRANSFORM_PYTHON_VERSION=$(EDEDUP_PYTHON_VERSION) TOML_VERSION=$(EDEDUP_RAY_VERSION) .transforms.set-versions - -build-dist:: .defaults.build-dist - -publish-dist:: .defaults.publish-dist - -run-cli-sample: - $(MAKE) RUN_FILE=$(TRANSFORM_NAME)_transform_ray.py \ - RUN_ARGS="--run_locally True --data_local_config \"{ 'input_folder' : '../test-data/input', 'output_folder' : '../output'}\" \ - --ededup_num_hashes 2" \ - .transforms.run-src-file - -run-local-sample: .transforms.run-local-ray-sample - -run-s3-sample: .transforms.run-s3-ray-sample - -minio-start: .minio-start - -kind-load-image:: .transforms.kind-load-image - -docker-load-image: .defaults.docker-load-image - -docker-save-image: .defaults.docker-save-image diff --git a/transforms/universal/ededup/ray/README.md b/transforms/universal/ededup/ray/README.md deleted file mode 100644 index 88fe034d5..000000000 --- a/transforms/universal/ededup/ray/README.md +++ /dev/null @@ -1,39 +0,0 @@ -# Exact Dedup - -Please see the set of [transform project conventions](../../../README.md) for details on general project conventions, -transform configuration, 
testing and IDE set up. - -## Additional parameters - -In addition to [common](../python/README.md) ededup parameters Ray implementation provides two additional ones - -* _hash_cpu_ - specifies amount of CPU per hash actor -* _num_hashes_ - specifies number of hash actors - -## ådditional support - -We also provide an [estimate](src/cluster_estimator.py) to roughly determine cluster size for running transformer. - -## Running - -### Launched Command Line Options -When running the transform with the Ray launcher (i.e. TransformLauncher), the following command line arguments are -available in addition to -[the options provided by the launcher](../../../../data-processing-lib/doc/ray-launcher-options.md). - -``` - --ededup_hash_cpu EDEDUP_HASH_CPU - number of CPUs per hash - --ededup_num_hashes EDEDUP_NUM_HASHES - number of hash actors to use - --ededup_doc_column EDEDUP_DOC_COLUMN - name of the column containing document - --ededup_doc_id_column EDEDUP_DOC_ID_COLUMN - name of the column containing document id - --ededup_use_snapshot EDEDUP_USE_SNAPSHOT - flag to continue from snapshot - --ededup_snapshot_directory EDEDUP_SNAPSHOT_DIRECTORY - location of snapshot files - ``` - -These correspond to the configuration keys described above. 
diff --git a/transforms/universal/ededup/ray/pyproject.toml b/transforms/universal/ededup/ray/pyproject.toml deleted file mode 100644 index e448b6e2e..000000000 --- a/transforms/universal/ededup/ray/pyproject.toml +++ /dev/null @@ -1,47 +0,0 @@ -[project] -name = "dpk_ededup_transform_ray" -version = "0.2.4.dev0" -requires-python = ">=3.10,<3.13" -description = "ededup Ray Transform" -license = {text = "Apache-2.0"} -readme = {file = "README.md", content-type = "text/markdown"} -authors = [ - { name = "David Wood", email = "dawood@us.ibm.com" }, - { name = "Boris Lublinsky", email = "blublinsky@ibm.com" }, -] -dependencies = [ - "data-prep-toolkit[ray]>=0.2.4.dev0", - "dpk_ededup_transform_python==0.2.4.dev0", - "tqdm==4.66.3", -] - -[build-system] -requires = ["setuptools>=68.0.0", "wheel", "setuptools_scm[toml]>=7.1.0"] -build-backend = "setuptools.build_meta" - -[project.optional-dependencies] -dev = [ - "twine", - "pytest>=7.3.2", - "pytest-dotenv>=0.5.2", - "pytest-env>=1.0.0", - "pre-commit>=3.3.2", - "pytest-cov>=4.1.0", - "pytest-mock>=3.10.0", - "moto==5.0.5", - "markupsafe==2.0.1", -] - -[options] -package_dir = ["src","test"] - -[options.packages.find] -where = ["src/"] - -[tool.pytest.ini_options] -# Currently we use low coverage since we have to run tests separately (see makefile) -#addopts = "--cov --cov-report term-missing --cov-fail-under 25" -markers = ["unit: unit tests", "integration: integration tests"] - -[tool.coverage.run] -include = ["src/*"] diff --git a/transforms/universal/ededup/ray/test-data/incremental/metadata.json b/transforms/universal/ededup/ray/test-data/incremental/metadata.json deleted file mode 100644 index d268aa359..000000000 --- a/transforms/universal/ededup/ray/test-data/incremental/metadata.json +++ /dev/null @@ -1,66 +0,0 @@ -{ - "pipeline": "pipeline_id", - "job details": { - "job category": "preprocessing", - "job name": "ededup", - "job type": "ray", - "job id": "job_id", - "start_time": "2024-08-20 21:54:19", - 
"end_time": "2024-08-20 21:54:20", - "status": "success" - }, - "code": { - "github": "github", - "commit_hash": "12345", - "path": "path" - }, - "job_input_params": { - "doc_column": "contents", - "doc_id_column": "document_id", - "use_snapshot": true, - "snapshot_directory": "/Users/borisl/Projects/data-prep-kit/transforms/universal/ededup/ray/test-data/input/snapshot", - "hash_cpu": 0.5, - "num_hashes": 2, - "checkpointing": false, - "max_files": -1, - "random_samples": -1, - "files_to_use": [ - ".parquet" - ], - "number of workers": 2, - "worker options": { - "num_cpus": 0.5, - "max_restarts": -1 - }, - "actor creation delay": 0 - }, - "execution_stats": { - "cpus": 12, - "gpus": 0, - "memory": 14.090177917852998, - "object_store": 2.0, - "execution time, min": 0.013 - }, - "job_output_stats": { - "number of hashes": 3, - "hash memory, GB": 7.180497050285339e-07, - "de duplication %": 100.0, - "source_files": 1, - "source_size": 36132, - "result_files": 1, - "result_size": 19280, - "processing_time": 0.026, - "source_documents": 5, - "result_documents": 0, - "source_doc_count": 5, - "result_doc_count": 0 - }, - "source": { - "name": "/Users/borisl/Projects/data-prep-kit/transforms/universal/ededup/ray/test-data/input", - "type": "path" - }, - "target": { - "name": "/Users/borisl/Projects/data-prep-kit/transforms/universal/ededup/ray/output", - "type": "path" - } -} \ No newline at end of file diff --git a/transforms/universal/ededup/python/requirements.txt b/transforms/universal/ededup/requirements.txt similarity index 50% rename from transforms/universal/ededup/python/requirements.txt rename to transforms/universal/ededup/requirements.txt index 99fe74aad..a8d02bd7e 100644 --- a/transforms/universal/ededup/python/requirements.txt +++ b/transforms/universal/ededup/requirements.txt @@ -1,3 +1,2 @@ -data-prep-toolkit>=0.2.3 mmh3>=4.1.0 xxhash==3.4.1 diff --git a/transforms/universal/ededup/ray/test-data/expected/metadata.json 
b/transforms/universal/ededup/test-data-ray/expected/metadata.json similarity index 100% rename from transforms/universal/ededup/ray/test-data/expected/metadata.json rename to transforms/universal/ededup/test-data-ray/expected/metadata.json diff --git a/transforms/universal/ededup/python/test-data/expected/sample1.parquet b/transforms/universal/ededup/test-data-ray/expected/sample1.parquet similarity index 100% rename from transforms/universal/ededup/python/test-data/expected/sample1.parquet rename to transforms/universal/ededup/test-data-ray/expected/sample1.parquet diff --git a/transforms/universal/ededup/ray/test-data/expected/snapshot/hash_collector_0 b/transforms/universal/ededup/test-data-ray/expected/snapshot/hash_collector_0 similarity index 100% rename from transforms/universal/ededup/ray/test-data/expected/snapshot/hash_collector_0 rename to transforms/universal/ededup/test-data-ray/expected/snapshot/hash_collector_0 diff --git a/transforms/universal/ededup/ray/test-data/expected/snapshot/hash_collector_1 b/transforms/universal/ededup/test-data-ray/expected/snapshot/hash_collector_1 similarity index 100% rename from transforms/universal/ededup/ray/test-data/expected/snapshot/hash_collector_1 rename to transforms/universal/ededup/test-data-ray/expected/snapshot/hash_collector_1 diff --git a/transforms/universal/ededup/test-data-ray/incremental/metadata.json b/transforms/universal/ededup/test-data-ray/incremental/metadata.json new file mode 100644 index 000000000..abe4dad6c --- /dev/null +++ b/transforms/universal/ededup/test-data-ray/incremental/metadata.json @@ -0,0 +1,64 @@ +{ + "pipeline": "pipeline_id", + "job details": { + "job category": "preprocessing", + "job name": "ededup", + "job type": "ray", + "job id": "job_id", + "start_time": "2024-08-20 21:54:19", + "end_time": "2024-08-20 21:54:20", + "status": "success" + }, + "code": { + "github": "github", + "commit_hash": "12345", + "path": "path" + }, + "job_input_params": { + "doc_column": 
"contents", + "doc_id_column": "document_id", + "use_snapshot": true, + "snapshot_directory": "/Users/borisl/Projects/data-prep-kit/transforms/universal/ededup/ray/test-data/input/snapshot", + "hash_cpu": 0.5, + "num_hashes": 2, + "checkpointing": false, + "max_files": -1, + "random_samples": -1, + "files_to_use": [".parquet"], + "number of workers": 2, + "worker options": { + "num_cpus": 0.5, + "max_restarts": -1 + }, + "actor creation delay": 0 + }, + "execution_stats": { + "cpus": 12, + "gpus": 0, + "memory": 14.090177917852998, + "object_store": 2.0, + "execution time, min": 0.013 + }, + "job_output_stats": { + "number of hashes": 3, + "hash memory, GB": 7.180497050285339e-7, + "de duplication %": 100.0, + "source_files": 1, + "source_size": 36132, + "result_files": 1, + "result_size": 19280, + "processing_time": 0.026, + "source_documents": 5, + "result_documents": 0, + "source_doc_count": 5, + "result_doc_count": 0 + }, + "source": { + "name": "/Users/borisl/Projects/data-prep-kit/transforms/universal/ededup/ray/test-data/input", + "type": "path" + }, + "target": { + "name": "/Users/borisl/Projects/data-prep-kit/transforms/universal/ededup/ray/output", + "type": "path" + } +} diff --git a/transforms/universal/ededup/python/test-data/incremental/sample1.parquet b/transforms/universal/ededup/test-data-ray/incremental/sample1.parquet similarity index 100% rename from transforms/universal/ededup/python/test-data/incremental/sample1.parquet rename to transforms/universal/ededup/test-data-ray/incremental/sample1.parquet diff --git a/transforms/universal/ededup/ray/test-data/incremental/snapshot/hash_collector_0 b/transforms/universal/ededup/test-data-ray/incremental/snapshot/hash_collector_0 similarity index 100% rename from transforms/universal/ededup/ray/test-data/incremental/snapshot/hash_collector_0 rename to transforms/universal/ededup/test-data-ray/incremental/snapshot/hash_collector_0 diff --git 
a/transforms/universal/ededup/ray/test-data/incremental/snapshot/hash_collector_1 b/transforms/universal/ededup/test-data-ray/incremental/snapshot/hash_collector_1 similarity index 100% rename from transforms/universal/ededup/ray/test-data/incremental/snapshot/hash_collector_1 rename to transforms/universal/ededup/test-data-ray/incremental/snapshot/hash_collector_1 diff --git a/transforms/universal/ededup/python/test-data/input/sample1.parquet b/transforms/universal/ededup/test-data-ray/input/sample1.parquet similarity index 100% rename from transforms/universal/ededup/python/test-data/input/sample1.parquet rename to transforms/universal/ededup/test-data-ray/input/sample1.parquet diff --git a/transforms/universal/ededup/ray/test-data/input/snapshot/hash_collector_0 b/transforms/universal/ededup/test-data-ray/input/snapshot/hash_collector_0 similarity index 100% rename from transforms/universal/ededup/ray/test-data/input/snapshot/hash_collector_0 rename to transforms/universal/ededup/test-data-ray/input/snapshot/hash_collector_0 diff --git a/transforms/universal/ededup/ray/test-data/input/snapshot/hash_collector_1 b/transforms/universal/ededup/test-data-ray/input/snapshot/hash_collector_1 similarity index 100% rename from transforms/universal/ededup/ray/test-data/input/snapshot/hash_collector_1 rename to transforms/universal/ededup/test-data-ray/input/snapshot/hash_collector_1 diff --git a/transforms/universal/ededup/python/test-data/expected/metadata.json b/transforms/universal/ededup/test-data/expected/metadata.json similarity index 100% rename from transforms/universal/ededup/python/test-data/expected/metadata.json rename to transforms/universal/ededup/test-data/expected/metadata.json diff --git a/transforms/universal/ededup/ray/test-data/expected/sample1.parquet b/transforms/universal/ededup/test-data/expected/sample1.parquet similarity index 100% rename from transforms/universal/ededup/ray/test-data/expected/sample1.parquet rename to 
transforms/universal/ededup/test-data/expected/sample1.parquet diff --git a/transforms/universal/ededup/python/test-data/expected/snapshot/hash_collector_1 b/transforms/universal/ededup/test-data/expected/snapshot/hash_collector_1 similarity index 100% rename from transforms/universal/ededup/python/test-data/expected/snapshot/hash_collector_1 rename to transforms/universal/ededup/test-data/expected/snapshot/hash_collector_1 diff --git a/transforms/universal/ededup/python/test-data/incremental/metadata.json b/transforms/universal/ededup/test-data/incremental/metadata.json similarity index 100% rename from transforms/universal/ededup/python/test-data/incremental/metadata.json rename to transforms/universal/ededup/test-data/incremental/metadata.json diff --git a/transforms/universal/ededup/ray/test-data/incremental/sample1.parquet b/transforms/universal/ededup/test-data/incremental/sample1.parquet similarity index 100% rename from transforms/universal/ededup/ray/test-data/incremental/sample1.parquet rename to transforms/universal/ededup/test-data/incremental/sample1.parquet diff --git a/transforms/universal/ededup/python/test-data/incremental/snapshot/hash_collector_1 b/transforms/universal/ededup/test-data/incremental/snapshot/hash_collector_1 similarity index 100% rename from transforms/universal/ededup/python/test-data/incremental/snapshot/hash_collector_1 rename to transforms/universal/ededup/test-data/incremental/snapshot/hash_collector_1 diff --git a/transforms/universal/ededup/ray/test-data/input/sample1.parquet b/transforms/universal/ededup/test-data/input/sample1.parquet similarity index 100% rename from transforms/universal/ededup/ray/test-data/input/sample1.parquet rename to transforms/universal/ededup/test-data/input/sample1.parquet diff --git a/transforms/universal/ededup/python/test-data/input/snapshot/hash_collector_1 b/transforms/universal/ededup/test-data/input/snapshot/hash_collector_1 similarity index 100% rename from 
transforms/universal/ededup/python/test-data/input/snapshot/hash_collector_1 rename to transforms/universal/ededup/test-data/input/snapshot/hash_collector_1 diff --git a/transforms/universal/ededup/python/test/test_ededup.py b/transforms/universal/ededup/test/test_ededup.py similarity index 91% rename from transforms/universal/ededup/python/test/test_ededup.py rename to transforms/universal/ededup/test/test_ededup.py index a8ab9f71e..6c9d85f23 100644 --- a/transforms/universal/ededup/python/test/test_ededup.py +++ b/transforms/universal/ededup/test/test_ededup.py @@ -15,9 +15,12 @@ from data_processing.test_support import get_tables_in_folder from data_processing.test_support.transform import AbstractTableTransformTest -from ededup_transform_base import HashFilter -from ededup_transform_python import EdedupTransform -from ededup_transform_base import doc_column_name_key, int_column_name_key +from dpk_ededup.transform_base import ( + HashFilter, + doc_column_name_key, + int_column_name_key, +) +from dpk_ededup.transform_python import EdedupTransform class TestEdedupTransform(AbstractTableTransformTest): diff --git a/transforms/universal/ededup/python/test/test_ededup_python.py b/transforms/universal/ededup/test/test_ededup_python.py similarity index 89% rename from transforms/universal/ededup/python/test/test_ededup_python.py rename to transforms/universal/ededup/test/test_ededup_python.py index d43a98507..d98a61ff3 100644 --- a/transforms/universal/ededup/python/test/test_ededup_python.py +++ b/transforms/universal/ededup/test/test_ededup_python.py @@ -16,8 +16,11 @@ from data_processing.test_support.launch.transform_test import ( AbstractTransformLauncherTest, ) -from ededup_transform_python import EdedupPythonTransformRuntimeConfiguration -from ededup_transform_base import doc_column_name_cli_param, int_column_name_cli_param +from dpk_ededup.transform_base import ( + doc_column_name_cli_param, + int_column_name_cli_param, +) +from dpk_ededup.transform_python 
import EdedupPythonTransformRuntimeConfiguration class TestPythonEdedupTransform(AbstractTransformLauncherTest): diff --git a/transforms/universal/ededup/python/test/test_ededup_python_incremental.py b/transforms/universal/ededup/test/test_ededup_python_incremental.py similarity index 92% rename from transforms/universal/ededup/python/test/test_ededup_python_incremental.py rename to transforms/universal/ededup/test/test_ededup_python_incremental.py index 7d70ad5b3..2b324ba7b 100644 --- a/transforms/universal/ededup/python/test/test_ededup_python_incremental.py +++ b/transforms/universal/ededup/test/test_ededup_python_incremental.py @@ -16,13 +16,13 @@ from data_processing.test_support.launch.transform_test import ( AbstractTransformLauncherTest, ) -from ededup_transform_python import EdedupPythonTransformRuntimeConfiguration -from ededup_transform_base import ( +from dpk_ededup.transform_base import ( doc_column_name_cli_param, int_column_name_cli_param, + snapshot_directory_cli_param, use_snapshot_cli_param, - snapshot_directory_cli_param ) +from dpk_ededup.transform_python import EdedupPythonTransformRuntimeConfiguration class TestPythonEdedupTransform(AbstractTransformLauncherTest): diff --git a/transforms/universal/ededup/ray/test/test_ededup_ray.py b/transforms/universal/ededup/test/test_ededup_ray.py similarity index 87% rename from transforms/universal/ededup/ray/test/test_ededup_ray.py rename to transforms/universal/ededup/test/test_ededup_ray.py index 5b8c05edc..729220c0a 100644 --- a/transforms/universal/ededup/ray/test/test_ededup_ray.py +++ b/transforms/universal/ededup/test/test_ededup_ray.py @@ -16,9 +16,15 @@ AbstractTransformLauncherTest, ) from data_processing_ray.runtime.ray import RayTransformLauncher -from ededup_transform_ray import EdedupRayTransformRuntimeConfiguration -from ededup_transform_base import doc_column_name_cli_param, int_column_name_cli_param -from ededup_transform_ray import hash_cpu_cli_params, num_hashes_cli_params +from 
dpk_ededup.ray.transform import ( + EdedupRayTransformRuntimeConfiguration, + hash_cpu_cli_params, + num_hashes_cli_params, +) +from dpk_ededup.transform_base import ( + doc_column_name_cli_param, + int_column_name_cli_param, +) class TestRayEdedupTransform(AbstractTransformLauncherTest): @@ -28,7 +34,7 @@ class TestRayEdedupTransform(AbstractTransformLauncherTest): """ def get_test_transform_fixtures(self) -> list[tuple]: - basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../test-data")) + basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../test-data-ray")) config = { "run_locally": True, # When running in ray, our Runtime's get_transform_config() method will load the domains using diff --git a/transforms/universal/ededup/ray/test/test_ededup_ray_incremental.py b/transforms/universal/ededup/test/test_ededup_ray_incremental.py similarity index 90% rename from transforms/universal/ededup/ray/test/test_ededup_ray_incremental.py rename to transforms/universal/ededup/test/test_ededup_ray_incremental.py index 94e3c4122..751de43da 100644 --- a/transforms/universal/ededup/ray/test/test_ededup_ray_incremental.py +++ b/transforms/universal/ededup/test/test_ededup_ray_incremental.py @@ -16,14 +16,17 @@ AbstractTransformLauncherTest, ) from data_processing_ray.runtime.ray import RayTransformLauncher -from ededup_transform_ray import EdedupRayTransformRuntimeConfiguration -from ededup_transform_base import ( +from dpk_ededup.ray.transform import ( + EdedupRayTransformRuntimeConfiguration, + hash_cpu_cli_params, + num_hashes_cli_params, +) +from dpk_ededup.transform_base import ( doc_column_name_cli_param, int_column_name_cli_param, - use_snapshot_cli_param, snapshot_directory_cli_param, + use_snapshot_cli_param, ) -from ededup_transform_ray import hash_cpu_cli_params, num_hashes_cli_params class TestRayEdedupTransform(AbstractTransformLauncherTest): @@ -33,7 +36,7 @@ class TestRayEdedupTransform(AbstractTransformLauncherTest): """ def 
get_test_transform_fixtures(self) -> list[tuple]: - basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../test-data")) + basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../test-data-ray")) config = { "run_locally": True, # When running in ray, our Runtime's get_transform_config() method will load the domains using diff --git a/transforms/universal/ededup/transform.config b/transforms/universal/ededup/transform.config deleted file mode 100644 index 12f5357f1..000000000 --- a/transforms/universal/ededup/transform.config +++ /dev/null @@ -1,20 +0,0 @@ -# -# This is intended to be included across the Makefiles provided within -# a given transform's directory tree, so must use compatible syntax. -# -################################################################################ -# This defines the name of the transform and is used to match against -# expected files and is used to define the transform's image name. -TRANSFORM_NAME=ededup - -################################################################################ -# This defines the transforms' version number as would be used -# when publishing the wheel. In general, only the micro version -# number should be advanced relative to the DPK_VERSION. -# -# If you change the versions numbers, be sure to run "make set-versions" to -# update version numbers across the transform (e.g., pyproject.toml). 
-EDEDUP_PYTHON_VERSION=$(DPK_VERSION) -EDEDUP_RAY_VERSION=$(EDEDUP_PYTHON_VERSION) -EDEDUP_SPARK_VERSION=$(EDEDUP_PYTHON_VERSION) - diff --git a/transforms/universal/fdedup/python/requirements.txt b/transforms/universal/fdedup/python/requirements.txt index 3d91e5ba4..2c6bb5f36 100644 --- a/transforms/universal/fdedup/python/requirements.txt +++ b/transforms/universal/fdedup/python/requirements.txt @@ -4,7 +4,7 @@ boto3>=1.34.69 kubernetes>=30.1.0 polars==1.9.0 disjoint-set>=0.8.0 -scipy>=1.14.1, <2.0.0 +scipy>=1.12.1, <2.0.0 numpy<1.29.0 sentencepiece>=0.2.0 mmh3>=4.1.0 diff --git a/transforms/universal/hap/requirements.txt b/transforms/universal/hap/requirements.txt index 079767b7a..105cc6ab1 100644 --- a/transforms/universal/hap/requirements.txt +++ b/transforms/universal/hap/requirements.txt @@ -1,4 +1,4 @@ nltk==3.9.1 -transformers==4.38.2 -torch>=2.2.2,<=2.4.1 +transformers>=4.38.2 +torch>=2.2.2,<=2.5.1 pandas==2.2.2 diff --git a/transforms/universal/tokenization/python/requirements.txt b/transforms/universal/tokenization/python/requirements.txt index 56e81f87c..2072e8844 100644 --- a/transforms/universal/tokenization/python/requirements.txt +++ b/transforms/universal/tokenization/python/requirements.txt @@ -1,2 +1,2 @@ data-prep-toolkit>=0.2.3 -transformers==4.38.2 +transformers>=4.38.2