Merge pull request #24 from BrainCOGS/db_redesign
Database redesign to add upstream `recording` schema
Alvalunasan authored Feb 17, 2022
2 parents 76fbf13 + 21172aa commit 380d497
Showing 28 changed files with 3,055 additions and 12 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -132,3 +132,5 @@ data/*

# svg file
*.svg

slurm_files/slurm_recording_process_id*
1,136 changes: 1,136 additions & 0 deletions notebooks/TestAutomaticPipeline.ipynb

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions notebooks/imaging_element/02-process-imaging-workflow.ipynb
@@ -155,8 +155,8 @@
}
],
"source": [
"subject = 'emdiamanti_gps8'\n",
"date = '2021-02-08'\n",
"subject = 'jeremyjc_j016'\n",
"date = '2021-11-21'\n",
"\n",
"key = (imaging.Scan & dict(session_date =date, subject_fullname=subject)).fetch1('KEY')\n",
"key"
@@ -2705,4 +2705,4 @@
},
"nbformat": 4,
"nbformat_minor": 2
}
}
2 changes: 2 additions & 0 deletions requirements.txt
@@ -1,3 +1,4 @@
globus-cli
scipy
python-dotenv
element-calcium-imaging
@@ -8,3 +9,4 @@ numpy
astropy
jupyterlab
bitstring
element-interface @ git+https://github.com/datajoint/element-interface.git
Empty file added slurm_files/slurm_test.slurm
Empty file.
101 changes: 101 additions & 0 deletions u19_pipeline/automatic_job/clusters_paths_and_transfers.py
@@ -0,0 +1,101 @@
import datajoint as dj
import pathlib
import subprocess
import json
import re
from element_interface.utils import dict_to_uuid

# Functions to transfer files (globus, scp, smbclient)

# For PNI endpoint
pni_ep_id = '6ce834d6-ff8a-11e6-bad1-22000b9a448b'
#pni_ephys_sorted_data_dir = '/mnt/cup/labs/brody/RATTER/PhysData/Test_ephys_pipeline_NP_sorted/'

# PNI directories
pni_root_data_dir = dj.config['custom']['root_data_dir']

# For tiger endpoint
default_user = 'alvaros'  # This will change to our automatic client for globus transfers
tiger_gpu_host = 'tigergpu.princeton.edu'
tiger_ep_dir = 'a9df83d2-42f0-11e6-80cf-22000b1701d1'

tiger_home_dir = '/scratch/gpfs/BRAINCOGS'
spock_home_dir = '/usr/people/alvaros/BrainCogsProjects/ProcessJobsPipeline'

# Cluster directories
cluster_vars = {
"tiger": {
"home_dir": tiger_home_dir,
"root_data_dir": tiger_home_dir + "/Data",
"sorted_data_dir": tiger_home_dir + "/DataSorted",
"slurm_files_dir": tiger_home_dir + "/slurm_files",
"log_files_dir": tiger_home_dir + "/job_log",
"user": default_user,
"hostname": 'tigergpu.princeton.edu',
},
"spock": {
"home_dir": spock_home_dir,
"root_data_dir": dj.config['custom']['root_data_dir'],
"slurm_files_dir": spock_home_dir + "/slurm_files",
"log_files_dir": spock_home_dir + "/job_log",
"user": default_user,
"hostname": 'spock.princeton.edu',
}
}


def get_cluster_vars(cluster):

    if cluster in cluster_vars:
        return cluster_vars[cluster]
    else:
        # raising a bare string is invalid in Python 3; raise a proper exception
        raise ValueError('Non-existing cluster: ' + str(cluster))


def scp_file_transfer(source, dest):

    print("scp", source, dest)
    # expand '~' explicitly: subprocess does not perform shell tilde expansion
    identity_file = str(pathlib.Path("~/.ssh/id_rsa_alvaros_tiger.pub").expanduser())
    p = subprocess.Popen(["scp", "-i", identity_file, source, dest])
    transfer_status = p.wait()
    return transfer_status


def request_globus_transfer(source, destination):

    globus_command = ["globus", "transfer", source, destination, '--recursive', '--format', 'json']
    #print(globus_command)
    #s = subprocess.run(globus_command, capture_output=True)
    #transfer_request = json.loads(s.stdout.decode('UTF-8'))

    # Globus CLI call is stubbed for now; return a canned "Accepted" response
    transfer_request = dict()
    transfer_request['code'] = 'Accepted'
    transfer_request['task_id'] = dict_to_uuid({'test': 1})
    return transfer_request


def request_globus_transfer_status(id_task):

    globus_command = ["globus", "task", "show", id_task, '--format', 'json']
    #print(globus_command)
    #s = subprocess.run(globus_command, capture_output=True)
    #transfer_request = json.loads(s.stdout.decode('UTF-8'))

    # Globus CLI call is stubbed for now; report the task as succeeded
    transfer_request = dict()
    transfer_request['status'] = 'SUCCEEDED'
    return transfer_request


def globus_transfer_to_tiger(raw_rel_path):

    source_ep = pni_ep_id + ':' + pni_root_data_dir + raw_rel_path
    dest_ep = tiger_ep_dir + ':' + cluster_vars['tiger']['root_data_dir'] + raw_rel_path
    transfer_request = request_globus_transfer(source_ep, dest_ep)
    return transfer_request


def globus_transfer_to_pni(sorted_rel_path):

    source_ep = tiger_ep_dir + ':' + cluster_vars['tiger']['sorted_data_dir'] + sorted_rel_path
    dest_ep = pni_ep_id + ':' + pni_root_data_dir + sorted_rel_path
    transfer_request = request_globus_transfer(source_ep, dest_ep)
    return transfer_request
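
A minimal usage sketch for these transfer helpers (the relative path below is hypothetical, and note that in this revision the Globus CLI calls are stubbed out to return canned 'Accepted'/'SUCCEEDED' responses rather than shelling out to globus):

from u19_pipeline.automatic_job import clusters_paths_and_transfers as transfers

# Hypothetical relative path of a raw recording directory
raw_rel_path = '/electrophysiology/subject01/session01'

# Request a copy of the raw data to the tiger cluster, then poll the task
request = transfers.globus_transfer_to_tiger(raw_rel_path)
if request['code'] == 'Accepted':
    status = transfers.request_globus_transfer_status(str(request['task_id']))
    print(status['status'])   # 'SUCCEEDED' with the current stub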
14 changes: 14 additions & 0 deletions u19_pipeline/automatic_job/ingest_scaninfo_shell.sh
@@ -0,0 +1,14 @@
#!/bin/bash

# 1st argument: directory where the MATLAB script is located
cd "$1"

# 2nd argument: string key for the given recording
key=$2
matlab_command="populate_ScanInfo_spock('"
matlab_command+=$key
matlab_command+="');exit;"

# Load module and execute the command string (quoted to avoid word splitting)
module load matlab/R2020b
matlab -singleCompThread -nodisplay -nosplash -r "$matlab_command"
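
The script expects the directory containing the MATLAB scripts as its first argument and the recording key string as its second. A sketch of how the automatic-job code might invoke it (the key string is hypothetical; the paths come from params_config, defined below):

import subprocess
from u19_pipeline.automatic_job import params_config as config

# Hypothetical key string identifying the recording to ingest
key_str = "subject_fullname='jeremyjc_j016', session_date='2021-11-21'"

result = subprocess.run(
    ['bash', config.ingest_scaninfo_script, config.startup_pipeline_matlab_dir, key_str],
    capture_output=True, text=True)
print(result.returncode)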
199 changes: 199 additions & 0 deletions u19_pipeline/automatic_job/params_config.py
@@ -0,0 +1,199 @@
import pandas as pd
import numpy as np

recording_modality_dict = [
{
'RecordingModality': 'electrophysiology',
'Description': '',
'RootDirectorty': '/braininit/Data/electrophysiology',
'FileExtensions': np.asarray(['ap.bin', 'ap.meta']),
'RecordingFilePattern': np.asarray(['/*g[0-9]/*imec[0-9]']),
'ProcessUnitFilePattern': np.asarray(['/*imec[0-9]/']),
'ProcessUnitDirectoryField': 'probe_directory',
'ProcessUnitField': 'probe',
'ProcessingRepository': 'BrainCogsEphysSorters',
},
{
'RecordingModality': 'imaging',
'Description': '',
'RootDirectorty': '/braininit/Data/imaging',
'FileExtensions': np.asarray(['.avi', '.tiff','.tif']),
'RecordingFilePattern': np.asarray(['']),
'ProcessUnitFilePattern': np.asarray(['']),
'ProcessUnitDirectoryField': 'fov_directory',
'ProcessUnitField': 'fov',
'ProcessingRepository': 'BrainCogsImagingSegmentation',
},
{
'RecordingModality': 'video_acquisition',
'Description': '',
'RootDirectorty': '/braininit/Data/imaging',
'FileExtensions': np.asarray(['.avi', '.mp4']),
'RecordingFilePattern': np.asarray(['']),
'ProcessUnitFilePattern': np.asarray(['']),
'ProcessUnitDirectoryField': 'video_directory',
'ProcessUnitField': '',
'ProcessingRepository': 'None',
},
]

recording_modality_list = [list(i.values()) for i in recording_modality_dict]
recording_modality_df = pd.DataFrame(recording_modality_dict)

recording_status_dict = [
{
'Value': -1,
'Key': 'ERROR',
'Label': 'Error in recording handling',
'UpdateField': None,
'ProcessFunction': None,
'FunctionField': None,
},
{
'Value': 0,
'Key': 'NEW_RECORDING',
'Label': 'New recording',
'UpdateField': None,
'ProcessFunction': None,
'FunctionField': None,
},
{
'Value': 1,
'Key': 'PNI_DRIVE_TRANSFER_REQUEST',
'Label': 'Recording directory transfer to PNI requested',
'UpdateField': 'task_copy_id_pni',
'ProcessFunction': 'local_transfer_request',
'FunctionField': 'recording_process_pre_path',
},
{
'Value': 2,
'Key': 'PNI_DRIVE_TRANSFER_END',
'Label': 'Recording directory transferred to PNI',
'UpdateField': None,
'ProcessFunction': 'local_transfer_check',
'FunctionField': 'task_copy_id_pni',
},
{
'Value': 3,
'Key': 'MODALITY_PREINGESTION',
        'Label': 'Modality ingestion & syncing jobs done',
'UpdateField': None,
'ProcessFunction': 'modality_preingestion',
'FunctionField': None,
},

]

recording_status_list = [[i['Value'], i['Label']] for i in recording_status_dict]
recording_status_df = pd.DataFrame(recording_status_dict)


recording_process_status_dict = [
{
'Value': -1,
'Key': 'ERROR',
'Label': 'Error in recording process',
'UpdateField': None,
'ProcessFunction': None,
'FunctionField': None,
},
{
'Value': 0,
'Key': 'NEW_RECORDING_PROCESS',
'Label': 'New recording process',
'UpdateField': None,
'ProcessFunction': None,
'FunctionField': None,
},
{
'Value': 1,
'Key': 'RAW_FILE_TRANSFER_REQUEST',
'Label': 'Raw file transfer requested',
'UpdateField': 'task_copy_id_pre',
'ProcessFunction': 'transfer_request',
'FunctionField': 'recording_process_pre_path',
},
{
'Value': 2,
'Key': 'RAW_FILE_TRANSFER_END',
'Label': 'Raw file transferred to cluster',
'UpdateField': None,
'ProcessFunction': 'transfer_check',
'FunctionField': 'task_copy_id_pre',
},
{
'Value': 3,
'Key': 'JOB_QUEUE',
'Label': 'Processing job in queue',
'UpdateField': 'slurm_id',
'ProcessFunction': 'slurm_job_queue',
'FunctionField': None,
},
{
'Value': 4,
'Key': 'JOB_FINISHED',
'Label': 'Processing job finished',
'UpdateField': None,
'ProcessFunction': 'slurm_job_check',
'FunctionField': 'slurm_id',
},
{
'Value': 5,
'Key': 'PROC_FILE_TRANSFER_REQUEST',
'Label': 'Processed file transfer requested',
'UpdateField': 'task_copy_id_post',
'ProcessFunction': 'transfer_request',
'FunctionField': 'recording_process_post_path',
},
{
'Value': 6,
'Key': 'PROC_FILE_TRANSFER_END',
'Label': 'Processed file transferred to PNI',
'UpdateField': None,
'ProcessFunction': 'transfer_check',
'FunctionField': 'task_copy_id_post',
},
{
'Value': 7,
'Key': 'JOB_QUEUE_ELEMENT_WORKFLOW',
'Label': 'Job Queue Element Workflow ingestion',
'UpdateField': None,
'ProcessFunction': None,
#'ProcessFunction': RecProcessHandler.slurm_job_element,
'FunctionField': None,
},
{
'Value': 8,
'Key': 'JOB_FINSISHED_ELEMENT_WORKFLOW',
'Label': 'Process finished',
'UpdateField': None,
'ProcessFunction': 'slurm_job_check',
'FunctionField': None,
}
]

all_preprocess_params = {
"process_cluster": [
"tiger",
"spock"],
"dj_element_processing":[
"trigger",
"load",
],
"processing_algorithm": [
"kilosort2",
"suite2p",
]
}


recording_process_status_list = [[i['Value'], i['Label']] for i in recording_process_status_dict]
recording_process_status_df = pd.DataFrame(recording_process_status_dict)

system_process = {
'SUCCESS': 0
}


startup_pipeline_matlab_dir = '/usr/people/alvaros/BrainCogsProjects/Datajoint_projs/U19-pipeline-matlab/scripts'
ingest_scaninfo_script = '/usr/people/alvaros/BrainCogsProjects/Datajoint_projs/U19-pipeline_python/u19_pipeline/automatic_job/ingest_scaninfo_shell.sh'
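
A small sketch of how these lookup tables can be queried from other modules (using only names defined above):

from u19_pipeline.automatic_job import params_config as config

# Look up the label and handler metadata for a given recording-process status value
row = config.recording_process_status_df.loc[
    config.recording_process_status_df['Value'] == 3].squeeze()
print(row['Key'], '->', row['Label'])              # JOB_QUEUE -> Processing job in queue
print(row['ProcessFunction'], row['UpdateField'])  # slurm_job_queue slurm_id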
@@ -0,0 +1,2 @@
{"cat_gt": 2,
"sorting_algorithm": "kilosort2"}
@@ -0,0 +1,4 @@
{
"process_cluster": "spock",
"dj_element_processing": "trigger",
"sorting_algorithm": "suite2p"}
@@ -0,0 +1,4 @@
{
"process_cluster": "tiger",
"dj_element_processing": "load",
"sorting_algorithm": "suite2p"}
@@ -0,0 +1,22 @@
{"fs": 30000,
"fshigh": 150,
"minfr_goodchannels": 0.1,
"Th": [10, 4],
"lam": 10,
"AUCsplit": 0.9,
"minFR": 0.02,
"momentum": [20, 400],
"sigmaMask": 30,
"ThPr": 8,
"spkTh": -6,
"reorder": 1,
"nskip": 25,
"GPU": 1,
"Nfilt": 1024,
"nfilt_factor": 4,
"ntbuff": 64,
"whiteningRange": 32,
"nSkipCov": 25,
"scaleproc": 200,
"nPCs": 3,
"useRAM": 0}