Minor fixup (#60)
* add log entry for timing

* use default mount

* bump version
esseivaju authored Oct 2, 2024
1 parent 1ffaa77 commit 44bb4eb
Showing 3 changed files with 6 additions and 6 deletions.
src/raythena/__init__.py (2 changes: 1 addition, 1 deletion)
@@ -1 +1 @@
-__version__ = "0.9.0"
+__version__ = "1.0.0"
src/raythena/actors/payloads/eventservice/pilothttp.py (1 change: 1 addition, 0 deletions)
@@ -398,6 +398,7 @@ async def handle_get_job(self, request: web.BaseRequest) -> web.Response:
         """
         del request
         job = self.current_job if self.current_job else dict()
+        self._logger.debug(f"Job assigned to worker {self.worker_id}")
         return web.json_response(job, dumps=self.json_encoder)
 
     async def handle_update_job(self, request: web.BaseRequest) -> web.Response:
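The commit message item "add log entry for timing" presumably refers to the new debug line above: the timestamp attached to each log record is what makes the entry usable for timing job assignment. The message is emitted at DEBUG level, so it only appears when the logger is configured for debug output. Below is a minimal sketch of that gating with Python's standard logging module; raythena's actual logger setup is not shown in this diff, so the configuration here is an assumption.

import logging

# Assumed configuration for illustration; raythena's real logging setup is not part of this commit.
logging.basicConfig(
    format="%(asctime)s | %(name)s | %(levelname)s | %(message)s",
    level=logging.DEBUG,  # at INFO, the new "Job assigned to worker ..." entry would be filtered out
)

logger = logging.getLogger("pilothttp")
worker_id = "worker-0"  # hypothetical value, standing in for self.worker_id
logger.debug(f"Job assigned to worker {worker_id}")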
src/raythena/drivers/esdriver.py (9 changes: 4 additions, 5 deletions)
@@ -240,7 +240,6 @@ def retrieve_actors_messages(self, ready: Sequence[ObjectRef]) -> Iterator[Worke
             else:
                 yield actor_id, message, data
         else:
-            self._logger.debug(f"Start handling messages batch of {len(messages)} actors")
             for actor_id, message, data in messages:
                 yield actor_id, message, data

@@ -320,7 +319,7 @@ def handle_actor_done(self, actor_id: str) -> bool:
         else:
             self.terminated.append(actor_id)
             self.bookKeeper.process_actor_end(actor_id)
-            self._logger.info(f"{actor_id} stopped")
+            self._logger.debug(f"{actor_id} stopped")
             # do not get new messages from this actor
         return has_jobs

@@ -777,15 +776,15 @@ def handle_merge_transforms(self, wait_for_completion=False) -> bool:
                     to_remove.append(output_filename)
                     self.total_running_merge_transforms -= 1
                     if sub_process.returncode == 0:
-                        self._logger.info(f"Merge transform for file {output_filename} finished.")
+                        self._logger.debug(f"Merge transform for file {output_filename} finished.")
                         event_ranges_map = {}
                         guid = self.get_output_file_guid(job_report_file)
                         for (event_range_output, event_range) in event_ranges:
                             event_ranges_map[event_range.eventRangeID] = TaskStatus.build_eventrange_dict(event_range, event_range_output)
                         self.bookKeeper.report_merged_file(self.panda_taskid, output_filename, event_ranges_map, guid)
                     else:
                         self.bookKeeper.report_failed_merge_transform(self.panda_taskid, output_filename)
-                        self._logger.debug(f"Merge transform failed with return code {sub_process.returncode}")
+                        self._logger.debug(f"Merge transform for {output_filename} failed with return code {sub_process.returncode}")
             for o in to_remove:
                 del self.running_merge_transforms[o]
         return new_transforms
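The two log changes above sit inside handle_merge_transforms, which polls running merge subprocesses and reports success or failure from their return codes. A simplified, hypothetical sketch of that polling pattern follows; the function name, the running dict, and the print calls are illustrative, and only the names that appear in the diff (returncode, the bookKeeper.report_* calls noted in comments) come from the actual code.

from subprocess import Popen

def poll_merge_transforms(running: dict[str, Popen]) -> list[str]:
    """Illustrative sketch: check running merge subprocesses and collect the finished ones."""
    finished = []
    for output_filename, proc in running.items():
        if proc.poll() is None:
            continue  # still running
        finished.append(output_filename)
        if proc.returncode == 0:
            # the real driver builds an event_ranges_map and calls bookKeeper.report_merged_file(...)
            print(f"Merge transform for file {output_filename} finished.")
        else:
            # the real driver calls bookKeeper.report_failed_merge_transform(...)
            print(f"Merge transform for {output_filename} failed with return code {proc.returncode}")
    for output_filename in finished:
        del running[output_filename]
    return finished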
@@ -840,7 +839,7 @@ def hits_merge_transform(self, input_files: Iterable[str], output_file: str) ->
             endtoken = "" if self.config.payload['containerextraargs'].strip().endswith(";") else ";"
             cmd += (f"{self.config.payload['containerextraargs']}{endtoken}"
                     f"source ${{ATLAS_LOCAL_ROOT_BASE}}/user/atlasLocalSetup.sh --swtype {self.config.payload['containerengine']}"
-                    f" -c $thePlatform -d -s /srv/release_setup.sh -r /srv/merge_transform.sh -e \"{self.container_options}\";"
+                    f" -c $thePlatform -s /srv/release_setup.sh -r /srv/merge_transform.sh -e \"{self.container_options}\";"
                     f"RETURN_VAL=$?;if [ \"$RETURN_VAL\" -eq 0 ]; then cp jobReport.json {job_report_name};fi;exit $RETURN_VAL;")
         return (Popen(cmd,
                       stdin=DEVNULL,
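The "use default mount" item in the commit message appears to correspond to dropping the -d flag from the atlasLocalSetup.sh invocation above, letting the container fall back to its default mount behaviour. A rough sketch of threading such an optional flag through the command construction; build_setup_command and the apptainer value are illustrative assumptions, not raythena's actual code.

def build_setup_command(platform: str, use_default_mount: bool = True) -> str:
    """Illustrative only: assemble the container setup fragment with an optional -d flag."""
    mount_flag = "" if use_default_mount else " -d"
    return (
        "source ${ATLAS_LOCAL_ROOT_BASE}/user/atlasLocalSetup.sh --swtype apptainer"
        f" -c {platform}{mount_flag} -s /srv/release_setup.sh -r /srv/merge_transform.sh;"
    )

# The real driver embeds a fragment like this in a longer shell command and launches it with
# Popen(cmd, stdin=DEVNULL, ...); the remaining Popen arguments are truncated in this diff.
print(build_setup_command("$thePlatform"))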
