diff --git a/fennel/CHANGELOG.md b/fennel/CHANGELOG.md index bef1a8a55..48eafd34d 100644 --- a/fennel/CHANGELOG.md +++ b/fennel/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## [1.5.5] - 2024-08-20 +- Enable discrete aggregation with lookback + ## [1.5.4] - 2024-08-14 - Add support for removal of auto extractors diff --git a/fennel/client_tests/test_discrete_window_aggregation.py b/fennel/client_tests/test_discrete_window_aggregation.py index 30ec4b03d..a5eaa4e35 100644 --- a/fennel/client_tests/test_discrete_window_aggregation.py +++ b/fennel/client_tests/test_discrete_window_aggregation.py @@ -22,15 +22,17 @@ webhook = Webhook(name="fennel_webhook") +@source(webhook.endpoint("Transactions"), cdc="append", disorder="14d") +@dataset +class Transactions: + id: str + user_id: str + amount: float + t: datetime + + @mock def test_discrete_hopping_tumbling_window_aggregation(client): - @source(webhook.endpoint("Transactions"), cdc="append", disorder="14d") - @dataset - class Transactions: - id: str - user_id: str - amount: float - t: datetime @dataset(index=True) class Stats: @@ -122,7 +124,7 @@ def pipeline(cls, event: Dataset): EventCount, keys=pd.DataFrame({"user_id": [1, 2, 3]}), ) - assert df["count"].tolist() == [0, 0, 0] + assert df["count"].tolist() == [0, 3, 2] # Test offline lookups df, _ = client.lookup( @@ -219,5 +221,79 @@ def pipeline(cls, event: Dataset): EventCountAggregate, keys=pd.DataFrame({"user_id": [1, 2, 3]}), ) - expected = [2.454, 1.756, 1.648] + expected = [2.454, 1.613, 1.537] assert df["avg"].tolist() == pytest.approx(expected, abs=1e-3) + + +@mock +def test_discrete_aggregation_with_lookback(client): + + @dataset(index=True) + class Stats: + user_id: str = field(key=True) + sum: float + count: int + t: datetime + + @pipeline + @inputs(Transactions) + def pipeline(cls, event: Dataset): + return ( + event.dedup("id") + .groupby("user_id") + .aggregate( + count=Count(window=Tumbling("1h", lookback="1h")), + sum=Sum( + window=Hopping("1h", "30m", 
lookback="1h"), of="amount" + ), + ) + ) + + client.commit(datasets=[Transactions, Stats], message="first_commit") + + now = datetime(2024, 1, 1, 15, 30, 0, tzinfo=timezone.utc) + now_1h = now - timedelta(hours=1) + now_2h = now - timedelta(hours=2) + now_3h = now - timedelta(hours=3) + + # Frontier -> 2024-01-01 15:30:00 + # With lookback as one hour + # Count -> (2024-01-01 13:00:00, 2024-01-01 14:00:00) + # Sum -> (2024-01-01 13:30:00, 2024-01-01 14:30:00) + + df = pd.DataFrame( + { + "id": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + "user_id": [1, 1, 1, 2, 2, 2, 3, 3, 3, 4], + "amount": [100, 200, 300, 400, 500, 600, 700, 800, 900, 1000], + "t": [ + now_1h, + now_2h, + now_3h, + now_1h, + now_2h, + now_3h, + now_1h, + now_2h, + now_3h, + now, + ], + } + ) + client.log("fennel_webhook", "Transactions", df) + client.sleep() + + df, _ = client.lookup( + Stats, + keys=pd.DataFrame({"user_id": [1, 2, 3]}), + timestamps=pd.Series([now, now, now]), + ) + assert df["count"].tolist() == [1, 1, 1] + assert df["sum"].tolist() == [200, 500, 800] + + df, _ = client.lookup( + Stats, + keys=pd.DataFrame({"user_id": [1, 2, 3]}), + ) + assert df["count"].tolist() == [1, 1, 1] + assert df["sum"].tolist() == [200, 500, 800] diff --git a/fennel/datasets/datasets.py b/fennel/datasets/datasets.py index cf839da90..078aad15b 100644 --- a/fennel/datasets/datasets.py +++ b/fennel/datasets/datasets.py @@ -2548,6 +2548,7 @@ def visitAggregate(self, obj) -> DSSchema: found_discrete = False found_non_discrete = False + lookback = None for agg in obj.aggregates: # If default window is present in groupby then each aggregate spec cannot have window different from @@ -2577,6 +2578,16 @@ def visitAggregate(self, obj) -> DSSchema: ) found_non_discrete = True + # Check lookback in all windows are same + if isinstance(agg.window, (Hopping, Tumbling)): + curr_lookback = agg.window.lookback_total_seconds() + if lookback and curr_lookback != lookback: + raise ValueError( + f"Windows in all specs must have same 
lookback found {lookback} and {curr_lookback} in " + f"pipeline `{self.pipeline_name}`." + ) + lookback = curr_lookback + exceptions = agg.validate() if exceptions is not None: raise ValueError(f"Invalid aggregate `{agg}`: {exceptions}") diff --git a/fennel/dtypes/dtypes.py b/fennel/dtypes/dtypes.py index 5eb1c2e89..56d600524 100644 --- a/fennel/dtypes/dtypes.py +++ b/fennel/dtypes/dtypes.py @@ -4,7 +4,15 @@ from dataclasses import dataclass from datetime import datetime from textwrap import dedent -from typing import TYPE_CHECKING, Union, List, get_args, ForwardRef, Any +from typing import ( + TYPE_CHECKING, + Union, + List, + get_args, + ForwardRef, + Any, + Optional, +) import google.protobuf.duration_pb2 as duration_proto # type: ignore import pandas as pd @@ -333,6 +341,7 @@ def signature(self) -> str: @dataclass class Tumbling: duration: str + lookback: Optional[str] = None def __post_init__(self): """ @@ -342,6 +351,11 @@ def __post_init__(self): raise ValueError( "'forever' is not a valid duration value for Tumbling window." 
) + if self.lookback: + try: + duration_to_timedelta(self.lookback) + except Exception as e: + raise ValueError(f"Failed when parsing lookback : {e}.") try: duration_to_timedelta(self.duration) except Exception as e: @@ -363,11 +377,17 @@ def duration_total_seconds(self) -> int: def stride_total_seconds(self) -> int: return int(duration_to_timedelta(self.duration).total_seconds()) + def lookback_total_seconds(self) -> int: + if self.lookback is None: + return 0 + return int(duration_to_timedelta(self.lookback).total_seconds()) + @dataclass class Hopping: duration: str stride: str + lookback: Optional[str] = None def __post_init__(self): """ @@ -377,9 +397,12 @@ def __post_init__(self): stride_timedelta = duration_to_timedelta(self.stride) except Exception as e: raise ValueError(f"Failed when parsing stride : {e}.") - if self.duration == "forever": - raise ValueError("Forever hopping window is not yet supported.") - else: + if self.lookback: + try: + duration_to_timedelta(self.lookback) + except Exception as e: + raise ValueError(f"Failed when parsing lookback : {e}.") + if self.duration != "forever": try: duration_timedelta = duration_to_timedelta(self.duration) except Exception as e: @@ -390,14 +413,19 @@ def __post_init__(self): ) def to_proto(self) -> window_proto.Window: - duration = duration_proto.Duration() - duration.FromTimedelta(duration_to_timedelta(self.duration)) - stride = duration_proto.Duration() stride.FromTimedelta(duration_to_timedelta(self.stride)) - return window_proto.Window( - hopping=window_proto.Hopping(duration=duration, stride=stride) - ) + + if self.duration != "forever": + duration = duration_proto.Duration() + duration.FromTimedelta(duration_to_timedelta(self.duration)) + return window_proto.Window( + hopping=window_proto.Hopping(duration=duration, stride=stride) + ) + else: + return window_proto.Window( + forever_hopping=window_proto.ForeverHopping(stride=stride) + ) def signature(self) -> str: return 
f"hopping-{self.duration}-{self.stride}" @@ -412,6 +440,11 @@ def duration_total_seconds(self) -> int: def stride_total_seconds(self) -> int: return int(duration_to_timedelta(self.stride).total_seconds()) + def lookback_total_seconds(self) -> int: + if self.lookback is None: + return 0 + return int(duration_to_timedelta(self.lookback).total_seconds()) + @dataclass class Session: diff --git a/fennel/gen/dataset_pb2.py b/fennel/gen/dataset_pb2.py index e6f8c47ed..3fd621362 100644 --- a/fennel/gen/dataset_pb2.py +++ b/fennel/gen/dataset_pb2.py @@ -20,7 +20,7 @@ import fennel.gen.expr_pb2 as expr__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\rdataset.proto\x12\x14\x66\x65nnel.proto.dataset\x1a\x1egoogle/protobuf/duration.proto\x1a\x0emetadata.proto\x1a\x0cpycode.proto\x1a\x0cschema.proto\x1a\nspec.proto\x1a\x0cwindow.proto\x1a\nexpr.proto\"\xe5\x03\n\x0b\x43oreDataset\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x08metadata\x18\x02 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata\x12/\n\x08\x64sschema\x18\x03 \x01(\x0b\x32\x1d.fennel.proto.schema.DSSchema\x12*\n\x07history\x18\x04 \x01(\x0b\x32\x19.google.protobuf.Duration\x12,\n\tretention\x18\x05 \x01(\x0b\x32\x19.google.protobuf.Duration\x12L\n\x0e\x66ield_metadata\x18\x06 \x03(\x0b\x32\x34.fennel.proto.dataset.CoreDataset.FieldMetadataEntry\x12+\n\x06pycode\x18\x07 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\x12\x19\n\x11is_source_dataset\x18\x08 \x01(\x08\x12\x0f\n\x07version\x18\t \x01(\r\x12\x0c\n\x04tags\x18\n \x03(\t\x1aU\n\x12\x46ieldMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12.\n\x05value\x18\x02 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata:\x02\x38\x01\"Q\n\x08OnDemand\x12\x1c\n\x14\x66unction_source_code\x18\x01 \x01(\t\x12\x10\n\x08\x66unction\x18\x02 \x01(\x0c\x12\x15\n\rexpires_after\x18\x03 \x01(\x03\"\xd2\x01\n\x08Pipeline\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x14\n\x0c\x64\x61taset_name\x18\x02 \x01(\t\x12\x11\n\tsignature\x18\x03 \x01(\t\x12\x31\n\x08metadata\x18\x04 
\x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata\x12\x1b\n\x13input_dataset_names\x18\x05 \x03(\t\x12\x12\n\nds_version\x18\x06 \x01(\r\x12+\n\x06pycode\x18\x07 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\"\x8f\x08\n\x08Operator\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0f\n\x07is_root\x18\x02 \x01(\x08\x12\x15\n\rpipeline_name\x18\x03 \x01(\t\x12\x14\n\x0c\x64\x61taset_name\x18\x04 \x01(\t\x12\x12\n\nds_version\x18\x14 \x01(\r\x12\x34\n\taggregate\x18\x05 \x01(\x0b\x32\x1f.fennel.proto.dataset.AggregateH\x00\x12*\n\x04join\x18\x06 \x01(\x0b\x32\x1a.fennel.proto.dataset.JoinH\x00\x12\x34\n\ttransform\x18\x07 \x01(\x0b\x32\x1f.fennel.proto.dataset.TransformH\x00\x12,\n\x05union\x18\x08 \x01(\x0b\x32\x1b.fennel.proto.dataset.UnionH\x00\x12.\n\x06\x66ilter\x18\t \x01(\x0b\x32\x1c.fennel.proto.dataset.FilterH\x00\x12\x37\n\x0b\x64\x61taset_ref\x18\n \x01(\x0b\x32 .fennel.proto.dataset.DatasetRefH\x00\x12.\n\x06rename\x18\x0c \x01(\x0b\x32\x1c.fennel.proto.dataset.RenameH\x00\x12*\n\x04\x64rop\x18\r \x01(\x0b\x32\x1a.fennel.proto.dataset.DropH\x00\x12\x30\n\x07\x65xplode\x18\x0e \x01(\x0b\x32\x1d.fennel.proto.dataset.ExplodeH\x00\x12,\n\x05\x64\x65\x64up\x18\x0f \x01(\x0b\x32\x1b.fennel.proto.dataset.DedupH\x00\x12,\n\x05\x66irst\x18\x10 \x01(\x0b\x32\x1b.fennel.proto.dataset.FirstH\x00\x12.\n\x06\x61ssign\x18\x11 \x01(\x0b\x32\x1c.fennel.proto.dataset.AssignH\x00\x12\x32\n\x08\x64ropnull\x18\x12 \x01(\x0b\x32\x1e.fennel.proto.dataset.DropnullH\x00\x12:\n\x06window\x18\x13 \x01(\x0b\x32(.fennel.proto.dataset.WindowOperatorKindH\x00\x12.\n\x06latest\x18\x15 \x01(\x0b\x32\x1c.fennel.proto.dataset.LatestH\x00\x12\x34\n\tchangelog\x18\x16 \x01(\x0b\x32\x1f.fennel.proto.dataset.ChangelogH\x00\x12\x37\n\x0b\x61ssign_expr\x18\x17 \x01(\x0b\x32 .fennel.proto.dataset.AssignExprH\x00\x12\x37\n\x0b\x66ilter_expr\x18\x18 \x01(\x0b\x32 .fennel.proto.dataset.FilterExprH\x00\x12\x0c\n\x04name\x18\x0b \x01(\tB\x06\n\x04kind\"\xf7\x01\n\tAggregate\x12\x12\n\noperand_id\x18\x01 
\x01(\t\x12\x0c\n\x04keys\x18\x02 \x03(\t\x12)\n\x05specs\x18\x03 \x03(\x0b\x32\x1a.fennel.proto.spec.PreSpec\x12\x12\n\x05\x61long\x18\x05 \x01(\tH\x00\x88\x01\x01\x12\x43\n\remit_strategy\x18\x06 \x01(\x0e\x32,.fennel.proto.dataset.Aggregate.EmitStrategy\x12\x14\n\x0coperand_name\x18\x04 \x01(\t\"$\n\x0c\x45mitStrategy\x12\t\n\x05\x45\x61ger\x10\x00\x12\t\n\x05\x46inal\x10\x01\x42\x08\n\x06_along\"\xa2\x03\n\x04Join\x12\x16\n\x0elhs_operand_id\x18\x01 \x01(\t\x12\x1c\n\x14rhs_dsref_operand_id\x18\x02 \x01(\t\x12.\n\x02on\x18\x03 \x03(\x0b\x32\".fennel.proto.dataset.Join.OnEntry\x12\x32\n\nwithin_low\x18\x06 \x01(\x0b\x32\x19.google.protobuf.DurationH\x00\x88\x01\x01\x12\x33\n\x0bwithin_high\x18\x07 \x01(\x0b\x32\x19.google.protobuf.DurationH\x01\x88\x01\x01\x12\x18\n\x10lhs_operand_name\x18\x04 \x01(\t\x12\x1e\n\x16rhs_dsref_operand_name\x18\x05 \x01(\t\x12+\n\x03how\x18\x08 \x01(\x0e\x32\x1e.fennel.proto.dataset.Join.How\x1a)\n\x07OnEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\x1a\n\x03How\x12\x08\n\x04Left\x10\x00\x12\t\n\x05Inner\x10\x01\x42\r\n\x0b_within_lowB\x0e\n\x0c_within_high\"\xed\x01\n\tTransform\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12;\n\x06schema\x18\x02 \x03(\x0b\x32+.fennel.proto.dataset.Transform.SchemaEntry\x12+\n\x06pycode\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\x12\x14\n\x0coperand_name\x18\x04 \x01(\t\x1aL\n\x0bSchemaEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12,\n\x05value\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.schema.DataType:\x02\x38\x01\"]\n\nFilterExpr\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12%\n\x04\x65xpr\x18\x02 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\"_\n\x06\x46ilter\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12+\n\x06pycode\x18\x02 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\"\xa8\x01\n\x06\x41ssign\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12+\n\x06pycode\x18\x02 
\x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\x12\x13\n\x0b\x63olumn_name\x18\x03 \x01(\t\x12\x32\n\x0boutput_type\x18\x04 \x01(\x0b\x32\x1d.fennel.proto.schema.DataType\x12\x14\n\x0coperand_name\x18\x05 \x01(\t\"\xd5\x02\n\nAssignExpr\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12:\n\x05\x65xprs\x18\x02 \x03(\x0b\x32+.fennel.proto.dataset.AssignExpr.ExprsEntry\x12G\n\x0coutput_types\x18\x03 \x03(\x0b\x32\x31.fennel.proto.dataset.AssignExpr.OutputTypesEntry\x12\x14\n\x0coperand_name\x18\x05 \x01(\t\x1a\x45\n\nExprsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12&\n\x05value\x18\x02 \x01(\x0b\x32\x17.fennel.proto.expr.Expr:\x02\x38\x01\x1aQ\n\x10OutputTypesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12,\n\x05value\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.schema.DataType:\x02\x38\x01\"E\n\x08\x44ropnull\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12\x0f\n\x07\x63olumns\x18\x02 \x03(\t\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\"B\n\x04\x44rop\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12\x10\n\x08\x64ropcols\x18\x02 \x03(\t\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\"\xa5\x01\n\x06Rename\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12?\n\ncolumn_map\x18\x02 \x03(\x0b\x32+.fennel.proto.dataset.Rename.ColumnMapEntry\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\x1a\x30\n\x0e\x43olumnMapEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"3\n\x05Union\x12\x13\n\x0boperand_ids\x18\x01 \x03(\t\x12\x15\n\roperand_names\x18\x02 \x03(\t\"B\n\x05\x44\x65\x64up\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12\x0f\n\x07\x63olumns\x18\x02 \x03(\t\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\"D\n\x07\x45xplode\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12\x0f\n\x07\x63olumns\x18\x02 \x03(\t\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\"=\n\x05\x46irst\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12\n\n\x02\x62y\x18\x02 \x03(\t\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\">\n\x06Latest\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12\n\n\x02\x62y\x18\x02 \x03(\t\x12\x14\n\x0coperand_name\x18\x03 
\x01(\t\"L\n\tChangelog\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12\x15\n\rdelete_column\x18\x02 \x01(\t\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\"\xcb\x01\n\x12WindowOperatorKind\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12\x30\n\x0bwindow_type\x18\x02 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\n\n\x02\x62y\x18\x03 \x03(\t\x12\r\n\x05\x66ield\x18\x04 \x01(\t\x12\x32\n\x07summary\x18\x06 \x01(\x0b\x32\x1c.fennel.proto.window.SummaryH\x00\x88\x01\x01\x12\x14\n\x0coperand_name\x18\x05 \x01(\tB\n\n\x08_summary\",\n\nDatasetRef\x12\x1e\n\x16referring_dataset_name\x18\x01 \x01(\t\"\x80\x02\n\x08\x44\x61taflow\x12\x16\n\x0c\x64\x61taset_name\x18\x01 \x01(\tH\x00\x12L\n\x11pipeline_dataflow\x18\x02 \x01(\x0b\x32/.fennel.proto.dataset.Dataflow.PipelineDataflowH\x00\x12\x0c\n\x04tags\x18\x03 \x03(\t\x1ax\n\x10PipelineDataflow\x12\x14\n\x0c\x64\x61taset_name\x18\x01 \x01(\t\x12\x15\n\rpipeline_name\x18\x02 \x01(\t\x12\x37\n\x0finput_dataflows\x18\x03 \x03(\x0b\x32\x1e.fennel.proto.dataset.DataflowB\x06\n\x04kind\"\x9c\x01\n\x10PipelineLineages\x12\x14\n\x0c\x64\x61taset_name\x18\x01 \x01(\t\x12\x15\n\rpipeline_name\x18\x02 \x01(\t\x12=\n\x0einput_datasets\x18\x03 \x03(\x0b\x32%.fennel.proto.dataset.DatasetLineages\x12\x0e\n\x06\x61\x63tive\x18\x04 \x01(\x08\x12\x0c\n\x04tags\x18\x05 \x03(\t\"\\\n\x17\x44\x61tasetPipelineLineages\x12\x41\n\x11pipeline_lineages\x18\x02 \x03(\x0b\x32&.fennel.proto.dataset.PipelineLineages\"\x8b\x01\n\x0f\x44\x61tasetLineages\x12\x18\n\x0esource_dataset\x18\x01 \x01(\tH\x00\x12H\n\x0f\x64\x65rived_dataset\x18\x02 \x01(\x0b\x32-.fennel.proto.dataset.DatasetPipelineLineagesH\x00\x12\x0c\n\x04tags\x18\x03 \x03(\tB\x06\n\x04kindb\x06proto3') +DESCRIPTOR = 
_descriptor_pool.Default().AddSerializedFile(b'\n\rdataset.proto\x12\x14\x66\x65nnel.proto.dataset\x1a\x1egoogle/protobuf/duration.proto\x1a\x0emetadata.proto\x1a\x0cpycode.proto\x1a\x0cschema.proto\x1a\nspec.proto\x1a\x0cwindow.proto\x1a\nexpr.proto\"\xe5\x03\n\x0b\x43oreDataset\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x08metadata\x18\x02 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata\x12/\n\x08\x64sschema\x18\x03 \x01(\x0b\x32\x1d.fennel.proto.schema.DSSchema\x12*\n\x07history\x18\x04 \x01(\x0b\x32\x19.google.protobuf.Duration\x12,\n\tretention\x18\x05 \x01(\x0b\x32\x19.google.protobuf.Duration\x12L\n\x0e\x66ield_metadata\x18\x06 \x03(\x0b\x32\x34.fennel.proto.dataset.CoreDataset.FieldMetadataEntry\x12+\n\x06pycode\x18\x07 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\x12\x19\n\x11is_source_dataset\x18\x08 \x01(\x08\x12\x0f\n\x07version\x18\t \x01(\r\x12\x0c\n\x04tags\x18\n \x03(\t\x1aU\n\x12\x46ieldMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12.\n\x05value\x18\x02 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata:\x02\x38\x01\"Q\n\x08OnDemand\x12\x1c\n\x14\x66unction_source_code\x18\x01 \x01(\t\x12\x10\n\x08\x66unction\x18\x02 \x01(\x0c\x12\x15\n\rexpires_after\x18\x03 \x01(\x03\"\xd2\x01\n\x08Pipeline\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x14\n\x0c\x64\x61taset_name\x18\x02 \x01(\t\x12\x11\n\tsignature\x18\x03 \x01(\t\x12\x31\n\x08metadata\x18\x04 \x01(\x0b\x32\x1f.fennel.proto.metadata.Metadata\x12\x1b\n\x13input_dataset_names\x18\x05 \x03(\t\x12\x12\n\nds_version\x18\x06 \x01(\r\x12+\n\x06pycode\x18\x07 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\"\x8f\x08\n\x08Operator\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0f\n\x07is_root\x18\x02 \x01(\x08\x12\x15\n\rpipeline_name\x18\x03 \x01(\t\x12\x14\n\x0c\x64\x61taset_name\x18\x04 \x01(\t\x12\x12\n\nds_version\x18\x14 \x01(\r\x12\x34\n\taggregate\x18\x05 \x01(\x0b\x32\x1f.fennel.proto.dataset.AggregateH\x00\x12*\n\x04join\x18\x06 \x01(\x0b\x32\x1a.fennel.proto.dataset.JoinH\x00\x12\x34\n\ttransform\x18\x07 
\x01(\x0b\x32\x1f.fennel.proto.dataset.TransformH\x00\x12,\n\x05union\x18\x08 \x01(\x0b\x32\x1b.fennel.proto.dataset.UnionH\x00\x12.\n\x06\x66ilter\x18\t \x01(\x0b\x32\x1c.fennel.proto.dataset.FilterH\x00\x12\x37\n\x0b\x64\x61taset_ref\x18\n \x01(\x0b\x32 .fennel.proto.dataset.DatasetRefH\x00\x12.\n\x06rename\x18\x0c \x01(\x0b\x32\x1c.fennel.proto.dataset.RenameH\x00\x12*\n\x04\x64rop\x18\r \x01(\x0b\x32\x1a.fennel.proto.dataset.DropH\x00\x12\x30\n\x07\x65xplode\x18\x0e \x01(\x0b\x32\x1d.fennel.proto.dataset.ExplodeH\x00\x12,\n\x05\x64\x65\x64up\x18\x0f \x01(\x0b\x32\x1b.fennel.proto.dataset.DedupH\x00\x12,\n\x05\x66irst\x18\x10 \x01(\x0b\x32\x1b.fennel.proto.dataset.FirstH\x00\x12.\n\x06\x61ssign\x18\x11 \x01(\x0b\x32\x1c.fennel.proto.dataset.AssignH\x00\x12\x32\n\x08\x64ropnull\x18\x12 \x01(\x0b\x32\x1e.fennel.proto.dataset.DropnullH\x00\x12:\n\x06window\x18\x13 \x01(\x0b\x32(.fennel.proto.dataset.WindowOperatorKindH\x00\x12.\n\x06latest\x18\x15 \x01(\x0b\x32\x1c.fennel.proto.dataset.LatestH\x00\x12\x34\n\tchangelog\x18\x16 \x01(\x0b\x32\x1f.fennel.proto.dataset.ChangelogH\x00\x12\x37\n\x0b\x61ssign_expr\x18\x17 \x01(\x0b\x32 .fennel.proto.dataset.AssignExprH\x00\x12\x37\n\x0b\x66ilter_expr\x18\x18 \x01(\x0b\x32 .fennel.proto.dataset.FilterExprH\x00\x12\x0c\n\x04name\x18\x0b \x01(\tB\x06\n\x04kind\"\xc7\x01\n\tAggregate\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12\x0c\n\x04keys\x18\x02 \x03(\t\x12)\n\x05specs\x18\x03 \x03(\x0b\x32\x1a.fennel.proto.spec.PreSpec\x12\x12\n\x05\x61long\x18\x05 \x01(\tH\x00\x88\x01\x01\x12\x39\n\remit_strategy\x18\x06 \x01(\x0e\x32\".fennel.proto.dataset.EmitStrategy\x12\x14\n\x0coperand_name\x18\x04 \x01(\tB\x08\n\x06_along\"\xa2\x03\n\x04Join\x12\x16\n\x0elhs_operand_id\x18\x01 \x01(\t\x12\x1c\n\x14rhs_dsref_operand_id\x18\x02 \x01(\t\x12.\n\x02on\x18\x03 \x03(\x0b\x32\".fennel.proto.dataset.Join.OnEntry\x12\x32\n\nwithin_low\x18\x06 \x01(\x0b\x32\x19.google.protobuf.DurationH\x00\x88\x01\x01\x12\x33\n\x0bwithin_high\x18\x07 
\x01(\x0b\x32\x19.google.protobuf.DurationH\x01\x88\x01\x01\x12\x18\n\x10lhs_operand_name\x18\x04 \x01(\t\x12\x1e\n\x16rhs_dsref_operand_name\x18\x05 \x01(\t\x12+\n\x03how\x18\x08 \x01(\x0e\x32\x1e.fennel.proto.dataset.Join.How\x1a)\n\x07OnEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\x1a\n\x03How\x12\x08\n\x04Left\x10\x00\x12\t\n\x05Inner\x10\x01\x42\r\n\x0b_within_lowB\x0e\n\x0c_within_high\"\xed\x01\n\tTransform\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12;\n\x06schema\x18\x02 \x03(\x0b\x32+.fennel.proto.dataset.Transform.SchemaEntry\x12+\n\x06pycode\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\x12\x14\n\x0coperand_name\x18\x04 \x01(\t\x1aL\n\x0bSchemaEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12,\n\x05value\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.schema.DataType:\x02\x38\x01\"]\n\nFilterExpr\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12%\n\x04\x65xpr\x18\x02 \x01(\x0b\x32\x17.fennel.proto.expr.Expr\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\"_\n\x06\x46ilter\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12+\n\x06pycode\x18\x02 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\"\xa8\x01\n\x06\x41ssign\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12+\n\x06pycode\x18\x02 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCode\x12\x13\n\x0b\x63olumn_name\x18\x03 \x01(\t\x12\x32\n\x0boutput_type\x18\x04 \x01(\x0b\x32\x1d.fennel.proto.schema.DataType\x12\x14\n\x0coperand_name\x18\x05 \x01(\t\"\xd5\x02\n\nAssignExpr\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12:\n\x05\x65xprs\x18\x02 \x03(\x0b\x32+.fennel.proto.dataset.AssignExpr.ExprsEntry\x12G\n\x0coutput_types\x18\x03 \x03(\x0b\x32\x31.fennel.proto.dataset.AssignExpr.OutputTypesEntry\x12\x14\n\x0coperand_name\x18\x05 \x01(\t\x1a\x45\n\nExprsEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12&\n\x05value\x18\x02 \x01(\x0b\x32\x17.fennel.proto.expr.Expr:\x02\x38\x01\x1aQ\n\x10OutputTypesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12,\n\x05value\x18\x02 
\x01(\x0b\x32\x1d.fennel.proto.schema.DataType:\x02\x38\x01\"E\n\x08\x44ropnull\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12\x0f\n\x07\x63olumns\x18\x02 \x03(\t\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\"B\n\x04\x44rop\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12\x10\n\x08\x64ropcols\x18\x02 \x03(\t\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\"\xa5\x01\n\x06Rename\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12?\n\ncolumn_map\x18\x02 \x03(\x0b\x32+.fennel.proto.dataset.Rename.ColumnMapEntry\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\x1a\x30\n\x0e\x43olumnMapEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"3\n\x05Union\x12\x13\n\x0boperand_ids\x18\x01 \x03(\t\x12\x15\n\roperand_names\x18\x02 \x03(\t\"B\n\x05\x44\x65\x64up\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12\x0f\n\x07\x63olumns\x18\x02 \x03(\t\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\"D\n\x07\x45xplode\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12\x0f\n\x07\x63olumns\x18\x02 \x03(\t\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\"=\n\x05\x46irst\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12\n\n\x02\x62y\x18\x02 \x03(\t\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\">\n\x06Latest\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12\n\n\x02\x62y\x18\x02 \x03(\t\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\"L\n\tChangelog\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12\x15\n\rdelete_column\x18\x02 \x01(\t\x12\x14\n\x0coperand_name\x18\x03 \x01(\t\"\xcb\x01\n\x12WindowOperatorKind\x12\x12\n\noperand_id\x18\x01 \x01(\t\x12\x30\n\x0bwindow_type\x18\x02 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\n\n\x02\x62y\x18\x03 \x03(\t\x12\r\n\x05\x66ield\x18\x04 \x01(\t\x12\x32\n\x07summary\x18\x06 \x01(\x0b\x32\x1c.fennel.proto.window.SummaryH\x00\x88\x01\x01\x12\x14\n\x0coperand_name\x18\x05 \x01(\tB\n\n\x08_summary\",\n\nDatasetRef\x12\x1e\n\x16referring_dataset_name\x18\x01 \x01(\t\"\x80\x02\n\x08\x44\x61taflow\x12\x16\n\x0c\x64\x61taset_name\x18\x01 \x01(\tH\x00\x12L\n\x11pipeline_dataflow\x18\x02 
\x01(\x0b\x32/.fennel.proto.dataset.Dataflow.PipelineDataflowH\x00\x12\x0c\n\x04tags\x18\x03 \x03(\t\x1ax\n\x10PipelineDataflow\x12\x14\n\x0c\x64\x61taset_name\x18\x01 \x01(\t\x12\x15\n\rpipeline_name\x18\x02 \x01(\t\x12\x37\n\x0finput_dataflows\x18\x03 \x03(\x0b\x32\x1e.fennel.proto.dataset.DataflowB\x06\n\x04kind\"\x9c\x01\n\x10PipelineLineages\x12\x14\n\x0c\x64\x61taset_name\x18\x01 \x01(\t\x12\x15\n\rpipeline_name\x18\x02 \x01(\t\x12=\n\x0einput_datasets\x18\x03 \x03(\x0b\x32%.fennel.proto.dataset.DatasetLineages\x12\x0e\n\x06\x61\x63tive\x18\x04 \x01(\x08\x12\x0c\n\x04tags\x18\x05 \x03(\t\"\\\n\x17\x44\x61tasetPipelineLineages\x12\x41\n\x11pipeline_lineages\x18\x02 \x03(\x0b\x32&.fennel.proto.dataset.PipelineLineages\"\x8b\x01\n\x0f\x44\x61tasetLineages\x12\x18\n\x0esource_dataset\x18\x01 \x01(\tH\x00\x12H\n\x0f\x64\x65rived_dataset\x18\x02 \x01(\x0b\x32-.fennel.proto.dataset.DatasetPipelineLineagesH\x00\x12\x0c\n\x04tags\x18\x03 \x03(\tB\x06\n\x04kind*$\n\x0c\x45mitStrategy\x12\t\n\x05\x45\x61ger\x10\x00\x12\t\n\x05\x46inal\x10\x01\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -39,6 +39,8 @@ _globals['_ASSIGNEXPR_OUTPUTTYPESENTRY']._serialized_options = b'8\001' _globals['_RENAME_COLUMNMAPENTRY']._options = None _globals['_RENAME_COLUMNMAPENTRY']._serialized_options = b'8\001' + _globals['_EMITSTRATEGY']._serialized_start=5158 + _globals['_EMITSTRATEGY']._serialized_end=5194 _globals['_COREDATASET']._serialized_start=154 _globals['_COREDATASET']._serialized_end=639 _globals['_COREDATASET_FIELDMETADATAENTRY']._serialized_start=554 @@ -50,63 +52,61 @@ _globals['_OPERATOR']._serialized_start=938 _globals['_OPERATOR']._serialized_end=1977 _globals['_AGGREGATE']._serialized_start=1980 - _globals['_AGGREGATE']._serialized_end=2227 - _globals['_AGGREGATE_EMITSTRATEGY']._serialized_start=2181 - _globals['_AGGREGATE_EMITSTRATEGY']._serialized_end=2217 - _globals['_JOIN']._serialized_start=2230 - 
_globals['_JOIN']._serialized_end=2648 - _globals['_JOIN_ONENTRY']._serialized_start=2548 - _globals['_JOIN_ONENTRY']._serialized_end=2589 - _globals['_JOIN_HOW']._serialized_start=2591 - _globals['_JOIN_HOW']._serialized_end=2617 - _globals['_TRANSFORM']._serialized_start=2651 - _globals['_TRANSFORM']._serialized_end=2888 - _globals['_TRANSFORM_SCHEMAENTRY']._serialized_start=2812 - _globals['_TRANSFORM_SCHEMAENTRY']._serialized_end=2888 - _globals['_FILTEREXPR']._serialized_start=2890 - _globals['_FILTEREXPR']._serialized_end=2983 - _globals['_FILTER']._serialized_start=2985 - _globals['_FILTER']._serialized_end=3080 - _globals['_ASSIGN']._serialized_start=3083 - _globals['_ASSIGN']._serialized_end=3251 - _globals['_ASSIGNEXPR']._serialized_start=3254 - _globals['_ASSIGNEXPR']._serialized_end=3595 - _globals['_ASSIGNEXPR_EXPRSENTRY']._serialized_start=3443 - _globals['_ASSIGNEXPR_EXPRSENTRY']._serialized_end=3512 - _globals['_ASSIGNEXPR_OUTPUTTYPESENTRY']._serialized_start=3514 - _globals['_ASSIGNEXPR_OUTPUTTYPESENTRY']._serialized_end=3595 - _globals['_DROPNULL']._serialized_start=3597 - _globals['_DROPNULL']._serialized_end=3666 - _globals['_DROP']._serialized_start=3668 - _globals['_DROP']._serialized_end=3734 - _globals['_RENAME']._serialized_start=3737 - _globals['_RENAME']._serialized_end=3902 - _globals['_RENAME_COLUMNMAPENTRY']._serialized_start=3854 - _globals['_RENAME_COLUMNMAPENTRY']._serialized_end=3902 - _globals['_UNION']._serialized_start=3904 - _globals['_UNION']._serialized_end=3955 - _globals['_DEDUP']._serialized_start=3957 - _globals['_DEDUP']._serialized_end=4023 - _globals['_EXPLODE']._serialized_start=4025 - _globals['_EXPLODE']._serialized_end=4093 - _globals['_FIRST']._serialized_start=4095 - _globals['_FIRST']._serialized_end=4156 - _globals['_LATEST']._serialized_start=4158 - _globals['_LATEST']._serialized_end=4220 - _globals['_CHANGELOG']._serialized_start=4222 - _globals['_CHANGELOG']._serialized_end=4298 - 
_globals['_WINDOWOPERATORKIND']._serialized_start=4301 - _globals['_WINDOWOPERATORKIND']._serialized_end=4504 - _globals['_DATASETREF']._serialized_start=4506 - _globals['_DATASETREF']._serialized_end=4550 - _globals['_DATAFLOW']._serialized_start=4553 - _globals['_DATAFLOW']._serialized_end=4809 - _globals['_DATAFLOW_PIPELINEDATAFLOW']._serialized_start=4681 - _globals['_DATAFLOW_PIPELINEDATAFLOW']._serialized_end=4801 - _globals['_PIPELINELINEAGES']._serialized_start=4812 - _globals['_PIPELINELINEAGES']._serialized_end=4968 - _globals['_DATASETPIPELINELINEAGES']._serialized_start=4970 - _globals['_DATASETPIPELINELINEAGES']._serialized_end=5062 - _globals['_DATASETLINEAGES']._serialized_start=5065 - _globals['_DATASETLINEAGES']._serialized_end=5204 + _globals['_AGGREGATE']._serialized_end=2179 + _globals['_JOIN']._serialized_start=2182 + _globals['_JOIN']._serialized_end=2600 + _globals['_JOIN_ONENTRY']._serialized_start=2500 + _globals['_JOIN_ONENTRY']._serialized_end=2541 + _globals['_JOIN_HOW']._serialized_start=2543 + _globals['_JOIN_HOW']._serialized_end=2569 + _globals['_TRANSFORM']._serialized_start=2603 + _globals['_TRANSFORM']._serialized_end=2840 + _globals['_TRANSFORM_SCHEMAENTRY']._serialized_start=2764 + _globals['_TRANSFORM_SCHEMAENTRY']._serialized_end=2840 + _globals['_FILTEREXPR']._serialized_start=2842 + _globals['_FILTEREXPR']._serialized_end=2935 + _globals['_FILTER']._serialized_start=2937 + _globals['_FILTER']._serialized_end=3032 + _globals['_ASSIGN']._serialized_start=3035 + _globals['_ASSIGN']._serialized_end=3203 + _globals['_ASSIGNEXPR']._serialized_start=3206 + _globals['_ASSIGNEXPR']._serialized_end=3547 + _globals['_ASSIGNEXPR_EXPRSENTRY']._serialized_start=3395 + _globals['_ASSIGNEXPR_EXPRSENTRY']._serialized_end=3464 + _globals['_ASSIGNEXPR_OUTPUTTYPESENTRY']._serialized_start=3466 + _globals['_ASSIGNEXPR_OUTPUTTYPESENTRY']._serialized_end=3547 + _globals['_DROPNULL']._serialized_start=3549 + 
_globals['_DROPNULL']._serialized_end=3618 + _globals['_DROP']._serialized_start=3620 + _globals['_DROP']._serialized_end=3686 + _globals['_RENAME']._serialized_start=3689 + _globals['_RENAME']._serialized_end=3854 + _globals['_RENAME_COLUMNMAPENTRY']._serialized_start=3806 + _globals['_RENAME_COLUMNMAPENTRY']._serialized_end=3854 + _globals['_UNION']._serialized_start=3856 + _globals['_UNION']._serialized_end=3907 + _globals['_DEDUP']._serialized_start=3909 + _globals['_DEDUP']._serialized_end=3975 + _globals['_EXPLODE']._serialized_start=3977 + _globals['_EXPLODE']._serialized_end=4045 + _globals['_FIRST']._serialized_start=4047 + _globals['_FIRST']._serialized_end=4108 + _globals['_LATEST']._serialized_start=4110 + _globals['_LATEST']._serialized_end=4172 + _globals['_CHANGELOG']._serialized_start=4174 + _globals['_CHANGELOG']._serialized_end=4250 + _globals['_WINDOWOPERATORKIND']._serialized_start=4253 + _globals['_WINDOWOPERATORKIND']._serialized_end=4456 + _globals['_DATASETREF']._serialized_start=4458 + _globals['_DATASETREF']._serialized_end=4502 + _globals['_DATAFLOW']._serialized_start=4505 + _globals['_DATAFLOW']._serialized_end=4761 + _globals['_DATAFLOW_PIPELINEDATAFLOW']._serialized_start=4633 + _globals['_DATAFLOW_PIPELINEDATAFLOW']._serialized_end=4753 + _globals['_PIPELINELINEAGES']._serialized_start=4764 + _globals['_PIPELINELINEAGES']._serialized_end=4920 + _globals['_DATASETPIPELINELINEAGES']._serialized_start=4922 + _globals['_DATASETPIPELINELINEAGES']._serialized_end=5014 + _globals['_DATASETLINEAGES']._serialized_start=5017 + _globals['_DATASETLINEAGES']._serialized_end=5156 # @@protoc_insertion_point(module_scope) diff --git a/fennel/gen/dataset_pb2.pyi b/fennel/gen/dataset_pb2.pyi index 384699764..4a61e5a84 100644 --- a/fennel/gen/dataset_pb2.pyi +++ b/fennel/gen/dataset_pb2.pyi @@ -25,6 +25,21 @@ else: DESCRIPTOR: google.protobuf.descriptor.FileDescriptor +class _EmitStrategy: + ValueType = typing.NewType("ValueType", builtins.int) + V: 
typing_extensions.TypeAlias = ValueType + +class _EmitStrategyEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[_EmitStrategy.ValueType], builtins.type): + DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor + Eager: _EmitStrategy.ValueType # 0 + Final: _EmitStrategy.ValueType # 1 + +class EmitStrategy(_EmitStrategy, metaclass=_EmitStrategyEnumTypeWrapper): ... + +Eager: EmitStrategy.ValueType # 0 +Final: EmitStrategy.ValueType # 1 +global___EmitStrategy = EmitStrategy + @typing_extensions.final class CoreDataset(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -281,19 +296,6 @@ global___Operator = Operator class Aggregate(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor - class _EmitStrategy: - ValueType = typing.NewType("ValueType", builtins.int) - V: typing_extensions.TypeAlias = ValueType - - class _EmitStrategyEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[Aggregate._EmitStrategy.ValueType], builtins.type): # noqa: F821 - DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor - Eager: Aggregate._EmitStrategy.ValueType # 0 - Final: Aggregate._EmitStrategy.ValueType # 1 - - class EmitStrategy(_EmitStrategy, metaclass=_EmitStrategyEnumTypeWrapper): ... - Eager: Aggregate.EmitStrategy.ValueType # 0 - Final: Aggregate.EmitStrategy.ValueType # 1 - OPERAND_ID_FIELD_NUMBER: builtins.int KEYS_FIELD_NUMBER: builtins.int SPECS_FIELD_NUMBER: builtins.int @@ -306,7 +308,7 @@ class Aggregate(google.protobuf.message.Message): @property def specs(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[spec_pb2.PreSpec]: ... 
along: builtins.str - emit_strategy: global___Aggregate.EmitStrategy.ValueType + emit_strategy: global___EmitStrategy.ValueType operand_name: builtins.str """NOTE: FOLLOWING PROPERTIES ARE SET BY THE SERVER AND WILL BE IGNORED BY THE CLIENT @@ -318,7 +320,7 @@ class Aggregate(google.protobuf.message.Message): keys: collections.abc.Iterable[builtins.str] | None = ..., specs: collections.abc.Iterable[spec_pb2.PreSpec] | None = ..., along: builtins.str | None = ..., - emit_strategy: global___Aggregate.EmitStrategy.ValueType = ..., + emit_strategy: global___EmitStrategy.ValueType = ..., operand_name: builtins.str = ..., ) -> None: ... def HasField(self, field_name: typing_extensions.Literal["_along", b"_along", "along", b"along"]) -> builtins.bool: ... diff --git a/fennel/gen/window_pb2.py b/fennel/gen/window_pb2.py index 879a1cf2e..046c14ab3 100644 --- a/fennel/gen/window_pb2.py +++ b/fennel/gen/window_pb2.py @@ -16,7 +16,7 @@ import fennel.gen.schema_pb2 as schema__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0cwindow.proto\x12\x13\x66\x65nnel.proto.window\x1a\x1egoogle/protobuf/duration.proto\x1a\x0cpycode.proto\x1a\x0cschema.proto\"\x8a\x02\n\x06Window\x12/\n\x07sliding\x18\x01 \x01(\x0b\x32\x1c.fennel.proto.window.SlidingH\x00\x12/\n\x07\x66orever\x18\x02 \x01(\x0b\x32\x1c.fennel.proto.window.ForeverH\x00\x12/\n\x07session\x18\x03 \x01(\x0b\x32\x1c.fennel.proto.window.SessionH\x00\x12\x31\n\x08tumbling\x18\x04 \x01(\x0b\x32\x1d.fennel.proto.window.TumblingH\x00\x12/\n\x07hopping\x18\x05 \x01(\x0b\x32\x1c.fennel.proto.window.HoppingH\x00\x42\t\n\x07variant\"6\n\x07Sliding\x12+\n\x08\x64uration\x18\x01 \x01(\x0b\x32\x19.google.protobuf.Duration\"\t\n\x07\x46orever\"7\n\x08Tumbling\x12+\n\x08\x64uration\x18\x01 \x01(\x0b\x32\x19.google.protobuf.Duration\"a\n\x07Hopping\x12+\n\x08\x64uration\x18\x01 \x01(\x0b\x32\x19.google.protobuf.Duration\x12)\n\x06stride\x18\x02 
\x01(\x0b\x32\x19.google.protobuf.Duration\"1\n\x07Session\x12&\n\x03gap\x18\x01 \x01(\x0b\x32\x19.google.protobuf.Duration\"\x7f\n\x07Summary\x12\x13\n\x0b\x63olumn_name\x18\x01 \x01(\t\x12\x32\n\x0boutput_type\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.schema.DataType\x12+\n\x06pycode\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCodeb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0cwindow.proto\x12\x13\x66\x65nnel.proto.window\x1a\x1egoogle/protobuf/duration.proto\x1a\x0cpycode.proto\x1a\x0cschema.proto\"\xca\x02\n\x06Window\x12/\n\x07sliding\x18\x01 \x01(\x0b\x32\x1c.fennel.proto.window.SlidingH\x00\x12/\n\x07\x66orever\x18\x02 \x01(\x0b\x32\x1c.fennel.proto.window.ForeverH\x00\x12/\n\x07session\x18\x03 \x01(\x0b\x32\x1c.fennel.proto.window.SessionH\x00\x12\x31\n\x08tumbling\x18\x04 \x01(\x0b\x32\x1d.fennel.proto.window.TumblingH\x00\x12/\n\x07hopping\x18\x05 \x01(\x0b\x32\x1c.fennel.proto.window.HoppingH\x00\x12>\n\x0f\x66orever_hopping\x18\x06 \x01(\x0b\x32#.fennel.proto.window.ForeverHoppingH\x00\x42\t\n\x07variant\"6\n\x07Sliding\x12+\n\x08\x64uration\x18\x01 \x01(\x0b\x32\x19.google.protobuf.Duration\"\t\n\x07\x46orever\"7\n\x08Tumbling\x12+\n\x08\x64uration\x18\x01 \x01(\x0b\x32\x19.google.protobuf.Duration\"a\n\x07Hopping\x12+\n\x08\x64uration\x18\x01 \x01(\x0b\x32\x19.google.protobuf.Duration\x12)\n\x06stride\x18\x02 \x01(\x0b\x32\x19.google.protobuf.Duration\";\n\x0e\x46oreverHopping\x12)\n\x06stride\x18\x01 \x01(\x0b\x32\x19.google.protobuf.Duration\"1\n\x07Session\x12&\n\x03gap\x18\x01 \x01(\x0b\x32\x19.google.protobuf.Duration\"\x7f\n\x07Summary\x12\x13\n\x0b\x63olumn_name\x18\x01 \x01(\t\x12\x32\n\x0boutput_type\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.schema.DataType\x12+\n\x06pycode\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCodeb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -24,17 +24,19 @@ if _descriptor._USE_C_DESCRIPTORS == False: DESCRIPTOR._options = None 
_globals['_WINDOW']._serialized_start=98 - _globals['_WINDOW']._serialized_end=364 - _globals['_SLIDING']._serialized_start=366 - _globals['_SLIDING']._serialized_end=420 - _globals['_FOREVER']._serialized_start=422 - _globals['_FOREVER']._serialized_end=431 - _globals['_TUMBLING']._serialized_start=433 - _globals['_TUMBLING']._serialized_end=488 - _globals['_HOPPING']._serialized_start=490 - _globals['_HOPPING']._serialized_end=587 - _globals['_SESSION']._serialized_start=589 - _globals['_SESSION']._serialized_end=638 - _globals['_SUMMARY']._serialized_start=640 - _globals['_SUMMARY']._serialized_end=767 + _globals['_WINDOW']._serialized_end=428 + _globals['_SLIDING']._serialized_start=430 + _globals['_SLIDING']._serialized_end=484 + _globals['_FOREVER']._serialized_start=486 + _globals['_FOREVER']._serialized_end=495 + _globals['_TUMBLING']._serialized_start=497 + _globals['_TUMBLING']._serialized_end=552 + _globals['_HOPPING']._serialized_start=554 + _globals['_HOPPING']._serialized_end=651 + _globals['_FOREVERHOPPING']._serialized_start=653 + _globals['_FOREVERHOPPING']._serialized_end=712 + _globals['_SESSION']._serialized_start=714 + _globals['_SESSION']._serialized_end=763 + _globals['_SUMMARY']._serialized_start=765 + _globals['_SUMMARY']._serialized_end=892 # @@protoc_insertion_point(module_scope) diff --git a/fennel/gen/window_pb2.pyi b/fennel/gen/window_pb2.pyi index 6da08c057..f15870647 100644 --- a/fennel/gen/window_pb2.pyi +++ b/fennel/gen/window_pb2.pyi @@ -26,6 +26,7 @@ class Window(google.protobuf.message.Message): SESSION_FIELD_NUMBER: builtins.int TUMBLING_FIELD_NUMBER: builtins.int HOPPING_FIELD_NUMBER: builtins.int + FOREVER_HOPPING_FIELD_NUMBER: builtins.int @property def sliding(self) -> global___Sliding: ... @property @@ -36,6 +37,8 @@ class Window(google.protobuf.message.Message): def tumbling(self) -> global___Tumbling: ... @property def hopping(self) -> global___Hopping: ... 
+ @property + def forever_hopping(self) -> global___ForeverHopping: ... def __init__( self, *, @@ -44,10 +47,11 @@ class Window(google.protobuf.message.Message): session: global___Session | None = ..., tumbling: global___Tumbling | None = ..., hopping: global___Hopping | None = ..., + forever_hopping: global___ForeverHopping | None = ..., ) -> None: ... - def HasField(self, field_name: typing_extensions.Literal["forever", b"forever", "hopping", b"hopping", "session", b"session", "sliding", b"sliding", "tumbling", b"tumbling", "variant", b"variant"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["forever", b"forever", "hopping", b"hopping", "session", b"session", "sliding", b"sliding", "tumbling", b"tumbling", "variant", b"variant"]) -> None: ... - def WhichOneof(self, oneof_group: typing_extensions.Literal["variant", b"variant"]) -> typing_extensions.Literal["sliding", "forever", "session", "tumbling", "hopping"] | None: ... + def HasField(self, field_name: typing_extensions.Literal["forever", b"forever", "forever_hopping", b"forever_hopping", "hopping", b"hopping", "session", b"session", "sliding", b"sliding", "tumbling", b"tumbling", "variant", b"variant"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["forever", b"forever", "forever_hopping", b"forever_hopping", "hopping", b"hopping", "session", b"session", "sliding", b"sliding", "tumbling", b"tumbling", "variant", b"variant"]) -> None: ... + def WhichOneof(self, oneof_group: typing_extensions.Literal["variant", b"variant"]) -> typing_extensions.Literal["sliding", "forever", "session", "tumbling", "hopping", "forever_hopping"] | None: ... 
global___Window = Window @@ -120,6 +124,23 @@ class Hopping(google.protobuf.message.Message): global___Hopping = Hopping +@typing_extensions.final +class ForeverHopping(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + STRIDE_FIELD_NUMBER: builtins.int + @property + def stride(self) -> google.protobuf.duration_pb2.Duration: ... + def __init__( + self, + *, + stride: google.protobuf.duration_pb2.Duration | None = ..., + ) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["stride", b"stride"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["stride", b"stride"]) -> None: ... + +global___ForeverHopping = ForeverHopping + @typing_extensions.final class Session(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor diff --git a/fennel/internal_lib/to_proto/serializer.py b/fennel/internal_lib/to_proto/serializer.py index 52cedb0a4..65a8b2af6 100644 --- a/fennel/internal_lib/to_proto/serializer.py +++ b/fennel/internal_lib/to_proto/serializer.py @@ -185,9 +185,9 @@ def visitAssign(self, obj): def visitAggregate(self, obj): emit_strategy = ( - proto.Aggregate.Eager + proto.EmitStrategy.Eager if obj.emit_strategy == EmitStrategy.Eager - else proto.Aggregate.Final + else proto.EmitStrategy.Final ) if obj.window_field: return proto.Operator( diff --git a/fennel/lib/functions/functions.py b/fennel/lib/functions/functions.py index 020ce642c..f4ca3200d 100644 --- a/fennel/lib/functions/functions.py +++ b/fennel/lib/functions/functions.py @@ -35,12 +35,9 @@ def bucketize( elif isinstance(window, tuple): duration = int(duration_to_timedelta(window[0]).total_seconds()) stride = int(duration_to_timedelta(window[1]).total_seconds()) - elif isinstance(window, Tumbling): - duration = int(duration_to_timedelta(window.duration).total_seconds()) - stride = int(duration_to_timedelta(window.duration).total_seconds()) - elif isinstance(window, Hopping): - duration = 
int(duration_to_timedelta(window.duration).total_seconds()) - stride = int(duration_to_timedelta(window.stride).total_seconds()) + elif isinstance(window, (Tumbling, Hopping)): + duration = window.duration_total_seconds() + stride = window.stride_total_seconds() else: raise ValueError("Unsupported window type") diff --git a/fennel/testing/execute_aggregation.py b/fennel/testing/execute_aggregation.py index f8c8dc014..3127449e1 100644 --- a/fennel/testing/execute_aggregation.py +++ b/fennel/testing/execute_aggregation.py @@ -384,21 +384,27 @@ def get_val(self) -> List: def get_timestamps_for_hopping_window( - timestamp: datetime, duration: int, stride: int + timestamp: datetime, + frontier: datetime, + duration: int, + stride: int, + lookback: int, ) -> List[datetime]: """ Given a window duration, stride and a timestamp first fetch all the windows in which the given timestamp lies then return the list of window end timestamps. """ + until = int(frontier.timestamp()) - lookback + current_ts = int(timestamp.timestamp()) - # Get the window of which current timestamp is part of + # Get the first window of which current timestamp is part of start_ts = (((current_ts - duration) // stride) + 1) * stride results = [] # Getting all the window of which timestamp is a part of - while start_ts <= current_ts: + while start_ts <= current_ts and (start_ts + duration) <= until: results.append( datetime.fromtimestamp(start_ts + duration, tz=timezone.utc) ) @@ -602,17 +608,25 @@ def add_inserts_deletes_discrete_window( """ According to Aggregation Specification preprocess the dataframe to add delete or inserts. 
""" + frontier = input_df[ts_field].max() if window.duration != "forever": duration = window.duration_total_seconds() else: duration = window.stride_total_seconds() stride = window.stride_total_seconds() + lookback = window.lookback_total_seconds() + + # New frontier + until = frontier - timedelta(seconds=lookback) # Add the inserts for which row against end timestamp of every window in which the row belongs to input_df[ts_field] = input_df[ts_field].apply( - lambda y: get_timestamps_for_hopping_window(y, duration, stride) + lambda y: get_timestamps_for_hopping_window( + y, frontier, duration, stride, lookback + ) ) input_df = input_df.explode(ts_field).reset_index(drop=True) + input_df = input_df.loc[input_df[ts_field].notna()] input_df[ts_field] = cast_col_to_arrow_dtype( input_df[ts_field], get_timestamp_data_type() ) @@ -639,6 +653,9 @@ def add_inserts_deletes_discrete_window( del_df[ts_field] = del_df[FENNEL_DELETE_TIMESTAMP] input_df = pd.concat([input_df, del_df], ignore_index=True) + # Filter out rows which are greater than until -> (frontier - lookback) + input_df = input_df.loc[input_df[ts_field] <= until].reset_index(drop=True) + # Get the window field from end timestamp, then first cast arrow type and then to frozendict if is_window_key_field: input_df[WINDOW_FIELD_NAME] = cast_col_to_arrow_dtype( diff --git a/fennel/testing/executor.py b/fennel/testing/executor.py index a29fc9bc8..ccb67eca8 100644 --- a/fennel/testing/executor.py +++ b/fennel/testing/executor.py @@ -11,8 +11,21 @@ from frozendict import frozendict import fennel.gen.schema_pb2 as schema_proto -from fennel.datasets import Pipeline, Visitor, Dataset, Count -from fennel.datasets.datasets import UDFType, DSSchema, WINDOW_FIELD_NAME +from fennel.datasets import Pipeline, Visitor, Dataset +from fennel.datasets.datasets import DSSchema, UDFType, WINDOW_FIELD_NAME +from fennel.datasets.aggregate import ( + AggregateType, + Average, + Count, + Distinct, + ExpDecaySum, + Sum, + Min, + Max, + 
LastK, + Quantile, + Stddev, +) from fennel.gen.schema_pb2 import Field from fennel.internal_lib.duration import duration_to_timedelta from fennel.internal_lib.schema import get_datatype, fennel_is_optional @@ -274,7 +287,9 @@ def _merge_df( def visitAggregate(self, obj): def join_aggregated_dataset( - schema: DSSchema, column_wise_df: Dict[str, pd.DataFrame] + schema: DSSchema, + column_wise_df: Dict[str, pd.DataFrame], + aggregates: List[AggregateType], ) -> pd.DataFrame: """ Internal function to join aggregated datasets, where each aggregated @@ -324,6 +339,33 @@ def join_aggregated_dataset( # Delete FENNEL_TIMESTAMP column as this should be generated again if FENNEL_DELETE_TIMESTAMP in df.columns: # type: ignore final_df.drop(columns=[FENNEL_DELETE_TIMESTAMP], inplace=True) # type: ignore + + # During merging there can be multiple rows that have some columns as null; we need to fill them with default + # values. + for aggregate in aggregates: + if isinstance(aggregate, (Count, Sum, ExpDecaySum)): + final_df[aggregate.into_field] = final_df[ + aggregate.into_field + ].fillna(0) + if isinstance(aggregate, (LastK, Distinct)): + # final_df[aggregate.into_field] = final_df[ + # aggregate.into_field + # ].fillna([]) + # fillna doesn't work for list type or dict type :cols + for row in final_df.loc[ + final_df[aggregate.into_field].isnull() + ].index: + final_df.loc[row, aggregate.into_field] = [] + if isinstance(aggregate, (Average, Min, Max, Stddev, Quantile)): + if pd.isna(aggregate.default): + final_df[aggregate.into_field] = final_df[ + aggregate.into_field + ].fillna(pd.NA) + else: + final_df[aggregate.into_field] = final_df[ + aggregate.into_field + ].fillna(aggregate.default) + return final_df input_ret = self.visit(obj.node) @@ -362,7 +404,9 @@ def join_aggregated_dataset( output_schema.values[aggregate.into_field], True if obj.window_field else False, ) - final_df = join_aggregated_dataset(output_schema, result) + final_df = join_aggregated_dataset( +
output_schema, result, obj.aggregates + ) else: # this is the case where 'window' param in groupby is used if not obj.window_field: diff --git a/fennel/testing/test_execute_aggregation.py b/fennel/testing/test_execute_aggregation.py index 609affac3..8bdf47eb7 100644 --- a/fennel/testing/test_execute_aggregation.py +++ b/fennel/testing/test_execute_aggregation.py @@ -159,6 +159,7 @@ def test_distinct_state(): def test_get_timestamps_for_hopping_window(): timestamp = datetime(2020, 1, 2, 13, 0, 0, tzinfo=timezone.utc) + frontier = datetime.now(timezone.utc) secs_1d = 24 * 60 * 60 secs_1h = 60 * 60 @@ -166,10 +167,14 @@ def test_get_timestamps_for_hopping_window(): # Test tumbling window of duration 1d. assert [ datetime(2020, 1, 3, 0, 0, 0, tzinfo=timezone.utc) - ] == get_timestamps_for_hopping_window(timestamp, secs_1d, secs_1d) + ] == get_timestamps_for_hopping_window( + timestamp, frontier, secs_1d, secs_1d, 0 + ) # Test Hopping window of duration 1d stride 1h. - assert get_timestamps_for_hopping_window(timestamp, secs_1d, secs_1h) == [ + assert get_timestamps_for_hopping_window( + timestamp, frontier, secs_1d, secs_1h, 0 + ) == [ datetime(2020, 1, 2, 14, 0, tzinfo=timezone.utc), datetime(2020, 1, 2, 15, 0, tzinfo=timezone.utc), datetime(2020, 1, 2, 16, 0, tzinfo=timezone.utc), @@ -197,6 +202,55 @@ def test_get_timestamps_for_hopping_window(): ] +def test_get_timestamps_for_hopping_window_with_lookback(): + timestamp = datetime(2020, 1, 2, 13, 0, 0, tzinfo=timezone.utc) + frontier = datetime(2020, 1, 3, 14, 0, 0, tzinfo=timezone.utc) + + secs_1d = 24 * 60 * 60 + secs_1h = 60 * 60 + + # Test tumbling window of duration 1d with lookback of 15 hours + assert [] == get_timestamps_for_hopping_window( + timestamp, frontier, secs_1d, secs_1d, 15 * 60 * 60 + ) + + # Test tumbling window of duration 1d with lookback of 1 hour + assert [ + datetime(2020, 1, 3, 0, 0, 0, tzinfo=timezone.utc) + ] == get_timestamps_for_hopping_window( + timestamp, frontier, secs_1d, secs_1d,
60 * 60 + ) + + # Test Hopping window of duration 1d stride 1h and lookback of 2 hours. + assert get_timestamps_for_hopping_window( + timestamp, frontier, secs_1d, secs_1h, 2 * 60 * 60 + ) == [ + datetime(2020, 1, 2, 14, 0, tzinfo=timezone.utc), + datetime(2020, 1, 2, 15, 0, tzinfo=timezone.utc), + datetime(2020, 1, 2, 16, 0, tzinfo=timezone.utc), + datetime(2020, 1, 2, 17, 0, tzinfo=timezone.utc), + datetime(2020, 1, 2, 18, 0, tzinfo=timezone.utc), + datetime(2020, 1, 2, 19, 0, tzinfo=timezone.utc), + datetime(2020, 1, 2, 20, 0, tzinfo=timezone.utc), + datetime(2020, 1, 2, 21, 0, tzinfo=timezone.utc), + datetime(2020, 1, 2, 22, 0, tzinfo=timezone.utc), + datetime(2020, 1, 2, 23, 0, tzinfo=timezone.utc), + datetime(2020, 1, 3, 0, 0, tzinfo=timezone.utc), + datetime(2020, 1, 3, 1, 0, tzinfo=timezone.utc), + datetime(2020, 1, 3, 2, 0, tzinfo=timezone.utc), + datetime(2020, 1, 3, 3, 0, tzinfo=timezone.utc), + datetime(2020, 1, 3, 4, 0, tzinfo=timezone.utc), + datetime(2020, 1, 3, 5, 0, tzinfo=timezone.utc), + datetime(2020, 1, 3, 6, 0, tzinfo=timezone.utc), + datetime(2020, 1, 3, 7, 0, tzinfo=timezone.utc), + datetime(2020, 1, 3, 8, 0, tzinfo=timezone.utc), + datetime(2020, 1, 3, 9, 0, tzinfo=timezone.utc), + datetime(2020, 1, 3, 10, 0, tzinfo=timezone.utc), + datetime(2020, 1, 3, 11, 0, tzinfo=timezone.utc), + datetime(2020, 1, 3, 12, 0, tzinfo=timezone.utc), + ] + + def test_get_timestamps_for_session_window(): # test session end timestamps for gap of 1 secs data = pd.DataFrame( diff --git a/pyproject.toml b/pyproject.toml index 750c066b7..a8ab77626 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "fennel-ai" -version = "1.5.4" +version = "1.5.5" description = "The modern realtime feature engineering platform" authors = ["Fennel AI "] packages = [{ include = "fennel" }]