diff --git a/app/dependencies.py b/app/dependencies.py
index cb685e7..e0c96a9 100644
--- a/app/dependencies.py
+++ b/app/dependencies.py
@@ -6,6 +6,7 @@ def get_env_vars():
     cbioportal_url = os.getenv('CBIOPORTAL_URL')
     galaxy_url = os.getenv('GALAXY_URL')
     api_key = os.getenv('CBIOPORTAL_CACHE_API_KEY')
+    galaxy_workflow_name = os.getenv('GALAXY_WORKFLOW_NAME')
 
     missing_vars = []
 
@@ -25,5 +26,6 @@ def get_env_vars():
         "study_directory_path": study_directory_path.strip(),
         "cbioportal_url": cbioportal_url.strip(),
         "api_key": api_key.strip(),
-        "galaxy_url": galaxy_url.strip()
+        "galaxy_url": galaxy_url.strip(),
+        "galaxy_workflow_name": galaxy_workflow_name.strip() if galaxy_workflow_name else None
     }
\ No newline at end of file
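The new variable is optional by design: get_env_vars() now returns either a stripped workflow name or None (os.getenv already defaults to None, and the guarded strip avoids an AttributeError when the variable is unset). A minimal sketch of that contract, outside the app; the value is a placeholder:

    import os

    os.environ['GALAXY_WORKFLOW_NAME'] = '  my-workflow  '  # placeholder value

    # Mirrors the guard above: strip only when the variable is present.
    name = os.getenv('GALAXY_WORKFLOW_NAME')
    name = name.strip() if name else None
    print(name)  # -> 'my-workflow'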
diff --git a/app/main.py b/app/main.py
index f7784a4..f818b64 100644
--- a/app/main.py
+++ b/app/main.py
@@ -8,7 +8,7 @@ from app.middleware.https_redirect import CustomHTTPSRedirectMiddleware
 
 import os
 
-list_unrestricted_endpoints = ["/export-to-galaxy/"]
+list_unrestricted_endpoints = ["/export-to-galaxy/", "/galaxy-workflow/"]
 
 logger = setup_logger("uvicorn.error")
 app = FastAPI()
diff --git a/app/routers/cbioportal_to_galaxy_handler.py b/app/routers/cbioportal_to_galaxy_handler.py
index 8afde1c..6a45960 100644
--- a/app/routers/cbioportal_to_galaxy_handler.py
+++ b/app/routers/cbioportal_to_galaxy_handler.py
@@ -52,39 +52,152 @@ def upload_data_string(galaxy_instance: GalaxyInstance, history_id: str, data_st
     upload_info = galaxy_instance.tools.upload_file(tmp_file_path, history_id, file_name=file_name)
     return upload_info
 
+
+def get_galaxy_instance_from_request(data: dict, env_vars: dict) -> GalaxyInstance:
+    galaxy_token = data.get('galaxyToken')
+    galaxy_url = env_vars['galaxy_url']
+    if not galaxy_token:
+        logger.error("Missing Galaxy token in the request.")
+        raise ValueError("Missing Galaxy token in the request.")
+    return get_galaxy_instance(galaxy_url, galaxy_token)
+
+
+def get_or_create_galaxy_history(gi: GalaxyInstance, galaxy_history_name: str) -> str:
+    # Keep the history-name check the old inline code performed.
+    if not galaxy_history_name:
+        logger.error("Missing Galaxy history name in the request.")
+        raise ValueError("Missing Galaxy history name in the request.")
+    histories = gi.histories.get_histories(name=galaxy_history_name)
+    if histories:
+        return histories[0]['id']
+    history = gi.histories.create_history(name=galaxy_history_name)
+    return history['id']
+
+
+def get_workflow_id(gi: GalaxyInstance, workflow_name: str) -> str:
+    workflows = gi.workflows.get_workflows(name=workflow_name)
+    if workflows:
+        return workflows[0]['id']
+    raise ValueError(f"Workflow with name {workflow_name} not found")
+
+
+def upload_data_to_galaxy(gi: GalaxyInstance, history_id: str, data: str, cbioportal_study_id: str,
+                          cbioportal_case_id: str) -> dict:
+    if not data:
+        logger.error("Missing data in the request.")
+        raise ValueError("Missing data in the request.")
+    return upload_data_string(gi, history_id, data, cbioportal_study_id, cbioportal_case_id)
+
+
 @router.post("/export-to-galaxy/")
 async def export_to_galaxy(request: Request, env_vars: dict = Depends(get_env_vars)) -> dict:
     try:
         data = await request.json()
         logger.debug(f"Received data: {data}")
 
-        galaxy_token = data.get('galaxyToken')
-        galaxy_history_name = data.get('galaxyHistoryName')
-        cbioportal_study_id = data.get('studyId')
-        cbioportal_case_id = data.get('caseId')
-        galaxy_url = env_vars['galaxy_url']
+        gi = get_galaxy_instance_from_request(data, env_vars)
+        logger.info("Created GalaxyInstance successfully")
 
-        if not galaxy_token or not galaxy_history_name or 'data' not in data:
-            logger.error("Missing required fields in the request.")
-            raise ValueError("Missing required fields in the request.")
+        history_id = get_or_create_galaxy_history(gi, data.get('galaxyHistoryName'))
+        logger.info(f"Working with history ID: {history_id}")
 
-        gi = get_galaxy_instance(galaxy_url, galaxy_token)
-        logger.info("Created GalaxyInstance successfully")
+        upload_info = upload_data_to_galaxy(gi, history_id, data.get('data'), data.get('studyId'), data.get('caseId'))
+        logger.info(f"Uploaded: {upload_info['outputs'][0]['name']}, ID: {upload_info['outputs'][0]['id']}")
 
-        histories = gi.histories.get_histories(name=galaxy_history_name)
-        if histories:
-            history_id = histories[0]['id']
-        else:
-            history = gi.histories.create_history(name=galaxy_history_name)
-            history_id = history['id']
+        return {"message": "Data received successfully"}
+    except ConnectionError as e:
+        logger.error(f"Connection error: {e}")
+        raise HTTPException(status_code=500, detail=f"Failed to establish a new connection: {e}")
+    except Exception as e:
+        logger.error(f"An error occurred: {e}")
+        raise HTTPException(status_code=500, detail=str(e))
+
+
+@router.post("/galaxy-workflow/")
+async def galaxy_workflow(request: Request, env_vars: dict = Depends(get_env_vars)) -> dict:
+    try:
+        data = await request.json()
+        logger.debug(f"Received data: {data}")
+
+        gi = get_galaxy_instance_from_request(data, env_vars)
+        logger.info("Created GalaxyInstance successfully")
+
+        history_id = get_or_create_galaxy_history(gi, data.get('galaxyHistoryName'))
         logger.info(f"Working with history ID: {history_id}")
 
-        data_modified = data.get('data')
-        upload_info = upload_data_string(gi, history_id, data_modified, cbioportal_study_id, cbioportal_case_id)
+        raw_data = data.get('data')
+        if not raw_data or '\n' not in raw_data:
+            logger.error("Missing or malformed data in the request.")
+            raise ValueError("Missing or malformed data in the request.")
+
+        # Normalize the header row into safe column names,
+        # e.g. "Sample ID" -> "sample_id".
+        data_header, data_body = raw_data.split('\n', 1)
+        fixed_header = data_header.replace(' ', '_').lower()
+        data_modified = f"{fixed_header}\n{data_body}"
+
+        upload_info = upload_data_to_galaxy(gi, history_id, data_modified, data.get('studyId'), data.get('caseId'))
         logger.info(f"Uploaded: {upload_info['outputs'][0]['name']}, ID: {upload_info['outputs'][0]['id']}")
+        logger.debug(f"Upload info: {upload_info}")
+
+        # Resolve the workflow ID from the name configured via GALAXY_WORKFLOW_NAME.
+        if not env_vars['galaxy_workflow_name']:
+            raise ValueError("GALAXY_WORKFLOW_NAME is not configured.")
+        workflow_id = get_workflow_id(gi, env_vars['galaxy_workflow_name'])
+
+        # Poll the history every 5 seconds, for up to 2 minutes, until the
+        # uploaded dataset reaches the 'ok' state (requires "import time" at
+        # module level, not shown in this hunk).
+        uploaded_id = upload_info['outputs'][0]['id']
+        for _ in range(24):
+            datasets = gi.histories.show_history(history_id, contents=True)
+            if any(d['id'] == uploaded_id and d['state'] == 'ok' for d in datasets):
+                logger.info(f"Dataset {uploaded_id} is ready")
+                break
+            time.sleep(5)
+        else:
+            raise ValueError("Uploaded dataset was not ready after 2 minutes.")
+
+        # Map the workflow's first input to the uploaded dataset;
+        # 'hda' marks the source as a history dataset.
+        workflow = gi.workflows.show_workflow(workflow_id)
+        input_id = list(workflow['inputs'].keys())[0]
+        inputs = {input_id: {'src': 'hda', 'id': uploaded_id}}
+        logger.info(f"Workflow inputs: {inputs}")
+
+        workflow_info = gi.workflows.invoke_workflow(workflow_id,
+                                                     inputs=inputs,
+                                                     history_id=history_id)
+        logger.info(f"Workflow info: {workflow_info}")
 
         return {"message": "Data received successfully"}
     except ConnectionError as e:
         logger.error(f"Connection error: {e}")
         raise HTTPException(status_code=500, detail=f"Failed to establish a new connection: {e}")
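For context on the input mapping above: BioBlend's show_workflow() returns the workflow's inputs keyed by step index, which is why the first key is used as the invocation target. A rough sketch of the shape, with illustrative values only:

    # Roughly what gi.workflows.show_workflow(workflow_id) returns for a
    # single-input workflow (values are illustrative):
    workflow = {
        'id': 'abc123',
        'inputs': {
            '0': {'label': 'Input dataset', 'uuid': '...', 'value': ''},
        },
    }

    # The first input key ('0') becomes the key in the invocation mapping:
    input_id = list(workflow['inputs'].keys())[0]
    inputs = {input_id: {'src': 'hda', 'id': 'uploaded-dataset-id'}}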
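Finally, a minimal client-side sketch of the request shape the new endpoint expects; the host and all payload values are placeholders, and the field names follow the handler above:

    import requests

    payload = {
        "galaxyToken": "galaxy-api-key",           # placeholder credential
        "galaxyHistoryName": "cbioportal-export",  # created if it does not exist
        "studyId": "study_es_0",                   # placeholder study
        "caseId": "case_1",                        # placeholder case
        # The header row is normalized server-side: "Sample ID" -> "sample_id".
        "data": "Sample ID\tMutation Count\ns1\t12\n",
    }

    resp = requests.post("https://api.example.org/galaxy-workflow/", json=payload)
    print(resp.status_code, resp.json())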