From d60f73223e984c19119302b234becee03ba2f869 Mon Sep 17 00:00:00 2001 From: Vellanki Sai Harsha Date: Tue, 30 Jul 2024 17:43:57 +0530 Subject: [PATCH] Add support for indirections in preproc ref type for protobuf format (#520) --- docs/pages/api-reference/decorators/source.md | 2 +- fennel/CHANGELOG.md | 3 +++ fennel/connectors/connectors.py | 24 ++++++++++++------- fennel/connectors/test_connectors.py | 16 ++++++++++++- fennel/connectors/test_invalid_connectors.py | 24 ++++++++++++++++--- pyproject.toml | 2 +- 6 files changed, 56 insertions(+), 15 deletions(-) diff --git a/docs/pages/api-reference/decorators/source.md b/docs/pages/api-reference/decorators/source.md index bd366c692..8da65a47d 100644 --- a/docs/pages/api-reference/decorators/source.md +++ b/docs/pages/api-reference/decorators/source.md @@ -99,7 +99,7 @@ As of right now, there are two kinds of values of preproc: a constant value. :::info -Fennel supports preproc ref(str) values of type A[B][C] only for the JSON format and +Fennel supports preproc ref(str) values of type A[B][C] only for the JSON and Protobuf formats, and A, B should be struct types. If you have data in other format or require indirection for other parent types apart from struct, please reach out to Fennel support. ::: diff --git a/fennel/CHANGELOG.md b/fennel/CHANGELOG.md index 89bfc65da..35b38dee4 100644 --- a/fennel/CHANGELOG.md +++ b/fennel/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## [1.4.6] - 2024-07-30 +- Add support for indirections in preproc ref type for Protobuf format + ## [1.4.4] - 2024-07-19 - Increase default timeout to 180 seconds. diff --git a/fennel/connectors/connectors.py b/fennel/connectors/connectors.py index 3e1e94f27..b922fa2ad 100644 --- a/fennel/connectors/connectors.py +++ b/fennel/connectors/connectors.py @@ -116,17 +116,23 @@ def source( isinstance(conn, KinesisConnector) or isinstance(conn, S3Connector) or isinstance(conn, PubSubConnector) - ) and conn.format != "json": - raise ValueError( - "Preproc of type ref('A[B][C]') is applicable only for data in JSON format" - ) - if ( - isinstance(conn, KafkaConnector) - and conn.format is not None - and conn.format != "json" ): + if conn.format != "json": + raise ValueError( + "Preproc of type ref('A[B][C]') is applicable only for data in JSON format" + ) + elif isinstance(conn, KafkaConnector): + if ( + conn.format is not None + and conn.format != "json" + and not isinstance(conn.format, Protobuf) + ): + raise ValueError( + "Preproc of type ref('A[B][C]') is applicable only for data in JSON and Protobuf formats" + ) + else: raise ValueError( - "Preproc of type ref('A[B][C]') is applicable only for data in JSON format" + "Preproc of type ref('A[B][C]') is not supported for table source" ) if since is not None and not isinstance(since, datetime): diff --git a/fennel/connectors/test_connectors.py b/fennel/connectors/test_connectors.py index a11089f0d..64d7a340f 100644 --- a/fennel/connectors/test_connectors.py +++ b/fennel/connectors/test_connectors.py @@ -2191,7 +2191,7 @@ class UserInfoDataset: def test_valid_preproc_value(): - # Preproc value of type A[B][C] can be only set for data in JSON format + # Preproc value of type A[B][C] can be set for data in JSON and Protobuf formats source( s3.bucket( bucket_name="all_ratings", prefix="prod/apac/", format="json" @@ -2231,6 +2231,20 @@ def test_valid_preproc_value(): every="1h", disorder="14d", cdc="debezium", + preproc={"C": ref("A[B][C]"), "D": "A[B][D]"}, + ) + + protobuf = Protobuf( + registry="confluent", + url="http://localhost:8000", + username="user", + password="pwd", + ) + source( + kafka.topic(topic="topic", format=protobuf), + every="1h", + disorder="14d", + cdc="debezium", preproc={"C": ref("A[B][C]"), "D": "A[B][C]"}, ) diff --git a/fennel/connectors/test_invalid_connectors.py b/fennel/connectors/test_invalid_connectors.py index b07eeae8d..49f82225c 100644 --- a/fennel/connectors/test_invalid_connectors.py +++ b/fennel/connectors/test_invalid_connectors.py @@ -1,4 +1,5 @@ from datetime import datetime, timezone +from lib2to3.fixes.fix_tuple_params import tuple_name from typing import Optional from fennel.connectors.connectors import Protobuf, ref @@ -669,7 +670,7 @@ def test_invalid_preproc_value(): == str(e.value) ) - # Preproc value of type A[B][C] cannot be set for data other than JSON format + # Preproc value of type A[B][C] cannot be set for data other than JSON and Protobuf formats with pytest.raises(ValueError) as e: source( s3.bucket( @@ -685,7 +686,7 @@ def test_invalid_preproc_value(): == str(e.value) ) - # Preproc value of type A[B][C] cannot be set for data other than JSON format + # Preproc value of type A[B][C] cannot be set for data other than JSON and Protobuf formats with pytest.raises(ValueError) as e: source( kafka.topic(topic="topic", format="Avro"), @@ -695,7 +696,24 @@ def test_invalid_preproc_value(): preproc={"C": ref("A[B][C]"), "D": "A[B][C]"}, ) assert ( - "Preproc of type ref('A[B][C]') is applicable only for data in JSON format" + "Preproc of type ref('A[B][C]') is applicable only for data in JSON and Protobuf formats" + == str(e.value) + ) + + # Preproc value of type A[B][C] cannot be set for table sources + with pytest.raises(ValueError) as e: + source( + mysql.table( + "users", + cursor="added_on", + ), + every="1h", + disorder="14d", + cdc="debezium", + preproc={"C": ref("A[B][C]"), "D": "A[B][C]"}, + ) + assert ( + "Preproc of type ref('A[B][C]') is not supported for table source" == str(e.value) ) diff --git a/pyproject.toml b/pyproject.toml index 14de344ce..558708e05 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "fennel-ai" -version = "1.4.5" +version = "1.4.6" description = "The modern realtime feature engineering platform" authors = ["Fennel AI "] packages = [{ include = "fennel" }]