Skip to content

Commit

Permalink
Let schedview's read_opsim update column names if needed
Browse files Browse the repository at this point in the history
  • Loading branch information
ehneilsen committed Sep 13, 2024
1 parent 96caded commit f67ebeb
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 13 deletions.
63 changes: 55 additions & 8 deletions schedview/collect/opsim.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import sqlite3
from warnings import warn

import pandas as pd
import yaml
Expand All @@ -21,6 +22,45 @@ def _all_visits_columns():
return current_cols.union(backwards_cols)


def _normalize_opsim_columns(opsim_rp: ResourcePath, dbcols: list[str]):
# At least one opsim column has been renamed since the start of
# simulations. The mapping between new and old names can be found in
# rubin_scheduler.scheduler.utils.SchemaConverter.backwards.

# We want our queries to work the same whether run on simulation from
# before or after the name change. So, if a requested column is missing,
# see if it is the renamed value of one that used to exist, and if so,
# use the old column name instead.

# In addition to returning the list of columns with the replaced column
# name, return the mapping so that a table's column headings can be
# updated from the old name used to the new (requested) name.

with opsim_rp.as_local() as local_obs_path:
with sqlite3.connect(local_obs_path.ospath) as sim_connection:
query = "SELECT name FROM PRAGMA_TABLE_INFO('observations');"
present_columns = set(pd.read_sql(query, sim_connection).name.values)

new_columns = []
used_column_map = {}
backwards_column_map = {v: k for k, v in SchemaConverter().backwards.items()}
for column in dbcols:
if column in present_columns:
new_columns.append(column)
elif column in backwards_column_map:
old_column = backwards_column_map[column]
if old_column in present_columns:
warn(f"Column {column} not found in {opsim_rp}, using deprecated {old_column} instead")
used_column_map[old_column] = column
new_columns.append(old_column)
else:
warn(f"Neither column {column} nor deprecated {old_column} found in {opsim_rp}, skipping.")
else:
warn(f"Column {column} not found in {opsim_rp}, skipping.")

return new_columns, used_column_map


def read_opsim(
opsim_uri,
start_time=None,
Expand Down Expand Up @@ -85,26 +125,36 @@ def read_opsim(
with sqlite3.connect(local_obs_path.ospath) as sim_connection:
if dbcols is None:
col_query = "SELECT name FROM PRAGMA_TABLE_INFO('observations')"
dbcols = [
raw_dbcols = [
c for c in pd.read_sql(col_query, sim_connection).name if c in _all_visits_columns()
]

# Update any outdated column names
backwards = SchemaConverter().backwards
dbcols = [(backwards[c] if c in backwards else c) for c in raw_dbcols]

norm_columns, used_column_map = _normalize_opsim_columns(obs_path, dbcols)

try:
visits = pd.DataFrame(maf.get_sim_data(sim_connection, constraint, dbcols, **kwargs))
visits = pd.DataFrame(maf.get_sim_data(sim_connection, constraint, norm_columns, **kwargs))
except NameError as e:
if e.name == "maf" and e.args == ("name 'maf' is not defined",):
if len(kwargs) > 0:
raise NotImplementedError(
f"Argument {list(kwargs)[0]} not supported without rubin_sim installed"
)

query = f'SELECT {", ".join(dbcols)} FROM observations'
query = f'SELECT {", ".join(norm_columns)} FROM observations'
if constraint:
query += f" WHERE {constraint}"
visits = pd.read_sql(query, sim_connection)
else:
raise e

# If we replaced modern columns with legacy ones in the query,
# update the column names.
visits.rename(columns=used_column_map, inplace=True)

if "start_date" not in visits:
if "observationStartDatetime64" in visits:
visits["start_date"] = pd.to_datetime(
Expand All @@ -115,7 +165,6 @@ def read_opsim(
visits.observationStartMJD + 2400000.5, origin="julian", unit="D", utc=True
)

visits.rename(columns=SchemaConverter().backwards, inplace=True)
visits.set_index("observationId", inplace=True)

return visits
Expand All @@ -138,9 +187,8 @@ def read_ddf_visits(
The start time for visits to be loaded
end_time : `str`, `astropy.time.Time`
The end time for visits ot be loaded
dbcols : `None` or `list` [`str`]
Columns required from the database. Defaults to None, which queries
all columns known to rubin_scheduler.
dbcols : `list` [`str`]
Columns required from the database.
stackers : `list` [`rubin_sim.maf.stackers`], optional
Stackers to be used to generate additional columns.
Expand All @@ -149,7 +197,6 @@ def read_ddf_visits(
visits : `pandas.DataFrame`
The visits and their parameters.
"""

ddf_field_names = tuple(ddf_locations().keys())
constraint = f"target IN {tuple(field_name for field_name in ddf_field_names)}"
visits = read_opsim(
Expand Down
8 changes: 4 additions & 4 deletions schedview/compute/visits.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def accum_teff_by_night(visits):
The effective exposure time (`float`).
``"filter"``
The filter (`str`).
``"target"``
``"target_name"``
The target name (`str`).
Returns
Expand All @@ -220,11 +220,11 @@ def accum_teff_by_night(visits):
f"{teff_col} column not found for visits; use the rubin_sim.maf.stackers.TeffStacker."
)

nightly_teff = visits.groupby(["target", day_obs_col, "filter"])[teff_col].sum().reset_index()
nightly_teff = visits.groupby(["target_name", day_obs_col, "filter"])[teff_col].sum().reset_index()
nightly_teff = (
nightly_teff.pivot(index=["target", day_obs_col], columns="filter", values=teff_col)
nightly_teff.pivot(index=["target_name", day_obs_col], columns="filter", values=teff_col)
.fillna(0.0)
.reset_index()
.set_index(["target", day_obs_col])
.set_index(["target_name", day_obs_col])
)
return nightly_teff
Binary file added schedview/data/opsim_prenight_2024-07-30_1.db
Binary file not shown.
Binary file added schedview/data/opsim_prenight_2024-08-13_1.db
Binary file not shown.
16 changes: 16 additions & 0 deletions tests/test_collect_opsim.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import unittest

import schedview.collect.opsim


class TestCollectOpsim(unittest.TestCase):
def test_read_opsim(self):
test_rp = "resource://schedview/data/opsim_prenight_2024-08-13_1.db"
visits = schedview.collect.opsim.read_opsim(test_rp)
assert "target_name" in visits.columns

# 'target_name' used to be called 'target'.
# Verify that read_opsim finds and renames it correctly
old_test_rp = "resource://schedview/data/opsim_prenight_2024-07-30_1.db"
old_visits = schedview.collect.opsim.read_opsim(old_test_rp)
assert "target_name" in old_visits.columns
2 changes: 1 addition & 1 deletion tests/test_compute_visits.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def test_accum_teff_by_night(self):

visits = schedview.collect.read_ddf_visits(self.visit_db_fname, stackers=stackers)
night_teff = schedview.compute.visits.accum_teff_by_night(visits)
self.assertEqual(night_teff.index.names[0], "target")
self.assertEqual(night_teff.index.names[0], "target_name")
self.assertEqual(night_teff.index.names[1], "day_obs_iso8601")
for col_name in night_teff.columns:
self.assertTrue(col_name in "ugrizy")

0 comments on commit f67ebeb

Please sign in to comment.