From 546b74939c14f053f8a9b76154917088efd6e798 Mon Sep 17 00:00:00 2001 From: Hemanth Kannekanti <86536164+hemanthk269@users.noreply.github.com> Date: Tue, 22 Oct 2024 20:42:19 +0530 Subject: [PATCH] added support for sampling (#590) * added support for sampling added 1 more unit test and fixed lint updated documentation * updated logic to address comments * addressed comments * addressed comments --------- Co-authored-by: Hemanth Kannekanti --- .wordlist.txt | 2 + docs/examples/api-reference/sources/source.py | 2 + docs/pages/api-reference/decorators/source.md | 15 + fennel/connectors/connectors.py | 36 ++ fennel/connectors/test_connectors.py | 402 +++++++++++++++++- fennel/gen/connector_pb2.py | 176 ++++---- fennel/gen/connector_pb2.pyi | 32 +- fennel/gen/dataset_pb2.pyi | 2 +- fennel/gen/expectations_pb2.pyi | 2 +- fennel/internal_lib/to_proto/to_proto.py | 117 ++++- 10 files changed, 692 insertions(+), 94 deletions(-) diff --git a/.wordlist.txt b/.wordlist.txt index cb44c07c9..18df8593c 100644 --- a/.wordlist.txt +++ b/.wordlist.txt @@ -121,6 +121,7 @@ SSD SSDs SSL SSO +Sample SearchRequest ShardIteratorType Signifier @@ -336,6 +337,7 @@ repo rowset rowsets runtime +sample sasl scalability scalable diff --git a/docs/examples/api-reference/sources/source.py b/docs/examples/api-reference/sources/source.py index 837b41285..79949e223 100644 --- a/docs/examples/api-reference/sources/source.py +++ b/docs/examples/api-reference/sources/source.py @@ -18,6 +18,7 @@ def test_source_decorator(client): import pandas as pd from fennel.connectors import source, S3, ref, eval from fennel.datasets import dataset, field + from fennel.connectors.connectors import Sample s3 = S3(name="my_s3") # using IAM role based access @@ -41,6 +42,7 @@ def test_source_decorator(client): ), # converting age dtype to int }, env="prod", + sample=Sample(0.2, using=["email"]), bounded=True, idleness="1h", ) diff --git a/docs/pages/api-reference/decorators/source.md b/docs/pages/api-reference/decorators/source.md index 3953f61d7..f42da5616 100644 --- a/docs/pages/api-reference/decorators/source.md +++ b/docs/pages/api-reference/decorators/source.md @@ -49,6 +49,21 @@ can ever arrive. And if such rows do arrive, Fennel has the liberty of discardin them and not including them in the computation. + +When specifying sampling for a dataset, it can be provided in two ways: +1. **Simply specify the sampling rate** when you want to sample the dataset without specifying the columns used for sampling. + - **Sampling Rate**: A float between 0 and 1 that determines the proportion of the dataset to include. +2. **Use the `Sample` object** when you want to specify both the sampling rate and the specific columns used for sampling. + - **Sampling Rate**: A float between 0 and 1 that determines the proportion of the dataset to include. + - **Using**: A list of columns used to hash for sampling the data. Preproc columns and the timestamp field cannot be included in this list. + +Default Behavior When No Columns Are Specified +1. For Keyed Datasets: +All key columns are used for sampling, excluding any preproc columns. +2. For Non-Keyed Datasets: +All columns are used for sampling except for the timestamp and preproc columns. + + Specifies how should valid change data be constructed from the ingested data. diff --git a/fennel/connectors/connectors.py b/fennel/connectors/connectors.py index d20b0cf16..a2781f1a1 100644 --- a/fennel/connectors/connectors.py +++ b/fennel/connectors/connectors.py @@ -94,6 +94,21 @@ def preproc_has_indirection(preproc: Optional[Dict[str, PreProcValue]]): return False +class Sample: + rate: float + using: List[str] + + def __init__(self, rate, using): + if rate < 0 or rate > 1: + raise ValueError("Sample rate should be between 0 and 1") + if using is None or len(using) == 0: + raise ValueError( + f"Using must be a non-empty list, try using sample={rate} instead" + ) + self.rate = rate + self.using = using + + def source( conn: DataConnector, disorder: Duration, @@ -106,6 +121,7 @@ def source( bounded: bool = False, idleness: Optional[Duration] = None, where: Optional[Callable] = None, + sample: Optional[Union[float, Sample]] = None, ) -> Callable[[T], Any]: """ Decorator to specify the source of data for a dataset. The source can be @@ -184,6 +200,24 @@ def source( "idleness parameter should not be passed when bounded is set as False" ) + if sample is not None: + if isinstance(sample, float): + if sample < 0 or sample > 1: + raise ValueError("Sample rate should be between 0 and 1") + elif isinstance(sample, Sample): + disallowed_columns = [] + if preproc is not None: + disallowed_columns.extend(list(preproc.keys())) + for column in sample.using: + if column in disallowed_columns: + raise ValueError( + f"Column {column} is part of preproc so cannot be used for sampling" + ) + else: + raise ValueError( + "Sample should be either a float or a Sample object" + ) + def decorator(dataset_cls: T): connector = copy.deepcopy(conn) connector.every = every if every is not None else DEFAULT_EVERY @@ -196,6 +230,7 @@ def decorator(dataset_cls: T): connector.bounded = bounded connector.idleness = idleness connector.where = where + connector.sample = sample connectors = getattr(dataset_cls, SOURCE_FIELD, []) connectors.append(connector) setattr(dataset_cls, SOURCE_FIELD, connectors) @@ -711,6 +746,7 @@ class DataConnector: bounded: bool = False idleness: Optional[Duration] = None where: Optional[Callable] = None + sample: Optional[Union[float, Sample]] = None how: Optional[Literal["incremental", "recreate"] | SnapshotData] = None create: Optional[bool] = None renames: Optional[Dict[str, str]] = {} diff --git a/fennel/connectors/test_connectors.py b/fennel/connectors/test_connectors.py index 9ac07900e..79d9a7673 100644 --- a/fennel/connectors/test_connectors.py +++ b/fennel/connectors/test_connectors.py @@ -25,7 +25,7 @@ PubSub, eval, ) -from fennel.connectors.connectors import CSV, Postgres +from fennel.connectors.connectors import CSV, Postgres, Sample from fennel.datasets import dataset, field, pipeline, Dataset from fennel.expr import col, lit from fennel.integrations.aws import Secret @@ -1004,6 +1004,406 @@ def create_user_transactions(cls, dataset: Dataset): ) +def test_source_with_sample(): + @meta(owner="test@test.com") + @source( + mysql.table("users_mysql", cursor="added_on"), + every="1h", + disorder="14d", + cdc="upsert", + sample=Sample(0.1, ["user_id", "name"]), + since=datetime.strptime("2021-08-10T00:00:00Z", "%Y-%m-%dT%H:%M:%SZ"), + ) + @dataset + class UserInfoDatasetMySql: + user_id: int = field(key=True) + name: str + country: Optional[str] + timestamp: datetime = field(timestamp=True) + + # mysql source + view = InternalTestClient() + view.add(UserInfoDatasetMySql) + sync_request = view._get_sync_request_proto() + source_request = sync_request.sources[0] + s = { + "table": { + "mysql_table": { + "db": { + "name": "mysql", + "mysql": { + "host": "localhost", + "database": "test", + "user": "root", + "password": "root", + "port": 3306, + }, + }, + "table_name": "users_mysql", + }, + }, + "dataset": "UserInfoDatasetMySql", + "dsVersion": 1, + "every": "3600s", + "cdc": "Upsert", + "disorder": "1209600s", + "samplingStrategy": { + "columnsUsed": ["name", "user_id"], + "samplingRate": 0.1, + }, + "cursor": "added_on", + "timestampField": "timestamp", + "startingFrom": "2021-08-10T00:00:00Z", + } + expected_source_request = ParseDict(s, connector_proto.Source()) + assert source_request == expected_source_request, error_message( + source_request, expected_source_request + ) + + @meta(owner="test@test.com") + @source( + mysql.table("users_mysql", cursor="added_on"), + every="1h", + disorder="14d", + cdc="upsert", + sample=0.1, + since=datetime.strptime("2021-08-10T00:00:00Z", "%Y-%m-%dT%H:%M:%SZ"), + ) + @dataset + class UserInfoDatasetMySql: + user_id: int = field(key=True) + name: str + country: Optional[str] + timestamp: datetime = field(timestamp=True) + + # mysql source + view = InternalTestClient() + view.add(UserInfoDatasetMySql) + sync_request = view._get_sync_request_proto() + source_request = sync_request.sources[0] + s = { + "table": { + "mysql_table": { + "db": { + "name": "mysql", + "mysql": { + "host": "localhost", + "database": "test", + "user": "root", + "password": "root", + "port": 3306, + }, + }, + "table_name": "users_mysql", + }, + }, + "dataset": "UserInfoDatasetMySql", + "dsVersion": 1, + "every": "3600s", + "cdc": "Upsert", + "disorder": "1209600s", + "samplingStrategy": { + "samplingRate": 0.1, + "columnsUsed": ["user_id"], + }, + "cursor": "added_on", + "timestampField": "timestamp", + "startingFrom": "2021-08-10T00:00:00Z", + } + expected_source_request = ParseDict(s, connector_proto.Source()) + assert source_request == expected_source_request, error_message( + source_request, expected_source_request + ) + + @meta(owner="test@test.com") + @source( + mysql.table("users_mysql", cursor="added_on"), + every="1h", + disorder="14d", + cdc="append", + sample=0.1, + since=datetime.strptime("2021-08-10T00:00:00Z", "%Y-%m-%dT%H:%M:%SZ"), + ) + @dataset + class UserInfoDatasetMySql: + user_id: int + name: str + country: Optional[str] + timestamp: datetime = field(timestamp=True) + + # mysql source + view = InternalTestClient() + view.add(UserInfoDatasetMySql) + sync_request = view._get_sync_request_proto() + source_request = sync_request.sources[0] + s = { + "table": { + "mysql_table": { + "db": { + "name": "mysql", + "mysql": { + "host": "localhost", + "database": "test", + "user": "root", + "password": "root", + "port": 3306, + }, + }, + "table_name": "users_mysql", + }, + }, + "dataset": "UserInfoDatasetMySql", + "dsVersion": 1, + "every": "3600s", + "cdc": "Append", + "disorder": "1209600s", + "samplingStrategy": { + "samplingRate": 0.1, + "columnsUsed": ["country", "name", "user_id"], + }, + "cursor": "added_on", + "timestampField": "timestamp", + "startingFrom": "2021-08-10T00:00:00Z", + } + expected_source_request = ParseDict(s, connector_proto.Source()) + assert source_request == expected_source_request, error_message( + source_request, expected_source_request + ) + + @meta(owner="test@test.com") + @source( + mysql.table("users_mysql", cursor="added_on"), + every="1h", + disorder="14d", + cdc="upsert", + sample=Sample(0.1, ["user_id", "age"]), + ) + @dataset + class UserInfoDatasetMySql: + user_id: int = field(key=True) + name: str + country: Optional[str] + timestamp: datetime = field(timestamp=True) + + # mysql source + view = InternalTestClient() + view.add(UserInfoDatasetMySql) + try: + sync_request = view._get_sync_request_proto() + except ValueError as e: + assert str(e) == "Column age is not part of dataset columns" + + @meta(owner="test@test.com") + @source( + mysql.table("users_mysql", cursor="added_on"), + every="1h", + disorder="14d", + cdc="upsert", + sample=Sample(0.1, ["user_id", "timestamp"]), + ) + @dataset + class UserInfoDatasetMySql: + user_id: int = field(key=True) + name: str + country: Optional[str] + timestamp: datetime = field(timestamp=True) + + # mysql source + view = InternalTestClient() + view.add(UserInfoDatasetMySql) + try: + sync_request = view._get_sync_request_proto() + except ValueError as e: + assert ( + str(e) + == "Timestamp column: timestamp cannot be part of sampling columns" + ) + + try: + + @meta(owner="test@test.com") + @source( + mysql.table("users_mysql", cursor="added_on"), + every="1h", + preproc={"name": "abc"}, + disorder="14d", + cdc="upsert", + sample=Sample(0.1, ["user_id", "name"]), + ) + @dataset + class UserInfoDatasetMySql: + user_id: int = field(key=True) + name: str + country: Optional[str] + timestamp: datetime = field(timestamp=True) + + # mysql source + view = InternalTestClient() + view.add(UserInfoDatasetMySql) + sync_request = view._get_sync_request_proto() + except ValueError as e: + assert ( + str(e) + == "Column name is part of preproc so cannot be used for sampling" + ) + + try: + + @meta(owner="test@test.com") + @source( + mysql.table("users_mysql", cursor="added_on"), + every="1h", + preproc={"user_id": 1}, + disorder="14d", + cdc="upsert", + sample=0.1, + ) + @dataset + class UserInfoDatasetMySql: + user_id: int = field(key=True) + name: str + country: Optional[str] + timestamp: datetime = field(timestamp=True) + + # mysql source + view = InternalTestClient() + view.add(UserInfoDatasetMySql) + sync_request = view._get_sync_request_proto() + except ValueError as e: + assert ( + str(e) + == "No columns left to sample from. all key columns are part of preproc" + ) + + try: + + @meta(owner="test@test.com") + @source( + mysql.table("users_mysql", cursor="added_on"), + every="1h", + preproc={"user_id": 1}, + disorder="14d", + cdc="append", + sample=0.1, + ) + @dataset + class UserInfoDatasetMySql: + user_id: int + timestamp: datetime = field(timestamp=True) + + # mysql source + view = InternalTestClient() + view.add(UserInfoDatasetMySql) + sync_request = view._get_sync_request_proto() + except ValueError as e: + assert ( + str(e) + == "No columns left to sample from. all columns are part of preproc" + ) + + try: + + @meta(owner="test@test.com") + @source( + mysql.table("users_mysql", cursor="added_on"), + every="1h", + disorder="14d", + cdc="upsert", + sample=Sample(1.1, ["user_id", "name"]), + ) + @dataset + class UserInfoDatasetMySql: + user_id: int = field(key=True) + name: str + country: Optional[str] + timestamp: datetime = field(timestamp=True) + + # mysql source + view = InternalTestClient() + view.add(UserInfoDatasetMySql) + sync_request = view._get_sync_request_proto() + except ValueError as e: + assert str(e) == "Sample rate should be between 0 and 1" + + try: + + @meta(owner="test@test.com") + @source( + mysql.table("users_mysql", cursor="added_on"), + every="1h", + disorder="14d", + cdc="upsert", + sample=1.1, + ) + @dataset + class UserInfoDatasetMySql: + user_id: int = field(key=True) + name: str + country: Optional[str] + timestamp: datetime = field(timestamp=True) + + # mysql source + view = InternalTestClient() + view.add(UserInfoDatasetMySql) + sync_request = view._get_sync_request_proto() + except ValueError as e: + assert str(e) == "Sample rate should be between 0 and 1" + + try: + + @meta(owner="test@test.com") + @source( + mysql.table("users_mysql", cursor="added_on"), + every="1h", + disorder="14d", + cdc="upsert", + sample=Sample(0.5, None), + ) + @dataset + class UserInfoDatasetMySql: + user_id: int = field(key=True) + name: str + country: Optional[str] + timestamp: datetime = field(timestamp=True) + + # mysql source + view = InternalTestClient() + view.add(UserInfoDatasetMySql) + sync_request = view._get_sync_request_proto() + except ValueError as e: + assert ( + str(e) + == "Using must be a non-empty list, try using sample=0.5 instead" + ) + + try: + + @meta(owner="test@test.com") + @source( + mysql.table("users_mysql", cursor="added_on"), + every="1h", + disorder="14d", + cdc="upsert", + sample=Sample(0.5, []), + ) + @dataset + class UserInfoDatasetMySql: + user_id: int = field(key=True) + name: str + country: Optional[str] + timestamp: datetime = field(timestamp=True) + + # mysql source + view = InternalTestClient() + view.add(UserInfoDatasetMySql) + sync_request = view._get_sync_request_proto() + except ValueError as e: + assert ( + str(e) + == "Using must be a non-empty list, try using sample=0.5 instead" + ) + + def test_multiple_sources_mysql(): @meta(owner="test@test.com") @source( diff --git a/fennel/gen/connector_pb2.py b/fennel/gen/connector_pb2.py index 52ca69dd1..5fb047ba6 100644 --- a/fennel/gen/connector_pb2.py +++ b/fennel/gen/connector_pb2.py @@ -22,7 +22,7 @@ import fennel.gen.secret_pb2 as secret__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0f\x63onnector.proto\x12\x16\x66\x65nnel.proto.connector\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x10\x65xpression.proto\x1a\rkinesis.proto\x1a\x0cpycode.proto\x1a\x15schema_registry.proto\x1a\x0cschema.proto\x1a\x0csecret.proto\"\x8c\x05\n\x0b\x45xtDatabase\x12\x0c\n\x04name\x18\x01 \x01(\t\x12.\n\x05mysql\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.connector.MySQLH\x00\x12\x34\n\x08postgres\x18\x03 \x01(\x0b\x32 .fennel.proto.connector.PostgresH\x00\x12\x36\n\treference\x18\x04 \x01(\x0b\x32!.fennel.proto.connector.ReferenceH\x00\x12(\n\x02s3\x18\x05 \x01(\x0b\x32\x1a.fennel.proto.connector.S3H\x00\x12\x34\n\x08\x62igquery\x18\x06 \x01(\x0b\x32 .fennel.proto.connector.BigqueryH\x00\x12\x36\n\tsnowflake\x18\x07 \x01(\x0b\x32!.fennel.proto.connector.SnowflakeH\x00\x12.\n\x05kafka\x18\x08 \x01(\x0b\x32\x1d.fennel.proto.connector.KafkaH\x00\x12\x32\n\x07webhook\x18\t \x01(\x0b\x32\x1f.fennel.proto.connector.WebhookH\x00\x12\x32\n\x07kinesis\x18\n \x01(\x0b\x32\x1f.fennel.proto.connector.KinesisH\x00\x12\x34\n\x08redshift\x18\x0b \x01(\x0b\x32 .fennel.proto.connector.RedshiftH\x00\x12.\n\x05mongo\x18\x0c \x01(\x0b\x32\x1d.fennel.proto.connector.MongoH\x00\x12\x30\n\x06pubsub\x18\r \x01(\x0b\x32\x1e.fennel.proto.connector.PubSubH\x00\x42\t\n\x07variant\"M\n\x0cPubSubFormat\x12\x32\n\x04json\x18\x01 \x01(\x0b\x32\".fennel.proto.connector.JsonFormatH\x00\x42\t\n\x07variant\"\xbc\x01\n\x0bKafkaFormat\x12\x32\n\x04json\x18\x01 \x01(\x0b\x32\".fennel.proto.connector.JsonFormatH\x00\x12\x32\n\x04\x61vro\x18\x02 \x01(\x0b\x32\".fennel.proto.connector.AvroFormatH\x00\x12:\n\x08protobuf\x18\x03 \x01(\x0b\x32&.fennel.proto.connector.ProtobufFormatH\x00\x42\t\n\x07variant\"\x0c\n\nJsonFormat\"S\n\nAvroFormat\x12\x45\n\x0fschema_registry\x18\x01 \x01(\x0b\x32,.fennel.proto.schema_registry.SchemaRegistry\"W\n\x0eProtobufFormat\x12\x45\n\x0fschema_registry\x18\x01 \x01(\x0b\x32,.fennel.proto.schema_registry.SchemaRegistry\"\xde\x01\n\tReference\x12;\n\x06\x64\x62type\x18\x01 \x01(\x0e\x32+.fennel.proto.connector.Reference.ExtDBType\"\x93\x01\n\tExtDBType\x12\t\n\x05MYSQL\x10\x00\x12\x0c\n\x08POSTGRES\x10\x01\x12\x06\n\x02S3\x10\x02\x12\t\n\x05KAFKA\x10\x03\x12\x0c\n\x08\x42IGQUERY\x10\x04\x12\r\n\tSNOWFLAKE\x10\x05\x12\x0b\n\x07WEBHOOK\x10\x06\x12\x0b\n\x07KINESIS\x10\x07\x12\x0c\n\x08REDSHIFT\x10\x08\x12\t\n\x05MONGO\x10\t\x12\n\n\x06PUBSUB\x10\n\"E\n\x07Webhook\x12\x0c\n\x04name\x18\x01 \x01(\t\x12,\n\tretention\x18\x02 \x01(\x0b\x32\x19.google.protobuf.Duration\"\x8c\x02\n\x05MySQL\x12\x0e\n\x04user\x18\x03 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x07 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x04 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\x08 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x02 \x01(\t\x12\x0c\n\x04port\x18\x05 \x01(\r\x12\x13\n\x0bjdbc_params\x18\x06 \x01(\tB\x12\n\x10username_variantB\x12\n\x10password_variant\"\x8f\x02\n\x08Postgres\x12\x0e\n\x04user\x18\x03 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x07 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x04 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\x08 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x02 \x01(\t\x12\x0c\n\x04port\x18\x05 \x01(\r\x12\x13\n\x0bjdbc_params\x18\x06 \x01(\tB\x12\n\x10username_variantB\x12\n\x10password_variant\"\xb0\x02\n\x02S3\x12\x1f\n\x15\x61ws_secret_access_key\x18\x01 \x01(\tH\x00\x12\x46\n\x1c\x61ws_secret_access_key_secret\x18\x04 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x1b\n\x11\x61ws_access_key_id\x18\x02 \x01(\tH\x01\x12\x42\n\x18\x61ws_access_key_id_secret\x18\x05 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x15\n\x08role_arn\x18\x03 \x01(\tH\x02\x88\x01\x01\x42\x1f\n\x1d\x61ws_secret_access_key_variantB\x1b\n\x19\x61ws_access_key_id_variantB\x0b\n\t_role_arn\"\xb6\x01\n\x08\x42igquery\x12\x1d\n\x13service_account_key\x18\x02 \x01(\tH\x00\x12\x44\n\x1aservice_account_key_secret\x18\x04 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\ndataset_id\x18\x01 \x01(\t\x12\x12\n\nproject_id\x18\x03 \x01(\tB\x1d\n\x1bservice_account_key_variant\"\xa1\x02\n\tSnowflake\x12\x0e\n\x04user\x18\x02 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x08 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x03 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\t \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x0f\n\x07\x61\x63\x63ount\x18\x01 \x01(\t\x12\x0e\n\x06schema\x18\x04 \x01(\t\x12\x11\n\twarehouse\x18\x05 \x01(\t\x12\x0c\n\x04role\x18\x06 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x07 \x01(\tB\x12\n\x10username_variantB\x12\n\x10password_variant\"\xf9\x02\n\x05Kafka\x12\x1d\n\x13sasl_plain_username\x18\x05 \x01(\tH\x00\x12>\n\x14sasl_username_secret\x18\x08 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x1d\n\x13sasl_plain_password\x18\x06 \x01(\tH\x01\x12>\n\x14sasl_password_secret\x18\t \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x19\n\x11\x62ootstrap_servers\x18\x01 \x01(\t\x12\x19\n\x11security_protocol\x18\x02 \x01(\t\x12\x16\n\x0esasl_mechanism\x18\x03 \x01(\t\x12\x1c\n\x10sasl_jaas_config\x18\x04 \x01(\tB\x02\x18\x01\x12\x14\n\x08group_id\x18\x07 \x01(\tB\x02\x18\x01\x42\x17\n\x15sasl_username_variantB\x17\n\x15sasl_password_variant\"\x1b\n\x07Kinesis\x12\x10\n\x08role_arn\x18\x01 \x01(\t\"\xd3\x01\n\x0b\x43redentials\x12\x12\n\x08username\x18\x01 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x03 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x02 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\x04 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x42\x12\n\x10username_variantB\x12\n\x10password_variant\"}\n\x16RedshiftAuthentication\x12\x1c\n\x12s3_access_role_arn\x18\x01 \x01(\tH\x00\x12:\n\x0b\x63redentials\x18\x02 \x01(\x0b\x32#.fennel.proto.connector.CredentialsH\x00\x42\t\n\x07variant\"\x99\x01\n\x08Redshift\x12\x10\n\x08\x64\x61tabase\x18\x01 \x01(\t\x12\x0c\n\x04host\x18\x02 \x01(\t\x12\x0c\n\x04port\x18\x03 \x01(\r\x12\x0e\n\x06schema\x18\x04 \x01(\t\x12O\n\x17redshift_authentication\x18\x05 \x01(\x0b\x32..fennel.proto.connector.RedshiftAuthentication\"\xe9\x01\n\x05Mongo\x12\x0e\n\x04user\x18\x03 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x05 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x04 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\x06 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x02 \x01(\tB\x12\n\x10username_variantB\x12\n\x10password_variant\"\xa0\x01\n\x06PubSub\x12\x1d\n\x13service_account_key\x18\x02 \x01(\tH\x00\x12\x44\n\x1aservice_account_key_secret\x18\x03 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\nproject_id\x18\x01 \x01(\tB\x1d\n\x1bservice_account_key_variant\"\xc0\x05\n\x08\x45xtTable\x12\x39\n\x0bmysql_table\x18\x01 \x01(\x0b\x32\".fennel.proto.connector.MySQLTableH\x00\x12\x39\n\x08pg_table\x18\x02 \x01(\x0b\x32%.fennel.proto.connector.PostgresTableH\x00\x12\x33\n\x08s3_table\x18\x03 \x01(\x0b\x32\x1f.fennel.proto.connector.S3TableH\x00\x12\x39\n\x0bkafka_topic\x18\x04 \x01(\x0b\x32\".fennel.proto.connector.KafkaTopicH\x00\x12\x41\n\x0fsnowflake_table\x18\x05 \x01(\x0b\x32&.fennel.proto.connector.SnowflakeTableH\x00\x12?\n\x0e\x62igquery_table\x18\x06 \x01(\x0b\x32%.fennel.proto.connector.BigqueryTableH\x00\x12;\n\x08\x65ndpoint\x18\x07 \x01(\x0b\x32\'.fennel.proto.connector.WebhookEndpointH\x00\x12?\n\x0ekinesis_stream\x18\x08 \x01(\x0b\x32%.fennel.proto.connector.KinesisStreamH\x00\x12?\n\x0eredshift_table\x18\t \x01(\x0b\x32%.fennel.proto.connector.RedshiftTableH\x00\x12\x43\n\x10mongo_collection\x18\n \x01(\x0b\x32\'.fennel.proto.connector.MongoCollectionH\x00\x12;\n\x0cpubsub_topic\x18\x0b \x01(\x0b\x32#.fennel.proto.connector.PubSubTopicH\x00\x42\t\n\x07variant\"Q\n\nMySQLTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"z\n\rPostgresTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\x12\x16\n\tslot_name\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x0c\n\n_slot_name\"\xf7\x01\n\x07S3Table\x12\x0e\n\x06\x62ucket\x18\x01 \x01(\t\x12\x13\n\x0bpath_prefix\x18\x02 \x01(\t\x12\x11\n\tdelimiter\x18\x04 \x01(\t\x12\x0e\n\x06\x66ormat\x18\x05 \x01(\t\x12/\n\x02\x64\x62\x18\x06 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\npre_sorted\x18\x07 \x01(\x08\x12\x13\n\x0bpath_suffix\x18\x08 \x01(\t\x12.\n\x06spread\x18\x03 \x01(\x0b\x32\x19.google.protobuf.DurationH\x00\x88\x01\x01\x12\x0f\n\x07headers\x18\t \x03(\tB\t\n\x07_spread\"\x81\x01\n\nKafkaTopic\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\r\n\x05topic\x18\x02 \x01(\t\x12\x33\n\x06\x66ormat\x18\x03 \x01(\x0b\x32#.fennel.proto.connector.KafkaFormat\"T\n\rBigqueryTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"U\n\x0eSnowflakeTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"\x81\x01\n\x0fWebhookEndpoint\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x10\n\x08\x65ndpoint\x18\x02 \x01(\t\x12+\n\x08\x64uration\x18\x03 \x01(\x0b\x32\x19.google.protobuf.Duration\"\xd3\x01\n\rKinesisStream\x12\x12\n\nstream_arn\x18\x01 \x01(\t\x12\x39\n\rinit_position\x18\x02 \x01(\x0e\x32\".fennel.proto.kinesis.InitPosition\x12\x32\n\x0einit_timestamp\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0e\n\x06\x66ormat\x18\x04 \x01(\t\x12/\n\x02\x64\x62\x18\x05 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\"T\n\rRedshiftTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"\x86\x01\n\x0bPubSubTopic\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x10\n\x08topic_id\x18\x02 \x01(\t\x12\x34\n\x06\x66ormat\x18\x03 \x01(\x0b\x32$.fennel.proto.connector.PubSubFormat\"\xb1\x02\n\x0cPreProcValue\x12\r\n\x03ref\x18\x01 \x01(\tH\x00\x12+\n\x05value\x18\x02 \x01(\x0b\x32\x1a.fennel.proto.schema.ValueH\x00\x12\x39\n\x04\x65val\x18\x03 \x01(\x0b\x32).fennel.proto.connector.PreProcValue.EvalH\x00\x1a\x9e\x01\n\x04\x45val\x12+\n\x06schema\x18\x01 \x01(\x0b\x32\x1b.fennel.proto.schema.Schema\x12-\n\x04\x65xpr\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.expression.ExprH\x00\x12-\n\x06pycode\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCodeH\x00\x42\x0b\n\teval_typeB\t\n\x07variant\"[\n\x0fMongoCollection\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x17\n\x0f\x63ollection_name\x18\x02 \x01(\t\"2\n\x0cSnapshotData\x12\x0e\n\x06marker\x18\x01 \x01(\t\x12\x12\n\nnum_retain\x18\x02 \x01(\r\"\r\n\x0bIncremental\"\n\n\x08Recreate\"\xbc\x01\n\x05Style\x12:\n\x0bincremental\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.IncrementalH\x00\x12\x34\n\x08recreate\x18\x02 \x01(\x0b\x32 .fennel.proto.connector.RecreateH\x00\x12\x38\n\x08snapshot\x18\x03 \x01(\x0b\x32$.fennel.proto.connector.SnapshotDataH\x00\x42\x07\n\x05Style\"\xb1\x05\n\x06Source\x12/\n\x05table\x18\x01 \x01(\x0b\x32 .fennel.proto.connector.ExtTable\x12\x0f\n\x07\x64\x61taset\x18\x02 \x01(\t\x12\x12\n\nds_version\x18\x03 \x01(\r\x12(\n\x05\x65very\x18\x04 \x01(\x0b\x32\x19.google.protobuf.Duration\x12\x13\n\x06\x63ursor\x18\x05 \x01(\tH\x00\x88\x01\x01\x12+\n\x08\x64isorder\x18\x06 \x01(\x0b\x32\x19.google.protobuf.Duration\x12\x17\n\x0ftimestamp_field\x18\x07 \x01(\t\x12\x30\n\x03\x63\x64\x63\x18\x08 \x01(\x0e\x32#.fennel.proto.connector.CDCStrategy\x12\x31\n\rstarting_from\x18\t \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12=\n\x08pre_proc\x18\n \x03(\x0b\x32+.fennel.proto.connector.Source.PreProcEntry\x12\x0f\n\x07version\x18\x0b \x01(\r\x12\x0f\n\x07\x62ounded\x18\x0c \x01(\x08\x12\x30\n\x08idleness\x18\r \x01(\x0b\x32\x19.google.protobuf.DurationH\x01\x88\x01\x01\x12)\n\x05until\x18\x0e \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x30\n\x06\x66ilter\x18\x0f \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCodeH\x02\x88\x01\x01\x1aT\n\x0cPreProcEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x33\n\x05value\x18\x02 \x01(\x0b\x32$.fennel.proto.connector.PreProcValue:\x02\x38\x01\x42\t\n\x07_cursorB\x0b\n\t_idlenessB\t\n\x07_filter\"\xee\x03\n\x04Sink\x12/\n\x05table\x18\x01 \x01(\x0b\x32 .fennel.proto.connector.ExtTable\x12\x0f\n\x07\x64\x61taset\x18\x02 \x01(\t\x12\x12\n\nds_version\x18\x03 \x01(\r\x12\x35\n\x03\x63\x64\x63\x18\x04 \x01(\x0e\x32#.fennel.proto.connector.CDCStrategyH\x00\x88\x01\x01\x12(\n\x05\x65very\x18\x05 \x01(\x0b\x32\x19.google.protobuf.Duration\x12/\n\x03how\x18\x06 \x01(\x0b\x32\x1d.fennel.proto.connector.StyleH\x01\x88\x01\x01\x12\x0e\n\x06\x63reate\x18\x07 \x01(\x08\x12:\n\x07renames\x18\x08 \x03(\x0b\x32).fennel.proto.connector.Sink.RenamesEntry\x12.\n\x05since\x18\t \x01(\x0b\x32\x1a.google.protobuf.TimestampH\x02\x88\x01\x01\x12.\n\x05until\x18\n \x01(\x0b\x32\x1a.google.protobuf.TimestampH\x03\x88\x01\x01\x1a.\n\x0cRenamesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\x06\n\x04_cdcB\x06\n\x04_howB\x08\n\x06_sinceB\x08\n\x06_until*K\n\x0b\x43\x44\x43Strategy\x12\n\n\x06\x41ppend\x10\x00\x12\n\n\x06Upsert\x10\x01\x12\x0c\n\x08\x44\x65\x62\x65zium\x10\x02\x12\n\n\x06Native\x10\x03\x12\n\n\x06\x44\x65lete\x10\x04\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0f\x63onnector.proto\x12\x16\x66\x65nnel.proto.connector\x1a\x1egoogle/protobuf/duration.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x10\x65xpression.proto\x1a\rkinesis.proto\x1a\x0cpycode.proto\x1a\x15schema_registry.proto\x1a\x0cschema.proto\x1a\x0csecret.proto\"\x8c\x05\n\x0b\x45xtDatabase\x12\x0c\n\x04name\x18\x01 \x01(\t\x12.\n\x05mysql\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.connector.MySQLH\x00\x12\x34\n\x08postgres\x18\x03 \x01(\x0b\x32 .fennel.proto.connector.PostgresH\x00\x12\x36\n\treference\x18\x04 \x01(\x0b\x32!.fennel.proto.connector.ReferenceH\x00\x12(\n\x02s3\x18\x05 \x01(\x0b\x32\x1a.fennel.proto.connector.S3H\x00\x12\x34\n\x08\x62igquery\x18\x06 \x01(\x0b\x32 .fennel.proto.connector.BigqueryH\x00\x12\x36\n\tsnowflake\x18\x07 \x01(\x0b\x32!.fennel.proto.connector.SnowflakeH\x00\x12.\n\x05kafka\x18\x08 \x01(\x0b\x32\x1d.fennel.proto.connector.KafkaH\x00\x12\x32\n\x07webhook\x18\t \x01(\x0b\x32\x1f.fennel.proto.connector.WebhookH\x00\x12\x32\n\x07kinesis\x18\n \x01(\x0b\x32\x1f.fennel.proto.connector.KinesisH\x00\x12\x34\n\x08redshift\x18\x0b \x01(\x0b\x32 .fennel.proto.connector.RedshiftH\x00\x12.\n\x05mongo\x18\x0c \x01(\x0b\x32\x1d.fennel.proto.connector.MongoH\x00\x12\x30\n\x06pubsub\x18\r \x01(\x0b\x32\x1e.fennel.proto.connector.PubSubH\x00\x42\t\n\x07variant\"?\n\x10SamplingStrategy\x12\x15\n\rsampling_rate\x18\x01 \x01(\x01\x12\x14\n\x0c\x63olumns_used\x18\x02 \x03(\t\"M\n\x0cPubSubFormat\x12\x32\n\x04json\x18\x01 \x01(\x0b\x32\".fennel.proto.connector.JsonFormatH\x00\x42\t\n\x07variant\"\xbc\x01\n\x0bKafkaFormat\x12\x32\n\x04json\x18\x01 \x01(\x0b\x32\".fennel.proto.connector.JsonFormatH\x00\x12\x32\n\x04\x61vro\x18\x02 \x01(\x0b\x32\".fennel.proto.connector.AvroFormatH\x00\x12:\n\x08protobuf\x18\x03 \x01(\x0b\x32&.fennel.proto.connector.ProtobufFormatH\x00\x42\t\n\x07variant\"\x0c\n\nJsonFormat\"S\n\nAvroFormat\x12\x45\n\x0fschema_registry\x18\x01 \x01(\x0b\x32,.fennel.proto.schema_registry.SchemaRegistry\"W\n\x0eProtobufFormat\x12\x45\n\x0fschema_registry\x18\x01 \x01(\x0b\x32,.fennel.proto.schema_registry.SchemaRegistry\"\xde\x01\n\tReference\x12;\n\x06\x64\x62type\x18\x01 \x01(\x0e\x32+.fennel.proto.connector.Reference.ExtDBType\"\x93\x01\n\tExtDBType\x12\t\n\x05MYSQL\x10\x00\x12\x0c\n\x08POSTGRES\x10\x01\x12\x06\n\x02S3\x10\x02\x12\t\n\x05KAFKA\x10\x03\x12\x0c\n\x08\x42IGQUERY\x10\x04\x12\r\n\tSNOWFLAKE\x10\x05\x12\x0b\n\x07WEBHOOK\x10\x06\x12\x0b\n\x07KINESIS\x10\x07\x12\x0c\n\x08REDSHIFT\x10\x08\x12\t\n\x05MONGO\x10\t\x12\n\n\x06PUBSUB\x10\n\"E\n\x07Webhook\x12\x0c\n\x04name\x18\x01 \x01(\t\x12,\n\tretention\x18\x02 \x01(\x0b\x32\x19.google.protobuf.Duration\"\x8c\x02\n\x05MySQL\x12\x0e\n\x04user\x18\x03 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x07 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x04 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\x08 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x02 \x01(\t\x12\x0c\n\x04port\x18\x05 \x01(\r\x12\x13\n\x0bjdbc_params\x18\x06 \x01(\tB\x12\n\x10username_variantB\x12\n\x10password_variant\"\x8f\x02\n\x08Postgres\x12\x0e\n\x04user\x18\x03 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x07 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x04 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\x08 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x02 \x01(\t\x12\x0c\n\x04port\x18\x05 \x01(\r\x12\x13\n\x0bjdbc_params\x18\x06 \x01(\tB\x12\n\x10username_variantB\x12\n\x10password_variant\"\xb0\x02\n\x02S3\x12\x1f\n\x15\x61ws_secret_access_key\x18\x01 \x01(\tH\x00\x12\x46\n\x1c\x61ws_secret_access_key_secret\x18\x04 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x1b\n\x11\x61ws_access_key_id\x18\x02 \x01(\tH\x01\x12\x42\n\x18\x61ws_access_key_id_secret\x18\x05 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x15\n\x08role_arn\x18\x03 \x01(\tH\x02\x88\x01\x01\x42\x1f\n\x1d\x61ws_secret_access_key_variantB\x1b\n\x19\x61ws_access_key_id_variantB\x0b\n\t_role_arn\"\xb6\x01\n\x08\x42igquery\x12\x1d\n\x13service_account_key\x18\x02 \x01(\tH\x00\x12\x44\n\x1aservice_account_key_secret\x18\x04 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\ndataset_id\x18\x01 \x01(\t\x12\x12\n\nproject_id\x18\x03 \x01(\tB\x1d\n\x1bservice_account_key_variant\"\xa1\x02\n\tSnowflake\x12\x0e\n\x04user\x18\x02 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x08 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x03 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\t \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x0f\n\x07\x61\x63\x63ount\x18\x01 \x01(\t\x12\x0e\n\x06schema\x18\x04 \x01(\t\x12\x11\n\twarehouse\x18\x05 \x01(\t\x12\x0c\n\x04role\x18\x06 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x07 \x01(\tB\x12\n\x10username_variantB\x12\n\x10password_variant\"\xf9\x02\n\x05Kafka\x12\x1d\n\x13sasl_plain_username\x18\x05 \x01(\tH\x00\x12>\n\x14sasl_username_secret\x18\x08 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x1d\n\x13sasl_plain_password\x18\x06 \x01(\tH\x01\x12>\n\x14sasl_password_secret\x18\t \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x19\n\x11\x62ootstrap_servers\x18\x01 \x01(\t\x12\x19\n\x11security_protocol\x18\x02 \x01(\t\x12\x16\n\x0esasl_mechanism\x18\x03 \x01(\t\x12\x1c\n\x10sasl_jaas_config\x18\x04 \x01(\tB\x02\x18\x01\x12\x14\n\x08group_id\x18\x07 \x01(\tB\x02\x18\x01\x42\x17\n\x15sasl_username_variantB\x17\n\x15sasl_password_variant\"\x1b\n\x07Kinesis\x12\x10\n\x08role_arn\x18\x01 \x01(\t\"\xd3\x01\n\x0b\x43redentials\x12\x12\n\x08username\x18\x01 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x03 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x02 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\x04 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x42\x12\n\x10username_variantB\x12\n\x10password_variant\"}\n\x16RedshiftAuthentication\x12\x1c\n\x12s3_access_role_arn\x18\x01 \x01(\tH\x00\x12:\n\x0b\x63redentials\x18\x02 \x01(\x0b\x32#.fennel.proto.connector.CredentialsH\x00\x42\t\n\x07variant\"\x99\x01\n\x08Redshift\x12\x10\n\x08\x64\x61tabase\x18\x01 \x01(\t\x12\x0c\n\x04host\x18\x02 \x01(\t\x12\x0c\n\x04port\x18\x03 \x01(\r\x12\x0e\n\x06schema\x18\x04 \x01(\t\x12O\n\x17redshift_authentication\x18\x05 \x01(\x0b\x32..fennel.proto.connector.RedshiftAuthentication\"\xe9\x01\n\x05Mongo\x12\x0e\n\x04user\x18\x03 \x01(\tH\x00\x12\x39\n\x0fusername_secret\x18\x05 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\x08password\x18\x04 \x01(\tH\x01\x12\x39\n\x0fpassword_secret\x18\x06 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x01\x12\x0c\n\x04host\x18\x01 \x01(\t\x12\x10\n\x08\x64\x61tabase\x18\x02 \x01(\tB\x12\n\x10username_variantB\x12\n\x10password_variant\"\xa0\x01\n\x06PubSub\x12\x1d\n\x13service_account_key\x18\x02 \x01(\tH\x00\x12\x44\n\x1aservice_account_key_secret\x18\x03 \x01(\x0b\x32\x1e.fennel.proto.secret.SecretRefH\x00\x12\x12\n\nproject_id\x18\x01 \x01(\tB\x1d\n\x1bservice_account_key_variant\"\xc0\x05\n\x08\x45xtTable\x12\x39\n\x0bmysql_table\x18\x01 \x01(\x0b\x32\".fennel.proto.connector.MySQLTableH\x00\x12\x39\n\x08pg_table\x18\x02 \x01(\x0b\x32%.fennel.proto.connector.PostgresTableH\x00\x12\x33\n\x08s3_table\x18\x03 \x01(\x0b\x32\x1f.fennel.proto.connector.S3TableH\x00\x12\x39\n\x0bkafka_topic\x18\x04 \x01(\x0b\x32\".fennel.proto.connector.KafkaTopicH\x00\x12\x41\n\x0fsnowflake_table\x18\x05 \x01(\x0b\x32&.fennel.proto.connector.SnowflakeTableH\x00\x12?\n\x0e\x62igquery_table\x18\x06 \x01(\x0b\x32%.fennel.proto.connector.BigqueryTableH\x00\x12;\n\x08\x65ndpoint\x18\x07 \x01(\x0b\x32\'.fennel.proto.connector.WebhookEndpointH\x00\x12?\n\x0ekinesis_stream\x18\x08 \x01(\x0b\x32%.fennel.proto.connector.KinesisStreamH\x00\x12?\n\x0eredshift_table\x18\t \x01(\x0b\x32%.fennel.proto.connector.RedshiftTableH\x00\x12\x43\n\x10mongo_collection\x18\n \x01(\x0b\x32\'.fennel.proto.connector.MongoCollectionH\x00\x12;\n\x0cpubsub_topic\x18\x0b \x01(\x0b\x32#.fennel.proto.connector.PubSubTopicH\x00\x42\t\n\x07variant\"Q\n\nMySQLTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"z\n\rPostgresTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\x12\x16\n\tslot_name\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x0c\n\n_slot_name\"\xf7\x01\n\x07S3Table\x12\x0e\n\x06\x62ucket\x18\x01 \x01(\t\x12\x13\n\x0bpath_prefix\x18\x02 \x01(\t\x12\x11\n\tdelimiter\x18\x04 \x01(\t\x12\x0e\n\x06\x66ormat\x18\x05 \x01(\t\x12/\n\x02\x64\x62\x18\x06 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\npre_sorted\x18\x07 \x01(\x08\x12\x13\n\x0bpath_suffix\x18\x08 \x01(\t\x12.\n\x06spread\x18\x03 \x01(\x0b\x32\x19.google.protobuf.DurationH\x00\x88\x01\x01\x12\x0f\n\x07headers\x18\t \x03(\tB\t\n\x07_spread\"\x81\x01\n\nKafkaTopic\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\r\n\x05topic\x18\x02 \x01(\t\x12\x33\n\x06\x66ormat\x18\x03 \x01(\x0b\x32#.fennel.proto.connector.KafkaFormat\"T\n\rBigqueryTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"U\n\x0eSnowflakeTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"\x81\x01\n\x0fWebhookEndpoint\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x10\n\x08\x65ndpoint\x18\x02 \x01(\t\x12+\n\x08\x64uration\x18\x03 \x01(\x0b\x32\x19.google.protobuf.Duration\"\xd3\x01\n\rKinesisStream\x12\x12\n\nstream_arn\x18\x01 \x01(\t\x12\x39\n\rinit_position\x18\x02 \x01(\x0e\x32\".fennel.proto.kinesis.InitPosition\x12\x32\n\x0einit_timestamp\x18\x03 \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x0e\n\x06\x66ormat\x18\x04 \x01(\t\x12/\n\x02\x64\x62\x18\x05 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\"T\n\rRedshiftTable\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x12\n\ntable_name\x18\x02 \x01(\t\"\x86\x01\n\x0bPubSubTopic\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x10\n\x08topic_id\x18\x02 \x01(\t\x12\x34\n\x06\x66ormat\x18\x03 \x01(\x0b\x32$.fennel.proto.connector.PubSubFormat\"\xb1\x02\n\x0cPreProcValue\x12\r\n\x03ref\x18\x01 \x01(\tH\x00\x12+\n\x05value\x18\x02 \x01(\x0b\x32\x1a.fennel.proto.schema.ValueH\x00\x12\x39\n\x04\x65val\x18\x03 \x01(\x0b\x32).fennel.proto.connector.PreProcValue.EvalH\x00\x1a\x9e\x01\n\x04\x45val\x12+\n\x06schema\x18\x01 \x01(\x0b\x32\x1b.fennel.proto.schema.Schema\x12-\n\x04\x65xpr\x18\x02 \x01(\x0b\x32\x1d.fennel.proto.expression.ExprH\x00\x12-\n\x06pycode\x18\x03 \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCodeH\x00\x42\x0b\n\teval_typeB\t\n\x07variant\"[\n\x0fMongoCollection\x12/\n\x02\x64\x62\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.ExtDatabase\x12\x17\n\x0f\x63ollection_name\x18\x02 \x01(\t\"2\n\x0cSnapshotData\x12\x0e\n\x06marker\x18\x01 \x01(\t\x12\x12\n\nnum_retain\x18\x02 \x01(\r\"\r\n\x0bIncremental\"\n\n\x08Recreate\"\xbc\x01\n\x05Style\x12:\n\x0bincremental\x18\x01 \x01(\x0b\x32#.fennel.proto.connector.IncrementalH\x00\x12\x34\n\x08recreate\x18\x02 \x01(\x0b\x32 .fennel.proto.connector.RecreateH\x00\x12\x38\n\x08snapshot\x18\x03 \x01(\x0b\x32$.fennel.proto.connector.SnapshotDataH\x00\x42\x07\n\x05Style\"\x91\x06\n\x06Source\x12/\n\x05table\x18\x01 \x01(\x0b\x32 .fennel.proto.connector.ExtTable\x12\x0f\n\x07\x64\x61taset\x18\x02 \x01(\t\x12\x12\n\nds_version\x18\x03 \x01(\r\x12(\n\x05\x65very\x18\x04 \x01(\x0b\x32\x19.google.protobuf.Duration\x12\x13\n\x06\x63ursor\x18\x05 \x01(\tH\x00\x88\x01\x01\x12+\n\x08\x64isorder\x18\x06 \x01(\x0b\x32\x19.google.protobuf.Duration\x12\x17\n\x0ftimestamp_field\x18\x07 \x01(\t\x12\x30\n\x03\x63\x64\x63\x18\x08 \x01(\x0e\x32#.fennel.proto.connector.CDCStrategy\x12\x31\n\rstarting_from\x18\t \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12=\n\x08pre_proc\x18\n \x03(\x0b\x32+.fennel.proto.connector.Source.PreProcEntry\x12\x0f\n\x07version\x18\x0b \x01(\r\x12\x0f\n\x07\x62ounded\x18\x0c \x01(\x08\x12\x30\n\x08idleness\x18\r \x01(\x0b\x32\x19.google.protobuf.DurationH\x01\x88\x01\x01\x12)\n\x05until\x18\x0e \x01(\x0b\x32\x1a.google.protobuf.Timestamp\x12\x30\n\x06\x66ilter\x18\x0f \x01(\x0b\x32\x1b.fennel.proto.pycode.PyCodeH\x02\x88\x01\x01\x12H\n\x11sampling_strategy\x18\x10 \x01(\x0b\x32(.fennel.proto.connector.SamplingStrategyH\x03\x88\x01\x01\x1aT\n\x0cPreProcEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x33\n\x05value\x18\x02 \x01(\x0b\x32$.fennel.proto.connector.PreProcValue:\x02\x38\x01\x42\t\n\x07_cursorB\x0b\n\t_idlenessB\t\n\x07_filterB\x14\n\x12_sampling_strategy\"\xee\x03\n\x04Sink\x12/\n\x05table\x18\x01 \x01(\x0b\x32 .fennel.proto.connector.ExtTable\x12\x0f\n\x07\x64\x61taset\x18\x02 \x01(\t\x12\x12\n\nds_version\x18\x03 \x01(\r\x12\x35\n\x03\x63\x64\x63\x18\x04 \x01(\x0e\x32#.fennel.proto.connector.CDCStrategyH\x00\x88\x01\x01\x12(\n\x05\x65very\x18\x05 \x01(\x0b\x32\x19.google.protobuf.Duration\x12/\n\x03how\x18\x06 \x01(\x0b\x32\x1d.fennel.proto.connector.StyleH\x01\x88\x01\x01\x12\x0e\n\x06\x63reate\x18\x07 \x01(\x08\x12:\n\x07renames\x18\x08 \x03(\x0b\x32).fennel.proto.connector.Sink.RenamesEntry\x12.\n\x05since\x18\t \x01(\x0b\x32\x1a.google.protobuf.TimestampH\x02\x88\x01\x01\x12.\n\x05until\x18\n \x01(\x0b\x32\x1a.google.protobuf.TimestampH\x03\x88\x01\x01\x1a.\n\x0cRenamesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\x06\n\x04_cdcB\x06\n\x04_howB\x08\n\x06_sinceB\x08\n\x06_until*K\n\x0b\x43\x44\x43Strategy\x12\n\n\x06\x41ppend\x10\x00\x12\n\n\x06Upsert\x10\x01\x12\x0c\n\x08\x44\x65\x62\x65zium\x10\x02\x12\n\n\x06Native\x10\x03\x12\n\n\x06\x44\x65lete\x10\x04\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -37,92 +37,94 @@ _globals['_SOURCE_PREPROCENTRY']._serialized_options = b'8\001' _globals['_SINK_RENAMESENTRY']._loaded_options = None _globals['_SINK_RENAMESENTRY']._serialized_options = b'8\001' - _globals['_CDCSTRATEGY']._serialized_start=8147 - _globals['_CDCSTRATEGY']._serialized_end=8222 + _globals['_CDCSTRATEGY']._serialized_start=8308 + _globals['_CDCSTRATEGY']._serialized_end=8383 _globals['_EXTDATABASE']._serialized_start=207 _globals['_EXTDATABASE']._serialized_end=859 - _globals['_PUBSUBFORMAT']._serialized_start=861 - _globals['_PUBSUBFORMAT']._serialized_end=938 - _globals['_KAFKAFORMAT']._serialized_start=941 - _globals['_KAFKAFORMAT']._serialized_end=1129 - _globals['_JSONFORMAT']._serialized_start=1131 - _globals['_JSONFORMAT']._serialized_end=1143 - _globals['_AVROFORMAT']._serialized_start=1145 - _globals['_AVROFORMAT']._serialized_end=1228 - _globals['_PROTOBUFFORMAT']._serialized_start=1230 - _globals['_PROTOBUFFORMAT']._serialized_end=1317 - _globals['_REFERENCE']._serialized_start=1320 - _globals['_REFERENCE']._serialized_end=1542 - _globals['_REFERENCE_EXTDBTYPE']._serialized_start=1395 - _globals['_REFERENCE_EXTDBTYPE']._serialized_end=1542 - _globals['_WEBHOOK']._serialized_start=1544 - _globals['_WEBHOOK']._serialized_end=1613 - _globals['_MYSQL']._serialized_start=1616 - _globals['_MYSQL']._serialized_end=1884 - _globals['_POSTGRES']._serialized_start=1887 - _globals['_POSTGRES']._serialized_end=2158 - _globals['_S3']._serialized_start=2161 - _globals['_S3']._serialized_end=2465 - _globals['_BIGQUERY']._serialized_start=2468 - _globals['_BIGQUERY']._serialized_end=2650 - _globals['_SNOWFLAKE']._serialized_start=2653 - _globals['_SNOWFLAKE']._serialized_end=2942 - _globals['_KAFKA']._serialized_start=2945 - _globals['_KAFKA']._serialized_end=3322 - _globals['_KINESIS']._serialized_start=3324 - _globals['_KINESIS']._serialized_end=3351 - _globals['_CREDENTIALS']._serialized_start=3354 - _globals['_CREDENTIALS']._serialized_end=3565 - _globals['_REDSHIFTAUTHENTICATION']._serialized_start=3567 - _globals['_REDSHIFTAUTHENTICATION']._serialized_end=3692 - _globals['_REDSHIFT']._serialized_start=3695 - _globals['_REDSHIFT']._serialized_end=3848 - _globals['_MONGO']._serialized_start=3851 - _globals['_MONGO']._serialized_end=4084 - _globals['_PUBSUB']._serialized_start=4087 - _globals['_PUBSUB']._serialized_end=4247 - _globals['_EXTTABLE']._serialized_start=4250 - _globals['_EXTTABLE']._serialized_end=4954 - _globals['_MYSQLTABLE']._serialized_start=4956 - _globals['_MYSQLTABLE']._serialized_end=5037 - _globals['_POSTGRESTABLE']._serialized_start=5039 - _globals['_POSTGRESTABLE']._serialized_end=5161 - _globals['_S3TABLE']._serialized_start=5164 - _globals['_S3TABLE']._serialized_end=5411 - _globals['_KAFKATOPIC']._serialized_start=5414 - _globals['_KAFKATOPIC']._serialized_end=5543 - _globals['_BIGQUERYTABLE']._serialized_start=5545 - _globals['_BIGQUERYTABLE']._serialized_end=5629 - _globals['_SNOWFLAKETABLE']._serialized_start=5631 - _globals['_SNOWFLAKETABLE']._serialized_end=5716 - _globals['_WEBHOOKENDPOINT']._serialized_start=5719 - _globals['_WEBHOOKENDPOINT']._serialized_end=5848 - _globals['_KINESISSTREAM']._serialized_start=5851 - _globals['_KINESISSTREAM']._serialized_end=6062 - _globals['_REDSHIFTTABLE']._serialized_start=6064 - _globals['_REDSHIFTTABLE']._serialized_end=6148 - _globals['_PUBSUBTOPIC']._serialized_start=6151 - _globals['_PUBSUBTOPIC']._serialized_end=6285 - _globals['_PREPROCVALUE']._serialized_start=6288 - _globals['_PREPROCVALUE']._serialized_end=6593 - _globals['_PREPROCVALUE_EVAL']._serialized_start=6424 - _globals['_PREPROCVALUE_EVAL']._serialized_end=6582 - _globals['_MONGOCOLLECTION']._serialized_start=6595 - _globals['_MONGOCOLLECTION']._serialized_end=6686 - _globals['_SNAPSHOTDATA']._serialized_start=6688 - _globals['_SNAPSHOTDATA']._serialized_end=6738 - _globals['_INCREMENTAL']._serialized_start=6740 - _globals['_INCREMENTAL']._serialized_end=6753 - _globals['_RECREATE']._serialized_start=6755 - _globals['_RECREATE']._serialized_end=6765 - _globals['_STYLE']._serialized_start=6768 - _globals['_STYLE']._serialized_end=6956 - _globals['_SOURCE']._serialized_start=6959 - _globals['_SOURCE']._serialized_end=7648 - _globals['_SOURCE_PREPROCENTRY']._serialized_start=7529 - _globals['_SOURCE_PREPROCENTRY']._serialized_end=7613 - _globals['_SINK']._serialized_start=7651 - _globals['_SINK']._serialized_end=8145 - _globals['_SINK_RENAMESENTRY']._serialized_start=8063 - _globals['_SINK_RENAMESENTRY']._serialized_end=8109 + _globals['_SAMPLINGSTRATEGY']._serialized_start=861 + _globals['_SAMPLINGSTRATEGY']._serialized_end=924 + _globals['_PUBSUBFORMAT']._serialized_start=926 + _globals['_PUBSUBFORMAT']._serialized_end=1003 + _globals['_KAFKAFORMAT']._serialized_start=1006 + _globals['_KAFKAFORMAT']._serialized_end=1194 + _globals['_JSONFORMAT']._serialized_start=1196 + _globals['_JSONFORMAT']._serialized_end=1208 + _globals['_AVROFORMAT']._serialized_start=1210 + _globals['_AVROFORMAT']._serialized_end=1293 + _globals['_PROTOBUFFORMAT']._serialized_start=1295 + _globals['_PROTOBUFFORMAT']._serialized_end=1382 + _globals['_REFERENCE']._serialized_start=1385 + _globals['_REFERENCE']._serialized_end=1607 + _globals['_REFERENCE_EXTDBTYPE']._serialized_start=1460 + _globals['_REFERENCE_EXTDBTYPE']._serialized_end=1607 + _globals['_WEBHOOK']._serialized_start=1609 + _globals['_WEBHOOK']._serialized_end=1678 + _globals['_MYSQL']._serialized_start=1681 + _globals['_MYSQL']._serialized_end=1949 + _globals['_POSTGRES']._serialized_start=1952 + _globals['_POSTGRES']._serialized_end=2223 + _globals['_S3']._serialized_start=2226 + _globals['_S3']._serialized_end=2530 + _globals['_BIGQUERY']._serialized_start=2533 + _globals['_BIGQUERY']._serialized_end=2715 + _globals['_SNOWFLAKE']._serialized_start=2718 + _globals['_SNOWFLAKE']._serialized_end=3007 + _globals['_KAFKA']._serialized_start=3010 + _globals['_KAFKA']._serialized_end=3387 + _globals['_KINESIS']._serialized_start=3389 + _globals['_KINESIS']._serialized_end=3416 + _globals['_CREDENTIALS']._serialized_start=3419 + _globals['_CREDENTIALS']._serialized_end=3630 + _globals['_REDSHIFTAUTHENTICATION']._serialized_start=3632 + _globals['_REDSHIFTAUTHENTICATION']._serialized_end=3757 + _globals['_REDSHIFT']._serialized_start=3760 + _globals['_REDSHIFT']._serialized_end=3913 + _globals['_MONGO']._serialized_start=3916 + _globals['_MONGO']._serialized_end=4149 + _globals['_PUBSUB']._serialized_start=4152 + _globals['_PUBSUB']._serialized_end=4312 + _globals['_EXTTABLE']._serialized_start=4315 + _globals['_EXTTABLE']._serialized_end=5019 + _globals['_MYSQLTABLE']._serialized_start=5021 + _globals['_MYSQLTABLE']._serialized_end=5102 + _globals['_POSTGRESTABLE']._serialized_start=5104 + _globals['_POSTGRESTABLE']._serialized_end=5226 + _globals['_S3TABLE']._serialized_start=5229 + _globals['_S3TABLE']._serialized_end=5476 + _globals['_KAFKATOPIC']._serialized_start=5479 + _globals['_KAFKATOPIC']._serialized_end=5608 + _globals['_BIGQUERYTABLE']._serialized_start=5610 + _globals['_BIGQUERYTABLE']._serialized_end=5694 + _globals['_SNOWFLAKETABLE']._serialized_start=5696 + _globals['_SNOWFLAKETABLE']._serialized_end=5781 + _globals['_WEBHOOKENDPOINT']._serialized_start=5784 + _globals['_WEBHOOKENDPOINT']._serialized_end=5913 + _globals['_KINESISSTREAM']._serialized_start=5916 + _globals['_KINESISSTREAM']._serialized_end=6127 + _globals['_REDSHIFTTABLE']._serialized_start=6129 + _globals['_REDSHIFTTABLE']._serialized_end=6213 + _globals['_PUBSUBTOPIC']._serialized_start=6216 + _globals['_PUBSUBTOPIC']._serialized_end=6350 + _globals['_PREPROCVALUE']._serialized_start=6353 + _globals['_PREPROCVALUE']._serialized_end=6658 + _globals['_PREPROCVALUE_EVAL']._serialized_start=6489 + _globals['_PREPROCVALUE_EVAL']._serialized_end=6647 + _globals['_MONGOCOLLECTION']._serialized_start=6660 + _globals['_MONGOCOLLECTION']._serialized_end=6751 + _globals['_SNAPSHOTDATA']._serialized_start=6753 + _globals['_SNAPSHOTDATA']._serialized_end=6803 + _globals['_INCREMENTAL']._serialized_start=6805 + _globals['_INCREMENTAL']._serialized_end=6818 + _globals['_RECREATE']._serialized_start=6820 + _globals['_RECREATE']._serialized_end=6830 + _globals['_STYLE']._serialized_start=6833 + _globals['_STYLE']._serialized_end=7021 + _globals['_SOURCE']._serialized_start=7024 + _globals['_SOURCE']._serialized_end=7809 + _globals['_SOURCE_PREPROCENTRY']._serialized_start=7668 + _globals['_SOURCE_PREPROCENTRY']._serialized_end=7752 + _globals['_SINK']._serialized_start=7812 + _globals['_SINK']._serialized_end=8306 + _globals['_SINK_RENAMESENTRY']._serialized_start=8224 + _globals['_SINK_RENAMESENTRY']._serialized_end=8270 # @@protoc_insertion_point(module_scope) diff --git a/fennel/gen/connector_pb2.pyi b/fennel/gen/connector_pb2.pyi index ca83dab06..06ebd48f8 100644 --- a/fennel/gen/connector_pb2.pyi +++ b/fennel/gen/connector_pb2.pyi @@ -129,6 +129,26 @@ class ExtDatabase(google.protobuf.message.Message): global___ExtDatabase = ExtDatabase +@typing_extensions.final +class SamplingStrategy(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + SAMPLING_RATE_FIELD_NUMBER: builtins.int + COLUMNS_USED_FIELD_NUMBER: builtins.int + sampling_rate: builtins.float + @property + def columns_used(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: + """columns_used is a list of columns used for sampling""" + def __init__( + self, + *, + sampling_rate: builtins.float = ..., + columns_used: collections.abc.Iterable[builtins.str] | None = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["columns_used", b"columns_used", "sampling_rate", b"sampling_rate"]) -> None: ... + +global___SamplingStrategy = SamplingStrategy + @typing_extensions.final class PubSubFormat(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor @@ -225,7 +245,7 @@ class Reference(google.protobuf.message.Message): ValueType = typing.NewType("ValueType", builtins.int) V: typing_extensions.TypeAlias = ValueType - class _ExtDBTypeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[Reference._ExtDBType.ValueType], builtins.type): # noqa: F821 + class _ExtDBTypeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[Reference._ExtDBType.ValueType], builtins.type): DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor MYSQL: Reference._ExtDBType.ValueType # 0 POSTGRES: Reference._ExtDBType.ValueType # 1 @@ -1179,6 +1199,7 @@ class Source(google.protobuf.message.Message): IDLENESS_FIELD_NUMBER: builtins.int UNTIL_FIELD_NUMBER: builtins.int FILTER_FIELD_NUMBER: builtins.int + SAMPLING_STRATEGY_FIELD_NUMBER: builtins.int @property def table(self) -> global___ExtTable: ... dataset: builtins.str @@ -1202,6 +1223,8 @@ class Source(google.protobuf.message.Message): def until(self) -> google.protobuf.timestamp_pb2.Timestamp: ... @property def filter(self) -> pycode_pb2.PyCode: ... + @property + def sampling_strategy(self) -> global___SamplingStrategy: ... def __init__( self, *, @@ -1220,15 +1243,18 @@ class Source(google.protobuf.message.Message): idleness: google.protobuf.duration_pb2.Duration | None = ..., until: google.protobuf.timestamp_pb2.Timestamp | None = ..., filter: pycode_pb2.PyCode | None = ..., + sampling_strategy: global___SamplingStrategy | None = ..., ) -> None: ... - def HasField(self, field_name: typing_extensions.Literal["_cursor", b"_cursor", "_filter", b"_filter", "_idleness", b"_idleness", "cursor", b"cursor", "disorder", b"disorder", "every", b"every", "filter", b"filter", "idleness", b"idleness", "starting_from", b"starting_from", "table", b"table", "until", b"until"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["_cursor", b"_cursor", "_filter", b"_filter", "_idleness", b"_idleness", "bounded", b"bounded", "cdc", b"cdc", "cursor", b"cursor", "dataset", b"dataset", "disorder", b"disorder", "ds_version", b"ds_version", "every", b"every", "filter", b"filter", "idleness", b"idleness", "pre_proc", b"pre_proc", "starting_from", b"starting_from", "table", b"table", "timestamp_field", b"timestamp_field", "until", b"until", "version", b"version"]) -> None: ... + def HasField(self, field_name: typing_extensions.Literal["_cursor", b"_cursor", "_filter", b"_filter", "_idleness", b"_idleness", "_sampling_strategy", b"_sampling_strategy", "cursor", b"cursor", "disorder", b"disorder", "every", b"every", "filter", b"filter", "idleness", b"idleness", "sampling_strategy", b"sampling_strategy", "starting_from", b"starting_from", "table", b"table", "until", b"until"]) -> builtins.bool: ... + def ClearField(self, field_name: typing_extensions.Literal["_cursor", b"_cursor", "_filter", b"_filter", "_idleness", b"_idleness", "_sampling_strategy", b"_sampling_strategy", "bounded", b"bounded", "cdc", b"cdc", "cursor", b"cursor", "dataset", b"dataset", "disorder", b"disorder", "ds_version", b"ds_version", "every", b"every", "filter", b"filter", "idleness", b"idleness", "pre_proc", b"pre_proc", "sampling_strategy", b"sampling_strategy", "starting_from", b"starting_from", "table", b"table", "timestamp_field", b"timestamp_field", "until", b"until", "version", b"version"]) -> None: ... @typing.overload def WhichOneof(self, oneof_group: typing_extensions.Literal["_cursor", b"_cursor"]) -> typing_extensions.Literal["cursor"] | None: ... @typing.overload def WhichOneof(self, oneof_group: typing_extensions.Literal["_filter", b"_filter"]) -> typing_extensions.Literal["filter"] | None: ... @typing.overload def WhichOneof(self, oneof_group: typing_extensions.Literal["_idleness", b"_idleness"]) -> typing_extensions.Literal["idleness"] | None: ... + @typing.overload + def WhichOneof(self, oneof_group: typing_extensions.Literal["_sampling_strategy", b"_sampling_strategy"]) -> typing_extensions.Literal["sampling_strategy"] | None: ... global___Source = Source diff --git a/fennel/gen/dataset_pb2.pyi b/fennel/gen/dataset_pb2.pyi index 146a4b2af..22f8d2ff9 100644 --- a/fennel/gen/dataset_pb2.pyi +++ b/fennel/gen/dataset_pb2.pyi @@ -337,7 +337,7 @@ class Join(google.protobuf.message.Message): ValueType = typing.NewType("ValueType", builtins.int) V: typing_extensions.TypeAlias = ValueType - class _HowEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[Join._How.ValueType], builtins.type): # noqa: F821 + class _HowEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[Join._How.ValueType], builtins.type): DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor Left: Join._How.ValueType # 0 Inner: Join._How.ValueType # 1 diff --git a/fennel/gen/expectations_pb2.pyi b/fennel/gen/expectations_pb2.pyi index 28a9a9a95..ae399ad75 100644 --- a/fennel/gen/expectations_pb2.pyi +++ b/fennel/gen/expectations_pb2.pyi @@ -27,7 +27,7 @@ class Expectations(google.protobuf.message.Message): ValueType = typing.NewType("ValueType", builtins.int) V: typing_extensions.TypeAlias = ValueType - class _EntityTypeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[Expectations._EntityType.ValueType], builtins.type): # noqa: F821 + class _EntityTypeEnumTypeWrapper(google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[Expectations._EntityType.ValueType], builtins.type): DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor Dataset: Expectations._EntityType.ValueType # 0 Featureset: Expectations._EntityType.ValueType # 1 diff --git a/fennel/internal_lib/to_proto/to_proto.py b/fennel/internal_lib/to_proto/to_proto.py index 61426f0e6..b2dcde7ea 100644 --- a/fennel/internal_lib/to_proto/to_proto.py +++ b/fennel/internal_lib/to_proto/to_proto.py @@ -35,7 +35,7 @@ import fennel.gen.schema_registry_pb2 as schema_registry_proto import fennel.gen.services_pb2 as services_proto from fennel.connectors import kinesis -from fennel.connectors.connectors import CSV, SnapshotData +from fennel.connectors.connectors import CSV, SnapshotData, Sample, PreProcValue from fennel.datasets import Dataset, Pipeline, Field from fennel.datasets.datasets import ( indices_from_ds, @@ -938,6 +938,9 @@ def _webhook_to_source_proto( cdc=to_cdc_proto(connector.cdc), pre_proc=_pre_proc_to_proto(dataset, connector.pre_proc), bounded=connector.bounded, + sampling_strategy=_sample_to_proto( + connector.sample, dataset, connector.pre_proc + ), idleness=( to_duration_proto(connector.idleness) if connector.idleness @@ -981,6 +984,9 @@ def _kafka_conn_to_source_proto( starting_from=_to_timestamp_proto(connector.since), until=_to_timestamp_proto(connector.until), bounded=connector.bounded, + sampling_strategy=_sample_to_proto( + connector.sample, dataset, connector.pre_proc + ), idleness=( to_duration_proto(connector.idleness) if connector.idleness @@ -1084,6 +1090,9 @@ def _s3_conn_to_source_proto( cdc=to_cdc_proto(connector.cdc), pre_proc=_pre_proc_to_proto(dataset, connector.pre_proc), bounded=connector.bounded, + sampling_strategy=_sample_to_proto( + connector.sample, dataset, connector.pre_proc + ), idleness=( to_duration_proto(connector.idleness) if connector.idleness @@ -1104,6 +1113,88 @@ def _renames_to_proto(renames: Optional[Dict[str, str]]) -> Mapping[str, str]: return renames +def get_sampling_rate(sample: Optional[Union[float, Sample]]): + if isinstance(sample, float): + if sample < 0 or sample > 1: + raise ValueError("Sample rate should be between 0 and 1") + return sample + if isinstance(sample, Sample): + if sample.rate < 0 or sample.rate > 1: + raise ValueError("Sample rate should be between 0 and 1") + return sample.rate + raise ValueError(f"Invalid sample type: {type(sample)}") + + +def get_sampling_columns( + sample: Optional[Union[float, Sample]], + dataset: Dataset, + preproc: Optional[Dict[str, PreProcValue]], +): + disallowed_columns = [dataset.timestamp_field] + if preproc is not None: + disallowed_columns.extend(list(preproc.keys())) + + if isinstance(sample, Sample) and len(sample.using) != 0: + input_columns = sample.using + all_columns = [str(column) for column in dataset.fields] + for column in input_columns: + if column == dataset.timestamp_field: + raise ValueError( + f"Timestamp column: {column} cannot be part of sampling columns" + ) + if column in disallowed_columns: + raise ValueError( + f"Column {column} is part of preproc so cannot be used for sampling" + ) + if column not in all_columns: + raise ValueError( + f"Column {column} is not part of dataset columns" + ) + return sorted(input_columns) + + if len(dataset.key_fields) != 0: + key_columns = dataset.key_fields + sample_columns = sorted( + [ + column + for column in key_columns + if column not in disallowed_columns + ] + ) + if len(sample_columns) == 0: + raise ValueError( + "No columns left to sample from. all key columns are part of preproc" + ) + return sample_columns + else: + all_columns = [str(column) for column in dataset.fields] + sample_columns = sorted( + [ + column + for column in all_columns + if column not in disallowed_columns + ] + ) + if len(sample_columns) == 0: + raise ValueError( + "No columns left to sample from. all columns are part of preproc" + ) + return sample_columns + + +def _sample_to_proto( + sample: Optional[Union[float, Sample]], + dataset: Dataset, + preproc: Optional[Dict[str, PreProcValue]], +) -> Optional[connector_proto.SamplingStrategy]: + if sample is None: + return None + return connector_proto.SamplingStrategy( + sampling_rate=get_sampling_rate(sample), + columns_used=get_sampling_columns(sample, dataset, preproc), + ) + + def _how_to_proto( how: Optional[Literal["incremental", "recreate"] | SnapshotData] ) -> Optional[connector_proto.Style]: @@ -1300,6 +1391,9 @@ def _bigquery_conn_to_source_proto( cdc=to_cdc_proto(connector.cdc), pre_proc=_pre_proc_to_proto(dataset, connector.pre_proc), bounded=connector.bounded, + sampling_strategy=_sample_to_proto( + connector.sample, dataset, connector.pre_proc + ), idleness=( to_duration_proto(connector.idleness) if connector.idleness @@ -1377,6 +1471,9 @@ def _redshift_conn_to_source_proto( starting_from=_to_timestamp_proto(connector.since), until=_to_timestamp_proto(connector.until), bounded=connector.bounded, + sampling_strategy=_sample_to_proto( + connector.sample, dataset, connector.pre_proc + ), idleness=( to_duration_proto(connector.idleness) if connector.idleness @@ -1478,6 +1575,9 @@ def _mongo_conn_to_source_proto( starting_from=_to_timestamp_proto(connector.since), until=_to_timestamp_proto(connector.until), bounded=connector.bounded, + sampling_strategy=_sample_to_proto( + connector.sample, dataset, connector.pre_proc + ), idleness=( to_duration_proto(connector.idleness) if connector.idleness @@ -1554,6 +1654,9 @@ def _pubsub_conn_to_source_proto( cdc=to_cdc_proto(connector.cdc), pre_proc=_pre_proc_to_proto(dataset, connector.pre_proc), bounded=connector.bounded, + sampling_strategy=_sample_to_proto( + connector.sample, dataset, connector.pre_proc + ), idleness=( to_duration_proto(connector.idleness) if connector.idleness @@ -1641,6 +1744,9 @@ def _snowflake_conn_to_source_proto( starting_from=_to_timestamp_proto(connector.since), until=_to_timestamp_proto(connector.until), bounded=connector.bounded, + sampling_strategy=_sample_to_proto( + connector.sample, dataset, connector.pre_proc + ), idleness=( to_duration_proto(connector.idleness) if connector.idleness @@ -1749,6 +1855,9 @@ def _mysql_conn_to_source_proto( until=_to_timestamp_proto(connector.until), pre_proc=_pre_proc_to_proto(dataset, connector.pre_proc), bounded=connector.bounded, + sampling_strategy=_sample_to_proto( + connector.sample, dataset, connector.pre_proc + ), idleness=( to_duration_proto(connector.idleness) if connector.idleness @@ -1844,6 +1953,9 @@ def _pg_conn_to_source_proto( until=_to_timestamp_proto(connector.until), pre_proc=_pre_proc_to_proto(dataset, connector.pre_proc), bounded=connector.bounded, + sampling_strategy=_sample_to_proto( + connector.sample, dataset, connector.pre_proc + ), idleness=( to_duration_proto(connector.idleness) if connector.idleness @@ -1933,6 +2045,9 @@ def _kinesis_conn_to_source_proto( until=_to_timestamp_proto(connector.until), pre_proc=_pre_proc_to_proto(dataset, connector.pre_proc), bounded=connector.bounded, + sampling_strategy=_sample_to_proto( + connector.sample, dataset, connector.pre_proc + ), idleness=( to_duration_proto(connector.idleness) if connector.idleness