Skip to content

Commit

Permalink
Merge pull request #1269 from gerritholl/multiscene-multireader
Browse files Browse the repository at this point in the history
Support multiple readers in group_files and MultiScene.from_files
  • Loading branch information
djhoese authored Jul 29, 2020
2 parents 1ddbdb3 + ae0eb04 commit f80c568
Show file tree
Hide file tree
Showing 5 changed files with 273 additions and 44 deletions.
48 changes: 48 additions & 0 deletions doc/source/multiscene.rst
Original file line number Diff line number Diff line change
Expand Up @@ -231,3 +231,51 @@ multiple Scenes use:
>>> mscn = MultiScene.from_files(glob('/data/abi/day_1/*C0[12]*.nc'), reader='abi_l1b')
>>> mscn.load(['C01', 'C02'])
>>> mscn.save_datasets(base_dir='/path/for/output')

Combining multiple readers
--------------------------

.. versionadded:: 0.23

The :meth:`~satpy.multiscene.MultiScene.from_files` constructor can
automatically combine multiple readers into a single MultiScene. It is no
longer necessary for the user to create the :class:`~satpy.scene.Scene`
objects themselves. For example, you can combine Advanced Baseline
Imager (ABI) and Global Lightning Mapper (GLM) measurements.
Constructing a multi-reader MultiScene requires more parameters than a
single-reader MultiScene, because Satpy cannot reliably guess how to group
files belonging to different instruments. The following example creates
a video with lightning superimposed on ABI channel 14 (11.2 µm)
using the built-in composite ``C14_flash_extent_density``.
This composite superimposes flash extent density from GLM (read with the
:class:`~satpy.readers.glm_l2.NCGriddedGLML2` or ``glm_l2`` reader) on ABI
channel 14 data (read with the :class:`~satpy.readers.abi_l1b.NC_ABI_L1B`
or ``abi_l1b`` reader), and therefore needs Scene objects that combine
both readers:

>>> glm_dir = "/path/to/GLMC/"
>>> abi_dir = "/path/to/ABI/"
>>> ms = satpy.MultiScene.from_files(
... glob.glob(glm_dir + "OR_GLM-L2-GLMC-M3_G16_s202010418*.nc") +
... glob.glob(abi_dir + "C*/OR_ABI-L1b-RadC-M6C*_G16_s202010418*_e*_c*.nc"),
... reader=["glm_l2", "abi_l1b"],
... ensure_all_readers=True,
... group_keys=["start_time"],
... time_threshold=30)
>>> ms.load(["C14_flash_extent_density"])
>>> ms = ms.resample(ms.first_scene["C14"].attrs["area"])
>>> ms.save_animation("/path/for/output/{name:s}_{start_time:%Y%m%d_%H%M}.mp4")

In this example, we pass to
:meth:`~satpy.multiscene.MultiScene.from_files` the additional parameters
``ensure_all_readers=True, group_keys=["start_time"], time_threshold=30``
so we only get scenes at times when both ABI and GLM have a file starting
within 30 seconds of each other, and ignore all other differences for
the purposes of grouping the two. For this example, the ABI files occur
every 5 minutes but the GLM files (processed with glmtools) every minute.
Scenes where there is a GLM file without an ABI file starting within at
most ±30 seconds are skipped. The ``group_keys`` and ``time_threshold``
keyword arguments are processed by the :func:`~satpy.readers.group_files`
function. The heavy work of blending the two instruments together is
performed by the :class:`~satpy.composites.BackgroundCompositor` class
through the ``C14_flash_extent_density`` composite.
17 changes: 14 additions & 3 deletions satpy/multiscene.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,18 +174,29 @@ def first_scene(self):
return self._scene_gen.first

@classmethod
def from_files(cls, files_to_sort, reader=None,
               ensure_all_readers=False, **kwargs):
    """Create multiple Scene objects from multiple files.

    Args:
        files_to_sort (Collection[str]): files to read
        reader (str or Collection[str]): reader or readers to use
        ensure_all_readers (bool): If True, limit to scenes where all
            readers have at least one file.  If False (default), include
            all scenes where at least one reader has at least one file.
        **kwargs: Forwarded unchanged to
            :func:`satpy.readers.group_files`.

    Returns:
        An instance of this class built from one ``Scene`` per file
        group; the scenes are created lazily by a generator.

    This uses the :func:`satpy.readers.group_files` function to group
    files. See this function for more details on additional possible
    keyword arguments.  In particular, it is strongly recommended to pass
    `"group_keys"` when using multiple instruments.

    .. versionadded:: 0.12

    """
    # Imported here rather than at module level, as in the original code.
    from satpy.readers import group_files
    file_groups = group_files(files_to_sort, reader=reader, **kwargs)
    if ensure_all_readers:
        # Each group maps reader name -> list of files; an empty list is
        # falsy, so all() drops groups in which some reader has no file.
        file_groups = [fg for fg in file_groups if all(fg.values())]
    scenes = (Scene(filenames=fg) for fg in file_groups)
    return cls(scenes)

Expand Down
165 changes: 130 additions & 35 deletions satpy/readers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import logging
import numbers
import os
import warnings
from datetime import datetime, timedelta

import yaml
Expand Down Expand Up @@ -392,9 +393,9 @@ def group_files(files_to_sort, reader=None, time_threshold=10,
Args:
files_to_sort (iterable): File paths to sort in to group
reader (str): Reader whose file patterns should be used to sort files.
This is currently a required keyword argument, but may be optional
in the future (see inline code comments for details).
reader (str or Collection[str]): Reader or readers whose file patterns
should be used to sort files. If not given, try all readers (slow,
adding a list of readers is strongly recommended).
time_threshold (int): Number of seconds used to consider time elements
in a group as being equal. For example, if the 'start_time' item
is used to group files then any time within `time_threshold`
Expand All @@ -407,7 +408,9 @@ def group_files(files_to_sort, reader=None, time_threshold=10,
the first key in ``group_keys``. Otherwise, there is a good chance
that files will not be grouped properly (datetimes being barely
unequal). Defaults to a reader's ``group_keys`` configuration (set
in YAML), otherwise ``('start_time',)``.
in YAML), otherwise ``('start_time',)``. When passing multiple
readers, passing group_keys is strongly recommended as the
behaviour without doing so is undefined.
ppp_config_dir (str): Root user configuration directory for Satpy.
This will be deprecated in the future, but is here for consistency
with other Satpy features.
Expand All @@ -420,40 +423,131 @@ def group_files(files_to_sort, reader=None, time_threshold=10,
a `Scene` object.
"""
# FUTURE: Find the best reader for each filename using `find_files_and_readers`
if reader is None:
raise ValueError("'reader' keyword argument is required.")
elif not isinstance(reader, (list, tuple)):

if reader is not None and not isinstance(reader, (list, tuple)):
reader = [reader]

# FUTURE: Handle multiple readers
reader = reader[0]
reader_configs = list(configs_for_reader(reader, ppp_config_dir))[0]
reader_kwargs = reader_kwargs or {}
try:
reader_instance = load_reader(reader_configs, **reader_kwargs)
except (KeyError, IOError, yaml.YAMLError) as err:
LOG.info('Cannot use %s', str(reader_configs))
LOG.debug(str(err))
# if reader and (isinstance(reader, str) or len(reader) == 1):
# # if it is a single reader then give a more usable error
# raise
raise

if group_keys is None:
group_keys = reader_instance.info.get('group_keys', ('start_time',))
file_keys = []
# make a copy because filename_items_for_filetype will modify inplace

reader_files = _assign_files_to_readers(
files_to_sort, reader, ppp_config_dir, reader_kwargs)

if reader is None:
reader = reader_files.keys()

file_keys = _get_file_keys_for_reader_files(
reader_files, group_keys=group_keys)

file_groups = _get_sorted_file_groups(file_keys, time_threshold)

return [{rn: file_groups[group_key].get(rn, []) for rn in reader} for group_key in file_groups]


def _assign_files_to_readers(files_to_sort, reader_names, ppp_config_dir,
                             reader_kwargs):
    """Assign files to readers.

    Given a list of file names (paths), match those to reader instances.

    Internal helper for group_files.

    Args:
        files_to_sort (Collection[str]): Files to assign to readers.
        reader_names (Collection[str] or None): Readers to consider.
            Passed on to ``configs_for_reader``.  When None, only readers
            that match at least one file are included in the result.
        ppp_config_dir (str): Passed on to ``configs_for_reader``.
        reader_kwargs (Mapping or None): Keyword arguments forwarded to
            ``load_reader``; None is treated as no extra arguments.

    Returns:
        Mapping[str, Tuple[reader, Set[str]]]
        Mapping where the keys are reader names and the values are tuples of
        (reader instance, filenames).

    Raises:
        ValueError: If one or more files are not matched by any reader.
    """
    reader_kwargs = reader_kwargs or {}
    # Work on a private set so the caller's collection is never mutated;
    # files are removed from it as readers claim them.
    unassigned = set(files_to_sort)
    reader_dict = {}
    for reader_configs in configs_for_reader(reader_names, ppp_config_dir):
        try:
            reader = load_reader(reader_configs, **reader_kwargs)
        except yaml.constructor.ConstructorError:
            LOG.exception(
                f"ConstructorError loading {reader_configs!s}, "
                "probably a missing dependency, skipping "
                "corresponding reader (if you did not explicitly "
                "specify the reader, Satpy tries all; performance "
                "will improve if you pass readers explicitly).")
            continue
        reader_name = reader.info["name"]
        files_matching = set(reader.filter_selected_filenames(unassigned))
        # A file claimed by one reader is not offered to later readers.
        unassigned -= files_matching
        # Explicitly requested readers are always listed, even without
        # files, so callers can tell which readers came up empty.
        if files_matching or reader_names is not None:
            reader_dict[reader_name] = (reader, files_matching)
    if unassigned:
        # Sorted so the message is reproducible (set order is arbitrary).
        raise ValueError("No matching readers found for these files: " +
                         ", ".join(sorted(unassigned)))
    return reader_dict


def _get_file_keys_for_reader_files(reader_files, group_keys=None):
"""From a mapping from _assign_files_to_readers, get file keys.
Given a mapping where each key is a reader name and each value is a
tuple of reader instance (typically FileYAMLReader) and a collection
of files, return a mapping with the same keys, but where the values are
lists of tuples of (keys, filename), where keys are extracted from the filenames
according to group_keys and filenames are the names those keys were
extracted from.
Internal helper for group_files.
Returns:
Mapping[str, List[Tuple[Tuple, str]]], as described.
"""

file_keys = {}
for (reader_name, (reader_instance, files_to_sort)) in reader_files.items():
if group_keys is None:
group_keys = reader_instance.info.get('group_keys', ('start_time',))
file_keys[reader_name] = []
# make a copy because filename_items_for_filetype will modify inplace
files_to_sort = set(files_to_sort)
for _, filetype_info in reader_instance.sorted_filetype_items():
for f, file_info in reader_instance.filename_items_for_filetype(files_to_sort, filetype_info):
group_key = tuple(file_info.get(k) for k in group_keys)
if all(g is None for g in group_key):
warnings.warn(
f"Found matching file {f:s} for reader "
"{reader_name:s}, but none of group keys found. "
"Group keys requested: " + ", ".join(group_keys),
UserWarning)
file_keys[reader_name].append((group_key, f))
return file_keys


def _get_sorted_file_groups(all_file_keys, time_threshold):
"""Get sorted file groups.
Get a list of dictionaries, where each list item consists of a dictionary
mapping a tuple of keys to a mapping of reader names to files. The files
listed in each list item are considered to be grouped within the same time.
Args:
all_file_keys, as returned by _get_file_keys_for_reader_files
time_threshold: temporal threshold
Returns:
List[Mapping[Tuple, Mapping[str, List[str]]]], as described
Internal helper for group_files.
"""
# flatten to get an overall sorting; put the name in the middle in the
# interest of sorting
flat_keys = ((v[0], rn, v[1]) for (rn, vL) in all_file_keys.items() for v in vL)
prev_key = None
threshold = timedelta(seconds=time_threshold)
# file_groups is sorted, because dictionaries are sorted by insertion
# order in Python 3.7+
file_groups = {}
for gk, f in sorted(file_keys):
for gk, rn, f in sorted(flat_keys):
# use first element of key as time identifier (if datetime type)
if prev_key is None:
is_new_group = True
Expand All @@ -471,13 +565,14 @@ def group_files(files_to_sort, reader=None, time_threshold=10,
if this_val is not None and prev_val is not None)
# if this is a new group based on the first element
if is_new_group or any(vals_not_equal):
file_groups[gk] = [f]
file_groups[gk] = {rn: [f]}
prev_key = gk
else:
file_groups[prev_key].append(f)
sorted_group_keys = sorted(file_groups)
# passable to Scene as 'filenames'
return [{reader: file_groups[group_key]} for group_key in sorted_group_keys]
if rn not in file_groups[prev_key]:
file_groups[prev_key][rn] = [f]
else:
file_groups[prev_key][rn].append(f)
return file_groups


def read_reader_config(config_files, loader=UnsafeLoader):
Expand Down
45 changes: 41 additions & 4 deletions satpy/tests/test_multiscene.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,20 +124,57 @@ def test_properties(self):
def test_from_files(self):
    """Test creating a multiscene from multiple files."""
    from satpy import MultiScene
    # ABI files start every 5 minutes (15:02:20.3, 15:07:20.3, ...).
    input_files_abi = [
        "OR_ABI-L1b-RadC-M3C01_G16_s20171171502203_e20171171504576_c20171171505018.nc",
        "OR_ABI-L1b-RadC-M3C01_G16_s20171171507203_e20171171509576_c20171171510018.nc",
        "OR_ABI-L1b-RadC-M3C01_G16_s20171171512203_e20171171514576_c20171171515017.nc",
        "OR_ABI-L1b-RadC-M3C01_G16_s20171171517203_e20171171519577_c20171171520019.nc",
        "OR_ABI-L1b-RadC-M3C01_G16_s20171171522203_e20171171524576_c20171171525020.nc",
        "OR_ABI-L1b-RadC-M3C01_G16_s20171171527203_e20171171529576_c20171171530017.nc",
    ]
    # GLM files start every minute, from 15:00:00 to 15:07:00.
    input_files_glm = [
        "OR_GLM-L2-GLMC-M3_G16_s20171171500000_e20171171501000_c20380190314080.nc",
        "OR_GLM-L2-GLMC-M3_G16_s20171171501000_e20171171502000_c20380190314080.nc",
        "OR_GLM-L2-GLMC-M3_G16_s20171171502000_e20171171503000_c20380190314080.nc",
        "OR_GLM-L2-GLMC-M3_G16_s20171171503000_e20171171504000_c20380190314080.nc",
        "OR_GLM-L2-GLMC-M3_G16_s20171171504000_e20171171505000_c20380190314080.nc",
        "OR_GLM-L2-GLMC-M3_G16_s20171171505000_e20171171506000_c20380190314080.nc",
        "OR_GLM-L2-GLMC-M3_G16_s20171171506000_e20171171507000_c20380190314080.nc",
        "OR_GLM-L2-GLMC-M3_G16_s20171171507000_e20171171508000_c20380190314080.nc",
    ]
    with mock.patch('satpy.multiscene.Scene') as scn_mock:
        # Single reader: every ABI file ends up in a scene of its own.
        mscn = MultiScene.from_files(
            input_files_abi,
            reader='abi_l1b')
        assert len(mscn.scenes) == 6
        calls = [mock.call(
            filenames={'abi_l1b': [in_file_abi]})
            for in_file_abi in input_files_abi]
        scn_mock.assert_has_calls(calls)

        # Two readers, keeping only scenes in which both have a file:
        # the first two ABI files pair with the 15:02 and 15:07 GLM files.
        scn_mock.reset_mock()
        mscn = MultiScene.from_files(
            input_files_abi + input_files_glm,
            reader=('abi_l1b', "glm_l2"),
            group_keys=["start_time"],
            ensure_all_readers=True,
            time_threshold=30)
        assert len(mscn.scenes) == 2
        calls = [mock.call(
            filenames={'abi_l1b': [in_file_abi], 'glm_l2': [in_file_glm]})
            for (in_file_abi, in_file_glm) in
            zip(input_files_abi[0:2],
                [input_files_glm[2]] + [input_files_glm[7]])]
        scn_mock.assert_has_calls(calls)

        # Two readers, keeping every group: 6 ABI + 8 GLM files with two
        # ABI/GLM pairs merged gives 12 scenes.
        scn_mock.reset_mock()
        mscn = MultiScene.from_files(
            input_files_abi + input_files_glm,
            reader=('abi_l1b', "glm_l2"),
            group_keys=["start_time"],
            ensure_all_readers=False,
            time_threshold=30)
        assert len(mscn.scenes) == 12

def test_group(self):
from satpy import Scene, MultiScene, DatasetID

Expand Down
Loading

0 comments on commit f80c568

Please sign in to comment.