better task contention
it can spawn more threads than cpu cores in some cases, but that's OK:
the idea is to spawn tasks as subprocesses within threads,
so we count the threads of the subprocesses (and not our own threads)

it can spawn more threads than cpu cores because a heavy task
can be spawned right after another heavy one that has not yet had
time to spawn its own threads; this usually only happens
on the very first tasks, when nothing is running yet

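as a rough sketch of that idea (illustrative only, not the actual
Urcheon code: the helper names count_child_threads and
spawn_when_room are made up here), the spawn loop throttles on the
thread count of the child subprocesses, not on the Python threads
of the main process:

import os
import time
import threading

import psutil

def count_child_threads(process):
	# sum the thread counts of every child subprocess, recursively
	total = 0
	for child in process.children(recursive=True):
		try:
			total += child.num_threads()
		except (psutil.NoSuchProcess, psutil.ZombieProcess):
			# the child may exit between listing it and querying it
			pass
	return total

def spawn_when_room(run_task, cpu_count=os.cpu_count()):
	main_process = psutil.Process()
	# strict ">" so one unexpected extra thread cannot make us loop forever
	while count_child_threads(main_process) > cpu_count:
		time.sleep(.05)  # no need to poll at full cpu speed
	thread = threading.Thread(target=run_task)
	thread.start()
	return thread
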
Process.children() is very slow, see:
giampaolo/psutil#1183

it's highly recommended to use python3-psutil >= 5.4.2
because some great speed-ups were implemented
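
for example (again just a sketch, not part of this commit), the
recommendation can be turned into a startup warning using psutil's
version tuple:

import psutil

# children() got a big speed-up in psutil 5.4.2 (giampaolo/psutil#1183),
# so warn when an older release is installed
if psutil.version_info < (5, 4, 2):
	print("warning: python3-psutil >= 5.4.2 is recommended, "
		"process.children() is much slower on older releases")
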
illwieckz committed Apr 3, 2018
1 parent 44a78b6 commit 2181cff
Showing 2 changed files with 21 additions and 11 deletions.
23 changes: 16 additions & 7 deletions Urcheon/Pak.py
@@ -22,6 +22,7 @@
import os
import sys
import tempfile
+import time
import threading
import zipfile
from collections import OrderedDict
@@ -191,6 +192,8 @@ def build(self):
action_thread_list = []
produced_unit_list = []

+main_process = ThreadCounter.getProcess()
+
for action_type in Action.list():
for file_path in self.action_list.active_action_dict[action_type.keyword]:
# no need to use multiprocessing module to manage task contention, since each task will call its own process
@@ -209,9 +212,9 @@
# use multiple threads themselves
thread_count = cpu_count
else:
-# this compute is super slow
-child_thread_count = ThreadCounter.countChildThread(ThreadCounter.getProcess())
-thread_count = max(0, cpu_count - child_thread_count)
+# this compute is super slow because of process.children()
+child_thread_count = ThreadCounter.countChildThread(main_process)
+thread_count = max(1, cpu_count - child_thread_count)

action.thread_count = thread_count

@@ -223,13 +226,19 @@
# action that can't be run concurrently to others
produced_unit_list.extend(action.run())
else:
+# do not use >= in case of there is some extra thread we don't think about
+# it's better to spawn an extra one than looping forever
+while child_thread_count > cpu_count:
+	# no need to loop at full cpu speed
+	time.sleep(.05)
+	child_thread_count = ThreadCounter.countChildThread(main_process)
+	pass
+
+action.thread_count = max(2, cpu_count - child_thread_count)
+
# wrapper does: produced_unit_list.append(a.run())
action_thread = threading.Thread(target=self.threadExtendRes, args=(action.run, (), produced_unit_list))
action_thread_list.append(action_thread)
-
-while cpu_count < ThreadCounter.countChildThread(ThreadCounter.getProcess()):
-	pass
-
action_thread.start()

# wait for all threads to end, otherwise it will start packaging next
9 changes: 5 additions & 4 deletions Urcheon/ThreadCounter.py
@@ -19,7 +19,7 @@ def countThread(process):
# one is called, in this case return 0
try:
	return process.num_threads()
-except:
+except (psutil.NoSuchProcess, psutil.ZombieProcess):
	return 0

def countChildThread(process):
@@ -28,8 +28,9 @@ def countChildThread(process):
# one is called, in this case return 0
try:
	thread_count = 0
-	for subprocess in process.children():
-		thread_count = thread_count + countThread(subprocess) + countChildThread(subprocess)
+	# process.children() is super slow
+	for subprocess in process.children(recursive=True):
+		thread_count = thread_count + countThread(subprocess)
	return thread_count
-except:
+except (psutil.NoSuchProcess, psutil.ZombieProcess):
	return 0
