Skip to content

Commit

Permalink
Add support for indirections in preproc ref type for protobuf format (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
saiharshavellanki committed Jul 30, 2024
1 parent aa41684 commit d60f732
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 15 deletions.
2 changes: 1 addition & 1 deletion docs/pages/api-reference/decorators/source.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ As of right now, there are two kinds of values of preproc:
a constant value.

:::info
Fennel supports preproc ref(str) values of type A[B][C] only for the JSON format and
Fennel supports preproc ref(str) values of type A[B][C] only for the JSON and Protobuf formats, and
A, B should be struct types. If you have data in other format or require indirection
for other parent types apart from struct, please reach out to Fennel support.
:::
Expand Down
3 changes: 3 additions & 0 deletions fennel/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## [1.4.6] - 2024-07-30
- Add support for indirections in preproc ref type for Protobuf format

## [1.4.4] - 2024-07-19
- Increase default timeout to 180 seconds.

Expand Down
24 changes: 15 additions & 9 deletions fennel/connectors/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,23 @@ def source(
isinstance(conn, KinesisConnector)
or isinstance(conn, S3Connector)
or isinstance(conn, PubSubConnector)
) and conn.format != "json":
raise ValueError(
"Preproc of type ref('A[B][C]') is applicable only for data in JSON format"
)
if (
isinstance(conn, KafkaConnector)
and conn.format is not None
and conn.format != "json"
):
if conn.format != "json":
raise ValueError(
"Preproc of type ref('A[B][C]') is applicable only for data in JSON format"
)
elif isinstance(conn, KafkaConnector):
if (
conn.format is not None
and conn.format != "json"
and not isinstance(conn.format, Protobuf)
):
raise ValueError(
"Preproc of type ref('A[B][C]') is applicable only for data in JSON and Protobuf formats"
)
else:
raise ValueError(
"Preproc of type ref('A[B][C]') is applicable only for data in JSON format"
"Preproc of type ref('A[B][C]') is not supported for table source"
)

if since is not None and not isinstance(since, datetime):
Expand Down
16 changes: 15 additions & 1 deletion fennel/connectors/test_connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2191,7 +2191,7 @@ class UserInfoDataset:


def test_valid_preproc_value():
# Preproc value of type A[B][C] can be only set for data in JSON format
# Preproc value of type A[B][C] can be set for data in JSON and Protobuf formats
source(
s3.bucket(
bucket_name="all_ratings", prefix="prod/apac/", format="json"
Expand Down Expand Up @@ -2231,6 +2231,20 @@ def test_valid_preproc_value():
every="1h",
disorder="14d",
cdc="debezium",
preproc={"C": ref("A[B][C]"), "D": "A[B][D]"},
)

protobuf = Protobuf(
registry="confluent",
url="http://localhost:8000",
username="user",
password="pwd",
)
source(
kafka.topic(topic="topic", format=protobuf),
every="1h",
disorder="14d",
cdc="debezium",
preproc={"C": ref("A[B][C]"), "D": "A[B][C]"},
)

Expand Down
24 changes: 21 additions & 3 deletions fennel/connectors/test_invalid_connectors.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime, timezone
from lib2to3.fixes.fix_tuple_params import tuple_name
from typing import Optional

from fennel.connectors.connectors import Protobuf, ref
Expand Down Expand Up @@ -669,7 +670,7 @@ def test_invalid_preproc_value():
== str(e.value)
)

# Preproc value of type A[B][C] cannot be set for data other than JSON format
# Preproc value of type A[B][C] cannot be set for data other than JSON and Protobuf formats
with pytest.raises(ValueError) as e:
source(
s3.bucket(
Expand All @@ -685,7 +686,7 @@ def test_invalid_preproc_value():
== str(e.value)
)

# Preproc value of type A[B][C] cannot be set for data other than JSON format
# Preproc value of type A[B][C] cannot be set for data other than JSON and Protobuf formats
with pytest.raises(ValueError) as e:
source(
kafka.topic(topic="topic", format="Avro"),
Expand All @@ -695,7 +696,24 @@ def test_invalid_preproc_value():
preproc={"C": ref("A[B][C]"), "D": "A[B][C]"},
)
assert (
"Preproc of type ref('A[B][C]') is applicable only for data in JSON format"
"Preproc of type ref('A[B][C]') is applicable only for data in JSON and Protobuf formats"
== str(e.value)
)

# Preproc value of type A[B][C] cannot be set for table sources
with pytest.raises(ValueError) as e:
source(
mysql.table(
"users",
cursor="added_on",
),
every="1h",
disorder="14d",
cdc="debezium",
preproc={"C": ref("A[B][C]"), "D": "A[B][C]"},
)
assert (
"Preproc of type ref('A[B][C]') is not supported for table source"
== str(e.value)
)

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "fennel-ai"
version = "1.4.5"
version = "1.4.6"
description = "The modern realtime feature engineering platform"
authors = ["Fennel AI <developers@fennel.ai>"]
packages = [{ include = "fennel" }]
Expand Down

0 comments on commit d60f732

Please sign in to comment.