Skip to content

Commit

Permalink
Add a MQTT GTFS-RT trip update updater
Browse files Browse the repository at this point in the history
  • Loading branch information
hannesj committed Dec 7, 2017
1 parent 46e1f9f commit a306434
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 0 deletions.
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ the License, or (at your option) any later version.
import org.opentripplanner.updater.bike_rental.BikeRentalUpdater;
import org.opentripplanner.updater.example.ExampleGraphUpdater;
import org.opentripplanner.updater.example.ExamplePollingGraphUpdater;
import org.opentripplanner.updater.stoptime.MqttGtfsRealtimeUpdater;
import org.opentripplanner.updater.stoptime.PollingStoptimeUpdater;
import org.opentripplanner.updater.stoptime.WebsocketGtfsRealtimeUpdater;
import org.opentripplanner.updater.street_notes.WinkkiPollingGraphUpdater;
Expand Down Expand Up @@ -96,6 +97,9 @@ else if (type.equals("stop-time-updater")) {
else if (type.equals("websocket-gtfs-rt-updater")) {
updater = new WebsocketGtfsRealtimeUpdater();
}
else if (type.equals("MQTT-gtfs-rt-updater")) {
updater = new MqttGtfsRealtimeUpdater();
}
else if (type.equals("real-time-alerts")) {
updater = new GtfsRealtimeAlertsUpdater();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package org.opentripplanner.updater.stoptime;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.transit.realtime.GtfsRealtime;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.opentripplanner.routing.graph.Graph;
import org.opentripplanner.updater.GraphUpdater;
import org.opentripplanner.updater.GraphUpdaterManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

/**
* This class starts an Paho MQTT client which opens a connection to a GTFS-RT data source.
* A callback is registered which handles incoming GTFS-RT messages as they stream in by placing a
* GTFS-RT decoder Runnable task in the single-threaded executor for handling.
*
* Usage example ('bessersmith' name is an example) in the file 'Graph.properties':
*
* <pre>
* bessersmith.type = mqtt-gtfs-rt-updater
* bessersmith.feedId = hsl
* bessersmith.url = ssl://mqtt.hsl.fi:443
* bessersmith.topic = "gtfs/trip-updates/#"
* </pre>
*
*/
public class MqttGtfsRealtimeUpdater implements GraphUpdater {
private static Logger LOG = LoggerFactory.getLogger(MqttGtfsRealtimeUpdater.class);

private GraphUpdaterManager updaterManager;

private String url;

private String topic;

private String feedId;

private int qos;

private String clientId = "OpenTripPlanner-" + MqttClient.generateClientId();

MemoryPersistence persistence = new MemoryPersistence();

private MqttClient client;

@Override public void configure(Graph graph, JsonNode config) throws Exception {
url = config.path("url").asText();
topic = config.path("topic").asText();
feedId = config.path("feedId").asText("");
qos = config.path("qos").asInt(0);
}

@Override public void setGraphUpdaterManager(GraphUpdaterManager updaterManager) {
this.updaterManager = updaterManager;
}

@Override public void setup() throws Exception {
updaterManager.executeBlocking(graph -> {
// Only create a realtime data snapshot source if none exists already
if (graph.timetableSnapshotSource == null) {
TimetableSnapshotSource snapshotSource = new TimetableSnapshotSource(graph);
// Add snapshot source to graph
graph.timetableSnapshotSource = (snapshotSource);
}
});
}

@Override public void run() throws Exception {
client = new MqttClient(url, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setAutomaticReconnect(true);
client.setCallback(new MqttCallbackExtended() {
@Override public void connectComplete(boolean reconnect, String serverURI) {
try {
LOG.debug("Connected");
client.subscribe(topic, qos);
} catch (MqttException e) {
LOG.warn("Could not subsctibe to: " + topic);
}
}

@Override public void connectionLost(Throwable cause) {
LOG.debug("Disconnected");
}

@Override public void messageArrived(String topic, MqttMessage message) throws Exception {
GtfsRealtime.FeedMessage feedMessage;
List<GtfsRealtime.FeedEntity> feedEntityList;
List<GtfsRealtime.TripUpdate> updates = null;
boolean fullDataset = true;
try {
// Decode message
feedMessage = GtfsRealtime.FeedMessage.PARSER.parseFrom(message.getPayload());
feedEntityList = feedMessage.getEntityList();

// Change fullDataset value if this is an incremental update
if (feedMessage.hasHeader()
&& feedMessage.getHeader().hasIncrementality()
&& feedMessage.getHeader().getIncrementality()
.equals(GtfsRealtime.FeedHeader.Incrementality.DIFFERENTIAL)) {
fullDataset = false;
}

// Create List of TripUpdates
updates = new ArrayList<>(feedEntityList.size());
for (GtfsRealtime.FeedEntity feedEntity : feedEntityList) {
if (feedEntity.hasTripUpdate()) {
updates.add(feedEntity.getTripUpdate());
}
}
} catch (InvalidProtocolBufferException e) {
LOG.error("Could not decode gtfs-rt message:", e);
}

if (updates != null) {
// Handle trip updates via graph writer runnable
TripUpdateGraphWriterRunnable runnable = new TripUpdateGraphWriterRunnable(
fullDataset, updates, feedId);
updaterManager.execute(runnable);
}
}

@Override public void deliveryComplete(IMqttDeliveryToken token) {

}
});

LOG.debug("Connecting to broker: " + url);
client.connect(connOpts);
}

@Override public void teardown() {
try {
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}

}

0 comments on commit a306434

Please sign in to comment.