Skip to content

Commit

Permalink
[HOPSWORKS 3241] add progress bar during stream data ingestion in pyt…
Browse files Browse the repository at this point in the history
  • Loading branch information
dhananjay-mk authored and kennethmhc committed Nov 16, 2022
1 parent b7114b0 commit 546a517
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 5 deletions.
21 changes: 16 additions & 5 deletions python/hsfs/engine/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from urllib.parse import urlparse
from typing import TypeVar, Optional, Dict, Any
from confluent_kafka import Producer
from tqdm.auto import tqdm

from hsfs import client, feature, util
from hsfs.core import (
Expand Down Expand Up @@ -751,9 +752,20 @@ def _write_dataframe_kafka(
writer = self._get_encoder_func(feature_group._get_encoded_avro_schema())

def acked(err, msg):
if err is not None:
if err is not None and offline_write_options.get("debug_kafka", False):
print("Failed to deliver message: %s: %s" % (str(msg), str(err)))

else:
# update progress bar for each msg
progress_bar.update()

# initialize progress bar
progress_bar = tqdm(
total=dataframe.shape[0],
bar_format="{desc}: {percentage:.2f}% |{bar}| Rows {n_fmt}/{total_fmt} | "
"Elapsed Time: {elapsed} | Remaining Time: {remaining}",
desc="Uploading Dataframe",
mininterval=1,
)
# loop over rows
for r in dataframe.itertuples(index=False):
# itertuples returns Python NamedTyple, to be able to serialize it using
Expand Down Expand Up @@ -790,9 +802,7 @@ def acked(err, msg):
topic=feature_group._online_topic_name,
key=key,
value=encoded_row,
callback=acked
if offline_write_options.get("debug_kafka", False)
else None,
callback=acked,
)

# Trigger internal callbacks to empty op queue
Expand All @@ -806,6 +816,7 @@ def acked(err, msg):

# make sure producer blocks and everything is delivered
producer.flush()
progress_bar.close()

# start backfilling job
job_name = "{fg_name}_{version}_offline_fg_backfill".format(
Expand Down
1 change: 1 addition & 0 deletions python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def read(fname):
"pyarrow",
"confluent-kafka==1.8.2",
"fastavro==1.4.11",
"tqdm"
],
},
author="Hopsworks AB",
Expand Down

0 comments on commit 546a517

Please sign in to comment.