Skip to content

Check that all tasks have been processed instead of for empty queue #105

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
The intended audience of this file is for py42 consumers -- as such, changes that don't affect
how a consumer would use the library (e.g. adding unit tests, updating documentation, etc) are not captured here.

## 0.7.3 - 2020-06-23

### Fixed

- Fixed bug that caused the last few entries in csv files to sometimes not be processed when performing bulk processing actions.

## 0.7.2 - 2020-06-11

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion src/code42cli/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.7.2"
__version__ = "0.7.3"
4 changes: 3 additions & 1 deletion src/code42cli/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def __init__(self, thread_count, expected_total):
self._queue = queue.Queue()
self._thread_count = thread_count
self._stats = WorkerStats(expected_total)
self._tasks = 0
self.__started = False
self.__start_lock = Lock()

Expand All @@ -72,6 +73,7 @@ def do_async(self, func, *args, **kwargs):
self.__start()
self.__started = True
self._queue.put({u"func": func, u"args": args, u"kwargs": kwargs})
self._tasks += 1

@property
def stats(self):
Expand All @@ -82,7 +84,7 @@ def stats(self):
def wait(self):
"""Wait for the tasks in the queue to complete. This should usually be called before
program termination."""
while not self._queue.empty():
while not self._stats.total_processed >= self._tasks:
sleep(0.5)

def _process_queue(self):
Expand Down