diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 9ecf6f5a05..66bce0ce6d 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -34,6 +34,7 @@ from urllib.parse import urlparse from typing import TypeVar, Optional, Dict, Any from confluent_kafka import Producer +from tqdm.auto import tqdm from hsfs import client, feature, util from hsfs.core import ( @@ -751,9 +752,20 @@ def _write_dataframe_kafka( writer = self._get_encoder_func(feature_group._get_encoded_avro_schema()) def acked(err, msg): - if err is not None: + if err is not None and offline_write_options.get("debug_kafka", False): print("Failed to deliver message: %s: %s" % (str(msg), str(err))) - + else: + # update progress bar for each msg + progress_bar.update() + + # initialize progress bar + progress_bar = tqdm( + total=dataframe.shape[0], + bar_format="{desc}: {percentage:.2f}% |{bar}| Rows {n_fmt}/{total_fmt} | " + "Elapsed Time: {elapsed} | Remaining Time: {remaining}", + desc="Uploading Dataframe", + mininterval=1, + ) # loop over rows for r in dataframe.itertuples(index=False): # itertuples returns Python NamedTyple, to be able to serialize it using @@ -790,9 +802,7 @@ def acked(err, msg): topic=feature_group._online_topic_name, key=key, value=encoded_row, - callback=acked - if offline_write_options.get("debug_kafka", False) - else None, + callback=acked, ) # Trigger internal callbacks to empty op queue @@ -806,6 +816,7 @@ def acked(err, msg): # make sure producer blocks and everything is delivered producer.flush() + progress_bar.close() # start backfilling job job_name = "{fg_name}_{version}_offline_fg_backfill".format( diff --git a/python/setup.py b/python/setup.py index 1d40e644a3..253f01def5 100644 --- a/python/setup.py +++ b/python/setup.py @@ -55,6 +55,7 @@ def read(fname): "pyarrow", "confluent-kafka==1.8.2", "fastavro==1.4.11", + "tqdm" ], }, author="Hopsworks AB",