diff --git a/utils/python/hsfs_utils.py b/utils/python/hsfs_utils.py index 5149da6f6c..1f2d079e43 100644 --- a/utils/python/hsfs_utils.py +++ b/utils/python/hsfs_utils.py @@ -290,7 +290,7 @@ def offline_fg_materialization( print(f"startingOffsets: {offset_string}") # read kafka topic - df = ( + kafka_df = ( spark.read.format("kafka") .options(**read_options) .option("subscribe", entity._online_topic_name) @@ -302,7 +302,7 @@ def offline_fg_materialization( ) # filter only the necassary entries - df = df.filter( + df = kafka_df.filter( expr( "CAST(filter(headers, header -> header.key = 'featureGroupId')[0].value AS STRING)" ) @@ -323,7 +323,9 @@ def offline_fg_materialization( entity.insert(deserialized_df, storage="offline") # update offsets - df_offsets = df.groupBy("partition").agg(max("offset").alias("offset")).collect() + df_offsets = ( + kafka_df.groupBy("partition").agg(max("offset").alias("offset")).collect() + ) if offset_string == "earliest": offset_dict = {entity._online_topic_name: {}} else: