Skip to content
This repository has been archived by the owner on Jun 12, 2024. It is now read-only.

Commit

Permalink
fix: parsing method updated (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
dsdanielpark committed Mar 17, 2024
1 parent 3999fc3 commit ec5f639
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 40 deletions.
4 changes: 2 additions & 2 deletions gemini/src/model/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class GeminiCandidate(BaseModel):

rcid: str
text: str
code: List[str] = []
code: Dict = {}
web_images: List[GeminiImage] = []
generated_images: List[GeminiImage] = []
response_dict: Dict = {}
Expand Down Expand Up @@ -39,7 +39,7 @@ def text(self) -> str:
return self.candidates[self.chosen].text

@property
def code(self) -> str:
def code(self) -> Optional[Dict]:
    """The code of the chosen candidate.

    Returns whatever is stored in the ``code`` attribute of
    ``self.candidates[self.chosen]``.
    """
    return self.candidates[self.chosen].code

Expand Down
90 changes: 52 additions & 38 deletions gemini/src/model/parser/response_parser.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import json
from typing import Dict
from typing import Dict, List
from gemini.src.model.parser.base import BaesParser
from gemini.src.misc.utils import extract_code

Expand Down Expand Up @@ -88,9 +88,10 @@ def _extract_body(self, response_text: str) -> Dict:
Dict: The extracted body.
"""
parsing_strategies = [
self.__parse_strategy_1,
self.__parse_strategy_2,
self.__parse_strategy_3,
self.__extract_strategy_1,
self.__extract_strategy_2,
self.__extract_strategy_3,
self.__extract_strategy_4,
]

for strategy in parsing_strategies:
Expand All @@ -101,13 +102,29 @@ def _extract_body(self, response_text: str) -> Dict:
): # Assuming the strategy returns None or a non-empty dict on success.
return body
except Exception as e:
# Log or print the exception if needed
print(f"Parsing failed with strategy {strategy.__name__}: {e}")
# print(f"Parsing failed with strategy {strategy.__name__}: {e}")
continue

raise ValueError("All parsing strategies failed.")

def __parse_strategy_1(self, response_text: str) -> Dict:
def __extract_strategy_1(self, response_text: str) -> Dict:
    """Extract the response body from the fourth line of the raw response.

    The line at index 3 (splitting on ``"\\n"``) is decoded as a JSON
    envelope. The body is the JSON document stored as a string at
    ``envelope[0][2]``; if that body's element 4 is empty, the document at
    ``envelope[4][2]`` is used instead.

    Args:
        response_text (str): Raw response text.

    Returns:
        Dict: The decoded body whose element 4 is non-empty. The value is
            indexed positionally here, so it is presumably a JSON array
            despite the annotation -- TODO confirm.

    Raises:
        ValueError: If both the primary and the fallback body have an empty
            element 4.
        Exception: Decoding and indexing errors on malformed input
            (``json.JSONDecodeError``, ``IndexError``, ``TypeError``)
            propagate unchanged.
    """
    # Decode the envelope once; both candidate slots live in the same line.
    envelope = json.loads(response_text.split("\n")[3])
    body = json.loads(envelope[0][2])
    if not body[4]:
        # Primary slot carries no payload; try the alternate slot.
        body = json.loads(envelope[4][2])
    if not body[4]:
        # Only give up when neither slot yielded a usable payload.
        raise ValueError("Invalid response data received.")
    return body

def __extract_strategy_2(self, response_text: str) -> Dict:
    """Extract the response body from the third line of the raw response.

    The line at index 2 (splitting on ``"\\n"``) is decoded as a JSON
    envelope. The body is the JSON document stored as a string at
    ``envelope[0][2]``; if that body's element 4 is empty, the document at
    ``envelope[4][2]`` is used instead.

    Args:
        response_text (str): Raw response text.

    Returns:
        Dict: The decoded body whose element 4 is non-empty. The value is
            indexed positionally here, so it is presumably a JSON array
            despite the annotation -- TODO confirm.

    Raises:
        ValueError: If both the primary and the fallback body have an empty
            element 4.
        Exception: Decoding and indexing errors on malformed input
            (``json.JSONDecodeError``, ``IndexError``, ``TypeError``)
            propagate unchanged.
    """
    # Decode the envelope once; both candidate slots live in the same line.
    envelope = json.loads(response_text.split("\n")[2])
    body = json.loads(envelope[0][2])
    if not body[4]:
        # Primary slot carries no payload; try the alternate slot.
        body = json.loads(envelope[4][2])
    if not body[4]:
        # Only give up when neither slot yielded a usable payload.
        raise ValueError("Invalid response data received.")
    return body


def __extract_strategy_3(self, response_text: str) -> Dict:
body = json.loads(
json.loads(response_text.lstrip("')]}'\n\n").split("\n")[1])[0][2]
)
Expand All @@ -116,16 +133,9 @@ def __parse_strategy_1(self, response_text: str) -> Dict:
json.loads(response_text.lstrip("')]}'\n\n").split("\n")[1])[4][2]
)
return body


def __parse_strategy_2(self, response_text: str) -> Dict:
    """Extract the response body from the third line of the raw response.

    The line at index 2 (splitting on ``"\\n"``) is decoded as a JSON
    envelope. The body is the JSON document stored as a string at
    ``envelope[0][2]``; if that body's element 4 is empty, the document at
    ``envelope[4][2]`` is used instead.

    Args:
        response_text (str): Raw response text. An object exposing the text
            through a ``text`` attribute is also accepted, which is what the
            previous implementation required.

    Returns:
        Dict: The decoded body whose element 4 is non-empty. The value is
            indexed positionally here, so it is presumably a JSON array
            despite the annotation -- TODO confirm.

    Raises:
        ValueError: If both the primary and the fallback body have an empty
            element 4.
        Exception: Decoding and indexing errors on malformed input
            (``json.JSONDecodeError``, ``IndexError``, ``TypeError``)
            propagate unchanged.
    """
    # The parameter is annotated ``str``, and ``str`` has no ``.text``; fall
    # back to the value itself so plain strings no longer raise AttributeError.
    raw_text = getattr(response_text, "text", response_text)
    envelope = json.loads(raw_text.split("\n")[2])
    body = json.loads(envelope[0][2])
    if not body[4]:
        # Primary slot carries no payload; try the alternate slot.
        body = json.loads(envelope[4][2])
    if not body[4]:
        # Only give up when neither slot yielded a usable payload.
        raise ValueError("Invalid response data received.")
    return body

def __parse_strategy_3(self, response_text: str) -> Dict:
def __extract_strategy_4(self, response_text: str) -> Dict:
max_response = max(response_text.split("\n"), key=len)
body = json.loads(json.loads(max_response)[0][2])
if not body[4]:
Expand All @@ -146,10 +156,11 @@ def _parse_candidates(self, candidates_data: Dict) -> Dict:
for candidate_data in candidates_data:
web_images = self._parse_web_images(candidate_data[4])
generated_images = self._parse_generated_images(candidate_data[12])
codes = self._parse_code(candidate_data[1][0])
candidate_dict = {
"rcid": candidate_data[0],
"text": candidate_data[1][0],
"code": self._parse_code(candidate_data[1][0]),
"code": codes,
"web_images": web_images,
"generated_images": generated_images,
}
Expand All @@ -176,7 +187,7 @@ def _parse_web_images(self, images_data: Dict) -> Dict:
}
for image in images_data
]

def _parse_generated_images(self, images_data: Dict) -> Dict:
"""
Parses generated images data.
Expand All @@ -199,26 +210,29 @@ def _parse_generated_images(self, images_data: Dict) -> Dict:
for i, image in enumerate(images_data[7][0])
]

def _parse_code(self, text: str) -> Dict:
    """
    Parse the provided text into a dictionary of extracted code snippets.

    Args:
        text (str): The text from which code snippets are to be extracted.

    Returns:
        Dict: Snippets keyed ``snippett_01``, ``snippett_02``, ... in the
            order ``extract_code`` produced them. An empty dictionary is
            returned when ``text`` is empty, when ``extract_code`` returns a
            string equal to ``text``, or when it returns something that is
            neither a string nor a list.
    """
    if not text:
        return {}

    extracted_code = extract_code(text)

    if isinstance(extracted_code, str):
        # A string identical to the input yields no snippet -- presumably
        # extract_code echoes its input when it finds no code; TODO confirm.
        return {"snippett_01": extracted_code} if extracted_code != text else {}

    if isinstance(extracted_code, list):
        # The key prefix is a fixed "snippett_0", so the tenth entry is
        # "snippett_010"; kept as-is because callers may rely on these keys.
        return {
            f"snippett_0{position}": snippet
            for position, snippet in enumerate(extracted_code, start=1)
        }

    return {}

0 comments on commit ec5f639

Please sign in to comment.