-
Notifications
You must be signed in to change notification settings - Fork 27
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #50 from socrata/cduranti/major-version
Cduranti/major version
- Loading branch information
Showing
25 changed files
with
699 additions
and
660 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
from threading import Thread, Lock, Semaphore, Event | ||
from queue import Queue | ||
|
||
|
||
ONE_YEAR = 365 * 24 * 60 * 60 | ||
|
||
THREAD_DONE = object() | ||
|
||
|
||
class LazyThreadPoolExecutor(object): | ||
def __init__(self, num_workers=1): | ||
self.num_workers = num_workers | ||
self.result_queue = Queue() | ||
self.thread_sem = Semaphore(num_workers) | ||
self._shutdown = Event() | ||
self.threads = [] | ||
|
||
def map(self, predicate, iterable): | ||
self._shutdown.clear() | ||
self.iterable = ThreadSafeIterator(iterable) | ||
self._start_threads(predicate) | ||
return self._result_iterator() | ||
|
||
def shutdown(self, wait=True): | ||
self._shutdown.set() | ||
if wait: | ||
for t in self.threads: | ||
t.join() | ||
|
||
def _start_threads(self, predicate): | ||
for i in range(self.num_workers): | ||
t = Thread( | ||
name="LazyChild #{0}".format(i), | ||
target=self._make_worker(predicate) | ||
) | ||
t.daemon = True | ||
self.threads.append(t) | ||
t.start() | ||
|
||
def _make_worker(self, predicate): | ||
def _w(): | ||
with self.thread_sem: | ||
for thing in self.iterable: | ||
self.result_queue.put(predicate(thing)) | ||
if self._shutdown.is_set(): | ||
break | ||
self.result_queue.put(THREAD_DONE) | ||
return _w | ||
|
||
def _result_iterator(self): | ||
while 1: | ||
# Queue.get is not interruptable w/ ^C unless you specify a | ||
# timeout. | ||
# Hopefully one year is long enough... | ||
# See http://bugs.python.org/issue1360 | ||
result = self.result_queue.get(True, ONE_YEAR) | ||
if result is not THREAD_DONE: | ||
yield result | ||
else: | ||
# if all threads have exited | ||
# sorry, this is kind of a gross way to use semaphores | ||
# break | ||
if self.thread_sem._value == self.num_workers: | ||
break | ||
else: | ||
continue | ||
|
||
|
||
|
||
class ThreadSafeIterator(object): | ||
def __init__(self, it): | ||
self._it = iter(it) | ||
self.lock = Lock() | ||
|
||
def __iter__(self): | ||
return self | ||
|
||
def __next__(self): | ||
with self.lock: | ||
return self._it.__next__() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,31 +1,15 @@ | ||
from socrata.operations.utils import get_filename, SocrataException | ||
from socrata.operations.utils import get_filename | ||
from socrata.operations.operation import Operation | ||
|
||
class ConfiguredJob(Operation): | ||
def run(self, data, put_bytes, filename = None): | ||
filename = get_filename(data, filename) | ||
|
||
(ok, rev) = self.properties['view'].revisions.create_using_config( | ||
rev = self.properties['view'].revisions.create_using_config( | ||
self.properties['config'] | ||
) | ||
if not ok: | ||
raise SocrataException("Failed to create the revision", rev) | ||
|
||
(ok, source) = rev.create_upload(filename) | ||
if not ok: | ||
raise SocrataException("Failed to create the upload", source) | ||
|
||
(ok, source) = put_bytes(source) | ||
if not ok: | ||
raise SocrataException("Failed to upload the file", source) | ||
|
||
source = rev.create_upload(filename) | ||
source = put_bytes(source) | ||
output_schema = source.get_latest_input_schema().get_latest_output_schema() | ||
|
||
(ok, output_schema) = output_schema.wait_for_finish() | ||
if not ok: | ||
raise SocrataException("The dataset failed to validate", output_schema) | ||
|
||
(ok, job) = rev.apply(output_schema = output_schema) | ||
if not ok: | ||
raise SocrataException("Failed to apply the change", job) | ||
output_schema = output_schema.wait_for_finish() | ||
job = rev.apply(output_schema = output_schema) | ||
return (rev, job) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,26 +1,12 @@ | ||
from socrata.operations.utils import get_filename, SocrataException | ||
from socrata.operations.utils import get_filename | ||
from socrata.operations.operation import Operation | ||
|
||
class Create(Operation): | ||
def run(self, data, put_bytes, filename = None): | ||
filename = get_filename(data, filename) | ||
|
||
(ok, rev) = self.publish.new(self.properties['metadata']) | ||
if not ok: | ||
raise SocrataException("Failed to create the view and revision", view) | ||
|
||
(ok, source) = rev.create_upload(filename) | ||
if not ok: | ||
raise SocrataException("Failed to create the upload", source) | ||
|
||
(ok, source) = put_bytes(source) | ||
if not ok: | ||
raise SocrataException("Failed to upload the file", source) | ||
|
||
rev = self.publish.new(self.properties['metadata']) | ||
source = rev.create_upload(filename) | ||
source = put_bytes(source) | ||
output_schema = source.get_latest_input_schema().get_latest_output_schema() | ||
|
||
(ok, output_schema) = output_schema.wait_for_finish() | ||
if not ok: | ||
raise SocrataException("The dataset failed to validate") | ||
|
||
output_schema = output_schema.wait_for_finish() | ||
return (rev, output_schema) |
Oops, something went wrong.