From 65207a4b0e52f9c6784f5d3d6d7c92b1656c0838 Mon Sep 17 00:00:00 2001 From: Adam Glustein Date: Thu, 18 Jul 2024 16:25:49 -0400 Subject: [PATCH] Fix up earthquake data push-pull adapter example (#345) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add PushPullAdapter example with earthquake data --------- Signed-off-by: Melissa Weber Mendonça Signed-off-by: Adam Glustein Co-authored-by: Melissa Weber Mendonça --- examples/07_end_to_end/README.md | 1 + examples/07_end_to_end/earthquake.ipynb | 433 ++++++++++++++++++++++++ 2 files changed, 434 insertions(+) create mode 100644 examples/07_end_to_end/earthquake.ipynb diff --git a/examples/07_end_to_end/README.md b/examples/07_end_to_end/README.md index 3a915d0b7..43a0b488c 100644 --- a/examples/07_end_to_end/README.md +++ b/examples/07_end_to_end/README.md @@ -3,3 +3,4 @@ - [Wikipedia Updates and Edits](./wikimedia.ipynb) - [Seismic Data with obspy](./seismic_waveform.ipynb) - [MTA Subway Data](./mta.ipynb) +- [Earthquake Data PushPullAdapter](./earthquake.ipynb) diff --git a/examples/07_end_to_end/earthquake.ipynb b/examples/07_end_to_end/earthquake.ipynb new file mode 100644 index 000000000..29c0fb369 --- /dev/null +++ b/examples/07_end_to_end/earthquake.ipynb @@ -0,0 +1,433 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "f375a246-44ee-474b-af5b-f56ee3a6fda8", + "metadata": {}, + "source": [ + "# Processing Earthquake Events Data with `obspy` and `csp`" + ] + }, + { + "cell_type": "markdown", + "id": "0d6e2db4-414c-4981-bd74-fcb8cee0f535", + "metadata": {}, + "source": [ + "## Introduction" + ] + }, + { + "cell_type": "markdown", + "id": "671af10e-c238-40e4-b155-a4dc2b9a750f", + "metadata": {}, + "source": [ + "In this example, we will use CSP to process earthquake events and plot them in a map using a [Perspective](https://perspective.finos.org) widget." + ] + }, + { + "cell_type": "markdown", + "id": "eb509f1a-22b8-48d9-939a-d46784815889", + "metadata": {}, + "source": [ + "First, we need to install a few extra libraries:\n", + "- `obspy`, for reading the earthquake stream from USGS;\n", + "- `perspective-python`, to create the visualization of the data; and \n", + "- `cartopy`, for plotting the individual events in the USGS catalog (this is optional)." + ] + }, + { + "cell_type": "markdown", + "id": "9e5ce935-7f5d-4c37-8bd9-cea707e0aefe", + "metadata": {}, + "source": [ + "**Note:** This example has been tested for `jupyterlab==4.2.0` and `perspective-python==2.10.0`." + ] + }, + { + "cell_type": "markdown", + "id": "7d470b6f-eddf-4617-9e12-d5b907ab9f54", + "metadata": {}, + "source": [ + "You can install these dependencies in your Python environment with the following command:" + ] + }, + { + "cell_type": "markdown", + "id": "2a6ebda7-caa6-4a23-abda-fe589171e2c2", + "metadata": { + "scrolled": true + }, + "source": [ + "```\n", + "pip install obspy cartopy jupyterlab==4.2.0 perspective-python==2.10.0\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "ed8fddaf-865c-42d0-b0bd-6d1061f2b72d", + "metadata": {}, + "source": [ + "#### Reading realtime data from USGS as QuakeML" + ] + }, + { + "cell_type": "markdown", + "id": "cea45007-2e4a-439c-8884-29a5e0232730", + "metadata": {}, + "source": [ + "The [USGS website](https://earthquake.usgs.gov/earthquakes) provides several feeds with recent seismic events, and we will read the \"All day\" feed containing all seismic events of the past 24h. Using `obspy`, we can read the feed in the [QUAKEML](https://earthquake.usgs.gov/earthquakes/feed/v1.0/quakeml.php) format as a `catalog`:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3b0da0df-21d9-434b-88ca-0b4bc5a1bae8", + "metadata": {}, + "outputs": [], + "source": [ + "from obspy import read_events\n", + "\n", + "url = \"https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_day.quakeml\"\n", + "catalog = read_events(url, format=\"QUAKEML\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "20ebbd2b-2bc6-445f-a53b-920324e7ad5b", + "metadata": {}, + "outputs": [], + "source": [ + "catalog" + ] + }, + { + "cell_type": "markdown", + "id": "dc6eeaeb-17ec-4656-9675-112ab5552317", + "metadata": {}, + "source": [ + "Now, we can also use [cartopy](https://scitools.org.uk/cartopy/docs/latest/) to to plot all events in a world map:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2786ced2-1e5c-4c69-84ab-e4a4c16c7547", + "metadata": {}, + "outputs": [], + "source": [ + "catalog.plot();" + ] + }, + { + "cell_type": "markdown", + "id": "f0d17801-aadd-4c2c-b257-0b30de7e7b7d", + "metadata": {}, + "source": [ + "To see what these event objects are, we can look closer:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "692262d9-f217-4fa0-8140-2d5641f6aa75", + "metadata": {}, + "outputs": [], + "source": [ + "event = catalog[0]\n", + "event" + ] + }, + { + "cell_type": "markdown", + "id": "bd33d70d-da4b-41a6-bf72-01cc0bdcb875", + "metadata": {}, + "source": [ + "This feed lists different kinds of events, noted by `event.event_type`. We can also inspect location, time and magnitude information:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "81b272b6-ab8f-4853-a115-2a6fbc94a335", + "metadata": {}, + "outputs": [], + "source": [ + "event.origins, event.magnitudes" + ] + }, + { + "cell_type": "markdown", + "id": "c94fd3cc-5377-498d-ab2c-5b528a87d93c", + "metadata": {}, + "source": [ + "This feed is updated every minute, meaning we get a historical dataset of all seismic events in the past 24h, but we also get a continually updated feed that adds new events as they are entered into this feed (all events will contain this `creation_time` information)." + ] + }, + { + "cell_type": "markdown", + "id": "390b601f-8d13-496f-a3ca-820b08485009", + "metadata": {}, + "source": [ + "## Using CSP to Process Historical and Realtime Events" + ] + }, + { + "cell_type": "markdown", + "id": "f3b61d0a-1f3c-4d26-937a-4c14f05f757d", + "metadata": {}, + "source": [ + "We can now use CSP to read the same data in either realtime or simulation modes, by building a `PushPullAdapter`. This adapter combines a realtime or `PushAdapter` and a historical or `PullAdapter` into a single implementation. This makes it easy to switch from historical mode to realtime mode at runtime. " + ] + }, + { + "cell_type": "markdown", + "id": "e54bfc85-ddfe-4f09-a9b7-1212e67f6b62", + "metadata": {}, + "source": [ + "Because we want to visualize the results, we will start by creating a Perspective widget containing a table and a world map. This widget will be updated every time a new event is detected (and the corresponding CSP edge is *ticked*). We will read historical events from the past 6h, and then run the engine in realtime mode for 10 minutes while we wait for new events to be added to the catalog" + ] + }, + { + "cell_type": "markdown", + "id": "d0b620e9-42ae-4146-b6d6-36f2b0da8568", + "metadata": {}, + "source": [ + "First, we will create our Perspective widget to display the live updating map." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7cb746ba-98bc-4361-81d8-f64e99ee2723", + "metadata": {}, + "outputs": [], + "source": [ + "from perspective import PerspectiveWidget, Plugin\n", + "import ipywidgets as widgets\n", + "from datetime import datetime\n", + "\n", + "# Data schema for Perspective widget\n", + "data = {\n", + " \"longitude\": float,\n", + " \"latitude\": float,\n", + " \"magnitude\": float,\n", + " \"time\": datetime,\n", + "}\n", + "\n", + "datagrid = PerspectiveWidget(\n", + " data,\n", + " plugin=Plugin.GRID,\n", + " group_by=[\"time\"],\n", + " columns=[\"time\", \"longitude\", \"latitude\", \"magnitude\"],\n", + " aggregates={\n", + " \"time\": \"last\",\n", + " \"longitude\": \"last\",\n", + " \"latitude\": \"last\",\n", + " \"magnitude\": \"last\",\n", + " },\n", + ")\n", + "\n", + "worldmap = PerspectiveWidget(\n", + " data,\n", + " plugin=Plugin.MAP_SCATTER,\n", + " columns=[\"longitude\", \"latitude\", \"magnitude\", \"magnitude\", \"time\", \"time\"],\n", + ")\n", + "\n", + "# Create a tab widget with some PerspectiveWidgets inside\n", + "widget = widgets.Tab()\n", + "widget.children = [datagrid, worldmap]\n", + "widget.titles = [\"All events\", \"World map\"]\n", + "widget" + ] + }, + { + "cell_type": "markdown", + "id": "af9f6c87-060f-4e46-b2e3-d5f2ca0f6918", + "metadata": {}, + "source": [ + "After launching the Widget, we see that it is empty. We will pass the Widget to our `csp` application which will update the data. \n", + "\n", + "Next, we create a `PushPullInputAdapter` to bring in the earthquake data to a `csp.graph`. In `csp`, a *push* adapter pushes real-time events to the application and a *pull* adapter pulls in historical data. A push-pull adapter will pull in historical data until its source is exhausted and then transition to real-time mode on a live feed. \n", + "\n", + "The push-pull adapter is especially useful when real-time execution depends on some *state* influenced by prior events. We can playback the history to reach our desired starting state before processing live data. In this example, we will playback the past day of earthquake events to get some data on our map before listening for new " + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4925e8c3-cdcd-4411-820c-90a284c927a1", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "outputs": [], + "source": [ + "# PushPullAdapter\n", + "import threading\n", + "import csp\n", + "from obspy import read_events\n", + "from datetime import datetime, timedelta, timezone\n", + "from csp.impl.pushpulladapter import PushPullInputAdapter\n", + "from csp.impl.wiring import py_pushpull_adapter_def\n", + "import time\n", + "\n", + "# We use a csp.Struct to store the earthquake event data\n", + "class EventData(csp.Struct):\n", + " time: datetime\n", + " longitude: float\n", + " latitude: float\n", + " magnitude: float\n", + " \n", + "# Create a runtime implementation of the adapter\n", + "class EarthquakeEventAdapter(PushPullInputAdapter):\n", + " def __init__(self, interval, url):\n", + " self._interval = interval\n", + " self._thread = None\n", + " self._running = False\n", + " self._url = url\n", + "\n", + " def start(self, starttime, endtime):\n", + " print(\"EarthquakeEventAdapter::start\")\n", + " self._running = True\n", + " self._thread = threading.Thread(target=self._run)\n", + " self._thread.start()\n", + " self._starttime = starttime\n", + " self._endtime = endtime\n", + "\n", + " def stop(self):\n", + " print(\"EarthquakeEventAdapter::stop\")\n", + " if self._running:\n", + " self._running = False\n", + " self._thread.join()\n", + "\n", + " def _run(self):\n", + " # This is the function that defines how data is pushed/pulled into the graph\n", + " # First, we \"pull\" all the historical events in playback mode\n", + " catalog = read_events(self._url, format=\"QUAKEML\")\n", + " catalog.events.sort(key=lambda event: event.origins[0].time)\n", + " for event in catalog:\n", + " event_data = EventData(\n", + " time=event.origins[0].time.datetime,\n", + " longitude=event.origins[0].longitude,\n", + " latitude=event.origins[0].latitude,\n", + " magnitude=event.magnitudes[0].mag,\n", + " )\n", + " # push_tick for a push-pull adapter takes 3 arguments: live (bool), time, value\n", + " # for historical data live=False\n", + " self.push_tick(False, event_data.time, event_data)\n", + "\n", + " print(\"-------------------------------------------------------------------\")\n", + " print(f\"{datetime.utcnow()}: Historical replay complete, pulled {len(catalog)} events\")\n", + " print(\"-------------------------------------------------------------------\")\n", + " self.flag_replay_complete()\n", + "\n", + " last_event_time_pushed = catalog[-1].origins[0].time.datetime\n", + "\n", + " # Now we transition to live execution\n", + " # The while-loop will run every 1-minute in real-time mode\n", + " while self._running:\n", + " catalog = read_events(self._url, format=\"QUAKEML\")\n", + " catalog.events.sort(key=lambda event: event.origins[0].time)\n", + "\n", + " # Find any new events from the last minute\n", + " new_events = []\n", + " for event in reversed(catalog):\n", + " if event.origins[0].time.datetime > last_event_time_pushed:\n", + " new_events.append(event)\n", + " else:\n", + " break\n", + "\n", + " print(\"-------------------------------------------------------------------\")\n", + " print(f\"{datetime.utcnow()}: Refreshing earthquake live feed with {len(new_events)} events\")\n", + " print(\"-------------------------------------------------------------------\")\n", + " \n", + " for event in reversed(new_events):\n", + " # Push live data\n", + " event_data = EventData(\n", + " time=event.origins[0].time.datetime,\n", + " longitude=event.origins[0].longitude,\n", + " latitude=event.origins[0].latitude,\n", + " magnitude=event.magnitudes[0].mag,\n", + " )\n", + " # for historical data live=True\n", + " last_event_time_pushed = event_data.time\n", + " self.push_tick(True, event_data.time, event_data)\n", + "\n", + " time.sleep(self._interval.total_seconds())\n", + " \n", + "# Create the graph-time representation of our adapter\n", + "EarthquakeEvent = py_pushpull_adapter_def(\"EarthquakeEventAdapter\", EarthquakeEventAdapter, csp.ts[EventData], interval=timedelta, url=str)\n", + "\n", + "@csp.node\n", + "def update_widget(event: csp.ts[EventData], widget: widgets.widgets.widget_selectioncontainer.Tab):\n", + " if csp.ticked(event):\n", + " # widget.children = [datagrid, worldmap]\n", + " data = {\n", + " \"time\": [event.time],\n", + " \"longitude\": [event.longitude],\n", + " \"latitude\": [event.latitude],\n", + " \"magnitude\": [event.magnitude],\n", + " }\n", + " widget.children[0].update(data)\n", + " widget.children[1].update(data)\n", + "\n", + "@csp.graph\n", + "def earthquake_graph():\n", + " print(\"Start of graph building\")\n", + " url = \"https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_day.quakeml\"\n", + " interval = timedelta(seconds=60)\n", + " earthquakes = EarthquakeEvent(interval, url=url)\n", + " update_widget(earthquakes, widget=widget)\n", + " csp.add_graph_output(\"Earthquakes\", earthquakes)\n", + " print(\"End of graph building\")\n", + "\n", + "start = datetime.utcnow() - timedelta(hours=24)\n", + "end = datetime.utcnow() + timedelta(minutes=10)\n", + "csp.run(earthquake_graph, starttime=start, endtime=end, realtime=True)\n", + "print(\"Done.\")" + ] + }, + { + "cell_type": "markdown", + "id": "90cbb6a7-c683-44c7-a4e4-b7d7f308a08e", + "metadata": {}, + "source": [ + "## Conclusion" + ] + }, + { + "cell_type": "markdown", + "id": "1fb4b6de-d130-4ed1-ac05-f0aae2dd4ea0", + "metadata": {}, + "source": [ + "In this example, we created a push-pull adapter to process earthquake event data. We played back a day's worth of data before seamlessly transitioning to real-time mode and processing new events. Lastly, we displayed the data in a Perspective widget which plotted each earthquake on a world map." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}