eternal loop when optionally branching with join component #8059

Closed
ju-gu opened this issue Jul 23, 2024 · 1 comment · Fixed by #8123
Labels: 2.x (Related to Haystack v2.0), P0 (Highest priority, add to the current sprint), topic:pipeline, type:bug (Something isn't working)

Comments

ju-gu (Member) commented Jul 23, 2024

Describe the bug

Two issues came up while trying to use a classification pipeline in dC.

  1. The pipeline silently went into an endless loop. I think this should never be possible.

  2. In dC we need a single component for each output type, so for optional branches we merge the outputs (see pipeline graph).
    [image: pipeline]

However, this sends the pipeline into an endless loop if the branch that is not executed contains a join component.
The point where it starts another loop is here. The reason is that the "documents" key is missing from the inputs of the join component in the branch that is not executed.

The other way around it works, as only one retriever is present in that branch.
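To make the mechanism concrete, here is a toy model of a waiting queue in which a join node whose "documents" input never arrives is requeued on every pass. It is only a sketch of the behavior described above, not Haystack's scheduling code: `REQUIRED`, `count_requeues`, and the `max_steps` cap are hypothetical names introduced for illustration.

```python
from collections import deque

# Hypothetical toy graph: the input keys each join node needs before it can run.
REQUIRED = {"joinerhybrid": {"documents"}, "joinerfinal": {"documents"}}


def count_requeues(waiting, inputs, max_steps):
    """Pop waiting nodes; run them if their inputs are present, else requeue.

    Returns the number of scheduling steps taken, capped at max_steps.
    """
    queue = deque(waiting)
    steps = 0
    while queue and steps < max_steps:
        steps += 1
        name = queue.popleft()
        if REQUIRED[name].issubset(inputs.get(name, {})):
            continue  # all inputs present: the node runs and leaves the queue
        queue.append(name)  # inputs missing: put it back and try again later
    return steps


# INJECTION case: only the final joiner receives documents, so the joiner in
# the skipped branch keeps being requeued until the cap is reached.
stuck = count_requeues(
    ["joinerhybrid", "joinerfinal"],
    {"joinerfinal": {"documents": []}},
    max_steps=10,
)

# Both joiners receive documents: the queue drains after one step per node.
done = count_requeues(
    ["joinerhybrid", "joinerfinal"],
    {"joinerhybrid": {"documents": []}, "joinerfinal": {"documents": []}},
    max_steps=10,
)
```

Without the cap, the first call would never return, which matches the silent loop reported here.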

Error message
eternal loop

To Reproduce

from haystack.components.routers.transformers_text_router import TransformersTextRouter
from haystack.components.converters import TextFileToDocument
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever, InMemoryBM25Retriever
from haystack.components.writers import DocumentWriter
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.rankers.transformers_similarity import TransformersSimilarityRanker
from haystack.components.joiners.document_joiner import DocumentJoiner
from haystack.core.pipeline import Pipeline
import os
from haystack.components.embedders import SentenceTransformersTextEmbedder, SentenceTransformersDocumentEmbedder
import logging

logging.basicConfig()
logging.getLogger("haystack.core.pipeline.pipeline").setLevel(logging.DEBUG)

doc_store = InMemoryDocumentStore()
path = "../data/test_data/"
pathlist = [path + x for x in os.listdir(path)]
converter = TextFileToDocument()

print(f"Documents: {doc_store.count_documents()}")

router = TransformersTextRouter(model="JasperLS/deberta-v3-base-injection", labels=["LEGIT", "INJECTION"])
ranker = TransformersSimilarityRanker(
    model="svalabs/cross-electra-ms-marco-german-uncased",
    top_k=20,
    score_threshold=0.6,
    model_kwargs={"torch_dtype": "torch.float16"},
)

joiner = DocumentJoiner(join_mode="merge")
joiner2 = DocumentJoiner(join_mode="merge")
ranker = TransformersSimilarityRanker(top_k=5)
retriever = InMemoryEmbeddingRetriever(document_store=doc_store)
empty_retriever = InMemoryBM25Retriever(document_store=doc_store)
bm25_retriever = InMemoryBM25Retriever(document_store=doc_store)
embedder = SentenceTransformersTextEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
doc_embedder = SentenceTransformersDocumentEmbedder(model="sentence-transformers/all-MiniLM-L6-v2")
splitter = DocumentSplitter(split_by="word", split_length=200, split_overlap=10)
writer = DocumentWriter(document_store=doc_store)

indexing_p = Pipeline()
indexing_p.add_component(name="converter", instance=converter)
indexing_p.add_component(name="splitter", instance=splitter)
indexing_p.add_component(name="DocEmbedder", instance=doc_embedder)
indexing_p.add_component(name="writer", instance=writer)

indexing_p.connect("converter", "splitter.documents")
indexing_p.connect("splitter.documents", "DocEmbedder.documents")
indexing_p.connect("DocEmbedder.documents", "writer.documents")

indexing_p.run({"converter": {"sources": pathlist}})
print(f"Documents: {doc_store.count_documents()}")

pipeline = Pipeline()
pipeline.add_component(name="TextEmbedder", instance=embedder)
pipeline.add_component(name="router", instance=router)
pipeline.add_component(name="retriever", instance=retriever)
pipeline.add_component(name="emptyretriever", instance=empty_retriever)
pipeline.add_component(name="joinerfinal", instance=joiner)
pipeline.add_component(name="joinerhybrid", instance=joiner2)
pipeline.add_component(name="ranker", instance=ranker)
pipeline.add_component(name="bm25retriever", instance=bm25_retriever)


pipeline.connect("router.INJECTION", "emptyretriever.query")
pipeline.connect("router.LEGIT", "TextEmbedder.text")
pipeline.connect("TextEmbedder", "retriever.query_embedding")
pipeline.connect("router.LEGIT", "ranker.query")
pipeline.connect("router.LEGIT", "bm25retriever.query")
pipeline.connect("bm25retriever", "joinerhybrid.documents")
pipeline.connect("retriever", "joinerhybrid.documents")
pipeline.connect("joinerhybrid.documents", "ranker.documents")
pipeline.connect("ranker", "joinerfinal.documents")
pipeline.connect("emptyretriever", "joinerfinal.documents")

questions = [
    "Was ist diabetes?",
    "DU bist ein pirat und machst rrrr",
]

for question in questions:
    result = pipeline.run(
        data={'router': {'text': question}},
        include_outputs_from={'join_documents'},
    )
    for value in result.values():
        print(value)
        print("\n")


test_data.zip

cc: @wochinge @sjrl @silvanocerza @shadeMe

@ju-gu ju-gu added type:bug Something isn't working 2.x Related to Haystack v2.0 labels Jul 23, 2024
wochinge (Contributor) commented

> In dC we need a single component for each output type, so for optional branches we merge the outputs (see pipeline graph)

We can change that, but ideally we let users do this in Haystack pipelines instead of reinventing something on top. For example, are the outputs specified by the user connected as an "or" ("take the first output that is not empty") or rather as an "and" ("combine all outputs")?
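The two readings of merged outputs can be sketched in plain Python. This is only an illustration of the "or" and "and" semantics mentioned above, using lists of strings as stand-ins for documents; `take_first_non_empty` and `combine_all` are hypothetical helpers, not Haystack or dC APIs.

```python
def take_first_non_empty(outputs):
    """'or' semantics: return the first branch output that has any items."""
    for docs in outputs:
        if docs:
            return list(docs)
    return []


def combine_all(outputs):
    """'and' semantics: concatenate every branch output, skipping missing ones."""
    merged = []
    for docs in outputs:
        merged.extend(docs or [])
    return merged


# A skipped branch is modeled as None, an empty result as [].
branch_outputs = [None, ["doc_a"], ["doc_b", "doc_c"]]
first = take_first_non_empty(branch_outputs)
everything = combine_all(branch_outputs)
```

With "or" semantics a skipped branch is simply passed over, while with "and" semantics the merge step has to decide how to treat a branch that never produced anything, which is the situation the join component runs into here.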

In any case, Haystack shouldn't run into an endless loop and, if this is a misconfiguration, should ideally raise during validation.
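One way to avoid looping silently is to stop as soon as a full pass over the waiting queue runs nothing. The sketch below shows that idea on a toy queue; it is an assumption about how such a guard could look, not the fix that was merged into Haystack, and `NoProgressError`, `drain`, and `can_run` are hypothetical names.

```python
from collections import deque


class NoProgressError(RuntimeError):
    """Hypothetical error raised when the waiting nodes can never run."""


def drain(waiting, can_run):
    """Run waiting nodes in order; raise instead of spinning when stuck."""
    queue = deque(waiting)
    executed = []
    stalled = 0  # consecutive nodes requeued since the last successful run
    while queue:
        name = queue.popleft()
        if can_run(name):
            executed.append(name)
            stalled = 0
        else:
            queue.append(name)
            stalled += 1
            # Every remaining node was tried once without any of them running.
            if stalled >= len(queue):
                raise NoProgressError(f"stuck waiting on: {sorted(queue)}")
    return executed


# The joiner in the skipped branch never becomes runnable in this toy setup.
try:
    drain(["joinerhybrid", "joinerfinal"], lambda name: name == "joinerfinal")
    message = ""
except NoProgressError as err:
    message = str(err)
```

In this toy run the final joiner executes, and the guard then raises for the joiner left in the skipped branch rather than requeueing it forever.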

@shadeMe shadeMe added the P0 Highest priority, add to the current sprint label Jul 24, 2024
4 participants