diff --git a/datajoint/schemas.py b/datajoint/schemas.py index 62f45fa6..001c2633 100644 --- a/datajoint/schemas.py +++ b/datajoint/schemas.py @@ -4,6 +4,7 @@ import re import itertools import collections +import pandas as pd from .connection import conn from .diagram import Diagram, _get_tier from .settings import config @@ -14,6 +15,7 @@ from .utils import user_choice, to_camel_case from .user_tables import Part, Computed, Imported, Manual, Lookup from .table import lookup_class_name, Log, FreeTable +from .expression import U import types logger = logging.getLogger(__name__.split(".")[0]) @@ -489,6 +491,51 @@ def list_tables(self): if d == self.database ] + def progress(self): + """ + Function to retrieve the processing status of this schema + Return a dataframe with all Imported/Computed tables, and their corresponding processing status: + + total - number of entries in key_source + + in queue - subset of the key_source that is not yet populated + + reserved - number of reserved jobs + + error - number of error jobs + + ignore - number of ignore jobs + + remaining - number of remaining jobs to be worked on + (note: tables not topologically sorted) + :return: pandas DataFrame of the table's progress + """ + # get job status from jobs table + job_status = {status: U('table_name').aggr( + self.jobs & f'status = "{status}"', + **{status: 'count(table_name)'}).fetch(format='frame') + for status in ('reserved', 'error', 'ignore')} + # get imported/computed tables + _tables = {} + self.spawn_missing_classes(context=_tables) + process_tables = {process.table_name: process + for process in _tables.values() if process.table_name.startswith('_')} + # analyse progress of the schema + schema_progress = pd.DataFrame(list(process_tables), columns=['table_name']) + schema_progress.set_index('table_name', inplace=True) + + schema_progress['total'] = [len(process_tables[t].key_source) + for t in schema_progress.index] + schema_progress['in_queue'] = [len(process_tables[t].key_source + - process_tables[t].proj()) + for t in schema_progress.index] + + schema_progress = schema_progress.join(job_status['reserved'].join( + job_status['error'], how='outer').join( + job_status['ignore'], how='outer'), how='left') + schema_progress.fillna(0, inplace=True) + + schema_progress['remaining'] = (schema_progress.in_queue + - schema_progress.reserved + - schema_progress.error + - schema_progress.ignore) + + return schema_progress + class VirtualModule(types.ModuleType): """ diff --git a/tests/test_schema.py b/tests/test_schema.py index 257de221..86eee3bb 100644 --- a/tests/test_schema.py +++ b/tests/test_schema.py @@ -2,6 +2,7 @@ import pytest import inspect import datajoint as dj +import pandas as pd from inspect import getmembers from . import schema @@ -68,6 +69,48 @@ def test_schema_list(schema_any): schemas = dj.list_schemas() assert schema_any.database in schemas + +def test_schema_progress(): + expected_progress = pd.DataFrame({ + '__error_class': {'total': 13, + 'in_queue': 13, + 'reserved': 0, + 'error': 0, + 'ignore': 0, + 'remaining': 13}, + '__sig_int_table': {'total': 10, + 'in_queue': 10, + 'reserved': 0, + 'error': 0, + 'ignore': 0, + 'remaining': 10}, + '__sig_term_table': {'total': 10, + 'in_queue': 10, + 'reserved': 0, + 'error': 0, + 'ignore': 0, + 'remaining': 10}, + '_ephys': {'total': 0, + 'in_queue': 0, + 'reserved': 0, + 'error': 0, + 'ignore': 0, + 'remaining': 0}, + '_experiment': {'total': 4, + 'in_queue': 4, + 'reserved': 0, + 'error': 0, + 'ignore': 0, + 'remaining': 4}, + '_trial': {'total': 0, + 'in_queue': 0, + 'reserved': 0, + 'error': 0, + 'ignore': 0, + 'remaining': 0}}).T + schema_progress = schema.schema.progress() + assert expected_progress.equals(schema_progress) + def test_drop_unauthorized(): info_schema = dj.schema("information_schema")