Skip to content

Commit

Permalink
Using dict union operator
Browse files Browse the repository at this point in the history
  • Loading branch information
GlassOfWhiskey committed Oct 19, 2024
1 parent 68dd844 commit e142780
Show file tree
Hide file tree
Showing 25 changed files with 403 additions and 511 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ jobs:
python -m pip install -r docs/requirements.txt
- name: "Build documentation and check for consistency"
env:
CHECKSUM: "66ec8ce6a6a09bb8bc95a2bae09405c075ddbd2343a7f8c93bb961b337d4bde4"
CHECKSUM: "bedaf099ee8247691295d0be167c943499658b140cf4b188129f3f279ee5ea1b"
run: |
cd docs
HASH="$(make checksum | tail -n1)"
Expand Down
4 changes: 2 additions & 2 deletions docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ def patched_run(self, schema, pointer=''):
if prop in schema['properties']:
props[prop] = schema['properties'][prop]
del schema['properties'][prop]
schema['properties'] = {**props, **schema['properties']}
schema['properties'] = props | schema['properties']
return original_run(self, schema, pointer)


Expand All @@ -226,7 +226,7 @@ def patched_get_json_data(self):
target_file = os.path.join(os.path.dirname(filename), obj['$ref'])
target_schema, _ = self.from_file(target_file)
target_schema = self.ordered_load(target_schema)
schema['properties'] = {**target_schema.get('properties', {}), **schema['properties']}
schema['properties'] = target_schema.get('properties', {}) | schema['properties']
del schema['allOf']
schema['properties'] = dict(sorted(schema['properties'].items()))
return schema, source, pointer
Expand Down
12 changes: 6 additions & 6 deletions docs/source/ext/database.rst
Original file line number Diff line number Diff line change
Expand Up @@ -209,32 +209,32 @@ The ``Database`` interface, defined in the ``streamflow.core.persistence`` modul
...
async def update_deployment(
self, deployment_id: int, updates: MutableMapping[str, Any]
self, deployment_id: int, updates: dict[str, Any]
) -> int:
...
async def update_execution(
self, execution_id: int, updates: MutableMapping[str, Any]
self, execution_id: int, updates: dict[str, Any]
) -> int:
...
async def update_port(
self, port_id: int, updates: MutableMapping[str, Any]
self, port_id: int, updates: dict[str, Any]
) -> int:
...
async def update_step(
self, step_id: int, updates: MutableMapping[str, Any]
self, step_id: int, updates: dict[str, Any]
) -> int:
...
async def update_target(
self, target_id: str, updates: MutableMapping[str, Any]
self, target_id: str, updates: dict[str, Any]
) -> int:
...
async def update_workflow(
self, workflow_id: int, updates: MutableMapping[str, Any]
self, workflow_id: int, updates: dict[str, Any]
) -> int:
...
Expand Down
68 changes: 30 additions & 38 deletions streamflow/core/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async def _load(

async def _save_additional_params(
self, context: StreamFlowContext
) -> MutableMapping[str, Any]:
) -> dict[str, Any]:
return {}


Expand Down Expand Up @@ -109,7 +109,9 @@ async def _load(
),
)

async def _save_additional_params(self, context: StreamFlowContext):
async def _save_additional_params(
self, context: StreamFlowContext
) -> dict[str, Any]:
if self.target:
await self.target.save(context)
return {
Expand Down Expand Up @@ -169,7 +171,7 @@ async def _load(

async def _save_additional_params(
self, context: StreamFlowContext
) -> MutableMapping[str, Any]:
) -> dict[str, Any]:
return {"name": self.name}

@abstractmethod
Expand Down Expand Up @@ -232,9 +234,8 @@ async def _load(
async def _save_additional_params(
self, context: StreamFlowContext
) -> MutableMapping[str, Any]:
return {
**await super()._save_additional_params(context),
**{"processor": await self.processor.save(context)},
return await super()._save_additional_params(context) | {
"processor": await self.processor.save(context)
}

def _update_options(self, options: CommandOptions, token: Token) -> CommandOptions:
Expand Down Expand Up @@ -319,22 +320,19 @@ async def _load(
async def _save_additional_params(
self, context: StreamFlowContext
) -> MutableMapping[str, Any]:
return {
**await super()._save_additional_params(context),
**{
"processors": {
name: token
for name, token in zip(
self.processors.keys(),
await asyncio.gather(
*(
asyncio.create_task(t.save(context))
for t in self.processors.values()
)
),
)
}
},
return await super()._save_additional_params(context) | {
"processors": {
name: token
for name, token in zip(
self.processors.keys(),
await asyncio.gather(
*(
asyncio.create_task(t.save(context))
for t in self.processors.values()
)
),
)
}
}

def _update_options(self, options: CommandOptions, token: Token) -> CommandOptions:
Expand Down Expand Up @@ -409,13 +407,10 @@ async def _load(
async def _save_additional_params(
self, context: StreamFlowContext
) -> MutableMapping[str, Any]:
return {
**await super()._save_additional_params(context),
**{
"processors": await asyncio.gather(
*(asyncio.create_task(t.save(context)) for t in self.processors)
)
},
return await super()._save_additional_params(context) | {
"processors": await asyncio.gather(
*(asyncio.create_task(t.save(context)) for t in self.processors)
)
}

def _update_options(self, options: CommandOptions, token: Token) -> CommandOptions:
Expand Down Expand Up @@ -483,14 +478,11 @@ async def _load(
async def _save_additional_params(
self, context: StreamFlowContext
) -> MutableMapping[str, Any]:
return {
**await super()._save_additional_params(context),
**{
"processors": await asyncio.gather(
*(
asyncio.create_task(processor.save(context))
for processor in self.processors
)
return await super()._save_additional_params(context) | {
"processors": await asyncio.gather(
*(
asyncio.create_task(processor.save(context))
for processor in self.processors
)
},
)
}
22 changes: 7 additions & 15 deletions streamflow/core/persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,35 +274,27 @@ async def get_workflows_list(

@abstractmethod
async def update_deployment(
self, deployment_id: int, updates: MutableMapping[str, Any]
self, deployment_id: int, updates: dict[str, Any]
) -> int: ...

@abstractmethod
async def update_execution(
self, execution_id: int, updates: MutableMapping[str, Any]
self, execution_id: int, updates: dict[str, Any]
) -> int: ...

@abstractmethod
async def update_filter(
self, filter_id: int, updates: MutableMapping[str, Any]
) -> int: ...
async def update_filter(self, filter_id: int, updates: dict[str, Any]) -> int: ...

@abstractmethod
async def update_port(
self, port_id: int, updates: MutableMapping[str, Any]
) -> int: ...
async def update_port(self, port_id: int, updates: dict[str, Any]) -> int: ...

@abstractmethod
async def update_step(
self, step_id: int, updates: MutableMapping[str, Any]
) -> int: ...
async def update_step(self, step_id: int, updates: dict[str, Any]) -> int: ...

@abstractmethod
async def update_target(
self, target_id: str, updates: MutableMapping[str, Any]
) -> int: ...
async def update_target(self, target_id: str, updates: dict[str, Any]) -> int: ...

@abstractmethod
async def update_workflow(
self, workflow_id: int, updates: MutableMapping[str, Any]
self, workflow_id: int, updates: dict[str, Any]
) -> int: ...
2 changes: 1 addition & 1 deletion streamflow/core/scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ async def _load(
@abstractmethod
async def _save_additional_params(
self, context: StreamFlowContext
) -> MutableMapping[str, Any]: ...
) -> dict[str, Any]: ...

@abstractmethod
def eval(self, job: Job) -> Hardware: ...
Expand Down
14 changes: 9 additions & 5 deletions streamflow/core/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ async def _load(
tmp_directory=row["tmp_directory"],
)

async def _save_additional_params(self, context: StreamFlowContext):
async def _save_additional_params(
self, context: StreamFlowContext
) -> dict[str, Any]:
await asyncio.gather(
*(asyncio.create_task(t.save(context)) for t in self.inputs.values())
)
Expand Down Expand Up @@ -140,7 +142,7 @@ async def _load(

async def _save_additional_params(
self, context: StreamFlowContext
) -> MutableMapping[str, Any]:
) -> dict[str, Any]:
return {}

def close(self, consumer: str):
Expand Down Expand Up @@ -244,7 +246,7 @@ async def _load(

async def _save_additional_params(
self, context: StreamFlowContext
) -> MutableMapping[str, Any]:
) -> dict[str, Any]:
return {}

async def _set_status(self, status: Status):
Expand Down Expand Up @@ -461,7 +463,9 @@ async def _load(
workflow=await loading_context.load_workflow(context, row["workflow"]),
)

async def _save_additional_params(self, context: StreamFlowContext):
async def _save_additional_params(
self, context: StreamFlowContext
) -> dict[str, Any]:
return {"name": self.name, "workflow": self.workflow.persistent_id}

@classmethod
Expand Down Expand Up @@ -519,7 +523,7 @@ async def _load(

async def _save_additional_params(
self, context: StreamFlowContext
) -> MutableMapping[str, Any]:
) -> dict[str, Any]:
return {"config": self.config, "output_ports": self.output_ports}

def create_port(self, cls: type[P] = Port, name: str = None, **kwargs) -> P:
Expand Down
15 changes: 7 additions & 8 deletions streamflow/cwl/combinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,13 @@ async def _load(
flatten=row["flatten"],
)

async def _save_additional_params(self, context: StreamFlowContext):
return {
**await super()._save_additional_params(context),
**{
"input_names": self.input_names,
"output_name": self.output_name,
"flatten": self.flatten,
},
async def _save_additional_params(
self, context: StreamFlowContext
) -> dict[str, Any]:
return await super()._save_additional_params(context) | {
"input_names": self.input_names,
"output_name": self.output_name,
"flatten": self.flatten,
}

async def combine(
Expand Down
Loading

0 comments on commit e142780

Please sign in to comment.