diff --git a/.github/workflows/ci-tests.yaml b/.github/workflows/ci-tests.yaml
index 5332054b4..90f9a45c1 100644
--- a/.github/workflows/ci-tests.yaml
+++ b/.github/workflows/ci-tests.yaml
@@ -147,7 +147,7 @@ jobs:
           python -m pip install -r docs/requirements.txt
       - name: "Build documentation and check for consistency"
         env:
-          CHECKSUM: "66ec8ce6a6a09bb8bc95a2bae09405c075ddbd2343a7f8c93bb961b337d4bde4"
+          CHECKSUM: "bedaf099ee8247691295d0be167c943499658b140cf4b188129f3f279ee5ea1b"
         run: |
           cd docs
           HASH="$(make checksum | tail -n1)"
diff --git a/docs/source/conf.py b/docs/source/conf.py
index 62e0294e8..7b9e51093 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -204,7 +204,7 @@ def patched_run(self, schema, pointer=''):
         if prop in schema['properties']:
             props[prop] = schema['properties'][prop]
             del schema['properties'][prop]
-    schema['properties'] = {**props, **schema['properties']}
+    schema['properties'] = props | schema['properties']
     return original_run(self, schema, pointer)

@@ -226,7 +226,7 @@ def patched_get_json_data(self):
             target_file = os.path.join(os.path.dirname(filename), obj['$ref'])
             target_schema, _ = self.from_file(target_file)
             target_schema = self.ordered_load(target_schema)
-            schema['properties'] = {**target_schema.get('properties', {}), **schema['properties']}
+            schema['properties'] = target_schema.get('properties', {}) | schema['properties']
             del schema['allOf']
             schema['properties'] = dict(sorted(schema['properties'].items()))
     return schema, source, pointer
diff --git a/docs/source/ext/database.rst b/docs/source/ext/database.rst
index e40af965f..756518005 100644
--- a/docs/source/ext/database.rst
+++ b/docs/source/ext/database.rst
@@ -209,32 +209,32 @@ The ``Database`` interface, defined in the ``streamflow.core.persistence`` module
     ...

     async def update_deployment(
-        self, deployment_id: int, updates: MutableMapping[str, Any]
+        self, deployment_id: int, updates: dict[str, Any]
     ) -> int: ...

     async def update_execution(
-        self, execution_id: int, updates: MutableMapping[str, Any]
+        self, execution_id: int, updates: dict[str, Any]
     ) -> int: ...

     async def update_port(
-        self, port_id: int, updates: MutableMapping[str, Any]
+        self, port_id: int, updates: dict[str, Any]
     ) -> int: ...

     async def update_step(
-        self, step_id: int, updates: MutableMapping[str, Any]
+        self, step_id: int, updates: dict[str, Any]
     ) -> int: ...

     async def update_target(
-        self, target_id: str, updates: MutableMapping[str, Any]
+        self, target_id: str, updates: dict[str, Any]
     ) -> int: ...

     async def update_workflow(
-        self, workflow_id: int, updates: MutableMapping[str, Any]
+        self, workflow_id: int, updates: dict[str, Any]
     ) -> int: ...
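The conf.py hunks are behavior-preserving: PEP 584's `|` (Python >= 3.9) builds a new dict in which keys from the right-hand operand win, exactly like the `{**a, **b}` spread it replaces; the CI hunk just pins the documentation checksum that results from the updated pages. A minimal sketch of the equivalence (variable names invented for illustration):

    defaults = {"retries": 3, "lazy": True}
    overrides = {"lazy": False}

    assert {**defaults, **overrides} == defaults | overrides == {"retries": 3, "lazy": False}
    defaults |= overrides  # the in-place form instead mutates `defaults`, like dict.update()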
diff --git a/streamflow/core/command.py b/streamflow/core/command.py
index 165e9342d..4f80097be 100644
--- a/streamflow/core/command.py
+++ b/streamflow/core/command.py
@@ -53,7 +53,7 @@ async def _load(

     async def _save_additional_params(
         self, context: StreamFlowContext
-    ) -> MutableMapping[str, Any]:
+    ) -> dict[str, Any]:
         return {}

@@ -109,7 +109,9 @@ async def _load(
             ),
         )

-    async def _save_additional_params(self, context: StreamFlowContext):
+    async def _save_additional_params(
+        self, context: StreamFlowContext
+    ) -> dict[str, Any]:
         if self.target:
             await self.target.save(context)
         return {
@@ -169,7 +171,7 @@ async def _load(

     async def _save_additional_params(
         self, context: StreamFlowContext
-    ) -> MutableMapping[str, Any]:
+    ) -> dict[str, Any]:
         return {"name": self.name}

     @abstractmethod
@@ -232,9 +234,8 @@ async def _load(
     async def _save_additional_params(
         self, context: StreamFlowContext
     ) -> MutableMapping[str, Any]:
-        return {
-            **await super()._save_additional_params(context),
-            **{"processor": await self.processor.save(context)},
+        return await super()._save_additional_params(context) | {
+            "processor": await self.processor.save(context)
         }

     def _update_options(self, options: CommandOptions, token: Token) -> CommandOptions:
@@ -319,22 +320,19 @@ async def _load(
     async def _save_additional_params(
         self, context: StreamFlowContext
     ) -> MutableMapping[str, Any]:
-        return {
-            **await super()._save_additional_params(context),
-            **{
-                "processors": {
-                    name: token
-                    for name, token in zip(
-                        self.processors.keys(),
-                        await asyncio.gather(
-                            *(
-                                asyncio.create_task(t.save(context))
-                                for t in self.processors.values()
-                            )
-                        ),
-                    )
-                }
-            },
+        return await super()._save_additional_params(context) | {
+            "processors": {
+                name: token
+                for name, token in zip(
+                    self.processors.keys(),
+                    await asyncio.gather(
+                        *(
+                            asyncio.create_task(t.save(context))
+                            for t in self.processors.values()
+                        )
+                    ),
+                )
+            }
         }

     def _update_options(self, options: CommandOptions, token: Token) -> CommandOptions:
@@ -409,13 +407,10 @@ async def _load(
     async def _save_additional_params(
         self, context: StreamFlowContext
     ) -> MutableMapping[str, Any]:
-        return {
-            **await super()._save_additional_params(context),
-            **{
-                "processors": await asyncio.gather(
-                    *(asyncio.create_task(t.save(context)) for t in self.processors)
-                )
-            },
+        return await super()._save_additional_params(context) | {
+            "processors": await asyncio.gather(
+                *(asyncio.create_task(t.save(context)) for t in self.processors)
+            )
         }

     def _update_options(self, options: CommandOptions, token: Token) -> CommandOptions:
@@ -483,14 +478,11 @@ async def _load(
     async def _save_additional_params(
         self, context: StreamFlowContext
     ) -> MutableMapping[str, Any]:
-        return {
-            **await super()._save_additional_params(context),
-            **{
-                "processors": await asyncio.gather(
-                    *(
-                        asyncio.create_task(processor.save(context))
-                        for processor in self.processors
-                    )
-                )
-            },
+        return await super()._save_additional_params(context) | {
+            "processors": await asyncio.gather(
+                *(
+                    asyncio.create_task(processor.save(context))
+                    for processor in self.processors
+                )
+            )
         }
diff --git a/streamflow/core/persistence.py b/streamflow/core/persistence.py
index 838a82010..04c47048d 100644
--- a/streamflow/core/persistence.py
+++ b/streamflow/core/persistence.py
@@ -274,35 +274,27 @@ async def get_workflows_list(

     @abstractmethod
     async def update_deployment(
-        self, deployment_id: int, updates: MutableMapping[str, Any]
+        self, deployment_id: int, updates: dict[str, Any]
     ) -> int: ...

     @abstractmethod
     async def update_execution(
-        self, execution_id: int, updates: MutableMapping[str, Any]
+        self, execution_id: int, updates: dict[str, Any]
     ) -> int: ...

     @abstractmethod
-    async def update_filter(
-        self, filter_id: int, updates: MutableMapping[str, Any]
-    ) -> int: ...
+    async def update_filter(self, filter_id: int, updates: dict[str, Any]) -> int: ...

     @abstractmethod
-    async def update_port(
-        self, port_id: int, updates: MutableMapping[str, Any]
-    ) -> int: ...
+    async def update_port(self, port_id: int, updates: dict[str, Any]) -> int: ...

     @abstractmethod
-    async def update_step(
-        self, step_id: int, updates: MutableMapping[str, Any]
-    ) -> int: ...
+    async def update_step(self, step_id: int, updates: dict[str, Any]) -> int: ...

     @abstractmethod
-    async def update_target(
-        self, target_id: str, updates: MutableMapping[str, Any]
-    ) -> int: ...
+    async def update_target(self, target_id: str, updates: dict[str, Any]) -> int: ...

     @abstractmethod
     async def update_workflow(
-        self, workflow_id: int, updates: MutableMapping[str, Any]
+        self, workflow_id: int, updates: dict[str, Any]
     ) -> int: ...
diff --git a/streamflow/core/scheduling.py b/streamflow/core/scheduling.py
index efe6ad22f..949d413e0 100644
--- a/streamflow/core/scheduling.py
+++ b/streamflow/core/scheduling.py
@@ -172,7 +172,7 @@ async def _load(
     @abstractmethod
     async def _save_additional_params(
         self, context: StreamFlowContext
-    ) -> MutableMapping[str, Any]: ...
+    ) -> dict[str, Any]: ...

     @abstractmethod
     def eval(self, job: Job) -> Hardware: ...
diff --git a/streamflow/core/workflow.py b/streamflow/core/workflow.py
index af5175f32..fb6d94cd0 100644
--- a/streamflow/core/workflow.py
+++ b/streamflow/core/workflow.py
@@ -83,7 +83,9 @@ async def _load(
             tmp_directory=row["tmp_directory"],
         )

-    async def _save_additional_params(self, context: StreamFlowContext):
+    async def _save_additional_params(
+        self, context: StreamFlowContext
+    ) -> dict[str, Any]:
         await asyncio.gather(
             *(asyncio.create_task(t.save(context)) for t in self.inputs.values())
         )
@@ -140,7 +142,7 @@ async def _load(

     async def _save_additional_params(
         self, context: StreamFlowContext
-    ) -> MutableMapping[str, Any]:
+    ) -> dict[str, Any]:
         return {}

     def close(self, consumer: str):
@@ -244,7 +246,7 @@ async def _load(

     async def _save_additional_params(
         self, context: StreamFlowContext
-    ) -> MutableMapping[str, Any]:
+    ) -> dict[str, Any]:
         return {}

     async def _set_status(self, status: Status):
@@ -461,7 +463,9 @@ async def _load(
             workflow=await loading_context.load_workflow(context, row["workflow"]),
         )

-    async def _save_additional_params(self, context: StreamFlowContext):
+    async def _save_additional_params(
+        self, context: StreamFlowContext
+    ) -> dict[str, Any]:
         return {"name": self.name, "workflow": self.workflow.persistent_id}

     @classmethod
@@ -519,7 +523,7 @@ async def _load(

     async def _save_additional_params(
         self, context: StreamFlowContext
-    ) -> MutableMapping[str, Any]:
+    ) -> dict[str, Any]:
         return {"config": self.config, "output_ports": self.output_ports}

     def create_port(self, cls: type[P] = Port, name: str = None, **kwargs) -> P:
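One motivation for narrowing these signatures from `MutableMapping[str, Any]` to `dict[str, Any]` is that the abstract `Mapping`/`MutableMapping` protocols declare no `__or__`, so static checkers reject `a | b` on them even when the runtime object is a plain dict; the concrete annotation is what lets the union-based merges elsewhere in this diff type-check. A hedged sketch of the distinction (function names invented):

    from collections.abc import MutableMapping
    from typing import Any

    def merge_abstract(a: MutableMapping[str, Any], b: dict[str, Any]) -> dict[str, Any]:
        return a | b  # type checkers flag this: MutableMapping declares no __or__

    def merge_concrete(a: dict[str, Any], b: dict[str, Any]) -> dict[str, Any]:
        return a | b  # accepted: dict.__or__ exists and returns a brand-new dict

The runtime story matches: `dict.__or__` is only defined for other dicts, so a custom non-dict `MutableMapping` operand would raise `TypeError`, which makes the narrowed annotations the honest ones.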
- "output_name": self.output_name, - "flatten": self.flatten, - }, + async def _save_additional_params( + self, context: StreamFlowContext + ) -> dict[str, Any]: + return await super()._save_additional_params(context) | { + "input_names": self.input_names, + "output_name": self.output_name, + "flatten": self.flatten, } async def combine( diff --git a/streamflow/cwl/command.py b/streamflow/cwl/command.py index f014c975c..9a8fb83c6 100644 --- a/streamflow/cwl/command.py +++ b/streamflow/cwl/command.py @@ -570,25 +570,22 @@ def _get_timeout(self, job: Job) -> int | None: async def _save_additional_params( self, context: StreamFlowContext - ) -> MutableMapping[str, Any]: - return { - **await super()._save_additional_params(context), - **{ - "absolute_initial_workdir_allowed": self.absolute_initial_workdir_allowed, - "base_command": self.base_command, - "environment": self.environment, - "expression_lib": self.expression_lib, - "failure_codes": self.failure_codes, - "full_js": self.full_js, - "initial_work_dir": self.initial_work_dir, - "inplace_update": self.inplace_update, - "is_shell_command": self.is_shell_command, - "success_codes": self.success_codes, - "stderr": self.stderr, # TODO: manage when is IO type - "stdin": self.stdin, # TODO: manage when is IO type - "stdout": self.stdout, # TODO: manage when is IO type - "time_limit": self.time_limit, - }, + ) -> dict[str, Any]: + return await super()._save_additional_params(context) | { + "absolute_initial_workdir_allowed": self.absolute_initial_workdir_allowed, + "base_command": self.base_command, + "environment": self.environment, + "expression_lib": self.expression_lib, + "failure_codes": self.failure_codes, + "full_js": self.full_js, + "initial_work_dir": self.initial_work_dir, + "inplace_update": self.inplace_update, + "is_shell_command": self.is_shell_command, + "success_codes": self.success_codes, + "stderr": self.stderr, # TODO: manage when is IO type + "stdin": self.stdin, # TODO: manage when is IO type + "stdout": self.stdout, # TODO: manage when is IO type + "time_limit": self.time_limit, } @classmethod @@ -620,7 +617,7 @@ async def _load( ) def _get_executable_command( - self, context: MutableMapping[str, Any], inputs: MutableMapping[str, Token] + self, context: dict[str, Any], inputs: MutableMapping[str, Token] ) -> MutableSequence[str]: command = [] options = CWLCommandOptions( @@ -850,11 +847,11 @@ class CWLCommandOptions(CommandOptions): def __init__( self, - context: MutableMapping[str, Any], + context: dict[str, Any], expression_lib: MutableSequence[str] | None = None, full_js: bool = False, ): - self.context: MutableMapping[str, Any] = context + self.context: dict[str, Any] = context self.expression_lib: MutableSequence[str] | None = expression_lib self.full_js: bool = full_js @@ -943,10 +940,8 @@ def bind( if isinstance(self.position, str) and not self.position.isnumeric(): position = utils.eval_expression( expression=self.position, - context={ - **options.context, - **{"self": get_token_value(token) if token else None}, - }, + context=options.context + | {"self": get_token_value(token) if token else None}, full_js=options.full_js, expression_lib=options.expression_lib, ) @@ -1003,24 +998,21 @@ async def _load( async def _save_additional_params( self, context: StreamFlowContext - ) -> MutableMapping[str, Any]: - return { - **await super()._save_additional_params(context), - **{ - "expression": self.expression, - "processor": ( - await self.processor.save(context) - if self.processor is not None - else None - ), - 
"token_type": self.token_type, - "is_shell_command": self.is_shell_command, - "item_separator": self.item_separator, - "position": self.position, - "prefix": self.prefix, - "separate": self.separate, - "shell_quote": self.shell_quote, - }, + ) -> dict[str, Any]: + return await super()._save_additional_params(context) | { + "expression": self.expression, + "processor": ( + await self.processor.save(context) + if self.processor is not None + else None + ), + "token_type": self.token_type, + "is_shell_command": self.is_shell_command, + "item_separator": self.item_separator, + "position": self.position, + "prefix": self.prefix, + "separate": self.separate, + "shell_quote": self.shell_quote, } @@ -1047,10 +1039,9 @@ async def _load( async def _save_additional_params( self, context: StreamFlowContext - ) -> MutableMapping[str, Any]: - return { - **await super()._save_additional_params(context), - **{"token_type": self.token_type}, + ) -> dict[str, Any]: + return await super()._save_additional_params(context) | { + "token_type": self.token_type } def bind( @@ -1072,10 +1063,7 @@ def _update_options( self, options: CWLCommandOptions, token: Token ) -> CWLCommandOptions: return CWLCommandOptions( - context={ - **options.context, - **{"inputs": {self.name: get_token_value(token)}}, - }, + context=options.context | {"inputs": {self.name: get_token_value(token)}}, expression_lib=options.expression_lib, full_js=options.full_js, ) @@ -1087,10 +1075,7 @@ def _update_options( ) -> CWLCommandOptions: value = get_token_value(token) return CWLCommandOptions( - context={ - **options.context, - **{"inputs": {self.name: value}, "self": value}, - }, + context=options.context | {"inputs": {self.name: value}, "self": value}, expression_lib=options.expression_lib, full_js=options.full_js, ) @@ -1170,18 +1155,15 @@ async def execute(self, job: Job) -> CWLCommandOutput: async def _save_additional_params( self, context: StreamFlowContext - ) -> MutableMapping[str, Any]: - return { - **await super()._save_additional_params(context), - **{ - "absolute_initial_workdir_allowed": self.absolute_initial_workdir_allowed, - "expression": self.expression, - "expression_lib": self.expression_lib, - "initial_work_dir": self.initial_work_dir, - "inplace_update": self.inplace_update, - "full_js": self.full_js, - "time_limit": self.time_limit, - }, + ) -> dict[str, Any]: + return await super()._save_additional_params(context) | { + "absolute_initial_workdir_allowed": self.absolute_initial_workdir_allowed, + "expression": self.expression, + "expression_lib": self.expression_lib, + "initial_work_dir": self.initial_work_dir, + "inplace_update": self.inplace_update, + "full_js": self.full_js, + "time_limit": self.time_limit, } @classmethod diff --git a/streamflow/cwl/hardware.py b/streamflow/cwl/hardware.py index 0aaf9ad04..b041eca22 100644 --- a/streamflow/cwl/hardware.py +++ b/streamflow/cwl/hardware.py @@ -52,7 +52,9 @@ async def _load( expression_lib=row["expression_lib"], ) - async def _save_additional_params(self, context: StreamFlowContext): + async def _save_additional_params( + self, context: StreamFlowContext + ) -> dict[str, Any]: return { "cores": self.cores, "memory": self.memory, diff --git a/streamflow/cwl/processor.py b/streamflow/cwl/processor.py index 5de780bda..3567714c9 100644 --- a/streamflow/cwl/processor.py +++ b/streamflow/cwl/processor.py @@ -182,7 +182,7 @@ async def _process_file_token( filepath = utils.get_path_from_token(token_value) # Check file format if present if self.file_format: - context = {**context, 
**{"self": token_value}} + context |= {"self": token_value} input_formats = utils.eval_expression( expression=self.file_format, context=context, @@ -261,7 +261,7 @@ async def _process_file_token( else: sf_map = {} if self.secondary_files: - sf_context = {**context, "self": token_value} + sf_context = context | {"self": token_value} await utils.process_secondary_files( context=self.workflow.context, cwl_version=cwl_workflow.cwl_version, @@ -291,28 +291,24 @@ async def _process_file_token( # Return token value return token_value - async def _save_additional_params(self, context: StreamFlowContext): - return { - **await super()._save_additional_params(context), - **{ - "token_type": self.token_type, - "check_type": self.check_type, - "enum_symbols": self.enum_symbols, - "expression_lib": self.expression_lib, - "file_format": self.file_format, - "full_js": self.full_js, - "load_contents": self.load_contents, - "load_listing": self.load_listing.value if self.load_listing else None, - "only_propagate_secondary_files": self.only_propagate_secondary_files, - "optional": self.optional, - "secondary_files": await asyncio.gather( - *( - asyncio.create_task(s.save(context)) - for s in self.secondary_files - ) - ), - "streamable": self.streamable, - }, + async def _save_additional_params( + self, context: StreamFlowContext + ) -> dict[str, Any]: + return await super()._save_additional_params(context) | { + "token_type": self.token_type, + "check_type": self.check_type, + "enum_symbols": self.enum_symbols, + "expression_lib": self.expression_lib, + "file_format": self.file_format, + "full_js": self.full_js, + "load_contents": self.load_contents, + "load_listing": self.load_listing.value if self.load_listing else None, + "only_propagate_secondary_files": self.only_propagate_secondary_files, + "optional": self.optional, + "secondary_files": await asyncio.gather( + *(asyncio.create_task(s.save(context)) for s in self.secondary_files) + ), + "streamable": self.streamable, } async def process(self, inputs: MutableMapping[str, Token], token: Token) -> Token: @@ -423,7 +419,7 @@ async def _build_token( self, job: Job, connector: Connector | None, - context: MutableMapping[str, Any], + context: dict[str, Any], token_value: Any, ) -> Token: if isinstance(token_value, MutableMapping): @@ -442,7 +438,7 @@ async def _build_token( ) # Process file format if self.file_format: - context = {**context, **{"self": token_value}} + context |= {"self": token_value} token_value["format"] = utils.eval_expression( expression=self.file_format, context=context, @@ -655,28 +651,24 @@ async def _process_command_output( load_listing=self.load_listing, ) - async def _save_additional_params(self, context: StreamFlowContext): - return { - **await super()._save_additional_params(context), - **{ - "token_type": self.token_type, - "enum_symbols": self.enum_symbols, - "expression_lib": self.expression_lib, - "file_format": self.file_format, - "full_js": self.full_js, - "glob": self.glob, - "load_contents": self.load_contents, - "load_listing": self.load_listing.value if self.load_listing else None, - "optional": self.optional, - "output_eval": self.output_eval, - "secondary_files": await asyncio.gather( - *( - asyncio.create_task(s.save(context)) - for s in self.secondary_files - ) - ), - "streamable": self.streamable, - }, + async def _save_additional_params( + self, context: StreamFlowContext + ) -> dict[str, Any]: + return await super()._save_additional_params(context) | { + "token_type": self.token_type, + "enum_symbols": 
self.enum_symbols, + "expression_lib": self.expression_lib, + "file_format": self.file_format, + "full_js": self.full_js, + "glob": self.glob, + "load_contents": self.load_contents, + "load_listing": self.load_listing.value if self.load_listing else None, + "optional": self.optional, + "output_eval": self.output_eval, + "secondary_files": await asyncio.gather( + *(asyncio.create_task(s.save(context)) for s in self.secondary_files) + ), + "streamable": self.streamable, } async def process( @@ -751,13 +743,12 @@ async def _load( optional=row["optional"], ) - async def _save_additional_params(self, context: StreamFlowContext): - return { - **await super()._save_additional_params(context), - **{ - "processor": await self.processor.save(context), - "optional": self.optional, - }, + async def _save_additional_params( + self, context: StreamFlowContext + ) -> dict[str, Any]: + return await super()._save_additional_params(context) | { + "processor": await self.processor.save(context), + "optional": self.optional, } async def process(self, inputs: MutableMapping[str, Token], token: Token) -> Token: @@ -858,10 +849,11 @@ async def process( ) return token.update(token.value) - async def _save_additional_params(self, context: StreamFlowContext): - return { - **await super()._save_additional_params(context), - **{"processor": await self.processor.save(context)}, + async def _save_additional_params( + self, context: StreamFlowContext + ) -> dict[str, Any]: + return await super()._save_additional_params(context) | { + "processor": await self.processor.save(context) } @@ -907,24 +899,23 @@ async def _load( optional=row["optional"], ) - async def _save_additional_params(self, context: StreamFlowContext): - return { - **await super()._save_additional_params(context), - **{ - "processors": { - k: v - for k, v in zip( - self.processors.keys(), - await asyncio.gather( - *( - asyncio.create_task(p.save(context)) - for p in self.processors.values() - ) - ), - ) - }, - "optional": self.optional, + async def _save_additional_params( + self, context: StreamFlowContext + ) -> dict[str, Any]: + return await super()._save_additional_params(context) | { + "processors": { + k: v + for k, v in zip( + self.processors.keys(), + await asyncio.gather( + *( + asyncio.create_task(p.save(context)) + for p in self.processors.values() + ) + ), + ) }, + "optional": self.optional, } async def process(self, inputs: MutableMapping[str, Token], token: Token) -> Token: @@ -1018,26 +1009,25 @@ async def _load( output_eval=params["output_eval"], ) - async def _save_additional_params(self, context: StreamFlowContext): - return { - **await super()._save_additional_params(context), - **{ - "processors": { - k: v - for k, v in zip( - self.processors.keys(), - await asyncio.gather( - *( - asyncio.create_task(p.save(context)) - for p in self.processors.values() - ) - ), - ) - }, - "expression_lib": self.expression_lib, - "full_js": self.full_js, - "output_eval": self.output_eval, + async def _save_additional_params( + self, context: StreamFlowContext + ) -> dict[str, Any]: + return await super()._save_additional_params(context) | { + "processors": { + k: v + for k, v in zip( + self.processors.keys(), + await asyncio.gather( + *( + asyncio.create_task(p.save(context)) + for p in self.processors.values() + ) + ), + ) }, + "expression_lib": self.expression_lib, + "full_js": self.full_js, + "output_eval": self.output_eval, } async def process( @@ -1196,14 +1186,13 @@ async def _load( ), ) - async def _save_additional_params(self, context: 
StreamFlowContext): - return { - **await super()._save_additional_params(context), - **{ - "processors": await asyncio.gather( - *(asyncio.create_task(v.save(context)) for v in self.processors) - ) - }, + async def _save_additional_params( + self, context: StreamFlowContext + ) -> dict[str, Any]: + return await super()._save_additional_params(context) | { + "processors": await asyncio.gather( + *(asyncio.create_task(v.save(context)) for v in self.processors) + ) } def get_processor(self, token_value: Any) -> TokenProcessor: @@ -1272,14 +1261,13 @@ def _check_union_processor( self.check_processor[type(p)](p, token_value) for p in processor.processors ) - async def _save_additional_params(self, context: StreamFlowContext): - return { - **await super()._save_additional_params(context), - **{ - "processors": await asyncio.gather( - *(asyncio.create_task(v.save(context)) for v in self.processors) - ) - }, + async def _save_additional_params( + self, context: StreamFlowContext + ) -> dict[str, Any]: + return await super()._save_additional_params(context) | { + "processors": await asyncio.gather( + *(asyncio.create_task(v.save(context)) for v in self.processors) + ) } def get_processor(self, token_value: Any) -> CommandOutputProcessor: diff --git a/streamflow/cwl/step.py b/streamflow/cwl/step.py index f06396009..ffdfa9533 100644 --- a/streamflow/cwl/step.py +++ b/streamflow/cwl/step.py @@ -98,7 +98,7 @@ async def _process_file_token( for t in token_value["listing"] ) ) - new_token_value = {**new_token_value, **{"listing": listing}} + new_token_value |= {"listing": listing} return new_token_value @@ -161,14 +161,9 @@ def __init__(self, name: str, workflow: CWLWorkflow): async def _save_additional_params( self, context: StreamFlowContext - ) -> MutableMapping[str, Any]: - return { - **await super()._save_additional_params(context), - **{ - "skip_ports": { - k: p.persistent_id for k, p in self.get_skip_ports().items() - } - }, + ) -> dict[str, Any]: + return await super()._save_additional_params(context) | { + "skip_ports": {k: p.persistent_id for k, p in self.get_skip_ports().items()} } def add_skip_port(self, name: str, port: Port) -> None: @@ -233,14 +228,11 @@ async def _on_false(self, inputs: MutableMapping[str, Token]): async def _save_additional_params( self, context: StreamFlowContext - ) -> MutableMapping[str, Any]: - return { - **await super()._save_additional_params(context), - **{ - "expression": self.expression, - "expression_lib": self.expression_lib, - "full_js": self.full_js, - }, + ) -> dict[str, Any]: + return await super()._save_additional_params(context) | { + "expression": self.expression, + "expression_lib": self.expression_lib, + "full_js": self.full_js, } @classmethod @@ -375,10 +367,9 @@ async def _on_false(self, inputs: MutableMapping[str, Token]): async def _save_additional_params( self, context: StreamFlowContext - ) -> MutableMapping[str, Any]: - return { - **await super()._save_additional_params(context), - **{"scatter_method": self.scatter_method}, + ) -> dict[str, Any]: + return await super()._save_additional_params(context) | { + "scatter_method": self.scatter_method } @@ -463,10 +454,9 @@ async def _load( async def _save_additional_params( self, context: StreamFlowContext - ) -> MutableMapping[str, Any]: - return { - **await super()._save_additional_params(context), - **{"writable": self.writable}, + ) -> dict[str, Any]: + return await super()._save_additional_params(context) | { + "writable": self.writable } async def _transfer_value(self, job: Job, token_value: 
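One subtlety in the processor hunks above: a rewrite such as `context |= {"self": token_value}` replaces an expression that built a new dict with an operator that updates the existing one in place, so it is only a drop-in when the mapping is not aliased by the caller; the neighbouring `sf_context = context | {...}` form keeps the copying semantics. A toy illustration of the difference (function names invented):

    def tag_copy(ctx: dict) -> dict:
        return ctx | {"self": "file"}   # caller's dict is left untouched

    def tag_inplace(ctx: dict) -> dict:
        ctx |= {"self": "file"}         # caller's dict is mutated
        return ctx

    shared = {"inputs": {}}
    tag_copy(shared)
    assert "self" not in shared
    tag_inplace(shared)
    assert "self" in shared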
diff --git a/streamflow/cwl/step.py b/streamflow/cwl/step.py
index f06396009..ffdfa9533 100644
--- a/streamflow/cwl/step.py
+++ b/streamflow/cwl/step.py
@@ -98,7 +98,7 @@ async def _process_file_token(
                     for t in token_value["listing"]
                 )
             )
-            new_token_value = {**new_token_value, **{"listing": listing}}
+            new_token_value |= {"listing": listing}
     return new_token_value

@@ -161,14 +161,9 @@ def __init__(self, name: str, workflow: CWLWorkflow):

     async def _save_additional_params(
         self, context: StreamFlowContext
-    ) -> MutableMapping[str, Any]:
-        return {
-            **await super()._save_additional_params(context),
-            **{
-                "skip_ports": {
-                    k: p.persistent_id for k, p in self.get_skip_ports().items()
-                }
-            },
+    ) -> dict[str, Any]:
+        return await super()._save_additional_params(context) | {
+            "skip_ports": {k: p.persistent_id for k, p in self.get_skip_ports().items()}
         }

     def add_skip_port(self, name: str, port: Port) -> None:
@@ -233,14 +228,11 @@ async def _on_false(self, inputs: MutableMapping[str, Token]):

     async def _save_additional_params(
         self, context: StreamFlowContext
-    ) -> MutableMapping[str, Any]:
-        return {
-            **await super()._save_additional_params(context),
-            **{
-                "expression": self.expression,
-                "expression_lib": self.expression_lib,
-                "full_js": self.full_js,
-            },
+    ) -> dict[str, Any]:
+        return await super()._save_additional_params(context) | {
+            "expression": self.expression,
+            "expression_lib": self.expression_lib,
+            "full_js": self.full_js,
         }

     @classmethod
@@ -375,10 +367,9 @@ async def _on_false(self, inputs: MutableMapping[str, Token]):

     async def _save_additional_params(
         self, context: StreamFlowContext
-    ) -> MutableMapping[str, Any]:
-        return {
-            **await super()._save_additional_params(context),
-            **{"scatter_method": self.scatter_method},
+    ) -> dict[str, Any]:
+        return await super()._save_additional_params(context) | {
+            "scatter_method": self.scatter_method
         }

@@ -463,10 +454,9 @@ async def _load(

     async def _save_additional_params(
         self, context: StreamFlowContext
-    ) -> MutableMapping[str, Any]:
-        return {
-            **await super()._save_additional_params(context),
-            **{"writable": self.writable},
+    ) -> dict[str, Any]:
+        return await super()._save_additional_params(context) | {
+            "writable": self.writable
         }

     async def _transfer_value(self, job: Job, token_value: Any) -> Any:
@@ -590,14 +580,11 @@ async def _update_file_token(
             )
         )
         # Add size, checksum and format fields
-        new_token_value = {
-            **new_token_value,
-            **{
-                "nameroot": token_value["nameroot"],
-                "nameext": token_value["nameext"],
-                "size": token_value["size"],
-                "checksum": original_checksum,
-            },
+        new_token_value |= {
+            "nameroot": token_value["nameroot"],
+            "nameext": token_value["nameext"],
+            "size": token_value["size"],
+            "checksum": original_checksum,
         }
         if "format" in token_value:
             new_token_value["format"] = token_value["format"]
diff --git a/streamflow/cwl/transformer.py b/streamflow/cwl/transformer.py
index fa2753580..52b0002c1 100644
--- a/streamflow/cwl/transformer.py
+++ b/streamflow/cwl/transformer.py
@@ -135,13 +135,10 @@ async def _load(

     async def _save_additional_params(
         self, context: StreamFlowContext
-    ) -> MutableMapping[str, Any]:
-        return {
-            **await super()._save_additional_params(context),
-            **{
-                "port_name": self.port_name,
-                "processor": await self.processor.save(context),
-            },
+    ) -> dict[str, Any]:
+        return await super()._save_additional_params(context) | {
+            "port_name": self.port_name,
+            "processor": await self.processor.save(context),
         }

     async def transform(
@@ -181,10 +178,9 @@ async def _load(

     async def _save_additional_params(
         self, context: StreamFlowContext
-    ) -> MutableMapping[str, Any]:
-        return {
-            **await super()._save_additional_params(context),
-            **{"default_port": self.default_port.persistent_id},
+    ) -> dict[str, Any]:
+        return await super()._save_additional_params(context) | {
+            "default_port": self.default_port.persistent_id
         }

     async def transform(
@@ -364,19 +360,16 @@ async def _load(

     async def _save_additional_params(
         self, context: StreamFlowContext
-    ) -> MutableMapping[str, Any]:
+    ) -> dict[str, Any]:
         job_port = self.get_input_port("__job__")
         await job_port.save(context)
-        return {
-            **await super()._save_additional_params(context),
-            **{
-                "port_name": self.port_name,
-                "processor": await self.processor.save(context),
-                "value_from": self.value_from,
-                "expression_lib": self.expression_lib,
-                "full_js": self.full_js,
-                "job_port": job_port.persistent_id,
-            },
+        return await super()._save_additional_params(context) | {
+            "port_name": self.port_name,
+            "processor": await self.processor.save(context),
+            "value_from": self.value_from,
+            "expression_lib": self.expression_lib,
+            "full_js": self.full_js,
+            "job_port": job_port.persistent_id,
         }

     async def transform(
@@ -384,16 +377,11 @@ async def transform(
     ) -> MutableMapping[str, Token | MutableSequence[Token]]:
         output_name = self.get_output_name()
         if output_name in inputs:
-            inputs = {
-                **inputs,
-                **{
-                    output_name: await self.processor.process(
-                        inputs, inputs[output_name]
-                    )
-                },
+            inputs |= {
+                output_name: await self.processor.process(inputs, inputs[output_name])
             }
         context = utils.build_context(inputs)
-        context = {**context, **{"self": context["inputs"].get(output_name)}}
+        context |= {"self": context["inputs"].get(output_name)}
         token_value = utils.eval_expression(
             expression=self.value_from,
             context=context,
@@ -456,8 +444,9 @@ async def transform(
             if self.loop_source_port
             else None
         )
-        context = utils.build_context(loop_inputs)
-        context = {**context, **{"self": get_token_value(self_token)}}
+        context = utils.build_context(loop_inputs) | {
+            "self": get_token_value(self_token)
+        }
         return {
             self.get_output_name(): Token(
                 tag=get_tag(inputs.values()),
diff --git a/streamflow/cwl/translator.py b/streamflow/cwl/translator.py
index cdeaf15af..1a5b9dfc6 100644
--- a/streamflow/cwl/translator.py
+++ b/streamflow/cwl/translator.py
@@ -125,7 +125,7 @@ def _create_command(
 ) -> CWLCommand:
     command = CWLCommand(step)
     # Process InitialWorkDirRequirement
-    requirements = {**context["hints"], **context["requirements"]}
+    requirements = context["hints"] | context["requirements"]
     if "InitialWorkDirRequirement" in requirements:
         command.initial_work_dir = requirements["InitialWorkDirRequirement"]["listing"]
         command.absolute_initial_workdir_allowed = (
@@ -194,7 +194,7 @@ def _create_command_output_processor_base(
         "long" if t == "int" else "double" if t == "float" else t for t in port_type
     ]
     # Process InlineJavascriptRequirement
-    requirements = {**context["hints"], **context["requirements"]}
+    requirements = context["hints"] | context["requirements"]
     expression_lib, full_js = _process_javascript_requirement(requirements)
     # Create OutputProcessor
     if "File" in port_type:
@@ -275,7 +275,7 @@ def _create_command_output_processor(
     # Enum type: -> create command output processor
     elif port_type["type"] == "enum":
         # Process InlineJavascriptRequirement
-        requirements = {**context["hints"], **context["requirements"]}
+        requirements = context["hints"] | context["requirements"]
         expression_lib, full_js = _process_javascript_requirement(requirements)
         enum_prefix = (
             utils.get_name(posixpath.sep, posixpath.sep, port_type["name"])
@@ -301,7 +301,7 @@ def _create_command_output_processor(
     # Record type: -> ObjectCommandOutputProcessor
     elif port_type["type"] == "record":
         # Process InlineJavascriptRequirement
-        requirements = {**context["hints"], **context["requirements"]}
+        requirements = context["hints"] | context["requirements"]
         expression_lib, full_js = _process_javascript_requirement(requirements)
         # Create processor
         record_name_prefix = utils.get_name(
@@ -613,7 +613,7 @@ def _create_token_processor(
     # Enum type: -> create output processor
     elif port_type["type"] == "enum":
         # Process InlineJavascriptRequirement
-        requirements = {**context["hints"], **context["requirements"]}
+        requirements = context["hints"] | context["requirements"]
         expression_lib, full_js = _process_javascript_requirement(requirements)
         enum_prefix = (
             utils.get_name(posixpath.sep, posixpath.sep, port_type["name"])
@@ -736,7 +736,7 @@ def _create_token_processor(
     # Simple type -> Create typed processor
     else:
         # Process InlineJavascriptRequirement
-        requirements = {**context["hints"], **context["requirements"]}
+        requirements = context["hints"] | context["requirements"]
         expression_lib, full_js = _process_javascript_requirement(requirements)
         # Create OutputProcessor
         if port_type == "File":
@@ -1012,7 +1012,7 @@ def _get_hardware_requirement(
 def _get_load_listing(
     port_description: MutableMapping[str, Any], context: MutableMapping[str, Any]
 ) -> LoadListing:
-    requirements = {**context["hints"], **context["requirements"]}
+    requirements = context["hints"] | context["requirements"]
     if "loadListing" in port_description:
         return LoadListing[port_description["loadListing"]]
     elif (
@@ -1228,7 +1228,7 @@ def _process_loop_transformers(
         )
         # Put transformer output ports in input ports map
         new_input_ports[input_name] = token_transformer.get_output_port()
-    return {**input_ports, **loop_input_ports, **new_input_ports}
+    return input_ports | loop_input_ports | new_input_ports


 def _process_transformers(
@@ -1260,7 +1260,7 @@ def _process_transformers(
         )
         # Put transformer output ports in input ports map
         new_input_ports[input_name] = token_transformer.get_output_port()
-    return {**input_ports, **new_input_ports}
+    return input_ports | new_input_ports


 def _remap_path(
@@ -1598,7 +1598,7 @@ def _translate_command_line_tool(
     if logger.isEnabledFor(logging.DEBUG):
         logger.debug(f"Translating {cwl_element.__class__.__name__} {name_prefix}")
     # Extract custom types if present
-    requirements = {**context["hints"], **context["requirements"]}
+    requirements = context["hints"] | context["requirements"]
     schema_def_types = _get_schema_def_types(requirements)
     # Process InlineJavascriptRequirement
     expression_lib, full_js = _process_javascript_requirement(requirements)
@@ -1805,7 +1805,7 @@ def _translate_workflow(
     if logger.isEnabledFor(logging.DEBUG):
         logger.debug(f"Translating Workflow {step_name}")
     # Extract custom types if present
-    requirements = {**context["hints"], **context["requirements"]}
+    requirements = context["hints"] | context["requirements"]
     schema_def_types = _get_schema_def_types(requirements)
     # Extract JavaScript requirements
     expression_lib, full_js = _process_javascript_requirement(requirements)
@@ -1975,7 +1975,7 @@ def _translate_workflow_step(
             context["hints"][hint["class"]] = hint
         for requirement in cwl_element.embedded_tool.requirements:
             context["requirements"][requirement["class"]] = requirement
-        requirements = {**context["hints"], **context["requirements"]}
+        requirements = context["hints"] | context["requirements"]
         # Extract JavaScript requirements
         expression_lib, full_js = _process_javascript_requirement(requirements)
         # Find scatter elements
@@ -2035,12 +2035,11 @@ def _translate_workflow_step(
                 posixpath.relpath(default_name, step_name), workflow.create_port()
             )
             default_ports[default_name] = transformer.get_output_port()
-        input_ports = {**input_ports, **default_ports}
+        input_ports |= default_ports
         # Process loop inputs
         element_requirements = {
-            **{h["class"]: h for h in cwl_element.embedded_tool.hints},
-            **{r["class"]: r for r in cwl_element.embedded_tool.requirements},
-        }
+            h["class"]: h for h in cwl_element.embedded_tool.hints
+        } | {r["class"]: r for r in cwl_element.embedded_tool.requirements}
         if "http://commonwl.org/cwltool#Loop" in element_requirements:
             loop_requirement = element_requirements["http://commonwl.org/cwltool#Loop"]
             # Build combinator
@@ -2197,7 +2196,7 @@ def _translate_workflow_step(
             )
         # Save input ports in the global map
-        self.input_ports = {**self.input_ports, **input_ports}
+        self.input_ports |= input_ports
         # Process condition
         conditional_step = None
         if "when" in cwl_element.tool:
@@ -2409,7 +2408,7 @@ def _translate_workflow_step(
                     workflow.create_port(),
                 )
                 loop_default_ports[default_name] = transformer.get_output_port()
-            loop_input_ports = {**loop_input_ports, **loop_default_ports}
+            loop_input_ports |= loop_default_ports
             # Process inputs again to attach ports to transformers
             loop_input_ports = _process_loop_transformers(
                 step_name=step_name,
@@ -2451,7 +2450,7 @@ def _translate_workflow_step(
                     port_name, skip_port
                 )
             # Update output ports with the internal ones
-            self.output_ports = {**self.output_ports, **internal_output_ports}
+            self.output_ports |= internal_output_ports
             # Process inner element
             inner_cwl_name_prefix = utils.get_inner_cwl_prefix(
                 cwl_name_prefix, name_prefix, cwl_element
@@ -2464,7 +2463,7 @@ def _translate_workflow_step(
                 cwl_name_prefix=inner_cwl_name_prefix,
             )
             # Update output ports with the external ones
-            self.output_ports = {**self.output_ports, **external_output_ports}
+            self.output_ports |= external_output_ports

     def _translate_workflow_step_input(
         self,
@@ -2496,10 +2495,7 @@ def _translate_workflow_step_input(
             port_name = posixpath.relpath(global_name, step_name)
             # Adjust type to handle scatter
             if global_name in scatter_inputs:
-                element_input = {
-                    **element_input,
-                    **{"type": element_input["type"]["items"]},
-                }
+                element_input |= {"type": element_input["type"]["items"]}
             # If element contains `valueFrom` directive
             if "valueFrom" in element_input:
                 # Check if StepInputExpressionRequirement is specified
@@ -2702,7 +2698,7 @@ def translate(self) -> Workflow:
             context["hints"][hint["class"]] = hint
         for requirement in self.cwl_definition.requirements:
             context["requirements"][requirement["class"]] = requirement
-        requirements = {**context["hints"], **context["requirements"]}
+        requirements = context["hints"] | context["requirements"]
         # Extract workflow outputs
         cwl_elements = {
             utils.get_name("/", cwl_root_prefix, element["id"]): element
diff --git a/streamflow/cwl/utils.py b/streamflow/cwl/utils.py
index 3a2778009..f88179216 100644
--- a/streamflow/cwl/utils.py
+++ b/streamflow/cwl/utils.py
@@ -236,7 +236,7 @@ def build_context(
     output_directory: str | None = None,
     tmp_directory: str | None = None,
     hardware: Hardware | None = None,
-) -> MutableMapping[str, Any]:
+) -> dict[str, Any]:
     context = {"inputs": {}, "self": None, "runtime": {}}
     for name, token in inputs.items():
         context["inputs"][name] = get_token_value(token)
@@ -335,7 +335,7 @@ async def build_token_value(
             load_listing=load_listing,
         )
         # Compute new secondary files from port specification
-        sf_context = {**js_context, **{"self": token_value}}
+        sf_context = js_context | {"self": token_value}
         if secondary_files:
             await process_secondary_files(
                 context=context,
@@ -1025,17 +1025,14 @@ async def update_file_token(
     filepath = get_path_from_token(token_value)
     if load_contents is not None:
         if load_contents and "contents" not in token_value:
-            token_value = {
-                **token_value,
-                **{
-                    "contents": await _get_contents(
-                        connector,
-                        location,
-                        filepath,
-                        token_value["size"],
-                        cwl_version,
-                    )
-                },
+            token_value |= {
+                "contents": await _get_contents(
+                    connector,
+                    location,
+                    filepath,
+                    token_value["size"],
+                    cwl_version,
+                )
             }
         elif not load_contents and "contents" in token_value:
             token_value = {k: token_value[k] for k in token_value if k != "contents"}
@@ -1047,30 +1044,24 @@ async def update_file_token(
             token_value = {k: token_value[k] for k in token_value if k != "listing"}
         # If listing is not present or if the token needs a deep listing, process directory contents
         elif "listing" not in token_value or load_listing == LoadListing.deep_listing:
-            token_value = {
-                **token_value,
-                **{
-                    "listing": await get_listing(
-                        context=context,
-                        connector=connector,
-                        cwl_version=cwl_version,
-                        locations=[location],
-                        dirpath=filepath,
-                        load_contents=False,
-                        recursive=load_listing == LoadListing.deep_listing,
-                    )
-                },
+            token_value |= {
+                "listing": await get_listing(
+                    context=context,
+                    connector=connector,
+                    cwl_version=cwl_version,
+                    locations=[location],
+                    dirpath=filepath,
+                    load_contents=False,
+                    recursive=load_listing == LoadListing.deep_listing,
+                )
             }
         # If load listing is set to `shallow_listing`, remove the deep listing entries if present
         elif load_listing == LoadListing.shallow_listing:
-            token_value = {
-                **token_value,
-                **{
-                    "listing": [
-                        {k: v[k] for k in v if k != "listing"}
-                        for v in token_value["listing"]
-                    ]
-                },
+            token_value |= {
+                "listing": [
+                    {k: v[k] for k in v if k != "listing"}
+                    for v in token_value["listing"]
+                ]
             }
     return token_value
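Operand order does the semantic work in the translator hunks: `context["hints"] | context["requirements"]` resolves an entry present in both maps to the requirement, because the right operand wins — the same precedence the old `{**hints, **requirements}` spread encoded, and consistent with requirements taking priority over hints. A toy check (data invented):

    hints = {"ResourceRequirement": {"coresMin": 1}}
    requirements = {"ResourceRequirement": {"coresMin": 4}}

    merged = hints | requirements
    assert merged["ResourceRequirement"]["coresMin"] == 4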
diff --git a/streamflow/cwl/workflow.py b/streamflow/cwl/workflow.py
index cddbf01ed..3a82b6456 100644
--- a/streamflow/cwl/workflow.py
+++ b/streamflow/cwl/workflow.py
@@ -30,17 +30,12 @@ def __init__(

     async def _save_additional_params(
         self, context: StreamFlowContext
-    ) -> MutableMapping[str, Any]:
-        return {
-            **await super()._save_additional_params(context),
-            **{
-                "cwl_version": self.cwl_version,
-                "format_graph": (
-                    self.format_graph.serialize()
-                    if self.format_graph is not None
-                    else None
-                ),
-            },
+    ) -> dict[str, Any]:
+        return await super()._save_additional_params(context) | {
+            "cwl_version": self.cwl_version,
+            "format_graph": (
+                self.format_graph.serialize() if self.format_graph is not None else None
+            ),
         }

     @classmethod
diff --git a/streamflow/deployment/connector/base.py b/streamflow/deployment/connector/base.py
index 0d69ee708..c7a102cac 100644
--- a/streamflow/deployment/connector/base.py
+++ b/streamflow/deployment/connector/base.py
@@ -383,7 +383,7 @@ async def run(
             run_command = self._get_run_command(command, location)
             proc = await asyncio.create_subprocess_exec(
                 *shlex.split(" ".join(run_command)),
-                env=({**os.environ, **location.environment}),
+                env=(os.environ | location.environment),
                 stdin=None,
                 stdout=(
                     asyncio.subprocess.PIPE
diff --git a/streamflow/deployment/connector/queue_manager.py b/streamflow/deployment/connector/queue_manager.py
index 2ee178a5e..382e93661 100644
--- a/streamflow/deployment/connector/queue_manager.py
+++ b/streamflow/deployment/connector/queue_manager.py
@@ -956,7 +956,7 @@ async def _run_batch_command(
         if stderr == stdout:
             batch_command.append(get_option("j", "oe"))
         if service := cast(PBSService, self.services.get(location.service)):
-            resources = {**service.resources, **resources}
+            resources = service.resources | resources
             batch_command.extend(
                 [
                     get_option("a", service.begin),
diff --git a/streamflow/deployment/manager.py b/streamflow/deployment/manager.py
index 64c5185c9..322e5bcfd 100644
--- a/streamflow/deployment/manager.py
+++ b/streamflow/deployment/manager.py
@@ -154,12 +154,10 @@ async def _inner_deploy(
                 return DeploymentConfig(
                     name=deployment_config.name,
                     type=deployment_config.type,
-                    config={
-                        **deployment_config.config,
-                        **{
-                            "connector": self.deployments_map[deployment_name],
-                            "service": service,
-                        },
+                    config=deployment_config.config
+                    | {
+                        "connector": self.deployments_map[deployment_name],
+                        "service": service,
                     },
                     external=deployment_config.external,
                     lazy=deployment_config.lazy,
diff --git a/streamflow/ext/utils.py b/streamflow/ext/utils.py
index c2199076c..f4133cf3a 100644
--- a/streamflow/ext/utils.py
+++ b/streamflow/ext/utils.py
@@ -31,11 +31,10 @@ def _filter_by_name(classes: MutableMapping[str, Any], name: str):
 def _flatten_all_of(entity_schema):
     for obj in entity_schema["allOf"]:
         if "allOf" in obj:
-            obj["properties"] = {**_flatten_all_of(obj), **obj.get("properties", {})}
-        entity_schema["properties"] = {
-            **obj.get("properties", {}),
-            **entity_schema.get("properties", {}),
-        }
+            obj["properties"] = _flatten_all_of(obj) | obj.get("properties", {})
+        entity_schema["properties"] = obj.get("properties", {}) | entity_schema.get(
+            "properties", {}
+        )
     del entity_schema["allOf"]
     return dict(sorted(entity_schema["properties"].items()))

@@ -126,7 +125,7 @@ def _split_refs(refs: MutableMapping[str, Any], processed: MutableSequence[str]):
             ]
             processed.append(k)
     if subrefs := {k: v for k, v in subrefs.items() if k not in processed}:
-        refs_descs = {**refs_descs, **_split_refs(subrefs, processed)}
+        refs_descs |= _split_refs(subrefs, processed)
     return refs_descs
diff --git a/streamflow/main.py b/streamflow/main.py
index e6c4a025f..5d295626a 100644
--- a/streamflow/main.py
+++ b/streamflow/main.py
@@ -199,7 +199,7 @@ def _get_instance_from_config(
     if config is not None:
         enabled = config.get("enabled", enabled_by_default)
         class_name = config.get("type", "default" if enabled else "dummy")
-        kwargs = {**kwargs, **config.get("config", {})}
+        kwargs |= config.get("config", {})
     else:
         class_name = "default" if enabled_by_default else "dummy"
     class_ = classes[class_name]
diff --git a/streamflow/persistence/sqlite.py b/streamflow/persistence/sqlite.py
index 55c967b73..4aa81f38f 100644
--- a/streamflow/persistence/sqlite.py
+++ b/streamflow/persistence/sqlite.py
@@ -498,87 +498,79 @@ async def get_workflows_list(
             return await cursor.fetchall()

     async def update_deployment(
-        self, deployment_id: int, updates: MutableMapping[str, Any]
+        self, deployment_id: int, updates: dict[str, Any]
     ) -> int:
         async with self.connection as db:
             await db.execute(
                 "UPDATE deployment SET {} WHERE id = :id".format(  # nosec
                     ", ".join([f"{k} = :{k}" for k in updates])
                 ),
-                {**updates, **{"id": deployment_id}},
+                updates | {"id": deployment_id},
             )
             self.deployment_cache.pop(deployment_id, None)
             return deployment_id

-    async def update_execution(
-        self, execution_id: int, updates: MutableMapping[str, Any]
-    ) -> int:
+    async def update_execution(self, execution_id: int, updates: dict[str, Any]) -> int:
         async with self.connection as db:
             await db.execute(
                 "UPDATE execution SET {} WHERE id = :id".format(  # nosec
                     ", ".join([f"{k} = :{k}" for k in updates])
                 ),
-                {**updates, **{"id": execution_id}},
+                updates | {"id": execution_id},
             )
             return execution_id

-    async def update_filter(
-        self, filter_id: int, updates: MutableMapping[str, Any]
-    ) -> int:
+    async def update_filter(self, filter_id: int, updates: dict[str, Any]) -> int:
         async with self.connection as db:
             await db.execute(
                 "UPDATE filter SET {} WHERE id = :id".format(  # nosec
                     ", ".join([f"{k} = :{k}" for k in updates])
                 ),
-                {**updates, **{"id": filter_id}},
+                updates | {"id": filter_id},
             )
             self.filter_cache.pop(filter_id, None)
             return filter_id

-    async def update_port(self, port_id: int, updates: MutableMapping[str, Any]) -> int:
+    async def update_port(self, port_id: int, updates: dict[str, Any]) -> int:
         async with self.connection as db:
             await db.execute(
                 "UPDATE port SET {} WHERE id = :id".format(  # nosec
                     ", ".join([f"{k} = :{k}" for k in updates])
                 ),
-                {**updates, **{"id": port_id}},
+                updates | {"id": port_id},
             )
             self.port_cache.pop(port_id, None)
             return port_id

-    async def update_step(self, step_id: int, updates: MutableMapping[str, Any]) -> int:
+    async def update_step(self, step_id: int, updates: dict[str, Any]) -> int:
         async with self.connection as db:
             await db.execute(
                 "UPDATE step SET {} WHERE id = :id".format(  # nosec
                     ", ".join([f"{k} = :{k}" for k in updates])
                 ),
-                {**updates, **{"id": step_id}},
+                updates | {"id": step_id},
             )
             self.step_cache.pop(step_id, None)
             return step_id

-    async def update_target(
-        self, target_id: int, updates: MutableMapping[str, Any]
-    ) -> int:
+    async def update_target(self, target_id: int, updates: dict[str, Any]) -> int:
         async with self.connection as db:
             await db.execute(
                 "UPDATE target SET {} WHERE id = :id".format(  # nosec
                     ", ".join([f"{k} = :{k}" for k in updates])
                 ),
-                {**updates, **{"id": target_id}},
+                updates | {"id": target_id},
             )
             self.target_cache.pop(target_id, None)
             return target_id

-    async def update_workflow(
-        self, workflow_id: int, updates: MutableMapping[str, Any]
-    ) -> int:
+    async def update_workflow(self, workflow_id: int, updates: dict[str, Any]) -> int:
         async with self.connection as db:
             await db.execute(
                 "UPDATE workflow SET {} WHERE id = :id".format(  # nosec
                     ", ".join([f"{k} = :{k}" for k in updates])
                 ),
-                {**updates, **{"id": workflow_id}},
+                updates | {"id": workflow_id},
             )
             self.workflow_cache.pop(workflow_id, None)
             return workflow_id
diff --git a/streamflow/provenance/run_crate.py b/streamflow/provenance/run_crate.py
index d4ebc5bb9..881a578db 100644
--- a/streamflow/provenance/run_crate.py
+++ b/streamflow/provenance/run_crate.py
@@ -154,7 +154,7 @@ def _get_item_preview(tag: Any, text: Any, value: Any) -> None:
     )


-def _get_workflow_template(entity_id: str, name: str) -> MutableMapping[str, Any]:
+def _get_workflow_template(entity_id: str, name: str) -> dict[str, Any]:
     return {
         "@id": entity_id,
         "@type": ["SoftwareSourceCode", "ComputationalWorkflow", "HowTo"],
@@ -1127,12 +1127,12 @@ def _get_workflow(
         # Create entity
         entity_id = _get_cwl_entity_id(cwl_workflow.tool["id"])
         path = cwl_workflow.tool["id"].split("#")[0][7:]
-        jsonld_workflow = cast(MutableMapping[str, Any], _get_workflow_template(
+        jsonld_workflow = _get_workflow_template(
             entity_id, entity_id.split("#")[-1]
         ) | cast(
-            type[MutableMapping[str, Any]],
+            dict[str, Any],
             {"sha1": _file_checksum(path, hashlib.new("sha1", usedforsecurity=False))},
-        ))
+        )
         if (path := cwl_workflow.tool["id"].split("#")[0][7:]) not in self.files_map:
             self.graph["./"]["hasPart"].append({"@id": entity_id})
             self.files_map[path] = os.path.basename(path)
diff --git a/streamflow/workflow/combinator.py b/streamflow/workflow/combinator.py
index 622ecba73..471e01bf6 100644
--- a/streamflow/workflow/combinator.py
+++ b/streamflow/workflow/combinator.py
@@ -50,7 +50,7 @@ async def _product(
         schema = {}
         for key in self.items:
             if key in self.combinators:
-                schema = {**schema, **config[key]}
+                schema |= config[key]
             else:
                 schema[key] = config[key]
         suffix = [t.tag.split(".")[-1] for t in schema.values()]
@@ -62,11 +62,10 @@ async def _product(
             for k, t in schema.items()
         }

-    async def _save_additional_params(self, context: StreamFlowContext):
-        return {
-            **await super()._save_additional_params(context),
-            **{"depth": self.depth},
-        }
+    async def _save_additional_params(
+        self, context: StreamFlowContext
+    ) -> dict[str, Any]:
+        return await super()._save_additional_params(context) | {"depth": self.depth}

     async def combine(
         self,
@@ -122,7 +121,7 @@ async def _product(self) -> AsyncIterable[MutableMapping[str, Token]]:
                 for key, elements in self._token_values[tag].items():
                     element = elements.pop()
                     if key in self.combinators:
-                        schema = {**schema, **element}
+                        schema |= element
                     else:
                         schema[key] = {
                             "token": element,
@@ -218,9 +217,10 @@ async def _load(
             combinator.add_output_item(item)
         return combinator

-    async def _save_additional_params(self, context: StreamFlowContext):
+    async def _save_additional_params(
+        self, context: StreamFlowContext
+    ) -> dict[str, Any]:
         # self._token_values is not saved because it is always empty at the beginning of execution
-        return {
-            **await super()._save_additional_params(context),
-            **{"output_items": self.output_items},
+        return await super()._save_additional_params(context) | {
+            "output_items": self.output_items
         }
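The SQLite hunks all share one shape: the SET clause is generated from the `updates` keys as named placeholders, and the parameter mapping passed to `execute` is the union `updates | {"id": ...}`, which rides the row id into the bindings without mutating the caller's dict. A condensed sketch of that shape (table name and connection object are placeholders):

    async def update_row(db, row_id: int, updates: dict) -> int:
        # assumes an aiosqlite-style connection; `updates` keys are trusted identifiers
        await db.execute(
            "UPDATE example SET {} WHERE id = :id".format(
                ", ".join(f"{k} = :{k}" for k in updates)
            ),
            updates | {"id": row_id},  # new dict; `updates` itself stays intact
        )
        return row_id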
diff --git a/streamflow/workflow/step.py b/streamflow/workflow/step.py
index 207448694..95e0d7142 100644
--- a/streamflow/workflow/step.py
+++ b/streamflow/workflow/step.py
@@ -172,7 +172,9 @@ async def _load(
             workflow=await loading_context.load_workflow(context, row["workflow"]),
         )

-    async def _save_additional_params(self, context: StreamFlowContext):
+    async def _save_additional_params(
+        self, context: StreamFlowContext
+    ) -> dict[str, Any]:
         return {
             "name": self.name,
             "combinators": {
@@ -195,10 +197,7 @@ async def _save_additional_params(self, context: StreamFlowContext):
     def add_combinator(self, combinator: Combinator, items: set[str]) -> None:
         self.combinators[combinator.name] = combinator
         self.items.append(combinator.name)
-        self.combinators_map = {
-            **self.combinators_map,
-            **{p: combinator.name for p in items},
-        }
+        self.combinators_map |= {p: combinator.name for p in items}

     def add_item(self, item: str) -> None:
         self.items.append(item)
@@ -304,10 +303,9 @@ async def _load(

     async def _save_additional_params(
         self, context: StreamFlowContext
-    ) -> MutableMapping[str, Any]:
-        return {
-            **await super()._save_additional_params(context),
-            **{"combinator": await self.combinator.save(context)},
+    ) -> dict[str, Any]:
+        return await super()._save_additional_params(context) | {
+            "combinator": await self.combinator.save(context)
         }

     async def run(self):
@@ -476,16 +474,13 @@ async def _load(

     async def _save_additional_params(
         self, context: StreamFlowContext
-    ) -> MutableMapping[str, Any]:
+    ) -> dict[str, Any]:
         await self.deployment_config.save(context)
-        return {
-            **await super()._save_additional_params(context),
-            **{
-                "deployment_config": self.deployment_config.persistent_id,
-                "connector_port": self.get_output_port(
-                    self.deployment_config.name
-                ).persistent_id,
-            },
+        return await super()._save_additional_params(context) | {
+            "deployment_config": self.deployment_config.persistent_id,
+            "connector_port": self.get_output_port(
+                self.deployment_config.name
+            ).persistent_id,
         }

     def add_output_port(self, name: str, port: ConnectorPort) -> None:
@@ -750,26 +745,23 @@ async def _run_job(

     async def _save_additional_params(
         self, context: StreamFlowContext
-    ) -> MutableMapping[str, Any]:
-        return {
-            **await super()._save_additional_params(context),
-            **{
-                "job_port": self.get_input_port("__job__").persistent_id,
-                "output_connectors": self.output_connectors,
-                "output_processors": {
-                    k: v
-                    for k, v in zip(
-                        self.output_processors.keys(),
-                        await asyncio.gather(
-                            *(
-                                asyncio.create_task(p.save(context))
-                                for p in self.output_processors.values()
-                            )
-                        ),
-                    )
-                },
-                "command": await self.command.save(context) if self.command else None,
+    ) -> dict[str, Any]:
+        return await super()._save_additional_params(context) | {
+            "job_port": self.get_input_port("__job__").persistent_id,
+            "output_connectors": self.output_connectors,
+            "output_processors": {
+                k: v
+                for k, v in zip(
+                    self.output_processors.keys(),
+                    await asyncio.gather(
+                        *(
+                            asyncio.create_task(p.save(context))
+                            for p in self.output_processors.values()
+                        )
+                    ),
+                )
             },
+            "command": await self.command.save(context) if self.command else None,
         }

     def add_output_port(
@@ -910,11 +902,11 @@ async def _load(

     async def _save_additional_params(
         self, context: StreamFlowContext
-    ) -> MutableMapping[str, Any]:
+    ) -> dict[str, Any]:
         await self.get_size_port().save(context)
-        return {
-            **await super()._save_additional_params(context),
-            **{"depth": self.depth, "size_port": self.get_size_port().persistent_id},
+        return await super()._save_additional_params(context) | {
+            "depth": self.depth,
+            "size_port": self.get_size_port().persistent_id,
         }

     def add_input_port(self, name: str, port: Port) -> None:
@@ -1044,10 +1036,9 @@ async def _load(

     async def _save_additional_params(
         self, context: StreamFlowContext
-    ) -> MutableMapping[str, Any]:
-        return {
-            **await super()._save_additional_params(context),
-            **{"job_port": self.get_input_port("__job__").persistent_id},
+    ) -> dict[str, Any]:
+        return await super()._save_additional_params(context) | {
+            "job_port": self.get_input_port("__job__").persistent_id
         }

     def add_output_port(self, name: str, port: Port) -> None:
@@ -1411,23 +1402,20 @@ async def _propagate_job(

     async def _save_additional_params(
         self, context: StreamFlowContext
-    ) -> MutableMapping[str, Any]:
+    ) -> dict[str, Any]:
         await self.get_output_port("__job__").save(context)
-        params = {
-            **await super()._save_additional_params(context),
-            **{
-                "connector_ports": {
-                    name: self.get_input_port(name).persistent_id
-                    for name in self.input_ports
-                    if name.startswith("__connector__")
-                },
-                "job_port": self.get_output_port("__job__").persistent_id,
-                "binding_config": await self.binding_config.save(context),
-                "job_prefix": self.job_prefix,
-                "input_directory": self.input_directory,
-                "output_directory": self.output_directory,
-                "tmp_directory": self.tmp_directory,
+        params = await super()._save_additional_params(context) | {
+            "connector_ports": {
+                name: self.get_input_port(name).persistent_id
+                for name in self.input_ports
+                if name.startswith("__connector__")
             },
+            "job_port": self.get_output_port("__job__").persistent_id,
+            "binding_config": await self.binding_config.save(context),
+            "job_prefix": self.job_prefix,
+            "input_directory": self.input_directory,
+            "output_directory": self.output_directory,
+            "tmp_directory": self.tmp_directory,
         }
         if self.hardware_requirement:
             params["hardware_requirement"] = await self.hardware_requirement.save(
@@ -1589,11 +1577,10 @@ async def _load(

     async def _save_additional_params(
         self, context: StreamFlowContext
-    ) -> MutableMapping[str, Any]:
+    ) -> dict[str, Any]:
         await self.get_size_port().save(context)
-        return {
-            **await super()._save_additional_params(context),
-            **{"size_port": self.get_size_port().persistent_id},
+        return await super()._save_additional_params(context) | {
+            "size_port": self.get_size_port().persistent_id
         }

     async def _scatter(self, token: Token):
@@ -1691,10 +1678,9 @@ async def _load(

     async def _save_additional_params(
         self, context: StreamFlowContext
-    ) -> MutableMapping[str, Any]:
-        return {
-            **await super()._save_additional_params(context),
-            **{"job_port": self.get_input_port("__job__").persistent_id},
+    ) -> dict[str, Any]:
+        return await super()._save_additional_params(context) | {
+            "job_port": self.get_input_port("__job__").persistent_id
         }

     async def run(self):
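Finally, the most involved bodies above all lean on the same concurrency idiom: serialize a mapping of processors by zipping its keys against `asyncio.gather` over one `save` task per value — safe because `gather` returns results in submission order and dict views iterate in insertion order — then union the result into the parent's params. A self-contained miniature (the `Saveable` class is invented):

    import asyncio

    class Saveable:
        def __init__(self, payload):
            self.payload = payload

        async def save(self, context) -> dict:
            return {"payload": self.payload}

    async def save_all(processors: dict[str, Saveable], context=None) -> dict:
        saved = await asyncio.gather(
            *(asyncio.create_task(p.save(context)) for p in processors.values())
        )
        # keys() and the gathered results stay aligned pairwise
        return {"processors": dict(zip(processors.keys(), saved))}

    print(asyncio.run(save_all({"a": Saveable(1), "b": Saveable(2)})))
    # {'processors': {'a': {'payload': 1}, 'b': {'payload': 2}}}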