
Add streaming support #137

Merged · 5 commits merged into main on Apr 21, 2022
Conversation

jaymo001 (Collaborator) commented Apr 19, 2022

Add streaming support for anchored features.

Example:

1. Define a streaming input source on Kafka, along with its input data schema:

```python
schema = AvroJsonSchema(schemaStr="""
{
    "type": "record",
    "name": "DriverTrips",
    "fields": [
        {"name": "driver_id", "type": "long"},
        {"name": "trips_today", "type": "int"},
        {
            "name": "datetime",
            "type": {"type": "long", "logicalType": "timestamp-micros"}
        }
    ]
}
""")

stream_source = KafkaSource(name="kafkaStreamingSource",
                            kafkaConfig=KafkaConfig(brokers=["feathrazureci.servicebus.windows.net:9093"],
                                                    topics=["feathrcieventhub"],
                                                    schema=schema))
```

2. Define the feature definitions with the Kafka source:

```python
driver_id = TypedKey(key_column="driver_id",
                     key_column_type=ValueType.INT64,
                     description="driver id",
                     full_name="nyc driver id")

kafkaAnchor = FeatureAnchor(name="kafkaAnchor",
                            source=stream_source,
                            features=[Feature(name="f_modified_streaming_count",
                                              feature_type=INT32,
                                              transform="trips_today + 1",
                                              key=driver_id),
                                      Feature(name="f_modified_streaming_count2",
                                              feature_type=INT32,
                                              transform="trips_today + 2",
                                              key=driver_id)])
```

3. Start the streaming job:

```python
redisSink = RedisSink(table_name="kafkaSampleDemoFeature", streaming=True, streamingTimeoutMs=10000)
settings = MaterializationSettings(name="kafkaSampleDemo",
                                   sinks=[redisSink],
                                   feature_names=['f_modified_streaming_count'])
# Streams for 10 seconds, since streamingTimeoutMs is 10000
client.materialize_features(settings)
```

hangfei (Collaborator) commented Apr 19, 2022

Is schema required? Can it work without a schema, like frame-offline?

xiaoyongzhu added the `safe to test` label (tag to execute the build pipeline for a PR from a forked repo) on Apr 19, 2022
jaymo001 (Collaborator, Author) commented

Yes, I believe it is. Kafka returns a byte array of any type, so we have to rely on the schema to create a decoder.
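The point about needing a schema to build a decoder can be shown with a small, self-contained sketch. This is purely illustrative, not Feathr's actual decoder (the real implementation builds an Avro decoder from `AvroJsonSchema`); the payload here is JSON-encoded only to keep the example dependency-free:

```python
import json

# Kafka hands consumers an opaque byte payload; without a schema there
# is no way to know which fields it contains or how to type them.
raw_payload = b'{"driver_id": 42, "trips_today": 7}'

# Simplified stand-in for the schema: field name -> expected type.
schema_fields = {"driver_id": int, "trips_today": int}

def decode(payload: bytes, fields: dict) -> dict:
    # Feathr would use an Avro decoder here; JSON keeps the sketch runnable.
    record = json.loads(payload)
    return {name: cast(record[name]) for name, cast in fields.items()}

decoded = decode(raw_payload, schema_fields)
```

Only the fields named in the schema are extracted, which is what makes the decoder well-defined for arbitrary producer payloads.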

xiaoyongzhu (Member) commented

Another bug found while testing: we should make this Kafka config optional. If the configuration does not have a `kafka` section, it fails:

```
    self.kafka_endpoint = envutils.get_environment_variable_with_default(
  File "c:\users\xiaoyzhu\documents\github\feathr\feathr_project\feathr\_envvariableutil.py", line 43, in get_environment_variable_with_default
    yaml_layer = yaml_layer[arg]
KeyError: 'kafka'
```
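A defensive lookup along the lines suggested here could look roughly like this. It is a minimal sketch, not the actual code in `_envvariableutil.py`; the function name and config shape are hypothetical:

```python
# Hypothetical sketch of an optional nested-config lookup: instead of
# indexing layer[key] and raising KeyError when a section (e.g. 'kafka')
# is missing, walk the keys and fall back to a default.
def get_config_with_default(config, keys, default=None):
    layer = config
    for key in keys:
        if not isinstance(layer, dict) or key not in layer:
            return default  # section missing: degrade gracefully
        layer = layer[key]
    return layer

# A config without a 'kafka' section no longer crashes the client.
cfg = {"offline_store": {"wasb_enabled": True}}
endpoint = get_config_with_default(cfg, ["kafka", "sasl", "endpoint"], default=None)
```

With this shape, users who never configure streaming simply get the default back instead of a `KeyError`.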

hangfei (Collaborator) commented Apr 19, 2022

Discussed offline. Ideally we want users to be able to define features without a schema as well, so that streaming is aligned with offline and online. But Kafka doesn't seem to provide such a capability by itself yet, so instead we will ask users to provide a schema.

In the future, if there is an interface that allows Kafka reads without a schema, we can make the schema optional.
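The future direction described here (schema becomes optional once schema-less Kafka reads exist) could be modeled roughly as below. This is a hypothetical stand-in dataclass, not Feathr's actual `KafkaConfig`:

```python
from dataclasses import dataclass
from typing import List, Optional

# Hypothetical sketch: a Kafka source config where the schema is
# optional. When schema is None, a (future) schema-less read path
# would be selected; today, callers must supply one.
@dataclass
class KafkaConfigSketch:
    brokers: List[str]
    topics: List[str]
    schema: Optional[str] = None  # optional once schema-less reads exist

cfg = KafkaConfigSketch(brokers=["broker:9093"], topics=["events"])
needs_schema_less_path = cfg.schema is None
```

Making the field default to `None` would keep today's call sites (which always pass a schema) working unchanged.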

hangfei (Collaborator) commented Apr 19, 2022

Could you add a dev guide as well for future devs?

hangfei (Collaborator) commented Apr 19, 2022

How do we support other streaming sources, like Flink?

Maybe we should do:

```python
stream_source = StreamingSource(name="kafkaStreamingSource",
                                kafkaConfig=KafkaConfig(brokers=["feathrazureci.servicebus.windows.net:9093"],
                                                        topics=["feathrcieventhub"],
                                                        schema=schema))
```

hangfei previously approved these changes on Apr 19, 2022
hangfei (Collaborator) commented Apr 19, 2022

> How do we support other streaming sources, like Flink?
>
> Maybe we should do:
>
> ```python
> stream_source = StreamingSource(name="kafkaStreamingSource",
>                                 kafkaConfig=KafkaConfig(brokers=["feathrazureci.servicebus.windows.net:9093"],
>                                                         topics=["feathrcieventhub"],
>                                                         schema=schema))
> ```

Discussed offline. The current syntax just works, so there is no need to introduce another layer.

jaymo001 force-pushed the streaming branch 3 times, most recently from f413137 to acdb9cd, on April 20, 2022 16:55
jaymo001 force-pushed the streaming branch 6 times, most recently from 5edb8a7 to 350c244, on April 21, 2022 02:26
jaymo001 merged commit 4fdfcf4 into main on Apr 21, 2022
xiaoyongzhu deleted the streaming branch on August 22, 2022 17:15