diff --git a/record_processor/__init__.py b/record_processor/__init__.py
new file mode 100644
index 00000000..7f3fd831
--- /dev/null
+++ b/record_processor/__init__.py
@@ -0,0 +1,2 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
\ No newline at end of file
diff --git a/record_processor/__main__.py b/record_processor/__main__.py
new file mode 100644
--- /dev/null
+++ b/record_processor/__main__.py
@@ -0,0 +1,7 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+from . import record_processor
+
+if __name__ == "__main__":
+    # Execute the main script
+    record_processor.main()
diff --git a/record_processor/parser/behavior_record.py b/record_processor/parser/behavior_record.py
new file mode 100644
--- /dev/null
+++ b/record_processor/parser/behavior_record.py
@@ -0,0 +1,54 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+
+class BehaviorStep:
+    """
+    Class for the single step information in the user behavior record.
+    Multiple steps will be recorded to achieve a specific request.
+    """
+
+    def __init__(self, application: str, description: str, action: str, screenshot: str, comment: str):
+        """
+        Create a new step.
+        application: The name of the application the step belongs to.
+        description: The description of the step.
+        action: The action of the step.
+        screenshot: The screenshot of the step as a string.
+        comment: The user comment of the step, or None if there is none.
+        """
+        self.application = application
+        self.description = description
+        self.action = action
+        self.comment = comment
+        self.screenshot = screenshot
+
+
+class BehaviorRecord:
+    """
+    Class for the user behavior record.
+    A series of steps the user performed to achieve a specific request is recorded in this class.
+    """
+
+    def __init__(self, applications: list, step_num: int, **steps: BehaviorStep):
+        """
+        Create a new Record.
+        applications: The applications involved in the record.
+        step_num: The number of steps in the record.
+        steps: The steps keyed by their name; each one is stored on the record
+            as an attribute holding the attribute dictionary of the step.
+        """
+        self.request = ""
+        self.round = 0
+        self.applications = applications
+        self.step_num = step_num
+        for index, step in steps.items():
+            # Keep plain dictionaries so that the __dict__ of the record only holds simple values.
+            setattr(self, index, step.__dict__)
+
+    def set_request(self, request):
+        """
+        Set the request.
+        request: The request the user wants to achieve.
+        """
+        self.request = request
diff --git a/record_processor/parser/psr_record_parser.py b/record_processor/parser/psr_record_parser.py
new file mode 100644
--- /dev/null
+++ b/record_processor/parser/psr_record_parser.py
@@ -0,0 +1,187 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+import re
+import xml.etree.ElementTree as ET
+from bs4 import BeautifulSoup
+from .behavior_record import BehaviorRecord, BehaviorStep
+
+
+class PSRRecordParser:
+    """
+    Class for parsing the steps recorder .mht file content to user behavior record.
+    """
+
+    def __init__(self, content: str):
+        """
+        Constructor for the RecordParser class.
+        content: The text content of the steps recorder .mht file.
+        """
+        self.content = content
+        self.parts_dict = {}
+        self.applications = []
+        # Both are dictionaries keyed by the step name once the content is parsed.
+        self.comments = {}
+        self.steps = {}
+
+    def parse_to_record(self) -> BehaviorRecord:
+        """
+        Parse the steps recorder .mht file content to record in following steps:
+        1. Find the boundary in the .mht file.
+        2. Split the file by the boundary into parts.
+        3. Get the comments for each step.
+        4. Get the steps from the content.
+        5. Construct the record object and return it.
+        return: A record object.
+        raises: ValueError if the boundary, main.htm or UserActionData is missing.
+        """
+        boundary = self.find_boundary()
+        self.parts_dict = self.split_file_by_boundary(boundary)
+        main_part = self.parts_dict.get('main.htm')
+        if main_part is None:
+            raise ValueError("main.htm not found in the .mht file.")
+
+        main_content = main_part['Content']
+        self.comments = self.get_comments(main_content)
+        self.steps = self.get_steps(main_content)
+        # dict.fromkeys removes the duplicates while keeping the first-seen order.
+        applications = list(dict.fromkeys(self.applications))
+        return BehaviorRecord(applications, len(self.steps), **self.steps)
+
+    def find_boundary(self) -> str:
+        """
+        Find the boundary in the .mht file.
+        return: The boundary without the surrounding quotes.
+        raises: ValueError if no boundary is declared in the content.
+        """
+        marker = "boundary="
+        boundary_start = self.content.find(marker)
+        if boundary_start == -1:
+            raise ValueError("Boundary not found in the .mht file.")
+
+        boundary_start += len(marker)
+        boundary_end = self.content.find("\n", boundary_start)
+        if boundary_end == -1:
+            # The boundary declaration is on the last line of the content.
+            boundary_end = len(self.content)
+        # Strip the whitespace (e.g. "\r") first so that both quotes are removed.
+        return self.content[boundary_start:boundary_end].strip().strip('"')
+
+    def split_file_by_boundary(self, boundary: str) -> dict:
+        """
+        Split the file by the boundary into parts,
+        Store the parts in a dictionary, including the content type,
+        content location and content transfer encoding.
+        boundary: The boundary of the file.
+        return: A dictionary of parts in the file, keyed by the content location.
+        """
+        part_dict = {}
+        for part in self.content.split("--" + boundary):
+            content_location = self._get_header_value(part, "Content-Location")
+            if content_location is None:
+                # Parts without a content location cannot be looked up, skip them.
+                continue
+
+            part_info = {}
+            for header in ("Content-Type", "Content-Transfer-Encoding"):
+                value = self._get_header_value(part, header)
+                if value is not None:
+                    part_info[header] = value
+
+            # Everything after the content location line is treated as the content.
+            location_end = part.find("\n", part.find("Content-Location:"))
+            part_info["Content"] = part[location_end:].strip() if location_end != -1 else ""
+            part_dict[content_location] = part_info
+        return part_dict
+
+    @staticmethod
+    def _get_header_value(part: str, header: str):
+        """
+        Get the value of a header in a part.
+        part: The text of the part.
+        header: The name of the header without the trailing colon.
+        return: The stripped value of the header, or None if the header is not found.
+        """
+        start = part.find(header + ":")
+        if start == -1:
+            return None
+        end = part.find("\n", start)
+        if end == -1:
+            end = len(part)
+        # Split on the first colon only so that values containing colons stay intact.
+        return part[start:end].split(":", 1)[1].strip()
+
+    def get_steps(self, content: str) -> dict:
+        """
+        Get the steps from the content in following steps:
+        1. Find the UserActionData tag in the content.
+        2. Parse the UserActionData tag to get the steps.
+        3. Get the screenshot for each step.
+        4. Get the comments for each step.
+        content: The content of the main.htm file.
+        return: A dictionary of steps.
+        raises: ValueError if the UserActionData tag is missing or is not valid XML.
+        """
+        user_action_data = re.search(
+            r'<UserActionData>(.*?)</UserActionData>', content, re.DOTALL)
+        if not user_action_data:
+            raise ValueError("UserActionData not found in the file.")
+
+        try:
+            root = ET.fromstring(user_action_data.group(1).strip())
+        except ET.ParseError as e:
+            raise ValueError(f"UserActionData is not valid XML: {e}") from e
+
+        steps = {}
+        for each_action in root.findall('EachAction'):
+            action_number = each_action.get('ActionNumber')
+            application = each_action.get('FileName')
+            description = each_action.findtext('Description')
+            action = each_action.findtext('Action')
+            screenshot_file_name = each_action.findtext('ScreenshotFileName')
+            # Tolerate an action without a screenshot instead of failing on it.
+            screenshot = self.get_screenshot(
+                screenshot_file_name) if screenshot_file_name else None
+            # The step keys are ActionNumber - 1, in line with the zero-based comment keys.
+            step_key = f"step_{int(action_number) - 1}"
+
+            steps[step_key] = BehaviorStep(
+                application, description, action, screenshot, self.comments.get(step_key))
+            self.applications.append(application)
+        return steps
+
+    def get_comments(self, content: str) -> dict:
+        """
+        Get the user input comments for each step
+        content: The content of the main.htm file.
+        return: A dictionary of comments for each step, empty if there is no Steps section.
+        """
+        soup = BeautifulSoup(content, 'html.parser')
+        body = soup.body
+        steps_html = body.find('div', id='Steps') if body is not None else None
+        if steps_html is None:
+            # There is nothing to collect without the Steps section.
+            return {}
+
+        steps = steps_html.find_all('div', id=re.compile(r'^Step\d+$'))
+
+        comments = {}
+        for index, step in enumerate(steps):
+            # "string" is the current name of the deprecated "text" argument.
+            comment_tag = step.find('b', string='Comment: ')
+            comments[f'step_{index}'] = comment_tag.next_sibling if comment_tag else None
+        return comments
+
+    def get_screenshot(self, screenshot_file_name: str) -> str:
+        """
+        Get the screenshot by screenshot file name.
+        The screenshot related information is stored in the parts_dict.
+        screenshot_file_name: The file name of the screenshot.
+        return: The screenshot string built from the content type, the transfer encoding and the content.
+        """
+        screenshot_part = self.parts_dict[screenshot_file_name]
+        content = screenshot_part['Content']
+        content_type = screenshot_part['Content-Type']
+        content_transfer_encoding = screenshot_part['Content-Transfer-Encoding']
+
+        return f'data:{content_type};{content_transfer_encoding}, {content}'
diff --git a/record_processor/record_processor.py b/record_processor/record_processor.py
new file mode 100644
--- /dev/null
+++ b/record_processor/record_processor.py
@@ -0,0 +1,57 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+import argparse
+from .parser.psr_record_parser import PSRRecordParser
+from .utils import save_to_json, unzip_and_read_file
+from ufo.utils import print_with_color
+import os
+
+
+def _zip_file_path(path: str) -> str:
+    """
+    Validate that the behavior record path points to a zip file.
+    path: The path given on the command line.
+    return: The path unchanged.
+    raises: argparse.ArgumentTypeError if the path does not end with .zip.
+    """
+    if not path.lower().endswith(".zip"):
+        raise argparse.ArgumentTypeError(
+            f"The behavior record must be a .zip file: {path}")
+    return path
+
+
+args = argparse.ArgumentParser()
+args.add_argument("--request", "-r", help="The request that user want to achieve.",
+                  type=lambda s: s.strip() or None, nargs='+', required=True)
+args.add_argument("--behavior-record-path", "-p", help="The path for user behavior record in zip file.",
+                  type=_zip_file_path, required=True)
+
+
+def main():
+    """
+    Main function.
+    Parse the command line arguments, parse the behavior record and save it
+    to a JSON file.
+    """
+    # Parse here rather than at import time so that importing the module has no side effects.
+    parsed_args = args.parse_args()
+
+    # Join all the words so that an unquoted multi-word request is kept as a whole.
+    request = ' '.join(word for word in parsed_args.request if word)
+    if not request:
+        args.error("The request must not be empty.")
+
+    # Temporarily hardcode the output file path, will move to a config file later
+    output_file = os.path.join(
+        os.getcwd(), 'vectordb', 'records', 'log',
+        '{file_name}.json'.format(file_name=request.replace(' ', '_')))
+    try:
+        content = unzip_and_read_file(parsed_args.behavior_record_path)
+        record = PSRRecordParser(content).parse_to_record()
+        record.set_request(request)
+        save_to_json(record.__dict__, output_file)
+        print_with_color(
+            f"Record process successfully. The record is saved to {output_file}.", "green")
+    except ValueError as e:
+        print_with_color(str(e), "red")
diff --git a/record_processor/utils/__init__.py b/record_processor/utils/__init__.py
new file mode 100644
--- /dev/null
+++ b/record_processor/utils/__init__.py
@@ -0,0 +1,62 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+
+import zipfile
+import json
+import os
+
+
+def unzip_and_read_file(file_path: str) -> str:
+    """
+    Unzip the file and read the content of the extracted file.
+    file_path: the path of the pending zip file.
+    return: the content of the extracted file.
+    """
+    extracted_file = unzip_file(file_path)
+    with open(extracted_file, 'r', encoding='utf-8') as file:
+        content = file.read()
+    return content
+
+
+def unzip_file(zip_file_path: str) -> str:
+    """
+    Unzip the file and return the path of the extracted file.
+    zip_file_path: the path of the pending zip file.
+    return: the path of the first file extracted from the archive.
+    raises: ValueError if the archive does not contain any file.
+    """
+    folder_name = os.path.splitext(zip_file_path)[0]
+
+    # Create the directory if it doesn't exist
+    os.makedirs(folder_name, exist_ok=True)
+
+    # Extract the contents of the ZIP file into the directory
+    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
+        extracted_paths = [
+            zip_ref.extract(member, folder_name) for member in zip_ref.infolist()]
+
+    # Pick the file from the archive's own listing: os.listdir() has no defined
+    # order and the folder may already hold unrelated files.
+    extracted_files = [path for path in extracted_paths if os.path.isfile(path)]
+    if not extracted_files:
+        raise ValueError(f"No file found in the zip file: {zip_file_path}")
+    return extracted_files[0]
+
+
+def save_to_json(data: dict, output_file_path: str):
+    """
+    Save the data to a JSON file.
+    data: the data to save.
+    output_file_path: the path of the output file.
+    """
+
+    # Extract the directory path from the file path
+    directory = os.path.dirname(output_file_path)
+
+    # A bare file name has no directory part and os.makedirs('') would fail.
+    if directory:
+        os.makedirs(directory, exist_ok=True)
+
+    with open(output_file_path, 'w', encoding='utf-8') as file:
+        json.dump(data, file, indent=4)
diff --git a/requirements.txt b/requirements.txt
index 36ba9b0c..d65a8b0d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -11,4 +11,5 @@ PyYAML==6.0.1
 Requests==2.31.0
 faiss-cpu==1.8.0
 lxml==5.1.0
-psutil==5.9.8
\ No newline at end of file
+psutil==5.9.8
+beautifulsoup4==4.12.3
\ No newline at end of file