Skip to content

Commit

Permalink
Updated notebooks for python and ray
Browse files Browse the repository at this point in the history
Signed-off-by: Maroun Touma <touma@us.ibm.com>
  • Loading branch information
touma-I committed Dec 19, 2024
1 parent ad548bf commit bc88085
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 102 deletions.
16 changes: 16 additions & 0 deletions transforms/universal/fdedup/dpk_fdedup/ray/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,22 @@ def execute_service(self, service_short_name: str, params: list) -> int:
return status


# Class used by the notebooks to ingest binary files and create parquet files
class Fdedup:
def __init__(self, **kwargs):
self.params = {}
for key in kwargs:
self.params[key] = kwargs[key]

def transform(self):
sys.argv = ParamsUtils.dict_to_req(d=(self.params))
args = parse_args()
# Initialize the orchestrator
orchestrator = RayServiceOrchestrator(global_params=args)
# Launch python fuzzy dedup execution
return orchestrator.orchestrate()


if __name__ == "__main__":
# Parse command line arguments
args = parse_args()
Expand Down
18 changes: 18 additions & 0 deletions transforms/universal/fdedup/dpk_fdedup/transform_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,24 @@ def parse_args() -> argparse.Namespace:
return parser.parse_args()




# Class used by the notebooks to ingest binary files and create parquet files
class Fdedup:
def __init__(self, **kwargs):
self.params = {}
for key in kwargs:
self.params[key] = kwargs[key]

def transform(self):
sys.argv = ParamsUtils.dict_to_req(d=(self.params))
args = parse_args()
# Initialize the orchestrator
orchestrator = ServiceOrchestrator(global_params=args)
# Launch python fuzzy dedup execution
return orchestrator.orchestrate()


if __name__ == "__main__":

# Parse command line arguments
Expand Down
61 changes: 13 additions & 48 deletions transforms/universal/fdedup/fdedup_python.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
"## This is here as a reference only\n",
"# Users and application developers must use the right tag for the latest from pypi\n",
"#!pip install data-prep-toolkit\n",
"#!pip install data-prep-toolkit-transforms\n",
"#!pip install data-prep-connector"
"#!pip install data-prep-toolkit-transforms"
]
},
{
Expand All @@ -38,16 +37,11 @@
{
"cell_type": "code",
"execution_count": null,
"id": "c2a12abc-9460-4e45-8961-873b48a9ab19",
"id": "bae63d15-4ce5-4f2a-a917-0f3161e9dd73",
"metadata": {},
"outputs": [],
"source": [
"import ast\n",
"import os\n",
"import sys\n",
"\n",
"from data_processing.utils import ParamsUtils\n",
"from dpk_fdedup.transform_python import parse_args, ServiceOrchestrator"
"from dpk_fdedup.transform_python import Fdedup"
]
},
{
Expand All @@ -72,48 +66,18 @@
{
"cell_type": "code",
"execution_count": null,
"id": "e90a853e-412f-45d7-af3d-959e755aeebb",
"metadata": {},
"outputs": [],
"source": [
"# create parameters\n",
"input_folder = os.path.join(os.path.abspath(\"\"), \"test-data\", \"input\")\n",
"output_folder = os.path.join(os.path.abspath(\"\"), \"output\")\n",
"params = {\n",
" # transform configuration parameters\n",
" \"input_folder\": input_folder,\n",
" \"output_folder\": output_folder,\n",
" \"contents_column\": \"contents\",\n",
" \"document_id_column\": \"int_id_column\",\n",
" \"num_permutations\": 112,\n",
" \"num_bands\": 14,\n",
" \"num_minhashes_per_band\": 8,\n",
" \"operation_mode\": \"filter_duplicates\",\n",
"}"
]
},
{
"cell_type": "markdown",
"id": "7949f66a-d207-45ef-9ad7-ad9406f8d42a",
"metadata": {},
"source": [
"##### ***** Use ray runtime to invoke each transform in the fuzzy dedup pipeline"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0775e400-7469-49a6-8998-bd4772931459",
"id": "a54a78e9-d78b-4aeb-ac2b-806070a2dec0",
"metadata": {},
"outputs": [],
"source": [
"\n",
"sys.argv = ParamsUtils.dict_to_req(d=params)\n",
"args = parse_args()\n",
"# Initialize the orchestrator\n",
"orchestrator = ServiceOrchestrator(global_params=args)\n",
"# Launch python fuzzy dedup execution\n",
"orchestrator.orchestrate()"
"Fdedup(input_folder='test-data/input',\n",
" output_folder='output',\n",
" contents_column= \"contents\",\n",
" document_id_column= \"int_id_column\",\n",
" num_permutations= 112,\n",
" num_bands= 14,\n",
" num_minhashes_per_band= 8,\n",
" operation_mode=\"filter_duplicates\").transform()\n"
]
},
{
Expand Down Expand Up @@ -151,6 +115,7 @@
"outputs": [],
"source": [
"import polars as pl\n",
"import os\n",
"input_df_1 = pl.read_parquet(os.path.join(os.path.abspath(\"\"), \"test-data\", \"input\", \"data_1\", \"df1.parquet\"))\n",
"input_df_2 = pl.read_parquet(os.path.join(os.path.abspath(\"\"), \"test-data\", \"input\", \"data_2\", \"df2.parquet\"))\n",
"input_df = input_df_1.vstack(input_df_2)\n",
Expand Down
70 changes: 16 additions & 54 deletions transforms/universal/fdedup/fdedup_ray.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
"## This is here as a reference only\n",
"# Users and application developers must use the right tag for the latest from pypi\n",
"#!pip install data-prep-toolkit\n",
"#!pip install data-prep-toolkit-transforms\n",
"#!pip install data-prep-connector"
"#!pip install data-prep-toolkit-transforms"
]
},
{
Expand All @@ -38,17 +37,11 @@
{
"cell_type": "code",
"execution_count": null,
"id": "c2a12abc-9460-4e45-8961-873b48a9ab19",
"id": "bae63d15-4ce5-4f2a-a917-0f3161e9dd73",
"metadata": {},
"outputs": [],
"source": [
"import ast\n",
"import os\n",
"import sys\n",
"\n",
"from data_processing.utils import ParamsUtils\n",
"from dpk_fdedup.transform_python import parse_args\n",
"from dpk_fdedup.ray.transform import RayServiceOrchestrator"
"from dpk_fdedup.ray.transform import Fdedup"
]
},
{
Expand All @@ -67,57 +60,25 @@
"| num_permutations:int | 112 | number of permutations to use for minhash calculation |\n",
"| num_bands:int | 14 | number of bands to use for band hash calculation |\n",
"| num_minhashes_per_band | 8 | number of minhashes to use in each band |\n",
"| operation_mode:{filter_duplicates,filter_non_duplicates,annotate} | filter_duplicates | operation mode for data cleanup: filter out duplicates/non-duplicates, or annotate duplicate documents |\n",
"| run_locally:bool | true | if true, launch a ray cluster locally, otherwise connect to an already existing cluster | \n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e90a853e-412f-45d7-af3d-959e755aeebb",
"metadata": {},
"outputs": [],
"source": [
"# create parameters\n",
"input_folder = os.path.join(os.path.abspath(\"\"), \"ray\", \"test-data\", \"input\")\n",
"output_folder = os.path.join(os.path.abspath(\"\"), \"output\")\n",
"params = {\n",
" # transform configuration parameters\n",
" \"input_folder\": input_folder,\n",
" \"output_folder\": output_folder,\n",
" \"contents_column\": \"contents\",\n",
" \"document_id_column\": \"int_id_column\",\n",
" \"num_permutations\": 112,\n",
" \"num_bands\": 14,\n",
" \"num_minhashes_per_band\": 8,\n",
" \"operation_mode\": \"filter_duplicates\",\n",
" # ray configuration parameters\n",
" \"run_locally\": True,\n",
"}\n"
]
},
{
"cell_type": "markdown",
"id": "7949f66a-d207-45ef-9ad7-ad9406f8d42a",
"metadata": {},
"source": [
"##### ***** Use ray runtime to invoke each transform in the fuzzy dedup pipeline"
"| operation_mode:{filter_duplicates,filter_non_duplicates,annotate} | filter_duplicates | operation mode for data cleanup: filter out duplicates/non-duplicates, or annotate duplicate documents |"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0775e400-7469-49a6-8998-bd4772931459",
"id": "a54a78e9-d78b-4aeb-ac2b-806070a2dec0",
"metadata": {},
"outputs": [],
"source": [
"\n",
"sys.argv = ParamsUtils.dict_to_req(d=params)\n",
"args = parse_args()\n",
"# Initialize the orchestrator\n",
"orchestrator = RayServiceOrchestrator(global_params=args)\n",
"# Launch ray fuzzy dedup execution\n",
"orchestrator.orchestrate()"
"Fdedup(input_folder='ray/test-data/input',\n",
" output_folder='output',\n",
" contents_column= \"contents\",\n",
" document_id_column= \"int_id_column\",\n",
" num_permutations= 112,\n",
" num_bands= 14,\n",
" num_minhashes_per_band= 8,\n",
" operation_mode= \"filter_duplicates\",\n",
" run_locally= True).transform()\n"
]
},
{
Expand Down Expand Up @@ -155,6 +116,7 @@
"outputs": [],
"source": [
"import polars as pl\n",
"import os\n",
"input_df = pl.read_parquet(os.path.join(os.path.abspath(\"\"), \"ray\", \"test-data\", \"input\", \"df1.parquet\"))\n",
"with pl.Config(fmt_str_lengths=10000000, tbl_rows=-1):\n",
" print(input_df)"
Expand Down Expand Up @@ -192,7 +154,7 @@
{
"cell_type": "code",
"execution_count": null,
"id": "c11d3a4b-8ef9-417d-a8a2-f688db067a52",
"id": "787c644e-2640-4c05-bdc2-8a261305a89f",
"metadata": {},
"outputs": [],
"source": []
Expand Down

0 comments on commit bc88085

Please sign in to comment.