Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into davidwf/add-spellchec…
Browse files Browse the repository at this point in the history
…k-to-docs
  • Loading branch information
davidfaulkner12 committed Aug 21, 2023
2 parents fee2f92 + eeabb2d commit 11e48a3
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 26 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
**/.DS_Store
fennel/proto/*.proto
output.txt

Expand Down
45 changes: 45 additions & 0 deletions docs/examples/api-reference/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,48 @@ class UserHudiSourcedDataset:


# /docsnip

# docsnip kafka_source
kafka = sources.Kafka(
name="kafka_src",
bootstrap_servers="localhost:9092",
security_protocol="PLAINTEXT",
sasl_mechanism="PLAIN",
sasl_plain_username="test",
sasl_plain_password="test",
verify_cert=False,
)


@source(kafka.topic("user"))
@meta(owner="abc@email.com")
@dataset
class UserKafkaSourcedDataset:
uid: int = field(key=True)
email: str
timestamp: datetime
...


# /docsnip


# docsnip s3_delta_lake_source
s3 = sources.S3(
name="ratings_source",
aws_access_key_id="<SOME_ACCESS_KEY>",
aws_secret_access_key="<SOME_SECRET_ACCESS_KEY>",
)


@source(s3.bucket("engagement", prefix="notion", format="delta"), every="30m")
@meta(owner="abc@email.com")
@dataset
class UserDeltaLakeSourcedDataset:
uid: int = field(key=True)
email: str
timestamp: datetime
...


# /docsnip
22 changes: 22 additions & 0 deletions docs/pages/api-reference/sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,26 @@ configuring the S3 bucket.

<pre snippet="api-reference/source#s3_hudi_source"></pre>

### Kafka

The following fields need to be defined for the source:

1. **`name`** - A name to identify the source. The name should be unique across all sources.
2. **`bootstrap_servers`** - A list of broker host or host\:port.
3. **`security_protocol`** - Protocol used to communicate with brokers. Supported PLAINTEXT, SASL_PLAINTEXT, and SASL_SSL.
4. **`sasl_mechanism`** - SASL mechanism to use for authentication. For example, SCRAM-SHA-256, PLAIN.
5. **`sasl_plain_username`** - SASL username.
6. **`sasl_plain_password`** - SASL password.
7. **`verify_cert`** - Enable OpenSSL's builtin broker (server) certificate verification. Default is true.

The following fields need to be defined on the topic:
1. **`topic`** - The kafka topic.


<pre snippet="api-reference/source#kafka_source"></pre>

### Delta Lake

Similar to Hudi, Fennel integrates with Delta Lkae via its S3 connector. To use delta lake, simply set the `format` field to "delta" when configuring the S3 bucket.

<pre snippet="api-reference/source#s3_delta_lake_source"></pre>
Binary file removed fennel/.DS_Store
Binary file not shown.
4 changes: 4 additions & 0 deletions fennel/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog


## [0.17.8] - 2023-08-17
- Added support for distinct aggregate in the backend

## [0.17.7] - 2023-08-17
- Distinct type for aggregations

Expand Down
34 changes: 18 additions & 16 deletions fennel/gen/spec_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 30 additions & 3 deletions fennel/gen/spec_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class PreSpec(google.protobuf.message.Message):
MIN_FIELD_NUMBER: builtins.int
MAX_FIELD_NUMBER: builtins.int
STDDEV_FIELD_NUMBER: builtins.int
DISTINCT_FIELD_NUMBER: builtins.int
@property
def sum(self) -> global___Sum: ...
@property
Expand All @@ -40,6 +41,8 @@ class PreSpec(google.protobuf.message.Message):
def max(self) -> global___Max: ...
@property
def stddev(self) -> global___Stddev: ...
@property
def distinct(self) -> global___Distinct: ...
def __init__(
self,
*,
Expand All @@ -50,10 +53,11 @@ class PreSpec(google.protobuf.message.Message):
min: global___Min | None = ...,
max: global___Max | None = ...,
stddev: global___Stddev | None = ...,
distinct: global___Distinct | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["average", b"average", "count", b"count", "last_k", b"last_k", "max", b"max", "min", b"min", "stddev", b"stddev", "sum", b"sum", "variant", b"variant"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["average", b"average", "count", b"count", "last_k", b"last_k", "max", b"max", "min", b"min", "stddev", b"stddev", "sum", b"sum", "variant", b"variant"]) -> None: ...
def WhichOneof(self, oneof_group: typing_extensions.Literal["variant", b"variant"]) -> typing_extensions.Literal["sum", "average", "count", "last_k", "min", "max", "stddev"] | None: ...
def HasField(self, field_name: typing_extensions.Literal["average", b"average", "count", b"count", "distinct", b"distinct", "last_k", b"last_k", "max", b"max", "min", b"min", "stddev", b"stddev", "sum", b"sum", "variant", b"variant"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["average", b"average", "count", b"count", "distinct", b"distinct", "last_k", b"last_k", "max", b"max", "min", b"min", "stddev", b"stddev", "sum", b"sum", "variant", b"variant"]) -> None: ...
def WhichOneof(self, oneof_group: typing_extensions.Literal["variant", b"variant"]) -> typing_extensions.Literal["sum", "average", "count", "last_k", "min", "max", "stddev", "distinct"] | None: ...

global___PreSpec = PreSpec

Expand Down Expand Up @@ -241,3 +245,26 @@ class Stddev(google.protobuf.message.Message):
def ClearField(self, field_name: typing_extensions.Literal["default", b"default", "name", b"name", "of", b"of", "window", b"window"]) -> None: ...

global___Stddev = Stddev

@typing_extensions.final
class Distinct(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor

OF_FIELD_NUMBER: builtins.int
NAME_FIELD_NUMBER: builtins.int
WINDOW_FIELD_NUMBER: builtins.int
of: builtins.str
name: builtins.str
@property
def window(self) -> window_pb2.Window: ...
def __init__(
self,
*,
of: builtins.str = ...,
name: builtins.str = ...,
window: window_pb2.Window | None = ...,
) -> None: ...
def HasField(self, field_name: typing_extensions.Literal["window", b"window"]) -> builtins.bool: ...
def ClearField(self, field_name: typing_extensions.Literal["name", b"name", "of", b"of", "window", b"window"]) -> None: ...

global___Distinct = Distinct
12 changes: 8 additions & 4 deletions fennel/lib/aggregate/aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,17 @@ def signature(self):


class Distinct(AggregateType):
of: Optional[str] = None
of: str
unordered: bool

def to_proto(self) -> spec_proto.PreSpec:
return NotImplementedError(
"TODO: Generate protos for client"
) # type: ignore
return spec_proto.PreSpec(
distinct=spec_proto.Distinct(
window=self.window.to_proto(),
name=self.into_field,
of=self.of,
)
)

def validate(self):
if not self.unordered:
Expand Down
2 changes: 0 additions & 2 deletions fennel/sources/test_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,6 @@ class UserInfoDataset:
kafka = Kafka(
name="kafka_src",
bootstrap_servers="localhost:9092",
topic="test_topic",
group_id="test_group",
security_protocol="PLAINTEXT",
sasl_mechanism="PLAIN",
sasl_plain_username="test",
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "fennel-ai"
version = "0.17.7"
version = "0.17.8"
description = "The modern realtime feature engineering platform"
authors = ["Fennel AI <developers@fennel.ai>"]
packages = [{ include = "fennel" }]
Expand Down

0 comments on commit 11e48a3

Please sign in to comment.