Merge pull request arc53#1152 from siiddhantt/feature/sync-remote
feat: sync remote sources
dartpain authored Sep 25, 2024
2 parents 21f46a8 + 3d292aa commit bc2241f
Showing 13 changed files with 404 additions and 88 deletions.
68 changes: 47 additions & 21 deletions application/api/internal/routes.py
@@ -6,15 +6,20 @@
from bson.objectid import ObjectId

from application.core.settings import settings

mongo = MongoClient(settings.MONGO_URI)
db = mongo["docsgpt"]
conversations_collection = db["conversations"]
sources_collection = db["sources"]

current_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
current_dir = os.path.dirname(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)


internal = Blueprint("internal", __name__)


internal = Blueprint('internal', __name__)
@internal.route("/api/download", methods=["get"])
def download_file():
user = secure_filename(request.args.get("user"))
Expand All @@ -24,7 +29,6 @@ def download_file():
return send_from_directory(save_dir, filename, as_attachment=True)



@internal.route("/api/upload_index", methods=["POST"])
def upload_index_files():
"""Upload two files(index.faiss, index.pkl) to the user's folder."""
Expand All @@ -38,7 +42,8 @@ def upload_index_files():
retriever = secure_filename(request.form["retriever"])
id = secure_filename(request.form["id"])
type = secure_filename(request.form["type"])
remote_data = secure_filename(request.form["remote_data"]) if "remote_data" in request.form else None
remote_data = request.form["remote_data"] if "remote_data" in request.form else None
sync_frequency = secure_filename(request.form["sync_frequency"])

save_dir = os.path.join(current_dir, "indexes", str(id))
if settings.VECTOR_STORE == "faiss":
@@ -55,24 +60,45 @@
        if file_pkl.filename == "":
            return {"status": "no file name"}
        # saves index files

        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        file_faiss.save(os.path.join(save_dir, "index.faiss"))
        file_pkl.save(os.path.join(save_dir, "index.pkl"))
    # create entry in sources_collection
    sources_collection.insert_one(
        {
            "_id": ObjectId(id),
            "user": user,
            "name": job_name,
            "language": job_name,
            "date": datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S"),
            "model": settings.EMBEDDINGS_NAME,
            "type": type,
            "tokens": tokens,
            "retriever": retriever,
            "remote_data": remote_data
        }
    )
    return {"status": "ok"}

    existing_entry = sources_collection.find_one({"_id": ObjectId(id)})
    if existing_entry:
        sources_collection.update_one(
            {"_id": ObjectId(id)},
            {
                "$set": {
                    "user": user,
                    "name": job_name,
                    "language": job_name,
                    "date": datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S"),
                    "model": settings.EMBEDDINGS_NAME,
                    "type": type,
                    "tokens": tokens,
                    "retriever": retriever,
                    "remote_data": remote_data,
                    "sync_frequency": sync_frequency,
                }
            },
        )
    else:
        sources_collection.insert_one(
            {
                "_id": ObjectId(id),
                "user": user,
                "name": job_name,
                "language": job_name,
                "date": datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S"),
                "model": settings.EMBEDDINGS_NAME,
                "type": type,
                "tokens": tokens,
                "retriever": retriever,
                "remote_data": remote_data,
                "sync_frequency": sync_frequency,
            }
        )
    return {"status": "ok"}
35 changes: 29 additions & 6 deletions application/api/user/routes.py
@@ -289,14 +289,13 @@ def combined_json():
        data.append(
            {
                "id": str(index["_id"]),
                "name": index["name"],
                "date": index["date"],
                "name": index.get("name"),
                "date": index.get("date"),
                "model": settings.EMBEDDINGS_NAME,
                "location": "local",
                "tokens": index["tokens"] if ("tokens" in index.keys()) else "",
                "retriever": (
                    index["retriever"] if ("retriever" in index.keys()) else "classic"
                ),
                "tokens": index.get("tokens", ""),
                "retriever": index.get("retriever", "classic"),
                "syncFrequency": index.get("sync_frequency", ""),
            }
        )
    if "duckduck_search" in settings.RETRIEVERS_ENABLED:
@@ -1157,3 +1156,27 @@ def get_user_logs():
        ),
        200,
    )


@user.route("/api/manage_sync", methods=["POST"])
def manage_sync():
data = request.get_json()
source_id = data.get("source_id")
sync_frequency = data.get("sync_frequency")

if sync_frequency not in ["never", "daily", "weekly", "monthly"]:
return jsonify({"status": "invalid frequency"}), 400

update_data = {"$set": {"sync_frequency": sync_frequency}}
try:
sources_collection.update_one(
{
"_id": ObjectId(source_id),
"user": "local",
},
update_data,
)
except Exception as err:
print(err)
return jsonify({"status": "error"}), 400
return jsonify({"status": "ok"}), 200
28 changes: 27 additions & 1 deletion application/api/user/tasks.py
@@ -1,12 +1,38 @@
from application.worker import ingest_worker, remote_worker
from datetime import timedelta

from application.celery_init import celery
from application.worker import ingest_worker, remote_worker, sync_worker


@celery.task(bind=True)
def ingest(self, directory, formats, name_job, filename, user):
    resp = ingest_worker(self, directory, formats, name_job, filename, user)
    return resp


@celery.task(bind=True)
def ingest_remote(self, source_data, job_name, user, loader):
    resp = remote_worker(self, source_data, job_name, user, loader)
    return resp


@celery.task(bind=True)
def schedule_syncs(self, frequency):
    resp = sync_worker(self, frequency)
    return resp


@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(
        timedelta(days=1),
        schedule_syncs.s("daily"),
    )
    sender.add_periodic_task(
        timedelta(weeks=1),
        schedule_syncs.s("weekly"),
    )
    sender.add_periodic_task(
        timedelta(days=30),
        schedule_syncs.s("monthly"),
    )
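
Schedules registered in on_after_configure only fire when a Celery beat process runs alongside the workers. sync_worker itself is not shown in this diff; the sketch below is a guess at its likely shape, based on the fields this PR stores: select the sources whose sync_frequency matches and re-queue remote ingestion.

# Hypothetical sketch of sync_worker, NOT the code from this PR.
from pymongo import MongoClient

from application.api.user.tasks import ingest_remote
from application.core.settings import settings

mongo = MongoClient(settings.MONGO_URI)
sources_collection = mongo["docsgpt"]["sources"]


def sync_worker_sketch(frequency):
    # find every source scheduled at this frequency and re-run its loader
    for source in sources_collection.find({"sync_frequency": frequency}):
        ingest_remote.delay(
            source_data=source.get("remote_data"),
            job_name=source.get("name"),
            user=source.get("user"),
            loader=source.get("type"),
        )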
11 changes: 6 additions & 5 deletions application/parser/open_ai_func.py
@@ -1,9 +1,11 @@
import os

from application.vectorstore.vector_creator import VectorCreator
from application.core.settings import settings
from retry import retry

from application.core.settings import settings

from application.vectorstore.vector_creator import VectorCreator


# from langchain_community.embeddings import HuggingFaceEmbeddings
# from langchain_community.embeddings import HuggingFaceInstructEmbeddings
@@ -12,7 +14,7 @@

@retry(tries=10, delay=60)
def store_add_texts_with_retry(store, i, id):
    # add source_id to the metadata
    # add source_id to the metadata
    i.metadata["source_id"] = str(id)
    store.add_texts([i.page_content], metadatas=[i.metadata])
    # store_pine.add_texts([i.page_content], metadatas=[i.metadata])
@@ -43,6 +45,7 @@ def call_openai_api(docs, folder_name, id, task_status):
            source_id=str(id),
            embeddings_key=os.getenv("EMBEDDINGS_KEY"),
        )
        store.delete_index()
    # Uncomment for MPNet embeddings
    # model_name = "sentence-transformers/all-mpnet-base-v2"
    # hf = HuggingFaceEmbeddings(model_name=model_name)
@@ -70,5 +73,3 @@ def call_openai_api(docs, folder_name, id, task_status):
        c1 += 1
    if settings.VECTOR_STORE == "faiss":
        store.save_local(f"{folder_name}")
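
The added store.delete_index() call is what makes re-syncing safe for non-FAISS stores: vectors live in a shared collection keyed by source_id, so stale entries are dropped before fresh ones are added, rather than duplicated on every sync. A hedged sketch of the resulting re-ingest sequence; the create_vectorstore arguments mirror the visible call above, and the first argument plus the id are assumptions:

# Hypothetical re-ingest sequence for a synced source; not code from this PR.
import os

from application.core.settings import settings
from application.vectorstore.vector_creator import VectorCreator

store = VectorCreator.create_vectorstore(
    settings.VECTOR_STORE,                      # assumed first argument
    source_id="66f3c0a1e4b0c8a1d2e3f456",       # placeholder source id
    embeddings_key=os.getenv("EMBEDDINGS_KEY"),
)
store.delete_index()  # drop the source's stale vectors before re-adding
store.add_texts(["fresh chunk"], metadatas=[{"title": "doc"}])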


46 changes: 23 additions & 23 deletions application/vectorstore/mongodb.py
@@ -1,7 +1,8 @@
from application.vectorstore.base import BaseVectorStore
from application.core.settings import settings
from application.vectorstore.base import BaseVectorStore
from application.vectorstore.document_class import Document


class MongoDBVectorStore(BaseVectorStore):
    def __init__(
        self,
@@ -33,27 +34,24 @@ def __init__(
        self._database = self._client[database]
        self._collection = self._database[collection]


    def search(self, question, k=2, *args, **kwargs):
        query_vector = self._embedding.embed_query(question)

        pipeline = [
            {
                "$vectorSearch": {
                    "queryVector": query_vector,
                    "queryVector": query_vector,
                    "path": self._embedding_key,
                    "limit": k,
                    "numCandidates": k * 10,
                    "limit": k,
                    "numCandidates": k * 10,
                    "index": self._index_name,
                    "filter": {
                        "source_id": {"$eq": self._source_id}
                    }
                    "filter": {"source_id": {"$eq": self._source_id}},
                }
            }
        ]

        cursor = self._collection.aggregate(pipeline)

        results = []
        for doc in cursor:
            text = doc[self._text_key]
@@ -63,30 +61,32 @@ def search(self, question, k=2, *args, **kwargs):
            metadata = doc
            results.append(Document(text, metadata))
        return results

    def _insert_texts(self, texts, metadatas):
        if not texts:
            return []
        embeddings = self._embedding.embed_documents(texts)

        to_insert = [
            {self._text_key: t, self._embedding_key: embedding, **m}
            for t, m, embedding in zip(texts, metadatas, embeddings)
        ]
        # insert the documents in MongoDB Atlas

        insert_result = self._collection.insert_many(to_insert)
        return insert_result.inserted_ids

    def add_texts(self,
        texts,
        metadatas = None,
        ids = None,
        refresh_indices = True,
        create_index_if_not_exists = True,
        bulk_kwargs = None,
        **kwargs,):

    def add_texts(
        self,
        texts,
        metadatas=None,
        ids=None,
        refresh_indices=True,
        create_index_if_not_exists=True,
        bulk_kwargs=None,
        **kwargs,
    ):

        #dims = self._embedding.client[1].word_embedding_dimension
        # dims = self._embedding.client[1].word_embedding_dimension
        # # check if index exists
        # if create_index_if_not_exists:
        # # check if index exists
@@ -121,6 +121,6 @@ def add_texts(self,
        if texts_batch:
            result_ids.extend(self._insert_texts(texts_batch, metadatas_batch))
        return result_ids

    def delete_index(self, *args, **kwargs):
        self._collection.delete_many({"source_id": self._source_id})
        self._collection.delete_many({"source_id": self._source_id})
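
For reference, the $vectorSearch stage in search() above is MongoDB Atlas Vector Search syntax: numCandidates oversamples the approximate nearest-neighbor search before the top `limit` hits are returned, and the filter on source_id restricts results to a single source (the field must be declared as a filter field in the Atlas search index definition). A standalone sketch with pymongo; the connection string, collection, index name, and vector dimension are placeholders:

# Hypothetical standalone query; requires a MongoDB Atlas cluster with a
# vector search index that also indexes source_id as a filter field.
from pymongo import MongoClient

client = MongoClient("mongodb+srv://user:pass@cluster0.example.mongodb.net")
collection = client["docsgpt"]["documents"]

k = 2
query_vector = [0.1] * 1536  # stand-in for self._embedding.embed_query(question)

pipeline = [
    {
        "$vectorSearch": {
            "queryVector": query_vector,
            "path": "embedding",
            "limit": k,
            "numCandidates": k * 10,  # oversample, then keep the top k
            "index": "vector_search_index",
            "filter": {"source_id": {"$eq": "66f3c0a1e4b0c8a1d2e3f456"}},
        }
    }
]

for doc in collection.aggregate(pipeline):
    print(doc["text"])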