refactored response parsing to make it a lot easier to understand and add filters

SubhadityaMukherjee committed Jul 12, 2024
1 parent 61b3618 commit eb443a4
Showing 6 changed files with 1,045 additions and 960 deletions.
1,711 changes: 890 additions & 821 deletions docs/developer tutorials/train and evaluate models.ipynb

Large diffs are not rendered by default.

25 changes: 11 additions & 14 deletions frontend/ui.py
@@ -27,23 +27,20 @@
     st.session_state["query"] = query
     st.session_state["query_type"] = query_type
 
-    response = {"initial_response": None}
-
-    # Submit button logic
-    if st.button("Submit"):
-        with st.spinner("Waiting for results..."):
-            response = fetch_response(query_type, query)
-
-        if response["initial_response"] is not None:
-            if query_type == "Dataset":
-                with st.spinner("Using an LLM to find the most relevant information..."):
-                    llm_response = fetch_llm_response(query)
-                initial_response = parse_and_update_response(query_type, response, llm_response, data_metadata, flow_metadata)
-            else:
-                initial_response = parse_and_update_response(query_type, response, None, data_metadata, flow_metadata)
-
-            display_results(initial_response)
+    response_parser = ResponseParser(query_type)
+    if query_type == "Dataset":
+        with st.spinner("Waiting for results..."):
+            # get rag response
+            response_parser.fetch_rag_response(query_type, query)
+            # get llm response
+            response_parser.fetch_llm_response(query)
+            # get updated columns based on llm response
+            results = response_parser.parse_and_update_response(data_metadata)
+            # display results in a table
+            display_results(results)
 
     with st.form("fb_form"):
         streamlit_feedback(
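A minimal sketch of the resulting control flow, assuming ResponseParser and display_results are imported from frontend/ui_utils.py (the query string and metadata DataFrame are illustrative, not from the commit):

    from ui_utils import ResponseParser, display_results

    parser = ResponseParser("Dataset")
    parser.fetch_rag_response("Dataset", "find large classification datasets")
    parser.fetch_llm_response("find large classification datasets")
    results = parser.parse_and_update_response(data_metadata)  # data_metadata: pandas DataFrame
    display_results(results)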
238 changes: 117 additions & 121 deletions frontend/ui_utils.py
@@ -5,17 +5,9 @@
 import requests
 import streamlit as st
 
-# load paths from paths.json
-with open("paths.json", "r") as file:
-    paths = json.load(file)
-
 def feedback_cb():
     """
     Description: Callback function to save feedback to a file
     Input: None
     Returns: None
     """
     file_path = "feedback.json"
 
@@ -35,132 +27,136 @@ def feedback_cb():
     with open(file_path, "w") as file:
         json.dump(data, file, indent=4)
 
 
-def parse_llm_response(response):
-    """
-    Description: Parse the answers from the LLM response
-    Input: response (dict)
-    Returns: size (str), missing (str), classification (str), sort (str)
-    """
-    size, missing, classification, uploader = response["answers"]
-    # Split size and sort if there is a comma
-    size, sort = size.split(",") if "," in size else (size, None)
-
-    # split uploader by = to get the name
-    if uploader != "none":
-        uploader = uploader.split("=")[1].strip()
-    return size, missing, classification, sort, uploader
-
-
-def update_subset_cols(size, missing, classification, uploader):
-    """
-    Description: Update the subset columns based on LLM's response
-    Input: size (str), missing (str), classification (str)
-    Returns: cols (list)
-    """
-    cols = ["did", "name"]
-    if size == "yes":
-        cols.append("NumberOfInstances")
-    if missing == "yes":
-        cols.append("NumberOfMissingValues")
-    if classification != "none":
-        cols.append("NumberOfClasses")
-    if uploader != "none":
-        cols.append("uploader")
-    return cols
-
-
-def filter_initial_response(response, classification, uploader):
-    """
-    Description: Filter the initial response based on the classification
-    Input: response (DataFrame), classification (str)
-    Returns: response (DataFrame)
-    """
-    if classification != "none":
-        if "multi" in classification:
-            response = response[response["NumberOfClasses"] > 2]
-        elif "binary" in classification:
-            response = response[response["NumberOfClasses"] == 2]
-    if uploader != "none":
-        try:
-            uploader = int(uploader)
-            response = response[response["uploader"] == uploader]
-        except:
-            pass
-    return response
-
-
-def fetch_response(query_type, query):
-    """
-    Description: Fetch the response from the FastAPI service
-    Input: query_type (str), query (str)
-    Returns: response (dict)
-    """
-    rag_response_path = paths["rag_response"]
-    try:
-        response = requests.get(
-            f"{rag_response_path['docker']}{query_type.lower()}/{query}",
-            json={"query": query, "type": query_type.lower()},
-        ).json()
-    except:
-        response = requests.get(
-            f"{rag_response_path['local']}{query_type.lower()}/{query}",
-            json={"query": query, "type": query_type.lower()},
-        ).json()
-    return response
-
-
-def fetch_llm_response(query):
-    """
-    Description: Fetch the response from the LLM service
-    Input: query (str)
-    Returns: llm_response (dict)
-    """
-    llm_response_path = paths["llm_response"]
-    try:
-        llm_response = requests.get(f"{llm_response_path['docker']}{query}").json()
-    except:
-        llm_response = requests.get(f"{llm_response_path['local']}{query}").json()
-    return llm_response
-
-
-def parse_and_update_response(query_type, response, llm_response, data_metadata, flow_metadata):
-    """
-    Description: Parse and update the response based on the query type
-    Input: query_type (str), response (dict), llm_response (dict), data_metadata (DataFrame), flow_metadata (DataFrame)
-    Returns: initial_response (DataFrame)
-    """
-    if query_type == "Dataset":
-        initial_response = data_metadata[data_metadata["did"].isin(response["initial_response"])]
-        subset_cols = ["did", "name"]
-        try:
-            dataset_size, dataset_missing, dataset_classification, dataset_sort, uploader = parse_llm_response(llm_response)
-            subset_cols = update_subset_cols(dataset_size, dataset_missing, dataset_classification, uploader)
-            initial_response = filter_initial_response(initial_response, dataset_classification, uploader)
-        except Exception as e:
-            st.error(f"Error processing LLM response: {e}")
-        initial_response = initial_response[subset_cols]
-    else:
-        initial_response = flow_metadata[flow_metadata["id"].isin(response["initial_response"])]
-    return initial_response
-
-
-def display_results(initial_response):
-    """
-    Description: Display the results in a DataFrame
-    Input: initial_response (DataFrame)
-    Returns: None
-    """
-    st.write("Results:")
-    st.dataframe(initial_response)
+def display_results(initial_response):
+    """
+    Description: Display the results in a DataFrame
+    """
+    st.write("Results:")
+    st.dataframe(initial_response)
+
+
+class LLMResponseParser:
+    """
+    Description: Parse the response from the LLM service and update the columns based on the response
+    """
+
+    def __init__(self, llm_response):
+        self.llm_response = llm_response
+        self.subset_cols = ["did", "name"]
+        self.size_sort = None
+        self.classification_type = None
+        self.uploader_name = None
+
+    def process_size_attribute(self, attr_size):
+        size, sort = attr_size.split(",") if "," in attr_size else (attr_size, None)
+        if size == "yes":
+            self.subset_cols.append("NumberOfInstances")
+        if sort:
+            self.size_sort = sort
+
+    def missing_values_attribute(self, attr_missing):
+        if attr_missing == "yes":
+            self.subset_cols.append("NumberOfMissingValues")
+
+    def classification_type_attribute(self, attr_classification):
+        if attr_classification != "none":
+            self.subset_cols.append("NumberOfClasses")
+            self.classification_type = attr_classification
+
+    def uploader_attribute(self, attr_uploader):
+        if attr_uploader != "none":
+            self.subset_cols.append("uploader")
+            self.uploader_name = attr_uploader.split("=")[1].strip()
+
+    def get_attributes_from_response(self):
+        attribute_processors = {
+            "size_of_dataset": self.process_size_attribute,
+            "missing_values": self.missing_values_attribute,
+            "classification_type": self.classification_type_attribute,
+            "uploader": self.uploader_attribute,
+        }
+        for attribute, value in self.llm_response.items():
+            if attribute in attribute_processors:
+                attribute_processors[attribute](value)
+
+    def update_subset_cols(self, metadata):
+        """
+        Description: Filter the metadata based on the updated subset columns and extra conditions
+        """
+        if self.classification_type is not None:
+            if "multi" in self.classification_type:
+                metadata = metadata[metadata["NumberOfClasses"] > 2]
+            elif "binary" in self.classification_type:
+                metadata = metadata[metadata["NumberOfClasses"] == 2]
+        if self.uploader_name is not None:
+            try:
+                uploader = int(self.uploader_name)
+                metadata = metadata[metadata["uploader"] == uploader]
+            except ValueError:
+                # uploader id could not be parsed as an integer; skip the filter
+                pass
+
+        return metadata[self.subset_cols]
+
+
+class ResponseParser:
+    def __init__(self, query_type):
+        self.query_type = query_type
+        self.paths = self.load_paths()
+        self.rag_response = None
+        self.llm_response = None
+        self.apply_llm_before_rag = False
+
+    def load_paths(self):
+        """
+        Description: Load paths from paths.json
+        """
+        with open("paths.json", "r") as file:
+            return json.load(file)
+
+    def fetch_llm_response(self, query):
+        """
+        Description: Fetch the response from the LLM service as a json
+        """
+        llm_response_path = self.paths["llm_response"]
+        try:
+            self.llm_response = requests.get(f"{llm_response_path['docker']}{query}").json()
+        except requests.exceptions.RequestException:
+            # fall back to the local service if the docker one is unreachable
+            self.llm_response = requests.get(f"{llm_response_path['local']}{query}").json()
+        return self.llm_response
+
+    def fetch_rag_response(self, query_type, query):
+        """
+        Description: Fetch the response from the FastAPI service
+        Input: query_type (str), query (str)
+        Returns: response (dict)
+        """
+        rag_response_path = self.paths["rag_response"]
+        try:
+            self.rag_response = requests.get(
+                f"{rag_response_path['docker']}{query_type.lower()}/{query}",
+                json={"query": query, "type": query_type.lower()},
+            ).json()
+        except requests.exceptions.RequestException:
+            # fall back to the local service if the docker one is unreachable
+            self.rag_response = requests.get(
+                f"{rag_response_path['local']}{query_type.lower()}/{query}",
+                json={"query": query, "type": query_type.lower()},
+            ).json()
+        return self.rag_response
+
+    def parse_and_update_response(self, metadata):
+        """
+        Description: Parse the response from the RAG and LLM services and update the metadata based on the response
+        """
+        if not self.apply_llm_before_rag:
+            if self.rag_response is not None and self.llm_response is not None:
+                filtered_metadata = metadata[metadata["did"].isin(self.rag_response["initial_response"])]
+                llm_parser = LLMResponseParser(self.llm_response)
+
+                if self.query_type == "Dataset":
+                    llm_parser.get_attributes_from_response()
+                    return llm_parser.update_subset_cols(filtered_metadata)
+            else:
+                return metadata
+        else:
+            # applying the LLM filter before the RAG lookup is not supported yet
+            raise NotImplementedError
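
For reference, a minimal usage sketch of the new classes (not part of the commit); the attribute values are illustrative but follow the formats the parser expects ("yes[,sort]", "none", "uploader = <id>"):

    llm_response = {
        "size_of_dataset": "yes,descending",   # "yes" adds NumberOfInstances; text after the comma becomes size_sort
        "missing_values": "yes",               # adds NumberOfMissingValues
        "classification_type": "multi-class",  # adds NumberOfClasses and keeps rows with NumberOfClasses > 2
        "uploader": "uploader = 348",          # adds uploader and filters on the parsed integer id
    }
    llm_parser = LLMResponseParser(llm_response)
    llm_parser.get_attributes_from_response()
    llm_parser.subset_cols
    # ['did', 'name', 'NumberOfInstances', 'NumberOfMissingValues', 'NumberOfClasses', 'uploader']
    filtered = llm_parser.update_subset_cols(data_metadata)  # data_metadata: pandas DataFrame of dataset metadata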

4 changes: 2 additions & 2 deletions llm_service/llm_service.py
@@ -43,5 +43,5 @@ async def get_llm_query(query: str):
     """
     query = query.replace("%20", " ")
     response = chain.invoke({"query": query})
-    answers = parse_answers_initial(response, patterns)
-    return JSONResponse(content={"answers": answers})
+    answers = parse_answers_initial(response, patterns, prompt_dict)
+    return JSONResponse(content=answers)
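
With this change the endpoint returns the parsed attributes as a flat JSON object keyed by prompt name, rather than a positional list wrapped in "answers". Assuming the four dataset prompts, the payload changes roughly like this (values illustrative):

    # before: {"answers": ["yes", "no", "multi-class", "none"]}
    # after:  {"size_of_dataset": "yes", "missing_values": "no",
    #          "classification_type": "multi-class", "uploader": "none"}

This is the shape that LLMResponseParser in frontend/ui_utils.py consumes directly.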
9 changes: 7 additions & 2 deletions llm_service/llm_service_utils.py
@@ -18,7 +18,7 @@ def create_chain(prompt, model="llama3", temperature=0):
     return prompt | llm | StrOutputParser()
 
 
-def parse_answers_initial(response, patterns):
+def parse_answers_initial(response, patterns, prompt_dict):
     """
     Description: Parse the answers from the initial response
@@ -48,4 +48,9 @@ def parse_answers_initial(response, patterns):
             answers.append(potential_answer)
             break  # Stop checking other patterns if a match is found
 
-    return answers
+    # return answers as a dict using the prompt_dict keys
+    answers_dict = {}
+    for i, key in enumerate(prompt_dict.keys()):
+        answers_dict[key] = answers[i]
+
+    return answers_dict
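
The positional mapping assumes len(answers) equals len(prompt_dict) and relies on dict insertion order (guaranteed since Python 3.7) matching the order in which the patterns were applied. A tighter equivalent, under the same assumptions:

    answers_dict = dict(zip(prompt_dict.keys(), answers))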
18 changes: 18 additions & 0 deletions start_llm_service.sh
@@ -0,0 +1,18 @@
+#!/bin/bash
+killall ollama
+killall streamlit
+# Define a file to store the PIDs
+PID_FILE="processes.pid"
+
+# Start processes and save their PIDs
+cd ollama
+./get_ollama.sh &
+echo $! > $PID_FILE
+
+cd ../llm_service
+uvicorn llm_service:app --host 0.0.0.0 --port 8081 &
+echo $! >> $PID_FILE  # append (>>) so the first PID is not overwritten
+
+cd ..
+# Keep the script running to maintain the background processes
+wait
