-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathutils.py
112 lines (86 loc) · 3.92 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from datetime import timedelta
from typing import List
import time
import pandas as pd
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.vectorstores import VectorStore
# Rate-limit defaults; the original author noted these target gpt-3.5-turbo.
# Maximum (approximate) tokens sent per minute before pausing.
_DEFAULT_TPM_LIMIT = 60000
# How long to sleep once the token budget has been exhausted.
_DEFAULT_RATE_LIMIT_INTERVAL = timedelta(seconds=10)
# Starting value for the running token-usage counter.
_INITIAL_TOKEN_USAGE = 0
def documents_to_df(content_column_name: str,
                    documents: List[Document],
                    embeddings_model: Embeddings = None,
                    with_embeddings: bool = False) -> pd.DataFrame:
    """
    Convert a list of documents into a dataframe.

    Each document's metadata entries become columns, the page content is
    stored under ``content_column_name``, and the content column is moved
    to the front of the frame.

    :param content_column_name: name of the column that holds page content
    :param documents: documents to convert (one row per document)
    :param embeddings_model: embedding model; required when
        ``with_embeddings`` is True
    :param with_embeddings: if True, add an ``embeddings`` column computed
        from the content column
    :return: pd.DataFrame
    :raises ValueError: if ``with_embeddings`` is True but no
        ``embeddings_model`` was provided
    """
    # Fail early with a clear message instead of an AttributeError later.
    if with_embeddings and embeddings_model is None:
        raise ValueError("embeddings_model must be provided when with_embeddings is True")
    df = pd.DataFrame([doc.metadata for doc in documents])
    df[content_column_name] = [doc.page_content for doc in documents]
    # Coerce an optional 'date' metadata column; unparseable values become NaT.
    if 'date' in df.columns:
        df['date'] = pd.to_datetime(df['date'], errors='coerce')
    # Reordering the columns to have the content column first.
    df = df[[content_column_name] + [col for col in df.columns if col != content_column_name]]
    if with_embeddings:
        df["embeddings"] = embeddings_model.embed_documents(df[content_column_name].tolist())
    return df
class VectorStoreOperator:
    """
    Encapsulates the logic for adding documents to a vector store with rate limiting.

    Token usage is approximated by the character count of each document's
    page content; once the running total reaches ``token_per_minute_limit``
    the operator sleeps for ``rate_limit_interval`` and resets the counter.
    """

    def __init__(self,
                 vector_store: VectorStore,
                 embeddings_model: Embeddings,
                 documents: List[Document] = None,
                 token_per_minute_limit: int = _DEFAULT_TPM_LIMIT,
                 rate_limit_interval: timedelta = _DEFAULT_RATE_LIMIT_INTERVAL,
                 ):
        """
        :param vector_store: an instantiated vector store, or an
            uninstantiated VectorStore subclass (e.g. Chroma, PGVector)
            when ``documents`` are provided to initialize it from
        :param embeddings_model: embedding model used by the store
        :param documents: optional documents to add immediately
        :param token_per_minute_limit: approximate token budget before sleeping
        :param rate_limit_interval: how long to sleep once the budget is hit
        :raises ValueError: if no documents are given and ``vector_store``
            is not an instantiated VectorStore
        """
        self.documents = documents
        self.embeddings_model = embeddings_model
        self.token_per_minute_limit = token_per_minute_limit
        self.rate_limit_interval = rate_limit_interval
        self.current_token_usage = _INITIAL_TOKEN_USAGE
        self._vector_store = None
        self.verify_vector_store(vector_store, documents)

    def verify_vector_store(self, vector_store, documents):
        """Initialize the store from documents, or accept a ready instance.

        :raises ValueError: if no documents are given and ``vector_store``
            is not an instantiated VectorStore.
        """
        if documents:
            self._add_documents_to_store(documents, vector_store)
        elif isinstance(vector_store, VectorStore):
            # Already-instantiated store (or subclass instance) — use as-is.
            self._vector_store = vector_store
        else:
            # An uninstantiated subclass (e.g. Chroma or PGVector) — or any
            # other object — cannot be used without documents to initialize it.
            # (Previously this branch could silently leave _vector_store as
            # None, or raise TypeError from issubclass on a non-class value.)
            raise ValueError("If no documents are provided, an instantiated vector_store must be provided")

    @property
    def vector_store(self):
        """The underlying (initialized) vector store instance."""
        return self._vector_store

    @staticmethod
    def _calculate_token_usage(document):
        # Approximates token count with character count of the page content.
        return len(document.page_content)

    def _rate_limit(self):
        """Sleep and reset the counter once the token budget is reached."""
        if self.current_token_usage >= self.token_per_minute_limit:
            time.sleep(self.rate_limit_interval.total_seconds())
            self.current_token_usage = _INITIAL_TOKEN_USAGE

    def _update_token_usage(self, document: Document):
        # Rate-limit first so the sleep happens before the next request.
        self._rate_limit()
        self.current_token_usage += self._calculate_token_usage(document)

    def _add_document(self, document: Document):
        """Add a single document, accounting for its token usage."""
        self._update_token_usage(document)
        self.vector_store.add_documents([document])

    def _add_documents_to_store(self, documents: List[Document], vector_store: VectorStore):
        """Initialize the store with the first document, then add the rest.

        The first document is already embedded and stored by
        ``from_documents`` during initialization, so it is only counted
        toward token usage here; re-adding it would duplicate it in the
        store (the original code added documents[0] twice).
        """
        self._init_vector_store(documents, vector_store)
        self._update_token_usage(documents[0])
        self.add_documents(documents[1:])

    def _init_vector_store(self, documents: List[Document], vector_store: VectorStore):
        """Instantiate the store class from the first document, if any."""
        if len(documents) > 0:
            self._vector_store = vector_store.from_documents(
                documents=[documents[0]], embedding=self.embeddings_model
            )

    def add_documents(self, documents: List[Document]):
        """Add documents one at a time, respecting the rate limit."""
        for document in documents:
            self._add_document(document)