Skip to content

Commit

Permalink
Refactor web retriever (#1102)
Browse files Browse the repository at this point in the history
  • Loading branch information
Spycsh authored Jan 8, 2025
1 parent 4480d80 commit 962e097
Show file tree
Hide file tree
Showing 9 changed files with 217 additions and 152 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/docker/compose/web_retrievers-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

# this file should be run in the root of the repo
services:
web-retriever-chroma:
web-retriever:
build:
dockerfile: comps/web_retrievers/chroma/langchain/Dockerfile
image: ${REGISTRY:-opea}/web-retriever-chroma:${TAG:-latest}
dockerfile: comps/web_retrievers/src/Dockerfile
image: ${REGISTRY:-opea}/web-retriever:${TAG:-latest}
134 changes: 0 additions & 134 deletions comps/web_retrievers/chroma/langchain/retriever_chroma.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ COPY comps /home/user/comps

RUN pip install --no-cache-dir --upgrade pip setuptools && \
if [ ${ARCH} = "cpu" ]; then \
pip install --no-cache-dir --extra-index-url https://download.pytorch.org/whl/cpu -r /home/user/comps/web_retrievers/chroma/langchain/requirements.txt; \
pip install --no-cache-dir --extra-index-url https://download.pytorch.org/whl/cpu -r /home/user/comps/web_retrievers/src/requirements.txt; \
else \
pip install --no-cache-dir -r /home/user/comps/web_retrievers/chroma/langchain/requirements.txt; \
pip install --no-cache-dir -r /home/user/comps/web_retrievers/src/requirements.txt; \
fi

ENV PYTHONPATH=$PYTHONPATH:/home/user

WORKDIR /home/user/comps/web_retrievers/chroma/langchain
WORKDIR /home/user/comps/web_retrievers/src

ENTRYPOINT ["python", "retriever_chroma.py"]
ENTRYPOINT ["python", "opea_web_retrievers_microservice.py"]
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ The Web Retriever Microservice is designed to efficiently search web pages relev

```bash
cd ../../../../
docker build -t opea/web-retriever-chroma:latest --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/web_retrievers/chroma/langchain/Dockerfile .
docker build -t opea/web-retriever:latest --build-arg https_proxy=$https_proxy --build-arg http_proxy=$http_proxy -f comps/web_retrievers/src/Dockerfile .
```

### Start TEI Service

```bash
model=BAAI/bge-base-en-v1.5
volume=$PWD/data
docker run -d -p 6060:80 -v $volume:/data -e http_proxy=$http_proxy -e https_proxy=$https_proxy --pull always ghcr.io/huggingface/text-embeddings-inference:cpu-1.5 --model-id $model
docker run -d -p 6060:80 -v $volume:/data -e http_proxy=$http_proxy -e https_proxy=$https_proxy --pull always ghcr.io/huggingface/text-embeddings-inference:cpu-1.5 --model-id $model --auto-truncate
```

### Start Web Retriever Service
Expand All @@ -31,7 +31,7 @@ export GOOGLE_CSE_ID=xxx
```

```bash
docker run -d --name="web-retriever-chroma-server" -p 7077:7077 --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e no_proxy=$no_proxy -e TEI_EMBEDDING_ENDPOINT=$TEI_EMBEDDING_ENDPOINT -e GOOGLE_API_KEY=$GOOGLE_API_KEY -e GOOGLE_CSE_ID=$GOOGLE_CSE_ID opea/web-retriever-chroma:latest
docker run -d --name="web-retriever-server" -p 7077:7077 --ipc=host -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e no_proxy=$no_proxy -e TEI_EMBEDDING_ENDPOINT=$TEI_EMBEDDING_ENDPOINT -e GOOGLE_API_KEY=$GOOGLE_API_KEY -e GOOGLE_CSE_ID=$GOOGLE_CSE_ID opea/web-retriever:latest
```

### Consume Web Retriever Service
Expand All @@ -44,6 +44,6 @@ your_embedding=$(python -c "import random; embedding = [random.uniform(-1, 1) fo

http_proxy= curl http://${your_ip}:7077/v1/web_retrieval \
-X POST \
-d "{\"text\":\"What is black myth wukong?\",\"embedding\":${your_embedding}}" \
-d "{\"text\":\"What is The Game of the Year 2024?\",\"embedding\":${your_embedding},\"k\":4}" \
-H 'Content-Type: application/json'
```
File renamed without changes.
140 changes: 140 additions & 0 deletions comps/web_retrievers/src/integrations/google_search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import os
import time

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import AsyncHtmlLoader
from langchain_community.document_transformers import Html2TextTransformer
from langchain_community.utilities import GoogleSearchAPIWrapper
from langchain_community.vectorstores import Chroma
from langchain_huggingface import HuggingFaceEndpointEmbeddings

from comps import (
CustomLogger,
EmbedDoc,
OpeaComponent,
OpeaComponentRegistry,
SearchedDoc,
ServiceType,
TextDoc,
statistics_dict,
)

logger = CustomLogger("opea_google_search")
# LOGFLAG arrives from the environment as a string, so a bare truthiness check
# would treat "false" or "0" as enabled. Parse it into a real bool instead:
# unset/empty and the usual "off" spellings disable logging, anything else enables it.
logflag = os.getenv("LOGFLAG", "").strip().lower() not in ("", "0", "false", "no", "off")


@OpeaComponentRegistry.register("OPEA_GOOGLE_SEARCH")
class OpeaGoogleSearch(OpeaComponent):
    """A specialized Web Retrieval component derived from OpeaComponent for Google web retriever services.

    For each request the component runs a Google search, downloads and chunks
    the result pages, indexes the chunks in a Chroma vector store, and returns
    the chunks closest to the caller-supplied query embedding. The store is
    emptied after every request so nothing carries over between queries.
    """

    def __init__(self, name: str, description: str, config: dict = None):
        """Read credentials/endpoints from the environment and build the clients.

        Args:
            name: Component name, forwarded to ``OpeaComponent``.
            description: Component description, forwarded to ``OpeaComponent``.
            config: Optional configuration, forwarded to ``OpeaComponent``.
        """
        self.google_api_key = os.environ.get("GOOGLE_API_KEY")
        self.google_cse_id = os.environ.get("GOOGLE_CSE_ID")
        self.text_splitter = RecursiveCharacterTextSplitter(chunk_size=1500, chunk_overlap=50)
        # Endpoint handed to the embedding client used by the vector store
        self.tei_embedding_endpoint = os.getenv("TEI_EMBEDDING_ENDPOINT")
        # check_health() also creates self.search and self.vector_db
        health_status = self.check_health()
        if not health_status:
            logger.error("OpeaGoogleSearch health check failed.")

        super().__init__(name, ServiceType.WEB_RETRIEVER.name.lower(), description, config)

    def get_urls(self, query, num_search_result=1):
        """Return the Google search results for ``query``.

        Args:
            query: Search string.
            num_search_result: Number of results requested from the search wrapper.

        Returns:
            Whatever ``GoogleSearchAPIWrapper.results`` returns; ``invoke`` reads
            the ``"link"`` key of each entry.
        """
        return self.search.results(query, num_search_result)

    def dump_docs(self, docs):
        """Add ``docs`` to the vector store in batches of 32 documents."""
        batch_size = 32
        for offset in range(0, len(docs), batch_size):
            self.vector_db.add_documents(docs[offset : offset + batch_size])

    def retrieve_htmls(self, all_urls):
        """Download the pages at ``all_urls`` and return them as loaded documents.

        Load errors are ignored (``ignore_load_errors=True``) and proxy settings
        are taken from the environment (``trust_env=True``).
        """
        loader = AsyncHtmlLoader(all_urls, ignore_load_errors=True, trust_env=True)
        return loader.load()

    def parse_htmls(self, docs):
        """Convert HTML documents to text and split them with ``self.text_splitter``."""
        if logflag:
            logger.info("Indexing new urls...")

        html2text = Html2TextTransformer()
        text_docs = list(html2text.transform_documents(docs))
        return self.text_splitter.split_documents(text_docs)

    @staticmethod
    def _format_with_metadata(doc):
        """Return the chunk text followed by its description/title/source metadata, when present."""
        description_str = f"\n description: {doc.metadata['description']} \n" if "description" in doc.metadata else ""
        title_str = f"\n title: {doc.metadata['title']} \n" if "title" in doc.metadata else ""
        source_str = f"\n source: {doc.metadata['source']} \n" if "source" in doc.metadata else ""
        return f"{doc.page_content} {description_str} {title_str} {source_str}"

    def _clear_vector_db(self):
        """Delete every document currently held in the vector store."""
        stored_ids = self.vector_db.get()["ids"]
        if stored_ids:
            self.vector_db.delete(stored_ids)

    async def invoke(self, input: EmbedDoc) -> SearchedDoc:
        """Invoke the Google search service to retrieve the documents related to the prompt.

        Args:
            input: Request carrying the query ``text``, its ``embedding`` and ``k``.
                ``k`` is used both as the number of search results requested and
                as the number of chunks retrieved from the vector store.

        Returns:
            A ``SearchedDoc`` whose ``retrieved_docs`` are the matching chunks
            (text plus available metadata) and whose ``initial_query`` is the
            query text.
        """
        if logflag:
            logger.info(input)
        start = time.time()
        query = input.text
        embedding = input.embedding

        # Google Search the results, parse the htmls
        search_results = self.get_urls(query=query, num_search_result=input.k)
        # dict.fromkeys removes duplicate links while keeping the search ranking order
        urls = list(dict.fromkeys(res["link"] for res in search_results if res.get("link", None)))
        if logflag:
            logger.info(f"urls: {urls}")
        docs = self.retrieve_htmls(urls)
        docs = self.parse_htmls(docs)
        if logflag:
            logger.info(docs)
        # Remove duplicated docs
        unique_documents_dict = {(doc.page_content, tuple(sorted(doc.metadata.items()))): doc for doc in docs}
        unique_documents = list(unique_documents_dict.values())
        statistics_dict["opea_service@search"].append_latency(time.time() - start, None)

        # NOTE: the end-to-end "opea_service@web_retriever" latency is recorded by
        # the microservice endpoint wrapping this component; appending it here as
        # well would add two samples per request.
        try:
            # dump to vector_db
            self.dump_docs(unique_documents)

            # Do the retrieval
            search_res = await self.vector_db.asimilarity_search_by_vector(embedding=embedding, k=input.k)

            # include the metadata into the retrieved docs content
            searched_docs = [TextDoc(text=self._format_with_metadata(r)) for r in search_res]
            result = SearchedDoc(retrieved_docs=searched_docs, initial_query=query)
        finally:
            # For now history is banned: always empty the store, even when
            # indexing or retrieval failed, so stale chunks cannot leak into
            # the next query.
            self._clear_vector_db()

        if logflag:
            logger.info(result)
        return result

    def check_health(self) -> bool:
        """Build the Google search wrapper and the Chroma vector store.

        Sets ``self.search`` and ``self.vector_db`` as a side effect. Any
        exception raised while constructing them is logged.

        Returns:
            bool: True if both clients were constructed, False otherwise.
        """
        try:
            self.search = GoogleSearchAPIWrapper(
                google_api_key=self.google_api_key, google_cse_id=self.google_cse_id, k=10
            )
            # No persist_directory is passed; invoke() clears the store after every request.
            self.vector_db = Chroma(
                embedding_function=HuggingFaceEndpointEmbeddings(model=self.tei_embedding_endpoint),
            )
        except Exception as e:
            logger.error(e)
            return False
        return True
59 changes: 59 additions & 0 deletions comps/web_retrievers/src/opea_web_retrievers_microservice.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import os
import time

from integrations.google_search import OpeaGoogleSearch

from comps import (
CustomLogger,
EmbedDoc,
OpeaComponentLoader,
SearchedDoc,
ServiceType,
opea_microservices,
register_microservice,
register_statistics,
statistics_dict,
)

logger = CustomLogger("opea_web_retriever_microservice")
# LOGFLAG arrives from the environment as a string, so a bare truthiness check
# would treat "false" or "0" as enabled. Parse it into a real bool instead:
# unset/empty and the usual "off" spellings disable logging, anything else enables it.
logflag = os.getenv("LOGFLAG", "").strip().lower() not in ("", "0", "false", "no", "off")

# Name of the web retriever component to load; overridable via WEB_RETRIEVER_NAME.
web_retriever_component_name = os.getenv("WEB_RETRIEVER_NAME", "OPEA_GOOGLE_SEARCH")
# Initialize OpeaComponentLoader
loader = OpeaComponentLoader(
    web_retriever_component_name, description=f"OPEA WEB RETRIEVER Component: {web_retriever_component_name}"
)


@register_microservice(
    name="opea_service@web_retriever",
    service_type=ServiceType.WEB_RETRIEVER,
    endpoint="/v1/web_retrieval",
    host="0.0.0.0",
    port=7077,
    input_datatype=EmbedDoc,
    output_datatype=SearchedDoc,
)
@register_statistics(names=["opea_service@web_retriever", "opea_service@search"])
async def web_retriever(input: EmbedDoc) -> SearchedDoc:
    """Serve ``/v1/web_retrieval`` by delegating to the loaded component.

    Args:
        input: Request document passed unchanged to ``loader.invoke``.

    Returns:
        The result produced by ``loader.invoke``.

    Raises:
        Exception: Any error from the invocation is logged and re-raised.
    """
    began_at = time.time()

    try:
        # Delegate to whichever component the loader was configured with
        response = await loader.invoke(input)
        if logflag:
            logger.info(response)
        elapsed = time.time() - began_at
        statistics_dict["opea_service@web_retriever"].append_latency(elapsed, None)
    except Exception as err:
        logger.error(f"Error during web retriever invocation: {err}")
        raise

    return response


if __name__ == "__main__":
    # Script entry point: announce startup, then start the registered service.
    logger.info("OPEA Web Retriever Microservice is starting....")
    web_retriever_service = opea_microservices["opea_service@web_retriever"]
    web_retriever_service.start()
File renamed without changes.
Loading

0 comments on commit 962e097

Please sign in to comment.