-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #31 from yunhao0204/pre-release
Add record processor to parse the user behavior record to JSON file
- Loading branch information
Showing
7 changed files
with
318 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
# Copyright (c) Microsoft Corporation. | ||
# Licensed under the MIT License. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
# Copyright (c) Microsoft Corporation. | ||
# Licensed under the MIT License. | ||
from . import record_processor | ||
|
||
if __name__ == "__main__":
    # Invoked as a script: hand control to the record processor's entry point.
    record_processor.main()
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
# Copyright (c) Microsoft Corporation. | ||
# Licensed under the MIT License. | ||
|
||
class BehaviorStep:
    """
    A single step of a user behavior record.
    A request is usually achieved through several such steps.
    """

    def __init__(self, application: str, description: str, action: str, screenshot: str, comment: str):
        """
        Store the information of one step.
        application: The application the step belongs to.
        description: The description of the step.
        action: The action of the step.
        screenshot: The screenshot of the step.
        comment: The comment attached to the step.
        """
        # Attributes are created in the order application, description, action,
        # comment, screenshot; this is the key order of the instance __dict__.
        self.application, self.description, self.action = application, description, action
        self.comment, self.screenshot = comment, screenshot
|
||
|
||
class BehaviorRecord:
    """
    The user behavior record.
    It holds the series of steps a user performed to achieve a specific request.
    """

    def __init__(self, applications: list, step_num: int, **steps: BehaviorStep):
        """
        Build a record.
        applications: The applications involved in the record.
        step_num: The number of steps in the record.
        steps: The steps, passed as keyword arguments; each keyword becomes an
               attribute of the record holding that step's __dict__.
        """
        # The request starts out empty; it is filled in later via set_request.
        self.request = ""
        self.round = 0
        self.applications = applications
        self.step_num = step_num
        for step_name in steps:
            setattr(self, step_name, steps[step_name].__dict__)

    def set_request(self, request):
        """
        Store the request this record belongs to.
        request: The request.
        """
        self.request = request
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,171 @@ | ||
# Copyright (c) Microsoft Corporation. | ||
# Licensed under the MIT License. | ||
|
||
import re | ||
import xml.etree.ElementTree as ET | ||
from bs4 import BeautifulSoup | ||
from .behavior_record import BehaviorRecord, BehaviorStep | ||
|
||
|
||
class PSRRecordParser:
    """
    Class for parsing the steps recorder .mht file content to user behavior record.

    The content is split on its multipart boundary into parts keyed by their
    Content-Location. The 'main.htm' part supplies the recorded actions and the
    user comments; the other parts are looked up by name for the screenshots.
    """

    def __init__(self, content: str):
        """
        Constructor for the PSRRecordParser class.
        content: The text content of the steps recorder .mht file.
        """
        self.content = content
        self.parts_dict = {}
        self.applications = []
        # Both are dictionaries keyed by "step_<index>" once parsed, so they start
        # out as empty dictionaries (get_steps calls self.comments.get).
        self.comments = {}
        self.steps = {}

    def parse_to_record(self) -> "BehaviorRecord":
        """
        Parse the steps recorder .mht file content to record in following steps:
        1. Find the boundary in the .mht file.
        2. Split the file by the boundary into parts.
        3. Get the comments for each step.
        4. Get the steps from the content.
        5. Construct the record object and return it.
        return: A record object.
        raises: ValueError if the boundary, the main.htm part, the Steps section
                or the UserActionData cannot be found.
        """
        boundary = self.find_boundary()
        self.parts_dict = self.split_file_by_boundary(boundary)

        main_part = self.parts_dict.get('main.htm')
        if main_part is None:
            raise ValueError("main.htm not found in the .mht file.")
        main_content = main_part['Content']

        # get_steps appends to self.applications; start from a clean list so that
        # calling parse_to_record more than once does not accumulate entries.
        self.applications = []
        self.comments = self.get_comments(main_content)
        self.steps = self.get_steps(main_content)

        # dict.fromkeys removes duplicates while keeping first-seen order, so the
        # resulting list is the same on every run (a set has no stable order).
        applications = list(dict.fromkeys(self.applications))
        return BehaviorRecord(applications, len(self.steps), **self.steps)

    def find_boundary(self) -> str:
        """
        Find the boundary in the .mht file.
        return: The boundary string without surrounding quotes or whitespace.
        raises: ValueError if no "boundary=" marker is present.
        """
        marker = "boundary="
        boundary_start = self.content.find(marker)
        if boundary_start == -1:
            raise ValueError("Boundary not found in the .mht file.")

        boundary_start += len(marker)
        boundary_end = self.content.find("\n", boundary_start)
        if boundary_end == -1:
            # The marker sits on the last line: take everything that is left.
            boundary_end = len(self.content)

        # Drop any following ";parameter", then whitespace (e.g. a trailing "\r"),
        # and only then the quotes, so that '"abc"\r' becomes 'abc'.
        raw_boundary = self.content[boundary_start:boundary_end]
        return raw_boundary.split(";", 1)[0].strip().strip('"')

    @staticmethod
    def _read_header(part: str, start: int) -> tuple:
        """
        Read the header line that begins at index `start` of `part`.
        return: A (value, end) tuple: the text after the first colon, stripped,
                and the index where the header line ends.
        """
        end = part.find("\n", start)
        if end == -1:
            end = len(part)
        # Split on the first colon only; the value itself may contain colons.
        _, _, value = part[start:end].partition(":")
        return value.strip(), end

    def split_file_by_boundary(self, boundary: str) -> dict:
        """
        Split the file by the boundary into parts,
        Store the parts in a dictionary, including the content type,
        content location and content transfer encoding.
        Parts without a Content-Location header are skipped.
        boundary: The boundary of the file.
        return: A dictionary of parts in the file, keyed by content location.
        """
        part_dict = {}
        for part in self.content.split("--" + boundary):
            content_location_start = part.find("Content-Location:")
            if content_location_start == -1:
                # Nothing to key this part by.
                continue

            content_location, content_location_end = self._read_header(
                part, content_location_start)
            part_info = {}

            # add the content type
            content_type_start = part.find("Content-Type:")
            if content_type_start != -1:
                part_info["Content-Type"], _ = self._read_header(
                    part, content_type_start)

            # add the content transfer encoding
            content_transfer_encoding_start = part.find(
                "Content-Transfer-Encoding:")
            if content_transfer_encoding_start != -1:
                part_info["Content-Transfer-Encoding"], _ = self._read_header(
                    part, content_transfer_encoding_start)

            # The content is everything after the Content-Location line.
            part_info["Content"] = part[content_location_end:].strip()
            part_dict[content_location] = part_info
        return part_dict

    @staticmethod
    def _child_text(element, tag: str):
        """
        Return the text of the first `tag` child of `element`,
        or None when there is no such child.
        """
        child = element.find(tag)
        return child.text if child is not None else None

    def get_steps(self, content: str) -> dict:
        """
        Get the steps from the content in following steps:
        1. Find the UserActionData tag in the content.
        2. Parse the UserActionData tag to get the steps.
        3. Get the screenshot for each step.
        4. Get the comments for each step.
        Every application seen is also appended to self.applications.
        content: The content of the main.htm file.
        return: A dictionary of steps, keyed by "step_<ActionNumber - 1>".
        raises: ValueError if the UserActionData tag is not found.
        """
        user_action_data = re.search(
            r'<UserActionData>(.*?)</UserActionData>', content, re.DOTALL)
        if not user_action_data:
            raise ValueError("UserActionData not found in the file.")

        root = ET.fromstring(user_action_data.group(1))
        steps = {}

        for each_action in root.findall('EachAction'):
            action_number = each_action.get('ActionNumber')
            application = each_action.get('FileName')
            description = self._child_text(each_action, 'Description')
            action = self._child_text(each_action, 'Action')
            screenshot_file_name = self._child_text(
                each_action, 'ScreenshotFileName')
            # An action without a screenshot file name keeps its other data.
            screenshot = (self.get_screenshot(screenshot_file_name)
                          if screenshot_file_name else None)
            # ActionNumber is 1-based here while step keys are 0-based.
            step_key = f"step_{int(action_number) - 1}"

            steps[step_key] = BehaviorStep(
                application, description, action, screenshot, self.comments.get(step_key))
            self.applications.append(application)
        return steps

    def get_comments(self, content: str) -> dict:
        """
        Get the user input comments for each step.
        The comment of a step is whatever directly follows the bold
        "Comment: " label inside the step's div (id "Step<number>").
        content: The content of the main.htm file.
        return: A dictionary of comments for each step, keyed by "step_<index>";
                the value is None for a step without a comment.
        raises: ValueError if the Steps section is not found.
        """
        soup = BeautifulSoup(content, 'html.parser')
        body = soup.body
        steps_html = body.find('div', id='Steps') if body is not None else None
        if steps_html is None:
            raise ValueError("Steps not found in the file.")

        steps = steps_html.find_all(lambda tag: tag.name == 'div' and tag.has_attr(
            'id') and re.match(r'^Step\d+$', tag['id']))

        comments = {}
        for index, step in enumerate(steps):
            comment_tag = step.find('b', string='Comment: ')
            sibling = comment_tag.next_sibling if comment_tag is not None else None
            # Store a plain string rather than the parse-tree object.
            comments[f'step_{index}'] = str(sibling) if sibling is not None else None
        return comments

    def get_screenshot(self, screenshot_file_name: str) -> str:
        """
        Get the screenshot by screenshot file name.
        The screenshot related information is stored in the parts_dict.
        screenshot_file_name: The file name of the screenshot.
        return: A string of the form "data:<content type>;<transfer encoding>, <content>".
        raises: KeyError if the part, or one of its fields, is missing.
        """
        screenshot_part = self.parts_dict[screenshot_file_name]
        content = screenshot_part['Content']
        content_type = screenshot_part['Content-Type']
        content_transfer_encoding = screenshot_part['Content-Transfer-Encoding']

        return f'data:{content_type};{content_transfer_encoding}, {content}'
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
# Copyright (c) Microsoft Corporation. | ||
# Licensed under the MIT License. | ||
|
||
import argparse | ||
from .parser.psr_record_parser import PSRRecordParser | ||
from .utils import save_to_json, unzip_and_read_file | ||
from ufo.utils import print_with_color | ||
import os | ||
|
||
def _non_empty(value: str):
    """
    Strip a command line value.
    return: the stripped value, or None when nothing is left.
    """
    return value.strip() or None


def _zip_path(value: str) -> str:
    """
    Accept only a path that ends with ".zip".
    raises: argparse.ArgumentTypeError otherwise, so that argparse reports the
            problem at once instead of passing None on to later code.
    """
    if not value.endswith(".zip"):
        raise argparse.ArgumentTypeError(
            f"'{value}' is not a .zip file.")
    return value


# The arguments are parsed when this module is imported; main() reads parsed_args.
args = argparse.ArgumentParser()
args.add_argument("--request", "-r", help="The request that user want to achieve.",
                  type=_non_empty, nargs='+')
args.add_argument("--behavior-record-path", "-p", help="The path for user behavior record in zip file.",
                  type=_zip_path)
parsed_args = args.parse_args()
|
||
|
||
def main():
    """
    Main function.
    Read the behavior record zip file given on the command line, parse it to a
    record, attach the request and save the record as a JSON file under
    <current directory>/vectordb/records/log/.
    Problems with the arguments and ValueErrors raised while processing are
    printed in red instead of being raised.
    """

    # All words given to --request form the request, so an unquoted multi-word
    # request is not cut down to its first word. Empty values are dropped.
    request_words = [word for word in (parsed_args.request or []) if word]
    if not request_words:
        print_with_color(
            "Please provide a non-empty request with --request.", "red")
        return
    if not parsed_args.behavior_record_path:
        print_with_color(
            "Please provide the behavior record zip file with --behavior-record-path.", "red")
        return
    request = " ".join(request_words)

    # Spaces, and characters that are not allowed in Windows file names or that
    # would act as path separators, are replaced with underscores.
    file_name = "".join(
        "_" if char in ' <>:"/\\|?*' else char for char in request)

    # Temporarily hardcode the output file path, will move to a config file later
    output_file = os.path.join(
        os.getcwd(), "vectordb", "records", "log", f"{file_name}.json")
    try:
        content = unzip_and_read_file(parsed_args.behavior_record_path)
        record = PSRRecordParser(content).parse_to_record()
        record.set_request(request)
        save_to_json(record.__dict__, output_file)
        print_with_color(
            f"Record process successfully. The record is saved to {output_file}.", "green")
    except ValueError as e:
        print_with_color(str(e), "red")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
# Copyright (c) Microsoft Corporation. | ||
# Licensed under the MIT License. | ||
|
||
|
||
import zipfile | ||
import json | ||
import os | ||
|
||
|
||
def unzip_and_read_file(file_path: str) -> str:
    """
    Extract the given zip file and read the extracted file as UTF-8 text.
    file_path: the path of the pending zip file.
    return: the content of the extracted file.
    """
    extracted_path = unzip_file(file_path)
    with open(extracted_path, 'r', encoding='utf-8') as handle:
        return handle.read()
|
||
|
||
def unzip_file(zip_file_path: str) -> str:
    """
    Unzip the file and return the path of the extracted file.
    The archive is extracted into a folder named after the zip file (its path
    without the extension), which is created when needed.
    zip_file_path: the path of the pending zip file.
    return: the path of the first file of the archive, as extracted.
    raises: ValueError if the archive contains no file.
    """
    folder_name = os.path.splitext(zip_file_path)[0]

    # Create the directory if it doesn't exist
    os.makedirs(folder_name, exist_ok=True)

    # Extract the contents of the ZIP file into the directory, remembering where
    # each member ended up (extract returns the path it created).
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        extracted = [(member, zip_ref.extract(member, folder_name))
                     for member in zip_ref.infolist()]

    # Choose from the archive's own members rather than from a directory listing:
    # the listing has no defined order and may contain files left by earlier runs.
    extracted_files = [path for member, path in extracted if not member.is_dir()]
    if not extracted_files:
        raise ValueError("No file found in the zip file.")
    return extracted_files[0]
|
||
|
||
def save_to_json(data: dict, output_file_path: str):
    """
    Save the data to a JSON file.
    Missing parent directories of the output file are created.
    data: the data to save.
    output_file_path: the path of the output file.
    """

    # Extract the directory path from the file path
    directory = os.path.dirname(output_file_path)

    # A bare file name has no directory part; os.makedirs('') would raise, so the
    # directory is only created when there is one.
    if directory:
        os.makedirs(directory, exist_ok=True)

    with open(output_file_path, 'w', encoding='utf-8') as file:
        json.dump(data, file, indent=4)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,4 +11,5 @@ PyYAML==6.0.1 | |
Requests==2.31.0 | ||
faiss-cpu==1.8.0 | ||
lxml==5.1.0 | ||
psutil==5.9.8 | ||
psutil==5.9.8 | ||
beautifulsoup4==4.12.3 |