From bccf595abb307786faa85fa56093c31d0e42c546 Mon Sep 17 00:00:00 2001 From: joschrew Date: Fri, 17 Nov 2023 15:48:38 +0100 Subject: [PATCH 01/12] Change api workflow paths in ocrd_network --- ocrd_network/ocrd_network/processing_server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ocrd_network/ocrd_network/processing_server.py b/ocrd_network/ocrd_network/processing_server.py index f0bad5429..4913f44db 100644 --- a/ocrd_network/ocrd_network/processing_server.py +++ b/ocrd_network/ocrd_network/processing_server.py @@ -195,7 +195,7 @@ def __init__(self, config_path: str, host: str, port: int) -> None: ) self.router.add_api_route( - path='/workflow', + path='/workflow/run', endpoint=self.run_workflow, methods=['POST'], tags=['workflow', 'processing'], @@ -209,7 +209,7 @@ def __init__(self, config_path: str, host: str, port: int) -> None: ) self.router.add_api_route( - path='/workflow/{workflow_job_id}', + path='/workflow/job/{workflow_job_id}', endpoint=self.get_workflow_info, methods=['GET'], tags=['workflow', 'processing'], From 0569222da6c53b96a61f1a79a066a842157662f6 Mon Sep 17 00:00:00 2001 From: joschrew Date: Mon, 20 Nov 2023 11:09:55 +0100 Subject: [PATCH 02/12] Change error message with ssh client creation --- ocrd_network/ocrd_network/deployment_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ocrd_network/ocrd_network/deployment_utils.py b/ocrd_network/ocrd_network/deployment_utils.py index c56f2851e..8c3ff7e46 100644 --- a/ocrd_network/ocrd_network/deployment_utils.py +++ b/ocrd_network/ocrd_network/deployment_utils.py @@ -24,7 +24,7 @@ def create_ssh_client(address: str, username: str, password: str = "", keypath: try: client.connect(hostname=address, username=username, password=password, key_filename=keypath) except Exception as error: - raise Exception(f"Error creating SSHClient of host '{address}', reason:") from error + raise Exception(f"Error creating SSHClient of host '{address}', reason: {error}") from error return client From db4b096758318ad2d1198b9d5806ea1e6d52a71a Mon Sep 17 00:00:00 2001 From: joschrew Date: Mon, 20 Nov 2023 11:11:20 +0100 Subject: [PATCH 03/12] Add db model/table for workflow script uploading --- ocrd_network/ocrd_network/database.py | 17 +++++++++++++++-- ocrd_network/ocrd_network/models/__init__.py | 2 ++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/ocrd_network/ocrd_network/database.py b/ocrd_network/ocrd_network/database.py index 019eb071c..68725caca 100644 --- a/ocrd_network/ocrd_network/database.py +++ b/ocrd_network/ocrd_network/database.py @@ -22,7 +22,8 @@ from .models import ( DBProcessorJob, DBWorkflowJob, - DBWorkspace + DBWorkspace, + DBWorkflowScript, ) from .utils import call_sync @@ -31,7 +32,7 @@ async def initiate_database(db_url: str): client = AsyncIOMotorClient(db_url) await init_beanie( database=client.get_default_database(default='ocrd'), - document_models=[DBProcessorJob, DBWorkflowJob, DBWorkspace] + document_models=[DBProcessorJob, DBWorkflowJob, DBWorkspace, DBWorkflowScript] ) @@ -199,3 +200,15 @@ async def db_get_processing_jobs(job_ids: List[str]) -> [DBProcessorJob]: @call_sync async def sync_db_get_processing_jobs(job_ids: List[str]) -> [DBProcessorJob]: return await db_get_processing_jobs(job_ids) + + +async def db_get_workflow_script(workflow_id: str) -> DBWorkflowScript: + workflow = await DBWorkflowScript.find_one(DBWorkflowScript.workflow_id == workflow_id) + if not workflow: + raise ValueError(f'Workflow-script with id "{workflow_id}" not in the DB.') + return workflow + + +@call_sync +async def sync_db_get_workflow_script(workflow_id: str) -> DBWorkflowScript: + return await db_get_workflow_script(workflow_id) diff --git a/ocrd_network/ocrd_network/models/__init__.py b/ocrd_network/ocrd_network/models/__init__.py index a3abdb748..dc8231f76 100644 --- a/ocrd_network/ocrd_network/models/__init__.py +++ b/ocrd_network/ocrd_network/models/__init__.py @@ -7,6 +7,7 @@ 'DBProcessorJob', 'DBWorkflowJob', 'DBWorkspace', + 'DBWorkflowScript', 'PYJobInput', 'PYJobOutput', 'PYOcrdTool', @@ -26,3 +27,4 @@ from .messages import PYResultMessage from .ocrd_tool import PYOcrdTool from .workspace import DBWorkspace +from .workflow import DBWorkflowScript From 4d508eb2c549cfc8bdf00b1cd9ba8c1980fcbe7a Mon Sep 17 00:00:00 2001 From: joschrew Date: Mon, 20 Nov 2023 11:12:26 +0100 Subject: [PATCH 04/12] Add endpoints for workflow script management --- .../ocrd_network/processing_server.py | 74 ++++++++++++++++++- 1 file changed, 72 insertions(+), 2 deletions(-) diff --git a/ocrd_network/ocrd_network/processing_server.py b/ocrd_network/ocrd_network/processing_server.py index 4913f44db..3e51722eb 100644 --- a/ocrd_network/ocrd_network/processing_server.py +++ b/ocrd_network/ocrd_network/processing_server.py @@ -14,7 +14,7 @@ UploadFile ) from fastapi.exceptions import RequestValidationError -from fastapi.responses import FileResponse, JSONResponse +from fastapi.responses import FileResponse, JSONResponse, PlainTextResponse from pika.exceptions import ChannelClosedByBroker from ocrd.task_sequence import ProcessorTask @@ -29,13 +29,15 @@ db_get_workflow_job, db_get_workspace, db_update_processing_job, - db_update_workspace + db_update_workspace, + db_get_workflow_script, ) from .deployer import Deployer from .logging import get_processing_server_logging_file_path from .models import ( DBProcessorJob, DBWorkflowJob, + DBWorkflowScript, PYJobInput, PYJobOutput, PYResultMessage, @@ -217,6 +219,31 @@ def __init__(self, config_path: str, host: str, port: int) -> None: summary='Get information about a workflow run', ) + self.router.add_api_route( + path='/workflow', + endpoint=self.upload_workflow, + methods=['POST'], + tags=['workflow'], + status_code=status.HTTP_200_OK, + summary='Upload/Register a new workflow script', + ) + self.router.add_api_route( + path='/workflow/{workflow_id}', + endpoint=self.replace_workflow, + methods=['PUT'], + tags=['workflow'], + status_code=status.HTTP_200_OK, + summary='Update/Replace a workflow script', + ) + self.router.add_api_route( + path='/workflow/{workflow_id}', + endpoint=self.download_workflow, + methods=['GET'], + tags=['workflow'], + status_code=status.HTTP_200_OK, + summary='Download a workflow script', + ) + @self.exception_handler(RequestValidationError) async def validation_exception_handler(request: Request, exc: RequestValidationError): exc_str = f'{exc}'.replace('\n', ' ').replace(' ', ' ') @@ -817,3 +844,46 @@ async def get_workflow_info(self, workflow_job_id) -> Dict: "page_id": job.page_id, }) return res + + async def upload_workflow(self, workflow: UploadFile) -> str: + """ Store a script for a workflow in the database + """ + workflow_id = generate_id() + content = (await workflow.read()).decode("utf-8") + + db_workflow_script = DBWorkflowScript( + workflow_id=workflow_id, + content=content, + ) + await db_workflow_script.insert() + return workflow_id + + async def replace_workflow(self, workflow_id, workflow: UploadFile) -> None: + """ Update a workflow script file in the database + """ + try: + # Currently only replace is supported, not creating with put + db_workflow_script = await db_get_workflow_script(workflow_id) + content = (await workflow.read()).decode("utf-8") + db_workflow_script.content = content + await db_workflow_script.save() + return db_workflow_script.workflow_id + except ValueError as e: + self.log.exception(f"Workflow with id '{workflow_id}' not existing, error: {e}") + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Workflow-script with id '{workflow_id}' not existing" + ) + + async def download_workflow(self, workflow_id) -> PlainTextResponse: + """ Load workflow-script from the database + """ + try: + workflow = await db_get_workflow_script(workflow_id) + return PlainTextResponse(workflow.content) + except ValueError as e: + self.log.exception(f"Workflow with id '{workflow_id}' not existing, error: {e}") + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Workflow-script with id '{workflow_id}' not existing" + ) From 13cb763e7b12eb784c20a3d8e262f5c51f952cc8 Mon Sep 17 00:00:00 2001 From: joschrew Date: Mon, 20 Nov 2023 13:43:22 +0100 Subject: [PATCH 05/12] Validate workflow before saving to database --- ocrd_network/ocrd_network/processing_server.py | 18 ++++++++++++------ ocrd_network/ocrd_network/utils.py | 18 ++++++++++++++++++ 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/ocrd_network/ocrd_network/processing_server.py b/ocrd_network/ocrd_network/processing_server.py index 3e51722eb..3d1f23ce1 100644 --- a/ocrd_network/ocrd_network/processing_server.py +++ b/ocrd_network/ocrd_network/processing_server.py @@ -63,7 +63,8 @@ download_ocrd_all_tool_json, generate_created_time, generate_id, - get_ocrd_workspace_physical_pages + get_ocrd_workspace_physical_pages, + validate_workflow, ) from urllib.parse import urljoin @@ -850,6 +851,9 @@ async def upload_workflow(self, workflow: UploadFile) -> str: """ workflow_id = generate_id() content = (await workflow.read()).decode("utf-8") + if not validate_workflow(content): + raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Provided workflow script is invalid") db_workflow_script = DBWorkflowScript( workflow_id=workflow_id, @@ -858,22 +862,24 @@ async def upload_workflow(self, workflow: UploadFile) -> str: await db_workflow_script.insert() return workflow_id - async def replace_workflow(self, workflow_id, workflow: UploadFile) -> None: + async def replace_workflow(self, workflow_id, workflow: UploadFile) -> str: """ Update a workflow script file in the database """ + content = (await workflow.read()).decode("utf-8") + if not validate_workflow(content): + raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Provided workflow script is invalid") try: - # Currently only replace is supported, not creating with put db_workflow_script = await db_get_workflow_script(workflow_id) - content = (await workflow.read()).decode("utf-8") db_workflow_script.content = content - await db_workflow_script.save() - return db_workflow_script.workflow_id except ValueError as e: self.log.exception(f"Workflow with id '{workflow_id}' not existing, error: {e}") raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Workflow-script with id '{workflow_id}' not existing" ) + await db_workflow_script.save() + return db_workflow_script.workflow_id async def download_workflow(self, workflow_id) -> PlainTextResponse: """ Load workflow-script from the database diff --git a/ocrd_network/ocrd_network/utils.py b/ocrd_network/ocrd_network/utils.py index f613eecdd..0aeca165c 100644 --- a/ocrd_network/ocrd_network/utils.py +++ b/ocrd_network/ocrd_network/utils.py @@ -12,6 +12,7 @@ from ocrd import Resolver, Workspace from ocrd_validators import ProcessingServerConfigValidator from .rabbitmq_utils import OcrdResultMessage +from ocrd.task_sequence import ProcessorTask # Based on: https://gist.github.com/phizaz/20c36c6734878c6ec053245a477572ec @@ -147,3 +148,20 @@ def stop_mets_server(mets_server_url: str) -> bool: if response.status_code == 200: return True return False + + +def validate_workflow(workflow: str, logger=None) -> bool: + """ Check that workflow is not empty and parseable to a lists of ProcessorTask + """ + if workflow.strip() == "": + if logger: + logger.info("Workflow is invalid (empty string)") + return + try: + tasks_list = workflow.splitlines() + [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()] + except BaseException as e: + if logger: + logger.info(f"Workflow is invalid, parsing to ProcessorTasks failed: {e}") + return False + return True From 8eee19dbfa1e663b2d9d4906559cfcc7d43f5a1c Mon Sep 17 00:00:00 2001 From: joschrew Date: Mon, 20 Nov 2023 13:44:08 +0100 Subject: [PATCH 06/12] Run workflow, optional with script from mongodb --- .../ocrd_network/processing_server.py | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/ocrd_network/ocrd_network/processing_server.py b/ocrd_network/ocrd_network/processing_server.py index 3d1f23ce1..c53b5757d 100644 --- a/ocrd_network/ocrd_network/processing_server.py +++ b/ocrd_network/ocrd_network/processing_server.py @@ -3,7 +3,7 @@ import requests import httpx from os import getpid -from typing import Dict, List +from typing import Dict, List, Union import uvicorn from fastapi import ( @@ -11,7 +11,8 @@ status, Request, HTTPException, - UploadFile + UploadFile, + File, ) from fastapi.exceptions import RequestValidationError from fastapi.responses import FileResponse, JSONResponse, PlainTextResponse @@ -734,8 +735,9 @@ async def task_sequence_to_processing_jobs( async def run_workflow( self, - workflow: UploadFile, mets_path: str, + workflow: Union[UploadFile, None] = File(None), + workflow_id: str = None, agent_type: str = 'worker', page_id: str = None, page_wise: bool = False, @@ -749,7 +751,19 @@ async def run_workflow( raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Mets file not existing: {mets_path}") - workflow = (await workflow.read()).decode("utf-8") + if not workflow: + if not workflow_id: + raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Either workflow or workflow_id must be provided") + try: + workflow = await db_get_workflow_script(workflow_id) + except ValueError: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, + detail=f"Workflow with id '{workflow_id}' not found") + workflow = workflow.content + else: + workflow = (await workflow.read()).decode("utf-8") + try: tasks_list = workflow.splitlines() tasks = [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()] From 4b5ee5ab5d3e0957ad9bab6a895e85b57a51a386 Mon Sep 17 00:00:00 2001 From: joschrew Date: Mon, 20 Nov 2023 16:05:55 +0100 Subject: [PATCH 07/12] Change response of workflow script upload --- ocrd_network/ocrd_network/processing_server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ocrd_network/ocrd_network/processing_server.py b/ocrd_network/ocrd_network/processing_server.py index c53b5757d..a46edfa31 100644 --- a/ocrd_network/ocrd_network/processing_server.py +++ b/ocrd_network/ocrd_network/processing_server.py @@ -860,7 +860,7 @@ async def get_workflow_info(self, workflow_job_id) -> Dict: }) return res - async def upload_workflow(self, workflow: UploadFile) -> str: + async def upload_workflow(self, workflow: UploadFile) -> Dict: """ Store a script for a workflow in the database """ workflow_id = generate_id() @@ -874,7 +874,7 @@ async def upload_workflow(self, workflow: UploadFile) -> str: content=content, ) await db_workflow_script.insert() - return workflow_id + return {"workflow_id": workflow_id} async def replace_workflow(self, workflow_id, workflow: UploadFile) -> str: """ Update a workflow script file in the database From fe41d201da9b2dd8e066f7790852267e0d5fb79f Mon Sep 17 00:00:00 2001 From: joschrew Date: Tue, 21 Nov 2023 17:05:37 +0100 Subject: [PATCH 08/12] Add prev missed file for workflow model --- ocrd_network/ocrd_network/models/workflow.py | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 ocrd_network/ocrd_network/models/workflow.py diff --git a/ocrd_network/ocrd_network/models/workflow.py b/ocrd_network/ocrd_network/models/workflow.py new file mode 100644 index 000000000..3387a9035 --- /dev/null +++ b/ocrd_network/ocrd_network/models/workflow.py @@ -0,0 +1,8 @@ +from beanie import Document + + +class DBWorkflowScript(Document): + """ Model to store a workflow-script in the database + """ + workflow_id: str + content: str From 93a742d61ca46b739264477a50254e9f55b7174a Mon Sep 17 00:00:00 2001 From: joschrew <91774427+joschrew@users.noreply.github.com> Date: Tue, 21 Nov 2023 17:10:12 +0100 Subject: [PATCH 09/12] Update ocrd_network/ocrd_network/processing_server.py Co-authored-by: Konstantin Baierer --- ocrd_network/ocrd_network/processing_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ocrd_network/ocrd_network/processing_server.py b/ocrd_network/ocrd_network/processing_server.py index a46edfa31..5138cb5ff 100644 --- a/ocrd_network/ocrd_network/processing_server.py +++ b/ocrd_network/ocrd_network/processing_server.py @@ -226,7 +226,7 @@ def __init__(self, config_path: str, host: str, port: int) -> None: endpoint=self.upload_workflow, methods=['POST'], tags=['workflow'], - status_code=status.HTTP_200_OK, + status_code=status.HTTP_201_CREATED, summary='Upload/Register a new workflow script', ) self.router.add_api_route( From 37813adebd6f62ae9820b04530edacb5556e981c Mon Sep 17 00:00:00 2001 From: joschrew <91774427+joschrew@users.noreply.github.com> Date: Tue, 21 Nov 2023 17:11:25 +0100 Subject: [PATCH 10/12] Update ocrd_network/ocrd_network/utils.py Co-authored-by: Konstantin Baierer --- ocrd_network/ocrd_network/utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ocrd_network/ocrd_network/utils.py b/ocrd_network/ocrd_network/utils.py index 0aeca165c..5a526f999 100644 --- a/ocrd_network/ocrd_network/utils.py +++ b/ocrd_network/ocrd_network/utils.py @@ -153,10 +153,10 @@ def stop_mets_server(mets_server_url: str) -> bool: def validate_workflow(workflow: str, logger=None) -> bool: """ Check that workflow is not empty and parseable to a lists of ProcessorTask """ - if workflow.strip() == "": + if not workflow.strip(): if logger: logger.info("Workflow is invalid (empty string)") - return + return False try: tasks_list = workflow.splitlines() [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()] From d695f4955dfbc7d4be166ebb2db9520a1d7b7524 Mon Sep 17 00:00:00 2001 From: joschrew Date: Thu, 23 Nov 2023 10:00:59 +0100 Subject: [PATCH 11/12] Raise ValueError when parsing ProcessorTask fails --- ocrd/ocrd/task_sequence.py | 4 ++-- ocrd_network/ocrd_network/processing_server.py | 2 +- ocrd_network/ocrd_network/utils.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ocrd/ocrd/task_sequence.py b/ocrd/ocrd/task_sequence.py index dcf9c86bc..6d3860186 100644 --- a/ocrd/ocrd/task_sequence.py +++ b/ocrd/ocrd/task_sequence.py @@ -35,7 +35,7 @@ def parse(cls, argstr): set_json_key_value_overrides(parameters, tokens[1:3]) tokens = tokens[3:] else: - raise Exception("Failed parsing task description '%s' with tokens remaining: '%s'" % (argstr, tokens)) + raise ValueError("Failed parsing task description '%s' with tokens remaining: '%s'" % (argstr, tokens)) return cls(executable, input_file_grps, output_file_grps, parameters) def __init__(self, executable, input_file_grps, output_file_grps, parameters): @@ -108,7 +108,7 @@ def validate_tasks(tasks, workspace, page_id=None, overwrite=False): # TODO disable output_file_grps checks once CLI parameter 'overwrite' is implemented # XXX Thu Jan 16 20:14:17 CET 2020 still not sufficiently clever. # if len(prev_output_file_grps) != len(set(prev_output_file_grps)): - # report.add_error("Output file group specified multiple times: %s" % + # report.add_error("Output file group specified multiple times: %s" % # [grp for grp, count in Counter(prev_output_file_grps).items() if count >= 2]) prev_output_file_grps += task.output_file_grps if not report.is_valid: diff --git a/ocrd_network/ocrd_network/processing_server.py b/ocrd_network/ocrd_network/processing_server.py index 5138cb5ff..882c07956 100644 --- a/ocrd_network/ocrd_network/processing_server.py +++ b/ocrd_network/ocrd_network/processing_server.py @@ -767,7 +767,7 @@ async def run_workflow( try: tasks_list = workflow.splitlines() tasks = [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()] - except BaseException as e: + except ValueError as e: raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Error parsing tasks: {e}") diff --git a/ocrd_network/ocrd_network/utils.py b/ocrd_network/ocrd_network/utils.py index 5a526f999..8f92706f4 100644 --- a/ocrd_network/ocrd_network/utils.py +++ b/ocrd_network/ocrd_network/utils.py @@ -160,7 +160,7 @@ def validate_workflow(workflow: str, logger=None) -> bool: try: tasks_list = workflow.splitlines() [ProcessorTask.parse(task_str) for task_str in tasks_list if task_str.strip()] - except BaseException as e: + except ValueError as e: if logger: logger.info(f"Workflow is invalid, parsing to ProcessorTasks failed: {e}") return False From d14c787029392435eb362bc2ab485948eabb0b0d Mon Sep 17 00:00:00 2001 From: joschrew Date: Thu, 23 Nov 2023 10:38:58 +0100 Subject: [PATCH 12/12] Avoid workflow-script redundancy when uploading --- ocrd_network/ocrd_network/database.py | 12 ++++++++++++ ocrd_network/ocrd_network/models/workflow.py | 1 + ocrd_network/ocrd_network/processing_server.py | 14 ++++++++++++++ 3 files changed, 27 insertions(+) diff --git a/ocrd_network/ocrd_network/database.py b/ocrd_network/ocrd_network/database.py index 68725caca..4945c1335 100644 --- a/ocrd_network/ocrd_network/database.py +++ b/ocrd_network/ocrd_network/database.py @@ -212,3 +212,15 @@ async def db_get_workflow_script(workflow_id: str) -> DBWorkflowScript: @call_sync async def sync_db_get_workflow_script(workflow_id: str) -> DBWorkflowScript: return await db_get_workflow_script(workflow_id) + + +async def db_find_first_workflow_script_by_content(content_hash: str) -> DBWorkflowScript: + workflow = await DBWorkflowScript.find_one(DBWorkflowScript.content_hash == content_hash) + if not workflow: + raise ValueError(f'Workflow-script with content_hash "{content_hash}" not in the DB.') + return workflow + + +@call_sync +async def sync_db_find_first_workflow_script_by_content(workflow_id: str) -> DBWorkflowScript: + return await db_get_workflow_script(workflow_id) diff --git a/ocrd_network/ocrd_network/models/workflow.py b/ocrd_network/ocrd_network/models/workflow.py index 3387a9035..53479e49e 100644 --- a/ocrd_network/ocrd_network/models/workflow.py +++ b/ocrd_network/ocrd_network/models/workflow.py @@ -6,3 +6,4 @@ class DBWorkflowScript(Document): """ workflow_id: str content: str + content_hash: str diff --git a/ocrd_network/ocrd_network/processing_server.py b/ocrd_network/ocrd_network/processing_server.py index 882c07956..5f740d3c5 100644 --- a/ocrd_network/ocrd_network/processing_server.py +++ b/ocrd_network/ocrd_network/processing_server.py @@ -32,6 +32,7 @@ db_update_processing_job, db_update_workspace, db_get_workflow_script, + db_find_first_workflow_script_by_content ) from .deployer import Deployer from .logging import get_processing_server_logging_file_path @@ -68,6 +69,7 @@ validate_workflow, ) from urllib.parse import urljoin +from hashlib import md5 class ProcessingServer(FastAPI): @@ -869,9 +871,19 @@ async def upload_workflow(self, workflow: UploadFile) -> Dict: raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Provided workflow script is invalid") + content_hash = md5(content.encode("utf-8")).hexdigest() + try: + db_workflow_script = await db_find_first_workflow_script_by_content(content_hash) + if db_workflow_script: + raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="The same workflow" + f"-script exists with id '{db_workflow_script.workflow_id}'") + except ValueError: + pass + db_workflow_script = DBWorkflowScript( workflow_id=workflow_id, content=content, + content_hash=content_hash, ) await db_workflow_script.insert() return {"workflow_id": workflow_id} @@ -886,6 +898,8 @@ async def replace_workflow(self, workflow_id, workflow: UploadFile) -> str: try: db_workflow_script = await db_get_workflow_script(workflow_id) db_workflow_script.content = content + content_hash = md5(content.encode("utf-8")).hexdigest() + db_workflow_script.content_hash = content_hash except ValueError as e: self.log.exception(f"Workflow with id '{workflow_id}' not existing, error: {e}") raise HTTPException(