Support relative path (#1938)
16oeahr and yalu4 authored Feb 4, 2024
1 parent feb8063 commit c52f285
Showing 3 changed files with 67 additions and 35 deletions.
10 changes: 5 additions & 5 deletions examples/gen_test_data/config.ini.example
@@ -5,19 +5,19 @@
; The COMMON section provides common values for all other sections.
; Configure both 'document_folder' and 'document_chunk_size' if you require document splitting.
; However, if you wish to bypass the document split process, simply provide the 'document_node_file', which is a JSONL file.
-; When all these parameters are configured, the system will primarily use the 'document_node_file'
-documents_folder = "<your-document-folder-abspath>"
+; When all these parameters are configured, the system will primarily use the 'document_node_file'.
+documents_folder = "<your-document-folder-path>"
document_chunk_size = 1024
-document_nodes_file = "<your-node-file-abspath>"
+document_nodes_file = "<your-node-file-path>"

; Test data gen flow configs
-flow_folder = "<your-test-data-gen-flow-folder-abspath>" ; There is must one flow.dag.yaml file under this folder as entry
+flow_folder = "<your-test-data-gen-flow-folder-path>" ; There is must one flow.dag.yaml file under this folder as entry
connection_name = "<your-connection-name>"


[LOCAL]
; This section is for local test data generation related configuration.
-output_folder = "<your-output-folder-abspath>"
+output_folder = "<your-output-folder-path>"
flow_batch_run_size = 4


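As a side note on how the two COMMON-section modes fit together: the comments above say to configure 'documents_folder' plus 'document_chunk_size' for splitting, or to provide 'document_nodes_file' to skip the split. Below is a minimal sketch (not the example's own loader) of reading those keys with Python's standard configparser and applying that rule; the config.ini file name, the [COMMON] section name, and the quote stripping are assumptions.

```python
# Sketch only: read the COMMON keys shown above with the stdlib configparser.
# Assumes the file is saved as config.ini and that placeholder values keep the
# surrounding double quotes, which are stripped here.
import configparser
from pathlib import Path

parser = configparser.ConfigParser(inline_comment_prefixes=(";",))
parser.read("config.ini")

common = parser["COMMON"]
documents_folder = common.get("documents_folder", fallback="").strip().strip('"')
document_nodes_file = common.get("document_nodes_file", fallback="").strip().strip('"')
chunk_size = common.getint("document_chunk_size", fallback=1024)

# Rule described in the comments: a valid node file takes priority over splitting.
if document_nodes_file and Path(document_nodes_file).is_file():
    print(f"Skipping the split step; using document nodes from {document_nodes_file}")
else:
    print(f"Splitting files under '{documents_folder}' with chunk size {chunk_size}")
```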
40 changes: 30 additions & 10 deletions examples/gen_test_data/gen_test_data/common.py
@@ -1,11 +1,12 @@
import json
-import sys
+import re
+import sys
import time
import typing as t
from pathlib import Path

from constants import DOCUMENT_NODE, TEXT_CHUNK

from promptflow._utils.logger_utils import get_logger


@@ -24,8 +25,11 @@ def split_document(chunk_size, documents_folder, document_node_output):
logger.info("Step 1: Start to split documents to document nodes...")
# count the number of files in documents_folder, including subfolders.
num_files = sum(1 for _ in Path(documents_folder).rglob("*") if _.is_file())
-logger.info(f"Found {num_files} files in the documents folder '{documents_folder}'. Using chunk size: {chunk_size} to split.")
-# `SimpleDirectoryReader` by default chunk the documents based on heading tags and paragraphs, which may lead to small chunks.
+logger.info(
+f"Found {num_files} files in the documents folder '{documents_folder}'. "
+f"Using chunk size: {chunk_size} to split."
+)
+# `SimpleDirectoryReader` by default chunk the documents based on heading tags and paragraphs, which may lead to small chunks. # noqa: E501
# TODO: improve on top of `SimpleDirectoryReader` with a better chunking algorithm.
chunks = SimpleDirectoryReader(documents_folder, recursive=True, encoding="utf-8").load_data()
# Convert documents into nodes
@@ -50,7 +54,7 @@ def clean_data(test_data_set: list, test_data_output_path: str):

for test_data in test_data_set:
if test_data and all(
-val and val != "(Failed)" for key, val in test_data.items() if key.lower() != "line_number"
+val and val != "(Failed)" for key, val in test_data.items() if key.lower() != "line_number"
):
data_line = {"question": test_data["question"], "suggested_answer": test_data["suggested_answer"]}
cleaned_data.append(data_line)
@@ -61,12 +65,14 @@

# TODO: aggregate invalid data root cause and count, and log it.
# log debug info path.
-logger.info(f"Removed {len(test_data_set) - len(cleaned_data)} invalid test data. "
-f"Saved {len(cleaned_data)} valid test data to '{test_data_output_path}'.")
+logger.info(
+f"Removed {len(test_data_set) - len(cleaned_data)} invalid test data. "
+f"Saved {len(cleaned_data)} valid test data to '{test_data_output_path}'."
+)


def count_non_blank_lines(file_path):
-with open(file_path, 'r') as file:
+with open(file_path, "r") as file:
lines = file.readlines()

non_blank_lines = len([line for line in lines if line.strip()])
@@ -87,26 +93,40 @@ def print_progress(log_file_path: str):

try:
last_data_time = time.time()
-with open(log_file_path, 'r') as f:
+with open(log_file_path, "r") as f:
while True:
line = f.readline().strip()
if line:
last_data_time = time.time() # Update the time when the last data was received
match = log_pattern.match(line)
if not match:
continue

sys.stdout.write("\r" + line) # \r will move the cursor back to the beginning of the line
sys.stdout.flush() # flush the buffer to ensure the log is displayed immediately
finished, total = map(int, match.groups())
if finished == total:
logger.info("Batch run is completed.")
break
elif time.time() - last_data_time > 300:
-logger.info("No new log line received for 5 minutes. Stop reading. See the log file for more details.")
+logger.info(
+"No new log line received for 5 minutes. Stop reading. See the log file for more details."
+)
break # Stop reading
else:
time.sleep(1) # wait for 1 second if no new line is available
except KeyboardInterrupt:
sys.stdout.write("\n") # ensure to start on a new line when the user interrupts
sys.stdout.flush()


+def convert_to_abs_path(file_path: str) -> str:
+if not file_path:
+return file_path
+
+path = Path(file_path)
+if path.is_absolute():
+return str(path)
+else:
+abs = str(path.resolve())
+return abs
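
A quick illustration of the new convert_to_abs_path helper added above: falsy input comes back unchanged, an already-absolute path is passed through, and a relative path is resolved against the current working directory. The example paths in the comments are hypothetical.

```python
# Illustrative only; actual output depends on the current working directory.
from pathlib import Path

print(convert_to_abs_path(""))              # ""  -> falsy input is returned as-is
print(convert_to_abs_path("./documents"))   # e.g. /home/user/gen_test_data/documents
print(convert_to_abs_path(str(Path.cwd() / "out")))  # already absolute, returned unchanged
```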
52 changes: 32 additions & 20 deletions examples/gen_test_data/gen_test_data/run.py
@@ -13,8 +13,8 @@
# in order to import from absolute path, which is required by mldesigner
os.sys.path.insert(0, os.path.abspath(Path(__file__).parent))

-from common import clean_data, count_non_blank_lines, split_document, print_progress # noqa: E402
-from constants import TEXT_CHUNK, DETAILS_FILE_NAME # noqa: E402
+from common import clean_data, convert_to_abs_path, count_non_blank_lines, print_progress, split_document # noqa: E402
+from constants import DETAILS_FILE_NAME, TEXT_CHUNK # noqa: E402

logger = get_logger("data.gen")

@@ -29,11 +29,15 @@ def batch_run_flow(

run_name = f"test_data_gen_{datetime.now().strftime('%b-%d-%Y-%H-%M-%S')}"
# TODO: replace the separate process to submit batch run with batch run async method when it's available.
-cmd = f"pf run create --flow {flow_folder} --data {flow_input_data} --name {run_name} " \
-f"--environment-variables PF_WORKER_COUNT='{flow_batch_run_size}' PF_BATCH_METHOD='spawn' " \
-f"--column-mapping {TEXT_CHUNK}='${{data.text_chunk}}'"
+cmd = (
+f"pf run create --flow {flow_folder} --data {flow_input_data} --name {run_name} "
+f"--environment-variables PF_WORKER_COUNT='{flow_batch_run_size}' PF_BATCH_METHOD='spawn' "
+f"--column-mapping {TEXT_CHUNK}='${{data.text_chunk}}'"
+)
process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
-logger.info(f"Submit batch run successfully. process id {process.pid}. Please wait for the batch run to complete...")
+logger.info(
+f"Submit batch run successfully. process id {process.pid}. Please wait for the batch run to complete..."
+)
return run_name


@@ -47,9 +51,9 @@ def get_batch_run_output(output_path: Path):
if time.time() - start_time > 300:
raise Exception(f"Output jsonl file '{output_path}' is not created within 5 minutes.")

-with open(output_path, 'r') as f:
+with open(output_path, "r") as f:
output_lines = list(map(json.loads, f))

return [
{"question": line["question"], "suggested_answer": line["suggested_answer"], "debug_info": line["debug_info"]}
for line in output_lines
@@ -250,10 +254,18 @@ def get_ml_client(subscription_id: str, resource_group: str, workspace_name: str
args = parser.parse_args()

should_skip_split_documents = False
-if args.document_nodes_file and Path(args.document_nodes_file).is_file():
+document_nodes_file = convert_to_abs_path(args.document_nodes_file)
+documents_folder = convert_to_abs_path(args.documents_folder)
+flow_folder = convert_to_abs_path(args.flow_folder)
+output_folder = convert_to_abs_path(args.output_folder)
+
+if document_nodes_file and Path(document_nodes_file).is_file():
should_skip_split_documents = True
-elif not args.documents_folder or not Path(args.documents_folder).is_dir():
-parser.error("Either 'documents_folder' or 'document_nodes_file' should be specified correctly.")
+elif not documents_folder or not Path(documents_folder).is_dir():
+parser.error(
+"Either 'documents_folder' or 'document_nodes_file' should be specified correctly.\n"
+f"documents_folder: '{documents_folder}'\ndocument_nodes_file: '{document_nodes_file}'"
+)

if args.cloud:
logger.info("Start to generate test data at cloud...")
@@ -263,16 +275,16 @@ def get_ml_client(subscription_id: str, resource_group: str, workspace_name: str
if should_skip_split_documents:
logger.info(
"Skip step 1 'Split documents to document nodes' as received document nodes from "
-f"input file '{args.document_nodes_file}'."
+f"input file path '{document_nodes_file}'."
)
-logger.info(f"Collected {count_non_blank_lines(args.document_nodes_file)} document nodes.")
+logger.info(f"Collected {count_non_blank_lines(document_nodes_file)} document nodes.")

if args.cloud:
run_cloud(
-args.documents_folder,
+documents_folder,
args.document_chunk_size,
-args.document_nodes_file,
-args.flow_folder,
+document_nodes_file,
+flow_folder,
args.subscription_id,
args.resource_group,
args.workspace_name,
@@ -287,11 +299,11 @@ def get_ml_client(subscription_id: str, resource_group: str, workspace_name: str
)
else:
run_local(
-args.documents_folder,
+documents_folder,
args.document_chunk_size,
-args.document_nodes_file,
-args.flow_folder,
+document_nodes_file,
+flow_folder,
args.flow_batch_run_size,
-args.output_folder,
+output_folder,
should_skip_split_documents,
)
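
To make the batch_run_flow change in this file concrete, here is a worked example of the command string that the new parenthesized f-string assembles. Every concrete value below (paths, run name, worker count) is a hypothetical placeholder, and the TEXT_CHUNK value "text_chunk" is an assumption, not something shown in this commit.

```python
# Worked example of the command built in batch_run_flow; values are placeholders.
flow_folder = "/abs/path/to/test-data-gen-flow"        # hypothetical
flow_input_data = "/abs/path/to/document-nodes.jsonl"  # hypothetical
run_name = "test_data_gen_Feb-04-2024-12-00-00"        # example timestamped name
flow_batch_run_size = 4
TEXT_CHUNK = "text_chunk"  # assumed value of constants.TEXT_CHUNK

cmd = (
    f"pf run create --flow {flow_folder} --data {flow_input_data} --name {run_name} "
    f"--environment-variables PF_WORKER_COUNT='{flow_batch_run_size}' PF_BATCH_METHOD='spawn' "
    f"--column-mapping {TEXT_CHUNK}='${{data.text_chunk}}'"
)
print(cmd)
# Printed as a single line, wrapped here for readability:
# pf run create --flow /abs/path/to/test-data-gen-flow --data /abs/path/to/document-nodes.jsonl
#   --name test_data_gen_Feb-04-2024-12-00-00 --environment-variables PF_WORKER_COUNT='4'
#   PF_BATCH_METHOD='spawn' --column-mapping text_chunk='${data.text_chunk}'
```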
