From 162cb7f2eca8df38d2021ee1d0422f25f0e80059 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 11 Aug 2022 10:20:55 -0500 Subject: [PATCH 1/6] add `.progress()` to schema --- datajoint/schemas.py | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/datajoint/schemas.py b/datajoint/schemas.py index ab2fc03af..98b8d6629 100644 --- a/datajoint/schemas.py +++ b/datajoint/schemas.py @@ -4,6 +4,7 @@ import re import itertools import collections +import pandas as pd from .connection import conn from .diagram import Diagram, _get_tier from .settings import config @@ -14,6 +15,7 @@ from .utils import user_choice, to_camel_case from .user_tables import Part, Computed, Imported, Manual, Lookup from .table import lookup_class_name, Log, FreeTable +from .expression import U import types logger = logging.getLogger(__name__) @@ -376,6 +378,39 @@ def list_tables(self): for full_t in Diagram(self).topological_sort()) if d == self.database] + def progress(self): + # get job status from jobs table + job_status_df = {job_status: U('table_name').aggr( + self.jobs & f'status = "{job_status}"', + **{job_status: 'count(table_name)'}).fetch(format='frame') + for job_status in ('reserved', 'error', 'ignore')} + # get imported/computed tables + _tables = {} + self.spawn_missing_classes(context=_tables) + process_tables = {process.table_name: process + for process in _tables.values() if process.table_name.startswith('_')} + # analyse progress of the schema + workflow_status = pd.DataFrame(list(process_tables), columns=['table_name']) + workflow_status.set_index('table_name', inplace=True) + + workflow_status['total'] = [len(process_tables[t].key_source) + for t in workflow_status.index] + workflow_status['in_queue'] = [len(process_tables[t].key_source + - process_tables[t].proj()) + for t in workflow_status.index] + + workflow_status = workflow_status.join(job_status_df['reserved'].join( + job_status_df['error'], how='outer').join( + job_status_df['ignore'], how='outer'), how='left') + workflow_status.fillna(0, inplace=True) + + workflow_status['remaining'] = (workflow_status.in_queue + - workflow_status.reserved + - workflow_status.error + - workflow_status.ignore) + + return workflow_status + class VirtualModule(types.ModuleType): """ From 6aa6792d3effc78d0d99c879b5c34a438ff45a8a Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 11 Aug 2022 11:12:34 -0500 Subject: [PATCH 2/6] add docstring for `schema.progress()` --- datajoint/schemas.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/datajoint/schemas.py b/datajoint/schemas.py index 98b8d6629..79481fef8 100644 --- a/datajoint/schemas.py +++ b/datajoint/schemas.py @@ -379,6 +379,18 @@ def list_tables(self): if d == self.database] def progress(self): + """ + Function to retrieve the processing status of this schema + Return a dataframe with all Imported/Computed tables, and their corresponding processing status: + + total - number of entries in key_source + + in queue - subset of the key_source that is not yet populated + + reserved - number of reserved jobs + + error - number of error jobs + + ignore - number of ignore jobs + + remaining - number of remaining jobs to be worked on + (note: tables not topologically sorted) + :return: pandas DataFrame of the table's progress + """ # get job status from jobs table job_status_df = {job_status: U('table_name').aggr( self.jobs & f'status = "{job_status}"', From c105878434fc9241ebbfe9ce3c77dc20c66bdf3b Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 11 Aug 2022 11:12:43 -0500 Subject: [PATCH 3/6] add test for `schema.progress()` --- tests/test_schema.py | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/tests/test_schema.py b/tests/test_schema.py index 42d4e0c0e..9c3b685f7 100644 --- a/tests/test_schema.py +++ b/tests/test_schema.py @@ -1,5 +1,6 @@ from nose.tools import assert_false, assert_true, raises import datajoint as dj +import pandas as pd from inspect import getmembers from . import schema from . import schema_empty @@ -157,3 +158,45 @@ class Recording(dj.Manual): schema2.drop() schema1.drop() + + +def test_schema_progress(): + expected_progress = pd.DataFrame({ + '__error_class': {'total': 13, + 'in_queue': 13, + 'reserved': 0, + 'error': 0, + 'ignore': 0, + 'remaining': 13}, + '__sig_int_table': {'total': 10, + 'in_queue': 10, + 'reserved': 0, + 'error': 0, + 'ignore': 0, + 'remaining': 10}, + '__sig_term_table': {'total': 10, + 'in_queue': 10, + 'reserved': 0, + 'error': 0, + 'ignore': 0, + 'remaining': 10}, + '_ephys': {'total': 0, + 'in_queue': 0, + 'reserved': 0, + 'error': 0, + 'ignore': 0, + 'remaining': 0}, + '_experiment': {'total': 4, + 'in_queue': 4, + 'reserved': 0, + 'error': 0, + 'ignore': 0, + 'remaining': 4}, + '_trial': {'total': 0, + 'in_queue': 0, + 'reserved': 0, + 'error': 0, + 'ignore': 0, + 'remaining': 0}}).T + schema_progress = schema_simple.progress() + assert expected_progress.equals(schema_progress) From d51812dfdc3146dd85714574023743929c3597a8 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 11 Aug 2022 11:18:28 -0500 Subject: [PATCH 4/6] code cleanup - var renaming --- datajoint/schemas.py | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/datajoint/schemas.py b/datajoint/schemas.py index 79481fef8..93fb5128b 100644 --- a/datajoint/schemas.py +++ b/datajoint/schemas.py @@ -392,36 +392,36 @@ def progress(self): :return: pandas DataFrame of the table's progress """ # get job status from jobs table - job_status_df = {job_status: U('table_name').aggr( - self.jobs & f'status = "{job_status}"', - **{job_status: 'count(table_name)'}).fetch(format='frame') - for job_status in ('reserved', 'error', 'ignore')} + job_status = {status: U('table_name').aggr( + self.jobs & f'status = "{status}"', + **{status: 'count(table_name)'}).fetch(format='frame') + for status in ('reserved', 'error', 'ignore')} # get imported/computed tables _tables = {} self.spawn_missing_classes(context=_tables) process_tables = {process.table_name: process for process in _tables.values() if process.table_name.startswith('_')} # analyse progress of the schema - workflow_status = pd.DataFrame(list(process_tables), columns=['table_name']) - workflow_status.set_index('table_name', inplace=True) + schema_progress = pd.DataFrame(list(process_tables), columns=['table_name']) + schema_progress.set_index('table_name', inplace=True) - workflow_status['total'] = [len(process_tables[t].key_source) - for t in workflow_status.index] - workflow_status['in_queue'] = [len(process_tables[t].key_source + schema_progress['total'] = [len(process_tables[t].key_source) + for t in schema_progress.index] + schema_progress['in_queue'] = [len(process_tables[t].key_source - process_tables[t].proj()) - for t in workflow_status.index] + for t in schema_progress.index] - workflow_status = workflow_status.join(job_status_df['reserved'].join( - job_status_df['error'], how='outer').join( - job_status_df['ignore'], how='outer'), how='left') - workflow_status.fillna(0, inplace=True) + schema_progress = schema_progress.join(job_status['reserved'].join( + job_status['error'], how='outer').join( + job_status['ignore'], how='outer'), how='left') + schema_progress.fillna(0, inplace=True) - workflow_status['remaining'] = (workflow_status.in_queue - - workflow_status.reserved - - workflow_status.error - - workflow_status.ignore) + schema_progress['remaining'] = (schema_progress.in_queue + - schema_progress.reserved + - schema_progress.error + - schema_progress.ignore) - return workflow_status + return schema_progress class VirtualModule(types.ModuleType): From 5d5502d340e32b5b63227057035fd559aa859a1f Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 11 Aug 2022 11:29:23 -0500 Subject: [PATCH 5/6] bugfix test for schema.progress() --- tests/test_schema.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_schema.py b/tests/test_schema.py index 9c3b685f7..db10aaf45 100644 --- a/tests/test_schema.py +++ b/tests/test_schema.py @@ -198,5 +198,5 @@ def test_schema_progress(): 'error': 0, 'ignore': 0, 'remaining': 0}}).T - schema_progress = schema_simple.progress() + schema_progress = schema.schema.progress() assert expected_progress.equals(schema_progress) From 080ff87e80d3a3f051d627428cf43c8c6697a7b4 Mon Sep 17 00:00:00 2001 From: Thinh Nguyen Date: Thu, 11 Aug 2022 12:03:08 -0500 Subject: [PATCH 6/6] reorder tests in `test_schema` --- tests/test_schema.py | 84 ++++++++++++++++++++++---------------------- 1 file changed, 42 insertions(+), 42 deletions(-) diff --git a/tests/test_schema.py b/tests/test_schema.py index db10aaf45..49dd43ba4 100644 --- a/tests/test_schema.py +++ b/tests/test_schema.py @@ -32,6 +32,48 @@ def test_schema_list(): assert_true(schema.schema.database in schemas) +def test_schema_progress(): + expected_progress = pd.DataFrame({ + '__error_class': {'total': 13, + 'in_queue': 13, + 'reserved': 0, + 'error': 0, + 'ignore': 0, + 'remaining': 13}, + '__sig_int_table': {'total': 10, + 'in_queue': 10, + 'reserved': 0, + 'error': 0, + 'ignore': 0, + 'remaining': 10}, + '__sig_term_table': {'total': 10, + 'in_queue': 10, + 'reserved': 0, + 'error': 0, + 'ignore': 0, + 'remaining': 10}, + '_ephys': {'total': 0, + 'in_queue': 0, + 'reserved': 0, + 'error': 0, + 'ignore': 0, + 'remaining': 0}, + '_experiment': {'total': 4, + 'in_queue': 4, + 'reserved': 0, + 'error': 0, + 'ignore': 0, + 'remaining': 4}, + '_trial': {'total': 0, + 'in_queue': 0, + 'reserved': 0, + 'error': 0, + 'ignore': 0, + 'remaining': 0}}).T + schema_progress = schema.schema.progress() + assert expected_progress.equals(schema_progress) + + @raises(dj.errors.AccessError) def test_drop_unauthorized(): info_schema = dj.schema('information_schema') @@ -158,45 +200,3 @@ class Recording(dj.Manual): schema2.drop() schema1.drop() - - -def test_schema_progress(): - expected_progress = pd.DataFrame({ - '__error_class': {'total': 13, - 'in_queue': 13, - 'reserved': 0, - 'error': 0, - 'ignore': 0, - 'remaining': 13}, - '__sig_int_table': {'total': 10, - 'in_queue': 10, - 'reserved': 0, - 'error': 0, - 'ignore': 0, - 'remaining': 10}, - '__sig_term_table': {'total': 10, - 'in_queue': 10, - 'reserved': 0, - 'error': 0, - 'ignore': 0, - 'remaining': 10}, - '_ephys': {'total': 0, - 'in_queue': 0, - 'reserved': 0, - 'error': 0, - 'ignore': 0, - 'remaining': 0}, - '_experiment': {'total': 4, - 'in_queue': 4, - 'reserved': 0, - 'error': 0, - 'ignore': 0, - 'remaining': 4}, - '_trial': {'total': 0, - 'in_queue': 0, - 'reserved': 0, - 'error': 0, - 'ignore': 0, - 'remaining': 0}}).T - schema_progress = schema.schema.progress() - assert expected_progress.equals(schema_progress)