diff --git a/examples/json-csv-reader.py b/examples/json-csv-reader.py
index 459fd99e5..faca24406 100644
--- a/examples/json-csv-reader.py
+++ b/examples/json-csv-reader.py
@@ -103,7 +103,8 @@ def main():
     print("========================================================================")
     print("static CSV with header schema test parsing 3.5K objects")
     print("========================================================================")
-    static_csv_ds = DataChain.from_csv(uri, spec=ChatFeature)
+    static_csv_ds = DataChain.from_csv(uri, output=ChatFeature, object_name="chat")
+    static_csv_ds.print_schema()
     print(static_csv_ds.to_pandas())
 
     uri = "gs://datachain-demo/laion-aesthetics-csv"
@@ -111,7 +112,8 @@
     print("========================================================================")
     print("dynamic CSV with header schema test parsing 3M objects")
    print("========================================================================")
-    dynamic_csv_ds = DataChain.from_csv(uri, object_name="laion", show_schema=True)
+    dynamic_csv_ds = DataChain.from_csv(uri, object_name="laion")
+    dynamic_csv_ds.print_schema()
     print(dynamic_csv_ds.to_pandas())
diff --git a/examples/multimodal/clip_fine_tuning.ipynb b/examples/multimodal/clip_fine_tuning.ipynb
index e49cb1cd0..0e8a5cc58 100644
--- a/examples/multimodal/clip_fine_tuning.ipynb
+++ b/examples/multimodal/clip_fine_tuning.ipynb
@@ -127,7 +127,8 @@
      "name": "stderr",
      "output_type": "stream",
      "text": [
-      "Processed: 422 rows [00:00, 23553.16 rows/s]\n"
+      "Listing gs://datachain-demo: 423 objects [00:00, 1522.96 objects/s]\n",
+      "Processed: 422 rows [00:00, 16131.50 rows/s]\n"
      ]
     },
     {
@@ -178,7 +179,7 @@
       " \n",
       " 0\n",
       " 1\n",
-      " 7240897025793984262\n",
+      " 1105792836931684037\n",
       " \n",
       " 0\n",
       " newyorker_caption_contest/images\n",
@@ -202,7 +203,7 @@
       " \n",
       " 1\n",
       " 2\n",
-      " 2282553912840270646\n",
+      " 6206810117153843293\n",
       " \n",
       " 0\n",
       " newyorker_caption_contest/images\n",
@@ -226,7 +227,7 @@
       " \n",
       " 2\n",
       " 3\n",
-      " 7756063507247068366\n",
+      " 4851141198788365394\n",
       " \n",
       " 0\n",
       " newyorker_caption_contest/images\n",
@@ -253,9 +254,9 @@
      ],
      "text/plain": [
       " id random vtype dir_type parent \\\n",
-      "0 1 7240897025793984262 0 newyorker_caption_contest/images \n",
-      "1 2 2282553912840270646 0 newyorker_caption_contest/images \n",
-      "2 3 7756063507247068366 0 newyorker_caption_contest/images \n",
+      "0 1 1105792836931684037 0 newyorker_caption_contest/images \n",
+      "1 2 6206810117153843293 0 newyorker_caption_contest/images \n",
+      "2 3 4851141198788365394 0 newyorker_caption_contest/images \n",
       "\n",
       " name etag version is_latest \\\n",
       "0 101.jpeg CMeLzNiXhocDEAE= 1719848704083399 1 \n",
@@ -315,7 +316,7 @@
    "source": [
     "### From parquet\n",
     "\n",
-    "Use `DataChain.parse_parquet()` to load data from a dataset of any number of parquet files (you can also read the data into another library like pandas and use `DataChain.from_dataframe()`). Here we use it to load the metadata about the cartoons, including the text for all caption choices."
+    "Use `DataChain.from_parquet()` to load data from a dataset of any number of parquet files (you can also read the data into another library like pandas and use `DataChain.from_dataframe()`). Here we use it to load the metadata about the cartoons, including the text for all caption choices."
    ]
   },
   {
@@ -328,12 +329,12 @@
      "name": "stderr",
      "output_type": "stream",
      "text": [
-      "Processed: 1 rows [00:00, 1515.28 rows/s]\n"
+      "Processed: 1 rows [00:00, 1636.48 rows/s]\n"
      ]
     }
    ],
    "source": [
-    "meta_dc = DataChain.from_storage(\"gs://datachain-demo/newyorker_caption_contest/new_yorker_meta.parquet\").parse_parquet()"
+    "meta_dc = DataChain.from_parquet(\"gs://datachain-demo/newyorker_caption_contest/new_yorker_meta.parquet\")"
    ]
   },
   {
@@ -354,11 +355,11 @@
      "name": "stderr",
      "output_type": "stream",
      "text": [
-      "Processed: 1 rows [00:00, 1312.36 rows/s]\n",
+      "Processed: 1 rows [00:00, 418.26 rows/s]\n",
       "Processed: 0 rows [00:00, ? rows/s]\n",
       "Generated: 0 rows [00:00, ? rows/s]\u001b[A\n",
-      "Processed: 1 rows [00:00, 2.34 rows/s]ows/s]\u001b[A\n",
-      "Generated: 9792 rows [00:00, 22919.32 rows/s]\n"
+      "Processed: 1 rows [00:00, 1.43 rows/s]ows/s]\u001b[A\n",
+      "Generated: 9792 rows [00:00, 14079.60 rows/s]\n"
      ]
     },
     {
@@ -409,7 +410,7 @@
       " \n",
       " 0\n",
       " 1\n",
-      " 3891531844767248658\n",
+      " 7819739929141727606\n",
       " gs://datachain-demo\n",
       " newyorker_caption_contest\n",
       " new_yorker_meta.parquet\n",
@@ -433,7 +434,7 @@
       " \n",
       " 1\n",
       " 2\n",
-      " 4531363468689655038\n",
+      " 5565576549161677201\n",
       " gs://datachain-demo\n",
       " newyorker_caption_contest\n",
       " new_yorker_meta.parquet\n",
@@ -457,7 +458,7 @@
       " \n",
       " 2\n",
       " 3\n",
-      " 842984054593741747\n",
+      " 8246773485257615432\n",
       " gs://datachain-demo\n",
       " newyorker_caption_contest\n",
       " new_yorker_meta.parquet\n",
@@ -484,9 +485,9 @@
      ],
      "text/plain": [
       " id random source.file.source source.file.parent \\\n",
-      "0 1 3891531844767248658 gs://datachain-demo newyorker_caption_contest \n",
-      "1 2 4531363468689655038 gs://datachain-demo newyorker_caption_contest \n",
-      "2 3 842984054593741747 gs://datachain-demo newyorker_caption_contest \n",
+      "0 1 7819739929141727606 gs://datachain-demo newyorker_caption_contest \n",
+      "1 2 5565576549161677201 gs://datachain-demo newyorker_caption_contest \n",
+      "2 3 8246773485257615432 gs://datachain-demo newyorker_caption_contest \n",
       "\n",
       " source.file.name source.file.size source.file.version \\\n",
       "0 new_yorker_meta.parquet 3765680 1719847348473242 \n",
@@ -569,10 +570,10 @@
      "name": "stderr",
      "output_type": "stream",
      "text": [
-      "Processed: 1 rows [00:00, 1562.12 rows/s]\n",
+      "Processed: 1 rows [00:00, 1082.40 rows/s]\n",
       "Processed: 0 rows [00:00, ? rows/s]\n",
-      "Processed: 1 rows [00:00, 2.62 rows/s]\n",
-      "Generated: 9792 rows [00:00, 25776.10 rows/s]\n"
+      "Processed: 1 rows [00:00, 2.44 rows/s]\n",
+      "Generated: 9792 rows [00:00, 23886.01 rows/s]\n"
      ]
     },
     {
@@ -757,11 +758,12 @@
      "name": "stderr",
      "output_type": "stream",
      "text": [
-      "Processed: 422 rows [00:00, 21900.74 rows/s]\n",
-      "Processed: 1 rows [00:00, 921.83 rows/s]\n",
+      "Processed: 422 rows [00:00, 20949.92 rows/s]\n",
+      "Processed: 1 rows [00:00, 1261.07 rows/s]\n",
       "Processed: 0 rows [00:00, ? rows/s]\n",
-      "Processed: 1 rows [00:00, 2.58 rows/s]\n",
-      "Generated: 9792 rows [00:00, 25325.90 rows/s]\n"
+      "Generated: 0 rows [00:00, ? rows/s]\u001b[A\n",
+      "Processed: 1 rows [00:00, 2.03 rows/s]ows/s]\u001b[A\n",
+      "Generated: 9792 rows [00:00, 19922.88 rows/s]\n"
      ]
     },
     {
@@ -877,12 +879,12 @@
      "name": "stderr",
      "output_type": "stream",
      "text": [
-      "Processed: 422 rows [00:00, 18992.19 rows/s]\n",
-      "Processed: 1 rows [00:00, 580.45 rows/s]\n",
+      "Processed: 422 rows [00:00, 20143.12 rows/s]\n",
+      "Processed: 1 rows [00:00, 696.38 rows/s]\n",
       "Processed: 0 rows [00:00, ? rows/s]\n",
       "Generated: 0 rows [00:00, ? rows/s]\u001b[A\n",
-      "Processed: 1 rows [00:00, 2.53 rows/s]ows/s]\u001b[A\n",
-      "Generated: 9792 rows [00:00, 24786.87 rows/s]\n"
+      "Processed: 1 rows [00:00, 2.37 rows/s]ows/s]\u001b[A\n",
+      "Generated: 9792 rows [00:00, 23245.82 rows/s]\n"
      ]
     },
     {
@@ -1107,11 +1109,12 @@
      "name": "stderr",
      "output_type": "stream",
      "text": [
-      "Processed: 422 rows [00:00, 19869.07 rows/s]\n",
-      "Processed: 1 rows [00:00, 1349.95 rows/s]\n",
+      "Processed: 422 rows [00:00, 19108.65 rows/s]\n",
+      "Processed: 1 rows [00:00, 370.91 rows/s]\n",
       "Processed: 0 rows [00:00, ? rows/s]\n",
-      "Processed: 1 rows [00:00, 2.50 rows/s]\n",
-      "Generated: 9792 rows [00:00, 24507.27 rows/s]\n"
+      "Generated: 0 rows [00:00, ? rows/s]\u001b[A\n",
+      "Processed: 1 rows [00:00, 2.36 rows/s]ows/s]\u001b[A\n",
+      "Generated: 9792 rows [00:00, 23131.49 rows/s]\n"
      ]
     }
    ],
@@ -1219,9 +1222,9 @@
      "name": "stderr",
      "output_type": "stream",
      "text": [
-      "Processed: 10 rows [00:01, 6.29 rows/s]\n",
-      "Processed: 10 rows [00:00, 6455.75 rows/s]\n",
-      "Processed: 10 rows [00:00, 3949.81 rows/s]\n"
+      "Processed: 10 rows [00:01, 6.54 rows/s]\n",
+      "Processed: 10 rows [00:00, 6724.87 rows/s]\n",
+      "Processed: 10 rows [00:00, 5694.91 rows/s]\n"
      ]
     }
    ],
@@ -1291,74 +1294,74 @@
       " \n",
       " 0\n",
       " 1\n",
-      " 3311866728181183\n",
+      " 3962661586109317\n",
       " \n",
       " 0\n",
       " newyorker_caption_contest/images\n",
-      " 660.jpeg\n",
-      " CJCloeiXhocDEAE=\n",
-      " 1719848736936592\n",
+      " 745.jpeg\n",
+      " CJf+7dWXhocDEAE=\n",
+      " 1719848698347287\n",
       " 1\n",
-      " 2024-07-01 15:45:36.967000+00:00\n",
+      " 2024-07-01 15:44:58.391000+00:00\n",
       " ...\n",
-      " [Why is the king at a doctor's office?]\n",
-      " [Put it down slowly, the mothers are very prot...\n",
-      " scene: doctor's office description: A king wea...\n",
-      " B\n",
+      " [Why is there a snail attacking the city?]\n",
+      " [If I am lying then let God strike me dead!, F...\n",
+      " scene: downtown city description: A giant, mon...\n",
+      " D\n",
       " 1\n",
-      " cf85e2ed637a41cbd9de5d1ce2b91255\n",
-      " 660.jpeg\n",
-      " [0.001104930299334228, 0.2960589528083801, 0.0...\n",
-      " 1\n",
-      " 0.296059\n",
+      " aac090451cb92e27f23c8145462ba509\n",
+      " 745.jpeg\n",
+      " [0.05347629263997078, 0.28638720512390137, 0.6...\n",
+      " 3\n",
+      " 0.023123\n",
       " \n",
       " \n",
       " 1\n",
       " 2\n",
-      " 3311866728181183\n",
+      " 3962661586109317\n",
       " \n",
       " 0\n",
       " newyorker_caption_contest/images\n",
-      " 660.jpeg\n",
-      " CJCloeiXhocDEAE=\n",
-      " 1719848736936592\n",
+      " 745.jpeg\n",
+      " CJf+7dWXhocDEAE=\n",
+      " 1719848698347287\n",
       " 1\n",
-      " 2024-07-01 15:45:36.967000+00:00\n",
+      " 2024-07-01 15:44:58.391000+00:00\n",
       " ...\n",
-      " [Why is the king at a doctor's office?]\n",
-      " [Under pre-existing conditions, why did you wr...\n",
-      " scene: a doctor's office description: A king w...\n",
+      " [Why is there a snail attacking the city?]\n",
+      " [If you turn the binoculars around, the proble...\n",
+      " scene: a city description: A giant, monstrous ...\n",
       " A\n",
       " 1\n",
-      " 118ec0f16b3b0c67388659f32fb9d8b6\n",
-      " 660.jpeg\n",
-      " [0.7271297574043274, 0.20022231340408325, 0.00...\n",
+      " 1874b4a62eb8637e77822f7b98202519\n",
+      " 745.jpeg\n",
+      " [0.6407938599586487, 0.054330047219991684, 0.2...\n",
       " 0\n",
-      " 0.727130\n",
+      " 0.640794\n",
       " \n",
       " \n",
       " 2\n",
       " 3\n",
-      " 3311866728181183\n",
+      " 3962661586109317\n",
       " \n",
       " 0\n",
       " newyorker_caption_contest/images\n",
-      " 660.jpeg\n",
-      " CJCloeiXhocDEAE=\n",
-      " 1719848736936592\n",
+      " 745.jpeg\n",
+      " CJf+7dWXhocDEAE=\n",
+      " 1719848698347287\n",
       " 1\n",
-      " 2024-07-01 15:45:36.967000+00:00\n",
+      " 2024-07-01 15:44:58.391000+00:00\n",
       " ...\n",
-      " [Why is the King visiting the doctor?]\n",
-      " [On second thought, it's more of a sandals day...\n",
-      " scene: doctor's office description: A king sit...\n",
-      " C\n",
+      " [How did the snail get so big?]\n",
+      " [If you turn the binoculars around, the proble...\n",
+      " scene: downtown city description: There is a g...\n",
+      " A\n",
       " 1\n",
-      " e25c28b3485994bd5968d0d5eafb5c3f\n",
-      " 660.jpeg\n",
-      " [0.035876620560884476, 0.023404479026794434, 0...\n",
-      " 2\n",
-      " 0.903180\n",
+      " 4016acc82c89798cda25dcfe6546eb9d\n",
+      " 745.jpeg\n",
+      " [0.6407938599586487, 0.054330047219991684, 0.2...\n",
+      " 0\n",
+      " 0.640794\n",
       " \n",
       " \n",
       "\n",
@@ -1366,44 +1369,44 @@
      ],
      "text/plain": [
       " id random vtype dir_type parent \\\n",
-      "0 1 3311866728181183 0 newyorker_caption_contest/images \n",
-      "1 2 3311866728181183 0 newyorker_caption_contest/images \n",
-      "2 3 3311866728181183 0 newyorker_caption_contest/images \n",
+      "0 1 3962661586109317 0 newyorker_caption_contest/images \n",
+      "1 2 3962661586109317 0 newyorker_caption_contest/images \n",
+      "2 3 3962661586109317 0 newyorker_caption_contest/images \n",
       "\n",
       " name etag version is_latest \\\n",
-      "0 660.jpeg CJCloeiXhocDEAE= 1719848736936592 1 \n",
-      "1 660.jpeg CJCloeiXhocDEAE= 1719848736936592 1 \n",
-      "2 660.jpeg CJCloeiXhocDEAE= 1719848736936592 1 \n",
+      "0 745.jpeg CJf+7dWXhocDEAE= 1719848698347287 1 \n",
+      "1 745.jpeg CJf+7dWXhocDEAE= 1719848698347287 1 \n",
+      "2 745.jpeg CJf+7dWXhocDEAE= 1719848698347287 1 \n",
       "\n",
       " last_modified ... \\\n",
-      "0 2024-07-01 15:45:36.967000+00:00 ... \n",
-      "1 2024-07-01 15:45:36.967000+00:00 ... \n",
-      "2 2024-07-01 15:45:36.967000+00:00 ... \n",
+      "0 2024-07-01 15:44:58.391000+00:00 ... \n",
+      "1 2024-07-01 15:44:58.391000+00:00 ... \n",
+      "2 2024-07-01 15:44:58.391000+00:00 ... \n",
       "\n",
-      " questions \\\n",
-      "0 [Why is the king at a doctor's office?] \n",
-      "1 [Why is the king at a doctor's office?] \n",
-      "2 [Why is the King visiting the doctor?] \n",
+      " questions \\\n",
+      "0 [Why is there a snail attacking the city?] \n",
+      "1 [Why is there a snail attacking the city?] \n",
+      "2 [How did the snail get so big?] \n",
       "\n",
       " caption_choices \\\n",
-      "0 [Put it down slowly, the mothers are very prot... \n",
-      "1 [Under pre-existing conditions, why did you wr... \n",
-      "2 [On second thought, it's more of a sandals day... \n",
+      "0 [If I am lying then let God strike me dead!, F... \n",
+      "1 [If you turn the binoculars around, the proble... \n",
+      "2 [If you turn the binoculars around, the proble... \n",
       "\n",
       " from_description label n_tokens_label \\\n",
-      "0 scene: doctor's office description: A king wea... B 1 \n",
-      "1 scene: a doctor's office description: A king w... A 1 \n",
-      "2 scene: doctor's office description: A king sit... C 1 \n",
+      "0 scene: downtown city description: A giant, mon... D 1 \n",
+      "1 scene: a city description: A giant, monstrous ... A 1 \n",
+      "2 scene: downtown city description: There is a g... A 1 \n",
       "\n",
       " instance_id filename \\\n",
-      "0 cf85e2ed637a41cbd9de5d1ce2b91255 660.jpeg \n",
-      "1 118ec0f16b3b0c67388659f32fb9d8b6 660.jpeg \n",
-      "2 e25c28b3485994bd5968d0d5eafb5c3f 660.jpeg \n",
+      "0 aac090451cb92e27f23c8145462ba509 745.jpeg \n",
+      "1 1874b4a62eb8637e77822f7b98202519 745.jpeg \n",
+      "2 4016acc82c89798cda25dcfe6546eb9d 745.jpeg \n",
       "\n",
       " scores label_ind label_prob \n",
-      "0 [0.001104930299334228, 0.2960589528083801, 0.0... 1 0.296059 \n",
-      "1 [0.7271297574043274, 0.20022231340408325, 0.00... 0 0.727130 \n",
-      "2 [0.035876620560884476, 0.023404479026794434, 0... 2 0.903180 "
+      "0 [0.05347629263997078, 0.28638720512390137, 0.6... 3 0.023123 \n",
+      "1 [0.6407938599586487, 0.054330047219991684, 0.2... 0 0.640794 \n",
+      "2 [0.6407938599586487, 0.054330047219991684, 0.2... 0 0.640794 "
      ]
     },
     "metadata": {},
@@ -1438,7 +1441,7 @@
    {
     "data": {
      "text/plain": [
-      "0.7005960181355476"
+      "0.22845924664288758"
      ]
     },
     "execution_count": 23,
@@ -1558,11 +1561,11 @@
      "name": "stdout",
      "output_type": "stream",
      "text": [
-      "loss for epoch 0: 14.143621398194227\n",
-      "loss for epoch 1: 10.231055159121752\n",
-      "loss for epoch 2: 0.22259006446620333\n",
-      "loss for epoch 3: 6.318037593899817e-05\n",
-      "loss for epoch 4: 3.862364474116475e-05\n"
+      "loss for epoch 0: 15.363719150424004\n",
+      "loss for epoch 1: 1.2078554394829553\n",
+      "loss for epoch 2: 0.24927332557854243\n",
+      "loss for epoch 3: 0.00029108165836078115\n",
+      "loss for epoch 4: 9.536738616588991e-07\n"
      ]
     }
    ],
@@ -1640,14 +1643,14 @@
     "text": [
      "/Users/dave/Code/dvcx/src/datachain/lib/text.py:49: UserWarning: To copy construct from a tensor, it is recommended to use sourceTensor.clone().detach() or sourceTensor.clone().detach().requires_grad_(True), rather than torch.tensor(sourceTensor).\n",
      "  return encoder(torch.tensor(tokens))\n",
-     "Processed: 10 rows [00:01, 7.56 rows/s]\n",
-     "Processed: 10 rows [00:00, 4635.61 rows/s]\n"
+     "Processed: 10 rows [00:01, 7.21 rows/s]\n",
+     "Processed: 10 rows [00:00, 5388.37 rows/s]\n"
     ]
    },
    {
     "data": {
      "text/plain": [
-      "0.9999965786933899"
+      "1.0"
      ]
     },
     "execution_count": 29,
diff --git a/examples/wds.py b/examples/wds.py
index 7bc2c7aa1..b8f02323b 100644
--- a/examples/wds.py
+++ b/examples/wds.py
@@ -18,12 +18,9 @@
     .map(stem=lambda file: file.get_file_stem(), params=["emd.file"], output=str)
 )
 
-meta_pq = (
-    DataChain.from_storage("gs://dvcx-datacomp-small/metadata")
-    .filter(C.name.glob("0020f*.parquet"))
-    .parse_parquet()
-    .map(stem=lambda file: file.get_file_stem(), params=["source.file"], output=str)
-)
+meta_pq = DataChain.from_parquet(
+    "gs://dvcx-datacomp-small/metadata/0020f*.parquet"
+).map(stem=lambda file: file.get_file_stem(), params=["source.file"], output=str)
 
 meta = meta_emd.merge(
     meta_pq, on=["stem", "emd.index"], right_on=["stem", "source.index"]
diff --git a/src/datachain/lib/arrow.py b/src/datachain/lib/arrow.py
index 16f2e56c5..e9b4ed776 100644
--- a/src/datachain/lib/arrow.py
+++ b/src/datachain/lib/arrow.py
@@ -1,13 +1,15 @@
 import re
+from collections.abc import Sequence
 from typing import TYPE_CHECKING, Optional
 
+import pyarrow as pa
 from pyarrow.dataset import dataset
 
 from datachain.lib.file import File, IndexedFile
 from datachain.lib.udf import Generator
 
 if TYPE_CHECKING:
-    import pyarrow as pa
+    from datachain.lib.dc import DataChain
 
 
 class ArrowGenerator(Generator):
@@ -35,12 +37,29 @@ def process(self, file: File):
             index += 1
 
 
-def schema_to_output(schema: "pa.Schema"):
+def infer_schema(chain: "DataChain", **kwargs) -> pa.Schema:
+    schemas = []
+    for file in chain.iterate_one("file"):
+        ds = dataset(file.get_path(), filesystem=file.get_fs(), **kwargs)  # type: ignore[union-attr]
+        schemas.append(ds.schema)
+    return pa.unify_schemas(schemas)
+
+
+def schema_to_output(schema: pa.Schema, col_names: Optional[Sequence[str]] = None):
    """Generate UDF output schema from pyarrow schema."""
+    if col_names and (len(schema) != len(col_names)):
+        raise ValueError(
+            "Error generating output from Arrow schema - "
+            f"Schema has {len(schema)} columns but got {len(col_names)} column names."
+        )
     default_column = 0
-    output = {"source": IndexedFile}
-    for field in schema:
-        column = field.name.lower()
+    output = {}
+    for i, field in enumerate(schema):
+        if col_names:
+            column = col_names[i]
+        else:
+            column = field.name
+        column = column.lower()
         column = re.sub("[^0-9a-z_]+", "", column)
         if not column:
             column = f"c{default_column}"
@@ -50,12 +69,10 @@ def schema_to_output(schema: "pa.Schema"):
     return output
 
 
-def _arrow_type_mapper(col_type: "pa.DataType") -> type:  # noqa: PLR0911
+def _arrow_type_mapper(col_type: pa.DataType) -> type:  # noqa: PLR0911
     """Convert pyarrow types to basic types."""
     from datetime import datetime
 
-    import pyarrow as pa
-
     if pa.types.is_timestamp(col_type):
         return datetime
     if pa.types.is_binary(col_type):
diff --git a/src/datachain/lib/dc.py b/src/datachain/lib/dc.py
index 5316419c2..a49a6965f 100644
--- a/src/datachain/lib/dc.py
+++ b/src/datachain/lib/dc.py
@@ -13,7 +13,7 @@
 import sqlalchemy
 
 from datachain.lib.feature import Feature, FeatureType
-from datachain.lib.feature_utils import features_to_tuples
+from datachain.lib.feature_utils import dict_to_feature, features_to_tuples
 from datachain.lib.file import File, IndexedFile, get_file
 from datachain.lib.meta_formats import read_meta, read_schema
 from datachain.lib.settings import Settings
@@ -39,8 +39,6 @@
     import pandas as pd
     from typing_extensions import Self
 
-    from datachain.catalog import Catalog
-
 C = Column
@@ -205,9 +203,10 @@ def from_storage(
         path,
         *,
         type: Literal["binary", "text", "image"] = "binary",
-        catalog: Optional["Catalog"] = None,
+        session: Optional[Session] = None,
         recursive: Optional[bool] = True,
-        anon: bool = False,
+        object_name: str = "file",
+        **kwargs,
     ) -> "Self":
         """Get data from a storage as a list of file with all file attributes.
         It returns the chain itself as usual.
@@ -217,7 +216,7 @@
                 as `s3://`, `gs://`, `az://` or "file:///"
             type : read file as "binary", "text", or "image" data. Default is "binary".
             recursive : search recursively for the given path.
-            anon : use anonymous mode to access the storage.
+            object_name : Created object column name.
 
         Example:
             ```py
@@ -225,7 +224,9 @@
             ```
         """
         func = get_file(type)
-        return cls(path, catalog=catalog, recursive=recursive, anon=anon).map(file=func)
+        return cls(path, session=session, recursive=recursive, **kwargs).map(
+            **{object_name: func}
+        )
 
     @classmethod
     def from_dataset(cls, name: str, version: Optional[int] = None) -> "DataChain":
@@ -240,66 +241,19 @@ def from_dataset(cls, name: str, version: Optional[int] = None) -> "DataChain":
         """
         return DataChain(name=name, version=version)
 
-    @classmethod
-    def from_csv(
-        cls,
-        path,
-        type: Literal["binary", "text", "image"] = "text",
-        anon: bool = False,
-        spec: Optional[FeatureType] = None,
-        schema_from: Optional[str] = "auto",
-        object_name: Optional[str] = "csv",
-        model_name: Optional[str] = None,
-        show_schema: Optional[bool] = False,
-    ) -> "DataChain":
-        """Get data from CSV. It returns the chain itself.
-
-        Parameters:
-            path : storage URI with directory. URI must start with storage prefix such
-                as `s3://`, `gs://`, `az://` or "file:///"
-            type : read file as "binary", "text", or "image" data. Default is "text".
-            anon : use anonymous mode to access the storage.
-            spec : Data Model for CSV file
-            object_name : generated object column name
-            model_name : generated model name
-            schema_from : path to sample to infer spec from
-            show_schema : print auto-generated schema
-
-        Examples:
-            infer model from the first two lines (header + data)
-            >>> chain = DataChain.from_csv("gs://csv")
-
-            use a particular data model
-            >>> chain = DataChain.from_csv("gs://csv"i, spec=MyModel)
-        """
-        if schema_from == "auto":
-            schema_from = path
-
-        chain = DataChain.from_storage(path=path, type=type, anon=anon)
-        signal_dict = {
-            object_name: read_meta(
-                schema_from=schema_from,
-                meta_type="csv",
-                spec=spec,
-                model_name=model_name,
-                show_schema=show_schema,
-            )
-        }
-        return chain.gen(**signal_dict)  # type: ignore[misc, arg-type]
-
     @classmethod
     def from_json(
         cls,
         path,
         type: Literal["binary", "text", "image"] = "text",
-        anon: bool = False,
         spec: Optional[FeatureType] = None,
         schema_from: Optional[str] = "auto",
         jmespath: Optional[str] = None,
-        object_name: Optional[str] = None,
+        object_name: str = "",
         model_name: Optional[str] = None,
         show_schema: Optional[bool] = False,
         meta_type: Optional[str] = "json",
+        **kwargs,
     ) -> "DataChain":
         """Get data from JSON. It returns the chain itself.
@@ -307,7 +261,6 @@
             path : storage URI with directory. URI must start with storage prefix such
                 as `s3://`, `gs://`, `az://` or "file:///"
             type : read file as "binary", "text", or "image" data. Default is "binary".
-            anon : use anonymous mode to access the storage.
             spec : optional Data Model
             schema_from : path to sample to infer spec from
             object_name : generated object column name
@@ -333,7 +286,7 @@ def jmespath_to_name(s: str):
             object_name = jmespath_to_name(jmespath)
         if not object_name:
             object_name = "json"
-        chain = DataChain.from_storage(path=path, type=type, anon=anon)
+        chain = DataChain.from_storage(path=path, type=type, **kwargs)
         signal_dict = {
             object_name: read_meta(
                 schema_from=schema_from,
@@ -706,6 +659,7 @@ def from_features(
         ds_name: str = "",
         session: Optional[Session] = None,
         output: Union[None, FeatureType, Sequence[str], dict[str, FeatureType]] = None,
+        object_name: str = "",
         **fr_map,
     ) -> "DataChain":
         """Generate chain from list of features."""
@@ -715,11 +669,17 @@ def _func_fr() -> Iterator[tuple_type]:  # type: ignore[valid-type]
             yield from tuples
 
         chain = DataChain.create_empty(DataChain.DEFAULT_FILE_RECORD, session=session)
+        if object_name:
+            output = {object_name: dict_to_feature(object_name, output)}  # type: ignore[arg-type]
         return chain.gen(_func_fr, output=output)
 
     @classmethod
     def from_pandas(  # type: ignore[override]
-        cls, df: "pd.DataFrame", name: str = "", session: Optional[Session] = None
+        cls,
+        df: "pd.DataFrame",
+        name: str = "",
+        session: Optional[Session] = None,
+        object_name: str = "",
     ) -> "DataChain":
         """Generate chain from pandas data-frame."""
         fr_map = {col.lower(): df[col].tolist() for col in df.columns}
@@ -737,17 +697,25 @@ def from_pandas(  # type: ignore[override]
                     f"import from pandas error - '{column}' cannot be a column name",
                 )
 
-        return cls.from_features(name, session, **fr_map)
+        return cls.from_features(name, session, object_name=object_name, **fr_map)
 
     def parse_tabular(
         self,
-        output: Optional[dict[str, FeatureType]] = None,
+        output: Union[
+            None, type[Feature], Sequence[str], dict[str, FeatureType]
+        ] = None,
+        object_name: str = "",
+        model_name: str = "",
         **kwargs,
     ) -> "DataChain":
         """Generate chain from list of tabular files.
 
         Parameters:
-            output : Dictionary defining column names and their corresponding types.
+            output : Dictionary or feature class defining column names and their
+                corresponding types. List of column names is also accepted, in which
+                case types will be inferred.
+            object_name : Generated object column name.
+            model_name : Generated model name.
             kwargs : Parameters to pass to pyarrow.dataset.dataset.
 
         Examples:
@@ -760,107 +728,122 @@ def parse_tabular(
            >>> dc = dc.filter(C("file.name").glob("*.jsonl"))
            >>> dc = dc.parse_tabular(format="json")
         """
-        from pyarrow import unify_schemas
-        from pyarrow.dataset import dataset
-
-        from datachain.lib.arrow import ArrowGenerator, schema_to_output
+        from datachain.lib.arrow import ArrowGenerator, infer_schema, schema_to_output
 
         schema = None
-        if output:
-            output = {"source": IndexedFile} | output
-        else:
-            schemas = []
-            for row in self.select("file").iterate():
-                file = row[0]
-                ds = dataset(file.get_path(), filesystem=file.get_fs(), **kwargs)  # type: ignore[union-attr]
-                schemas.append(ds.schema)
-            if not schemas:
-                msg = "error parsing tabular data schema - found no files to parse"
-                raise DatasetPrepareError(self.name, msg)
-            schema = unify_schemas(schemas)
+        col_names = output if isinstance(output, Sequence) else None
+        if col_names or not output:
             try:
-                output = schema_to_output(schema)
+                schema = infer_schema(self, **kwargs)
+                output = schema_to_output(schema, col_names)
             except ValueError as e:
                 raise DatasetPrepareError(self.name, e) from e
+        if object_name:
+            if isinstance(output, dict):
+                model_name = model_name or object_name
+                output = dict_to_feature(model_name, output)
+            output = {object_name: output}  # type: ignore[dict-item]
+        elif isinstance(output, type(Feature)):
+            output = {
+                name: info.annotation  # type: ignore[misc]
+                for name, info in output.model_fields.items()
+            }
+        output = {"source": IndexedFile} | output  # type: ignore[assignment,operator]
         return self.gen(ArrowGenerator(schema, **kwargs), output=output)
 
-    def parse_csv(
-        self,
+    @classmethod
+    def from_csv(
+        cls,
+        path,
         delimiter: str = ",",
         header: bool = True,
         column_names: Optional[list[str]] = None,
-        output: Optional[dict[str, FeatureType]] = None,
+        output: Union[
+            None, type[Feature], Sequence[str], dict[str, FeatureType]
+        ] = None,
+        object_name: str = "",
+        model_name: str = "",
+        **kwargs,
     ) -> "DataChain":
-        """Generate chain from list of csv files.
+        """Generate chain from csv files.
 
         Parameters:
+            path : Storage URI with directory. URI must start with storage prefix such
+                as `s3://`, `gs://`, `az://` or "file:///".
             delimiter : Character for delimiting columns.
             header : Whether the files include a header row.
-            column_names : Column names if no header. Implies `header = False`.
-            output : Dictionary defining column names and their corresponding types.
+            output : Dictionary or feature class defining column names and their
+                corresponding types. List of column names is also accepted, in which
+                case types will be inferred.
+            object_name : Created object column name.
+            model_name : Generated model name.
 
         Examples:
            Reading a csv file:
-            >>> dc = DataChain.from_storage("s3://mybucket/file.csv")
-            >>> dc = dc.parse_tabular(format="csv")
+            >>> dc = DataChain.from_csv("s3://mybucket/file.csv")
 
-            Reading a filtered list of csv files as a dataset:
-            >>> dc = DataChain.from_storage("s3://mybucket")
-            >>> dc = dc.filter(C("file.name").glob("*.csv"))
-            >>> dc = dc.parse_tabular()
+            Reading csv files from a directory as a combined dataset:
+            >>> dc = DataChain.from_csv("s3://mybucket/dir")
         """
         from pyarrow.csv import ParseOptions, ReadOptions
         from pyarrow.dataset import CsvFileFormat
 
-        if column_names and output:
-            msg = "error parsing csv - only one of column_names or output is allowed"
-            raise DatasetPrepareError(self.name, msg)
+        chain = DataChain.from_storage(path, **kwargs)
 
-        if not header and not column_names:
-            if output:
+        if not header:
+            if not output:
+                msg = "error parsing csv - provide output if no header"
+                raise DatasetPrepareError(chain.name, msg)
+            if isinstance(output, Sequence):
+                column_names = output  # type: ignore[assignment]
+            elif isinstance(output, dict):
                 column_names = list(output.keys())
             else:
-                msg = "error parsing csv - provide column_names or output if no header"
-                raise DatasetPrepareError(self.name, msg)
+                column_names = list(output.model_fields.keys())
 
         parse_options = ParseOptions(delimiter=delimiter)
         read_options = ReadOptions(column_names=column_names)
         format = CsvFileFormat(parse_options=parse_options, read_options=read_options)
-        return self.parse_tabular(output=output, format=format)
+        return chain.parse_tabular(
+            output=output, object_name=object_name, model_name=model_name, format=format
+        )
 
-    def parse_parquet(
-        self,
+    @classmethod
+    def from_parquet(
+        cls,
+        path,
         partitioning: Any = "hive",
         output: Optional[dict[str, FeatureType]] = None,
+        object_name: str = "",
+        model_name: str = "",
+        **kwargs,
     ) -> "DataChain":
-        """Generate chain from list of parquet files.
+        """Generate chain from parquet files.
 
         Parameters:
+            path : Storage URI with directory. URI must start with storage prefix such
+                as `s3://`, `gs://`, `az://` or "file:///".
             partitioning : Any pyarrow partitioning schema.
             output : Dictionary defining column names and their corresponding types.
+            object_name : Created object column name.
+            model_name : Generated model name.
 
         Examples:
            Reading a single file:
-            >>> dc = DataChain.from_storage("s3://mybucket/file.parquet")
-            >>> dc = dc.parse_tabular()
+            >>> dc = DataChain.from_parquet("s3://mybucket/file.parquet")
 
            Reading a partitioned dataset from a directory:
-            >>> dc = DataChain.from_storage("path/to/dir")
-            >>> dc = dc.parse_tabular()
-
-            Reading a filtered list of files as a dataset:
-            >>> dc = DataChain.from_storage("s3://mybucket")
-            >>> dc = dc.filter(C("file.name").glob("*.parquet"))
-            >>> dc = dc.parse_tabular()
-
-            Reading a filtered list of partitions as a dataset:
-            >>> dc = DataChain.from_storage("s3://mybucket")
-            >>> dc = dc.filter(C("file.parent").glob("*month=1*"))
-            >>> dc = dc.parse_tabular()
+            >>> dc = DataChain.from_parquet("s3://mybucket/dir")
         """
-        return self.parse_tabular(
-            output=output, format="parquet", partitioning=partitioning
+        chain = DataChain.from_storage(path, **kwargs)
+        return chain.parse_tabular(
+            output=output,
+            object_name=object_name,
+            model_name=model_name,
+            format="parquet",
+            partitioning=partitioning,
         )
 
     @classmethod
diff --git a/src/datachain/lib/feature_utils.py b/src/datachain/lib/feature_utils.py
index 85f549455..15d716c72 100644
--- a/src/datachain/lib/feature_utils.py
+++ b/src/datachain/lib/feature_utils.py
@@ -81,6 +81,15 @@ def _to_feature_type(anno):
     return anno
 
 
+def dict_to_feature(name: str, data_dict: dict[str, FeatureType]) -> type[Feature]:
+    fields = {name: (anno, ...) for name, anno in data_dict.items()}
+    return create_model(  # type: ignore[call-overload]
+        name,
+        __base__=Feature,
+        **fields,
+    )
+
+
 def features_to_tuples(
     ds_name: str = "",
     output: Union[None, FeatureType, Sequence[str], dict[str, FeatureType]] = None,
diff --git a/src/datachain/lib/udf.py b/src/datachain/lib/udf.py
index 38c4d6023..a997360fa 100644
--- a/src/datachain/lib/udf.py
+++ b/src/datachain/lib/udf.py
@@ -125,7 +125,11 @@ def __call__(self, *rows):
         else:
             # Generator expression is required, otherwise the value will be materialized
             res = (
-                obj._flatten() if isinstance(obj, Feature) else (obj,)
+                obj._flatten()
+                if isinstance(obj, Feature)
+                else obj
+                if isinstance(obj, tuple)
+                else (obj,)
                 for obj in result_objs
             )
 
diff --git a/tests/examples/test_wds_e2e.py b/tests/examples/test_wds_e2e.py
index 63e203f37..a7d887335 100644
--- a/tests/examples/test_wds_e2e.py
+++ b/tests/examples/test_wds_e2e.py
@@ -100,7 +100,7 @@ def test_wds_merge_with_parquet_meta(catalog, webdataset_tars, webdataset_metadata):
         laion=process_webdataset(spec=WDSLaion), params="file"
     )
 
-    meta = DataChain.from_storage(Path(webdataset_metadata).as_uri()).parse_parquet()
+    meta = DataChain.from_parquet(Path(webdataset_metadata).as_uri())
 
     res = wds.merge(meta, on="laion.json.uid", right_on="uid")
 
diff --git a/tests/unit/lib/test_arrow.py b/tests/unit/lib/test_arrow.py
index 6bf0224ae..826d8a838 100644
--- a/tests/unit/lib/test_arrow.py
+++ b/tests/unit/lib/test_arrow.py
@@ -74,7 +74,6 @@ def test_arrow_type_error():
 def test_schema_to_output():
     schema = pa.schema([("some_int", pa.int32()), ("some_string", pa.string())])
     assert schema_to_output(schema) == {
-        "source": IndexedFile,
         "some_int": int,
         "some_string": str,
     }
@@ -90,7 +89,6 @@ def test_parquet_convert_column_names():
         ]
     )
     assert list(schema_to_output(schema)) == [
-        "source",
         "uppercasecol",
         "dotnotationcol",
         "withdashes",
@@ -105,4 +103,20 @@ def test_parquet_missing_column_names():
             ("", pa.int32()),
         ]
     )
-    assert list(schema_to_output(schema)) == ["source", "c0", "c1"]
+    assert list(schema_to_output(schema)) == ["c0", "c1"]
+
+
+def test_parquet_override_column_names():
+    schema = pa.schema([("some_int", pa.int32()), ("some_string", pa.string())])
+    col_names = ["n1", "n2"]
+    assert schema_to_output(schema, col_names) == {
+        "n1": int,
+        "n2": str,
+    }
+
+
+def test_parquet_override_column_names_invalid():
+    schema = pa.schema([("some_int", pa.int32()), ("some_string", pa.string())])
+    col_names = ["n1", "n2", "n3"]
+    with pytest.raises(ValueError):
+        schema_to_output(schema, col_names)
diff --git a/tests/unit/lib/test_datachain.py b/tests/unit/lib/test_datachain.py
index 15a5ae96c..57c1459e4 100644
--- a/tests/unit/lib/test_datachain.py
+++ b/tests/unit/lib/test_datachain.py
@@ -617,6 +617,20 @@ def test_parse_tabular_format(tmp_dir, catalog):
     assert df1.equals(df)
 
 
+def test_parse_tabular_partitions(tmp_dir, catalog):
+    df = pd.DataFrame(DF_DATA)
+    path = tmp_dir / "test.parquet"
+    df.to_parquet(path, partition_cols=["first_name"])
+    dc = (
+        DataChain.from_storage(path.as_uri())
+        .filter(C("parent").glob("*first_name=Alice*"))
+        .parse_tabular(partitioning="hive")
+    )
+    df1 = dc.select("first_name", "age", "city").to_pandas()
+    df1 = df1.sort_values("first_name").reset_index(drop=True)
+    assert df1.equals(df.loc[:0])
+
+
 def test_parse_tabular_empty(tmp_dir, catalog):
     path = tmp_dir / "test.parquet"
     with pytest.raises(DataChainParamsError):
@@ -630,6 +644,7 @@ def test_parse_tabular_unify_schema(tmp_dir, catalog):
     path2 = tmp_dir / "df2.parquet"
     df1.to_parquet(path1)
     df2.to_parquet(path2)
+
     df_combined = (
         pd.concat([df1, df2], ignore_index=True)
         .replace({"": None, 0: None, np.nan: None})
@@ -650,7 +665,7 @@ def test_parse_tabular_unify_schema(tmp_dir, catalog):
     assert df.equals(df_combined)
 
 
-def test_parse_tabular_output(tmp_dir, catalog):
+def test_parse_tabular_output_dict(tmp_dir, catalog):
     df = pd.DataFrame(DF_DATA)
     path = tmp_dir / "test.jsonl"
     path.write_text(df.to_json(orient="records", lines=True))
@@ -663,88 +678,116 @@ def test_parse_tabular_output_dict(tmp_dir, catalog):
     assert df1.equals(df)
 
 
-def test_parse_csv(tmp_dir, catalog):
+def test_parse_tabular_output_feature(tmp_dir, catalog):
+    class Output(Feature):
+        fname: str
+        age: int
+        loc: str
+
+    df = pd.DataFrame(DF_DATA)
+    path = tmp_dir / "test.jsonl"
+    path.write_text(df.to_json(orient="records", lines=True))
+    dc = DataChain.from_storage(path.as_uri()).parse_tabular(
+        format="json", output=Output
+    )
+    df1 = dc.select("fname", "age", "loc").to_pandas()
+    df.columns = ["fname", "age", "loc"]
+    assert df1.equals(df)
+
+
+def test_parse_tabular_output_list(tmp_dir, catalog):
+    df = pd.DataFrame(DF_DATA)
+    path = tmp_dir / "test.jsonl"
+    path.write_text(df.to_json(orient="records", lines=True))
+    output = ["fname", "age", "loc"]
+    dc = DataChain.from_storage(path.as_uri()).parse_tabular(
+        format="json", output=output
+    )
+    df1 = dc.select("fname", "age", "loc").to_pandas()
+    df.columns = ["fname", "age", "loc"]
+    assert df1.equals(df)
+
+
+def test_from_csv(tmp_dir, catalog):
     df = pd.DataFrame(DF_DATA)
     path = tmp_dir / "test.csv"
     df.to_csv(path)
-    dc = DataChain.from_storage(path.as_uri()).parse_csv()
+    dc = DataChain.from_csv(path.as_uri())
     df1 = dc.select("first_name", "age", "city").to_pandas()
     assert df1.equals(df)
 
 
-def test_parse_csv_no_header_error(tmp_dir, catalog):
+def test_from_csv_no_header_error(tmp_dir, catalog):
     df = pd.DataFrame(DF_DATA.values()).transpose()
     path = tmp_dir / "test.csv"
     df.to_csv(path, header=False, index=False)
     with pytest.raises(DataChainParamsError):
-        DataChain.from_storage(path.as_uri()).parse_csv(header=False)
+        DataChain.from_csv(path.as_uri(), header=False)
 
 
-def test_parse_csv_no_header_output(tmp_dir, catalog):
+def test_from_csv_no_header_output_dict(tmp_dir, catalog):
     df = pd.DataFrame(DF_DATA.values()).transpose()
     path = tmp_dir / "test.csv"
     df.to_csv(path, header=False, index=False)
-    dc = DataChain.from_storage(path.as_uri()).parse_csv(
-        header=False, output={"first_name": str, "age": int, "city": str}
+    dc = DataChain.from_csv(
+        path.as_uri(), header=False, output={"first_name": str, "age": int, "city": str}
     )
     df1 = dc.select("first_name", "age", "city").to_pandas()
     assert (df1.values != df.values).sum() == 0
 
 
-def test_parse_csv_no_header_column_names(tmp_dir, catalog):
+def test_from_csv_no_header_output_feature(tmp_dir, catalog):
+    class Output(Feature):
+        first_name: str
+        age: int
+        city: str
+
     df = pd.DataFrame(DF_DATA.values()).transpose()
     path = tmp_dir / "test.csv"
     df.to_csv(path, header=False, index=False)
-    dc = DataChain.from_storage(path.as_uri()).parse_csv(
-        header=False, column_names=["first_name", "age", "city"]
-    )
+    dc = DataChain.from_csv(path.as_uri(), header=False, output=Output)
     df1 = dc.select("first_name", "age", "city").to_pandas()
     assert (df1.values != df.values).sum() == 0
 
 
-def test_parse_csv_column_names_and_output(tmp_dir, catalog):
-    df = pd.DataFrame(DF_DATA)
+def test_from_csv_no_header_output_list(tmp_dir, catalog):
+    df = pd.DataFrame(DF_DATA.values()).transpose()
     path = tmp_dir / "test.csv"
-    df.to_csv(path)
-    column_names = ["fname", "age", "loc"]
-    output = {"fname": str, "age": int, "loc": str}
-    with pytest.raises(DataChainParamsError):
-        DataChain.from_storage(path.as_uri()).parse_csv(
-            column_names=column_names, output=output
-        )
+    df.to_csv(path, header=False, index=False)
+    dc = DataChain.from_csv(
+        path.as_uri(), header=False, output=["first_name", "age", "city"]
+    )
+    df1 = dc.select("first_name", "age", "city").to_pandas()
+    assert (df1.values != df.values).sum() == 0
 
 
-def test_parse_csv_tab_delimited(tmp_dir, catalog):
+def test_from_csv_tab_delimited(tmp_dir, catalog):
     df = pd.DataFrame(DF_DATA)
     path = tmp_dir / "test.csv"
     df.to_csv(path, sep="\t")
-    dc = DataChain.from_storage(path.as_uri()).parse_csv(delimiter="\t")
+    dc = DataChain.from_csv(path.as_uri(), delimiter="\t")
     df1 = dc.select("first_name", "age", "city").to_pandas()
     assert df1.equals(df)
 
 
-def test_parse_parquet_partitioned(tmp_dir, catalog):
+def test_from_parquet(tmp_dir, catalog):
     df = pd.DataFrame(DF_DATA)
     path = tmp_dir / "test.parquet"
-    df.to_parquet(path, partition_cols=["first_name"])
-    dc = DataChain.from_storage(path.as_uri()).parse_parquet()
+    df.to_parquet(path)
+    dc = DataChain.from_parquet(path.as_uri())
     df1 = dc.select("first_name", "age", "city").to_pandas()
-    df1 = df1.sort_values("first_name").reset_index(drop=True)
+
     assert df1.equals(df)
 
 
-def test_parse_parquet_filter_partitions(tmp_dir, catalog):
+def test_from_parquet_partitioned(tmp_dir, catalog):
     df = pd.DataFrame(DF_DATA)
     path = tmp_dir / "test.parquet"
     df.to_parquet(path, partition_cols=["first_name"])
-    dc = (
-        DataChain.from_storage(path.as_uri())
-        .filter(C("parent").glob("*first_name=Alice*"))
-        .parse_parquet()
-    )
+    dc = DataChain.from_parquet(path.as_uri())
     df1 = dc.select("first_name", "age", "city").to_pandas()
     df1 = df1.sort_values("first_name").reset_index(drop=True)
-    assert df1.equals(df.loc[:0])
+    assert df1.equals(df)
 
 
 @pytest.mark.parametrize("processes", [False, 2, True])
@@ -784,3 +827,27 @@ def test_extend_features(catalog):
     res = dc._extend_features("sum", "num")
     assert res == sum(range(len(features)))
+
+
+def test_from_storage_object_name(tmp_dir, catalog):
+    df = pd.DataFrame(DF_DATA)
+    path = tmp_dir / "test.parquet"
+    df.to_parquet(path)
+    dc = DataChain.from_storage(path.as_uri(), object_name="custom")
+    assert dc.schema["custom"] == File
+
+
+def test_from_features_object_name(tmp_dir, catalog):
+    fib = [1, 1, 2, 3, 5, 8]
+    values = ["odd" if num % 2 else "even" for num in fib]
+
+    dc = DataChain.from_features(fib=fib, odds=values, object_name="custom")
+    assert "custom.fib" in dc.to_pandas().columns
+
+
+def test_parse_tabular_object_name(tmp_dir, catalog):
+    df = pd.DataFrame(DF_DATA)
+    path = tmp_dir / "test.parquet"
+    df.to_parquet(path)
+    dc = DataChain.from_storage(path.as_uri()).parse_tabular(object_name="name")
+    assert "name.first_name" in dc.to_pandas().columns
diff --git a/tests/unit/lib/test_feature.py b/tests/unit/lib/test_feature.py
index cca69491c..d5b8138de 100644
--- a/tests/unit/lib/test_feature.py
+++ b/tests/unit/lib/test_feature.py
@@ -5,7 +5,7 @@
 
 from datachain.lib.feature import Feature
 from datachain.lib.feature_registry import Registry
-from datachain.lib.feature_utils import pydantic_to_feature
+from datachain.lib.feature_utils import dict_to_feature, pydantic_to_feature
 from datachain.lib.signal_schema import SignalSchema
 from datachain.sql.types import (
     Array,
@@ -372,3 +372,20 @@ class _Company(Feature):
         "name": "Co",
         "parents": [{"name": "parent1", "children": [{"type": 12, "name": "child1"}]}],
     }
+
+
+def test_dict_to_feature():
+    data_dict = {"file": FileBasic, "id": int, "type": Literal["text"]}
+
+    cls = dict_to_feature("val", data_dict)
+    assert Feature.is_feature(cls)
+
+    spec = SignalSchema({"val": cls}).to_udf_spec()
+    assert list(spec.keys()) == [
+        "val__file__parent",
+        "val__file__name",
+        "val__file__size",
+        "val__id",
+        "val__type",
+    ]
+    assert list(spec.values()) == [String, String, Int64, Int64, String]
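
---
Usage sketch of the reworked reading API above — not part of the patch. It uses only calls shown in the diff (from_csv, from_parquet, object_name, print_schema, to_pandas); the import path follows the examples in this repo, and the bucket URIs are hypothetical placeholders:

    from datachain.lib.dc import DataChain

    # CSV directory (hypothetical URI): the header row is parsed, column types
    # are inferred from the data, and all signals are nested under "laion".
    laion = DataChain.from_csv("gs://mybucket/laion-csv", object_name="laion")
    laion.print_schema()

    # Parquet (hypothetical URI): every parquet file under the prefix is read
    # and combined into one dataset, replacing the old
    # from_storage(...).parse_parquet() two-step.
    meta = DataChain.from_parquet("gs://mybucket/metadata")
    print(meta.to_pandas())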
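
For headerless CSVs, `output` now doubles as the source of column names, mirroring the new branch in from_csv and the tests above. A sketch with hypothetical local paths:

    # list of names: column types are inferred from the data
    dc = DataChain.from_csv(
        "file:///tmp/people.csv", header=False,
        output=["first_name", "age", "city"],
    )

    # dict of name -> type: explicit types
    dc = DataChain.from_csv(
        "file:///tmp/people.csv", header=False,
        output={"first_name": str, "age": int, "city": str},
    )

A Feature subclass works the same way, with its model_fields supplying the names; omitting `output` entirely when header=False raises DatasetPrepareError.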
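
Finally, a sketch of the new dict_to_feature helper that backs the object_name nesting: it builds a Feature subclass at runtime via pydantic's create_model. The "Person" model and its fields here are hypothetical:

    from datachain.lib.feature import Feature
    from datachain.lib.feature_utils import dict_to_feature

    # Equivalent to hand-writing:
    #     class Person(Feature):
    #         name: str
    #         age: int
    Person = dict_to_feature("Person", {"name": str, "age": int})
    assert Feature.is_feature(Person)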