Skip to content

Commit

Permalink
#1 - Update and create new unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
carlosribas committed Jun 13, 2022
1 parent 80b1dd7 commit a91e583
Showing 1 changed file with 53 additions and 23 deletions.
76 changes: 53 additions & 23 deletions database/tests/test_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,42 +11,72 @@
limitations under the License.
"""

import sqlalchemy as sa

from aiohttp.test_utils import unittest_run_loop
from database.job import save_job
from database.metadata import metadata, search_metadata
from database.metadata import metadata, search_metadata, update_metadata
from database.models import Job
from database.tests.test_base import DBTestCase


class SaveMetadataTestCase(DBTestCase):
class MetadataTestCase(DBTestCase):
"""
Run this test with the following command:
ENVIRONMENT=TEST python -m unittest database.tests.test_jobs.SaveMetadataTestCase
ENVIRONMENT=TEST python -m unittest database.tests.test_jobs.MetadataTestCase
"""
async def setUpAsync(self):
await super().setUpAsync()

async with self.app['engine'].acquire() as connection:
await connection.execute(Job.insert().values(job_id='foo.bar'))
await connection.execute(Job.insert().values(job_id='urs001'))

results = [
{"job_id": "foo.bar", "name": "rnacentral", "primary_id": "urs001"},
{"job_id": "urs001", "name": "rnacentral", "primary_id": None},
{"job_id": "foo.bar", "name": "test", "primary_id": None},
]
await metadata(self.app['engine'], results=results)

async def tearDownAsync(self):
async with self.app['engine'].acquire() as connection:
await connection.execute('DELETE FROM database')
await connection.execute('DELETE FROM result')
await connection.execute('DELETE FROM job')

await super().tearDownAsync()

@unittest_run_loop
async def test_save_job(self):
job_id = await save_job(
self.app['engine'],
job_id="foo.bar"
)
urs = await save_job(
self.app['engine'],
job_id="urs001"
)
results = [
{"job_id": job_id, "name": "rnacentral", "primary_id": urs},
{"job_id": urs, "name": "rnacentral", "primary_id": None},
{"job_id": job_id, "name": "test", "primary_id": None},
]
await metadata(self.app['engine'], results=results)

find_metadata_1 = await search_metadata(self.app['engine'], job_id, "test", None)
async def test_find_job_id_with_expert_db(self):
find_metadata_1 = await search_metadata(self.app['engine'], "foo.bar", "test", None)
assert find_metadata_1 is not None

find_metadata_2 = await search_metadata(self.app['engine'], job_id, "rnacentral", urs)
@unittest_run_loop
async def test_find_job_id_with_rnacentral(self):
find_metadata_2 = await search_metadata(self.app['engine'], "foo.bar", "rnacentral", 'urs001')
assert find_metadata_2 is not None

find_metadata_3 = await search_metadata(self.app['engine'], urs, "rnacentral", None)
@unittest_run_loop
async def test_find_urs(self):
find_metadata_3 = await search_metadata(self.app['engine'], "urs001", "rnacentral", None)
assert find_metadata_3 is not None

@unittest_run_loop
async def test_wrong_job_id(self):
wrong_job = await search_metadata(self.app['engine'], "wrong_job", "rnacentral", None)
assert wrong_job is None

@unittest_run_loop
async def test_update_manually_annotated(self):
await update_metadata(self.app['engine'], "foo.bar", "urs001", "rnacentral")

async with self.app['engine'].acquire() as connection:
query = sa.text('''
SELECT manually_annotated
FROM database
WHERE job_id=:job_id AND primary_id=:primary_id AND name=:name
''')

async for row in await connection.execute(query, job_id="foo.bar", primary_id="urs001", name="rnacentral"):
result = row.manually_annotated
assert result is True

0 comments on commit a91e583

Please sign in to comment.