diff --git a/airbyte/progress.py b/airbyte/progress.py index af4204a3..8cbd4a45 100644 --- a/airbyte/progress.py +++ b/airbyte/progress.py @@ -36,7 +36,6 @@ from airbyte_protocol.models import ( AirbyteMessage, AirbyteStreamStatus, - Type, ) from airbyte import logs @@ -275,16 +274,17 @@ def tally_records_read( self.stream_read_counts[message.record.stream] += 1 if message.record.stream not in self.stream_read_start_times: - self._log_stream_read_start(stream_name=message.record.stream) + self.log_stream_start(stream_name=message.record.stream) - elif ( - message.trace - and message.trace.stream_status - and message.trace.stream_status.status is AirbyteStreamStatus.COMPLETE - ): - self._log_stream_read_end( - stream_name=message.trace.stream_status.stream_descriptor.name - ) + elif message.trace and message.trace.stream_status: + if message.trace.stream_status.status is AirbyteStreamStatus.STARTED: + self.log_stream_start( + stream_name=message.trace.stream_status.stream_descriptor.name + ) + if message.trace.stream_status.status is AirbyteStreamStatus.COMPLETE: + self._log_stream_read_end( + stream_name=message.trace.stream_status.stream_descriptor.name + ) # Bail if we're not due for a progress update. if count % update_period != 0: @@ -345,12 +345,12 @@ def tally_confirmed_writes( """ self._start_rich_view() # Start Rich's live view if not already running. for message in messages: - if message.type is Type.STATE: + if message.state: # This is a state message from the destination. Tally the records written. if message.state.stream and message.state.destinationStats: stream_name = message.state.stream.stream_descriptor.name - self.destination_stream_records_confirmed[stream_name] += ( - message.state.destinationStats.recordCount + self.destination_stream_records_confirmed[stream_name] += int( + message.state.destinationStats.recordCount or 0 ) self._update_display() @@ -418,11 +418,14 @@ def _log_sync_start(self) -> None: event_type=EventType.SYNC, ) - def _log_stream_read_start(self, stream_name: str) -> None: - self._print_info_message( - f"Read started on stream `{stream_name}` at `{pendulum.now().format('HH:mm:ss')}`..." - ) - self.stream_read_start_times[stream_name] = time.time() + def log_stream_start(self, stream_name: str) -> None: + """Log that a stream has started reading.""" + if stream_name not in self.stream_read_start_times: + self._print_info_message( + f"Read started on stream `{stream_name}` at " + f"`{pendulum.now().format('HH:mm:ss')}`..." + ) + self.stream_read_start_times[stream_name] = time.time() def _log_stream_read_end(self, stream_name: str) -> None: self._print_info_message(