Separate cloud part #1926

Merged (7 commits, Feb 2, 2024)
38 changes: 38 additions & 0 deletions docs/cloud/azureai/generate-test-data-cloud.md
@@ -0,0 +1,38 @@
# How to generate test data based on documents
This guide shows how to generate test data in a pipeline job on AzureML, so that you can integrate the test data generation flow with existing pipelines and process large amounts of data.


## Prerequisites

1. Go through the local test data generation [guide](../../how-to-guides/generate-test-data.md) and prepare your test data generation flow.
2. Go to the [gen_test_data](../../../examples/gen_test_data) folder and run `pip install -r requirements_cloud.txt` to prepare the local environment.
3. Prepare the environment required to run the component:
- Navigate to the [conda.yml](../../../examples/gen_test_data/conda.yml) file.
- For specific document file types, you will need to add extra packages to `conda.yml`:
> **Note**: We use the LlamaIndex `SimpleDirectoryReader` in this process. For the latest information on required packages, please check [here](https://docs.llamaindex.ai/en/stable/examples/data_connectors/simple_directory_reader.html). A quick local check is sketched after this list.
- .docx - `docx2txt`
- .pdf - `pypdf`
- .ipynb - `nbconvert`

4. Create a cloud connection: [Create a connection](https://microsoft.github.io/promptflow/how-to-guides/manage-connections.html#create-a-connection).
5. Prepare AzureML resources to run your pipeline.
- An AzureML workspace - [Create workspace resources you need to get started with Azure AI](https://learn.microsoft.com/en-us/azure/machine-learning/quickstart-create-resources?view=azureml-api-2).
- A compute target - [Learn more about compute cluster](https://learn.microsoft.com/en-us/azure/machine-learning/concept-compute-target?view=azureml-api-2).
6. Set the configs:
- Navigate to the [gen_test_data](../../../examples/gen_test_data) folder.
- Run the following command to copy `cloud.config.ini.example`, then update the configurations in the `cloud.config.ini` file:
```bash
cp cloud.config.ini.example cloud.config.ini
```
- Fill in the values.
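
Optionally, before submitting the cloud job, you can verify locally that `SimpleDirectoryReader` can load your document types (the check referenced in the note of step 3). The snippet below is a minimal sketch, not part of the pipeline; the folder path and extensions are placeholders, and on newer `llama_index` versions the import lives under `llama_index.core` instead.

```python
# Optional local sanity check: confirm the extra reader packages (e.g. pypdf, docx2txt)
# are installed by loading a few documents the same way the pipeline does.
from llama_index import SimpleDirectoryReader  # on llama-index >= 0.10: from llama_index.core import SimpleDirectoryReader

documents = SimpleDirectoryReader(
    input_dir="<your-document-folder-abspath>",  # placeholder: the folder you set as documents_folder
    required_exts=[".pdf", ".docx"],             # placeholder: limit to the file types you care about
    recursive=True,
).load_data()
print(f"Loaded {len(documents)} document(s).")
```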


## Generate test data in the cloud
To handle larger test data sets, you can leverage the PRS (parallel run step) component to run the flow in a pipeline.
- Navigate to the [gen_test_data](../../../examples/gen_test_data) folder.
- After configuration, run the following command to generate the test data set:
```bash
python -m gen_test_data.run --cloud
```

- The generated test data will be a data asset that can be found in the output of the last node. You can register this data asset for future use, for example with the SDK sketch below.
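
If you want to reuse the generated data set elsewhere, you can register the output as a named data asset, either from the job's output page in AzureML studio or with the `azure-ai-ml` SDK. The snippet below is a minimal sketch under stated assumptions: the asset name, version, and datastore path are placeholders; take the actual output path from the last node of your completed pipeline job.

```python
# A minimal sketch (not part of this PR) of registering the pipeline output as a data asset.
from azure.ai.ml import MLClient
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.entities import Data
from azure.identity import DefaultAzureCredential

ml_client = MLClient(
    credential=DefaultAzureCredential(),
    subscription_id="<your-sub-id>",
    resource_group_name="<your-resource-group>",
    workspace_name="<your-workspace-name>",
)

test_data_asset = Data(
    name="generated-test-data",  # placeholder asset name
    version="1",                 # placeholder version
    type=AssetTypes.URI_FOLDER,
    # Placeholder: copy the real output path of the last node from the completed pipeline job.
    path="azureml://datastores/workspaceblobstore/paths/<output-folder-of-last-node>",
    description="Test data set generated by the gen_test_data pipeline.",
)
ml_client.data.create_or_update(test_data_asset)
```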
27 changes: 27 additions & 0 deletions examples/gen_test_data/cloud.config.ini.example
@@ -0,0 +1,27 @@
; cloud.config.ini
; This is a sample configuration file

; Configure both 'documents_folder' and 'document_chunk_size' if you require document splitting.
; However, if you wish to bypass the document split process, simply provide the 'document_nodes_file', which is a JSONL file.
; When all these parameters are configured, the system will primarily use the 'document_nodes_file'.
documents_folder = "<your-document-folder-abspath>"
document_chunk_size = 1024
document_nodes_file = "<your-node-file-abspath>"

; Test data gen flow configs
flow_folder = "<your-test-data-gen-flow-folder-abspath>" ; There must be one flow.dag.yaml file under this folder as the entry
; connection_name = "<your-connection-name>"

; This section is for cloud test data generation related configuration.
subscription_id = "<your-sub-id>"
resource_group = "<your-resource-group>"
workspace_name = "<your-workspace-name>"
aml_cluster = "<your-compute-name>"

; Parallel run step configs
prs_instance_count = 2
prs_mini_batch_size = 2
prs_max_concurrency_per_instance = 10
prs_max_retry_count = 3
prs_run_invocation_time = 800
prs_allowed_failed_count = -1
12 changes: 12 additions & 0 deletions examples/gen_test_data/conda.yml
@@ -0,0 +1,12 @@
name: test_data_gen_conda_env
channels:
- defaults
dependencies:
- python=3.10.12
- pip=23.2.1
- pip:
- mldesigner==0.1.0b17
- configargparse
- llama_index
- docx2txt
- promptflow
17 changes: 0 additions & 17 deletions examples/gen_test_data/config.ini.example
@@ -1,8 +1,6 @@
; config.ini
; This is a sample configuration file

[COMMON]
; The COMMON section provides common values for all other sections.
; Configure both 'document_folder' and 'document_chunk_size' if you require document splitting.
; However, if you wish to bypass the document split process, simply provide the 'document_node_file', which is a JSONL file.
; When all these parameters are configured, the system will primarily use the 'document_node_file'
Expand All @@ -14,21 +12,6 @@ document_nodes_file = "<your-node-file-abspath>"
flow_folder = "<your-test-data-gen-flow-folder-abspath>" ; There is must one flow.dag.yaml file under this folder as entry
connection_name = "<your-connection-name>"


[LOCAL]
; This section is for local test data generation related configuration.
output_folder = "<your-output-folder-abspath>"
flow_batch_run_size = 10


[CLOUD]
; This section is for cloud test data generation related configuration.
subscription_id = "<your-sub-id>"
resource_group = "<your-resource-group>"
workspace_name = "<your-workspace-name>"
aml_cluster = "<your-compute-name>"

; Parallel run step configs
prs_instance_count = 2
prs_mini_batch_size = 2
prs_max_concurrency_per_instance = 10
20 changes: 14 additions & 6 deletions examples/gen_test_data/gen_test_data/components.py
@@ -1,19 +1,24 @@
import json
from pathlib import Path

from common import clean_data_and_save, split_document
from mldesigner import Input, Output, command_component
from common import split_document, clean_data_and_save
from constants import ENVIRONMENT_DICT_FIXED_VERSION

conda_file = Path(__file__).parent.parent / "conda.yml"
env_image = "mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04"


@command_component(
name="split_document_component",
display_name="split documents",
description="Split documents into document nodes.",
environment=ENVIRONMENT_DICT_FIXED_VERSION,
environment=dict(
conda_file=conda_file,
image=env_image,
),
)
def split_document_component(
documents_folder: Input(type="uri_folder"), chunk_size: int, document_node_output: Output(type="uri_folder")
documents_folder: Input(type="uri_folder"), chunk_size: int, document_node_output: Output(type="uri_folder")
) -> str:
"""Split documents into document nodes.

Expand All @@ -32,10 +37,13 @@ def split_document_component(
name="clean_data_and_save_component",
display_name="clean dataset",
description="Clean test data set to remove empty lines.",
environment=ENVIRONMENT_DICT_FIXED_VERSION,
environment=dict(
conda_file=conda_file,
image=env_image,
),
)
def clean_data_and_save_component(
test_data_set_folder: Input(type="uri_folder"), test_data_output: Output(type="uri_folder")
test_data_set_folder: Input(type="uri_folder"), test_data_output: Output(type="uri_folder")
) -> str:
test_data_set_path = Path(test_data_set_folder) / "parallel_run_step.jsonl"

13 changes: 0 additions & 13 deletions examples/gen_test_data/gen_test_data/constants.py
@@ -1,15 +1,2 @@
DOCUMENT_NODE = "document_node"
TEXT_CHUNK = "text_chunk"

ENVIRONMENT_DICT_FIXED_VERSION = dict(
image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04",
conda_file={
"name": "test_data_gen_conda_env",
"channels": ["defaults"],
"dependencies": [
"python=3.10.12",
"pip=23.2.1",
{"pip": ["mldesigner==0.1.0b17", "llama_index", "docx2txt", "promptflow", "configargparse"]},
],
},
)
92 changes: 66 additions & 26 deletions examples/gen_test_data/gen_test_data/run.py
Expand Up @@ -10,11 +10,12 @@
from promptflow.entities import Run

CONFIG_FILE = (Path(__file__).parents[1] / "config.ini").resolve()
CLOUD_CONFIG_FILE = (Path(__file__).parents[1] / "cloud.config.ini").resolve()

# in order to import from absoluate path, which is required by mldesigner
# in order to import from absolute path, which is required by mldesigner
os.sys.path.insert(0, os.path.abspath(Path(__file__).parent))

from common import clean_data_and_save, split_document, count_non_blank_lines # noqa: E402
from common import clean_data_and_save, count_non_blank_lines, split_document # noqa: E402
from constants import TEXT_CHUNK # noqa: E402

logger = get_logger("data.gen")
Expand Down Expand Up @@ -83,7 +84,7 @@ def run_local(

# Store intermedian batch run output results
jsonl_str = "\n".join(map(json.dumps, test_data_set))
intermedian_batch_run_res = os.path.join(inner_folder, "batch-run-result.jsonl")
intermedian_batch_run_res = os.path.join(inner_folder, "test-data-gen-details.jsonl")
with open(intermedian_batch_run_res, "wt") as text_file:
print(f"{jsonl_str}", file=text_file)

Expand All @@ -103,13 +104,17 @@ def run_cloud(
prs_instance_count,
prs_mini_batch_size,
prs_max_concurrency_per_instance,
prs_max_retry_count,
prs_run_invocation_time,
prs_allowed_failed_count,
should_skip_split,
):
# lazy import azure dependencies
try:
from azure.ai.ml import Input as V2Input, MLClient, dsl, load_component
from azure.ai.ml import Input as V2Input
from azure.ai.ml import MLClient, dsl, load_component
from azure.ai.ml.entities import RetrySettings
from azure.identity import DefaultAzureCredential
from constants import ENVIRONMENT_DICT_FIXED_VERSION
except ImportError:
raise ImportError(
"Please install azure dependencies using the following command: "
Expand All @@ -123,6 +128,9 @@ def run_cloud(
"instance_count",
"mini_batch_size",
"max_concurrency_per_instance",
"max_retry_count",
"run_invocation_time",
"allowed_failed_count",
]
)
def gen_test_data_pipeline(
Expand All @@ -133,12 +141,18 @@ def gen_test_data_pipeline(
instance_count=1,
mini_batch_size=1,
max_concurrency_per_instance=2,
max_retry_count=3,
run_invocation_time=600,
allowed_failed_count=-1,
):
from components import split_document_component, clean_data_and_save_component
from components import clean_data_and_save_component, split_document_component

data = (
data_input
if should_skip_doc_split
else split_document_component(documents_folder=data_input, chunk_size=chunk_size).outputs.document_node_output
else split_document_component(
documents_folder=data_input, chunk_size=chunk_size
).outputs.document_node_output
)
flow_node = load_component(flow_yml_path)(
data=data,
Expand All @@ -151,7 +165,8 @@ def gen_test_data_pipeline(
flow_node.mini_batch_size = mini_batch_size
flow_node.max_concurrency_per_instance = max_concurrency_per_instance
flow_node.set_resources(instance_count=instance_count)
# flow_node.allowed_failed_count = -1
flow_node.retry_settings = RetrySettings(max_retry_count=max_retry_count, timeout=run_invocation_time)
flow_node.mini_batch_error_threshold = allowed_failed_count
# Should use `mount` mode to ensure PRS complete merge output lines.
flow_node.outputs.flow_outputs.mode = "mount"
clean_data_and_save_component(test_data_set_folder=flow_node.outputs.flow_outputs).outputs.test_data_output
Expand All @@ -176,6 +191,9 @@ def get_ml_client(subscription_id: str, resource_group: str, workspace_name: str
"instance_count": prs_instance_count,
"mini_batch_size": prs_mini_batch_size,
"max_concurrency_per_instance": prs_max_concurrency_per_instance,
"max_retry_count": prs_max_retry_count,
"run_invocation_time": prs_run_invocation_time,
"allowed_failed_count": prs_allowed_failed_count,
}

pipeline_with_flow = gen_test_data_pipeline(
Expand All @@ -191,15 +209,26 @@ def get_ml_client(subscription_id: str, resource_group: str, workspace_name: str


if __name__ == "__main__":
if Path(CONFIG_FILE).is_file():
parser = configargparse.ArgParser(default_config_files=[CONFIG_FILE])
parser = configargparse.ArgParser()
parser.add_argument("--cloud", action="store_true", help="cloud flag")
args = parser.parse_args()

# Choose the config file based on the argument
if args.cloud:
config_file = CLOUD_CONFIG_FILE
else:
config_file = CONFIG_FILE

if not Path(config_file).is_file():
raise Exception(
f"'{CONFIG_FILE}' does not exist. "
f"'{config_file}' does not exist. "
+ "Please check if you are under the wrong directory or the file is missing."
)

parser = configargparse.ArgParser(default_config_files=[config_file])
# TODO: remove this
parser.add_argument("--cloud", action="store_true", help="cloud flag")

parser.add_argument("--documents_folder", type=str, help="Documents folder path")
parser.add_argument("--document_chunk_size", type=int, help="Document chunk size, default is 1024")
parser.add_argument(
Expand All @@ -212,33 +241,41 @@ def get_ml_client(subscription_id: str, resource_group: str, workspace_name: str
type=int,
help="Test data generation flow batch run size, default is 16",
)
# Configs for local
parser.add_argument("--output_folder", type=str, help="Output folder path.")
# Configs for cloud
parser.add_argument("--subscription_id", help="AzureML workspace subscription id")
parser.add_argument("--resource_group", help="AzureML workspace resource group name")
parser.add_argument("--workspace_name", help="AzureML workspace name")
parser.add_argument("--aml_cluster", help="AzureML cluster name")
parser.add_argument("--prs_instance_count", type=int, help="Parallel run step instance count")
parser.add_argument("--prs_mini_batch_size", help="Parallel run step mini batch size")
parser.add_argument(
"--prs_max_concurrency_per_instance", type=int, help="Parallel run step max concurrency per instance"
)
if not args.cloud:
parser.add_argument("--output_folder", type=str, help="Output folder path.")
else:
parser.add_argument("--subscription_id", help="AzureML workspace subscription id")
parser.add_argument("--resource_group", help="AzureML workspace resource group name")
parser.add_argument("--workspace_name", help="AzureML workspace name")
parser.add_argument("--aml_cluster", help="AzureML cluster name")
parser.add_argument("--prs_instance_count", type=int, help="Parallel run step instance count")
parser.add_argument("--prs_mini_batch_size", help="Parallel run step mini batch size")
parser.add_argument(
"--prs_max_concurrency_per_instance", type=int, help="Parallel run step max concurrency per instance"
)
parser.add_argument("--prs_max_retry_count", type=int, help="Parallel run step max retry count")
parser.add_argument("--prs_run_invocation_time", type=int, help="Parallel run step run invocation time")
parser.add_argument(
"--prs_allowed_failed_count", type=int, help="Number of failed mini batches that could be ignored"
)
args = parser.parse_args()

should_skip_split_documents = False
if args.document_nodes_file and Path(args.document_nodes_file).is_file():
should_skip_split_documents = True
elif not args.documents_folder or not Path(args.documents_folder).is_dir():
parser.error("Either 'documents_folder' or 'document_nodes_file' should be specified correctly.")

if args.cloud:
logger.info("Start to generate test data at cloud...")
else:
logger.info("Start to generate test data at local...")

if should_skip_split_documents:
logger.info(f"Skip step 1 'Split documents to document nodes' as received document nodes from input file '{args.document_nodes_file}'.")
logger.info(
"Skip step 1 'Split documents to document nodes' as received document nodes from "
f"input file '{args.document_nodes_file}'."
)
logger.info(f"Collected {count_non_blank_lines(args.document_nodes_file)} document nodes.")

if args.cloud:
Expand All @@ -254,6 +291,9 @@ def get_ml_client(subscription_id: str, resource_group: str, workspace_name: str
args.prs_instance_count,
args.prs_mini_batch_size,
args.prs_max_concurrency_per_instance,
args.prs_max_retry_count,
args.prs_run_invocation_time,
args.prs_allowed_failed_count,
should_skip_split_documents,
)
else: