Skip to content

Commit

Permalink
Trying to solve #13.
Browse files Browse the repository at this point in the history
  • Loading branch information
baffelli committed May 2, 2017
1 parent 14f223b commit ab01ec3
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 15 deletions.
3 changes: 1 addition & 2 deletions pyperator/DAG.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ def __init__(self, name, log=None, log_level=logging.DEBUG, workdir=None):
self._log_path = None or log
self._log = _log.setup_custom_logger(self.name, file=self._log_path, level=log_level)
self.log.info("Created DAG {} with workdir {}".format(self.name, self.workdir))
#Add input and output port register to DAG


@property
Expand Down Expand Up @@ -161,7 +160,7 @@ def iternodes(self) -> nodes.Component:

def iterarcs(self):
for source in self.iternodes():
for name, port in source.outputs.items():
for port in list(source.outputs.values()):
for dest in port.iterends():
yield (port, dest)

Expand Down
40 changes: 28 additions & 12 deletions pyperator/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ def normalize_path_to_workdir(path, workdir):
return os.path.normpath(workdir + os.path.basename(path))


def check_missing(path, workdir):
return not os.path.exists(normalize_path_to_workdir(path, workdir))


def list_missing(out_packets, workdir):
return {port: packet for port, packet in out_packets.items() if
check_missing(str(packet), workdir)}



class PacketRegister(_collabc.Mapping):
"""
This class is used to represent a collection of
Expand Down Expand Up @@ -72,7 +82,7 @@ def copy_temp(self):
#Create temporary file
paths = {}
for k, v in self._packets.items():
paths[k] = IP.InformationPacket(_path.Path(_temp.NamedTemporaryFile(delete=False).name))
paths[k] = IP.InformationPacket(_path.Path(_temp.NamedTemporaryFile(delete=True).name))
self._temp_packets = PacketRegister(paths)
return self._temp_packets

Expand Down Expand Up @@ -119,8 +129,13 @@ def __enter__(self):
return self.copy_temp()

def __exit__(self, exc_type, exc_val, exc_tb):
print(list(self.values()), list(self._temp_packets.values()))
self.finalize_temp()
if exc_val:
print(exc_val)
for name, packet in self._temp_packets.items():
del packet
raise(exc_val)
else:
self.finalize_temp()



Expand Down Expand Up @@ -205,7 +220,7 @@ def generate_output_paths(self, received_data):
self.log.debug(
"Output port {} will send file '{}'".format(out_port, out_paths[out]))
except NameError as e:
ex_text = 'Port {} does not have a path formatter specified'.format( out)
ex_text = 'Port {} does not have a path formatter specified'.format(out)
self.log.error(ex_text)
raise FormatterError(ex_text)
except Exception as e:
Expand All @@ -224,8 +239,6 @@ def generate_packets(self, out_paths):
# if _os.path.getmtime(out_packet.path) < _os.path.getmtime(inpacket.path):


def enumerate_missing(self, out_packets):
return {port: packet for port, packet in out_packets.items() if not packet.exists()}

def produce_outputs(self, input_packets, output_packets, wildcards):
pass
Expand All @@ -239,9 +252,9 @@ async def __call__(self):
out_paths, wildcards = self.generate_output_paths(received_packets)
out_packets = self.generate_packets(out_paths)
# Check for missing packet
missing = self.enumerate_missing(out_packets)
missing = list_missing(out_paths, self.dag.workdir)
if missing:
self.log.info("Output files '{}' do not exist not exist, command will be run".format(
self.log.warn("Output files '{}' do not exist not exist, command will be run".format(
[
packet
for
Expand All @@ -251,10 +264,12 @@ async def __call__(self):
inputs_obj = PacketRegister(received_packets)
# Produce the outputs using the tempfile
#context manager
with out_packets as temp_out:
new_out = await self.produce_outputs(inputs_obj, temp_out, wildcards)
# with out_packets as temp_out:
new_out = await self.produce_outputs(inputs_obj, out_packets, wildcards)

# Check if the output files exist
missing_after = self.enumerate_missing(out_packets)
missing_after = list_missing(out_packets, self.dag.workdir)
print('m',missing_after)
if missing_after:
missing_err = "Following files are missing {}, check the command".format(
[packet for packet in missing_after.values()])
Expand All @@ -263,7 +278,7 @@ async def __call__(self):
else:
self.log.debug("All output files exist, command will not be run")
new_out = out_packets
await asyncio.wait(self.send_packets(new_out.as_dict()))
await asyncio.wait(self.send_packets(out_packets.as_dict()))
await asyncio.sleep(0)


Expand Down Expand Up @@ -313,6 +328,7 @@ def __init__(self, name, script):
# self.dag.commit_external(self.script, "Component {} uses script {}".format(self.name, self.script) )

async def produce_outputs(self, input_packets, output_packets, wildcards):
print(input_packets, output_packets)
with open(self.script) as input_script:
formatted_cmd = input_script.read().format(inputs=input_packets,
outputs=output_packets, wildcards=wildcards)
Expand Down
2 changes: 1 addition & 1 deletion pyperator/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ def path(self, path):


def __repr__(self):
port_template = "{component.name} at {id}:{name} -> {other}"
port_template = "{id}:{name} at {component.name} -> {other}"
formatted = port_template.format(id=id(self.component),**self.__dict__)
return formatted

Expand Down

0 comments on commit ab01ec3

Please sign in to comment.