Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix SparkKafkaProcessor query_timeout parameter #2789

Merged
merged 1 commit into from
Jun 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion sdk/python/feast/infra/contrib/spark_kafka_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

class SparkProcessorConfig(ProcessorConfig):
spark_session: SparkSession
processing_time: str
query_timeout: int


class SparkKafkaProcessor(StreamProcessor):
Expand All @@ -31,7 +33,7 @@ def __init__(
config: ProcessorConfig,
write_function: MethodType,
processing_time: str = "30 seconds",
query_timeout: str = "15 seconds",
query_timeout: int = 15,
):
if not isinstance(sfv.stream_source, KafkaSource):
raise ValueError("data source is not kafka source")
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/contrib/stream_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def get_stream_processor_object(
if config.mode == "spark" and config.source == "kafka":
stream_processor = STREAM_PROCESSOR_CLASS_FOR_TYPE[("spark", "kafka")]
module_name, class_name = stream_processor.rsplit(".", 1)
cls = import_class(module_name, class_name, "Processor")
cls = import_class(module_name, class_name, "StreamProcessor")
return cls(sfv=sfv, config=config, write_function=write_function,)
else:
raise ValueError("other processors besides spark-kafka not supported")
2 changes: 2 additions & 0 deletions sdk/python/feast/stream_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,12 @@ def __eq__(self, other):

if not super().__eq__(other):
return False

if not self.udf:
return not other.udf
if not other.udf:
return False

if (
self.mode != other.mode
or self.timestamp_field != other.timestamp_field
Expand Down