Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 45 additions & 21 deletions astrodb_utils/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

__all__ = [
"find_source_in_db",
"ingest_names",
"ingest_name",
"ingest_source",
]

Expand All @@ -28,7 +28,7 @@ def find_source_in_db(
ra_col_name="ra_deg",
dec_col_name="dec_deg",
use_simbad=True,
fuzzy=False
fuzzy=False,
):
"""
Find a source in the database given a source name and optional coordinates.
Expand Down Expand Up @@ -187,7 +187,7 @@ def coords_from_simbad(source):


# NAMES
def ingest_names(
def ingest_name(
db, source: str = None, other_name: str = None, raise_error: bool = None
):
"""
Expand All @@ -206,22 +206,26 @@ def ingest_names(

Returns
-------
None
other_name: str
Name of the source as it appears in the Names table

or None if name was not ingested

"""
names_data = [{"source": source, "other_name": other_name}]
source = strip_unicode_dashes(source)
other_name = strip_unicode_dashes(other_name)
name_data = [{"source": source, "other_name": other_name}]
try:
with db.engine.connect() as conn:
conn.execute(db.Names.insert().values(names_data))
conn.execute(db.Names.insert().values(name_data))
conn.commit()
logger.info(f"Name added to database: {names_data}\n")
logger.info(f"Name added to database: {name_data}\n")
return other_name
except sqlalchemy.exc.IntegrityError as e:
msg = f"Could not add {names_data} to Names."
if "UNIQUE constraint failed:" in str(e):
msg += " Other name is already present."
if raise_error:
raise AstroDBError(msg) from e
else:
logger.warning(msg)
msg = f"Could not add {name_data} to Names."
if "UNIQUE constraint failed: " in str(e):
msg += "Other name is already present."
exit_function(msg, raise_error)


# SOURCES
Expand Down Expand Up @@ -286,6 +290,9 @@ def ingest_source(

logger.debug(f"Trying to ingest source: {source}")

# Replace unicode dash characters with `-`
source = strip_unicode_dashes(source)

# Make sure reference is provided and in References table
ref_check = find_publication(db, reference=reference)
logger.debug(f"ref_check: {ref_check}")
Expand Down Expand Up @@ -323,7 +330,7 @@ def ingest_source(

# One source match in the database, ingesting possible alt name
if len(name_matches) == 1:
ingest_names(db, name_matches[0], source)
ingest_name(db, name_matches[0], source)
msg2 = f" Already in database as {name_matches[0]}. \n "

# Multiple source matches in the database, unable to ingest source
Expand Down Expand Up @@ -375,18 +382,15 @@ def ingest_source(
msg = f"Added {source_data}"
logger.info(f"Added {source}")
logger.debug(msg)
except sqlalchemy.exc.IntegrityError as e:
except sqlalchemy.exc.IntegrityError:
msg = f"Not ingesting {source}. Not sure why. \n"
msg2 = f" {source_data} "
logger.warning(msg)
logger.debug(msg2)
if raise_error:
raise AstroDBError(msg + msg2) from e
else:
return
exit_function(msg + msg2, raise_error)

# Add the source name to the Names table
ingest_names(db, source=source, other_name=source, raise_error=raise_error)
ingest_name(db, source=source, other_name=source, raise_error=raise_error)

return

Expand Down Expand Up @@ -462,3 +466,23 @@ def find_survey_name_in_simbad(sources, desig_prefix, source_id_index=None):
)

return result_table


def strip_unicode_dashes(source):
    """
    Replace unicode dash characters in a source name with an ASCII `-`.

    Parameters
    ----------
    source: str or None
        Name to clean up. ``None`` is passed through unchanged, because
        callers such as ``ingest_name`` default their name arguments to
        ``None`` and call this function unconditionally.

    Returns
    -------
    source: str or None
        The name with every en dash, em dash, minus sign and figure dash
        replaced by `-`, or ``None`` if ``None`` was given.

    """
    # NOTE(review): a collaborator suggested a more general name, such as
    # `string_formatter`, so the same function can be used for more than
    # just dashes.
    if source is None:
        # Nothing to normalise; `char in None` would raise TypeError.
        return None

    unicode_list = [
        ("\u2013", "en dash"),
        ("\u2014", "em dash"),
        ("\u2212", "minus sign"),
        ("\u2012", "figure dash"),
    ]

    for char, char_name in unicode_list:
        if char in source:
            source = source.replace(char, "-")
            # Logged after the replacement, so {source} shows the cleaned name.
            logger.info(f"replaced {char_name}({char}) with - in {source}")

    return source
5 changes: 0 additions & 5 deletions astrodb_utils/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from astrodb_utils import load_astrodb, logger
from astrodb_utils.publications import ingest_publication
from astrodb_utils.sources import ingest_source

logger.setLevel("DEBUG")

Expand All @@ -28,8 +27,6 @@ def db():

logger.info("Loaded AstroDB Template database using load_astrodb function in conftest.py")



ingest_publication(
db,
reference="Refr20",
Expand All @@ -40,8 +37,6 @@ def db():

ingest_publication(db, doi="10.1086/161442", reference="Prob83")

ingest_source(db, "LHS 2924", reference="Prob83")

return db


35 changes: 35 additions & 0 deletions astrodb_utils/tests/test_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
from astrodb_utils.sources import (
coords_from_simbad,
find_source_in_db,
ingest_name,
ingest_source,
strip_unicode_dashes,
)


Expand Down Expand Up @@ -51,6 +53,14 @@
"raise_error": False,
}
),
{
"source": "LHS 2924", # needed for test_find_source_in_db
"ra": None,
"dec": None,
"reference": "Prob83",
"raise_error": False,
}

],
)
@pytest.mark.filterwarnings(
Expand Down Expand Up @@ -187,3 +197,28 @@ def test_coords_from_simbad():
coords = coords_from_simbad("Barnard Star")
assert math.isclose(coords.ra.deg, 269.452, abs_tol=0.001)
assert math.isclose(coords.dec.deg, 4.6933, abs_tol=0.001)


def test_ingest_name(db):
    """Exercise ingest_name with a new alternate name and with a duplicate."""
    # A freshly ingested name is handed back by ingest_name
    new_name = ingest_name(db, "TWA 26", "WISE J113951.07-315921.6")
    assert new_name == "WISE J113951.07-315921.6"

    # try to ingest names that are already in the database
    known_source, known_name = "Gl 229b", "HD 42581b"
    duplicate = ingest_name(db, known_source, known_name, raise_error=False)
    assert duplicate is None

    # Same duplicate, but this time the failure must surface as an exception
    with pytest.raises(AstroDBError) as excinfo:
        ingest_name(db, known_source, known_name, raise_error=True)
    assert "Other name is already present." in str(excinfo.value)


@pytest.mark.parametrize(
    "name,expected",
    [
        # Dashes are written as escapes so the exact code point under test
        # is visible and cannot be silently normalised by an editor.
        ("CWISE J221706.28\u2013145437.6", "CWISE J221706.28-145437.6"),  # en dash
        ("2MASS J20115649\u20146201127", "2MASS J20115649-6201127"),  # em dash
        ("1234\u22125678", "1234-5678"),  # minus sign
        ("9W34\u2012aou", "9W34-aou"),  # figure dash
        ("should-work", "should-work"),  # no unicode dashes
    ],
)
def test_strip_unicode_dashes(name, expected):
    """Each supported unicode dash is replaced by an ASCII hyphen."""
    # `name` rather than `input`, to avoid shadowing the builtin.
    assert strip_unicode_dashes(name) == expected