Skip to content

Commit

Permalink
Merge pull request #1068 from TOMToolkit/feature/upgrade_hermes_sharing
Browse files Browse the repository at this point in the history
Add customizable class for formatting targets and datums into hermes …
  • Loading branch information
jchate6 authored Oct 11, 2024
2 parents 2f2d225 + e9c9a48 commit 5b26c54
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 112 deletions.
9 changes: 9 additions & 0 deletions docs/managing_data/stream_pub_sub.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ need to copy your Hermes API Key from your Hermes profile page. When hermes shar
buttons to open your data in hermes with the form pre-filled - this is a good option if you want to make slight changes
to your message or data before sharing.

To customize what data is sent to hermes from your ReducedDatum or Target models, please re-implement your own
``tom_dataproducts.alertstreams.hermes.HermesDataConverter`` and customize the `get_hermes_*` methods to pull out
the proper data you want to share. You then provide the class dotpath to your custom class in your TOM's settings
for hermes ``DATA_SHARING`` in the `DATA_CONVERTER_CLASS` key. This is especially useful if you store extra target
or datum information in custom associated models in your TOM or with custom model field keys. For more information on
the structure of data HERMES expects, check the `API Schema Registry here <https://hermes.lco.global/about>`_. This is
the structure you should be mapping your ReducedDatum values to in the Data Converter Class.


Configuring your TOM to Publish Data to a stream:
*************************************************
Expand All @@ -33,6 +41,7 @@ for the various streams with which you wish to share data.
'HERMES_API_KEY': os.getenv('HERMES_API_KEY', 'set HERMES_API_KEY value in environment'),
'DEFAULT_AUTHORS': os.getenv('HERMES_DEFAULT_AUTHORS', 'set your default authors here'),
'USER_TOPICS': ['hermes.test', 'tomtoolkit.test'] # You must have write permissions on these topics
'DATA_CONVERTER_CLASS': 'tom_dataproducts.alertstreams.hermes.HermesDataConverter'
},
}
Expand Down
243 changes: 131 additions & 112 deletions tom_dataproducts/alertstreams/hermes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from django.conf import settings
from django.core.cache import cache
from django.utils.module_loading import import_string

# from hop.io import Metadata

Expand All @@ -20,6 +21,131 @@ class HermesMessageException(Exception):
pass


def get_hermes_data_converter_class():
return import_string(settings.DATA_SHARING['hermes'].get(
'DATA_CONVERTER_CLASS', 'tom_dataproducts.alertstreams.hermes.HermesDataConverter'))


class HermesDataConverter():
""" Class is used to encapsulate getting all the hermes values associated with
a ReducedDatum for either spectroscopy or photometry or a Target. This class
can be subclassed and reimplemented for TOMs that store the properties of
their ReducedDatums in a different way, or store Target props in a different way.
"""
def __init__(self, validate=True):
self.validate = validate

def get_hermes_target(self, target):
"""Build a row for a Hermes Target Table from a TOM BaseTarget Model.
"""
if target.type == "SIDEREAL":
target_table_row = {
'name': target.name,
'ra': target.ra,
'dec': target.dec,
}
if target.epoch:
target_table_row['epoch'] = target.epoch
if target.pm_ra:
target_table_row['pm_ra'] = target.pm_ra
if target.pm_dec:
target_table_row['pm_dec'] = target.pm_dec
else:
target_table_row = {
'name': target.name,
'orbital_elements': {
"epoch_of_elements": target.epoch_of_elements,
"eccentricity": target.eccentricity,
"argument_of_the_perihelion": target.arg_of_perihelion,
"mean_anomaly": target.mean_anomaly,
"orbital_inclination": target.inclination,
"longitude_of_the_ascending_node": target.lng_asc_node,
"semimajor_axis": target.semimajor_axis,
"epoch_of_perihelion": target.epoch_of_perihelion,
"perihelion_distance": target.perihdist,
}
}
target_table_row['aliases'] = [alias.name for alias in target.aliases.all()]
return target_table_row

def get_hermes_photometry(self, datum):
"""Build a row for a Hermes Photometry Table using a TOM Photometry datum
"""
phot_table_row = {
'target_name': datum.target.name,
'date_obs': datum.timestamp.isoformat(),
'telescope': datum.value.get('telescope'),
'instrument': datum.value.get('instrument'),
'bandpass': datum.value.get('filter', ''),
}
brightness_unit = convert_astropy_brightness_to_hermes(datum.value.get('unit'))
if brightness_unit:
phot_table_row['brightness_unit'] = brightness_unit
if datum.value.get('magnitude', None):
phot_table_row['brightness'] = datum.value['magnitude']
else:
phot_table_row['limiting_brightness'] = datum.value.get('limit', None)
error_value = datum.value.get('error', datum.value.get('magnitude_error', None))
if error_value is not None:
phot_table_row['brightness_error'] = error_value
return phot_table_row

def get_hermes_spectroscopy(self, datum):
"""Build a row for a Hermes Spectroscopy Table using a TOM Spectroscopy datum
The datum is assumed to have is json value be of the form {1: {flux: 1, wavelength:200}, 2: {},...}
Or the form {'flux': [1,2,3,...], 'wavelength': [1,2,3,...]}
"""
flux_list = []
flux_error_list = []
wavelength_list = []
if 'flux' in datum.value and 'wavelength' in datum.value:
flux_list = datum.value['flux']
wavelength_list = datum.value['wavelength']
flux_error_list = datum.value.get('flux_error', datum.value.get('error', []))
else:
for entry in datum.value.values():
if 'flux' in entry:
flux_list.append(entry['flux'])
if 'wavelength' in entry:
wavelength_list.append(entry['wavelength'])
if 'error' in entry:
flux_error_list.append(entry['error'])
if 'flux_error' in entry:
flux_error_list.append(entry['flux_error'])

if self.validate:
if len(flux_list) != len(wavelength_list):
msg = f"Spectroscopy Datum {datum.id} has mismatched flux and wavelength values"
logger.error(msg)
raise HermesMessageException(msg)
elif len(flux_list) == 0 or len(wavelength_list) == 0:
msg = f"Spectroscopy Datum {datum.id} has spectrum data in unknown format."
msg += "Please implement a custom HermesDatumConverter to support your data format."
logger.error(msg)
raise HermesMessageException(msg)
if flux_error_list and len(flux_error_list) != len(flux_list):
msg = f"Spectroscopy Datum {datum.id} must have the same number of flux and flux error datapoints"
logger.error(msg)
raise HermesMessageException(msg)

spectroscopy_table_row = {
'target_name': datum.target.name,
'date_obs': datum.timestamp.isoformat(),
'telescope': datum.value.get('telescope'),
'instrument': datum.value.get('instrument'),
'reducer': datum.value.get('reducer'),
'observer': datum.value.get('observer'),
'flux': flux_list,
'wavelength': wavelength_list,
'flux_units': datum.value.get('flux_units'),
'wavelength_units': convert_astropy_wavelength_to_hermes(datum.value.get('wavelength_units')),
}
if flux_error_list:
spectroscopy_table_row['flux_error'] = flux_error_list

return spectroscopy_table_row


def convert_astropy_brightness_to_hermes(brightness_unit):
if not brightness_unit:
return brightness_unit
Expand Down Expand Up @@ -118,20 +244,20 @@ def create_hermes_alert(message_info, datums, targets=Target.objects.none(), **k
hermes_spectroscopy_data = []
hermes_target_dict = {}

hermes_data_converter = get_hermes_data_converter_class()(validate=True)
for datum in datums:
if datum.target.name not in hermes_target_dict:
hermes_target_dict[datum.target.name] = create_hermes_target_table_row(
datum.target, **kwargs)
hermes_target_dict[datum.target.name] = hermes_data_converter.get_hermes_target(datum.target)
if datum.data_type == 'photometry':
hermes_photometry_data.append(create_hermes_phot_table_row(datum, **kwargs))
hermes_photometry_data.append(hermes_data_converter.get_hermes_photometry(datum))
elif datum.data_type == 'spectroscopy':
hermes_spectroscopy_data.append(create_hermes_spectro_table_row(datum, **kwargs))
hermes_spectroscopy_data.append(hermes_data_converter.get_hermes_spectroscopy(datum))

# Now go through the targets queryset and ensure we have all of them in the table
# This is needed since some targets may have no corresponding photometry datums but that is still valid to share
for target in targets:
if target.name not in hermes_target_dict:
hermes_target_dict[target.name] = create_hermes_target_table_row(target, **kwargs)
hermes_target_dict[target.name] = hermes_data_converter.get_hermes_target(target)

alert = {
'topic': message_info.topic,
Expand All @@ -149,113 +275,6 @@ def create_hermes_alert(message_info, datums, targets=Target.objects.none(), **k
return alert


def create_hermes_target_table_row(target, **kwargs):
"""Build a row for a Hermes Target Table from a TOM target Model.
"""
if target.type == "SIDEREAL":
target_table_row = {
'name': target.name,
'ra': target.ra,
'dec': target.dec,
}
if target.epoch:
target_table_row['epoch'] = target.epoch
if target.pm_ra:
target_table_row['pm_ra'] = target.pm_ra
if target.pm_dec:
target_table_row['pm_dec'] = target.pm_dec
else:
target_table_row = {
'name': target.name,
'orbital_elements': {
"epoch_of_elements": target.epoch_of_elements,
"eccentricity": target.eccentricity,
"argument_of_the_perihelion": target.arg_of_perihelion,
"mean_anomaly": target.mean_anomaly,
"orbital_inclination": target.inclination,
"longitude_of_the_ascending_node": target.lng_asc_node,
"semimajor_axis": target.semimajor_axis,
"epoch_of_perihelion": target.epoch_of_perihelion,
"perihelion_distance": target.perihdist,
}
}
target_table_row['aliases'] = [alias.name for alias in target.aliases.all()]
return target_table_row


def create_hermes_phot_table_row(datum, **kwargs):
"""Build a row for a Hermes Photometry Table using a TOM Photometry datum
"""
phot_table_row = {
'target_name': datum.target.name,
'date_obs': datum.timestamp.isoformat(),
'telescope': datum.value.get('telescope'),
'instrument': datum.value.get('instrument'),
'bandpass': datum.value.get('filter', ''),
'brightness_unit': convert_astropy_brightness_to_hermes(datum.value.get('unit')),
}
if datum.value.get('magnitude', None):
phot_table_row['brightness'] = datum.value['magnitude']
else:
phot_table_row['limiting_brightness'] = datum.value.get('limit', None)
error_value = datum.value.get('error', datum.value.get('magnitude_error', None))
if error_value is not None:
phot_table_row['brightness_error'] = error_value
return phot_table_row


def create_hermes_spectro_table_row(datum, **kwargs):
"""Build a row for a Hermes Spectroscopy Table using a TOM Spectroscopy datum
The datum is assumed to have is json value be of the form {1: {flux: 1, wavelength:200}, 2: {},...}
Or the form {'flux': [1,2,3,...], 'wavelength': [1,2,3,...]}
"""
flux_list = []
flux_error_list = []
wavelength_list = []
if 'flux' in datum.value and 'wavelength' in datum.value:
flux_list = datum.value['flux']
wavelength_list = datum.value['wavelength']
flux_error_list = datum.value.get('flux_error', datum.value.get('error', []))
else:
for entry in datum.value.values():
if 'flux' in entry:
flux_list.append(entry['flux'])
if 'wavelength' in entry:
wavelength_list.append(entry['wavelength'])
if 'error' in entry:
flux_error_list.append(entry['error'])
if 'flux_error' in entry:
flux_error_list.append(entry['flux_error'])

if len(flux_list) != len(wavelength_list):
msg = f"Spectroscopy Datum {datum.id} has mismatched flux and wavelength values"
logger.error(msg)
raise HermesMessageException(msg)
elif len(flux_list) == 0 or len(wavelength_list) == 0:
msg = f"Spectroscopy Datum {datum.id} has spectrum data in unknown format"
logger.error(msg)
raise HermesMessageException(msg)
if flux_error_list and len(flux_error_list) != len(flux_list):
msg = f"Spectroscopy Datum {datum.id} must have the same number of flux and flux error datapoints"
logger.error(msg)
raise HermesMessageException(msg)

spectroscopy_table_row = {
'target_name': datum.target.name,
'date_obs': datum.timestamp.isoformat(),
'telescope': datum.value.get('telescope'),
'instrument': datum.value.get('instrument'),
'flux': flux_list,
'wavelength': wavelength_list,
'flux_units': datum.value.get('flux_units'),
'wavelength_units': convert_astropy_wavelength_to_hermes(datum.value.get('wavelength_units')),
}
if flux_error_list:
spectroscopy_table_row['flux_error'] = flux_error_list

return spectroscopy_table_row


def get_hermes_topics(**kwargs):
"""
Method to retrieve a list of available topics from HOP.
Expand Down

0 comments on commit 5b26c54

Please sign in to comment.