Skip to content

Commit

Permalink
Fix load from pdf component (#778)
Browse files Browse the repository at this point in the history
PR that fixes the load-from-PDF component. Previously the full spec was
passed as an argument to the component, but with the new changes we only
need to pass in the `produces` section of the spec.
  • Loading branch information
PhilippeMoussalli committed Jan 15, 2024
1 parent 0c92737 commit 2738aad
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 10 deletions.
14 changes: 8 additions & 6 deletions components/load_from_pdf/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,25 @@
import fsspec as fs
import pandas as pd
from fondant.component import DaskLoadComponent
from fondant.core.component_spec import OperationSpec
from fondant.core.schema import Field

logger = logging.getLogger(__name__)


class PDFReader(DaskLoadComponent):
def __init__(
self,
spec: OperationSpec,
produces: t.Dict[str, Field],
*,
pdf_path: str,
n_rows_to_load: t.Optional[int] = None,
index_column: t.Optional[str] = None,
n_partitions: t.Optional[int] = None,
**kwargs,
) -> None:
"""
Args:
spec: the operation spec for the component
produces: The schema the component should produce
pdf_path: Path to the PDF file
n_rows_to_load: optional argument that defines the number of rows to load.
Useful for testing pipeline runs on a small scale.
Expand All @@ -33,8 +34,9 @@ def __init__(
n_partitions: Number of partitions of the dask dataframe. If not specified, the number
of partitions will be equal to the number of CPU cores. Set to high values if
the data is large and the pipeline is running out of memory.
kwargs: Unhandled keyword arguments passed in by Fondant.
"""
self.spec = spec
self.produces = produces
self.pdf_path = pdf_path
self.n_rows_to_load = n_rows_to_load
self.index_column = index_column
Expand All @@ -61,7 +63,7 @@ def _set_unique_index(dataframe: pd.DataFrame, partition_info=None):

def _get_meta_df() -> pd.DataFrame:
meta_dict = {"id": pd.Series(dtype="object")}
for field_name, field in self.spec.inner_produces.items():
for field_name, field in self.produces.items():
meta_dict[field_name] = pd.Series(
dtype=pd.ArrowDtype(field.type.value),
)
Expand Down Expand Up @@ -112,7 +114,7 @@ def load(self) -> dd.DataFrame:
)

meta_dict = {}
for field_name, field in self.spec.inner_produces.items():
for field_name, field in self.produces.items():
meta_dict[field_name] = pd.Series(
dtype=pd.ArrowDtype(field.type.value),
)
Expand Down
10 changes: 6 additions & 4 deletions components/load_from_pdf/tests/component_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ def test_pdf_reader():
the papers from Arxiv.
"""
with open(Path(__file__).with_name("fondant_component.yaml")) as f:
print(f.name)
spec = ComponentSpec(yaml.safe_load(f))

spec = OperationSpec(spec)

pdf_path = ["tests/test_file/dummy.pdf", "tests/test_folder"]

for path in pdf_path:
component = PDFReader(
spec=spec,
produces=dict(spec.inner_produces),
pdf_path=path,
n_rows_to_load=None,
index_column=None,
Expand All @@ -37,9 +37,11 @@ def test_pdf_reader():
assert output_dataframe["text"].tolist() == ["Dummy PDF file\n"]
else:
assert output_dataframe.shape == (2, 3)
assert output_dataframe["file_name"].tolist() == [
"dummy_2.pdf",
file_names = output_dataframe["file_name"].tolist()
file_names.sort()
assert file_names == [
"dummy_1.pdf",
"dummy_2.pdf",
]
assert output_dataframe["text"].tolist() == [
"Dummy PDF file\n",
Expand Down

0 comments on commit 2738aad

Please sign in to comment.