-
Notifications
You must be signed in to change notification settings - Fork 76
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #23 from syrusakbary/async-dataloader
Re-Implementation + Provisional dataloader
- Loading branch information
Showing
25 changed files
with
2,351 additions
and
610 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,2 @@ | ||
[run] | ||
omit = promise/compat.py,promise/iterate_promise.py | ||
omit = promise/compat.py,promise/iterate_promise.py,promise/utils.py |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,20 @@ | ||
from .promise import Promise, promise_for_dict, promisify, is_thenable | ||
from .pyutils.version import get_version | ||
|
||
__all__ = ['Promise', 'promise_for_dict', 'promisify', 'is_thenable'] | ||
|
||
try: | ||
# This variable is injected in the __builtins__ by the build | ||
# process. It used to enable importing subpackages when | ||
# the required packages are not installed | ||
__SETUP__ # type: ignore | ||
except NameError: | ||
__SETUP__ = False | ||
|
||
|
||
VERSION = (2, 0, 0, 'alpha', 0) | ||
|
||
__version__ = get_version(VERSION) | ||
|
||
if not __SETUP__: | ||
from .promise import Promise, promise_for_dict, promisify, is_thenable, async_instance | ||
|
||
__all__ = ['Promise', 'promise_for_dict', 'promisify', 'is_thenable', 'async_instance'] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
# from threading import Timer, Thread | ||
|
||
from .compat import Queue | ||
# from .context import Context | ||
|
||
# Based on https://github.com/petkaantonov/bluebird/blob/master/src/async.js | ||
|
||
|
||
class Scheduler(object): | ||
|
||
def call(self, fn): | ||
# thread = Thread(target=fn) | ||
# thread = Timer(0.01, fn) | ||
# fn = thread.start | ||
try: | ||
# c = Context.peek_context() | ||
# if not c: | ||
fn() | ||
# else: | ||
# c.on_exit(fn) | ||
except: | ||
pass | ||
# thread = Thread(target=fn) | ||
# thread = Timer(0.001, fn) | ||
# thread.start() | ||
|
||
|
||
def get_default_scheduler(): | ||
return Scheduler() | ||
|
||
|
||
# https://docs.python.org/2/library/queue.html#Queue.Queue | ||
LATE_QUEUE_CAPACITY = 0 # The queue size is infinite | ||
NORMAL_QUEUE_CAPACITY = 0 # The queue size is infinite | ||
|
||
|
||
class Async(object): | ||
|
||
def __init__(self, schedule=None): | ||
self.is_tick_used = False | ||
self.late_queue = Queue(LATE_QUEUE_CAPACITY) | ||
self.normal_queue = Queue(NORMAL_QUEUE_CAPACITY) | ||
self.have_drained_queues = False | ||
self.trampoline_enabled = True | ||
self.schedule = schedule or get_default_scheduler() | ||
|
||
def enable_trampoline(self): | ||
self.trampoline_enabled = True | ||
|
||
def disable_trampoline(self): | ||
self.trampoline_enabled = False | ||
|
||
def have_items_queued(self): | ||
return self.is_tick_used or self.have_drained_queues | ||
|
||
def _async_invoke_later(self, fn, context): | ||
self.late_queue.put(fn) | ||
self.queue_tick(context) | ||
|
||
def _async_invoke(self, fn, context): | ||
self.normal_queue.put(fn) | ||
self.queue_tick(context) | ||
|
||
def _async_settle_promise(self, promise): | ||
self.normal_queue.put(promise) | ||
self.queue_tick(context=promise._trace) | ||
|
||
def invoke_later(self, fn, context): | ||
if self.trampoline_enabled: | ||
self._async_invoke_later(fn, context) | ||
else: | ||
self.schedule.call_later(0.1, fn) | ||
|
||
def invoke(self, fn, context): | ||
if self.trampoline_enabled: | ||
self._async_invoke(fn, context) | ||
else: | ||
self.schedule.call( | ||
fn | ||
) | ||
|
||
def settle_promises(self, promise): | ||
if self.trampoline_enabled: | ||
self._async_settle_promise(promise) | ||
else: | ||
self.schedule.call( | ||
promise._settle_promises | ||
) | ||
|
||
def throw_later(self, reason): | ||
def fn(): | ||
raise reason | ||
|
||
self.schedule.call(fn) | ||
|
||
fatal_error = throw_later | ||
|
||
def drain_queue(self, queue): | ||
from .promise import Promise | ||
while not queue.empty(): | ||
fn = queue.get() | ||
if (isinstance(fn, Promise)): | ||
fn._settle_promises() | ||
continue | ||
fn() | ||
|
||
def drain_queues(self): | ||
assert self.is_tick_used | ||
self.drain_queue(self.normal_queue) | ||
self.reset() | ||
self.have_drained_queues = True | ||
self.drain_queue(self.late_queue) | ||
|
||
def queue_context_tick(self): | ||
if not self.is_tick_used: | ||
self.is_tick_used = True | ||
self.schedule.call(self.drain_queues) | ||
|
||
def queue_tick(self, context): | ||
if not context: | ||
self.queue_context_tick() | ||
else: | ||
context.on_exit(self.queue_context_tick) | ||
|
||
def reset(self): | ||
self.is_tick_used = False |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.