From c91099908f70a1c675276098862886cc6a2946a7 Mon Sep 17 00:00:00 2001 From: Xiao Jiang Date: Fri, 18 Aug 2023 12:58:11 -0700 Subject: [PATCH 1/2] [Doc] Add kafka and delta lake documentation (#269) --- .gitignore | 1 + docs/examples/api-reference/source.py | 45 ++++++++++++++++++++++++++ docs/pages/api-reference/sources.md | 22 +++++++++++++ fennel/.DS_Store | Bin 6148 -> 0 bytes fennel/sources/test_sources.py | 2 -- 5 files changed, 68 insertions(+), 2 deletions(-) delete mode 100644 fennel/.DS_Store diff --git a/.gitignore b/.gitignore index 32a2f8d50..4b27131f9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +**/.DS_Store fennel/proto/*.proto output.txt diff --git a/docs/examples/api-reference/source.py b/docs/examples/api-reference/source.py index 08fefcd14..ac8fcdc8f 100644 --- a/docs/examples/api-reference/source.py +++ b/docs/examples/api-reference/source.py @@ -133,3 +133,48 @@ class UserHudiSourcedDataset: # /docsnip + +# docsnip kafka_source +kafka = sources.Kafka( + name="kafka_src", + bootstrap_servers="localhost:9092", + security_protocol="PLAINTEXT", + sasl_mechanism="PLAIN", + sasl_plain_username="test", + sasl_plain_password="test", + verify_cert=False, +) + + +@source(kafka.topic("user")) +@meta(owner="abc@email.com") +@dataset +class UserKafkaSourcedDataset: + uid: int = field(key=True) + email: str + timestamp: datetime + ... + + +# /docsnip + + +# docsnip s3_delta_lake_source +s3 = sources.S3( + name="ratings_source", + aws_access_key_id="", + aws_secret_access_key="", +) + + +@source(s3.bucket("engagement", prefix="notion", format="delta"), every="30m") +@meta(owner="abc@email.com") +@dataset +class UserDeltaLakeSourcedDataset: + uid: int = field(key=True) + email: str + timestamp: datetime + ... 
+ + +# /docsnip diff --git a/docs/pages/api-reference/sources.md b/docs/pages/api-reference/sources.md index 39e3da1e9..905604fd9 100644 --- a/docs/pages/api-reference/sources.md +++ b/docs/pages/api-reference/sources.md @@ -154,4 +154,26 @@ configuring the S3 bucket.

 
+### Kafka
 
+The following fields need to be defined for the source:
+
+1. **`name`** - A name to identify the source. The name should be unique across all sources.
+2. **`bootstrap_servers`** - A list of brokers, each specified as host or host\:port.
+3. **`security_protocol`** - Protocol used to communicate with brokers. Supported values are PLAINTEXT, SASL_PLAINTEXT, and SASL_SSL.
+4. **`sasl_mechanism`** - SASL mechanism to use for authentication. For example, SCRAM-SHA-256, PLAIN.
+5. **`sasl_plain_username`** - SASL username.
+6. **`sasl_plain_password`** - SASL password.
+7. **`verify_cert`** - Enable OpenSSL's builtin broker (server) certificate verification. Default is true.
+
+The following fields need to be defined on the topic:
+1. **`topic`** - The Kafka topic.
+
+
+

+
+### Delta Lake
+
+Similar to Hudi, Fennel integrates with Delta Lake via its S3 connector. To use Delta Lake, simply set the `format` field to "delta" when configuring the S3 bucket.
+
+

diff --git a/fennel/.DS_Store b/fennel/.DS_Store
deleted file mode 100644
index eac2327f95cc32bb2e833e1531442136211b6635..0000000000000000000000000000000000000000
GIT binary patch
literal 0
HcmV?d00001

literal 6148
zcmeHK(TdYR6ur~dZEAO+g2KLt0bfhqx=0mYYP*XHqF_ZIRBE%SyTNQyn$%KCq0jmc
z{(|`CkLbVnr032|mt?vXpG0JCnK_fmIg`oBO{POcqBHTEL^UFEP#8-WQJoWfp37Pk
zbk8NAkYfatQXjttjn>$
z-XR}PG(}0^+?sC%=4CUX0@h8GjZPQ78Hy^s1)r}mp)p$LUZ>bKR7E-lH_AW7;+HiR
z=ZTkRm97~q2P@PJnU|s-VN@gkWVV0GWP2HNjOA>Pc)t13iL*S*b}+8WxxUO=V>U9o
zv;3V5e}Z-ZRU%hW<1G~Y9J5LXbNU^hb3Dt(;aTWhajIuMwqYHx4p;}21N?n(Q5f4A
zmkQ;l1C`tY088kWfj0kHV2!J>t#PRkXAnY3fl4a$7DFgG`d!UyYg{T+auRy;A@r4n
z-k}J2b&T&yI*GPITU!UL19=CE>afP^|KaBSe_mvdtOM48|H=VTI_MpAa7%h^UAsA6
vYh9E#C>+dNDwGmbdO4O2UW#{7WZ<034Paa2QXzT}_K$$H!B*CRzv{p*lN1_F

diff --git a/fennel/sources/test_sources.py b/fennel/sources/test_sources.py
index 1e9639578..d709bf4b3 100644
--- a/fennel/sources/test_sources.py
+++ b/fennel/sources/test_sources.py
@@ -206,8 +206,6 @@ class UserInfoDataset:
 kafka = Kafka(
     name="kafka_src",
     bootstrap_servers="localhost:9092",
-    topic="test_topic",
-    group_id="test_group",
     security_protocol="PLAINTEXT",
     sasl_mechanism="PLAIN",
     sasl_plain_username="test",

From eeabb2d021a304723f3dc0fa7c2dbdda92e7c83a Mon Sep 17 00:00:00 2001
From: Mohit Reddy 
Date: Fri, 18 Aug 2023 16:42:36 -0700
Subject: [PATCH 2/2] add protos and e2e test distinct (#271)

---
 fennel/CHANGELOG.md               |  4 ++++
 fennel/gen/spec_pb2.py            | 34 ++++++++++++++++---------------
 fennel/gen/spec_pb2.pyi           | 33 +++++++++++++++++++++++++++---
 fennel/lib/aggregate/aggregate.py | 12 +++++++----
 pyproject.toml                    |  2 +-
 5 files changed, 61 insertions(+), 24 deletions(-)

diff --git a/fennel/CHANGELOG.md b/fennel/CHANGELOG.md
index ec5b69284..a8d12b42f 100644
--- a/fennel/CHANGELOG.md
+++ b/fennel/CHANGELOG.md
@@ -1,5 +1,9 @@
 # Changelog
 
+
+## [0.17.8] - 2023-08-17
+- Added support for distinct aggregate in the backend 
+
 ## [0.17.7] - 2023-08-17
 - Distinct type for aggregations 
 
diff --git a/fennel/gen/spec_pb2.py b/fennel/gen/spec_pb2.py
index f45f3e03b..3e9fce222 100644
--- a/fennel/gen/spec_pb2.py
+++ b/fennel/gen/spec_pb2.py
@@ -14,7 +14,7 @@
 import fennel.gen.window_pb2 as window__pb2
 
 
-DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\nspec.proto\x12\x11\x66\x65nnel.proto.spec\x1a\x0cwindow.proto\"\xbc\x02\n\x07PreSpec\x12%\n\x03sum\x18\x01 \x01(\x0b\x32\x16.fennel.proto.spec.SumH\x00\x12-\n\x07\x61verage\x18\x02 \x01(\x0b\x32\x1a.fennel.proto.spec.AverageH\x00\x12)\n\x05\x63ount\x18\x03 \x01(\x0b\x32\x18.fennel.proto.spec.CountH\x00\x12*\n\x06last_k\x18\x04 \x01(\x0b\x32\x18.fennel.proto.spec.LastKH\x00\x12%\n\x03min\x18\x05 \x01(\x0b\x32\x16.fennel.proto.spec.MinH\x00\x12%\n\x03max\x18\x06 \x01(\x0b\x32\x16.fennel.proto.spec.MaxH\x00\x12+\n\x06stddev\x18\x07 \x01(\x0b\x32\x19.fennel.proto.spec.StddevH\x00\x42\t\n\x07variant\"L\n\x03Sum\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\"a\n\x07\x41verage\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"n\n\x05\x43ount\x12\x0c\n\x04name\x18\x01 \x01(\t\x12+\n\x06window\x18\x02 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0e\n\x06unique\x18\x03 \x01(\x08\x12\x0e\n\x06\x61pprox\x18\x04 \x01(\x08\x12\n\n\x02of\x18\x05 \x01(\t\"l\n\x05LastK\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05limit\x18\x03 \x01(\r\x12\r\n\x05\x64\x65\x64up\x18\x04 \x01(\x08\x12+\n\x06window\x18\x05 \x01(\x0b\x32\x1b.fennel.proto.window.Window\"]\n\x03Min\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"]\n\x03Max\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"`\n\x06Stddev\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 
\x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\x62\x06proto3')
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\nspec.proto\x12\x11\x66\x65nnel.proto.spec\x1a\x0cwindow.proto\"\xed\x02\n\x07PreSpec\x12%\n\x03sum\x18\x01 \x01(\x0b\x32\x16.fennel.proto.spec.SumH\x00\x12-\n\x07\x61verage\x18\x02 \x01(\x0b\x32\x1a.fennel.proto.spec.AverageH\x00\x12)\n\x05\x63ount\x18\x03 \x01(\x0b\x32\x18.fennel.proto.spec.CountH\x00\x12*\n\x06last_k\x18\x04 \x01(\x0b\x32\x18.fennel.proto.spec.LastKH\x00\x12%\n\x03min\x18\x05 \x01(\x0b\x32\x16.fennel.proto.spec.MinH\x00\x12%\n\x03max\x18\x06 \x01(\x0b\x32\x16.fennel.proto.spec.MaxH\x00\x12+\n\x06stddev\x18\x07 \x01(\x0b\x32\x19.fennel.proto.spec.StddevH\x00\x12/\n\x08\x64istinct\x18\x08 \x01(\x0b\x32\x1b.fennel.proto.spec.DistinctH\x00\x42\t\n\x07variant\"L\n\x03Sum\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\"a\n\x07\x41verage\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"n\n\x05\x43ount\x12\x0c\n\x04name\x18\x01 \x01(\t\x12+\n\x06window\x18\x02 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0e\n\x06unique\x18\x03 \x01(\x08\x12\x0e\n\x06\x61pprox\x18\x04 \x01(\x08\x12\n\n\x02of\x18\x05 \x01(\t\"l\n\x05LastK\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05limit\x18\x03 \x01(\r\x12\r\n\x05\x64\x65\x64up\x18\x04 \x01(\x08\x12+\n\x06window\x18\x05 \x01(\x0b\x32\x1b.fennel.proto.window.Window\"]\n\x03Min\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"]\n\x03Max\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"`\n\x06Stddev\x12\n\n\x02of\x18\x01 
\x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Window\x12\x0f\n\x07\x64\x65\x66\x61ult\x18\x04 \x01(\x01\"Q\n\x08\x44istinct\x12\n\n\x02of\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12+\n\x06window\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.window.Windowb\x06proto3')
 
 _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
 _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'spec_pb2', globals())
@@ -22,19 +22,21 @@
 
   DESCRIPTOR._options = None
   _PRESPEC._serialized_start=48
-  _PRESPEC._serialized_end=364
-  _SUM._serialized_start=366
-  _SUM._serialized_end=442
-  _AVERAGE._serialized_start=444
-  _AVERAGE._serialized_end=541
-  _COUNT._serialized_start=543
-  _COUNT._serialized_end=653
-  _LASTK._serialized_start=655
-  _LASTK._serialized_end=763
-  _MIN._serialized_start=765
-  _MIN._serialized_end=858
-  _MAX._serialized_start=860
-  _MAX._serialized_end=953
-  _STDDEV._serialized_start=955
-  _STDDEV._serialized_end=1051
+  _PRESPEC._serialized_end=413
+  _SUM._serialized_start=415
+  _SUM._serialized_end=491
+  _AVERAGE._serialized_start=493
+  _AVERAGE._serialized_end=590
+  _COUNT._serialized_start=592
+  _COUNT._serialized_end=702
+  _LASTK._serialized_start=704
+  _LASTK._serialized_end=812
+  _MIN._serialized_start=814
+  _MIN._serialized_end=907
+  _MAX._serialized_start=909
+  _MAX._serialized_end=1002
+  _STDDEV._serialized_start=1004
+  _STDDEV._serialized_end=1100
+  _DISTINCT._serialized_start=1102
+  _DISTINCT._serialized_end=1183
 # @@protoc_insertion_point(module_scope)
diff --git a/fennel/gen/spec_pb2.pyi b/fennel/gen/spec_pb2.pyi
index 0b5da2801..e520d5253 100644
--- a/fennel/gen/spec_pb2.pyi
+++ b/fennel/gen/spec_pb2.pyi
@@ -26,6 +26,7 @@ class PreSpec(google.protobuf.message.Message):
     MIN_FIELD_NUMBER: builtins.int
     MAX_FIELD_NUMBER: builtins.int
     STDDEV_FIELD_NUMBER: builtins.int
+    DISTINCT_FIELD_NUMBER: builtins.int
     @property
     def sum(self) -> global___Sum: ...
     @property
@@ -40,6 +41,8 @@ class PreSpec(google.protobuf.message.Message):
     def max(self) -> global___Max: ...
     @property
     def stddev(self) -> global___Stddev: ...
+    @property
+    def distinct(self) -> global___Distinct: ...
     def __init__(
         self,
         *,
@@ -50,10 +53,11 @@ class PreSpec(google.protobuf.message.Message):
         min: global___Min | None = ...,
         max: global___Max | None = ...,
         stddev: global___Stddev | None = ...,
+        distinct: global___Distinct | None = ...,
     ) -> None: ...
-    def HasField(self, field_name: typing_extensions.Literal["average", b"average", "count", b"count", "last_k", b"last_k", "max", b"max", "min", b"min", "stddev", b"stddev", "sum", b"sum", "variant", b"variant"]) -> builtins.bool: ...
-    def ClearField(self, field_name: typing_extensions.Literal["average", b"average", "count", b"count", "last_k", b"last_k", "max", b"max", "min", b"min", "stddev", b"stddev", "sum", b"sum", "variant", b"variant"]) -> None: ...
-    def WhichOneof(self, oneof_group: typing_extensions.Literal["variant", b"variant"]) -> typing_extensions.Literal["sum", "average", "count", "last_k", "min", "max", "stddev"] | None: ...
+    def HasField(self, field_name: typing_extensions.Literal["average", b"average", "count", b"count", "distinct", b"distinct", "last_k", b"last_k", "max", b"max", "min", b"min", "stddev", b"stddev", "sum", b"sum", "variant", b"variant"]) -> builtins.bool: ...
+    def ClearField(self, field_name: typing_extensions.Literal["average", b"average", "count", b"count", "distinct", b"distinct", "last_k", b"last_k", "max", b"max", "min", b"min", "stddev", b"stddev", "sum", b"sum", "variant", b"variant"]) -> None: ...
+    def WhichOneof(self, oneof_group: typing_extensions.Literal["variant", b"variant"]) -> typing_extensions.Literal["sum", "average", "count", "last_k", "min", "max", "stddev", "distinct"] | None: ...
 
 global___PreSpec = PreSpec
 
@@ -241,3 +245,26 @@ class Stddev(google.protobuf.message.Message):
     def ClearField(self, field_name: typing_extensions.Literal["default", b"default", "name", b"name", "of", b"of", "window", b"window"]) -> None: ...
 
 global___Stddev = Stddev
+
+@typing_extensions.final
+class Distinct(google.protobuf.message.Message):
+    DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+    OF_FIELD_NUMBER: builtins.int
+    NAME_FIELD_NUMBER: builtins.int
+    WINDOW_FIELD_NUMBER: builtins.int
+    of: builtins.str
+    name: builtins.str
+    @property
+    def window(self) -> window_pb2.Window: ...
+    def __init__(
+        self,
+        *,
+        of: builtins.str = ...,
+        name: builtins.str = ...,
+        window: window_pb2.Window | None = ...,
+    ) -> None: ...
+    def HasField(self, field_name: typing_extensions.Literal["window", b"window"]) -> builtins.bool: ...
+    def ClearField(self, field_name: typing_extensions.Literal["name", b"name", "of", b"of", "window", b"window"]) -> None: ...
+
+global___Distinct = Distinct
diff --git a/fennel/lib/aggregate/aggregate.py b/fennel/lib/aggregate/aggregate.py
index 9a5047fdc..d459db979 100644
--- a/fennel/lib/aggregate/aggregate.py
+++ b/fennel/lib/aggregate/aggregate.py
@@ -64,13 +64,17 @@ def signature(self):
 
 
 class Distinct(AggregateType):
-    of: Optional[str] = None
+    of: str
     unordered: bool
 
     def to_proto(self) -> spec_proto.PreSpec:
-        return NotImplementedError(
-            "TODO: Generate protos for client"
-        )  # type: ignore
+        return spec_proto.PreSpec(
+            distinct=spec_proto.Distinct(
+                window=self.window.to_proto(),
+                name=self.into_field,
+                of=self.of,
+            )
+        )
 
     def validate(self):
         if not self.unordered:
diff --git a/pyproject.toml b/pyproject.toml
index 3aaafc5eb..c29a7bfe8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "fennel-ai"
-version = "0.17.7"
+version = "0.17.8"
 description = "The modern realtime feature engineering platform"
 authors = ["Fennel AI "]
 packages = [{ include = "fennel" }]